diff --git a/CHANGELOG.md b/CHANGELOG.md index f2bcf495be4d..3306913a15e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## Main +* [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction +* [4744](https://github.com/grafana/loki/pull/4744) **cyriltovena**: Promtail: Adds GELF UDP support. + # 2.4.1 (2021/11/07) Release notes for 2.4.1 can be found on the [release notes page](https://grafana.com/docs/loki/latest/release-notes/v2-4/) @@ -9,7 +12,6 @@ Release notes for 2.4.1 can be found on the [release notes page](https://grafana * [4687](https://github.com/grafana/loki/pull/4687) **owen-d**: overrides checks for nil tenant limits on AllByUserID * [4683](https://github.com/grafana/loki/pull/4683) **owen-d**: Adds replication_factor doc to common config * [4681](https://github.com/grafana/loki/pull/4681) **slim-bean**: Loki: check new Read target when initializing boltdb-shipper store -* [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction # 2.4.0 (2021/11/05) diff --git a/clients/cmd/promtail/promtail-gelf.yaml b/clients/cmd/promtail/promtail-gelf.yaml new file mode 100644 index 000000000000..65b25ca51211 --- /dev/null +++ b/clients/cmd/promtail/promtail-gelf.yaml @@ -0,0 +1,26 @@ +server: + http_listen_port: 9080 + grpc_listen_port: 0 + +clients: + - url: http://localhost:3100/loki/api/v1/push + +scrape_configs: +- job_name: gelf + gelf: + use_incoming_timestamp: false + labels: + job: gelf + relabel_configs: + - action: replace + source_labels: + - __gelf_message_level + target_label: level + - action: replace + source_labels: + - __gelf_message_host + target_label: host + - action: replace + source_labels: + - __gelf_message_facility + target_label: facility diff --git a/clients/pkg/promtail/scrapeconfig/scrapeconfig.go b/clients/pkg/promtail/scrapeconfig/scrapeconfig.go index 007c9c94f1b2..8b5789186648 100644 --- a/clients/pkg/promtail/scrapeconfig/scrapeconfig.go +++ b/clients/pkg/promtail/scrapeconfig/scrapeconfig.go @@ -41,6 +41,7 @@ type Config struct { PushConfig *PushTargetConfig `yaml:"loki_push_api,omitempty"` WindowsConfig *WindowsEventsTargetConfig `yaml:"windows_events,omitempty"` KafkaConfig *KafkaTargetConfig `yaml:"kafka,omitempty"` + GelfConfig *GelfTargetConfig `yaml:"gelf,omitempty"` RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"` ServiceDiscoveryConfig ServiceDiscoveryConfig `yaml:",inline"` } @@ -295,6 +296,19 @@ type KafkaSASLConfig struct { TLSConfig promconfig.TLSConfig `yaml:",inline"` } +// GelfTargetConfig describes a scrape config that read GELF messages on UDP. +type GelfTargetConfig struct { + // ListenAddress is the address to listen on UDP for gelf messages. (Default to `:12201`) + ListenAddress string `yaml:"listen_address"` + + // Labels optionally holds labels to associate with each record read from gelf messages. + Labels model.LabelSet `yaml:"labels"` + + // UseIncomingTimestamp sets the timestamp to the incoming gelf messages + // timestamp if it's set. + UseIncomingTimestamp bool `yaml:"use_incoming_timestamp"` +} + // GcplogTargetConfig describes a scrape config to pull logs from any pubsub topic. type GcplogTargetConfig struct { // ProjectID is the Cloud project id diff --git a/clients/pkg/promtail/targets/gelf/gelftarget.go b/clients/pkg/promtail/targets/gelf/gelftarget.go new file mode 100644 index 000000000000..7295c3e5aecf --- /dev/null +++ b/clients/pkg/promtail/targets/gelf/gelftarget.go @@ -0,0 +1,198 @@ +package gelf + +import ( + "bytes" + "context" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" + "gopkg.in/Graylog2/go-gelf.v2/gelf" + + "github.com/grafana/loki/clients/pkg/promtail/api" + "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" + "github.com/grafana/loki/clients/pkg/promtail/targets/target" + + "github.com/grafana/loki/pkg/logproto" +) + +// SeverityLevels maps severity levels to severity string levels. +var SeverityLevels = map[int32]string{ + 0: "emergency", + 1: "alert", + 2: "critical", + 3: "error", + 4: "warning", + 5: "notice", + 6: "informational", + 7: "debug", +} + +// Target listens to gelf messages on udp. +type Target struct { + metrics *Metrics + logger log.Logger + handler api.EntryHandler + config *scrapeconfig.GelfTargetConfig + relabelConfig []*relabel.Config + gelfReader *gelf.Reader + encodeBuff *bytes.Buffer + wg sync.WaitGroup + + ctx context.Context + ctxCancel context.CancelFunc +} + +// NewTarget configures a new Gelf Target. +func NewTarget( + metrics *Metrics, + logger log.Logger, + handler api.EntryHandler, + relabel []*relabel.Config, + config *scrapeconfig.GelfTargetConfig, +) (*Target, error) { + + if config.ListenAddress == "" { + config.ListenAddress = ":12201" + } + + gelfReader, err := gelf.NewReader(config.ListenAddress) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + + t := &Target{ + metrics: metrics, + logger: logger, + handler: handler, + config: config, + relabelConfig: relabel, + gelfReader: gelfReader, + encodeBuff: bytes.NewBuffer(make([]byte, 0, 1024)), + + ctx: ctx, + ctxCancel: cancel, + } + + t.run() + return t, err +} + +func (t *Target) run() { + t.wg.Add(1) + go func() { + defer t.wg.Done() + level.Info(t.logger).Log("msg", "listening for GELF UDP messages", "listen_address", t.config.ListenAddress) + for { + select { + case <-t.ctx.Done(): + level.Info(t.logger).Log("msg", "GELF UDP listener shutdown", "listen_address", t.config.ListenAddress) + return + default: + msg, err := t.gelfReader.ReadMessage() + if err != nil { + level.Error(t.logger).Log("msg", "error while reading gelf message", "listen_address", t.config.ListenAddress, "err", err) + t.metrics.gelfErrors.Inc() + continue + } + if msg != nil { + t.metrics.gelfEntries.Inc() + t.handleMessage(msg) + } + } + } + }() +} + +func (t *Target) handleMessage(msg *gelf.Message) { + lb := labels.NewBuilder(nil) + + // Add all labels from the config. + for k, v := range t.config.Labels { + lb.Set(string(k), string(v)) + } + lb.Set("__gelf_message_level", SeverityLevels[msg.Level]) + lb.Set("__gelf_message_host", msg.Host) + lb.Set("__gelf_message_version", msg.Version) + lb.Set("__gelf_message_facility", msg.Facility) + + processed := relabel.Process(lb.Labels(), t.relabelConfig...) + + filtered := make(model.LabelSet) + for _, lbl := range processed { + if strings.HasPrefix(lbl.Name, "__") { + continue + } + filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + } + + var timestamp time.Time + if t.config.UseIncomingTimestamp && msg.TimeUnix != 0 { + // TimeUnix is the timestamp of the message, in seconds since the UNIX epoch with decimals for fractional seconds. + timestamp = secondsToUnixTimestamp(msg.TimeUnix) + } else { + timestamp = time.Now() + } + t.encodeBuff.Reset() + err := msg.MarshalJSONBuf(t.encodeBuff) + if err != nil { + level.Error(t.logger).Log("msg", "error while marshalling gelf message", "listen_address", t.config.ListenAddress, "err", err) + t.metrics.gelfErrors.Inc() + return + } + t.handler.Chan() <- api.Entry{ + Labels: filtered, + Entry: logproto.Entry{ + Timestamp: timestamp, + Line: t.encodeBuff.String(), + }, + } +} + +func secondsToUnixTimestamp(seconds float64) time.Time { + return time.Unix(0, int64(seconds*float64(time.Second))) +} + +// Type returns GelfTargetType. +func (t *Target) Type() target.TargetType { + return target.GelfTargetType +} + +// Ready indicates whether or not the gelf target is ready to be read from. +func (t *Target) Ready() bool { + return true +} + +// DiscoveredLabels returns the set of labels discovered by the gelf target, which +// is always nil. Implements Target. +func (t *Target) DiscoveredLabels() model.LabelSet { + return nil +} + +// Labels returns the set of labels that statically apply to all log entries +// produced by the GelfTarget. +func (t *Target) Labels() model.LabelSet { + return t.config.Labels +} + +// Details returns target-specific details. +func (t *Target) Details() interface{} { + return map[string]string{} +} + +// Stop shuts down the GelfTarget. +func (t *Target) Stop() { + level.Info(t.logger).Log("msg", "Shutting down GELF UDP listener", "listen_address", t.config.ListenAddress) + t.ctxCancel() + if err := t.gelfReader.Close(); err != nil { + level.Error(t.logger).Log("msg", "error while closing gelf reader", "err", err) + } + t.wg.Wait() + t.handler.Stop() +} diff --git a/clients/pkg/promtail/targets/gelf/gelftarget_test.go b/clients/pkg/promtail/targets/gelf/gelftarget_test.go new file mode 100644 index 000000000000..9059189045a1 --- /dev/null +++ b/clients/pkg/promtail/targets/gelf/gelftarget_test.go @@ -0,0 +1,106 @@ +package gelf + +import ( + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/loki/clients/pkg/promtail/client/fake" + "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/relabel" + "github.com/stretchr/testify/require" + "gopkg.in/Graylog2/go-gelf.v2/gelf" +) + +func Test_Gelf(t *testing.T) { + client := fake.New(func() {}) + + tm, err := NewTargetManager(NewMetrics(nil), log.NewNopLogger(), client, []scrapeconfig.Config{ + { + JobName: "gelf", + GelfConfig: &scrapeconfig.GelfTargetConfig{ + ListenAddress: ":12201", + UseIncomingTimestamp: true, + Labels: model.LabelSet{"cfg": "true"}, + }, + RelabelConfigs: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"__gelf_message_level"}, + TargetLabel: "level", + Replacement: "$1", + Action: relabel.Replace, + Regex: relabel.MustNewRegexp("(.*)"), + }, + { + SourceLabels: model.LabelNames{"__gelf_message_host"}, + TargetLabel: "hostname", + Replacement: "$1", + Action: relabel.Replace, + Regex: relabel.MustNewRegexp("(.*)"), + }, + { + SourceLabels: model.LabelNames{"__gelf_message_facility"}, + TargetLabel: "facility", + Replacement: "$1", + Action: relabel.Replace, + Regex: relabel.MustNewRegexp("(.*)"), + }, + }, + }, + }) + require.NoError(t, err) + + w, err := gelf.NewUDPWriter(":12201") + require.NoError(t, err) + baseTs := float64(time.Unix(10, 0).Unix()) + 0.250 + ts := baseTs + + for i := 0; i < 10; i++ { + require.NoError(t, w.WriteMessage(&gelf.Message{ + Short: "short", + Full: "full", + Version: "2.2", + Host: "thishost", + TimeUnix: ts, + Level: gelf.LOG_ERR, + Facility: "gelftest", + Extra: map[string]interface{}{ + "_foo": "bar", + }, + })) + ts += 0.250 + } + + require.Eventually(t, func() bool { + return len(client.Received()) == 10 + }, 200*time.Millisecond, 20*time.Millisecond) + + for i, actual := range client.Received() { + require.Equal(t, "error", string(actual.Labels["level"])) + require.Equal(t, "true", string(actual.Labels["cfg"])) + require.Equal(t, "thishost", string(actual.Labels["hostname"])) + require.Equal(t, "gelftest", string(actual.Labels["facility"])) + + require.Equal(t, secondsToUnixTimestamp(baseTs+float64(i)*0.250), actual.Timestamp) + + var gelfMsg gelf.Message + + require.NoError(t, gelfMsg.UnmarshalJSON([]byte(actual.Line))) + + require.Equal(t, "short", gelfMsg.Short) + require.Equal(t, "full", gelfMsg.Full) + require.Equal(t, "2.2", gelfMsg.Version) + require.Equal(t, "thishost", gelfMsg.Host) + require.Equal(t, "bar", gelfMsg.Extra["_foo"]) + require.Equal(t, gelf.LOG_ERR, int(gelfMsg.Level)) + require.Equal(t, "gelftest", gelfMsg.Facility) + + } + + tm.Stop() +} + +func TestConvertTime(t *testing.T) { + require.Equal(t, time.Unix(0, int64(time.Second+(time.Duration(250)*time.Millisecond))), secondsToUnixTimestamp(float64(time.Unix(1, 0).Unix())+0.250)) +} diff --git a/clients/pkg/promtail/targets/gelf/gelftargetmanager.go b/clients/pkg/promtail/targets/gelf/gelftargetmanager.go new file mode 100644 index 000000000000..f9824d3152f0 --- /dev/null +++ b/clients/pkg/promtail/targets/gelf/gelftargetmanager.go @@ -0,0 +1,85 @@ +package gelf + +import ( + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/loki/clients/pkg/logentry/stages" + "github.com/grafana/loki/clients/pkg/promtail/api" + "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" + "github.com/grafana/loki/clients/pkg/promtail/targets/target" +) + +// TargetManager manages a series of Gelf Targets. +type TargetManager struct { + logger log.Logger + targets map[string]*Target +} + +// NewTargetManager creates a new Gelf TargetManager. +func NewTargetManager( + metrics *Metrics, + logger log.Logger, + client api.EntryHandler, + scrapeConfigs []scrapeconfig.Config, +) (*TargetManager, error) { + reg := metrics.reg + if reg == nil { + reg = prometheus.DefaultRegisterer + } + + tm := &TargetManager{ + logger: logger, + targets: make(map[string]*Target), + } + + for _, cfg := range scrapeConfigs { + pipeline, err := stages.NewPipeline(log.With(logger, "component", "gelf_pipeline"), cfg.PipelineStages, &cfg.JobName, reg) + if err != nil { + return nil, err + } + + t, err := NewTarget(metrics, logger, pipeline.Wrap(client), cfg.RelabelConfigs, cfg.GelfConfig) + if err != nil { + return nil, err + } + + tm.targets[cfg.JobName] = t + } + + return tm, nil +} + +// Ready returns true if at least one GelfTarget is also ready. +func (tm *TargetManager) Ready() bool { + for _, t := range tm.targets { + if t.Ready() { + return true + } + } + return false +} + +// Stop stops the GelfTargetManager and all of its GelfTargets. +func (tm *TargetManager) Stop() { + for _, t := range tm.targets { + t.Stop() + } +} + +// ActiveTargets returns the list of GelfTargets where Gelf data +// is being read. ActiveTargets is an alias to AllTargets as +// GelfTargets cannot be deactivated, only stopped. +func (tm *TargetManager) ActiveTargets() map[string][]target.Target { + return tm.AllTargets() +} + +// AllTargets returns the list of all targets where gelf data +// is currently being read. +func (tm *TargetManager) AllTargets() map[string][]target.Target { + result := make(map[string][]target.Target, len(tm.targets)) + for k, v := range tm.targets { + result[k] = []target.Target{v} + } + return result +} diff --git a/clients/pkg/promtail/targets/gelf/metrics.go b/clients/pkg/promtail/targets/gelf/metrics.go new file mode 100644 index 000000000000..4fbb7d6aacae --- /dev/null +++ b/clients/pkg/promtail/targets/gelf/metrics.go @@ -0,0 +1,38 @@ +package gelf + +import "github.com/prometheus/client_golang/prometheus" + +// Metrics holds a set of gelf metrics. +type Metrics struct { + reg prometheus.Registerer + + gelfEntries prometheus.Counter + gelfErrors prometheus.Counter +} + +// NewMetrics creates a new set of gelf metrics. If reg is non-nil, the +// metrics will be registered. +func NewMetrics(reg prometheus.Registerer) *Metrics { + var m Metrics + m.reg = reg + + m.gelfEntries = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "promtail", + Name: "gelf_target_entries_total", + Help: "Total number of successful entries sent to the gelf target", + }) + m.gelfErrors = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "promtail", + Name: "gelf_target_parsing_errors_total", + Help: "Total number of parsing errors while receiving gelf messages", + }) + + if reg != nil { + reg.MustRegister( + m.gelfEntries, + m.gelfErrors, + ) + } + + return &m +} diff --git a/clients/pkg/promtail/targets/manager.go b/clients/pkg/promtail/targets/manager.go index 3d3704eb9502..e5afec51faaf 100644 --- a/clients/pkg/promtail/targets/manager.go +++ b/clients/pkg/promtail/targets/manager.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" "github.com/grafana/loki/clients/pkg/promtail/targets/file" "github.com/grafana/loki/clients/pkg/promtail/targets/gcplog" + "github.com/grafana/loki/clients/pkg/promtail/targets/gelf" "github.com/grafana/loki/clients/pkg/promtail/targets/journal" "github.com/grafana/loki/clients/pkg/promtail/targets/kafka" "github.com/grafana/loki/clients/pkg/promtail/targets/lokipush" @@ -30,7 +31,8 @@ const ( GcplogScrapeConfigs = "gcplogScrapeConfigs" PushScrapeConfigs = "pushScrapeConfigs" WindowsEventsConfigs = "windowsEventsConfigs" - KafkaConfigs = "KafkaConfigs" + KafkaConfigs = "kafkaConfigs" + GelfConfigs = "gelfConfigs" ) type targetManager interface { @@ -86,6 +88,8 @@ func NewTargetManagers( targetScrapeConfigs[WindowsEventsConfigs] = append(targetScrapeConfigs[WindowsEventsConfigs], cfg) case cfg.KafkaConfig != nil: targetScrapeConfigs[KafkaConfigs] = append(targetScrapeConfigs[KafkaConfigs], cfg) + case cfg.GelfConfig != nil: + targetScrapeConfigs[GelfConfigs] = append(targetScrapeConfigs[GelfConfigs], cfg) default: return nil, fmt.Errorf("no valid target scrape config defined for %q", cfg.JobName) } @@ -109,6 +113,7 @@ func NewTargetManagers( fileMetrics *file.Metrics syslogMetrics *syslog.Metrics gcplogMetrics *gcplog.Metrics + gelfMetrics *gelf.Metrics ) if len(targetScrapeConfigs[FileScrapeConfigs]) > 0 { fileMetrics = file.NewMetrics(reg) @@ -119,6 +124,9 @@ func NewTargetManagers( if len(targetScrapeConfigs[GcplogScrapeConfigs]) > 0 { gcplogMetrics = gcplog.NewMetrics(reg) } + if len(targetScrapeConfigs[GelfConfigs]) > 0 { + gelfMetrics = gelf.NewMetrics(reg) + } for target, scrapeConfigs := range targetScrapeConfigs { switch target { @@ -200,6 +208,12 @@ func NewTargetManagers( return nil, errors.Wrap(err, "failed to make kafka target manager") } targetManagers = append(targetManagers, kafkaTargetManager) + case GelfConfigs: + gelfTargetManager, err := gelf.NewTargetManager(gelfMetrics, logger, client, scrapeConfigs) + if err != nil { + return nil, errors.Wrap(err, "failed to make gelf target manager") + } + targetManagers = append(targetManagers, gelfTargetManager) default: return nil, errors.New("unknown scrape config") diff --git a/clients/pkg/promtail/targets/target/target.go b/clients/pkg/promtail/targets/target/target.go index e8e0e2c168ae..29511732801c 100644 --- a/clients/pkg/promtail/targets/target/target.go +++ b/clients/pkg/promtail/targets/target/target.go @@ -32,6 +32,9 @@ const ( // KafkaTargetType is a Kafka target KafkaTargetType = TargetType("Kafka") + + // GelfTargetType is a gelf target + GelfTargetType = TargetType("gelf") ) // Target is a promtail scrape target diff --git a/docs/sources/clients/promtail/configuration.md b/docs/sources/clients/promtail/configuration.md index 1c81c061cd25..c7a0c6770b29 100644 --- a/docs/sources/clients/promtail/configuration.md +++ b/docs/sources/clients/promtail/configuration.md @@ -325,6 +325,9 @@ job_name: # Describes how to fetch logs from Kafka via a Consumer group. [kafka: ] +# Describes how to receive logs from gelf client. +[gelf: ] + # Describes how to relabel targets to determine if they should # be processed. relabel_configs: @@ -932,25 +935,25 @@ By default, timestamps are assigned by Promtail when the message is read, if you authentication: # Type is authentication type. Supported values [none, ssl, sasl] [type: | default = "none"] - + # TLS configuration for authentication and encryption. It is used only when authentication type is ssl. tls_config: [ ] - + # SASL configuration for authentication. It is used only when authentication type is sasl. sasl_config: # SASL mechanism. Supported values [PLAIN, SCRAM-SHA-256, SCRAM-SHA-512] [mechanism: | default = "PLAIN"] - + # The user name to use for SASL authentication [user: ] - + # The password to use for SASL authentication [password: ] - - # If true, SASL authentication is executed over TLS + + # If true, SASL authentication is executed over TLS [use_tls: | default = false] - + # The CA file to use to verify the server [ca_file: ] @@ -961,7 +964,7 @@ authentication: # If true, ignores the server certificate being signed by an # unknown CA. [insecure_skip_verify: | default = false] - + # Label map to add to every log line read from kafka labels: @@ -972,7 +975,7 @@ labels: [use_incoming_timestamp: | default = false] ``` -#### Available Labels +**Available Labels:** The list of labels below are discovered when consuming kafka: @@ -983,6 +986,47 @@ The list of labels below are discovered when consuming kafka: To keep discovered labels to your logs use the [relabel_configs](#relabel_configs) section. +### GELF + +The `gelf` block configures a GELF UDP listener allowing users to push +logs to Promtail with the [GELF](https://docs.graylog.org/docs/gelf) protocol. +Currently only UDP is supported, please submit a feature request if you're interested into TCP support. + +> GELF messages can be sent uncompressed or compressed with either GZIP or ZLIB. + +Each GELF message received will be encoded in JSON as the log line. For example: + +```json +{"version":"1.1","host":"example.org","short_message":"A short message","timestamp":1231231123,"level":5,"_some_extra":"extra"} +``` + +You can leverage [pipeline stages](pipeline_stages) with the GELF target, +if for example, you want to parse the log line and extract more labels or change the log line format. + +```yaml +# UDP address to listen on. Has the format of "host:port". Default to 0.0.0.0:12201 +listen_address: + +# Label map to add to every log message. +labels: + [ : ... ] + +# Whether Promtail should pass on the timestamp from the incoming gelf message. +# When false, or if no timestamp is present on the gelf message, Promtail will assign the current timestamp to the log when it was processed. +# Default is false +use_incoming_timestamp: + +``` + +**Available Labels:** + +- `__gelf_message_level`: The GELF level as string. +- `__gelf_message_host`: The host sending the GELF message. +- `__gelf_message_version`: The GELF level message version set by the client. +- `__gelf_message_facility`: The GELF facility. + +To keep discovered labels to your logs use the [relabel_configs](#relabel_configs) section. + ### relabel_configs Relabeling is a powerful tool to dynamically rewrite the label set of a target diff --git a/docs/sources/clients/promtail/scraping.md b/docs/sources/clients/promtail/scraping.md index 3cc4e79cef5c..8b6f85055394 100644 --- a/docs/sources/clients/promtail/scraping.md +++ b/docs/sources/clients/promtail/scraping.md @@ -303,6 +303,34 @@ scrape_configs: Only the `brokers` and `topics` is required. see the [configuration](../../configuration/#kafka) section for more information. +## GELF + +Promtail supports listening message using the [GELF](https://docs.graylog.org/docs/gelf) UDP protocol. +The GELF targets can be configured using the `gelf` stanza: + +```yaml +scrape_configs: +- job_name: gelf + gelf: + listen_address: "0.0.0.0:12201" + use_incoming_timestamp: true + labels: + job: gelf + relabel_configs: + - action: replace + source_labels: + - __gelf_message_host + target_label: host + - action: replace + source_labels: + - __gelf_message_level + target_label: level + - action: replace + source_labels: + - __gelf_message_facility + target_label: facility +``` + ## Relabeling Each `scrape_configs` entry can contain a `relabel_configs` stanza. diff --git a/go.mod b/go.mod index 5db96ff1bd9d..fb211b5c96b0 100644 --- a/go.mod +++ b/go.mod @@ -102,7 +102,10 @@ require ( k8s.io/klog v1.0.0 ) -require github.com/xdg-go/scram v1.0.2 +require ( + github.com/xdg-go/scram v1.0.2 + gopkg.in/Graylog2/go-gelf.v2 v2.0.0-20191017102106-1550ee647df0 +) require ( cloud.google.com/go v0.94.1 // indirect @@ -305,3 +308,6 @@ replace github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0- // confused about which version is the latest one. v0.22.0 was released in July, but latest tag reachable from main // is v0.19.1. We pin version from late september here. Feel free to remove when updating to later version. replace github.com/thanos-io/thanos v0.22.0 => github.com/thanos-io/thanos v0.19.1-0.20210923155558-c15594a03c45 + +// We use a fork of Graylog to avoid leaking goroutine when closing the Promtail target. +replace gopkg.in/Graylog2/go-gelf.v2 => github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8 diff --git a/go.sum b/go.sum index 380d46199bb9..8da10a5ada1f 100644 --- a/go.sum +++ b/go.sum @@ -1144,6 +1144,8 @@ github.com/grafana/dskit v0.0.0-20210818123532-6645f87e9e12/go.mod h1:QaNAQaCSFO github.com/grafana/dskit v0.0.0-20210819132858-471020752967/go.mod h1:uF46UNN1/feB1egpq8UGbBBKvJjGgZauW7pcVbeFLLM= github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1 h1:Qf+/W3Tup0nO21tgJmO14WJK0yyrm4L2UJipZP+Zoow= github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM= +github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8 h1:aEOagXOTqtN9gd4jiDuP/5a81HdoJBqkVfn8WaxbsK4= +github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8/go.mod h1:QAvS2C7TtQRhhv9Uf/sxD+BUhpkrPFm5jK/9MzUiDCY= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I= github.com/grafana/tail v0.0.0-20201004203643-7aa4e4a91f03 h1:fGgFrAraMB0BaPfYumu+iulfDXwHm+GFyHA4xEtBqI8= diff --git a/vendor/gopkg.in/Graylog2/go-gelf.v2/LICENSE b/vendor/gopkg.in/Graylog2/go-gelf.v2/LICENSE new file mode 100644 index 000000000000..bc756ae36537 --- /dev/null +++ b/vendor/gopkg.in/Graylog2/go-gelf.v2/LICENSE @@ -0,0 +1,21 @@ +Copyright 2012 SocialCode + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + diff --git a/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/message.go b/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/message.go new file mode 100644 index 000000000000..fcb182f9122e --- /dev/null +++ b/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/message.go @@ -0,0 +1,154 @@ +package gelf + +import ( + "bytes" + "encoding/json" + "fmt" + "time" +) + +// Message represents the contents of the GELF message. It is gzipped +// before sending. +type Message struct { + Version string `json:"version"` + Host string `json:"host"` + Short string `json:"short_message"` + Full string `json:"full_message,omitempty"` + TimeUnix float64 `json:"timestamp"` + Level int32 `json:"level,omitempty"` + Facility string `json:"facility,omitempty"` + Extra map[string]interface{} `json:"-"` + RawExtra json.RawMessage `json:"-"` +} + +// Syslog severity levels +const ( + LOG_EMERG = 0 + LOG_ALERT = 1 + LOG_CRIT = 2 + LOG_ERR = 3 + LOG_WARNING = 4 + LOG_NOTICE = 5 + LOG_INFO = 6 + LOG_DEBUG = 7 +) + +func (m *Message) MarshalJSONBuf(buf *bytes.Buffer) error { + b, err := json.Marshal(m) + if err != nil { + return err + } + // write up until the final } + if _, err = buf.Write(b[:len(b)-1]); err != nil { + return err + } + if len(m.Extra) > 0 { + eb, err := json.Marshal(m.Extra) + if err != nil { + return err + } + // merge serialized message + serialized extra map + if err = buf.WriteByte(','); err != nil { + return err + } + // write serialized extra bytes, without enclosing quotes + if _, err = buf.Write(eb[1 : len(eb)-1]); err != nil { + return err + } + } + + if len(m.RawExtra) > 0 { + if err := buf.WriteByte(','); err != nil { + return err + } + + // write serialized extra bytes, without enclosing quotes + if _, err = buf.Write(m.RawExtra[1 : len(m.RawExtra)-1]); err != nil { + return err + } + } + + // write final closing quotes + return buf.WriteByte('}') +} + +func (m *Message) UnmarshalJSON(data []byte) error { + i := make(map[string]interface{}, 16) + if err := json.Unmarshal(data, &i); err != nil { + return err + } + for k, v := range i { + if k[0] == '_' { + if m.Extra == nil { + m.Extra = make(map[string]interface{}, 1) + } + m.Extra[k] = v + continue + } + + ok := true + switch k { + case "version": + m.Version, ok = v.(string) + case "host": + m.Host, ok = v.(string) + case "short_message": + m.Short, ok = v.(string) + case "full_message": + m.Full, ok = v.(string) + case "timestamp": + m.TimeUnix, ok = v.(float64) + case "level": + var level float64 + level, ok = v.(float64) + m.Level = int32(level) + case "facility": + m.Facility, ok = v.(string) + } + + if !ok { + return fmt.Errorf("invalid type for field %s", k) + } + } + return nil +} + +func (m *Message) toBytes(buf *bytes.Buffer) (messageBytes []byte, err error) { + if err = m.MarshalJSONBuf(buf); err != nil { + return nil, err + } + messageBytes = buf.Bytes() + return messageBytes, nil +} + +func constructMessage(p []byte, hostname string, facility string, file string, line int) (m *Message) { + // remove trailing and leading whitespace + p = bytes.TrimSpace(p) + + // If there are newlines in the message, use the first line + // for the short message and set the full message to the + // original input. If the input has no newlines, stick the + // whole thing in Short. + short := p + full := []byte("") + if i := bytes.IndexRune(p, '\n'); i > 0 { + short = p[:i] + full = p + } + + m = &Message{ + Version: "1.1", + Host: hostname, + Short: string(short), + Full: string(full), + TimeUnix: float64(time.Now().UnixNano()) / float64(time.Second), + Level: 6, // info + Facility: facility, + Extra: map[string]interface{}{ + "_file": file, + "_line": line, + }, + } + + return m +} diff --git a/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/reader.go b/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/reader.go new file mode 100644 index 000000000000..83df304e83aa --- /dev/null +++ b/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/reader.go @@ -0,0 +1,144 @@ +// Copyright 2012 SocialCode. All rights reserved. +// Use of this source code is governed by the MIT +// license that can be found in the LICENSE file. + +package gelf + +import ( + "bytes" + "compress/gzip" + "compress/zlib" + "encoding/json" + "fmt" + "io" + "net" + "strings" + "sync" +) + +type Reader struct { + mu sync.Mutex + conn net.Conn +} + +func NewReader(addr string) (*Reader, error) { + var err error + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, fmt.Errorf("ResolveUDPAddr('%s'): %s", addr, err) + } + + conn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + return nil, fmt.Errorf("ListenUDP: %s", err) + } + + r := new(Reader) + r.conn = conn + return r, nil +} + +func (r *Reader) Addr() string { + return r.conn.LocalAddr().String() +} + +// FIXME: this will discard data if p isn't big enough to hold the +// full message. +func (r *Reader) Read(p []byte) (int, error) { + msg, err := r.ReadMessage() + if err != nil { + return -1, err + } + + var data string + + if msg.Full == "" { + data = msg.Short + } else { + data = msg.Full + } + + return strings.NewReader(data).Read(p) +} + +func (r *Reader) ReadMessage() (*Message, error) { + cBuf := make([]byte, ChunkSize) + var ( + err error + n, length int + cid, ocid []byte + seq, total uint8 + cHead []byte + cReader io.Reader + chunks [][]byte + ) + + for got := 0; got < 128 && (total == 0 || got < int(total)); got++ { + if n, err = r.conn.Read(cBuf); err != nil { + return nil, fmt.Errorf("Read: %s", err) + } + cHead, cBuf = cBuf[:2], cBuf[:n] + + if bytes.Equal(cHead, magicChunked) { + // fmt.Printf("chunked %v\n", cBuf[:14]) + cid, seq, total = cBuf[2:2+8], cBuf[2+8], cBuf[2+8+1] + if ocid != nil && !bytes.Equal(cid, ocid) { + return nil, fmt.Errorf("out-of-band message %v (awaited %v)", cid, ocid) + } else if ocid == nil { + ocid = cid + chunks = make([][]byte, total) + } + n = len(cBuf) - chunkedHeaderLen + // fmt.Printf("setting chunks[%d]: %d\n", seq, n) + chunks[seq] = append(make([]byte, 0, n), cBuf[chunkedHeaderLen:]...) + length += n + } else { // not chunked + if total > 0 { + return nil, fmt.Errorf("out-of-band message (not chunked)") + } + break + } + } + // fmt.Printf("\nchunks: %v\n", chunks) + + if length > 0 { + if cap(cBuf) < length { + cBuf = append(cBuf, make([]byte, 0, length-cap(cBuf))...) + } + cBuf = cBuf[:0] + for i := range chunks { + // fmt.Printf("appending %d %v\n", i, chunks[i]) + cBuf = append(cBuf, chunks[i]...) + } + cHead = cBuf[:2] + } + + // the data we get from the wire is compressed + if bytes.Equal(cHead, magicGzip) { + cReader, err = gzip.NewReader(bytes.NewReader(cBuf)) + } else if cHead[0] == magicZlib[0] && + (int(cHead[0])*256+int(cHead[1]))%31 == 0 { + // zlib is slightly more complicated, but correct + cReader, err = zlib.NewReader(bytes.NewReader(cBuf)) + } else { + // compliance with https://github.com/Graylog2/graylog2-server + // treating all messages as uncompressed if they are not gzip, zlib or + // chunked + cReader = bytes.NewReader(cBuf) + } + + if err != nil { + return nil, fmt.Errorf("NewReader: %s", err) + } + + msg := new(Message) + if err := json.NewDecoder(cReader).Decode(&msg); err != nil { + return nil, fmt.Errorf("json.Unmarshal: %s", err) + } + + return msg, nil +} + +func (r *Reader) Close() error { + return r.conn.Close() +} diff --git a/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/tcpreader.go b/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/tcpreader.go new file mode 100644 index 000000000000..74255ec3bea9 --- /dev/null +++ b/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/tcpreader.go @@ -0,0 +1,156 @@ +package gelf + +import ( + "bufio" + "encoding/json" + "fmt" + "net" + "time" +) + +type TCPReader struct { + listener *net.TCPListener + conn net.Conn + messages chan []byte +} + +type connChannels struct { + drop chan string + confirm chan string +} + +func newTCPReader(addr string) (*TCPReader, chan string, chan string, error) { + var err error + tcpAddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + return nil, nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err) + } + + listener, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + return nil, nil, nil, fmt.Errorf("ListenTCP: %s", err) + } + + r := &TCPReader{ + listener: listener, + messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages + } + + closeSignal := make(chan string, 1) + doneSignal := make(chan string, 1) + + go r.listenUntilCloseSignal(closeSignal, doneSignal) + + return r, closeSignal, doneSignal, nil +} + +func (r *TCPReader) accepter(connections chan net.Conn) { + for { + conn, err := r.listener.Accept() + if err != nil { + break + } + connections <- conn + } +} + +func (r *TCPReader) listenUntilCloseSignal(closeSignal chan string, doneSignal chan string) { + defer func() { doneSignal <- "done" }() + defer r.listener.Close() + var conns []connChannels + connectionsChannel := make(chan net.Conn, 1) + go r.accepter(connectionsChannel) + for { + select { + case conn := <-connectionsChannel: + dropSignal := make(chan string, 1) + dropConfirm := make(chan string, 1) + channels := connChannels{drop: dropSignal, confirm: dropConfirm} + go handleConnection(conn, r.messages, dropSignal, dropConfirm) + conns = append(conns, channels) + default: + } + + select { + case sig := <-closeSignal: + if sig == "stop" || sig == "drop" { + if len(conns) >= 1 { + for _, s := range conns { + if s.drop != nil { + s.drop <- "drop" + <-s.confirm + conns = append(conns[:0], conns[1:]...) + } + } + if sig == "stop" { + return + } + } else if sig == "stop" { + closeSignal <- "stop" + } + if sig == "drop" { + doneSignal <- "done" + } + } + default: + } + } +} + +func (r *TCPReader) addr() string { + return r.listener.Addr().String() +} + +func handleConnection(conn net.Conn, messages chan<- []byte, dropSignal chan string, dropConfirm chan string) { + defer func() { dropConfirm <- "done" }() + defer conn.Close() + reader := bufio.NewReader(conn) + + var b []byte + var err error + drop := false + canDrop := false + + for { + conn.SetDeadline(time.Now().Add(2 * time.Second)) + if b, err = reader.ReadBytes(0); err != nil { + if drop { + return + } + } else if len(b) > 0 { + messages <- b + canDrop = true + if drop { + return + } + } else if drop { + return + } + select { + case sig := <-dropSignal: + if sig == "drop" { + drop = true + time.Sleep(1 * time.Second) + if canDrop { + return + } + } + default: + } + } +} + +func (r *TCPReader) readMessage() (*Message, error) { + b := <-r.messages + + var msg Message + if err := json.Unmarshal(b[:len(b)-1], &msg); err != nil { + return nil, fmt.Errorf("json.Unmarshal: %s", err) + } + + return &msg, nil +} + +func (r *TCPReader) Close() { + r.listener.Close() +} diff --git a/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/tcpwriter.go b/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/tcpwriter.go new file mode 100644 index 000000000000..5a637ff19b8a --- /dev/null +++ b/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/tcpwriter.go @@ -0,0 +1,107 @@ +package gelf + +import ( + "fmt" + "net" + "os" + "sync" + "time" +) + +const ( + DefaultMaxReconnect = 3 + DefaultReconnectDelay = 1 +) + +type TCPWriter struct { + GelfWriter + mu sync.Mutex + MaxReconnect int + ReconnectDelay time.Duration +} + +func NewTCPWriter(addr string) (*TCPWriter, error) { + var err error + w := new(TCPWriter) + w.MaxReconnect = DefaultMaxReconnect + w.ReconnectDelay = DefaultReconnectDelay + w.proto = "tcp" + w.addr = addr + + if w.conn, err = net.Dial("tcp", addr); err != nil { + return nil, err + } + if w.hostname, err = os.Hostname(); err != nil { + return nil, err + } + + return w, nil +} + +// WriteMessage sends the specified message to the GELF server +// specified in the call to New(). It assumes all the fields are +// filled out appropriately. In general, clients will want to use +// Write, rather than WriteMessage. +func (w *TCPWriter) WriteMessage(m *Message) (err error) { + buf := newBuffer() + defer bufPool.Put(buf) + messageBytes, err := m.toBytes(buf) + if err != nil { + return err + } + + messageBytes = append(messageBytes, 0) + + n, err := w.writeToSocketWithReconnectAttempts(messageBytes) + if err != nil { + return err + } + if n != len(messageBytes) { + return fmt.Errorf("bad write (%d/%d)", n, len(messageBytes)) + } + + return nil +} + +func (w *TCPWriter) Write(p []byte) (n int, err error) { + file, line := getCallerIgnoringLogMulti(1) + + m := constructMessage(p, w.hostname, w.Facility, file, line) + + if err = w.WriteMessage(m); err != nil { + return 0, err + } + + return len(p), nil +} + +func (w *TCPWriter) writeToSocketWithReconnectAttempts(zBytes []byte) (n int, err error) { + var errConn error + var i int + + w.mu.Lock() + for i = 0; i <= w.MaxReconnect; i++ { + errConn = nil + + if w.conn != nil { + n, err = w.conn.Write(zBytes) + } else { + err = fmt.Errorf("Connection was nil, will attempt reconnect") + } + if err != nil { + time.Sleep(w.ReconnectDelay * time.Second) + w.conn, errConn = net.Dial("tcp", w.addr) + } else { + break + } + } + w.mu.Unlock() + + if i > w.MaxReconnect { + return 0, fmt.Errorf("Maximum reconnection attempts was reached; giving up") + } + if errConn != nil { + return 0, fmt.Errorf("Write Failed: %s\nReconnection failed: %s", err, errConn) + } + return n, nil +} diff --git a/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/udpwriter.go b/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/udpwriter.go new file mode 100644 index 000000000000..23bbd5e510eb --- /dev/null +++ b/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/udpwriter.go @@ -0,0 +1,231 @@ +// Copyright 2012 SocialCode. All rights reserved. +// Use of this source code is governed by the MIT +// license that can be found in the LICENSE file. + +package gelf + +import ( + "bytes" + "compress/flate" + "compress/gzip" + "compress/zlib" + "crypto/rand" + "fmt" + "io" + "net" + "os" + "path" + "sync" +) + +type UDPWriter struct { + GelfWriter + CompressionLevel int // one of the consts from compress/flate + CompressionType CompressType +} + +// What compression type the writer should use when sending messages +// to the graylog2 server +type CompressType int + +const ( + CompressGzip CompressType = iota + CompressZlib + CompressNone +) + +// Used to control GELF chunking. Should be less than (MTU - len(UDP +// header)). +// +// TODO: generate dynamically using Path MTU Discovery? +const ( + ChunkSize = 1420 + chunkedHeaderLen = 12 + chunkedDataLen = ChunkSize - chunkedHeaderLen +) + +var ( + magicChunked = []byte{0x1e, 0x0f} + magicZlib = []byte{0x78} + magicGzip = []byte{0x1f, 0x8b} +) + +// numChunks returns the number of GELF chunks necessary to transmit +// the given compressed buffer. +func numChunks(b []byte) int { + lenB := len(b) + if lenB <= ChunkSize { + return 1 + } + return len(b)/chunkedDataLen + 1 +} + +// New returns a new GELF Writer. This writer can be used to send the +// output of the standard Go log functions to a central GELF server by +// passing it to log.SetOutput() +func NewUDPWriter(addr string) (*UDPWriter, error) { + var err error + w := new(UDPWriter) + w.CompressionLevel = flate.BestSpeed + + if w.conn, err = net.Dial("udp", addr); err != nil { + return nil, err + } + if w.hostname, err = os.Hostname(); err != nil { + return nil, err + } + + w.Facility = path.Base(os.Args[0]) + + return w, nil +} + +// writes the gzip compressed byte array to the connection as a series +// of GELF chunked messages. The format is documented at +// http://docs.graylog.org/en/2.1/pages/gelf.html as: +// +// 2-byte magic (0x1e 0x0f), 8 byte id, 1 byte sequence id, 1 byte +// total, chunk-data +func (w *GelfWriter) writeChunked(zBytes []byte) (err error) { + b := make([]byte, 0, ChunkSize) + buf := bytes.NewBuffer(b) + nChunksI := numChunks(zBytes) + if nChunksI > 128 { + return fmt.Errorf("msg too large, would need %d chunks", nChunksI) + } + nChunks := uint8(nChunksI) + // use urandom to get a unique message id + msgId := make([]byte, 8) + n, err := io.ReadFull(rand.Reader, msgId) + if err != nil || n != 8 { + return fmt.Errorf("rand.Reader: %d/%s", n, err) + } + + bytesLeft := len(zBytes) + for i := uint8(0); i < nChunks; i++ { + buf.Reset() + // manually write header. Don't care about + // host/network byte order, because the spec only + // deals in individual bytes. + buf.Write(magicChunked) //magic + buf.Write(msgId) + buf.WriteByte(i) + buf.WriteByte(nChunks) + // slice out our chunk from zBytes + chunkLen := chunkedDataLen + if chunkLen > bytesLeft { + chunkLen = bytesLeft + } + off := int(i) * chunkedDataLen + chunk := zBytes[off : off+chunkLen] + buf.Write(chunk) + + // write this chunk, and make sure the write was good + n, err := w.conn.Write(buf.Bytes()) + if err != nil { + return fmt.Errorf("Write (chunk %d/%d): %s", i, + nChunks, err) + } + if n != len(buf.Bytes()) { + return fmt.Errorf("Write len: (chunk %d/%d) (%d/%d)", + i, nChunks, n, len(buf.Bytes())) + } + + bytesLeft -= chunkLen + } + + if bytesLeft != 0 { + return fmt.Errorf("error: %d bytes left after sending", bytesLeft) + } + return nil +} + +// 1k bytes buffer by default +var bufPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, 1024)) + }, +} + +func newBuffer() *bytes.Buffer { + b := bufPool.Get().(*bytes.Buffer) + if b != nil { + b.Reset() + return b + } + return bytes.NewBuffer(nil) +} + +// WriteMessage sends the specified message to the GELF server +// specified in the call to New(). It assumes all the fields are +// filled out appropriately. In general, clients will want to use +// Write, rather than WriteMessage. +func (w *UDPWriter) WriteMessage(m *Message) (err error) { + mBuf := newBuffer() + defer bufPool.Put(mBuf) + if err = m.MarshalJSONBuf(mBuf); err != nil { + return err + } + mBytes := mBuf.Bytes() + + var ( + zBuf *bytes.Buffer + zBytes []byte + ) + + var zw io.WriteCloser + switch w.CompressionType { + case CompressGzip: + zBuf = newBuffer() + defer bufPool.Put(zBuf) + zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel) + case CompressZlib: + zBuf = newBuffer() + defer bufPool.Put(zBuf) + zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel) + case CompressNone: + zBytes = mBytes + default: + panic(fmt.Sprintf("unknown compression type %d", + w.CompressionType)) + } + if zw != nil { + if err != nil { + return + } + if _, err = zw.Write(mBytes); err != nil { + zw.Close() + return + } + zw.Close() + zBytes = zBuf.Bytes() + } + + if numChunks(zBytes) > 1 { + return w.writeChunked(zBytes) + } + n, err := w.conn.Write(zBytes) + if err != nil { + return + } + if n != len(zBytes) { + return fmt.Errorf("bad write (%d/%d)", n, len(zBytes)) + } + + return nil +} + +// Write encodes the given string in a GELF message and sends it to +// the server specified in New(). +func (w *UDPWriter) Write(p []byte) (n int, err error) { + // 1 for the function that called us. + file, line := getCallerIgnoringLogMulti(1) + + m := constructMessage(p, w.hostname, w.Facility, file, line) + + if err = w.WriteMessage(m); err != nil { + return 0, err + } + + return len(p), nil +} diff --git a/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/utils.go b/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/utils.go new file mode 100644 index 000000000000..6f1c9f7c5f54 --- /dev/null +++ b/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/utils.go @@ -0,0 +1,41 @@ +package gelf + +import ( + "runtime" + "strings" +) + +// getCaller returns the filename and the line info of a function +// further down in the call stack. Passing 0 in as callDepth would +// return info on the function calling getCallerIgnoringLog, 1 the +// parent function, and so on. Any suffixes passed to getCaller are +// path fragments like "/pkg/log/log.go", and functions in the call +// stack from that file are ignored. +func getCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) { + // bump by 1 to ignore the getCaller (this) stackframe + callDepth++ +outer: + for { + var ok bool + _, file, line, ok = runtime.Caller(callDepth) + if !ok { + file = "???" + line = 0 + break + } + + for _, s := range suffixesToIgnore { + if strings.HasSuffix(file, s) { + callDepth++ + continue outer + } + } + break + } + return +} + +func getCallerIgnoringLogMulti(callDepth int) (string, int) { + // the +1 is to ignore this (getCallerIgnoringLogMulti) frame + return getCaller(callDepth+1, "/pkg/log/log.go", "/pkg/io/multi.go") +} diff --git a/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/writer.go b/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/writer.go new file mode 100644 index 000000000000..153be2c340d2 --- /dev/null +++ b/vendor/gopkg.in/Graylog2/go-gelf.v2/gelf/writer.go @@ -0,0 +1,34 @@ +// Copyright 2012 SocialCode. All rights reserved. +// Use of this source code is governed by the MIT +// license that can be found in the LICENSE file. + +package gelf + +import ( + "net" +) + +type Writer interface { + Close() error + Write([]byte) (int, error) + WriteMessage(*Message) error +} + +// Writer implements io.Writer and is used to send both discrete +// messages to a graylog2 server, or data from a stream-oriented +// interface (like the functions in log). +type GelfWriter struct { + addr string + conn net.Conn + hostname string + Facility string // defaults to current process name + proto string +} + +// Close connection and interrupt blocked Read or Write operations +func (w *GelfWriter) Close() error { + if w.conn == nil { + return nil + } + return w.conn.Close() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index bc2383a05dc1..7292ca8056f4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1535,6 +1535,9 @@ google.golang.org/protobuf/types/known/emptypb google.golang.org/protobuf/types/known/fieldmaskpb google.golang.org/protobuf/types/known/timestamppb google.golang.org/protobuf/types/known/wrapperspb +# gopkg.in/Graylog2/go-gelf.v2 v2.0.0-20191017102106-1550ee647df0 => github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8 +## explicit +gopkg.in/Graylog2/go-gelf.v2/gelf # gopkg.in/alecthomas/kingpin.v2 v2.2.6 ## explicit gopkg.in/alecthomas/kingpin.v2 @@ -1795,3 +1798,4 @@ sigs.k8s.io/yaml # github.com/hashicorp/consul => github.com/hashicorp/consul v1.5.1 # github.com/gocql/gocql => github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 # github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab +# gopkg.in/Graylog2/go-gelf.v2 => github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8