From 8294c87650519af8ca981aebe334a4e5a305e8b0 Mon Sep 17 00:00:00 2001 From: Lance O'Connor Date: Mon, 18 Nov 2019 12:38:34 -0800 Subject: [PATCH] Add Splunk MultiMetric support (#6640) --- internal/config/config.go | 13 ++ plugins/serializers/registry.go | 9 +- plugins/serializers/splunkmetric/README.md | 35 +++- .../serializers/splunkmetric/splunkmetric.go | 179 +++++++++++++----- .../splunkmetric/splunkmetric_test.go | 80 ++++++-- 5 files changed, 250 insertions(+), 66 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index d45e52c665e29..3ef4cee584ac3 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1952,6 +1952,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error } } + if node, ok := tbl.Fields["splunkmetric_multimetric"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if b, ok := kv.Value.(*ast.Boolean); ok { + var err error + c.SplunkmetricMultiMetric, err = b.Boolean() + if err != nil { + return nil, err + } + } + } + } + if node, ok := tbl.Fields["wavefront_source_override"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if ary, ok := kv.Value.(*ast.Array); ok { @@ -1985,6 +1997,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error delete(tbl.Fields, "template") delete(tbl.Fields, "json_timestamp_units") delete(tbl.Fields, "splunkmetric_hec_routing") + delete(tbl.Fields, "splunkmetric_multimetric") delete(tbl.Fields, "wavefront_source_override") delete(tbl.Fields, "wavefront_use_strict") return serializers.NewSerializer(c) diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index cfdb784ccfe73..aae590f787126 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -73,6 +73,9 @@ type Config struct { // Include HEC routing fields for splunkmetric output HecRouting bool + // Enable Splunk MultiMetric output (Splunk 8.0+) + SplunkmetricMultiMetric bool + // Point tags to use as the source name for Wavefront (if none found, host will be used). WavefrontSourceOverride []string @@ -93,7 +96,7 @@ func NewSerializer(config *Config) (Serializer, error) { case "json": serializer, err = NewJsonSerializer(config.TimestampUnits) case "splunkmetric": - serializer, err = NewSplunkmetricSerializer(config.HecRouting) + serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric) case "nowmetric": serializer, err = NewNowSerializer() case "carbon2": @@ -118,8 +121,8 @@ func NewCarbon2Serializer() (Serializer, error) { return carbon2.NewSerializer() } -func NewSplunkmetricSerializer(splunkmetric_hec_routing bool) (Serializer, error) { - return splunkmetric.NewSerializer(splunkmetric_hec_routing) +func NewSplunkmetricSerializer(splunkmetric_hec_routing bool, splunkmetric_multimetric bool) (Serializer, error) { + return splunkmetric.NewSerializer(splunkmetric_hec_routing, splunkmetric_multimetric) } func NewNowSerializer() (Serializer, error) { diff --git a/plugins/serializers/splunkmetric/README.md b/plugins/serializers/splunkmetric/README.md index 552b90ea47b4b..47ad8e1bff6f0 100644 --- a/plugins/serializers/splunkmetric/README.md +++ b/plugins/serializers/splunkmetric/README.md @@ -27,6 +27,36 @@ In the above snippet, the following keys are dimensions: * dc * user +## Using Multimetric output + +Starting with Splunk Enterprise and Splunk Cloud 8.0, you can now send multiple metric values in one payload. This means, for example, that +you can send all of your CPU stats in one JSON struct, an example event looks like: + +```javascript +{ + "time": 1572469920, + "event": "metric", + "host": "mono.local", + "fields": { + "_config_hecRouting": false, + "_config_multiMetric": true, + "class": "osx", + "cpu": "cpu0", + "metric_name:telegraf.cpu.usage_guest": 0, + "metric_name:telegraf.cpu.usage_guest_nice": 0, + "metric_name:telegraf.cpu.usage_idle": 65.1, + "metric_name:telegraf.cpu.usage_iowait": 0, + "metric_name:telegraf.cpu.usage_irq": 0, + "metric_name:telegraf.cpu.usage_nice": 0, + "metric_name:telegraf.cpu.usage_softirq": 0, + "metric_name:telegraf.cpu.usage_steal": 0, + "metric_name:telegraf.cpu.usage_system": 10.2, + "metric_name:telegraf.cpu.usage_user": 24.7, + } +} +``` +In order to enable this mode, there's a new option `splunkmetric_multimetric` that you set in the appropriate output module you plan on using. + ## Using with the HTTP output To send this data to a Splunk HEC, you can use the HTTP output, there are some custom headers that you need to add @@ -61,6 +91,7 @@ to manage the HEC authorization, here's a sample config for an HTTP output: data_format = "splunkmetric" ## Provides time, index, source overrides for the HEC splunkmetric_hec_routing = true + # splunkmentric_multimetric = true ## Additional HTTP headers [outputs.http.headers] @@ -118,7 +149,6 @@ disabled = false INDEXED_EXTRACTIONS = json KV_MODE = none TIMESTAMP_FIELDS = time -TIME_FORMAT = %s.%3N ``` An example configuration of a file based output is: @@ -134,5 +164,6 @@ An example configuration of a file based output is: ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "splunkmetric" - hec_routing = false + splunkmetric_hec_routing = false + splunkmetric_multimetric = true ``` diff --git a/plugins/serializers/splunkmetric/splunkmetric.go b/plugins/serializers/splunkmetric/splunkmetric.go index cdcf6cc592567..77c724aa8570a 100644 --- a/plugins/serializers/splunkmetric/splunkmetric.go +++ b/plugins/serializers/splunkmetric/splunkmetric.go @@ -9,12 +9,33 @@ import ( ) type serializer struct { - HecRouting bool + HecRouting bool + SplunkmetricMultiMetric bool } -func NewSerializer(splunkmetric_hec_routing bool) (*serializer, error) { +type CommonTags struct { + Time float64 + Host string + Index string + Source string + Fields map[string]interface{} +} + +type HECTimeSeries struct { + Time float64 `json:"time"` + Event string `json:"event"` + Host string `json:"host,omitempty"` + Index string `json:"index,omitempty"` + Source string `json:"source,omitempty"` + Fields map[string]interface{} `json:"fields"` +} + +// NewSerializer Setup our new serializer +func NewSerializer(splunkmetric_hec_routing bool, splunkmetric_multimetric bool) (*serializer, error) { + /* Define output params */ s := &serializer{ - HecRouting: splunkmetric_hec_routing, + HecRouting: splunkmetric_hec_routing, + SplunkmetricMultiMetric: splunkmetric_multimetric, } return s, nil } @@ -45,26 +66,61 @@ func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { return serialized, nil } -func (s *serializer) createObject(metric telegraf.Metric) (metricGroup []byte, err error) { +func (s *serializer) createMulti(metric telegraf.Metric, dataGroup HECTimeSeries, commonTags CommonTags) (metricGroup []byte, err error) { + /* When splunkmetric_multimetric is true, then we can write out multiple name=value pairs as part of the same + ** event payload. This only works when the time, host, and dimensions are the same for every name=value pair + ** in the timeseries data. + ** + ** The format for multimetric data is 'metric_name:nameOfMetric = valueOfMetric' + */ + var metricJSON []byte + + // Set the event data from the commonTags above. + dataGroup.Event = "metric" + dataGroup.Time = commonTags.Time + dataGroup.Host = commonTags.Host + dataGroup.Index = commonTags.Index + dataGroup.Source = commonTags.Source + dataGroup.Fields = commonTags.Fields + + // Stuff the metrid data into the structure. + for _, field := range metric.FieldList() { + value, valid := verifyValue(field.Value) - /* Splunk supports one metric json object, and does _not_ support an array of JSON objects. - ** Splunk has the following required names for the metric store: - ** metric_name: The name of the metric - ** _value: The value for the metric - ** time: The timestamp for the metric - ** All other index fields become dimensions. - */ - type HECTimeSeries struct { - Time float64 `json:"time"` - Event string `json:"event"` - Host string `json:"host,omitempty"` - Index string `json:"index,omitempty"` - Source string `json:"source,omitempty"` - Fields map[string]interface{} `json:"fields"` + if !valid { + log.Printf("D! Can not parse value: %v for key: %v", field.Value, field.Key) + continue + } + + dataGroup.Fields["metric_name:"+metric.Name()+"."+field.Key] = value } - dataGroup := HECTimeSeries{} - var metricJson []byte + // Manage the rest of the event details based upon HEC routing rules + switch s.HecRouting { + case true: + // Output the data as a fields array and host,index,time,source overrides for the HEC. + metricJSON, err = json.Marshal(dataGroup) + default: + // Just output the data and the time, useful for file based outuputs + dataGroup.Fields["time"] = dataGroup.Time + metricJSON, err = json.Marshal(dataGroup.Fields) + } + if err != nil { + return nil, err + } + // Let the JSON fall through to the return below + metricGroup = metricJSON + + return metricGroup, nil +} + +func (s *serializer) createSingle(metric telegraf.Metric, dataGroup HECTimeSeries, commonTags CommonTags) (metricGroup []byte, err error) { + /* The default mode is to generate one JSON entitiy per metric (required for pre-8.0 Splunks) + ** + ** The format for single metric is 'nameOfMetric = valueOfMetric' + */ + + var metricJSON []byte for _, field := range metric.FieldList() { @@ -75,39 +131,30 @@ func (s *serializer) createObject(metric telegraf.Metric) (metricGroup []byte, e continue } - obj := map[string]interface{}{} - obj["metric_name"] = metric.Name() + "." + field.Key - obj["_value"] = value - dataGroup.Event = "metric" - // Convert ns to float seconds since epoch. - dataGroup.Time = float64(metric.Time().UnixNano()) / float64(1000000000) - dataGroup.Fields = obj - - // Break tags out into key(n)=value(t) pairs - for n, t := range metric.Tags() { - if n == "host" { - dataGroup.Host = t - } else if n == "index" { - dataGroup.Index = t - } else if n == "source" { - dataGroup.Source = t - } else { - dataGroup.Fields[n] = t - } - } + + dataGroup.Time = commonTags.Time + + // Apply the common tags from above to every record. + dataGroup.Host = commonTags.Host + dataGroup.Index = commonTags.Index + dataGroup.Source = commonTags.Source + dataGroup.Fields = commonTags.Fields + + dataGroup.Fields["metric_name"] = metric.Name() + "." + field.Key + dataGroup.Fields["_value"] = value switch s.HecRouting { case true: // Output the data as a fields array and host,index,time,source overrides for the HEC. - metricJson, err = json.Marshal(dataGroup) + metricJSON, err = json.Marshal(dataGroup) default: // Just output the data and the time, useful for file based outuputs dataGroup.Fields["time"] = dataGroup.Time - metricJson, err = json.Marshal(dataGroup.Fields) + metricJSON, err = json.Marshal(dataGroup.Fields) } - metricGroup = append(metricGroup, metricJson...) + metricGroup = append(metricGroup, metricJSON...) if err != nil { return nil, err @@ -117,6 +164,52 @@ func (s *serializer) createObject(metric telegraf.Metric) (metricGroup []byte, e return metricGroup, nil } +func (s *serializer) createObject(metric telegraf.Metric) (metricGroup []byte, err error) { + + /* Splunk supports one metric json object, and does _not_ support an array of JSON objects. + ** Splunk has the following required names for the metric store: + ** metric_name: The name of the metric + ** _value: The value for the metric + ** time: The timestamp for the metric + ** All other index fields become dimensions. + */ + + dataGroup := HECTimeSeries{} + + // The tags are common to all events in this timeseries + commonTags := CommonTags{} + + commonObj := map[string]interface{}{} + + commonObj["config:hecRouting"] = s.HecRouting + commonObj["config:multiMetric"] = s.SplunkmetricMultiMetric + + commonTags.Fields = commonObj + + // Break tags out into key(n)=value(t) pairs + for n, t := range metric.Tags() { + if n == "host" { + commonTags.Host = t + } else if n == "index" { + commonTags.Index = t + } else if n == "source" { + commonTags.Source = t + } else { + commonTags.Fields[n] = t + } + } + commonTags.Time = float64(metric.Time().UnixNano()) / float64(1000000000) + switch s.SplunkmetricMultiMetric { + case true: + metricGroup, _ = s.createMulti(metric, dataGroup, commonTags) + default: + metricGroup, _ = s.createSingle(metric, dataGroup, commonTags) + } + + // Return the metric group regardless of if it's multimetric or single metric. + return metricGroup, nil +} + func verifyValue(v interface{}) (value interface{}, valid bool) { switch v.(type) { case string: diff --git a/plugins/serializers/splunkmetric/splunkmetric_test.go b/plugins/serializers/splunkmetric/splunkmetric_test.go index 04f6e6538294a..70037e28a43c8 100644 --- a/plugins/serializers/splunkmetric/splunkmetric_test.go +++ b/plugins/serializers/splunkmetric/splunkmetric_test.go @@ -29,11 +29,11 @@ func TestSerializeMetricFloat(t *testing.T) { m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) - s, _ := NewSerializer(false) + s, _ := NewSerializer(false, false) var buf []byte buf, err = s.Serialize(m) assert.NoError(t, err) - expS := `{"_value":91.5,"cpu":"cpu0","metric_name":"cpu.usage_idle","time":1529875740.819}` + expS := `{"_value":91.5,"config:hecRouting":false,"config:multiMetric":false,"cpu":"cpu0","metric_name":"cpu.usage_idle","time":1529875740.819}` assert.Equal(t, string(expS), string(buf)) } @@ -49,11 +49,11 @@ func TestSerializeMetricFloatHec(t *testing.T) { m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) - s, _ := NewSerializer(true) + s, _ := NewSerializer(true, false) var buf []byte buf, err = s.Serialize(m) assert.NoError(t, err) - expS := `{"time":1529875740.819,"event":"metric","fields":{"_value":91.5,"cpu":"cpu0","metric_name":"cpu.usage_idle"}}` + expS := `{"time":1529875740.819,"event":"metric","fields":{"_value":91.5,"config:hecRouting":true,"config:multiMetric":false,"cpu":"cpu0","metric_name":"cpu.usage_idle"}}` assert.Equal(t, string(expS), string(buf)) } @@ -68,12 +68,12 @@ func TestSerializeMetricInt(t *testing.T) { m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) - s, _ := NewSerializer(false) + s, _ := NewSerializer(false, false) var buf []byte buf, err = s.Serialize(m) assert.NoError(t, err) - expS := `{"_value":90,"cpu":"cpu0","metric_name":"cpu.usage_idle","time":0}` + expS := `{"_value":90,"config:hecRouting":false,"config:multiMetric":false,"cpu":"cpu0","metric_name":"cpu.usage_idle","time":0}` assert.Equal(t, string(expS), string(buf)) } @@ -88,12 +88,12 @@ func TestSerializeMetricIntHec(t *testing.T) { m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) - s, _ := NewSerializer(true) + s, _ := NewSerializer(true, false) var buf []byte buf, err = s.Serialize(m) assert.NoError(t, err) - expS := `{"time":0,"event":"metric","fields":{"_value":90,"cpu":"cpu0","metric_name":"cpu.usage_idle"}}` + expS := `{"time":0,"event":"metric","fields":{"_value":90,"config:hecRouting":true,"config:multiMetric":false,"cpu":"cpu0","metric_name":"cpu.usage_idle"}}` assert.Equal(t, string(expS), string(buf)) } @@ -108,12 +108,12 @@ func TestSerializeMetricBool(t *testing.T) { m, err := metric.New("docker", tags, fields, now) assert.NoError(t, err) - s, _ := NewSerializer(false) + s, _ := NewSerializer(false, false) var buf []byte buf, err = s.Serialize(m) assert.NoError(t, err) - expS := `{"_value":1,"container-name":"telegraf-test","metric_name":"docker.oomkiller","time":0}` + expS := `{"_value":1,"config:hecRouting":false,"config:multiMetric":false,"container-name":"telegraf-test","metric_name":"docker.oomkiller","time":0}` assert.Equal(t, string(expS), string(buf)) } @@ -128,12 +128,12 @@ func TestSerializeMetricBoolHec(t *testing.T) { m, err := metric.New("docker", tags, fields, now) assert.NoError(t, err) - s, _ := NewSerializer(true) + s, _ := NewSerializer(true, false) var buf []byte buf, err = s.Serialize(m) assert.NoError(t, err) - expS := `{"time":0,"event":"metric","fields":{"_value":0,"container-name":"telegraf-test","metric_name":"docker.oomkiller"}}` + expS := `{"time":0,"event":"metric","fields":{"_value":0,"config:hecRouting":true,"config:multiMetric":false,"container-name":"telegraf-test","metric_name":"docker.oomkiller"}}` assert.Equal(t, string(expS), string(buf)) } @@ -149,12 +149,12 @@ func TestSerializeMetricString(t *testing.T) { m, err := metric.New("cpu", tags, fields, now) assert.NoError(t, err) - s, _ := NewSerializer(false) + s, _ := NewSerializer(false, false) var buf []byte buf, err = s.Serialize(m) assert.NoError(t, err) - expS := `{"_value":5,"cpu":"cpu0","metric_name":"cpu.usage_idle","time":0}` + expS := `{"_value":5,"config:hecRouting":false,"config:multiMetric":false,"cpu":"cpu0","metric_name":"cpu.usage_idle","time":0}` assert.Equal(t, string(expS), string(buf)) assert.NoError(t, err) } @@ -182,11 +182,33 @@ func TestSerializeBatch(t *testing.T) { ) metrics := []telegraf.Metric{m, n} - s, _ := NewSerializer(false) + s, _ := NewSerializer(false, false) buf, err := s.SerializeBatch(metrics) assert.NoError(t, err) - expS := `{"_value":42,"metric_name":"cpu.value","time":0}` + `{"_value":92,"metric_name":"cpu.value","time":0}` + expS := `{"_value":42,"config:hecRouting":false,"config:multiMetric":false,"metric_name":"cpu.value","time":0}{"_value":92,"config:hecRouting":false,"config:multiMetric":false,"metric_name":"cpu.value","time":0}` + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeMulti(t *testing.T) { + m := MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "user": 42.0, + "system": 8.0, + }, + time.Unix(0, 0), + ), + ) + + metrics := []telegraf.Metric{m} + s, _ := NewSerializer(false, true) + buf, err := s.SerializeBatch(metrics) + assert.NoError(t, err) + + expS := `{"config:hecRouting":false,"config:multiMetric":true,"metric_name:cpu.system":8,"metric_name:cpu.user":42,"time":0}` assert.Equal(t, string(expS), string(buf)) } @@ -213,10 +235,32 @@ func TestSerializeBatchHec(t *testing.T) { ) metrics := []telegraf.Metric{m, n} - s, _ := NewSerializer(true) + s, _ := NewSerializer(true, false) + buf, err := s.SerializeBatch(metrics) + assert.NoError(t, err) + + expS := `{"time":0,"event":"metric","fields":{"_value":42,"config:hecRouting":true,"config:multiMetric":false,"metric_name":"cpu.value"}}{"time":0,"event":"metric","fields":{"_value":92,"config:hecRouting":true,"config:multiMetric":false,"metric_name":"cpu.value"}}` + assert.Equal(t, string(expS), string(buf)) +} + +func TestSerializeMultiHec(t *testing.T) { + m := MustMetric( + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "usage": 42.0, + "system": 8.0, + }, + time.Unix(0, 0), + ), + ) + + metrics := []telegraf.Metric{m} + s, _ := NewSerializer(true, true) buf, err := s.SerializeBatch(metrics) assert.NoError(t, err) - expS := `{"time":0,"event":"metric","fields":{"_value":42,"metric_name":"cpu.value"}}` + `{"time":0,"event":"metric","fields":{"_value":92,"metric_name":"cpu.value"}}` + expS := `{"time":0,"event":"metric","fields":{"config:hecRouting":true,"config:multiMetric":true,"metric_name:cpu.system":8,"metric_name:cpu.usage":42}}` assert.Equal(t, string(expS), string(buf)) }