Skip to content

Commit

Permalink
DynamoDB CDC: Fix MODIFY operation by propagating NewImage fully
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 25, 2024
1 parent 4c510c5 commit 7799ee7
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- DynamoDB CDC: Fix `MODIFY` operation by propagating `NewImage` fully

## 2024/09/25 v0.0.18
- MongoDB: Improved `MongoDBCrateDBConverter.decode_canonical` to also
Expand Down
39 changes: 6 additions & 33 deletions src/commons_codec/transform/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from commons_codec.model import (
DualRecord,
SQLOperation,
SQLParameterizedSetClause,
SQLParameterizedWhereClause,
)
from commons_codec.util.data import TaggableList
Expand Down Expand Up @@ -182,11 +181,14 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:
del new_image[key]

dual_record = self.decode_record(event["dynamodb"]["NewImage"])
set_clause = self.update_clause(dual_record)

where_clause = self.keys_to_where(event["dynamodb"]["Keys"])
sql = f"UPDATE {self.table_name} SET {set_clause.to_sql()} WHERE {where_clause.to_sql()};"
parameters = set_clause.values # noqa: PD011
sql = (
f"UPDATE {self.table_name} "
f"SET {self.TYPED_COLUMN}=:typed, {self.UNTYPED_COLUMN}=:untyped "
f"WHERE {where_clause.to_sql()};"
)
parameters = {"typed": dual_record.typed, "untyped": dual_record.untyped}
parameters.update(where_clause.values)

elif event_name == "REMOVE":
Expand All @@ -199,35 +201,6 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> SQLOperation:

return SQLOperation(sql, parameters)

def update_clause(self, dual_record: DualRecord) -> SQLParameterizedSetClause:
"""
Serializes an image to a comma-separated list of column/values pairs
that can be used in the `SET` clause of an `UPDATE` statement.
IN:
{'humidity': {'N': '84.84'}, 'temperature': {'N': '55.66'}}
OUT:
data['humidity'] = '84.84', data['temperature'] = '55.66'
"""

clause = SQLParameterizedSetClause()
self.record_to_set_clause(dual_record.typed, self.TYPED_COLUMN, clause)
self.record_to_set_clause(dual_record.untyped, self.UNTYPED_COLUMN, clause)
return clause

@staticmethod
def record_to_set_clause(record: t.Dict[str, t.Any], container_column: str, clause: SQLParameterizedSetClause):
for column, value in record.items():
rval = None
if isinstance(value, dict):
rval = f"CAST(:{column} AS OBJECT)"

elif isinstance(value, list) and value and isinstance(value[0], dict):
rval = f"CAST(:{column} AS OBJECT[])"

clause.add(lval=f"{container_column}['{column}']", name=column, value=value, rval=rval)

def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> SQLParameterizedWhereClause:
"""
Serialize CDC event's "Keys" representation to an SQL `WHERE` clause in CrateDB SQL syntax.
Expand Down
49 changes: 24 additions & 25 deletions tests/transform/test_dynamodb_cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

pytestmark = pytest.mark.dynamodb


READING_BASIC = {"device": "foo", "temperature": 42.42, "humidity": 84.84}

MSG_UNKNOWN_SOURCE = {
Expand Down Expand Up @@ -240,44 +239,44 @@ def test_decode_cdc_insert_nested():

def test_decode_cdc_modify_basic():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_BASIC) == SQLOperation(
statement="UPDATE foo SET "
"data['humidity']=:humidity, data['temperature']=:temperature, data['location']=:location, "
"data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, "
"data['empty_string']=:empty_string, data['null_string']=:null_string "
statement="UPDATE foo SET data=:typed, aux=:untyped "
"WHERE data['device']=:device AND data['timestamp']=:timestamp;",
parameters={
"device": "foo",
"timestamp": "2024-07-12T01:17:42",
"humidity": 84.84,
"temperature": 55.66,
"location": "Sydney",
"string_set": ["location_1"],
"number_set": [0.34, 1.0, 2.0, 3.0],
"binary_set": ["U3Vubnk="],
"empty_string": "",
"null_string": None,
"typed": {
"humidity": 84.84,
"temperature": 55.66,
"location": "Sydney",
"string_set": ["location_1"],
"number_set": [0.34, 1.0, 2.0, 3.0],
"binary_set": ["U3Vubnk="],
"empty_string": "",
"null_string": None,
},
"untyped": {},
},
)


def test_decode_cdc_modify_nested():
assert DynamoDBCDCTranslator(table_name="foo").to_sql(MSG_MODIFY_NESTED) == SQLOperation(
statement="UPDATE foo SET "
"data['tags']=:tags, data['empty_map']=CAST(:empty_map AS OBJECT), data['empty_list']=:empty_list, "
"data['string_set']=:string_set, data['number_set']=:number_set, data['binary_set']=:binary_set, "
"data['somemap']=CAST(:somemap AS OBJECT), data['list_of_objects']=CAST(:list_of_objects AS OBJECT[]) "
statement="UPDATE foo SET data=:typed, aux=:untyped "
"WHERE data['device']=:device AND data['timestamp']=:timestamp;",
parameters={
"device": "foo",
"timestamp": "2024-07-12T01:17:42",
"tags": ["foo", "bar"],
"empty_map": {},
"empty_list": [],
"string_set": ["location_1"],
"number_set": [0.34, 1.0, 2.0, 3.0],
"binary_set": ["U3Vubnk="],
"somemap": {"test": 1.0, "test2": 2.0},
"list_of_objects": [{"foo": "bar"}, {"baz": "qux"}],
"typed": {
"tags": ["foo", "bar"],
"empty_map": {},
"empty_list": [],
"string_set": ["location_1"],
"number_set": [0.34, 1.0, 2.0, 3.0],
"binary_set": ["U3Vubnk="],
"somemap": {"test": 1.0, "test2": 2.0},
"list_of_objects": [{"foo": "bar"}, {"baz": "qux"}],
},
"untyped": {},
},
)

Expand Down
29 changes: 29 additions & 0 deletions tests/transform/test_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from commons_codec.model import SQLParameterizedClause


def test_parameterized_clause_rval_set():
clause = SQLParameterizedClause()

container_column = "data"
column = "foo"
value = "bar"
rval = f"CAST(:{column} AS OBJECT)"

clause.add(lval=f"{container_column}['{column}']", name=column, value=value, rval=rval)

assert clause == SQLParameterizedClause(
lvals=["data['foo']"], rvals=["CAST(:foo AS OBJECT)"], values={"foo": "bar"}
)


def test_parameterized_clause_rval_unset():
clause = SQLParameterizedClause()

container_column = "data"
column = "foo"
value = "bar"
rval = None

clause.add(lval=f"{container_column}['{column}']", name=column, value=value, rval=rval)

assert clause == SQLParameterizedClause(lvals=["data['foo']"], rvals=[":foo"], values={"foo": "bar"})

0 comments on commit 7799ee7

Please sign in to comment.