From 0ae3bca8456b56262a08925eed95935b151d8139 Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Tue, 14 Apr 2020 00:22:24 -0700 Subject: [PATCH 1/6] storage/raft: Split snapshot restore disk write into batches --- physical/raft/fsm.go | 72 ++++++++++++++++++++++++++++++-------------- 1 file changed, 49 insertions(+), 23 deletions(-) diff --git a/physical/raft/fsm.go b/physical/raft/fsm.go index 22f525624f15..356d4f682cbe 100644 --- a/physical/raft/fsm.go +++ b/physical/raft/fsm.go @@ -241,7 +241,7 @@ func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configurat // Delete deletes the given key from the bolt file. func (f *FSM) Delete(ctx context.Context, path string) error { - defer metrics.MeasureSince([]string{"raft", "delete"}, time.Now()) + defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete"}, time.Now()) f.l.RLock() defer f.l.RUnlock() @@ -253,7 +253,7 @@ func (f *FSM) Delete(ctx context.Context, path string) error { // Delete deletes the given key from the bolt file. func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error { - defer metrics.MeasureSince([]string{"raft", "delete_prefix"}, time.Now()) + defer metrics.MeasureSince([]string{"raft_storage", "fsm", "delete_prefix"}, time.Now()) f.l.RLock() defer f.l.RUnlock() @@ -277,7 +277,7 @@ func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error { // Get retrieves the value at the given path from the bolt file. func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) { - defer metrics.MeasureSince([]string{"raft", "get"}, time.Now()) + defer metrics.MeasureSince([]string{"raft_storage", "fsm", "get"}, time.Now()) f.l.RLock() defer f.l.RUnlock() @@ -311,7 +311,7 @@ func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) { // Put writes the given entry to the bolt file. func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error { - defer metrics.MeasureSince([]string{"raft", "put"}, time.Now()) + defer metrics.MeasureSince([]string{"raft_storage", "fsm", "put"}, time.Now()) f.l.RLock() defer f.l.RUnlock() @@ -324,7 +324,7 @@ func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error { // List retrieves the set of keys with the given prefix from the bolt file. func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) { - defer metrics.MeasureSince([]string{"raft", "list"}, time.Now()) + defer metrics.MeasureSince([]string{"raft_storage", "fsm", "list"}, time.Now()) f.l.RLock() defer f.l.RUnlock() @@ -531,6 +531,8 @@ type writeErrorCloser interface { // (size, checksum, etc) and a second for the sink of the data. We also use a // proto delimited writer so we can stream proto messages to the sink. func (f *FSM) writeTo(ctx context.Context, metaSink writeErrorCloser, sink writeErrorCloser) { + defer metrics.MeasureSince([]string{"raft_storage", "fsm", "write_snapshot"}, time.Now()) + protoWriter := protoio.NewDelimitedWriter(sink) metadataProtoWriter := protoio.NewDelimitedWriter(metaSink) @@ -589,6 +591,8 @@ func (f *FSM) SetNoopRestore(enabled bool) { // first deletes the existing bucket to clear all existing data, then recreates // it so we can copy in the snapshot. func (f *FSM) Restore(r io.ReadCloser) error { + defer metrics.MeasureSince([]string{"raft_storage", "fsm", "restore_snapshot"}, time.Now()) + if f.noopRestore == true { return nil } @@ -599,41 +603,63 @@ func (f *FSM) Restore(r io.ReadCloser) error { f.l.Lock() defer f.l.Unlock() - // Start a write transaction. 
+ // Delete the existing data bucket and create a new one. + f.logger.Debug("snapshot restore: deleting bucket") err := f.db.Update(func(tx *bolt.Tx) error { err := tx.DeleteBucket(dataBucketName) if err != nil { return err } - b, err := tx.CreateBucket(dataBucketName) + _, err = tx.CreateBucket(dataBucketName) if err != nil { return err } - for { + return nil + }) + if err != nil { + f.logger.Error("could not restore snapshot: could not clear existing bucket", "error", err) + return err + } + + f.logger.Debug("snapshot restore: deleting bucket done") + f.logger.Debug("snapshot restore: writing keys") + + var done bool + for !done { + err := f.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(dataBucketName) s := new(pb.StorageEntry) - err := protoReader.ReadMsg(s) - if err != nil { - if err == io.EOF { - return nil + + // Commit in batches of 50k. Bolt wont split pages until commit so + // we do incremental writes. This is safe since we have a write lock + // on the fsm's lock. + for i := 0; i < 50000; i++ { + err := protoReader.ReadMsg(s) + if err != nil { + if err == io.EOF { + done = true + return nil + } + return err } - return err - } - err = b.Put([]byte(s.Key), s.Value) - if err != nil { - return err + err = b.Put([]byte(s.Key), s.Value) + if err != nil { + return err + } } - } - return nil - }) - if err != nil { - f.logger.Error("could not restore snapshot", "error", err) - return err + return nil + }) + if err != nil { + f.logger.Error("could not restore snapshot", "error", err) + return err + } } + f.logger.Debug("snapshot restore: writing keys done") return nil } From 6b56a615f0551644a8f60f6880ff37f1d31757ed Mon Sep 17 00:00:00 2001 From: Brian Kassouf Date: Fri, 17 Apr 2020 18:54:43 -0700 Subject: [PATCH 2/6] Work on snapshot consistency --- go.sum | 1 + physical/raft/fsm.go | 59 ++++++++++++++++++++----- physical/raft/raft_test.go | 46 ++++++++++++++++--- physical/raft/snapshot.go | 21 ++++++--- physical/raft/snapshot_test.go | 81 ++++++++++++++++++++++++++++++++++ 5 files changed, 186 insertions(+), 22 deletions(-) diff --git a/go.sum b/go.sum index 83b9f26a39cf..7f818bc4143f 100644 --- a/go.sum +++ b/go.sum @@ -150,6 +150,7 @@ github.com/coreos/go-oidc v2.1.0+incompatible h1:sdJrfw8akMnCuUlaZU3tE/uYXFgfqom github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc= github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 h1:u9SHYsPQNyt5tgDm3YN7+9dYrpK96E5wFilTFWIDZOM= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d h1:t5Wuyh53qYyg9eqn4BbnlIT+vmhyww0TatL+zT3uWgI= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= diff --git a/physical/raft/fsm.go b/physical/raft/fsm.go index 356d4f682cbe..6669dd006052 100644 --- a/physical/raft/fsm.go +++ b/physical/raft/fsm.go @@ -88,6 +88,10 @@ type FSM struct { storeLatestState bool chunker *raftchunking.ChunkingBatchingFSM + + // testSnapshotRestoreError is used in tests to simulate an error while + // restoring a snapshot. 
+ testSnapshotRestoreError bool } // NewFSM constructs a FSM using the given directory @@ -193,12 +197,12 @@ func (f *FSM) witnessIndex(i *IndexValue) { } } -func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configuration raft.Configuration) error { +func (f *FSM) witnessSnapshot(metadata *raft.SnapshotMeta) error { var indexBytes []byte latestIndex, _ := f.LatestState() - latestIndex.Index = index - latestIndex.Term = term + latestIndex.Index = metadata.Index + latestIndex.Term = metadata.Term var err error indexBytes, err = proto.Marshal(latestIndex) @@ -206,7 +210,7 @@ func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configurat return err } - protoConfig := raftConfigurationToProtoConfiguration(configurationIndex, configuration) + protoConfig := raftConfigurationToProtoConfiguration(metadata.ConfigurationIndex, metadata.Configuration) configBytes, err := proto.Marshal(protoConfig) if err != nil { return err @@ -232,8 +236,8 @@ func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configurat } } - atomic.StoreUint64(f.latestIndex, index) - atomic.StoreUint64(f.latestTerm, term) + atomic.StoreUint64(f.latestIndex, metadata.Index) + atomic.StoreUint64(f.latestTerm, metadata.Term) f.latestConfig.Store(protoConfig) return nil @@ -575,7 +579,9 @@ func (f *FSM) writeTo(ctx context.Context, metaSink writeErrorCloser, sink write // Snapshot implements the FSM interface. It returns a noop snapshot object. func (f *FSM) Snapshot() (raft.FSMSnapshot, error) { - return &noopSnapshotter{}, nil + return &noopSnapshotter{ + fsm: f, + }, nil } // SetNoopRestore is used to disable restore operations on raft startup. Because @@ -597,6 +603,8 @@ func (f *FSM) Restore(r io.ReadCloser) error { return nil } + snapMeta := r.(*boltSnapshotMetadataReader).Metadata() + protoReader := protoio.NewDelimitedReader(r, math.MaxInt32) defer protoReader.Close() @@ -623,16 +631,23 @@ func (f *FSM) Restore(r io.ReadCloser) error { return err } + // If we are testing a failed snapshot error here. + if f.testSnapshotRestoreError { + return errors.New("Test error") + } + f.logger.Debug("snapshot restore: deleting bucket done") f.logger.Debug("snapshot restore: writing keys") var done bool + var keys int for !done { err := f.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(dataBucketName) s := new(pb.StorageEntry) - // Commit in batches of 50k. Bolt wont split pages until commit so + // Commit in batches of 50k. Bolt wont split pages until commit and + // holds // we do incremental writes. This is safe since we have a write lock // on the fsm's lock. for i := 0; i < 50000; i++ { @@ -649,6 +664,7 @@ func (f *FSM) Restore(r io.ReadCloser) error { if err != nil { return err } + keys += 1 } return nil @@ -657,18 +673,41 @@ func (f *FSM) Restore(r io.ReadCloser) error { f.logger.Error("could not restore snapshot", "error", err) return err } + + f.logger.Trace("snapshot restore: writing keys", "num_written", keys) } f.logger.Debug("snapshot restore: writing keys done") + + // Write the metadata after we have applied all the snapshot data + f.logger.Debug("snapshot restore: writing metadata") + if err := f.witnessSnapshot(snapMeta); err != nil { + f.logger.Error("could not write metadata", "error", err) + return err + } + return nil } // noopSnapshotter implements the fsm.Snapshot interface. It doesn't do anything // since our SnapshotStore reads data out of the FSM on Open(). 
-type noopSnapshotter struct{} +type noopSnapshotter struct { + fsm *FSM +} -// Persist doesn't do anything. +// Persist implements the fsm.Snapshot interface. It doesn't need to persist any +// state data, but it does persist the raft metadata. This is necessary so we +// can be sure to capture indexes for operation types that are not sent to the +// FSM. func (s *noopSnapshotter) Persist(sink raft.SnapshotSink) error { + boltSnapshotSink := sink.(*BoltSnapshotSink) + + // We are processing a snapshot, fastforward the index, term, and + // configuration to the latest seen by the raft system. + if err := s.fsm.witnessSnapshot(&boltSnapshotSink.meta); err != nil { + return err + } + return nil } diff --git a/physical/raft/raft_test.go b/physical/raft/raft_test.go index 40faa09b18f3..c462fb7ac3f0 100644 --- a/physical/raft/raft_test.go +++ b/physical/raft/raft_test.go @@ -76,22 +76,56 @@ func getRaftWithDir(t testing.TB, bootstrap bool, noStoreState bool, raftDir str return backend, raftDir } +func connectPeers(nodes ...*RaftBackend) { + for _, node := range nodes { + for _, peer := range nodes { + if node == peer { + continue + } + + node.raftTransport.(*raft.InmemTransport).Connect(raft.ServerAddress(peer.NodeID()), peer.raftTransport) + peer.raftTransport.(*raft.InmemTransport).Connect(raft.ServerAddress(node.NodeID()), node.raftTransport) + } + } +} + +func waitForLeader(t *testing.T, nodes ...*RaftBackend) *RaftBackend { + timeout := time.Now().Add(time.Second * 10) + for !time.Now().After(timeout) { + for _, node := range nodes { + if node.raft.Leader() == raft.ServerAddress(node.NodeID()) { + return node + } + } + time.Sleep(100 * time.Millisecond) + } + + t.Fatal("no leader") + return nil +} + func compareFSMs(t *testing.T, fsm1, fsm2 *FSM) { + if err := compareFSMsWithErr(t, fsm1, fsm2); err != nil { + t.Fatal(err) + } +} + +func compareFSMsWithErr(t *testing.T, fsm1, fsm2 *FSM) error { t.Helper() index1, config1 := fsm1.LatestState() index2, config2 := fsm2.LatestState() if !proto.Equal(index1, index2) { - t.Fatalf("indexes did not match: %+v != %+v", index1, index2) + return fmt.Errorf("indexes did not match: %+v != %+v", index1, index2) } if !proto.Equal(config1, config2) { - t.Fatalf("configs did not match: %+v != %+v", config1, config2) + return fmt.Errorf("configs did not match: %+v != %+v", config1, config2) } - compareDBs(t, fsm1.db, fsm2.db) + return compareDBs(t, fsm1.db, fsm2.db) } -func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) { +func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) error { db1 := make(map[string]string) db2 := make(map[string]string) @@ -135,8 +169,10 @@ func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) { } if diff := deep.Equal(db1, db2); diff != nil { - t.Fatal(diff) + return fmt.Errorf("%+v", diff) } + + return nil } func TestRaft_Backend(t *testing.T) { diff --git a/physical/raft/snapshot.go b/physical/raft/snapshot.go index 8538778b5d06..7139cce7d9d7 100644 --- a/physical/raft/snapshot.go +++ b/physical/raft/snapshot.go @@ -104,13 +104,6 @@ func (f *BoltSnapshotStore) Create(version raft.SnapshotVersion, index, term uin return nil, fmt.Errorf("unsupported snapshot version %d", version) } - // We are processing a snapshot, fastforward the index, term, and - // configuration to the latest seen by the raft system. This could include - // log indexes for operation types that are never sent to the FSM. 
- if err := f.fsm.witnessSnapshot(index, term, configurationIndex, configuration); err != nil { - return nil, err - } - // Create the sink sink := &BoltSnapshotSink{ store: f, @@ -208,6 +201,11 @@ func (f *BoltSnapshotStore) Open(id string) (*raft.SnapshotMeta, io.ReadCloser, if err != nil { return nil, nil, err } + + readCloser = &boltSnapshotMetadataReader{ + meta: meta, + ReadCloser: readCloser, + } } return meta, readCloser, nil @@ -286,3 +284,12 @@ func (s *BoltSnapshotSink) Cancel() error { return nil } + +type boltSnapshotMetadataReader struct { + io.ReadCloser + meta *raft.SnapshotMeta +} + +func (r *boltSnapshotMetadataReader) Metadata() *raft.SnapshotMeta { + return r.meta +} diff --git a/physical/raft/snapshot_test.go b/physical/raft/snapshot_test.go index 5851a2c0e918..547d2386e5c0 100644 --- a/physical/raft/snapshot_test.go +++ b/physical/raft/snapshot_test.go @@ -345,6 +345,87 @@ func TestRaft_Snapshot_Restart(t *testing.T) { compareFSMs(t, raft1.fsm, raft2.fsm) } +func TestRaft_Snapshot_ErrorRecovery(t *testing.T) { + raft1, dir := getRaft(t, true, false) + raft2, dir2 := getRaft(t, false, false) + raft3, dir3 := getRaft(t, false, false) + defer os.RemoveAll(dir) + defer os.RemoveAll(dir2) + defer os.RemoveAll(dir3) + + // Add raft2 to the cluster + addPeer(t, raft1, raft2) + + // Write some data + for i := 0; i < 100; i++ { + err := raft1.Put(context.Background(), &physical.Entry{ + Key: fmt.Sprintf("key-%d", i), + Value: []byte(fmt.Sprintf("value-%d", i)), + }) + if err != nil { + t.Fatal(err) + } + } + + // Take a snapshot + snapFuture := raft1.raft.Snapshot() + if err := snapFuture.Error(); err != nil { + t.Fatal(err) + } + // Advance FSM's index past configuration change + raft1.Put(context.Background(), &physical.Entry{ + Key: "key", + Value: []byte("value"), + }) + + // Error on snapshot restore + raft3.fsm.testSnapshotRestoreError = true + + // Add raft3 to the cluster + addPeer(t, raft1, raft3) + + time.Sleep(2 * time.Second) + + // Restart the failing node to make sure fresh state does not have invalid + // values. + if err := raft3.TeardownCluster(nil); err != nil { + t.Fatal(err) + } + + // Ensure the databases are not equal + if err := compareFSMsWithErr(t, raft1.fsm, raft3.fsm); err == nil { + t.Fatal("nil error") + } + + // Remove error and make sure we can reconcile state + raft3.fsm.testSnapshotRestoreError = false + + // Shutdown raft1 + if err := raft1.TeardownCluster(nil); err != nil { + t.Fatal(err) + } + + // Start Raft1 + if err := raft1.SetupCluster(context.Background(), SetupOpts{}); err != nil { + t.Fatal(err) + } + + connectPeers(raft1, raft2) + waitForLeader(t, raft1, raft2) + + // Start Raft3 + if err := raft3.SetupCluster(context.Background(), SetupOpts{}); err != nil { + t.Fatal(err) + } + + connectPeers(raft1, raft2, raft3) + waitForLeader(t, raft1, raft2) + + time.Sleep(5 * time.Second) + // Make sure state gets re-replicated. 
+	compareFSMs(t, raft1.fsm, raft3.fsm)
+}
+
 func TestRaft_Snapshot_Take_Restore(t *testing.T) {
 	raft1, dir := getRaft(t, true, false)
 	defer os.RemoveAll(dir)

From 064e244104de94ca1a1252be29d698812cc198ed Mon Sep 17 00:00:00 2001
From: Brian Kassouf
Date: Tue, 21 Apr 2020 11:04:51 -0700
Subject: [PATCH 3/6] make sure tests send a snapshot

---
 physical/raft/raft_test.go     | 21 ++++++++++++++++++++
 physical/raft/snapshot_test.go | 35 ++++++++++++++++++-----------------
 2 files changed, 39 insertions(+), 17 deletions(-)

diff --git a/physical/raft/raft_test.go b/physical/raft/raft_test.go
index c462fb7ac3f0..930e46a40d07 100644
--- a/physical/raft/raft_test.go
+++ b/physical/raft/raft_test.go
@@ -89,7 +89,26 @@ func connectPeers(nodes ...*RaftBackend) {
 	}
 }
 
