diff --git a/amqpstorm/channel.py b/amqpstorm/channel.py index 90688a3..d0c7362 100644 --- a/amqpstorm/channel.py +++ b/amqpstorm/channel.py @@ -297,7 +297,7 @@ def open(self): :return: """ - self._inbound = collections.deque() + self._inbound.clear() self._exceptions = [] self._confirming_deliveries = False self.set_state(self.OPENING) @@ -445,11 +445,14 @@ def _build_message(self, auto_decode, message_impl): """ if len(self._inbound) < 2: return None - headers = self._build_message_headers() - if not headers: + try: + headers = self._build_message_headers() + if not headers: + return None + basic_deliver, content_header = headers + body = self._build_message_body(content_header.body_size) + except IndexError: return None - basic_deliver, content_header = headers - body = self._build_message_body(content_header.body_size) message = message_impl(channel=self, body=body, @@ -464,6 +467,7 @@ def _build_message_headers(self): :rtype: tuple,None """ basic_deliver = self._inbound.popleft() + content_header = self._inbound.popleft() if not isinstance(basic_deliver, specification.Basic.Deliver): LOGGER.warning( 'Received an out-of-order frame: %s was ' @@ -471,8 +475,7 @@ def _build_message_headers(self): type(basic_deliver) ) return None - content_header = self._inbound.popleft() - if not isinstance(content_header, ContentHeader): + elif not isinstance(content_header, ContentHeader): LOGGER.warning( 'Received an out-of-order frame: %s was ' 'expecting a ContentHeader frame', @@ -514,7 +517,6 @@ def _close_channel(self, frame_in): self.remove_consumer_tag() if self._inbound: self._inbound.clear() - self._inbound = None self.exceptions.append(AMQPChannelError( 'Channel %d was closed by remote server: %s' % ( diff --git a/amqpstorm/tests/unit/channel/test_channel.py b/amqpstorm/tests/unit/channel/test_channel.py index b9f117f..a2d5d89 100644 --- a/amqpstorm/tests/unit/channel/test_channel.py +++ b/amqpstorm/tests/unit/channel/test_channel.py @@ -150,7 +150,7 @@ def test_channel_close_channel(self): # Close Channel. channel._close_channel(close_frame) - self.assertIsNone(channel._inbound) + self.assertFalse(channel._inbound) self.assertEqual(channel._consumer_tags, []) self.assertEqual(channel._state, channel.CLOSED) diff --git a/amqpstorm/tests/unit/channel/test_channel_exception.py b/amqpstorm/tests/unit/channel/test_channel_exception.py index afb70fc..81054d8 100644 --- a/amqpstorm/tests/unit/channel/test_channel_exception.py +++ b/amqpstorm/tests/unit/channel/test_channel_exception.py @@ -217,7 +217,7 @@ def test_channel_raise_with_close_reply_code_500(self): ) channel._close_channel(close_frame) - self.assertIsNone(channel._inbound) + self.assertFalse(channel._inbound) self.assertEqual(channel._consumer_tags, []) self.assertEqual(channel._state, channel.CLOSED)