Skip to content

Commit

Permalink
Add precision rounding to accumulator
Browse files Browse the repository at this point in the history
Adding precision rounding to the accumulator. This means that now every
input metric will get rounded at collection, rather than at write (and
only for the influxdb output).

This feature is disabled for service inputs, because service inputs
should be in control of their own timestamps & precisions.
  • Loading branch information
sparrc committed Jun 13, 2016
1 parent 4d24283 commit d7efb7a
Show file tree
Hide file tree
Showing 13 changed files with 213 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features

- [#1340](https://github.com/influxdata/telegraf/issues/1340): statsd: do not log every dropped metric.
- [#1368](https://github.com/influxdata/telegraf/pull/1368): Add precision rounding to all metrics on collection.

### Bugfixes

Expand Down
4 changes: 4 additions & 0 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ type Accumulator interface {

Debug() bool
SetDebug(enabled bool)

SetPrecision(precision, interval time.Duration)

DisablePrecision()
}
29 changes: 29 additions & 0 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func NewAccumulator(
acc := accumulator{}
acc.metrics = metrics
acc.inputConfig = inputConfig
acc.precision = time.Nanosecond
return &acc
}

Expand All @@ -32,6 +33,8 @@ type accumulator struct {
inputConfig *internal_models.InputConfig

prefix string

precision time.Duration
}

func (ac *accumulator) Add(
Expand Down Expand Up @@ -141,6 +144,7 @@ func (ac *accumulator) AddFields(
} else {
timestamp = time.Now()
}
timestamp = timestamp.Round(ac.precision)

if ac.prefix != "" {
measurement = ac.prefix + measurement
Expand Down Expand Up @@ -173,6 +177,31 @@ func (ac *accumulator) SetTrace(trace bool) {
ac.trace = trace
}

// SetPrecision takes two time.Duration objects. If the first is non-zero,
// it sets that as the precision. Otherwise, it takes the second argument
// as the order of time that the metrics should be rounded to, with the
// maximum being 1s.
func (ac *accumulator) SetPrecision(precision, interval time.Duration) {
if precision > 0 {
ac.precision = precision
return
}
switch {
case interval >= time.Second:
ac.precision = time.Second
case interval >= time.Millisecond:
ac.precision = time.Millisecond
case interval >= time.Microsecond:
ac.precision = time.Microsecond
default:
ac.precision = time.Nanosecond
}
}

func (ac *accumulator) DisablePrecision() {
ac.precision = time.Nanosecond
}

func (ac *accumulator) setDefaultTags(tags map[string]string) {
ac.defaultTags = tags
}
Expand Down
122 changes: 122 additions & 0 deletions agent/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,128 @@ func TestAdd(t *testing.T) {
actual)
}

func TestAddNoPrecisionWithInterval(t *testing.T) {
a := accumulator{}
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{}

a.SetPrecision(0, time.Second)
a.Add("acctest", float64(101), map[string]string{})
a.Add("acctest", float64(101), map[string]string{"acc": "test"})
a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now)

testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest value=101")

testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test value=101")

testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)),
actual)
}

func TestAddNoIntervalWithPrecision(t *testing.T) {
a := accumulator{}
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{}

a.SetPrecision(time.Second, time.Millisecond)
a.Add("acctest", float64(101), map[string]string{})
a.Add("acctest", float64(101), map[string]string{"acc": "test"})
a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now)

testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest value=101")

testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test value=101")

testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)),
actual)
}

func TestAddDisablePrecision(t *testing.T) {
a := accumulator{}
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{}

a.SetPrecision(time.Second, time.Millisecond)
a.DisablePrecision()
a.Add("acctest", float64(101), map[string]string{})
a.Add("acctest", float64(101), map[string]string{"acc": "test"})
a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now)

testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest value=101")

testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test value=101")

testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082912748)),
actual)
}

func TestDifferentPrecisions(t *testing.T) {
a := accumulator{}
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &internal_models.InputConfig{}

a.SetPrecision(0, time.Second)
a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now)
testm := <-a.metrics
actual := testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800000000000)),
actual)

a.SetPrecision(0, time.Millisecond)
a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now)
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800083000000)),
actual)

a.SetPrecision(0, time.Microsecond)
a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now)
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082913000)),
actual)

a.SetPrecision(0, time.Nanosecond)
a.Add("acctest", float64(101), map[string]string{"acc": "test"}, now)
testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", int64(1139572800082912748)),
actual)
}

