From 8f558434bc9ebe9ac87d7c34c6d802aafb697643 Mon Sep 17 00:00:00 2001 From: Stephen Sorriaux Date: Fri, 22 Feb 2019 17:40:46 +0100 Subject: [PATCH] Add support for time precision when using influxdb as output This allows to set the time precision (s, ms, us, ns) when sending metrics to InfluxDB. Previously, only ns was possible. --- plugins/outputs/influxdb/README.md | 4 ++++ plugins/outputs/influxdb/http.go | 9 ++++++-- plugins/outputs/influxdb/http_test.go | 12 +++++++++++ plugins/outputs/influxdb/influxdb.go | 11 ++++++++++ plugins/serializers/influx/influx.go | 26 ++++++++++++++++++----- plugins/serializers/influx/influx_test.go | 17 +++++++++++++++ 6 files changed, 72 insertions(+), 7 deletions(-) diff --git a/plugins/outputs/influxdb/README.md b/plugins/outputs/influxdb/README.md index 48ab3d51b92c1..bb35675a69ba9 100644 --- a/plugins/outputs/influxdb/README.md +++ b/plugins/outputs/influxdb/README.md @@ -72,6 +72,10 @@ The InfluxDB output plugin writes metrics to the [InfluxDB v1.x] HTTP or UDP ser ## integer values. Enabling this option will result in field type errors if ## existing data has been written. # influx_uint_support = false + + ## When writing data to InfluxDB, time precision that should be used (s, ms, us, ns) + ## defaults to "ns" + # time_precision = "" ``` [InfluxDB v1.x]: https://github.com/influxdata/influxdb diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index 43aa55ea86736..863089fb7eedc 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -97,6 +97,7 @@ type HTTPConfig struct { RetentionPolicy string Consistency string SkipDatabaseCreation bool + TimePrecision string InfluxUintSupport bool `toml:"influx_uint_support"` Serializer *influx.Serializer @@ -272,7 +273,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error } func (c *httpClient) writeBatch(ctx context.Context, db string, metrics []telegraf.Metric) error { - url, err := makeWriteURL(c.config.URL, db, c.config.RetentionPolicy, c.config.Consistency) + url, err := makeWriteURL(c.config.URL, db, c.config.RetentionPolicy, c.config.Consistency, c.config.TimePrecision) if err != nil { return err } @@ -407,7 +408,7 @@ func (c *httpClient) addHeaders(req *http.Request) { } } -func makeWriteURL(loc *url.URL, db, rp, consistency string) (string, error) { +func makeWriteURL(loc *url.URL, db, rp, consistency, timePrecision string) (string, error) { params := url.Values{} params.Set("db", db) @@ -419,6 +420,10 @@ func makeWriteURL(loc *url.URL, db, rp, consistency string) (string, error) { params.Set("consistency", consistency) } + if timePrecision != "" { + params.Set("precision", timePrecision) + } + u := *loc switch u.Scheme { case "unix": diff --git a/plugins/outputs/influxdb/http_test.go b/plugins/outputs/influxdb/http_test.go index 2b6b45eefa8d0..fe926be08ae61 100644 --- a/plugins/outputs/influxdb/http_test.go +++ b/plugins/outputs/influxdb/http_test.go @@ -358,6 +358,18 @@ func TestHTTP_Write(t *testing.T) { w.WriteHeader(http.StatusNoContent) }, }, + { + name: "send time precision", + config: influxdb.HTTPConfig{ + URL: u, + Database: "telegraf", + TimePrecision: "ms", + }, + queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + require.Equal(t, "ms", r.FormValue("precision")) + w.WriteHeader(http.StatusNoContent) + }, + }, { name: "hinted handoff not empty no log no error", config: influxdb.HTTPConfig{ diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go index 3b3e8020607ab..603f1addda7d6 100644 --- a/plugins/outputs/influxdb/influxdb.go +++ b/plugins/outputs/influxdb/influxdb.go @@ -48,6 +48,7 @@ type InfluxDB struct { SkipDatabaseCreation bool `toml:"skip_database_creation"` InfluxUintSupport bool `toml:"influx_uint_support"` tls.ClientConfig + TimePrecision string `toml:"time_precision"` Precision string // precision deprecated in 1.0; value is ignored @@ -125,6 +126,10 @@ var sampleConfig = ` ## integer values. Enabling this option will result in field type errors if ## existing data has been written. # influx_uint_support = false + + ## When writing data to InfluxDB, time precision that should be used (s, ms, us, ns) + ## defaults to "ns" + # time_precision = "" ` func (i *InfluxDB) Connect() error { @@ -141,10 +146,15 @@ func (i *InfluxDB) Connect() error { } i.serializer = influx.NewSerializer() + if i.InfluxUintSupport { i.serializer.SetFieldTypeSupport(influx.UintSupport) } + if i.TimePrecision != "" { + i.serializer.SetTimeFormat(i.TimePrecision) + } + for _, u := range urls { parts, err := url.Parse(u) if err != nil { @@ -262,6 +272,7 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL) RetentionPolicy: i.RetentionPolicy, Consistency: i.WriteConsistency, Serializer: i.serializer, + TimePrecision: i.TimePrecision, } c, err := i.CreateHTTPClientF(config) diff --git a/plugins/serializers/influx/influx.go b/plugins/serializers/influx/influx.go index e7063cbd2f62a..2ded2aab55ff4 100644 --- a/plugins/serializers/influx/influx.go +++ b/plugins/serializers/influx/influx.go @@ -62,10 +62,11 @@ type Serializer struct { fieldSortOrder FieldSortOrder fieldTypeSupport FieldTypeSupport - buf bytes.Buffer - header []byte - footer []byte - pair []byte + buf bytes.Buffer + header []byte + footer []byte + pair []byte + timeFormat string } func NewSerializer() *Serializer { @@ -91,6 +92,10 @@ func (s *Serializer) SetFieldTypeSupport(typeSupport FieldTypeSupport) { s.fieldTypeSupport = typeSupport } +func (s *Serializer) SetTimeFormat(timeFormat string) { + s.timeFormat = timeFormat +} + // Serialize writes the telegraf.Metric to a byte slice. May produce multiple // lines of output if longer than maximum line length. Lines are terminated // with a newline (LF) char. @@ -170,7 +175,18 @@ func (s *Serializer) buildHeader(m telegraf.Metric) error { func (s *Serializer) buildFooter(m telegraf.Metric) { s.footer = s.footer[:0] s.footer = append(s.footer, ' ') - s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano(), 10) + switch s.timeFormat { + case "s": + s.footer = strconv.AppendInt(s.footer, m.Time().Unix(), 10) + case "ms": + s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano()/1000000, 10) + case "us": + s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano()/1000, 10) + case "ns": + s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano(), 10) + default: + s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano(), 10) + } s.footer = append(s.footer, '\n') } diff --git a/plugins/serializers/influx/influx_test.go b/plugins/serializers/influx/influx_test.go index 8102bd973702b..d80615afd9859 100644 --- a/plugins/serializers/influx/influx_test.go +++ b/plugins/serializers/influx/influx_test.go @@ -24,6 +24,7 @@ var tests = []struct { input telegraf.Metric output []byte errReason string + timeFormat string }{ { name: "minimal", @@ -243,6 +244,21 @@ var tests = []struct { ), output: []byte("cpu value=42 1519194109000000042\n"), }, + { + name: "timestamp in milliseconds", + timeFormat: "ms", + input: MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(1519194109, 42), + ), + ), + output: []byte("cpu value=42 1519194109000\n"), + }, { name: "split fields exact", maxBytes: 33, @@ -444,6 +460,7 @@ func TestSerializer(t *testing.T) { serializer.SetMaxLineBytes(tt.maxBytes) serializer.SetFieldSortOrder(SortFields) serializer.SetFieldTypeSupport(tt.typeSupport) + serializer.SetTimeFormat(tt.timeFormat) output, err := serializer.Serialize(tt.input) if tt.errReason != "" { require.Error(t, err)