lambda-promtail: Add support for Kinesis data stream events (#5977)
**What this PR does / why we need it**:

This PR adds support for sending Kinesis data stream events to lambda-promtail. One use case would be e.g. to send CloudFront realtime logs to Loki.

**Which issue(s) this PR fixes**:

Fixes #5978 

**Special notes for your reviewer**:

n/a

**Checklist**
- [x] Documentation added
- [x] Tests updated
- [x] Is this an important fix or new feature? Add an entry in the `CHANGELOG.md`.
- [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md`
juissi-t authored Aug 26, 2022
1 parent e3cd312 commit 073c620
Showing 7 changed files with 174 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -28,6 +28,7 @@
#### Promtail

##### Enhancements
* [5977](https://github.com/grafana/loki/pull/5977) **juissi-t**: lambda-promtail: Add support for Kinesis data stream events
* [6395](https://github.com/grafana/loki/pull/6395) **DylanGuedes**: Add encoding support
* [6828](https://github.com/grafana/loki/pull/6828) **alexandre1984rj**: Add the BotScore and BotScoreSrc fields once the Cloudflare API returns those two fields on the list of all available log fields.
* [6656](https://github.com/grafana/loki/pull/6656) **carlospeon**: Allow promtail to add matches to the journal reader
15 changes: 10 additions & 5 deletions docs/sources/clients/lambda-promtail/_index.md
@@ -34,7 +34,7 @@ terraform apply -var "lambda_promtail_image=<repo:tag>" -var "write_address=http
```

The first few lines of `main.tf` define the AWS region to deploy to.
Modify as desired, or remove and deploy to
```
provider "aws" {
region = "us-east-2"
@@ -80,7 +80,7 @@ To modify an existing CloudFormation stack, use [update-stack](https://docs.aws.

This workflow is intended to be an effective approach for monitoring ephemeral jobs such as those run on AWS Lambda which are otherwise hard/impossible to monitor via one of the other Loki [clients](../).

Ephemeral jobs can quite easily run afoul of cardinality best practices. During high request load, an AWS Lambda function might balloon in concurrency, creating many log streams in Cloudwatch. For this reason lambda-promtail defaults to **not** keeping the log stream value as a label when propagating the logs to Loki. This is only possible because new versions of Loki no longer have an ingestion ordering constraint on logs within a single stream.

### Proof of concept Loki deployments

@@ -92,14 +92,19 @@ Note: Propagating logs from Cloudwatch to Loki means you'll still need to _pay_

This workflow allows ingesting AWS loadbalancer logs stored on S3 to Loki.

### CloudFront real-time logs

CloudFront [real-time logs](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html) can be sent to a Kinesis data stream. The data stream can be configured as an [event source](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html) for lambda-promtail, which then delivers the logs to Loki.

## Propagated Labels

Incoming logs can have seven special labels assigned to them which can be used in [relabeling](../promtail/configuration/#relabel_config) or later stages in a Promtail [pipeline](../promtail/pipelines/):

- `__aws_log_type`: Where this log came from (Cloudwatch, Kinesis or S3).
- `__aws_cloudwatch_log_group`: The associated Cloudwatch Log Group for this log.
- `__aws_cloudwatch_log_stream`: The associated Cloudwatch Log Stream for this log (if `KEEP_STREAM=true`).
- `__aws_cloudwatch_owner`: The AWS ID of the owner of this event.
- `__aws_kinesis_event_source_arn`: The Kinesis event source ARN.
- `__aws_s3_log_lb`: The name of the loadbalancer.
- `__aws_s3_log_lb_owner`: The Account ID of the loadbalancer owner.

@@ -109,7 +114,7 @@ Incoming logs can have six special labels assigned to them which can be used in

Note: This section is relevant if running Promtail between lambda-promtail and the end Loki deployment and was used to circumvent `out of order` problems prior to the v2.4 Loki release which removed the ordering constraint.

As stated earlier, this workflow moves the worst case stream cardinality from `number_of_log_streams` -> `number_of_log_groups` * `number_of_promtails`. For this reason, each Promtail must have a unique label attached to logs it processes (ideally via something like `--client.external-labels=promtail=${HOSTNAME}`) and it's advised to run a small number of Promtails behind a load balancer according to your throughput and redundancy needs.

This trade-off is very effective when you have a large number of log streams but want to aggregate them by the log group. This is very common in AWS Lambda, where log groups are the "application" and log streams are the individual application containers which are spun up and down at a whim, possibly just for a single function invocation.

49 changes: 49 additions & 0 deletions tools/lambda-promtail/lambda-promtail/kinesis.go
@@ -0,0 +1,49 @@
package main

import (
	"context"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/prometheus/common/model"
)

func parseKinesisEvent(ctx context.Context, b batchIf, ev *events.KinesisEvent) error {
	if ev == nil {
		return nil
	}

	for _, record := range ev.Records {
		// ApproximateArrivalTimestamp is epoch seconds; convert it with
		// time.Unix. Passing it to time.UnixMilli would misread seconds
		// as milliseconds.
		timestamp := time.Unix(record.Kinesis.ApproximateArrivalTimestamp.Unix(), 0)

		labels := model.LabelSet{
			model.LabelName("__aws_log_type"):                 model.LabelValue("kinesis"),
			model.LabelName("__aws_kinesis_event_source_arn"): model.LabelValue(record.EventSourceArn),
		}

		labels = applyExtraLabels(labels)

		if err := b.add(ctx, entry{labels, logproto.Entry{
			Line:      string(record.Kinesis.Data),
			Timestamp: timestamp,
		}}); err != nil {
			return err
		}
	}

	return nil
}

func processKinesisEvent(ctx context.Context, ev *events.KinesisEvent) error {
	batch, err := newBatch(ctx)
	if err != nil {
		return err
	}

	err = parseKinesisEvent(ctx, batch, ev)
	if err != nil {
		return err
	}

	err = sendToPromtail(ctx, batch)
	if err != nil {
		return err
	}
	return nil
}
67 changes: 67 additions & 0 deletions tools/lambda-promtail/lambda-promtail/kinesis_test.go
@@ -0,0 +1,67 @@
package main

import (
	"context"
	"encoding/json"
	"io/ioutil"
	"testing"

	"github.com/aws/aws-lambda-go/events"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/stretchr/testify/require"
)

type MockBatch struct {
	streams map[string]*logproto.Stream
	size    int
}

func (b *MockBatch) add(ctx context.Context, e entry) error {
	b.streams[e.labels.String()] = &logproto.Stream{
		Labels: e.labels.String(),
	}
	return nil
}

func (b *MockBatch) flushBatch(ctx context.Context) error {
	return nil
}

func (b *MockBatch) encode() ([]byte, int, error) {
	return nil, 0, nil
}

func (b *MockBatch) createPushRequest() (*logproto.PushRequest, int) {
	return nil, 0
}

func ReadJSONFromFile(t *testing.T, inputFile string) []byte {
	inputJSON, err := ioutil.ReadFile(inputFile)
	if err != nil {
		// Fatalf, not Errorf: there is no point continuing without the fixture.
		t.Fatalf("could not open test file. details: %v", err)
	}

	return inputJSON
}

func TestLambdaPromtail_KinesisParseEvents(t *testing.T) {
	inputJSON := ReadJSONFromFile(t, "../testdata/kinesis-event.json")

	var testEvent events.KinesisEvent
	if err := json.Unmarshal(inputJSON, &testEvent); err != nil {
		t.Fatalf("could not unmarshal event. details: %v", err)
	}

	ctx := context.TODO()
	b := &MockBatch{
		streams: map[string]*logproto.Stream{},
	}

	err := parseKinesisEvent(ctx, b, &testEvent)
	require.NoError(t, err)

	labelsStr := "{__aws_kinesis_event_source_arn=\"arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream\", __aws_log_type=\"kinesis\"}"
	require.Contains(t, b.streams, labelsStr)
}
5 changes: 4 additions & 1 deletion tools/lambda-promtail/lambda-promtail/main.go
@@ -109,8 +109,9 @@ func applyExtraLabels(labels model.LabelSet) model.LabelSet {
func checkEventType(ev map[string]interface{}) (interface{}, error) {
	var s3Event events.S3Event
	var cwEvent events.CloudwatchLogsEvent
	var kinesisEvent events.KinesisEvent

	types := [...]interface{}{&s3Event, &cwEvent, &kinesisEvent}

	j, _ := json.Marshal(ev)
	reader := strings.NewReader(string(j))
@@ -142,6 +143,8 @@ func handler(ctx context.Context, ev map[string]interface{}) error {
		return processS3Event(ctx, evt)
	case *events.CloudwatchLogsEvent:
		return processCWEvent(ctx, evt)
	case *events.KinesisEvent:
		return processKinesisEvent(ctx, evt)
	}

	return err
7 changes: 7 additions & 0 deletions tools/lambda-promtail/lambda-promtail/promtail.go
@@ -39,6 +39,13 @@ type batch struct {
	size int
}

type batchIf interface {
	add(ctx context.Context, e entry) error
	encode() ([]byte, int, error)
	createPushRequest() (*logproto.PushRequest, int)
	flushBatch(ctx context.Context) error
}

func newBatch(ctx context.Context, entries ...entry) (*batch, error) {
	b := &batch{
		streams: map[string]*logproto.Stream{},
36 changes: 36 additions & 0 deletions tools/lambda-promtail/testdata/kinesis-event.json
@@ -0,0 +1,36 @@
{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "s1",
        "sequenceNumber": "49568167373333333333333333333333333333333333333333333333",
        "data": "SGVsbG8gV29ybGQ=",
        "approximateArrivalTimestamp": 1480641523.477
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000000:49568167373333333333333333333333333333333333333333333333",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole",
      "awsRegion": "us-east-1",
      "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream"
    },
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "s1",
        "sequenceNumber": "49568167373333333334444444444444444444444444444444444444",
        "data": "SGVsbG8gV29ybGQ=",
        "approximateArrivalTimestamp": 1480841523.477
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000000:49568167373333333334444444444444444444444444444444444444",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole",
      "awsRegion": "us-east-1",
      "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream"
    }
  ]
}
