Skip to content

Commit

Permalink
fix(user-ops-indexer): stop indexing from already closed ws connections
Browse files Browse the repository at this point in the history
  • Loading branch information
k1rill-fedoseev committed Sep 27, 2024
1 parent 4cff309 commit cb1a4e5
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ use ethers::prelude::{
abi::{AbiEncode, Error},
parse_log,
types::{Address, Bytes, Filter, Log, TransactionReceipt},
EthEvent, Middleware, NodeClient, Provider, ProviderError, H256,
EthEvent, Middleware, NodeClient, Provider, ProviderError, WsClientError, H256,
};
use futures::{
stream,
stream::{repeat_with, BoxStream},
Stream, StreamExt,
Stream, StreamExt, TryStreamExt,
};
use sea_orm::DatabaseConnection;
use std::{future, num::NonZeroUsize, sync::Arc, time, time::Duration};
Expand Down Expand Up @@ -216,9 +216,16 @@ impl<L: IndexerLogic + Sync> Indexer<L> {
.filter_map(|tx_hash| async move { tx_hash });

stream_txs
.for_each_concurrent(Some(self.settings.concurrency as usize), |tx| async move {
.map(Ok)
.try_for_each_concurrent(Some(self.settings.concurrency as usize), |tx| async move {
let mut backoff = vec![5, 20, 120].into_iter().map(Duration::from_secs);
while let Err(err) = &self.handle_tx(tx, variant).await {
while let Err(err) = self.handle_tx(tx, variant).await {
// terminate stream if WS connection is closed, indexer will be restarted
if self.client.as_ref().supports_subscriptions() && err.to_string() == WsClientError::UnexpectedClose.to_string() {
tracing::error!(error = ?err, tx_hash = ?tx, "tx handler failed, ws connection closed, exiting");
return Err(err);
}

match backoff.next() {
None => {
tracing::error!(error = ?err, tx_hash = ?tx, "tx handler failed, skipping");
Expand All @@ -230,10 +237,9 @@ impl<L: IndexerLogic + Sync> Indexer<L> {
}
};
}
Ok(())
})
.await;

Ok(())
.await
}

async fn fetch_jobs_for_block_range(
Expand Down
7 changes: 6 additions & 1 deletion user-ops-indexer/user-ops-indexer-server/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ async fn start_indexer_with_retries<L: IndexerLogic + Sync + Clone + Send + 'sta
loop {
match indexer.start().await {
Err(err) => {
tracing::error!(error = ?err, version = L::version(), ?delay, "indexer startup failed, retrying");
tracing::error!(
error = ?err,
version = L::version(),
?delay,
"indexer stream ended with error, retrying"
);
}
Ok(_) => {
if !settings.realtime.enabled {
Expand Down

0 comments on commit cb1a4e5

Please sign in to comment.