Skip to content

Commit

Permalink
add RequestId to NetworkMessage::SendResponse
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Sep 28, 2024
1 parent 1e02b00 commit 1705160
Show file tree
Hide file tree
Showing 5 changed files with 353 additions and 83 deletions.
8 changes: 7 additions & 1 deletion beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,13 @@ impl<E: EthSpec> Network<E> {
}

/// Send a successful response to a peer over RPC.
pub fn send_response(&mut self, peer_id: PeerId, id: PeerRequestId, response: Response<E>) {
pub fn send_response(
&mut self,
peer_id: PeerId,
id: PeerRequestId,
_request_id: rpc::RequestId,
response: Response<E>,
) {
self.eth2_rpc_mut()
.send_response(peer_id, id, response.into())
}
Expand Down
130 changes: 105 additions & 25 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ use beacon_processor::{
DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work,
WorkEvent as BeaconWorkEvent,
};
use lighthouse_network::discovery::ConnectionId;
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest,
};
use lighthouse_network::rpc::{RequestId, SubstreamId};
use lighthouse_network::{
rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage},
Client, MessageId, NetworkGlobals, PeerId, PeerRequestId,
Client, MessageId, NetworkGlobals, PeerId,
};
use slog::{debug, Logger};
use slot_clock::ManualSlotClock;
Expand Down Expand Up @@ -596,13 +598,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn send_blocks_by_range_request(
self: &Arc<Self>,
peer_id: PeerId,
request_id: PeerRequestId,
connection_id: ConnectionId,
substream_id: SubstreamId,
request_id: RequestId,
request: BlocksByRangeRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = async move {
processor
.handle_blocks_by_range_request(peer_id, request_id, request)
.handle_blocks_by_range_request(
peer_id,
connection_id,
substream_id,
request_id,
request,
)
.await;
};

Expand All @@ -616,13 +626,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn send_blocks_by_roots_request(
self: &Arc<Self>,
peer_id: PeerId,
request_id: PeerRequestId,
connection_id: ConnectionId,
substream_id: SubstreamId,
request_id: RequestId,
request: BlocksByRootRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = async move {
processor
.handle_blocks_by_root_request(peer_id, request_id, request)
.handle_blocks_by_root_request(
peer_id,
connection_id,
substream_id,
request_id,
request,
)
.await;
};

Expand All @@ -636,12 +654,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn send_blobs_by_range_request(
self: &Arc<Self>,
peer_id: PeerId,
request_id: PeerRequestId,
connection_id: ConnectionId,
substream_id: SubstreamId,
request_id: RequestId,
request: BlobsByRangeRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn =
move || processor.handle_blobs_by_range_request(peer_id, request_id, request);
let process_fn = move || {
processor.handle_blobs_by_range_request(
peer_id,
connection_id,
substream_id,
request_id,
request,
)
};

self.try_send(BeaconWorkEvent {
drop_during_sync: false,
Expand All @@ -653,12 +680,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn send_blobs_by_roots_request(
self: &Arc<Self>,
peer_id: PeerId,
request_id: PeerRequestId,
connection_id: ConnectionId,
substream_id: SubstreamId,
request_id: RequestId,
request: BlobsByRootRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn =
move || processor.handle_blobs_by_root_request(peer_id, request_id, request);
let process_fn = move || {
processor.handle_blobs_by_root_request(
peer_id,
connection_id,
substream_id,
request_id,
request,
)
};

self.try_send(BeaconWorkEvent {
drop_during_sync: false,
Expand All @@ -670,12 +706,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn send_data_columns_by_roots_request(
self: &Arc<Self>,
peer_id: PeerId,
request_id: PeerRequestId,
connection_id: ConnectionId,
substream_id: SubstreamId,
request_id: RequestId,
request: DataColumnsByRootRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn =
move || processor.handle_data_columns_by_root_request(peer_id, request_id, request);
let process_fn = move || {
processor.handle_data_columns_by_root_request(
peer_id,
connection_id,
substream_id,
request_id,
request,
)
};

self.try_send(BeaconWorkEvent {
drop_during_sync: false,
Expand All @@ -687,12 +732,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn send_data_columns_by_range_request(
self: &Arc<Self>,
peer_id: PeerId,
request_id: PeerRequestId,
connection_id: ConnectionId,
substream_id: SubstreamId,
request_id: RequestId,
request: DataColumnsByRangeRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn =
move || processor.handle_data_columns_by_range_request(peer_id, request_id, request);
let process_fn = move || {
processor.handle_data_columns_by_range_request(
peer_id,
connection_id,
substream_id,
request_id,
request,
)
};

self.try_send(BeaconWorkEvent {
drop_during_sync: false,
Expand All @@ -704,12 +758,21 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn send_light_client_bootstrap_request(
self: &Arc<Self>,
peer_id: PeerId,
request_id: PeerRequestId,
connection_id: ConnectionId,
substream_id: SubstreamId,
request_id: RequestId,
request: LightClientBootstrapRequest,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn =
move || processor.handle_light_client_bootstrap(peer_id, request_id, request);
let process_fn = move || {
processor.handle_light_client_bootstrap(
peer_id,
connection_id,
substream_id,
request_id,
request,
)
};

self.try_send(BeaconWorkEvent {
drop_during_sync: true,
Expand All @@ -721,11 +784,19 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn send_light_client_optimistic_update_request(
self: &Arc<Self>,
peer_id: PeerId,
request_id: PeerRequestId,
connection_id: ConnectionId,
substream_id: SubstreamId,
request_id: RequestId,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn =
move || processor.handle_light_client_optimistic_update(peer_id, request_id);
let process_fn = move || {
processor.handle_light_client_optimistic_update(
peer_id,
connection_id,
substream_id,
request_id,
)
};

self.try_send(BeaconWorkEvent {
drop_during_sync: true,
Expand All @@ -737,10 +808,19 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn send_light_client_finality_update_request(
self: &Arc<Self>,
peer_id: PeerId,
request_id: PeerRequestId,
connection_id: ConnectionId,
substream_id: SubstreamId,
request_id: RequestId,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move || processor.handle_light_client_finality_update(peer_id, request_id);
let process_fn = move || {
processor.handle_light_client_finality_update(
peer_id,
connection_id,
substream_id,
request_id,
)
};

self.try_send(BeaconWorkEvent {
drop_during_sync: true,
Expand Down
Loading

0 comments on commit 1705160

Please sign in to comment.