Skip to content

Commit

Permalink
fluentd: Remove non utf-8 characters from log lines
Browse files Browse the repository at this point in the history
When a log line in hash format contains non UTF-8 characters fluentd
would drop the complete line because it failed to convert the line in
key-value format.

By forcing UTF-8 encoding and replacing non UTF-8 characters with empty
strings the log line will not be dropped but only contain the valid
UTF-8 characters.

Fixes #5099

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Jan 12, 2022
1 parent fc49a87 commit 80735f6
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 44 deletions.
9 changes: 5 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -425,19 +425,20 @@ fluent-bit-test:
# fluentd plugin #
##################
fluentd-plugin:
gem install bundler --version 1.16.2
gem install bundler --version 2.3.4
bundle config silence_root_warning true
bundle install --gemfile=clients/cmd/fluentd/Gemfile --path=clients/cmd/fluentd/vendor/bundle
bundle config set --local path clients/cmd/fluentd/vendor/bundle
bundle install --gemfile=clients/cmd/fluentd/Gemfile

fluentd-image:
$(SUDO) docker build -t $(IMAGE_PREFIX)/fluent-plugin-loki:$(IMAGE_TAG) -f clients/cmd/fluentd/Dockerfile .

fluentd-push:
$(SUDO) $(PUSH_OCI) $(IMAGE_PREFIX)/fluent-plugin-loki:$(IMAGE_TAG)

fluentd-test: LOKI_URL ?= http://localhost:3100/loki/api/
fluentd-test: LOKI_URL ?= http://loki:3100
fluentd-test:
LOKI_URL="$(LOKI_URL)" docker-compose -f clients/cmd/fluentd/docker/docker-compose.yml up --build $(IMAGE_PREFIX)/fluent-plugin-loki:$(IMAGE_TAG)
LOKI_URL="$(LOKI_URL)" docker-compose -f clients/cmd/fluentd/docker/docker-compose.yml up --build

##################
# logstash plugin #
Expand Down
12 changes: 9 additions & 3 deletions clients/cmd/fluentd/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
/coverage/
/.rspec_status
/Gemfile.lock
Gemfile.lock
.rspec_status
# rbenv
.ruby-version
# bundler
.bundle/
vendor/
# simplecov
coverage/
3 changes: 2 additions & 1 deletion clients/cmd/fluentd/.rubocop.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require: rubocop-rspec

AllCops:
NewCops: disable
Exclude:
- 'bin/**'
- 'test/**/*.rb'
Expand Down Expand Up @@ -30,4 +31,4 @@ Style/HashEachMethods:
Style/HashTransformKeys:
Enabled: true
Style/HashTransformValues:
Enabled: true
Enabled: true
2 changes: 1 addition & 1 deletion clients/cmd/fluentd/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ruby:2.6 as build
FROM ruby:2.7.5 as build

ENV DEBIAN_FRONTEND=noninteractive

