Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Analysis hotfix #1043

Merged
merged 2 commits into from
Oct 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions lib/chainsync/chainsync/analysis/data_to_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

# Do not limit the number of columns pandas shows when displaying a dataframe
pd.set_option("display.max_columns", None)

# Batch size passed as `chunksize` by _df_to_db when adding a dataframe to the database
MAX_BATCH_SIZE = 10000


def _df_to_db(insert_df: pd.DataFrame, schema_obj: Type[Base], session: Session):
"""Helper function to add a dataframe to a database"""
Expand All @@ -43,6 +45,7 @@ def _df_to_db(insert_df: pd.DataFrame, schema_obj: Type[Base], session: Session)
method="multi",
index=False,
dtype=dtype, # type: ignore
chunksize=MAX_BATCH_SIZE,
)
# commit the transaction
try:
Expand Down Expand Up @@ -89,28 +92,29 @@ def calc_current_wallet(wallet_deltas_df: pd.DataFrame, latest_wallet: pd.DataFr
# Drop unnecessary columns to match schema
wallet_deltas_df = wallet_deltas_df.drop(["id", "transactionHash", "delta"], axis=1)

# There's a chance multiple wallet deltas can happen from the same address at the same block
# Hence, we do a groupby here
wallet_deltas_df = wallet_deltas_df.groupby(["walletAddress", "tokenType", "blockNumber"]).agg(
{
"baseTokenType": "first",
"maturityTime": "first",
"value": "sum",
}
)

# If there was an initial wallet, add deltas to initial wallet to calculate current positions
if len(latest_wallet) > 0:
# There's a chance multiple wallet deltas can happen from the same address at the same block
# Hence, we do a groupby here
wallet_deltas_df = wallet_deltas_df.groupby(["walletAddress", "tokenType", "blockNumber"]).agg(
{
"baseTokenType": "first",
"maturityTime": "first",
"value": "sum",
}
)
latest_wallet = latest_wallet.set_index(["walletAddress", "tokenType"])

# Add the latest wallet to each wallet delta position to calculate most current positions
# We broadcast latest wallet across all blockNumbers. If a position does not exist in latest_wallet,
# it will treat it as 0 (based on fill_value)
wallet_deltas_df["value"] = wallet_deltas_df["value"].add(latest_wallet["value"], fill_value=0)
wallet_deltas_df = wallet_deltas_df.reset_index()

# In the case where latest_wallet has positions not in wallet_deltas, we can ignore them
# since if they're not in wallet_deltas, there's no change in positions

wallet_deltas_df = wallet_deltas_df.reset_index()

# Need to keep zero positions in the db since a delta could have made the current wallet 0
# We can filter zero positions after the query of current positions
return wallet_deltas_df
Expand All @@ -136,6 +140,7 @@ def data_to_analysis(
hyperdrive_contract: Contract,
) -> None:
"""Function to query postgres data tables and insert to analysis tables"""

# Get data
pool_info = get_pool_info(db_session, start_block, end_block, coerce_float=False)

Expand Down
Loading