Skip to content

Commit

Permalink
Add support for time precision when using influxdb as output
Browse files Browse the repository at this point in the history
This allows to set the time precision (s, ms, us, ns) when sending metrics to InfluxDB. Previously, only ns was possible.
  • Loading branch information
StephenSorriaux committed Mar 28, 2019
1 parent 3045ffb commit 8f55843
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 7 deletions.
4 changes: 4 additions & 0 deletions plugins/outputs/influxdb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ The InfluxDB output plugin writes metrics to the [InfluxDB v1.x] HTTP or UDP ser
## integer values. Enabling this option will result in field type errors if
## existing data has been written.
# influx_uint_support = false

## When writing data to InfluxDB, time precision that should be used (s, ms, us, ns)
## defaults to "ns"
# time_precision = ""
```

[InfluxDB v1.x]: https://github.com/influxdata/influxdb
9 changes: 7 additions & 2 deletions plugins/outputs/influxdb/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type HTTPConfig struct {
RetentionPolicy string
Consistency string
SkipDatabaseCreation bool
TimePrecision string

InfluxUintSupport bool `toml:"influx_uint_support"`
Serializer *influx.Serializer
Expand Down Expand Up @@ -272,7 +273,7 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
}

func (c *httpClient) writeBatch(ctx context.Context, db string, metrics []telegraf.Metric) error {
url, err := makeWriteURL(c.config.URL, db, c.config.RetentionPolicy, c.config.Consistency)
url, err := makeWriteURL(c.config.URL, db, c.config.RetentionPolicy, c.config.Consistency, c.config.TimePrecision)
if err != nil {
return err
}
Expand Down Expand Up @@ -407,7 +408,7 @@ func (c *httpClient) addHeaders(req *http.Request) {
}
}

func makeWriteURL(loc *url.URL, db, rp, consistency string) (string, error) {
func makeWriteURL(loc *url.URL, db, rp, consistency, timePrecision string) (string, error) {
params := url.Values{}
params.Set("db", db)

Expand All @@ -419,6 +420,10 @@ func makeWriteURL(loc *url.URL, db, rp, consistency string) (string, error) {
params.Set("consistency", consistency)
}

if timePrecision != "" {
params.Set("precision", timePrecision)
}

u := *loc
switch u.Scheme {
case "unix":
Expand Down
12 changes: 12 additions & 0 deletions plugins/outputs/influxdb/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,18 @@ func TestHTTP_Write(t *testing.T) {
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "send time precision",
config: influxdb.HTTPConfig{
URL: u,
Database: "telegraf",
TimePrecision: "ms",
},
queryHandlerFunc: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, "ms", r.FormValue("precision"))
w.WriteHeader(http.StatusNoContent)
},
},
{
name: "hinted handoff not empty no log no error",
config: influxdb.HTTPConfig{
Expand Down
11 changes: 11 additions & 0 deletions plugins/outputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type InfluxDB struct {
SkipDatabaseCreation bool `toml:"skip_database_creation"`
InfluxUintSupport bool `toml:"influx_uint_support"`
tls.ClientConfig
TimePrecision string `toml:"time_precision"`

Precision string // precision deprecated in 1.0; value is ignored

Expand Down Expand Up @@ -125,6 +126,10 @@ var sampleConfig = `
## integer values. Enabling this option will result in field type errors if
## existing data has been written.
# influx_uint_support = false
## When writing data to InfluxDB, time precision that should be used (s, ms, us, ns)
## defaults to "ns"
# time_precision = ""
`

func (i *InfluxDB) Connect() error {
Expand All @@ -141,10 +146,15 @@ func (i *InfluxDB) Connect() error {
}

i.serializer = influx.NewSerializer()

if i.InfluxUintSupport {
i.serializer.SetFieldTypeSupport(influx.UintSupport)
}

if i.TimePrecision != "" {
i.serializer.SetTimeFormat(i.TimePrecision)
}

for _, u := range urls {
parts, err := url.Parse(u)
if err != nil {
Expand Down Expand Up @@ -262,6 +272,7 @@ func (i *InfluxDB) httpClient(ctx context.Context, url *url.URL, proxy *url.URL)
RetentionPolicy: i.RetentionPolicy,
Consistency: i.WriteConsistency,
Serializer: i.serializer,
TimePrecision: i.TimePrecision,
}

c, err := i.CreateHTTPClientF(config)
Expand Down
26 changes: 21 additions & 5 deletions plugins/serializers/influx/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ type Serializer struct {
fieldSortOrder FieldSortOrder
fieldTypeSupport FieldTypeSupport

buf bytes.Buffer
header []byte
footer []byte
pair []byte
buf bytes.Buffer
header []byte
footer []byte
pair []byte
timeFormat string
}

func NewSerializer() *Serializer {
Expand All @@ -91,6 +92,10 @@ func (s *Serializer) SetFieldTypeSupport(typeSupport FieldTypeSupport) {
s.fieldTypeSupport = typeSupport
}

func (s *Serializer) SetTimeFormat(timeFormat string) {
s.timeFormat = timeFormat
}

// Serialize writes the telegraf.Metric to a byte slice. May produce multiple
// lines of output if longer than maximum line length. Lines are terminated
// with a newline (LF) char.
Expand Down Expand Up @@ -170,7 +175,18 @@ func (s *Serializer) buildHeader(m telegraf.Metric) error {
func (s *Serializer) buildFooter(m telegraf.Metric) {
s.footer = s.footer[:0]
s.footer = append(s.footer, ' ')
s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano(), 10)
switch s.timeFormat {
case "s":
s.footer = strconv.AppendInt(s.footer, m.Time().Unix(), 10)
case "ms":
s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano()/1000000, 10)
case "us":
s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano()/1000, 10)
case "ns":
s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano(), 10)
default:
s.footer = strconv.AppendInt(s.footer, m.Time().UnixNano(), 10)
}
s.footer = append(s.footer, '\n')
}

Expand Down
17 changes: 17 additions & 0 deletions plugins/serializers/influx/influx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var tests = []struct {
input telegraf.Metric
output []byte
errReason string
timeFormat string
}{
{
name: "minimal",
Expand Down Expand Up @@ -243,6 +244,21 @@ var tests = []struct {
),
output: []byte("cpu value=42 1519194109000000042\n"),
},
{
name: "timestamp in milliseconds",
timeFormat: "ms",
input: MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(1519194109, 42),
),
),
output: []byte("cpu value=42 1519194109000\n"),
},
{
name: "split fields exact",
maxBytes: 33,
Expand Down Expand Up @@ -444,6 +460,7 @@ func TestSerializer(t *testing.T) {
serializer.SetMaxLineBytes(tt.maxBytes)
serializer.SetFieldSortOrder(SortFields)
serializer.SetFieldTypeSupport(tt.typeSupport)
serializer.SetTimeFormat(tt.timeFormat)
output, err := serializer.Serialize(tt.input)
if tt.errReason != "" {
require.Error(t, err)
Expand Down

0 comments on commit 8f55843

Please sign in to comment.