From cb1a4e50f3aac6cce161853b781b9c1929fde456 Mon Sep 17 00:00:00 2001 From: Kirill Fedoseev Date: Fri, 27 Sep 2024 15:42:44 +0400 Subject: [PATCH] fix(user-ops-indexer): stop indexing from already closed ws connections --- .../src/indexer/base_indexer.rs | 20 ++++++++++++------- .../user-ops-indexer-server/src/indexer.rs | 7 ++++++- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/user-ops-indexer/user-ops-indexer-logic/src/indexer/base_indexer.rs b/user-ops-indexer/user-ops-indexer-logic/src/indexer/base_indexer.rs index 48392569d..c9f6c3cfc 100644 --- a/user-ops-indexer/user-ops-indexer-logic/src/indexer/base_indexer.rs +++ b/user-ops-indexer/user-ops-indexer-logic/src/indexer/base_indexer.rs @@ -12,12 +12,12 @@ use ethers::prelude::{ abi::{AbiEncode, Error}, parse_log, types::{Address, Bytes, Filter, Log, TransactionReceipt}, - EthEvent, Middleware, NodeClient, Provider, ProviderError, H256, + EthEvent, Middleware, NodeClient, Provider, ProviderError, WsClientError, H256, }; use futures::{ stream, stream::{repeat_with, BoxStream}, - Stream, StreamExt, + Stream, StreamExt, TryStreamExt, }; use sea_orm::DatabaseConnection; use std::{future, num::NonZeroUsize, sync::Arc, time, time::Duration}; @@ -216,9 +216,16 @@ impl Indexer { .filter_map(|tx_hash| async move { tx_hash }); stream_txs - .for_each_concurrent(Some(self.settings.concurrency as usize), |tx| async move { + .map(Ok) + .try_for_each_concurrent(Some(self.settings.concurrency as usize), |tx| async move { let mut backoff = vec![5, 20, 120].into_iter().map(Duration::from_secs); - while let Err(err) = &self.handle_tx(tx, variant).await { + while let Err(err) = self.handle_tx(tx, variant).await { + // terminate stream if WS connection is closed, indexer will be restarted + if self.client.as_ref().supports_subscriptions() && err.to_string() == WsClientError::UnexpectedClose.to_string() { + tracing::error!(error = ?err, tx_hash = ?tx, "tx handler failed, ws connection closed, exiting"); + return Err(err); + } + match backoff.next() { None => { tracing::error!(error = ?err, tx_hash = ?tx, "tx handler failed, skipping"); @@ -230,10 +237,9 @@ impl Indexer { } }; } + Ok(()) }) - .await; - - Ok(()) + .await } async fn fetch_jobs_for_block_range( diff --git a/user-ops-indexer/user-ops-indexer-server/src/indexer.rs b/user-ops-indexer/user-ops-indexer-server/src/indexer.rs index ee8c8a952..d25519dff 100644 --- a/user-ops-indexer/user-ops-indexer-server/src/indexer.rs +++ b/user-ops-indexer/user-ops-indexer-server/src/indexer.rs @@ -70,7 +70,12 @@ async fn start_indexer_with_retries { - tracing::error!(error = ?err, version = L::version(), ?delay, "indexer startup failed, retrying"); + tracing::error!( + error = ?err, + version = L::version(), + ?delay, + "indexer stream ended with error, retrying" + ); } Ok(_) => { if !settings.realtime.enabled {