diff --git a/Cargo.lock b/Cargo.lock index 6e7c1085828f1..170a36b5d8fc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7218,6 +7218,7 @@ dependencies = [ "rand 0.7.3", "sc-network", "sp-runtime", + "substrate-prometheus-endpoint", "substrate-test-runtime-client", "wasm-timer", ] diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index 77d2d15e5d020..66b7f004895fc 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -217,7 +217,8 @@ impl> NetworkBridge { let gossip_engine = Arc::new(Mutex::new(GossipEngine::new( service.clone(), GRANDPA_PROTOCOL_NAME, - validator.clone() + validator.clone(), + prometheus_registry, ))); { diff --git a/client/network-gossip/Cargo.toml b/client/network-gossip/Cargo.toml index 3a10c62ac6dd4..9ad591d0af697 100644 --- a/client/network-gossip/Cargo.toml +++ b/client/network-gossip/Cargo.toml @@ -20,6 +20,7 @@ futures-timer = "3.0.1" libp2p = { version = "0.33.0", default-features = false } log = "0.4.8" lru = "0.6.1" +prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.8.0", path = "../../utils/prometheus" } sc-network = { version = "0.8.0", path = "../network" } sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" } wasm-timer = "0.2" diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs index d444409d1cd3d..4e8ebfda20c25 100644 --- a/client/network-gossip/src/bridge.rs +++ b/client/network-gossip/src/bridge.rs @@ -25,6 +25,7 @@ use futures::prelude::*; use futures::channel::mpsc::{channel, Sender, Receiver}; use libp2p::PeerId; use log::trace; +use prometheus_endpoint::Registry; use sp_runtime::traits::Block as BlockT; use std::{ borrow::Cow, @@ -72,12 +73,13 @@ impl GossipEngine { network: N, protocol: impl Into>, validator: Arc>, + metrics_registry: Option<&Registry>, ) -> Self where B: 'static { let protocol = protocol.into(); let network_event_stream = network.event_stream(); GossipEngine { - state_machine: ConsensusGossip::new(validator, protocol.clone()), + state_machine: ConsensusGossip::new(validator, protocol.clone(), metrics_registry), network: Box::new(network), periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL), protocol, @@ -372,7 +374,8 @@ mod tests { let mut gossip_engine = GossipEngine::::new( network.clone(), "/my_protocol", - Arc::new(AllowAll{}), + Arc::new(AllowAll {}), + None, ); // Drop network event stream sender side. @@ -399,7 +402,8 @@ mod tests { let mut gossip_engine = GossipEngine::::new( network.clone(), protocol.clone(), - Arc::new(AllowAll{}), + Arc::new(AllowAll {}), + None, ); let mut event_sender = network.inner.lock() @@ -533,7 +537,8 @@ mod tests { let mut gossip_engine = GossipEngine::::new( network.clone(), protocol.clone(), - Arc::new(TestValidator{}), + Arc::new(TestValidator {}), + None, ); // Create channels. @@ -549,8 +554,10 @@ mod tests { // Insert sender sides into `gossip_engine`. for (topic, tx) in txs { match gossip_engine.message_sinks.get_mut(&topic) { - Some(entry) => entry.push(tx), - None => {gossip_engine.message_sinks.insert(topic, vec![tx]);}, + Some(entry) => entry.push(tx), + None => { + gossip_engine.message_sinks.insert(topic, vec![tx]); + } } } diff --git a/client/network-gossip/src/state_machine.rs b/client/network-gossip/src/state_machine.rs index 58a0f62cb1304..805f2e82ea25c 100644 --- a/client/network-gossip/src/state_machine.rs +++ b/client/network-gossip/src/state_machine.rs @@ -23,15 +23,24 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::iter; use std::time; -use log::{error, trace}; +use log::{debug, error, trace}; use lru::LruCache; use libp2p::PeerId; +use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; use sp_runtime::traits::{Block as BlockT, Hash, HashFor}; use sc_network::ObservedRole; use wasm_timer::Instant; // FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115 -const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096; +// NOTE: The current value is adjusted based on largest production network deployment (Kusama) and +// the current main gossip user (GRANDPA). Currently there are ~800 validators on Kusama, as such, +// each GRANDPA round should generate ~1600 messages, and we currently keep track of the last 2 +// completed rounds and the current live one. That makes it so that at any point we will be holding +// ~4800 live messages. +// +// Assuming that each known message is tracked with a 32 byte hash (common for `Block::Hash`), then +// this cache should take about 256 KB of memory. +const KNOWN_MESSAGES_CACHE_SIZE: usize = 8192; const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_secs(30); @@ -151,11 +160,25 @@ pub struct ConsensusGossip { protocol: Cow<'static, str>, validator: Arc>, next_broadcast: Instant, + metrics: Option, } impl ConsensusGossip { /// Create a new instance using the given validator. - pub fn new(validator: Arc>, protocol: Cow<'static, str>) -> Self { + pub fn new( + validator: Arc>, + protocol: Cow<'static, str>, + metrics_registry: Option<&Registry>, + ) -> Self { + let metrics = match metrics_registry.map(Metrics::register) { + Some(Ok(metrics)) => Some(metrics), + Some(Err(e)) => { + debug!(target: "gossip", "Failed to register metrics: {:?}", e); + None + } + None => None, + }; + ConsensusGossip { peers: HashMap::new(), messages: Default::default(), @@ -163,6 +186,7 @@ impl ConsensusGossip { protocol, validator, next_broadcast: Instant::now() + REBROADCAST_INTERVAL, + metrics, } } @@ -197,6 +221,10 @@ impl ConsensusGossip { message, sender, }); + + if let Some(ref metrics) = self.metrics { + metrics.registered_messages.inc(); + } } } @@ -264,10 +292,17 @@ impl ConsensusGossip { let before = self.messages.len(); let mut message_expired = self.validator.message_expired(); - self.messages.retain(|entry| !message_expired(entry.topic, &entry.message)); + self.messages + .retain(|entry| !message_expired(entry.topic, &entry.message)); + + let expired_messages = before - self.messages.len(); + + if let Some(ref metrics) = self.metrics { + metrics.expired_messages.inc_by(expired_messages as u64) + } trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)", - before - self.messages.len(), + expired_messages, self.messages.len(), known_messages.len(), ); @@ -429,6 +464,32 @@ impl ConsensusGossip { } } +struct Metrics { + registered_messages: Counter, + expired_messages: Counter, +} + +impl Metrics { + fn register(registry: &Registry) -> Result { + Ok(Self { + registered_messages: register( + Counter::new( + "network_gossip_registered_messages_total", + "Number of registered messages by the gossip service.", + )?, + registry, + )?, + expired_messages: register( + Counter::new( + "network_gossip_expired_messages_total", + "Number of expired messages by the gossip service.", + )?, + registry, + )?, + }) + } +} + #[cfg(test)] mod tests { use futures::prelude::*; @@ -538,7 +599,7 @@ mod tests { let prev_hash = H256::random(); let best_hash = H256::random(); - let mut consensus = ConsensusGossip::::new(Arc::new(AllowAll), "/foo".into()); + let mut consensus = ConsensusGossip::::new(Arc::new(AllowAll), "/foo".into(), None); let m1_hash = H256::random(); let m2_hash = H256::random(); let m1 = vec![1, 2, 3]; @@ -565,11 +626,11 @@ mod tests { #[test] fn message_stream_include_those_sent_before_asking() { - let mut consensus = ConsensusGossip::::new(Arc::new(AllowAll), "/foo".into()); + let mut consensus = ConsensusGossip::::new(Arc::new(AllowAll), "/foo".into(), None); // Register message. let message = vec![4, 5, 6]; - let topic = HashFor::::hash(&[1,2,3]); + let topic = HashFor::::hash(&[1, 2, 3]); consensus.register_message(topic, message.clone()); assert_eq!( @@ -580,7 +641,7 @@ mod tests { #[test] fn can_keep_multiple_messages_per_topic() { - let mut consensus = ConsensusGossip::::new(Arc::new(AllowAll), "/foo".into()); + let mut consensus = ConsensusGossip::::new(Arc::new(AllowAll), "/foo".into(), None); let topic = [1; 32].into(); let msg_a = vec![1, 2, 3]; @@ -594,7 +655,7 @@ mod tests { #[test] fn peer_is_removed_on_disconnect() { - let mut consensus = ConsensusGossip::::new(Arc::new(AllowAll), "/foo".into()); + let mut consensus = ConsensusGossip::::new(Arc::new(AllowAll), "/foo".into(), None); let mut network = NoOpNetwork::default(); @@ -608,14 +669,12 @@ mod tests { #[test] fn on_incoming_ignores_discarded_messages() { - let to_forward = ConsensusGossip::::new( - Arc::new(DiscardAll), - "/foo".into(), - ).on_incoming( - &mut NoOpNetwork::default(), - PeerId::random(), - vec![vec![1, 2, 3]], - ); + let to_forward = ConsensusGossip::::new(Arc::new(DiscardAll), "/foo".into(), None) + .on_incoming( + &mut NoOpNetwork::default(), + PeerId::random(), + vec![vec![1, 2, 3]], + ); assert!( to_forward.is_empty(), @@ -628,15 +687,13 @@ mod tests { let mut network = NoOpNetwork::default(); let remote = PeerId::random(); - let to_forward = ConsensusGossip::::new( - Arc::new(AllowAll), - "/foo".into(), - ).on_incoming( - &mut network, - // Unregistered peer. - remote.clone(), - vec![vec![1, 2, 3]], - ); + let to_forward = ConsensusGossip::::new(Arc::new(AllowAll), "/foo".into(), None) + .on_incoming( + &mut network, + // Unregistered peer. + remote.clone(), + vec![vec![1, 2, 3]], + ); assert!( to_forward.is_empty(),