Skip to content

Commit

Permalink
Reworked inbound queue
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Sep 28, 2024
1 parent 3846499 commit 58411b9
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 10 deletions.
18 changes: 10 additions & 8 deletions amqpstorm/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def open(self):
:return:
"""
self._inbound = collections.deque()
self._inbound.clear()
self._exceptions = []
self._confirming_deliveries = False
self.set_state(self.OPENING)
Expand Down Expand Up @@ -445,11 +445,14 @@ def _build_message(self, auto_decode, message_impl):
"""
if len(self._inbound) < 2:
return None
headers = self._build_message_headers()
if not headers:
try:
headers = self._build_message_headers()
if not headers:
return None
basic_deliver, content_header = headers
body = self._build_message_body(content_header.body_size)
except IndexError:
return None
basic_deliver, content_header = headers
body = self._build_message_body(content_header.body_size)

message = message_impl(channel=self,
body=body,
Expand All @@ -464,15 +467,15 @@ def _build_message_headers(self):
:rtype: tuple,None
"""
basic_deliver = self._inbound.popleft()
content_header = self._inbound.popleft()
if not isinstance(basic_deliver, specification.Basic.Deliver):
LOGGER.warning(
'Received an out-of-order frame: %s was '
'expecting a Basic.Deliver frame',
type(basic_deliver)
)
return None
content_header = self._inbound.popleft()
if not isinstance(content_header, ContentHeader):
elif not isinstance(content_header, ContentHeader):
LOGGER.warning(
'Received an out-of-order frame: %s was '
'expecting a ContentHeader frame',
Expand Down Expand Up @@ -514,7 +517,6 @@ def _close_channel(self, frame_in):
self.remove_consumer_tag()
if self._inbound:
self._inbound.clear()
self._inbound = None
self.exceptions.append(AMQPChannelError(
'Channel %d was closed by remote server: %s' %
(
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/tests/unit/channel/test_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def test_channel_close_channel(self):
# Close Channel.
channel._close_channel(close_frame)

self.assertIsNone(channel._inbound)
self.assertFalse(channel._inbound)
self.assertEqual(channel._consumer_tags, [])
self.assertEqual(channel._state, channel.CLOSED)

Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/tests/unit/channel/test_channel_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def test_channel_raise_with_close_reply_code_500(self):
)
channel._close_channel(close_frame)

self.assertIsNone(channel._inbound)
self.assertFalse(channel._inbound)
self.assertEqual(channel._consumer_tags, [])
self.assertEqual(channel._state, channel.CLOSED)

Expand Down

0 comments on commit 58411b9

Please sign in to comment.