+func stepDownLeader(t *testing.T, node *RaftBackend) {
+	t.Helper()
+
+	if err := node.raft.LeadershipTransfer().Error(); err != nil {
+		t.Fatal(err)
+	}
+
+	timeout := time.Now().Add(time.Second * 10)
+	for !time.Now().After(timeout) {
+		if err := node.raft.VerifyLeader().Error(); err != nil {
+			return
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	t.Fatal("still leader")
+}
+
 func waitForLeader(t *testing.T, nodes ...*RaftBackend) *RaftBackend {
+	t.Helper()
 	timeout := time.Now().Add(time.Second * 10)
 	for !time.Now().After(timeout) {
 		for _, node := range nodes {
@@ -105,6 +124,7 @@ func waitForLeader(t *testing.T, nodes ...*RaftBackend) *RaftBackend {
 }
 
 func compareFSMs(t *testing.T, fsm1, fsm2 *FSM) {
+	t.Helper()
 	if err := compareFSMsWithErr(t, fsm1, fsm2); err != nil {
 		t.Fatal(err)
 	}
@@ -126,6 +146,7 @@ func compareFSMsWithErr(t *testing.T, fsm1, fsm2 *FSM) error {
 }
 
 func compareDBs(t *testing.T, boltDB1, boltDB2 *bolt.DB) error {
+	t.Helper()
 	db1 := make(map[string]string)
 	db2 := make(map[string]string)
 
diff --git a/physical/raft/snapshot_test.go b/physical/raft/snapshot_test.go
index 547d2386e5c0..33b1d1b22e82 100644
--- a/physical/raft/snapshot_test.go
+++ b/physical/raft/snapshot_test.go
@@ -367,13 +367,22 @@ func TestRaft_Snapshot_ErrorRecovery(t *testing.T) {
 		}
 	}
 
-	// Take a snapshot
+	// Take a snapshot on each node to ensure we no longer have older logs
 	snapFuture := raft1.raft.Snapshot()
 	if err := snapFuture.Error(); err != nil {
 		t.Fatal(err)
 	}
-	// Advance FSM's index past configuration change
-	raft1.Put(context.Background(), &physical.Entry{
+
+	stepDownLeader(t, raft1)
+	leader := waitForLeader(t, raft1, raft2)
+
+	snapFuture = leader.raft.Snapshot()
+	if err := snapFuture.Error(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Advance FSM's index past snapshot index
+	leader.Put(context.Background(), &physical.Entry{
 		Key:   "key",
 		Value: []byte("value"),
 	})
@@ -382,7 +391,7 @@ func TestRaft_Snapshot_ErrorRecovery(t *testing.T) {
 	raft3.fsm.testSnapshotRestoreError = true
 
 	// Add raft3 to the cluster
-	addPeer(t, raft1, raft3)
+	addPeer(t, leader, raft3)
 
 	time.Sleep(2 * time.Second)
 
@@ -393,25 +402,16 @@ func TestRaft_Snapshot_ErrorRecovery(t *testing.T) {
 	}
 
 	// Ensure the databases are not equal
-	if err := compareFSMsWithErr(t, raft1.fsm, raft3.fsm); err == nil {
+	if err := compareFSMsWithErr(t, leader.fsm, raft3.fsm); err == nil {
 		t.Fatal("nil error")
 	}
 
 	// Remove error and make sure we can reconcile state
 	raft3.fsm.testSnapshotRestoreError = false
 
-	// Shutdown raft1
-	if err := raft1.TeardownCluster(nil); err != nil {
-		t.Fatal(err)
-	}
-
-	// Start Raft1
-	if err := raft1.SetupCluster(context.Background(), SetupOpts{}); err != nil {
-		t.Fatal(err)
-	}
-
-	connectPeers(raft1, raft2)
-	waitForLeader(t, raft1, raft2)
+	// Step down leader node
+	stepDownLeader(t, leader)
+	leader = waitForLeader(t, raft1, raft2)
 
 	// Start Raft3
 	if err := raft3.SetupCluster(context.Background(), SetupOpts{}); err != nil {
@@ -422,6 +422,7 @@ func TestRaft_Snapshot_ErrorRecovery(t *testing.T) {
 	waitForLeader(t, raft1, raft2)
 
 	time.Sleep(5 * time.Second)
+
 	// Make sure state gets re-replicated.
 	compareFSMs(t, raft1.fsm, raft3.fsm)
 }

From 1de70afef471e5dce86cf912951df7715a7e52e0 Mon Sep 17 00:00:00 2001
From: Brian Kassouf
Date: Tue, 21 Apr 2020 13:59:36 -0700
Subject: [PATCH 4/6] Fix comment

---
 physical/raft/fsm.go | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/physical/raft/fsm.go b/physical/raft/fsm.go
index 6669dd006052..338b785092c9 100644
--- a/physical/raft/fsm.go
+++ b/physical/raft/fsm.go
@@ -646,10 +646,9 @@ func (f *FSM) Restore(r io.ReadCloser) error {
 			b := tx.Bucket(dataBucketName)
 			s := new(pb.StorageEntry)
 
-			// Commit in batches of 50k. Bolt wont split pages until commit and
-			// holds
-			// we do incremental writes. This is safe since we have a write lock
-			// on the fsm's lock.
+			// Commit in batches of 50k. Bolt holds all the data in memory and
+			// doesn't split the pages until commit so we do incremental writes.
+			// This is safe since we have a write lock on the fsm's lock.
 			for i := 0; i < 50000; i++ {
 				err := protoReader.ReadMsg(s)
 				if err != nil {

From c44a39c50fbbb258fbd92cc10d6ff483117fa870 Mon Sep 17 00:00:00 2001
From: Brian Kassouf
Date: Thu, 23 Apr 2020 11:08:15 -0700
Subject: [PATCH 5/6] Don't remove metrics

---
 physical/raft/fsm.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/physical/raft/fsm.go b/physical/raft/fsm.go
index 338b785092c9..5cb49307c7ad 100644
--- a/physical/raft/fsm.go
+++ b/physical/raft/fsm.go
@@ -281,6 +281,8 @@ func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
 
 // Get retrieves the value at the given path from the bolt file.
 func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
+	// TODO: Remove this outdated metric name in an older release
+	defer metrics.MeasureSince([]string{"raft", "get"}, time.Now())
 	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "get"}, time.Now())
 
 	f.l.RLock()
@@ -328,6 +330,8 @@ func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
 
 // List retrieves the set of keys with the given prefix from the bolt file.
 func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
+	// TODO: Remove this outdated metric name in an older release
+	defer metrics.MeasureSince([]string{"raft", "list"}, time.Now())
 	defer metrics.MeasureSince([]string{"raft_storage", "fsm", "list"}, time.Now())
 
 	f.l.RLock()

From 1f434f34023585db52ca4adad3933142282db139 Mon Sep 17 00:00:00 2001
From: Brian Kassouf
Date: Thu, 23 Apr 2020 11:10:35 -0700
Subject: [PATCH 6/6] Fix comment

---
 physical/raft/fsm.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/physical/raft/fsm.go b/physical/raft/fsm.go
index 5cb49307c7ad..7c49fbadd054 100644
--- a/physical/raft/fsm.go
+++ b/physical/raft/fsm.go
@@ -330,7 +330,7 @@ func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
 
 // List retrieves the set of keys with the given prefix from the bolt file.
func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) { - // TODO: Remove this outdated metric name in an older release + // TODO: Remove this outdated metric name in a future release defer metrics.MeasureSince([]string{"raft", "list"}, time.Now()) defer metrics.MeasureSince([]string{"raft_storage", "fsm", "list"}, time.Now())