From cde9a711f3f253cea78dd40d1cf978ab4a41a1f5 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 28 Jan 2022 10:43:29 +0100 Subject: [PATCH] fluent-bit: Attempt to unmarshal nested json. (#5223) * fluent-bit: Attempt to unmarshal nested json. Signed-off-by: Cyril Tovena * Also support nested json array. Signed-off-by: Cyril Tovena --- clients/cmd/fluent-bit/loki.go | 17 +++++++++++++++-- clients/cmd/fluent-bit/loki_test.go | 24 ++++++++++++++++-------- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/clients/cmd/fluent-bit/loki.go b/clients/cmd/fluent-bit/loki.go index 112dc2cd4e75..d5f3b79835d9 100644 --- a/clients/cmd/fluent-bit/loki.go +++ b/clients/cmd/fluent-bit/loki.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "encoding/json" "errors" "fmt" "os" @@ -78,7 +79,7 @@ func (l *loki) sendRecord(r map[interface{}]interface{}, ts time.Time) error { return nil } } - line, err := createLine(records, l.cfg.lineFormat) + line, err := l.createLine(records, l.cfg.lineFormat) if err != nil { return fmt.Errorf("error creating line: %v", err) } @@ -220,9 +221,21 @@ func removeKeys(records map[string]interface{}, keys []string) { } } -func createLine(records map[string]interface{}, f format) (string, error) { +func (l *loki) createLine(records map[string]interface{}, f format) (string, error) { switch f { case jsonFormat: + for k, v := range records { + if s, ok := v.(string); ok && (strings.Contains(s, "{") || strings.Contains(s, "[")) { + var data interface{} + err := json.Unmarshal([]byte(s), &data) + if err != nil { + // keep this debug as it can be very verbose + level.Debug(l.logger).Log("msg", "error unmarshalling json", "err", err) + continue + } + records[k] = data + } + } js, err := jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(records) if err != nil { return "", err diff --git a/clients/cmd/fluent-bit/loki_test.go b/clients/cmd/fluent-bit/loki_test.go index f1a73501b552..a6033360cd30 100644 --- a/clients/cmd/fluent-bit/loki_test.go +++ b/clients/cmd/fluent-bit/loki_test.go @@ -18,12 +18,12 @@ import ( var now = time.Now() func Test_loki_sendRecord(t *testing.T) { - var simpleRecordFixture = map[interface{}]interface{}{ + simpleRecordFixture := map[interface{}]interface{}{ "foo": "bar", "bar": 500, "error": make(chan struct{}), } - var mapRecordFixture = map[interface{}]interface{}{ + mapRecordFixture := map[interface{}]interface{}{ // lots of key/value pairs in map to increase chances of test hitting in case of unsorted map marshalling "A": "A", "B": "B", @@ -34,14 +34,14 @@ func Test_loki_sendRecord(t *testing.T) { "G": "G", "H": "H", } - var byteArrayRecordFixture = map[interface{}]interface{}{ + byteArrayRecordFixture := map[interface{}]interface{}{ "label": "label", "outer": []byte("foo"), "map": map[interface{}]interface{}{ "inner": []byte("bar"), }, } - var mixedTypesRecordFixture = map[interface{}]interface{}{ + mixedTypesRecordFixture := map[interface{}]interface{}{ "label": "label", "int": 42, "float": 42.42, @@ -53,7 +53,7 @@ func Test_loki_sendRecord(t *testing.T) { }, }, } - var nestedJSONFixture = map[interface{}]interface{}{ + nestedJSONFixture := map[interface{}]interface{}{ "kubernetes": map[interface{}]interface{}{ "annotations": map[interface{}]interface{}{ "kubernetes.io/psp": "test", @@ -124,10 +124,15 @@ func Test_createLine(t *testing.T) { {"kv with map", map[string]interface{}{"foo": "bar", "map": map[string]interface{}{"foo": "bar", "bar ": "foo "}}, kvPairFormat, `foo=bar map="map[bar :foo foo:bar]"`, false}, {"kv empty", map[string]interface{}{}, kvPairFormat, ``, false}, {"bad format", nil, format(3), "", true}, + {"nested json", map[string]interface{}{"log": `{"level":"error"}`}, jsonFormat, `{"log":{"level":"error"}}`, false}, + {"nested json", map[string]interface{}{"log": `["level","error"]`}, jsonFormat, `{"log":["level","error"]}`, false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := createLine(tt.records, tt.f) + l := &loki{ + logger: logger, + } + got, err := l.createLine(tt.records, tt.f) if (err != nil) != tt.wantErr { t.Errorf("createLine() error = %v, wantErr %v", err, tt.wantErr) return @@ -217,8 +222,11 @@ func Test_toStringMap(t *testing.T) { }{ {"already string", map[interface{}]interface{}{"string": "foo", "bar": []byte("buzz")}, map[string]interface{}{"string": "foo", "bar": "buzz"}}, {"skip non string", map[interface{}]interface{}{"string": "foo", 1.0: []byte("buzz")}, map[string]interface{}{"string": "foo"}}, - {"byteslice in array", map[interface{}]interface{}{"string": "foo", "bar": []interface{}{map[interface{}]interface{}{"baz": []byte("quux")}}}, - map[string]interface{}{"string": "foo", "bar": []interface{}{map[string]interface{}{"baz": "quux"}}}}, + { + "byteslice in array", + map[interface{}]interface{}{"string": "foo", "bar": []interface{}{map[interface{}]interface{}{"baz": []byte("quux")}}}, + map[string]interface{}{"string": "foo", "bar": []interface{}{map[string]interface{}{"baz": "quux"}}}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {