From 0d167554a95467af07cb3162df9a25745dbe8589 Mon Sep 17 00:00:00 2001
From: Harold Newman
Date: Thu, 12 Sep 2024 02:00:38 +1000
Subject: [PATCH] tmp turbine

---
Review notes (text between "---" and the diffstat is not part of the commit):
the blob ids on the "index" lines were not regenerated after the review edits.

 src/turbine/lib.zig          |   1 +
 src/turbine/retransmit.zig   | 228 +++++++++++++++++++++++++++++++++++
 src/turbine/turbine_tree.zig |  58 ++++++++-
 3 files changed, 285 insertions(+), 2 deletions(-)
 create mode 100644 src/turbine/retransmit.zig

diff --git a/src/turbine/lib.zig b/src/turbine/lib.zig
index 805281c23..d7157d437 100644
--- a/src/turbine/lib.zig
+++ b/src/turbine/lib.zig
@@ -1,3 +1,4 @@
 pub const turbine_tree = @import("turbine_tree.zig");
 
 pub const TurbineTree = turbine_tree.TurbineTree;
+pub const TurbineTreeCache = turbine_tree.TurbineTreeCache;
diff --git a/src/turbine/retransmit.zig b/src/turbine/retransmit.zig
new file mode 100644
index 000000000..5b2d48e11
--- /dev/null
+++ b/src/turbine/retransmit.zig
@@ -0,0 +1,228 @@
+const std = @import("std");
+const net = @import("zig-network");
+const sig = @import("../sig.zig");
+
+const UdpSocket = net.Socket;
+const AtomicBool = std.atomic.Value(bool);
+const AtomicU64 = std.atomic.Value(u64);
+const KeyPair = std.crypto.sign.Ed25519.KeyPair;
+const Instant = std.time.Instant;
+
+const bincode = sig.bincode;
+
+const Pubkey = sig.core.Pubkey;
+const Hash = sig.core.Hash;
+const Slot = sig.core.Slot;
+const Epoch = sig.core.Epoch;
+const Duration = sig.time.Duration;
+const TurbineTree = sig.turbine.TurbineTree;
+const TurbineTreeCache = sig.turbine.TurbineTreeCache;
+const Channel = sig.sync.Channel;
+const ShredId = sig.ledger.shred.ShredId;
+const BlockstoreReader = sig.ledger.BlockstoreReader;
+const BlockstoreWriter = sig.ledger.BlockstoreWriter;
+const ShredInserter = sig.ledger.insert_shred.ShredInserter;
+const Shred = sig.ledger.shred.Shred;
+const LeaderScheduleCache = sig.core.leader_schedule.LeaderScheduleCache;
+const BankFields = sig.accounts_db.snapshots.BankFields;
+
+// MISSING DATA STRUCTURES
+// const Bank = struct {};
+// const UdpSocket = struct {};
+// const Blockstore = struct {};
+// const BankForks = struct {};
+// const WorkingBankEntry = struct {
+//     bank: Bank,
+//     entry: PohEntry,
+//     last_tick_height: u64,
+// };
+
+/// Entry point of the retransmit service. Not implemented yet: the body only
+/// lists the planned steps and the parameter list is still commented out.
+pub fn runRetransmitService(
+    // sockets: Arc<Vec<UdpSocket>>, // Sockets to read from
+    // quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
+    // bank_forks: Arc<RwLock<BankForks>>,
+    // leader_schedule_cache: Arc<LeaderScheduleCache>,
+    // cluster_info: Arc<ClusterInfo>,
+    // shreds_receiver: Receiver<Vec<Vec<u8>>>,
+    // max_slots: Arc<MaxSlots>,
+    // rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
+) !void {
+    // Init cluster node cache
+    // Init rng
+    // Init shred deduper
+    // Init thread pool with max threads equal to the number of sockets
+    // Loop
+    // call retransmit
+}
+
+const MAX_DUPLICATE_COUNT: usize = 2;
+const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
+const DEDUPER_RESET_CYCLE: Duration = Duration.fromSecs(5 * 60);
+
+/// A shred id paired with the serialized shred bytes it was read from.
+const IdentifiedShred = struct { ShredId, []const u8 };
+
+/// Drains the pending raw shreds, skips the ones the deduper rejects, groups
+/// the rest by slot and sends each shred to the addresses its slot's turbine
+/// tree returns.
+///
+/// Returns `error.NoSockets` when `sockets` is empty and `error.NoShreds` when
+/// the channel yields nothing to drain.
+fn retransmit(
+    allocator: std.mem.Allocator,
+    rand: std.rand.Random,
+    bank_fields: BankFields,
+    leader_schedule_cache: *LeaderScheduleCache,
+    shreds_receiver: *Channel(std.ArrayList(std.ArrayList(u8))),
+    sockets: []const UdpSocket,
+    turbine_tree_cache: *TurbineTreeCache,
+    shred_deduper: *ShredDeduper(2),
+    // max_slots: &MaxSlots, // When starting validator shared in json rpc service, completed data sets service and tvu retransmit stage
+) !void {
+    // Sockets are picked with `i % sockets.len` below, which needs at least one socket.
+    if (sockets.len == 0) return error.NoSockets;
+
+    // Drain shred receiver into raw shreds
+    const raw_shreds = try shreds_receiver.try_drain() orelse return error.NoShreds; // Add timeout?
+
+    shred_deduper.maybeReset(
+        rand,
+        DEDUPER_FALSE_POSITIVE_RATE,
+        DEDUPER_RESET_CYCLE,
+    );
+
+    // Group shreds by slot
+    var slot_shreds = std.AutoArrayHashMap(Slot, std.ArrayList(IdentifiedShred)).init(allocator);
+    defer {
+        for (slot_shreds.values()) |shreds| shreds.deinit();
+        slot_shreds.deinit();
+    }
+    // NOTE(review): assumes every drained element holds one shred's bytes in `.items`; confirm against Channel.try_drain.
+    for (raw_shreds) |raw_shred| {
+        const shred_id = (try bincode.readFromSlice(allocator, Shred, raw_shred.items, .{})).id(); // Agave just reads shred id using byte offsets into struct
+        if (shred_deduper.dedup(shred_id, raw_shred.items, MAX_DUPLICATE_COUNT)) continue;
+        const entry = try slot_shreds.getOrPut(shred_id.slot);
+        if (!entry.found_existing) entry.value_ptr.* = std.ArrayList(IdentifiedShred).init(allocator);
+        try entry.value_ptr.append(.{ shred_id, raw_shred.items });
+    }
+
+    // Retransmit shreds
+    for (slot_shreds.keys(), slot_shreds.values()) |slot, shreds| {
+        // max_slots.retransmit.fetch_max(slot, Ordering::Relaxed);
+        const slot_leader = leader_schedule_cache.getSlotLeader(slot, &bank_fields); // Need bank here, if leader schedule cache does not have leader schedule for slot, we need to compute the leader schedule by getting the staked nodes from the bank for the epoch which contains the provided slot
+        const turbine_tree = try turbine_tree_cache.getTurbineTree(&bank_fields); // Need bank here, if turbine tree cache does not have ...
+
+        // PERF: Move outside for loop and parallelize
+        for (shreds.items, 0..) |shred, i| {
+            const shred_id, const shred_bytes = shred;
+            const socket = sockets[i % sockets.len];
+
+            // getRetransmitAddresses returns a tuple; element 1 is the address slice.
+            const retransmit_info = try turbine_tree.getRetransmitAddresses(
+                allocator,
+                slot_leader,
+                shred_id,
+                TurbineTree.getDataPlaneFanout(),
+            );
+            const addresses = retransmit_info[1];
+            defer allocator.free(addresses);
+
+            for (addresses) |address| {
+                _ = try socket.sendTo(address.toEndpoint(), shred_bytes);
+            }
+        }
+    }
+}
+
+/// Pairs a deduper over raw shred bytes with one over (shred id, count) tuples.
+/// Only the storage is wired up so far; `maybeReset` and `dedup` are placeholders.
+pub fn ShredDeduper(comptime K: usize) type {
+    return struct {
+        deduper: BytesDeduper,
+        shred_id_filter: ShredIdDeduper,
+
+        const Self = @This();
+        const BytesDeduper = Deduper(K, []const u8);
+        // Declared once so the field and its initializer name the same type.
+        const ShredIdDeduper = Deduper(K, struct { ShredId, usize });
+
+        /// Creates both underlying dedupers with `allocator`.
+        pub fn init(allocator: std.mem.Allocator) !Self {
+            var deduper = try BytesDeduper.init(allocator);
+            errdefer deduper.deinit();
+            return .{
+                .deduper = deduper,
+                .shred_id_filter = try ShredIdDeduper.init(allocator),
+            };
+        }
+
+        /// Releases the memory held by both underlying dedupers.
+        pub fn deinit(self: *Self) void {
+            self.deduper.deinit();
+            self.shred_id_filter.deinit();
+        }
+
+        /// Placeholder: ignores every argument and does nothing yet.
+        pub fn maybeReset(self: *Self, rand: std.rand.Random, false_positive_rate: f64, reset_cycle: Duration) void {
+            // TODO:
+            _ = self;
+            _ = rand;
+            _ = false_positive_rate;
+            _ = reset_cycle;
+        }
+
+        /// Placeholder: always returns false, so no shred is reported as a duplicate yet.
+        pub fn dedup(self: *Self, shred_id: ShredId, shred_bytes: []const u8, max_duplicate_count: usize) bool {
+            // TODO:
+            _ = self;
+            _ = shred_id;
+            _ = shred_bytes;
+            _ = max_duplicate_count;
+            return false;
+        }
+    };
+}
+
+/// Storage for a duplicate filter over values of type `T` with `K` random states.
+/// `dedup` is a placeholder for now.
+pub fn Deduper(comptime K: usize, comptime T: type) type {
+    return struct {
+        num_bits: u64,
+        bits: std.ArrayList(AtomicU64),
+        state: [K]RandomState,
+        clock: Instant,
+        popcount: AtomicU64,
+
+        const Self = @This();
+
+        /// Creates an empty deduper. Fails when `std.time.Instant.now()` fails.
+        pub fn init(allocator: std.mem.Allocator) !Self {
+            // TODO
+            return .{
+                .num_bits = 0,
+                .bits = std.ArrayList(AtomicU64).init(allocator),
+                .state = [_]RandomState{.{}} ** K,
+                .clock = try Instant.now(),
+                .popcount = AtomicU64.init(0),
+            };
+        }
+
+        /// Frees the `bits` storage.
+        pub fn deinit(self: *Self) void {
+            self.bits.deinit();
+        }
+
+        /// Placeholder: always returns false.
+        pub fn dedup(self: *Self, data: *const T) bool {
+            // TODO
+            _ = self;
+            _ = data;
+            return false;
+        }
+    };
+}
+
+/// Placeholder type for the entries of `Deduper.state`.
+pub const RandomState = struct {};
diff --git a/src/turbine/turbine_tree.zig b/src/turbine/turbine_tree.zig
index 5d12497da..05354faef 100644
--- a/src/turbine/turbine_tree.zig
+++ b/src/turbine/turbine_tree.zig
@@ -6,13 +6,67 @@ const SocketAddr = sig.net.SocketAddr;
 const ShredId = sig.ledger.shred.ShredId;
 const RwMux = sig.sync.RwMux;
 const ThreadSafeContactInfo = sig.gossip.data.ThreadSafeContactInfo;
+const BankFields = sig.accounts_db.snapshots.BankFields;
 const Pubkey = sig.core.Pubkey;
 const Epoch = sig.core.Epoch;
+const Slot = sig.core.Slot;
 const Duration = sig.time.Duration;
 const Instant = sig.time.Instant;
 const WeightedShuffle = sig.rand.WeightedShuffle(u64);
 const ChaChaRng = sig.rand.ChaChaRng(20);
 
+/// Caches one `TurbineTree` per epoch. An entry is only served while it is
+/// younger than `ttl`.
+pub const TurbineTreeCache = struct {
+    allocator: std.mem.Allocator,
+    cache: std.AutoArrayHashMap(Epoch, CacheEntry),
+    ttl: Duration,
+
+    pub const CacheEntry = struct {
+        created: Instant,
+        turbine_tree: TurbineTree,
+
+        /// True while less than `ttl` has elapsed since `created`.
+        pub fn alive(self: *const CacheEntry, ttl: Duration) bool {
+            return self.created.elapsed().asNanos() < ttl.asNanos();
+        }
+    };
+
+    /// Creates an empty cache whose entries count as alive for `ttl`.
+    pub fn init(allocator: std.mem.Allocator, ttl: Duration) TurbineTreeCache {
+        return .{
+            .allocator = allocator,
+            .cache = std.AutoArrayHashMap(Epoch, CacheEntry).init(allocator),
+            .ttl = ttl,
+        };
+    }
+
+    /// Returns the cached tree for `bank_fields.epoch` while its entry is alive.
+    /// Building a tree on a miss is not implemented yet, so a miss ends with
+    /// `error.TurbineTreeNotCached` (or an error from `getStakedNodes`).
+    pub fn getTurbineTree(self: *TurbineTreeCache, bank_fields: *const BankFields) !*const TurbineTree {
+        // Look up without inserting: `getOrPut` would leave an undefined entry behind on a miss.
+        if (self.cache.getPtr(bank_fields.epoch)) |entry| {
+            if (entry.alive(self.ttl)) return &entry.turbine_tree;
+        }
+
+        const epoch_staked_nodes = try bank_fields.getStakedNodes();
+        // const turbine_tree = TurbineTree.initForRetransmit(
+        //     self.allocator,
+        //     my_contact_info,
+        //     tvu_peers,
+        //     epoch_staked_nodes,
+        // );
+        _ = epoch_staked_nodes;
+        return error.TurbineTreeNotCached;
+    }
+
+    /// Same check as `CacheEntry.alive`, using this cache's `ttl`.
+    pub fn cacheEntryAlive(self: TurbineTreeCache, cache_entry: *const CacheEntry) bool {
+        return cache_entry.alive(self.ttl);
+    }
+};
+
 /// Analogous to [ClusterNodes](https://github.com/anza-xyz/agave/blob/efd47046c1bb9bb027757ddabe408315bc7865cc/turbine/src/cluster_nodes.rs#L65)
 pub const TurbineTree = struct {
     allocator: std.mem.Allocator,
@@ -124,7 +178,7 @@ pub const TurbineTree = struct {
         fanout: usize,
-    ) struct {
+    ) !struct {
         usize,
-        []ThreadSafeContactInfo,
+        []SocketAddr,
     } {
         const root_distance, const children, const addresses = try self.getRetransmitChildren(slot_leader, shred, fanout);
         var peers = std.ArrayList(SocketAddr).init(allocator);