Skip to content

Commit

Permalink
Merge pull request #385 from ianpyw/master
Browse files Browse the repository at this point in the history
Add bootstarpServers and headers to Kafka event
  • Loading branch information
keshayad authored Jul 29, 2021
2 parents ee817dd + 151e061 commit 85e7bc7
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 13 deletions.
8 changes: 5 additions & 3 deletions events/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
package events

type KafkaEvent struct {
EventSource string `json:"eventSource"`
EventSourceARN string `json:"eventSourceArn"`
Records map[string][]KafkaRecord `json:"records"`
EventSource string `json:"eventSource"`
EventSourceARN string `json:"eventSourceArn"`
Records map[string][]KafkaRecord `json:"records"`
BootstrapServers string `json:"bootstrapServers"`
}

type KafkaRecord struct {
Expand All @@ -16,4 +17,5 @@ type KafkaRecord struct {
TimestampType string `json:"timestampType"`
Key string `json:"key,omitempty"`
Value string `json:"value,omitempty"`
Headers []map[string][]byte `json:"headers"`
}
22 changes: 13 additions & 9 deletions events/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,25 @@ func TestKafkaEventMarshaling(t *testing.T) {
t.Errorf("could not unmarshal event. details: %v", err)
}

assert.Equal(t, inputEvent.BootstrapServers, "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092")
assert.Equal(t, inputEvent.EventSource, "aws:kafka")
assert.Equal(t, inputEvent.EventSourceARN, "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4")
for _, records := range inputEvent.Records {
for _, record := range records {
utc := record.Timestamp.UTC()
assert.Equal(t, 2020, utc.Year())
assert.Equal(t, record.Key, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj")
assert.Equal(t, record.Value, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj")

for _, header := range record.Headers {
for key, value := range header {
assert.Equal(t, key, "headerKey")
var headerValue string = string(value)
assert.Equal(t, headerValue, "headerValue")
}
}
}
}

// 3. serialize to JSON
outputJson, err := json.Marshal(inputEvent)
if err != nil {
t.Errorf("could not marshal event. details: %v", err)
}

// 4. check result
assert.JSONEq(t, string(inputJson), string(outputJson))
}

func TestKafkaMarshalingMalformedJson(t *testing.T) {
Expand Down
20 changes: 19 additions & 1 deletion events/testdata/kafka-event.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4",
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"records": {
"AWSKafkaTopic-0": [
{
Expand All @@ -10,7 +11,24 @@
"timestamp": 1595035749700,
"timestampType": "CREATE_TIME",
"key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj",
"value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj"
"value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj",
"headers": [
{
"headerKey": [
104,
101,
97,
100,
101,
114,
86,
97,
108,
117,
101
]
}
]
}
]
}
Expand Down

0 comments on commit 85e7bc7

Please sign in to comment.