Skip to content

Commit

Permalink
Add BulkResponse wrapper for improved decoding of HTTP bulk responses
Browse files Browse the repository at this point in the history
CrateDB HTTP bulk responses include `rowcount=` items, either signalling
if a bulk operation succeeded or failed.

- success means `rowcount=1`
- failure means `rowcount=-2`

https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
  • Loading branch information
amotl committed Oct 3, 2024
1 parent 7cb2c68 commit 50347d4
Show file tree
Hide file tree
Showing 5 changed files with 426 additions and 284 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Unreleased
"Threads may share the module, but not connections."
- Added ``error_trace`` to string representation of an Error to relay
server stacktraces into exception messages.
- Added ``BulkResponse`` wrapper for improved decoding of CrateDB HTTP bulk
responses including ``rowcount=`` items.

.. _Migrate from crate.client to sqlalchemy-cratedb: https://cratedb.com/docs/sqlalchemy-cratedb/migrate-from-crate-client.html
.. _sqlalchemy-cratedb: https://pypi.org/project/sqlalchemy-cratedb/
Expand Down
66 changes: 66 additions & 0 deletions src/crate/client/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import typing as t
from functools import cached_property


class BulkResultItem(t.TypedDict):
"""
Define the shape of a CrateDB bulk request response item.
"""

rowcount: int


class BulkResponse:
"""
Manage CrateDB bulk request responses.
Accepts a list of bulk arguments (parameter list) and a list of bulk response items.
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
"""

def __init__(
self,
records: t.Union[t.Iterable[t.Dict[str, t.Any]], None],
results: t.Union[t.Iterable[BulkResultItem], None]):
self.records = records
self.results = results

@cached_property
def failed_records(self) -> t.List[t.Dict[str, t.Any]]:
"""
Compute list of failed records.
CrateDB signals failed inserts using `rowcount=-2`.
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
"""
if self.records is None or self.results is None:
return []
errors: t.List[t.Dict[str, t.Any]] = []
for record, status in zip(self.records, self.results):
if status["rowcount"] == -2:
errors.append(record)
return errors

@cached_property
def record_count(self) -> int:
"""
Compute bulk size / length of parameter list.
"""
if not self.records:
return 0
return len(self.records)

@cached_property
def success_count(self) -> int:
"""
Compute number of succeeding records within a batch.
"""
return self.record_count - self.failed_count

@cached_property
def failed_count(self) -> int:
"""
Compute number of failed records within a batch.
"""
return len(self.failed_records)
69 changes: 69 additions & 0 deletions src/crate/client/test_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import sys
import unittest

from crate import client
from crate.client.exceptions import ProgrammingError
from crate.client.test_support import setUpCrateLayerBaseline, tearDownDropEntitiesBaseline
from crate.testing.settings import crate_host


class BulkOperationTest(unittest.TestCase):

def setUp(self):
setUpCrateLayerBaseline(self)

Check warning on line 13 in src/crate/client/test_result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_result.py#L13

Added line #L13 was not covered by tests

def tearDown(self):
tearDownDropEntitiesBaseline(self)

Check warning on line 16 in src/crate/client/test_result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_result.py#L16

Added line #L16 was not covered by tests

@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
def test_executemany_with_bulk_response_partial(self):

# Import at runtime is on purpose, to permit skipping the test case.
from crate.client.result import BulkResponse

Check warning on line 22 in src/crate/client/test_result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_result.py#L22

Added line #L22 was not covered by tests

connection = client.connect(crate_host)
cursor = connection.cursor()

Check warning on line 25 in src/crate/client/test_result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_result.py#L24-L25

Added lines #L24 - L25 were not covered by tests

# Run SQL DDL.
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")

Check warning on line 28 in src/crate/client/test_result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_result.py#L28

Added line #L28 was not covered by tests

# Run a batch insert that only partially succeeds.
invalid_records = [(1, "Hotzenplotz 1"), (1, "Hotzenplotz 2")]
result = cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", invalid_records)

Check warning on line 32 in src/crate/client/test_result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_result.py#L31-L32

Added lines #L31 - L32 were not covered by tests

# Verify CrateDB response.
self.assertEqual(result, [{"rowcount": 1}, {"rowcount": -2}])

Check warning on line 35 in src/crate/client/test_result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_result.py#L35

Added line #L35 was not covered by tests

# Verify decoded response.
bulk_response = BulkResponse(invalid_records, result)
self.assertEqual(bulk_response.failed_records, [(1, "Hotzenplotz 2")])
self.assertEqual(bulk_response.record_count, 2)
self.assertEqual(bulk_response.success_count, 1)
self.assertEqual(bulk_response.failed_count, 1)

Check warning on line 42 in src/crate/client/test_result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_result.py#L38-L42

Added lines #L38 - L42 were not covered by tests

cursor.execute("REFRESH TABLE foobar;")
cursor.execute("SELECT * FROM foobar;")
result = cursor.fetchall()
self.assertEqual(result, [[1, "Hotzenplotz 1"]])

Check warning on line 47 in src/crate/client/test_result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_result.py#L44-L47

Added lines #L44 - L47 were not covered by tests

cursor.close()
connection.close()

Check warning on line 50 in src/crate/client/test_result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_result.py#L49-L50

Added lines #L49 - L50 were not covered by tests

@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
def test_executemany_empty(self):

connection = client.connect(crate_host)
cursor = connection.cursor()

Check warning on line 56 in src/crate/client/test_result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_result.py#L55-L56

Added lines #L55 - L56 were not covered by tests

# Run SQL DDL.
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")

Check warning on line 59 in src/crate/client/test_result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_result.py#L59

Added line #L59 was not covered by tests

# Run a batch insert that is empty.
with self.assertRaises(ProgrammingError) as cm:
cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", [])
self.assertEqual(

Check warning on line 64 in src/crate/client/test_result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_result.py#L62-L64

Added lines #L62 - L64 were not covered by tests
cm.exception.message,
"ArrayIndexOutOfBoundsException[Index 0 out of bounds for length 0]")

cursor.close()
connection.close()

Check warning on line 69 in src/crate/client/test_result.py

View check run for this annotation

Codecov / codecov/patch

src/crate/client/test_result.py#L68-L69

Added lines #L68 - L69 were not covered by tests
Loading

0 comments on commit 50347d4

Please sign in to comment.