diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index ded0170ec80d2..7f7c94930e1e6 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics: 1. [Collectd](#collectd) 1. [Dropwizard](#dropwizard) 1. [Grok](#grok) +1. [Logfmt](#logfmt) 1. [Wavefront](#wavefront) Telegraf metrics, like InfluxDB @@ -882,6 +883,22 @@ the file output will only print once per `flush_interval`. - If successful, add the next token, update the pattern and retest. - Continue one token at a time until the entire line is successfully parsed. +# Logfmt +This parser implements the logfmt format by extracting and converting key-value pairs from log text in the form `=`. +At the moment, the plugin will produce one metric per line and all keys +are added as fields. +A typical log +``` +method=GET host=influxdata.org ts=2018-07-24T19:43:40.275Z +connect=4ms service=8ms status=200 bytes=1653 +``` +will be converted into +``` +logfmt method="GET",host="influxdata.org",ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i + +``` +Additional information about the logfmt format can be found [here](https://brandur.org/logfmt). + # Wavefront: Wavefront Data Format is metrics are parsed directly into Telegraf metrics. diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go new file mode 100644 index 0000000000000..603dbbae862b9 --- /dev/null +++ b/plugins/parsers/logfmt/parser.go @@ -0,0 +1,111 @@ +package logfmt + +import ( + "bytes" + "fmt" + "strconv" + "time" + + "github.com/go-logfmt/logfmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +var ( + ErrNoMetric = fmt.Errorf("no metric in line") +) + +// Parser decodes logfmt formatted messages into metrics. +type Parser struct { + MetricName string + DefaultTags map[string]string + Now func() time.Time +} + +// NewParser creates a parser. +func NewParser(metricName string, defaultTags map[string]string) *Parser { + return &Parser{ + MetricName: metricName, + DefaultTags: defaultTags, + Now: time.Now, + } +} + +// Parse converts a slice of bytes in logfmt format to metrics. +func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { + reader := bytes.NewReader(b) + decoder := logfmt.NewDecoder(reader) + metrics := make([]telegraf.Metric, 0) + for { + ok := decoder.ScanRecord() + if !ok { + err := decoder.Err() + if err != nil { + return nil, err + } + break + } + fields := make(map[string]interface{}) + for decoder.ScanKeyval() { + if string(decoder.Value()) == "" { + continue + } + + //type conversions + value := string(decoder.Value()) + if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { + fields[string(decoder.Key())] = iValue + } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { + fields[string(decoder.Key())] = fValue + } else if bValue, err := strconv.ParseBool(value); err == nil { + fields[string(decoder.Key())] = bValue + } else { + fields[string(decoder.Key())] = value + } + } + if len(fields) == 0 { + continue + } + + m, err := metric.New(p.MetricName, map[string]string{}, fields, p.Now()) + if err != nil { + return nil, err + } + + metrics = append(metrics, m) + } + p.applyDefaultTags(metrics) + return metrics, nil +} + +// ParseLine converts a single line of text in logfmt format to metrics. +func (p *Parser) ParseLine(s string) (telegraf.Metric, error) { + metrics, err := p.Parse([]byte(s)) + if err != nil { + return nil, err + } + + if len(metrics) < 1 { + return nil, ErrNoMetric + } + return metrics[0], nil +} + +// SetDefaultTags adds tags to the metrics outputs of Parse and ParseLine. +func (p *Parser) SetDefaultTags(tags map[string]string) { + p.DefaultTags = tags +} + +func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) { + if len(p.DefaultTags) == 0 { + return + } + + for _, m := range metrics { + for k, v := range p.DefaultTags { + if !m.HasTag(k) { + m.AddTag(k, v) + } + } + } +} diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go new file mode 100644 index 0000000000000..c9096468467dc --- /dev/null +++ b/plugins/parsers/logfmt/parser_test.go @@ -0,0 +1,231 @@ +package logfmt + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric { + t.Helper() + v, err := metric.New(m.Measurement, m.Tags, m.Fields, m.Time) + if err != nil { + t.Fatal(err) + } + return v +} + +func TestParse(t *testing.T) { + tests := []struct { + name string + measurement string + now func() time.Time + bytes []byte + want []testutil.Metric + wantErr bool + }{ + { + name: "no bytes returns no metrics", + now: func() time.Time { return time.Unix(0, 0) }, + want: []testutil.Metric{}, + }, + { + name: "test without trailing end", + bytes: []byte("foo=\"bar\""), + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + want: []testutil.Metric{ + testutil.Metric{ + Measurement: "testlog", + Tags: map[string]string{}, + Fields: map[string]interface{}{ + "foo": "bar", + }, + Time: time.Unix(0, 0), + }, + }, + }, + { + name: "test with trailing end", + bytes: []byte("foo=\"bar\"\n"), + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + want: []testutil.Metric{ + testutil.Metric{ + Measurement: "testlog", + Tags: map[string]string{}, + Fields: map[string]interface{}{ + "foo": "bar", + }, + Time: time.Unix(0, 0), + }, + }, + }, + { + name: "logfmt parser returns all the fields", + bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`), + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + want: []testutil.Metric{ + testutil.Metric{ + Measurement: "testlog", + Tags: map[string]string{}, + Fields: map[string]interface{}{ + "lvl": "info", + "msg": "http request", + "method": "POST", + "ts": "2018-07-24T19:43:40.275Z", + }, + Time: time.Unix(0, 0), + }, + }, + }, + { + name: "logfmt parser parses every line", + bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"), + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + want: []testutil.Metric{ + testutil.Metric{ + Measurement: "testlog", + Tags: map[string]string{}, + Fields: map[string]interface{}{ + "lvl": "info", + "msg": "http request", + "method": "POST", + "ts": "2018-07-24T19:43:40.275Z", + }, + Time: time.Unix(0, 0), + }, + testutil.Metric{ + Measurement: "testlog", + Tags: map[string]string{}, + Fields: map[string]interface{}{ + "parent_id": "088876RL000", + "duration": 7.45, + "log_id": "09R4e4Rl000", + }, + Time: time.Unix(0, 0), + }, + }, + }, + { + name: "keys without = or values are ignored", + now: func() time.Time { return time.Unix(0, 0) }, + bytes: []byte(`i am no data.`), + want: []testutil.Metric{}, + wantErr: false, + }, + { + name: "keys without values are ignored", + now: func() time.Time { return time.Unix(0, 0) }, + bytes: []byte(`foo="" bar=`), + want: []testutil.Metric{}, + wantErr: false, + }, + { + name: "unterminated quote produces error", + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + bytes: []byte(`bar=baz foo="bar`), + want: []testutil.Metric{}, + wantErr: true, + }, + { + name: "malformed key", + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + bytes: []byte(`"foo=" bar=baz`), + want: []testutil.Metric{}, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := Parser{ + MetricName: tt.measurement, + Now: tt.now, + } + got, err := l.Parse(tt.bytes) + if (err != nil) != tt.wantErr { + t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) + return + } + require.Equal(t, len(tt.want), len(got)) + for i, m := range got { + testutil.MustEqual(t, m, tt.want[i]) + } + }) + } +} + +func TestParseLine(t *testing.T) { + tests := []struct { + name string + s string + measurement string + now func() time.Time + want testutil.Metric + wantErr bool + }{ + { + name: "No Metric In line", + now: func() time.Time { return time.Unix(0, 0) }, + want: testutil.Metric{}, + wantErr: true, + }, + { + name: "Log parser fmt returns all fields", + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`, + want: testutil.Metric{ + Measurement: "testlog", + Fields: map[string]interface{}{ + "ts": "2018-07-24T19:43:35.207268Z", + "lvl": int64(5), + "msg": "Write failed", + "log_id": "09R4e4Rl000", + }, + Tags: map[string]string{}, + Time: time.Unix(0, 0), + }, + }, + { + name: "ParseLine only returns metrics from first string", + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + s: "ts=2018-07-24T19:43:35.207268Z lvl=5 msg=\"Write failed\" log_id=09R4e4Rl000\nmethod=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000", + want: testutil.Metric{ + Measurement: "testlog", + Fields: map[string]interface{}{ + "ts": "2018-07-24T19:43:35.207268Z", + "lvl": int64(5), + "msg": "Write failed", + "log_id": "09R4e4Rl000", + }, + Tags: map[string]string{}, + Time: time.Unix(0, 0), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := Parser{ + MetricName: tt.measurement, + Now: tt.now, + } + got, err := l.ParseLine(tt.s) + if (err != nil) != tt.wantErr { + t.Fatalf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) + } + if got != nil { + testutil.MustEqual(t, got, tt.want) + } + }) + } +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 1e395047adc9b..e198cb2cb96c6 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/grok" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/json" + "github.com/influxdata/telegraf/plugins/parsers/logfmt" "github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/value" "github.com/influxdata/telegraf/plugins/parsers/wavefront" @@ -142,6 +143,8 @@ func NewParser(config *Config) (Parser, error) { config.GrokCustomPatterns, config.GrokCustomPatternFiles, config.GrokTimeZone) + case "logfmt": + parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags) default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } @@ -242,6 +245,11 @@ func NewDropwizardParser( return parser, err } +// NewLogFmtParser returns a logfmt parser with the default options. +func NewLogFmtParser(metricName string, defaultTags map[string]string) (Parser, error) { + return logfmt.NewParser(metricName, defaultTags), nil +} + func NewWavefrontParser(defaultTags map[string]string) (Parser, error) { return wavefront.NewWavefrontParser(defaultTags), nil }