Skip to content

Commit

Permalink
Fix for changes in httpx 0.20.0: Call of await client.wait() may ha…
Browse files Browse the repository at this point in the history
…ve caused an error (1 HTTP requests/responses were still in-flight) if called shortly before `client.close()`

See discussion encode/httpx#2118
  • Loading branch information
gtsystem committed Mar 25, 2022
1 parent 6f4d398 commit d938e7e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 15 deletions.
36 changes: 21 additions & 15 deletions lightkube/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,26 +635,32 @@ async def wait(
for_conditions = list(for_conditions)
raise_for_conditions = list(raise_for_conditions)

async for op, obj in self.watch(res, namespace=namespace, fields={'metadata.name': name}):
if obj.status is None:
continue
watch = self.watch(res, namespace=namespace, fields={'metadata.name': name})
try:
async for op, obj in watch:

if op == "DELETED":
raise ObjectDeleted(full_name)
if obj.status is None:
continue

try:
status = obj.status.to_dict()
except AttributeError:
status = obj.status
if op == "DELETED":
raise ObjectDeleted(full_name)

conditions = [c for c in status.get('conditions', []) if c['status'] == 'True']
if any(c['type'] in for_conditions for c in conditions):
return obj
try:
status = obj.status.to_dict()
except AttributeError:
status = obj.status

failures = [c for c in conditions if c['type'] in raise_for_conditions]
conditions = [c for c in status.get('conditions', []) if c['status'] == 'True']
if any(c['type'] in for_conditions for c in conditions):
return obj

if failures:
raise ConditionError(full_name, [f.get('message', f['type']) for f in failures])
failures = [c for c in conditions if c['type'] in raise_for_conditions]

if failures:
raise ConditionError(full_name, [f.get('message', f['type']) for f in failures])
finally:
# we ensure the async generator is closed before returning
await watch.aclose()

@overload
async def patch(self, res: Type[GlobalSubResource], name: str,
Expand Down
2 changes: 2 additions & 0 deletions lightkube/core/generic_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ async def watch(self, br: BasicRequest, on_error: OnErrorHandler = on_error_rais
if handle_error.sleep > 0:
await asyncio.sleep(handle_error.sleep)
continue
finally:
await resp.aclose()

async def request(self, method, res: Type[r.Resource] = None, obj=None, name=None, namespace=None,
watch: bool = False, headers: dict = None, params: dict = None) -> Any:
Expand Down

0 comments on commit d938e7e

Please sign in to comment.