func TestAddDefaultTags(t *testing.T) {
a := accumulator{}
a.addDefaultTag("default", "tag")
Expand Down
7 changes: 7 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (a *Agent) gatherer(

acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
acc.setDefaultTags(a.Config.Tags)

internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown)
Expand Down Expand Up @@ -201,6 +203,8 @@ func (a *Agent) Test() error {
for _, input := range a.Config.Inputs {
acc := NewAccumulator(input.Config, metricC)
acc.SetTrace(true)
acc.SetPrecision(a.Config.Agent.Precision.Duration,
a.Config.Agent.Interval.Duration)
acc.setDefaultTags(a.Config.Tags)

fmt.Printf("* Plugin: %s, Collection 1\n", input.Name)
Expand Down Expand Up @@ -289,6 +293,9 @@ func (a *Agent) Run(shutdown chan struct{}) error {
case telegraf.ServiceInput:
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
// Service input plugins should set their own precision of their
// metrics.
acc.DisablePrecision()
acc.setDefaultTags(a.Config.Tags)
if err := p.Start(acc); err != nil {
log.Printf("Service for input %s failed to start, exiting\n%s\n",
Expand Down
8 changes: 5 additions & 3 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"

## By default, precision will be set to the same timestamp order as the
## collection interval, with the maximum being 1s.
## Precision will NOT be used for service inputs, such as logparser and statsd.
## Valid values are "Nns", "Nus" (or "Nµs"), "Nms", "Ns".
precision = ""
## Run telegraf in debug mode
debug = false
## Run telegraf in quiet mode
Expand All @@ -75,9 +80,6 @@
urls = ["http://localhost:8086"] # required
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required
## Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h".
## note: using "s" precision greatly improves InfluxDB compression.
precision = "s"

## Retention policy to write to.
retention_policy = "default"
Expand Down
18 changes: 15 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ type AgentConfig struct {
// ie, if Interval=10s then always collect on :00, :10, :20, etc.
RoundInterval bool

// By default, precision will be set to the same timestamp order as the
// collection interval, with the maximum being 1s.
// ie, when interval = "10s", precision will be "1s"
// when interval = "250ms", precision will be "1ms"
// Precision will NOT be used for service inputs. It is up to each individual
// service input to set the timestamp at the appropriate precision.
Precision internal.Duration

// CollectionJitter is used to jitter the collection by a random amount.
// Each plugin will sleep for a random time within jitter before collecting.
// This can be used to avoid many plugins querying things like sysfs at the
Expand Down Expand Up @@ -108,11 +116,10 @@ type AgentConfig struct {
// does _not_ deactivate FlushInterval.
FlushBufferWhenFull bool

// TODO(cam): Remove UTC and Precision parameters, they are no longer
// TODO(cam): Remove UTC and parameter, they are no longer
// valid for the agent config. Leaving them here for now for backwards-
// compatability
UTC bool `toml:"utc"`
Precision string
UTC bool `toml:"utc"`

// Debug is the option for running in debug mode
Debug bool
Expand Down Expand Up @@ -209,6 +216,11 @@ var header = `# Telegraf Configuration
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## By default, precision will be set to the same timestamp order as the
## collection interval, with the maximum being 1s.
## Precision will NOT be used for service inputs, such as logparser and statsd.
## Valid values are "Nns", "Nus" (or "Nµs"), "Nms", "Ns".
precision = ""
## Run telegraf in debug mode
debug = false
## Run telegraf in quiet mode
Expand Down
9 changes: 2 additions & 7 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,9 @@ func NewMetric(
name string,
tags map[string]string,
fields map[string]interface{},
t ...time.Time,
t time.Time,
) (Metric, error) {
var T time.Time
if len(t) > 0 {
T = t[0]
}

pt, err := client.NewPoint(name, tags, fields, T)
pt, err := client.NewPoint(name, tags, fields, t)
if err != nil {
return nil, err
}
Expand Down
17 changes: 0 additions & 17 deletions metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,6 @@ func TestNewMetricString(t *testing.T) {
assert.Equal(t, lineProtoPrecision, m.PrecisionString("s"))
}

func TestNewMetricStringNoTime(t *testing.T) {
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"usage_idle": float64(99),
}
m, err := NewMetric("cpu", tags, fields)
assert.NoError(t, err)

lineProto := fmt.Sprintf("cpu,host=localhost usage_idle=99")
assert.Equal(t, lineProto, m.String())

lineProtoPrecision := fmt.Sprintf("cpu,host=localhost usage_idle=99")
assert.Equal(t, lineProtoPrecision, m.PrecisionString("s"))
}

func TestNewMetricFailNaN(t *testing.T) {
now := time.Now()

Expand Down
9 changes: 8 additions & 1 deletion plugins/inputs/prometheus/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"math"
"mime"
"time"

"github.com/influxdata/telegraf"

Expand Down Expand Up @@ -88,7 +89,13 @@ func (p *PrometheusParser) Parse(buf []byte) ([]telegraf.Metric, error) {
}
// converting to telegraf metric
if len(fields) > 0 {
metric, err := telegraf.NewMetric(metricName, tags, fields)
var t time.Time
if m.TimestampMs != nil && *m.TimestampMs > 0 {
t = time.Unix(0, *m.TimestampMs*1000000)
} else {
t = time.Now()
}
metric, err := telegraf.NewMetric(metricName, tags, fields, t)
if err == nil {
metrics = append(metrics, metric)
}
Expand Down
8 changes: 3 additions & 5 deletions plugins/outputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type InfluxDB struct {
Password string
Database string
UserAgent string
Precision string
RetentionPolicy string
WriteConsistency string
Timeout internal.Duration
Expand All @@ -39,6 +38,9 @@ type InfluxDB struct {
// Use SSL but skip chain & host verification
InsecureSkipVerify bool

// Precision is only here for legacy support. It will be ignored.
Precision string

conns []client.Client
}

Expand All @@ -50,9 +52,6 @@ var sampleConfig = `
urls = ["http://localhost:8086"] # required
## The target database for metrics (telegraf will create it if not exists).
database = "telegraf" # required
## Precision of writes, valid values are "ns", "us" (or "µs"), "ms", "s", "m", "h".
## note: using "s" precision greatly improves InfluxDB compression.
precision = "s"
## Retention policy to write to.
retention_policy = "default"
Expand Down Expand Up @@ -184,7 +183,6 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
}
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: i.Database,
Precision: i.Precision,
RetentionPolicy: i.RetentionPolicy,
WriteConsistency: i.WriteConsistency,
})
Expand Down
Loading

0 comments on commit d7efb7a

Please sign in to comment.