From d938e7ec99e3871036d4819a120619790b6f53d6 Mon Sep 17 00:00:00 2001 From: Giuseppe Tribulato Date: Mon, 21 Mar 2022 10:12:12 +0100 Subject: [PATCH] Fix for changes in httpx 0.20.0: Call of `await client.wait()` may have caused an error (1 HTTP requests/responses were still in-flight) if called shortly before `client.close()` See discussion https://github.com/encode/httpx/discussions/2118 --- lightkube/core/client.py | 36 +++++++++++++++++++------------- lightkube/core/generic_client.py | 2 ++ 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/lightkube/core/client.py b/lightkube/core/client.py index a26b7dd..944e9fe 100644 --- a/lightkube/core/client.py +++ b/lightkube/core/client.py @@ -635,26 +635,32 @@ async def wait( for_conditions = list(for_conditions) raise_for_conditions = list(raise_for_conditions) - async for op, obj in self.watch(res, namespace=namespace, fields={'metadata.name': name}): - if obj.status is None: - continue + watch = self.watch(res, namespace=namespace, fields={'metadata.name': name}) + try: + async for op, obj in watch: - if op == "DELETED": - raise ObjectDeleted(full_name) + if obj.status is None: + continue - try: - status = obj.status.to_dict() - except AttributeError: - status = obj.status + if op == "DELETED": + raise ObjectDeleted(full_name) - conditions = [c for c in status.get('conditions', []) if c['status'] == 'True'] - if any(c['type'] in for_conditions for c in conditions): - return obj + try: + status = obj.status.to_dict() + except AttributeError: + status = obj.status - failures = [c for c in conditions if c['type'] in raise_for_conditions] + conditions = [c for c in status.get('conditions', []) if c['status'] == 'True'] + if any(c['type'] in for_conditions for c in conditions): + return obj - if failures: - raise ConditionError(full_name, [f.get('message', f['type']) for f in failures]) + failures = [c for c in conditions if c['type'] in raise_for_conditions] + + if failures: + raise ConditionError(full_name, [f.get('message', f['type']) for f in failures]) + finally: + # we ensure the async generator is closed before returning + await watch.aclose() @overload async def patch(self, res: Type[GlobalSubResource], name: str, diff --git a/lightkube/core/generic_client.py b/lightkube/core/generic_client.py index 8664db9..6d1c8f1 100644 --- a/lightkube/core/generic_client.py +++ b/lightkube/core/generic_client.py @@ -280,6 +280,8 @@ async def watch(self, br: BasicRequest, on_error: OnErrorHandler = on_error_rais if handle_error.sleep > 0: await asyncio.sleep(handle_error.sleep) continue + finally: + await resp.aclose() async def request(self, method, res: Type[r.Resource] = None, obj=None, name=None, namespace=None, watch: bool = False, headers: dict = None, params: dict = None) -> Any: