Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GELF support for Promtail. #4744

Merged
merged 8 commits into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## Main

* [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction
* [4744](https://github.com/grafana/loki/pull/4744) **cyriltovena**: Promtail: Adds GELF UDP support.

# 2.4.1 (2021/11/07)

Release notes for 2.4.1 can be found on the [release notes page](https://grafana.com/docs/loki/latest/release-notes/v2-4/)
Expand All @@ -9,7 +12,6 @@ Release notes for 2.4.1 can be found on the [release notes page](https://grafana
* [4687](https://github.com/grafana/loki/pull/4687) **owen-d**: overrides checks for nil tenant limits on AllByUserID
* [4683](https://github.com/grafana/loki/pull/4683) **owen-d**: Adds replication_factor doc to common config
* [4681](https://github.com/grafana/loki/pull/4681) **slim-bean**: Loki: check new Read target when initializing boltdb-shipper store
* [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction

# 2.4.0 (2021/11/05)

Expand Down
26 changes: 26 additions & 0 deletions clients/cmd/promtail/promtail-gelf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
server:
http_listen_port: 9080
grpc_listen_port: 0

clients:
- url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: gelf
gelf:
use_incoming_timestamp: false
labels:
job: gelf
relabel_configs:
- action: replace
source_labels:
- __gelf_message_level
target_label: level
- action: replace
source_labels:
- __gelf_message_host
target_label: host
- action: replace
source_labels:
- __gelf_message_facility
target_label: facility
14 changes: 14 additions & 0 deletions clients/pkg/promtail/scrapeconfig/scrapeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Config struct {
PushConfig *PushTargetConfig `yaml:"loki_push_api,omitempty"`
WindowsConfig *WindowsEventsTargetConfig `yaml:"windows_events,omitempty"`
KafkaConfig *KafkaTargetConfig `yaml:"kafka,omitempty"`
GelfConfig *GelfTargetConfig `yaml:"gelf,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
ServiceDiscoveryConfig ServiceDiscoveryConfig `yaml:",inline"`
}
Expand Down Expand Up @@ -295,6 +296,19 @@ type KafkaSASLConfig struct {
TLSConfig promconfig.TLSConfig `yaml:",inline"`
}

// GelfTargetConfig describes a scrape config that read GELF messages on UDP.
type GelfTargetConfig struct {
// ListenAddress is the address to listen on UDP for gelf messages. (Default to `:12201`)
ListenAddress string `yaml:"listen_address"`

// Labels optionally holds labels to associate with each record read from gelf messages.
Labels model.LabelSet `yaml:"labels"`

// UseIncomingTimestamp sets the timestamp to the incoming gelf messages
// timestamp if it's set.
UseIncomingTimestamp bool `yaml:"use_incoming_timestamp"`
}

// GcplogTargetConfig describes a scrape config to pull logs from any pubsub topic.
type GcplogTargetConfig struct {
// ProjectID is the Cloud project id
Expand Down
198 changes: 198 additions & 0 deletions clients/pkg/promtail/targets/gelf/gelftarget.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package gelf

import (
"bytes"
"context"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"gopkg.in/Graylog2/go-gelf.v2/gelf"

"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"

"github.com/grafana/loki/pkg/logproto"
)

// SeverityLevels maps severity levels to severity string levels.
var SeverityLevels = map[int32]string{
0: "emergency",
1: "alert",
2: "critical",
3: "error",
4: "warning",
5: "notice",
6: "informational",
7: "debug",
}

// Target listens to gelf messages on udp.
type Target struct {
metrics *Metrics
logger log.Logger
handler api.EntryHandler
config *scrapeconfig.GelfTargetConfig
relabelConfig []*relabel.Config
gelfReader *gelf.Reader
encodeBuff *bytes.Buffer
wg sync.WaitGroup

ctx context.Context
ctxCancel context.CancelFunc
}

// NewTarget configures a new Gelf Target.
func NewTarget(
metrics *Metrics,
logger log.Logger,
handler api.EntryHandler,
relabel []*relabel.Config,
config *scrapeconfig.GelfTargetConfig,
) (*Target, error) {

if config.ListenAddress == "" {
config.ListenAddress = ":12201"
}

gelfReader, err := gelf.NewReader(config.ListenAddress)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())

t := &Target{
metrics: metrics,
logger: logger,
handler: handler,
config: config,
relabelConfig: relabel,
gelfReader: gelfReader,
encodeBuff: bytes.NewBuffer(make([]byte, 0, 1024)),

ctx: ctx,
ctxCancel: cancel,
}

t.run()
return t, err
}

func (t *Target) run() {
t.wg.Add(1)
go func() {
defer t.wg.Done()
level.Info(t.logger).Log("msg", "listening for GELF UDP messages", "listen_address", t.config.ListenAddress)
for {
select {
case <-t.ctx.Done():
level.Info(t.logger).Log("msg", "GELF UDP listener shutdown", "listen_address", t.config.ListenAddress)
return
default:
msg, err := t.gelfReader.ReadMessage()
if err != nil {
level.Error(t.logger).Log("msg", "error while reading gelf message", "listen_address", t.config.ListenAddress, "err", err)
t.metrics.gelfErrors.Inc()
continue
}
if msg != nil {
t.metrics.gelfEntries.Inc()
t.handleMessage(msg)
}
}
}
}()
}

func (t *Target) handleMessage(msg *gelf.Message) {
lb := labels.NewBuilder(nil)

// Add all labels from the config.
for k, v := range t.config.Labels {
lb.Set(string(k), string(v))
}
Comment on lines +116 to +119
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this the same for each message? Why is it done each time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To ensure label are clone and not mutated after. Might not be required.

lb.Set("__gelf_message_level", SeverityLevels[msg.Level])
lb.Set("__gelf_message_host", msg.Host)
lb.Set("__gelf_message_version", msg.Version)
lb.Set("__gelf_message_facility", msg.Facility)

processed := relabel.Process(lb.Labels(), t.relabelConfig...)

filtered := make(model.LabelSet)
for _, lbl := range processed {
if strings.HasPrefix(lbl.Name, "__") {
continue
}
filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

var timestamp time.Time
if t.config.UseIncomingTimestamp && msg.TimeUnix != 0 {
// TimeUnix is the timestamp of the message, in seconds since the UNIX epoch with decimals for fractional seconds.
timestamp = secondsToUnixTimestamp(msg.TimeUnix)
} else {
timestamp = time.Now()
}
t.encodeBuff.Reset()
err := msg.MarshalJSONBuf(t.encodeBuff)
if err != nil {
level.Error(t.logger).Log("msg", "error while marshalling gelf message", "listen_address", t.config.ListenAddress, "err", err)
t.metrics.gelfErrors.Inc()
return
}
t.handler.Chan() <- api.Entry{
Labels: filtered,
Entry: logproto.Entry{
Timestamp: timestamp,
Line: t.encodeBuff.String(),
},
}
}

func secondsToUnixTimestamp(seconds float64) time.Time {
return time.Unix(0, int64(seconds*float64(time.Second)))
}

// Type returns GelfTargetType.
func (t *Target) Type() target.TargetType {
return target.GelfTargetType
}

// Ready indicates whether or not the gelf target is ready to be read from.
func (t *Target) Ready() bool {
return true
}

// DiscoveredLabels returns the set of labels discovered by the gelf target, which
// is always nil. Implements Target.
func (t *Target) DiscoveredLabels() model.LabelSet {
return nil
}

// Labels returns the set of labels that statically apply to all log entries
// produced by the GelfTarget.
func (t *Target) Labels() model.LabelSet {
return t.config.Labels
}

// Details returns target-specific details.
func (t *Target) Details() interface{} {
return map[string]string{}
}

// Stop shuts down the GelfTarget.
func (t *Target) Stop() {
level.Info(t.logger).Log("msg", "Shutting down GELF UDP listener", "listen_address", t.config.ListenAddress)
t.ctxCancel()
if err := t.gelfReader.Close(); err != nil {
level.Error(t.logger).Log("msg", "error while closing gelf reader", "err", err)
}
t.wg.Wait()
t.handler.Stop()
}
106 changes: 106 additions & 0 deletions clients/pkg/promtail/targets/gelf/gelftarget_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package gelf

import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/loki/clients/pkg/promtail/client/fake"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/stretchr/testify/require"
"gopkg.in/Graylog2/go-gelf.v2/gelf"
)

func Test_Gelf(t *testing.T) {
client := fake.New(func() {})

tm, err := NewTargetManager(NewMetrics(nil), log.NewNopLogger(), client, []scrapeconfig.Config{
{
JobName: "gelf",
GelfConfig: &scrapeconfig.GelfTargetConfig{
ListenAddress: ":12201",
UseIncomingTimestamp: true,
Labels: model.LabelSet{"cfg": "true"},
},
RelabelConfigs: []*relabel.Config{
{
SourceLabels: model.LabelNames{"__gelf_message_level"},
TargetLabel: "level",
Replacement: "$1",
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("(.*)"),
},
{
SourceLabels: model.LabelNames{"__gelf_message_host"},
TargetLabel: "hostname",
Replacement: "$1",
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("(.*)"),
},
{
SourceLabels: model.LabelNames{"__gelf_message_facility"},
TargetLabel: "facility",
Replacement: "$1",
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("(.*)"),
},
},
},
})
require.NoError(t, err)

w, err := gelf.NewUDPWriter(":12201")
require.NoError(t, err)
baseTs := float64(time.Unix(10, 0).Unix()) + 0.250
ts := baseTs

for i := 0; i < 10; i++ {
require.NoError(t, w.WriteMessage(&gelf.Message{
Short: "short",
Full: "full",
Version: "2.2",
Host: "thishost",
TimeUnix: ts,
Level: gelf.LOG_ERR,
Facility: "gelftest",
Extra: map[string]interface{}{
"_foo": "bar",
},
}))
ts += 0.250
}

require.Eventually(t, func() bool {
return len(client.Received()) == 10
}, 200*time.Millisecond, 20*time.Millisecond)

for i, actual := range client.Received() {
require.Equal(t, "error", string(actual.Labels["level"]))
require.Equal(t, "true", string(actual.Labels["cfg"]))
require.Equal(t, "thishost", string(actual.Labels["hostname"]))
require.Equal(t, "gelftest", string(actual.Labels["facility"]))

require.Equal(t, secondsToUnixTimestamp(baseTs+float64(i)*0.250), actual.Timestamp)

var gelfMsg gelf.Message

require.NoError(t, gelfMsg.UnmarshalJSON([]byte(actual.Line)))

require.Equal(t, "short", gelfMsg.Short)
require.Equal(t, "full", gelfMsg.Full)
require.Equal(t, "2.2", gelfMsg.Version)
require.Equal(t, "thishost", gelfMsg.Host)
require.Equal(t, "bar", gelfMsg.Extra["_foo"])
require.Equal(t, gelf.LOG_ERR, int(gelfMsg.Level))
require.Equal(t, "gelftest", gelfMsg.Facility)

}

tm.Stop()
}

func TestConvertTime(t *testing.T) {
require.Equal(t, time.Unix(0, int64(time.Second+(time.Duration(250)*time.Millisecond))), secondsToUnixTimestamp(float64(time.Unix(1, 0).Unix())+0.250))
}
Loading