Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logfmt parser #4539

Merged
merged 22 commits into from
Aug 22, 2018
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Collectd](#collectd)
1. [Dropwizard](#dropwizard)
1. [Grok](#grok)
1. [Logfmt](#logfmt)

Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
Expand Down Expand Up @@ -886,4 +887,25 @@ the file output will only print once per `flush_interval`.
- If successful, add the next token, update the pattern and retest.
- Continue one token at a time until the entire line is successfully parsed.

# Logfmt
This parser implements the logfmt format by extracting key-value pairs from log text in the form `<key>=<value>`.
At the moment, the plugin will produce one metric per line and all keys
are added as fields. Values are automatically converted to integers, floats, or booleans when they parse as such; all other values are kept as strings.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Values are left as strings

The code below doesn't do this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Values are left as strings (for now).

This isn't true, we are doing auto conversion to other types.

```
[[inputs.exec]]

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them [here](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).
data_format = "logfmt"
```

A typical log
```
method=GET host=influxdata.org ts=2018-07-24T19:43:40.275Z connect=4ms service=8ms status=200 bytes=1653
```
will be converted into
```
logfmt method="GET",host="influxdata.org",ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turn this into valid line protocol:

logfmt method="GET",host="influxdata.org",ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i

```
Additional information about the logfmt format can be found [here](https://brandur.org/logfmt).
102 changes: 102 additions & 0 deletions plugins/parsers/logfmt/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package logfmt

import (
"bytes"
"fmt"
"log"
"strconv"
"time"

"github.com/go-logfmt/logfmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)

var (
	// ErrNoMetric is returned by ParseLine when parsing succeeds but
	// produces no metrics.
	ErrNoMetric = fmt.Errorf("no metric in line")
)

// Parser decodes logfmt formatted messages into metrics.
type Parser struct {
	// MetricName is the measurement name given to every metric built by Parse.
	MetricName string
	// DefaultTags are added to each parsed metric that does not already have
	// a tag with the same key.
	DefaultTags map[string]string
	// Now supplies the timestamp for each metric. Parse calls it
	// unconditionally, so it must be non-nil; NewParser sets it to time.Now.
	Now func() time.Time
}

// NewParser creates a parser.
func NewParser(metricName string, defaultTags map[string]string) *Parser {
return &Parser{
MetricName: metricName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this is not set?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, doesn't telegraf use the plugin name of the input/processor that uses it?

DefaultTags: defaultTags,
Now: time.Now,
}
}

// Parse converts a slice of bytes in logfmt format to metrics.
//
// Each record scanned from b becomes one metric named p.MetricName and
// timestamped with p.Now(). Every key/value pair is stored as a field: values
// that parse as a base-10 int64, a float64 or a bool are stored with that
// type, anything else is stored as a string. Default tags are applied to the
// metrics before they are returned.
//
// An error is returned when a key has an empty value, when the decoder
// reports a failure (malformed input or a read error), or when the metric
// cannot be constructed.
func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) {
	decoder := logfmt.NewDecoder(bytes.NewReader(b))
	metrics := make([]telegraf.Metric, 0)
	for decoder.ScanRecord() {
		tags := make(map[string]string)
		fields := make(map[string]interface{})
		for decoder.ScanKeyval() {
			// Convert once; Key and Value return byte slices.
			key := string(decoder.Key())
			value := string(decoder.Value())
			if value == "" {
				return metrics, fmt.Errorf("value could not be found for key: %v", key)
			}

			// Type conversions: integers are tried before floats so that a
			// value such as "200" is stored as int64 rather than float64.
			if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
				fields[key] = iValue
			} else if fValue, err := strconv.ParseFloat(value, 64); err == nil {
				fields[key] = fValue
			} else if bValue, err := strconv.ParseBool(value); err == nil {
				fields[key] = bValue
			} else {
				fields[key] = value
			}
		}
		// The key/value loop ends both at the end of a record and when the
		// decoder hits an error (per the go-logfmt documentation), so the
		// decoder must be asked which one it was. Without this check a
		// malformed line would silently produce a truncated metric.
		if err := decoder.Err(); err != nil {
			return nil, err
		}

		m, err := metric.New(p.MetricName, tags, fields, p.Now())
		if err != nil {
			log.Println("Error occurred")
			return nil, err
		}

		metrics = append(metrics, m)
	}
	// Likewise the record loop ends on a read failure as well as at the end
	// of the input; surface that failure instead of returning success.
	if err := decoder.Err(); err != nil {
		return nil, err
	}

	p.applyDefaultTags(metrics)
	return metrics, nil
}

// ParseLine parses a single line of logfmt text and returns the first metric
// it yields.
//
// The text is handed to Parse. An error from Parse is returned unchanged, and
// ErrNoMetric is returned when parsing succeeds but produces no metrics.
func (p *Parser) ParseLine(s string) (telegraf.Metric, error) {
	parsed, err := p.Parse([]byte(s))
	switch {
	case err != nil:
		return nil, err
	case len(parsed) == 0:
		return nil, ErrNoMetric
	}
	return parsed[0], nil
}

// SetDefaultTags adds tags to the metrics outputs of Parse and ParseLine.
//
// The given map replaces any previously configured default tags and is stored
// as-is, without being copied.
func (p *Parser) SetDefaultTags(tags map[string]string) {
	p.DefaultTags = tags
}

// applyDefaultTags adds each default tag to every metric that does not
// already have a tag with that key. Tags already present on a metric are
// never overwritten.
func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) {
	if len(p.DefaultTags) == 0 {
		return
	}

	for i := range metrics {
		for name, value := range p.DefaultTags {
			if metrics[i].HasTag(name) {
				continue
			}
			metrics[i].AddTag(name, value)
		}
	}
}
208 changes: 208 additions & 0 deletions plugins/parsers/logfmt/parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package logfmt

import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

// MustMetric builds a telegraf.Metric from the testutil fixture m and aborts
// the calling test if the metric cannot be constructed.
func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric {
	t.Helper()

	built, buildErr := metric.New(m.Measurement, m.Tags, m.Fields, m.Time)
	if buildErr != nil {
		t.Fatal(buildErr)
	}

	return built
}

// TestParse runs Parser.Parse over a table of logfmt inputs and compares the
// returned metrics, in order, against the expected ones. Every case uses a
// fixed clock so the metric timestamps are deterministic.
func TestParse(t *testing.T) {
	tests := []struct {
		name        string              // sub-test name
		measurement string              // value assigned to Parser.MetricName
		now         func() time.Time    // value assigned to Parser.Now
		bytes       []byte              // input handed to Parse
		want        []testutil.Metric   // expected metrics, in order
		wantErr     bool                // whether Parse must return an error
	}{
		{
			// Nil input: no records, no metrics, no error.
			name: "no bytes returns no metrics",
			now:  func() time.Time { return time.Unix(0, 0) },
			want: []testutil.Metric{},
		},
		{
			// A single record with no terminating newline.
			name:        "test without trailing end",
			bytes:       []byte("foo=\"bar\""),
			now:         func() time.Time { return time.Unix(0, 0) },
			measurement: "testlog",
			want: []testutil.Metric{
				testutil.Metric{
					Measurement: "testlog",
					Tags:        map[string]string{},
					Fields: map[string]interface{}{
						"foo": "bar",
					},
					Time: time.Unix(0, 0),
				},
			},
		},
		{
			// Same record, this time terminated by a newline; the result
			// must be identical.
			name:        "test with trailing end",
			bytes:       []byte("foo=\"bar\"\n"),
			now:         func() time.Time { return time.Unix(0, 0) },
			measurement: "testlog",
			want: []testutil.Metric{
				testutil.Metric{
					Measurement: "testlog",
					Tags:        map[string]string{},
					Fields: map[string]interface{}{
						"foo": "bar",
					},
					Time: time.Unix(0, 0),
				},
			},
		},
		{
			// All keys of one record become string fields, including the
			// quoted value containing a space.
			name:        "logfmt parser returns all the fields",
			bytes:       []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`),
			now:         func() time.Time { return time.Unix(0, 0) },
			measurement: "testlog",
			want: []testutil.Metric{
				testutil.Metric{
					Measurement: "testlog",
					Tags:        map[string]string{},
					Fields: map[string]interface{}{
						"lvl":    "info",
						"msg":    "http request",
						"method": "POST",
						"ts":     "2018-07-24T19:43:40.275Z",
					},
					Time: time.Unix(0, 0),
				},
			},
		},
		{
			// Two newline-separated records yield two metrics. In the second
			// one duration is expected as a float64 while the IDs stay
			// strings.
			name:        "logfmt parser parses every line",
			bytes:       []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"),
			now:         func() time.Time { return time.Unix(0, 0) },
			measurement: "testlog",
			want: []testutil.Metric{
				testutil.Metric{
					Measurement: "testlog",
					Tags:        map[string]string{},
					Fields: map[string]interface{}{
						"lvl":    "info",
						"msg":    "http request",
						"method": "POST",
						"ts":     "2018-07-24T19:43:40.275Z",
					},
					Time: time.Unix(0, 0),
				},
				testutil.Metric{
					Measurement: "testlog",
					Tags:        map[string]string{},
					Fields: map[string]interface{}{
						"parent_id": "088876RL000",
						"duration":  7.45,
						"log_id":    "09R4e4Rl000",
					},
					Time: time.Unix(0, 0),
				},
			},
		},
		{
			// Bare words carry no `=value`; Parse is expected to reject them
			// and return no metrics.
			name:    "poorly formatted logfmt returns error",
			now:     func() time.Time { return time.Unix(0, 0) },
			bytes:   []byte(`i am garbage data.`),
			want:    []testutil.Metric{},
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			l := Parser{
				MetricName: tt.measurement,
				Now:        tt.now,
			}
			got, err := l.Parse(tt.bytes)
			// Stop on an unexpected error state; the metrics are not worth
			// comparing in that case.
			if (err != nil) != tt.wantErr {
				t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			// Check the count first so indexing tt.want below cannot go out
			// of range.
			require.Equal(t, len(tt.want), len(got))
			for i, m := range got {
				testutil.MustEqual(t, m, tt.want[i])
			}
		})
	}
}

// TestParseLine checks Parser.ParseLine against a table of inputs: empty
// input must fail, a single record must yield its metric, and multi-line
// input must yield only the metric of the first record.
func TestParseLine(t *testing.T) {
	// epoch is the fixed clock shared by every case.
	epoch := func() time.Time { return time.Unix(0, 0) }

	// writeFailed returns a fresh copy of the metric expected from the
	// "Write failed" record used by the two successful cases.
	writeFailed := func() testutil.Metric {
		return testutil.Metric{
			Measurement: "testlog",
			Fields: map[string]interface{}{
				"ts":     "2018-07-24T19:43:35.207268Z",
				"lvl":    int64(5),
				"msg":    "Write failed",
				"log_id": "09R4e4Rl000",
			},
			Tags: map[string]string{},
			Time: time.Unix(0, 0),
		}
	}

	cases := []struct {
		name        string
		s           string
		measurement string
		now         func() time.Time
		want        testutil.Metric
		wantErr     bool
	}{
		{
			name:    "No Metric In line",
			now:     epoch,
			want:    testutil.Metric{},
			wantErr: true,
		},
		{
			name:        "Log parser fmt returns all fields",
			now:         epoch,
			measurement: "testlog",
			s:           `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`,
			want:        writeFailed(),
		},
		{
			name:        "ParseLine only returns metrics from first string",
			now:         epoch,
			measurement: "testlog",
			s:           "ts=2018-07-24T19:43:35.207268Z lvl=5 msg=\"Write failed\" log_id=09R4e4Rl000\nmethod=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000",
			want:        writeFailed(),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			parser := Parser{MetricName: tc.measurement, Now: tc.now}

			got, err := parser.ParseLine(tc.s)
			if failed := err != nil; failed != tc.wantErr {
				t.Fatalf("Logfmt.Parse error = %v, wantErr %v", err, tc.wantErr)
			}
			// Nothing to compare when no metric came back.
			if got == nil {
				return
			}
			testutil.MustEqual(t, got, tc.want)
		})
	}
}
8 changes: 8 additions & 0 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/logfmt"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
"github.com/influxdata/telegraf/plugins/parsers/value"
)
Expand Down Expand Up @@ -139,6 +140,8 @@ func NewParser(config *Config) (Parser, error) {
config.GrokCustomPatterns,
config.GrokCustomPatternFiles,
config.GrokTimeZone)
case "logfmt":
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags)
default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
}
Expand Down Expand Up @@ -238,3 +241,8 @@ func NewDropwizardParser(
}
return parser, err
}

// NewLogFmtParser builds a logfmt parser for the given measurement name and
// default tags. The returned error is always nil.
func NewLogFmtParser(metricName string, defaultTags map[string]string) (Parser, error) {
	p := logfmt.NewParser(metricName, defaultTags)
	return p, nil
}