Expand Down
11 changes: 7 additions & 4 deletions clients/cmd/fluentd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ See [docs/client/fluentd/README.md](../../docs/sources/clients/fluentd/_index.md

## Development

After checking out the repo, run `bin/setup` to install dependencies. Then, run `bundle exec rake spec` to run the tests. You can also run `bin/console` for an interactive prompt that will allow you to experiment.
After checking out the repo, run `bin/setup` to install dependencies. Then, run `bin/test` to run the tests. You can also run `bin/console` for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run `bundle exec rake install`. To release a new version, update the version number in `fluent-plugin-grafana-loki.gemspec`, and then run `bundle exec rake release`, which will create a git tag for the version, push git commits and tags, and push the `.gem` file to [rubygems.org](https://rubygems.org).
To install this gem onto your local machine, run `ruby -S bundle exec rake install`. To release a new version, update the version number in `fluent-plugin-grafana-loki.gemspec`, and then run `ruby -S bundle exec rake release`, which will create a git tag for the version, push git commits and tags, and push the `.gem` file to [rubygems.org](https://rubygems.org).

To create the gem: `gem build fluent-plugin-grafana-loki.gemspec`
To create the gem: `ruby -S gem build fluent-plugin-grafana-loki.gemspec`

Useful additions:
`gem install rubocop`

```bash
ruby -S gem install rubocop
```

## Testing

Expand Down
10 changes: 6 additions & 4 deletions clients/cmd/fluentd/bin/setup
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#!/usr/bin/env bash

set -euo pipefail
IFS=$'\n\t'
set -vx

gem install bundler
bundle install
ruby --version
echo ""
ruby -S gem install bundler --version 2.3.4
ruby -S bundle config set --local path $(pwd)/vendor/bundle
ruby -S bundle install
5 changes: 5 additions & 0 deletions clients/cmd/fluentd/bin/test
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env bash

set -euo pipefail

ruby -S bundle exec rspec
20 changes: 17 additions & 3 deletions clients/cmd/fluentd/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
version: '3'
services:
loki:
build:
context: ../../../../
dockerfile: ./cmd/loki/Dockerfile
image: grafana/loki:main
ports:
- 3100
volumes:
- ./fluentd.conf:/fluentd/etc/fluent.conf

# Receive forwarded logs and send to /fluentd/logs/data.log and loki
fluentd:
build:
context: ../../../..
dockerfile: ../Dockerfile
image: fluentd:loki
context: ../../../../
dockerfile: ./clients/cmd/fluentd/Dockerfile
image: grafana/fluent-plugin-loki:main
volumes:
- ./fluentd.conf:/fluentd/etc/fluent.conf
environment:
- LOKI_URL
depends_on:
- loki

# Read /var/log/syslog and send it to fluentd
fluentbit:
image: fluent/fluent-bit:1.0
Expand All @@ -20,3 +33,4 @@ services:
- /var/log/syslog:/var/log/syslog:ro
depends_on:
- fluentd
- loki
3 changes: 2 additions & 1 deletion clients/cmd/fluentd/lib/fluent/plugin/out_loki.rb
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ def record_to_line(record)
when :key_value
formatted_labels = []
record.each do |k, v|
# Remove non UTF-8 characters by force-encoding the string and replacing said chars with empty string
v = v.encode('utf-8', invalid: :replace, replace: '')
# Escape double quotes and backslashes by prefixing them with a backslash
v = v.to_s.gsub(%r{(["\\])}, '\\\\\1')
if v.include?(' ') || v.include?('=')
Expand All @@ -292,7 +294,6 @@ def record_to_line(record)
line
end

#
# convert a line to loki line with labels
def line_to_loki(record)
chunk_labels = {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
� rest of line
63 changes: 40 additions & 23 deletions clients/cmd/fluentd/spec/gems/fluent/plugin/loki_output_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog2')
single_chunk = [Time.at(1_546_270_458), content]
payload = driver.instance.generic_to_loki([single_chunk])
body = { 'streams': payload }
expect(body[:streams][0]['stream'].empty?).to eq true
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
chunk = [Time.at(1_546_270_458), content[0]]
payload = driver.instance.generic_to_loki([chunk])
expect(payload[0]['stream'].empty?).to eq true
expect(payload[0]['values'].count).to eq 1
expect(payload[0]['values'][0][0]).to eq '1546270458000000000'
expect(payload[0]['values'][0][1]).to eq content[0]
end

it 'converts syslog output with extra labels to loki output' do
Expand All @@ -66,12 +66,12 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog2')
single_chunk = [Time.at(1_546_270_458), content]
payload = driver.instance.generic_to_loki([single_chunk])
body = { 'streams': payload }
expect(body[:streams][0]['stream']).to eq('env' => 'test')
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
chunk = [Time.at(1_546_270_458), content[0]]
payload = driver.instance.generic_to_loki([chunk])
expect(payload[0]['stream']).to eq('env' => 'test')
expect(payload[0]['values'].count).to eq 1
expect(payload[0]['values'][0][0]).to eq '1546270458000000000'
expect(payload[0]['values'][0][1]).to eq content[0]
end

it 'converts multiple syslog output lines to loki output' do
Expand All @@ -84,11 +84,12 @@
line1 = [Time.at(1_546_270_458), content[0]]
line2 = [Time.at(1_546_270_460), content[1]]
payload = driver.instance.generic_to_loki([line1, line2])
body = { 'streams': payload }
expect(body[:streams][0]['stream'].empty?).to eq true
expect(body[:streams][0]['values'].count).to eq 2
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][1][0]).to eq '1546270460000000000'
expect(payload[0]['stream'].empty?).to eq true
expect(payload[0]['values'].count).to eq 2
expect(payload[0]['values'][0][0]).to eq '1546270458000000000'
expect(payload[0]['values'][0][1]).to eq content[0]
expect(payload[0]['values'][1][0]).to eq '1546270460000000000'
expect(payload[0]['values'][1][1]).to eq content[1]
end

it 'converts multiple syslog output lines with extra labels to loki output' do
Expand All @@ -102,11 +103,27 @@
line1 = [Time.at(1_546_270_458), content[0]]
line2 = [Time.at(1_546_270_460), content[1]]
payload = driver.instance.generic_to_loki([line1, line2])
body = { 'streams': payload }
expect(body[:streams][0]['stream']).to eq('env' => 'test')
expect(body[:streams][0]['values'].count).to eq 2
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][1][0]).to eq '1546270460000000000'
expect(payload[0]['stream']).to eq('env' => 'test')
expect(payload[0]['values'].count).to eq 2
expect(payload[0]['values'][0][0]).to eq '1546270458000000000'
expect(payload[0]['values'][0][1]).to eq content[0]
expect(payload[0]['values'][1][0]).to eq '1546270460000000000'
expect(payload[0]['values'][1][1]).to eq content[1]
end

it 'removed non utf-8 characters from log lines' do
config = <<-CONF
url https://logs-us-west1.grafana.net
CONF
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/non_utf8.log')
chunk = [Time.at(1_546_270_458), {'message'=>content[0], 'stream'=>'stdout'}]
payload = driver.instance.generic_to_loki([chunk])
expect(payload[0]['stream'].empty?).to eq true
expect(payload[0]['values'].count).to eq 1
expect(payload[0]['values'][0][0]).to eq '1546270458000000000'
expect(payload[0]['values'][0][1]).to eq 'message=" rest of line" stream=stdout'
end

it 'formats record hash as key_value' do
Expand All @@ -122,7 +139,7 @@
expect(body[:streams][0]['stream'].empty?).to eq true
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq 'message="' + content[0] + '" stream="stdout"'
expect(body[:streams][0]['values'][0][1]).to eq 'message="' + content[0] + '" stream=stdout'
end

it 'formats record hash as json' do
Expand Down

0 comments on commit 80735f6

Please sign in to comment.