-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AlchemiscalClient async+bulk for results, other methods; add request, response compression for large objects #150
Conversation
Need to implement the `set_tasks_status` async/await version next
Using/abusing an async lock for this.
We now hit the bulk API endpoint with async using batches. Get 45s locally for 10,000 tasks.
We're seeing what looks like weird performance issues using `httpx` for synchronous requests vs. `requests`. Sticking with `requests` for synchronous, `httpx` for async for now.
For consistency.
Codecov ReportPatch coverage:
Additional details and impacted files@@ Coverage Diff @@
## main #150 +/- ##
==========================================
- Coverage 83.67% 82.30% -1.38%
==========================================
Files 21 21
Lines 2426 2656 +230
==========================================
+ Hits 2030 2186 +156
- Misses 396 470 +74
☔ View full report in Codecov by Sentry. |
…alls Now by default compress retrievals of AlchemicalNetwork, Transformation, and ChemicalSystem. Also compress retrieval of ProtocolDAGResults.
@hmacdope almost done with this one! Could I get a review from you when you get the chance? |
Also, added `rich`-based progress bar to result retrieval.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work! Few queries, see comments. :)
token: TokenData = Depends(get_token_data_depends), | ||
) -> List[Union[str, None]]: | ||
status = TaskStatusEnum(status) | ||
if status not in ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be if status in ...
? I could be missing something but I thought we didn't want to mutate state of waiting, invalid or deleted
tasks, same with the HTTPException
below, seems to suggest that status
can be changed from terminal state invalid, deleted
etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here status
isn't the current status of the Task
s we want to set; it's the desired status
. waiting
, invalid
, and deleted
are all set-able by the user, at least under most conditions (e.g. going from 'complete' to 'waiting' isn't allowed by the underlying Neo4jStore
method).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I understand, sorry about that.
alchemiscale/interface/api.py
Outdated
except HTTPException: | ||
tasks_updated.append(None) | ||
else: | ||
tasks_updated.extend(n4js.set_task_status([task_sk], status)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still uses one at a time set unlike /bulk/tasks/status/get
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct; we can optimize this further, but I think I'm running out of time on this one. Since we have more complex queries to deal with for status setting, I'd like to make that a future PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perfect! raise an issue, and happy to move on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may have managed to get this one in. 😁
alchemiscale/interface/client.py
Outdated
|
||
def get_tasks_status( | ||
self, tasks: Union[ScopedKey, List[ScopedKey]] | ||
self, tasks: List[ScopedKey], batch_size=1000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Love the batching ❤️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🙏
@@ -201,13 +329,18 @@ def _query_resource(self, resource, params=None): | |||
|
|||
@_retry | |||
@_use_token | |||
def _get_resource(self, resource, params=None): | |||
def _get_resource(self, resource, params=None, compress=False): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did we want to compress by default here and below? Up to you. It seemed set to default to True
on a lot of the interface/client.py
methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thinking here is to keep compression on these private methods opt-in. Not all post
and get
calls will benefit from compression, especially for tiny requests/responses, so making it something we enable as the default on specific user-facing methods made the most sense to me.
|
||
return resp.json() | ||
|
||
@staticmethod |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very cool.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haha thank the standard lib: https://docs.python.org/3/library/itertools.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think Python 3.12 will have a itertools.batched
we can just switch to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work! Few queries, see comments. :)
Also made set_tasks_status work as async/batch, same as get_tasks_status
Also, add scope-based ordering to query outputs.
We use the same patterns we applied for `get_tasks_status`.
This PR adds async/await methods to the
AlchemiscaleBaseClient
, as well as usage of these methods to theAlchemiscaleClient
for use by users.It also establishes the pattern for
/bulk
endpoints on API services, which are not strictly RESTful but do allow for much greater performance when requesting operations on manyScopedKey
s in a single call.This PR adds performance improvements using the above to:
AlchemiscaleClient.get_tasks_status
AlchemiscaleClient.set_tasks_status
AlchemiscaleClient.get_transformation_results
AlchemiscaleClient.get_transformation_failures
AlchemiscaleClient.get_task_results
AlchemiscaleClient.get_task_failures
This PR also adds use of
gzip
compression for large requests and responses between theAlchemiscaleBaseClient
and the API services. For theAlchemiscaleClient
, this optimization is by default applied to:AlchemiscaleClient.create_network
AlchemiscaleClient.get_network
AlchemiscaleClient.get_transformation
AlchemiscaleClient.get_chemicalsystem
AlchemiscaleClient.get_transformation_results
AlchemiscaleClient.get_transformation_failures
AlchemiscaleClient.get_task_results
AlchemiscaleClient.get_task_failures