diff --git a/lib/chainsync/chainsync/analysis/data_to_analysis.py b/lib/chainsync/chainsync/analysis/data_to_analysis.py index 1f427a11c..9dc97ac22 100644 --- a/lib/chainsync/chainsync/analysis/data_to_analysis.py +++ b/lib/chainsync/chainsync/analysis/data_to_analysis.py @@ -28,6 +28,8 @@ pd.set_option("display.max_columns", None) +MAX_BATCH_SIZE = 10000 + def _df_to_db(insert_df: pd.DataFrame, schema_obj: Type[Base], session: Session): """Helper function to add a dataframe to a database""" @@ -43,6 +45,7 @@ def _df_to_db(insert_df: pd.DataFrame, schema_obj: Type[Base], session: Session) method="multi", index=False, dtype=dtype, # type: ignore + chunksize=MAX_BATCH_SIZE, ) # commit the transaction try: @@ -89,28 +92,29 @@ def calc_current_wallet(wallet_deltas_df: pd.DataFrame, latest_wallet: pd.DataFr # Drop unnecessary columns to match schema wallet_deltas_df = wallet_deltas_df.drop(["id", "transactionHash", "delta"], axis=1) + # There's a chance multiple wallet deltas can happen from the same address at the same block + # Hence, we do a groupby here + wallet_deltas_df = wallet_deltas_df.groupby(["walletAddress", "tokenType", "blockNumber"]).agg( + { + "baseTokenType": "first", + "maturityTime": "first", + "value": "sum", + } + ) + # If there was a initial wallet, add deltas to initial wallet to calculate current positions if len(latest_wallet) > 0: - # There's a chance multiple wallet deltas can happen from the same address at the same block - # Hence, we do a groupby here - wallet_deltas_df = wallet_deltas_df.groupby(["walletAddress", "tokenType", "blockNumber"]).agg( - { - "baseTokenType": "first", - "maturityTime": "first", - "value": "sum", - } - ) latest_wallet = latest_wallet.set_index(["walletAddress", "tokenType"]) # Add the latest wallet to each wallet delta position to calculate most current positions # We broadcast latest wallet across all blockNumbers. If a position does not exist in latest_wallet, # it will treat it as 0 (based on fill_value) wallet_deltas_df["value"] = wallet_deltas_df["value"].add(latest_wallet["value"], fill_value=0) - wallet_deltas_df = wallet_deltas_df.reset_index() - # In the case where latest_wallet has positions not in wallet_deltas, we can ignore them # since if they're not in wallet_deltas, there's no change in positions + wallet_deltas_df = wallet_deltas_df.reset_index() + # Need to keep zero positions in the db since a delta could have made the current wallet 0 # We can filter zero positions after the query of current positions return wallet_deltas_df @@ -136,6 +140,7 @@ def data_to_analysis( hyperdrive_contract: Contract, ) -> None: """Function to query postgres data tables and insert to analysis tables""" + # Get data pool_info = get_pool_info(db_session, start_block, end_block, coerce_float=False)