Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: Add follower node for sync mode #5009

Merged
merged 28 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
cb2d88a
Add data node for sync mode
Eric-Warehime Jan 12, 2023
82660fe
Add NodeSyncMode to example config
Eric-Warehime Jan 12, 2023
9cdc496
Fix linting issues
Eric-Warehime Jan 12, 2023
faa450b
Register all default handlers
Eric-Warehime Jan 12, 2023
22712bd
Add tests
Eric-Warehime Jan 13, 2023
7bbecc0
Add license
Eric-Warehime Jan 13, 2023
a7f2b6d
PR updates
Eric-Warehime Jan 18, 2023
ab59d0f
Update config value default to v27
Eric-Warehime Jan 18, 2023
04184df
Fix catchpoint catchup, add restriction on sync round
Eric-Warehime Jan 19, 2023
14ced78
Fix handler test, add catchpoint test
Eric-Warehime Jan 19, 2023
12591d4
Remove some more, add ignore messages to wsNetwork
Eric-Warehime Jan 20, 2023
68aa160
Rename new node type
Eric-Warehime Jan 23, 2023
be3df47
Cleanup lefotver name issue
Eric-Warehime Jan 23, 2023
eec2969
Verification
Eric-Warehime Jan 23, 2023
b062a0a
Fix test failure
Eric-Warehime Jan 23, 2023
caccdb9
Stop embedding full node in follower
Eric-Warehime Jan 23, 2023
77bda18
Refactor
Eric-Warehime Jan 24, 2023
592040c
Add mutex back
Eric-Warehime Jan 24, 2023
8b474c9
Combine interfaces
Eric-Warehime Jan 24, 2023
7ff6147
No-op sync methods on full node
Eric-Warehime Jan 24, 2023
168dc2a
Update node/follower_node.go
Eric-Warehime Jan 25, 2023
9feb083
Update node/follower_node.go
Eric-Warehime Jan 25, 2023
2826c08
Add comment to test case
Eric-Warehime Jan 25, 2023
cf18626
PR comments
Eric-Warehime Jan 25, 2023
1126399
Add e2e test, syncNow channel to avoid waiting for timeout
Eric-Warehime Jan 30, 2023
6f75754
Return error when devMode and followMode are true
Eric-Warehime Jan 30, 2023
65c85cf
Fix failed license check
Eric-Warehime Jan 30, 2023
c0f9cf1
Don't ever close the channel
Eric-Warehime Jan 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type Service struct {
protocolErrorLogged bool
lastSupportedRound basics.Round
unmatchedPendingCertificates <-chan PendingUnmatchedCertificate
// This channel signals periodSync to attempt catchup immediately. This allows us to start fetching rounds from
// the network as soon as disableSyncRound is modified.
syncNow chan struct{}
}

// A BlockAuthenticator authenticates blocks given a certificate.
Expand Down Expand Up @@ -122,6 +125,7 @@ func MakeService(log logging.Logger, config config.Local, net network.GossipNode
s.parallelBlocks = config.CatchupParallelBlocks
s.deadlineTimeout = agreement.DeadlineTimeout()
s.blockValidationPool = blockValidationPool
s.syncNow = make(chan struct{})

return s
}
Expand Down Expand Up @@ -160,12 +164,18 @@ func (s *Service) SetDisableSyncRound(rnd uint64) error {
return ErrSyncRoundInvalid
}
atomic.StoreUint64(&s.disableSyncRound, rnd)
if syncing, initial := s.IsSynchronizing(); !syncing && !initial {
s.syncNow <- struct{}{}
}
return nil
}

// UnsetDisableSyncRound removes any previously set disabled sync round
func (s *Service) UnsetDisableSyncRound() {
atomic.StoreUint64(&s.disableSyncRound, 0)
if syncing, initial := s.IsSynchronizing(); !syncing && !initial {
s.syncNow <- struct{}{}
}
}

// GetDisableSyncRound returns the disabled sync round
Expand Down Expand Up @@ -575,6 +585,13 @@ func (s *Service) periodicSync() {
// we want to sleep for a random duration since it would "de-syncronize" us from the ledger advance sync
sleepDuration = time.Duration(crypto.RandUint63()) % s.deadlineTimeout
continue
case <-s.syncNow:
if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() {
continue
}
s.suspendForCatchpointWriting = false
s.log.Info("Immediate resync triggered; resyncing")
s.sync()
case <-time.After(sleepDuration):
if sleepDuration < s.deadlineTimeout || s.cfg.DisableNetworking {
sleepDuration = s.deadlineTimeout
Expand Down
5 changes: 5 additions & 0 deletions config/localTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,11 @@ type Local struct {
// EnableExperimentalAPI enables experimental API endpoint. Note that these endpoints have no
// guarantees in terms of functionality or future support.
EnableExperimentalAPI bool `version[26]:"false"`

// EnableFollowMode launches the node in "follower" mode. This turns off the agreement service,
// and APIs related to broadcasting transactions, and enables APIs which can retrieve detailed information
// from ledger caches and can control the ledger round.
EnableFollowMode bool `version[27]:"false"`
}

// DNSBootstrapArray returns an array of one or more DNS Bootstrap identifiers
Expand Down
1 change: 1 addition & 0 deletions config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var defaultLocal = Local{
EnableCatchupFromArchiveServers: false,
EnableDeveloperAPI: false,
EnableExperimentalAPI: false,
EnableFollowMode: false,
EnableGossipBlockService: true,
EnableIncomingMessageFilter: false,
EnableLedgerService: false,
Expand Down
38 changes: 32 additions & 6 deletions daemon/algod/api/client/restClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ func (client RestClient) getRaw(response RawResponse, path string, request inter
// post sends a POST request to the given path with the given request object.
// No query parameters will be sent if request is nil.
// response must be a pointer to an object as post writes the response there.
func (client RestClient) post(response interface{}, path string, request interface{}) error {
return client.submitForm(response, path, request, "POST", true /* encodeJSON */, true /* decodeJSON */, false)
func (client RestClient) post(response interface{}, path string, request interface{}, expectNoContent bool) error {
return client.submitForm(response, path, request, "POST", true /* encodeJSON */, true /* decodeJSON */, expectNoContent)
}

// Status retrieves the StatusResponse from the running node
Expand Down Expand Up @@ -504,7 +504,7 @@ func (client RestClient) SuggestedParams() (response model.TransactionParameters

// SendRawTransaction gets a SignedTxn and broadcasts it to the network
func (client RestClient) SendRawTransaction(txn transactions.SignedTxn) (response model.PostTransactionsResponse, err error) {
err = client.post(&response, "/v2/transactions", protocol.Encode(&txn))
err = client.post(&response, "/v2/transactions", protocol.Encode(&txn), false)
return
}

Expand All @@ -518,7 +518,7 @@ func (client RestClient) SendRawTransactionGroup(txgroup []transactions.SignedTx
}

var response model.PostTransactionsResponse
return client.post(&response, "/v2/transactions", enc)
return client.post(&response, "/v2/transactions", enc, false)
}

// Block gets the block info for the given round
Expand All @@ -538,7 +538,7 @@ func (client RestClient) RawBlock(round uint64) (response []byte, err error) {
// Shutdown requests the node to shut itself down
func (client RestClient) Shutdown() (err error) {
response := 1
err = client.post(&response, "/v2/shutdown", nil)
err = client.post(&response, "/v2/shutdown", nil, false)
return
}

Expand Down Expand Up @@ -650,7 +650,7 @@ func (client RestClient) TransactionProof(txid string, round uint64, hashType cr

// PostParticipationKey sends a key file to the node.
func (client RestClient) PostParticipationKey(file []byte) (response model.PostParticipationResponse, err error) {
err = client.post(&response, "/v2/participation", file)
err = client.post(&response, "/v2/participation", file, false)
return
}

Expand All @@ -672,3 +672,29 @@ func (client RestClient) RemoveParticipationKeyByID(participationID string) (err
return

}

/* Endpoint registered for follower nodes */

// SetSyncRound sets the sync round for the catchup service
func (client RestClient) SetSyncRound(round uint64) (err error) {
err = client.post(nil, fmt.Sprintf("/v2/ledger/sync/%d", round), nil, true)
return
}

// UnsetSyncRound deletes the sync round constraint
func (client RestClient) UnsetSyncRound() (err error) {
err = client.delete(nil, "/v2/ledger/sync", nil, true)
return
}

// GetSyncRound retrieves the sync round (if set)
func (client RestClient) GetSyncRound() (response model.GetSyncRoundResponse, err error) {
err = client.get(&response, "/v2/ledger/sync", nil)
return
}

// GetLedgerStateDelta retrieves the ledger state delta for the round
func (client RestClient) GetLedgerStateDelta(round uint64) (response model.LedgerStateDeltaResponse, err error) {
err = client.get(&response, fmt.Sprintf("/v2/deltas/%d", round), nil)
return
}
10 changes: 8 additions & 2 deletions daemon/algod/api/server/lib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@ import (

"github.com/labstack/echo/v4"

"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/node"
)

// GenesisJSONText is initialized when the node starts.
var GenesisJSONText string

// NodeInterface defines the node's methods required by the common APIs
type NodeInterface interface {
GenesisHash() crypto.Digest
GenesisID() string
}

// HandlerFunc defines a wrapper for http.HandlerFunc that includes a context
type HandlerFunc func(ReqContext, echo.Context)

Expand All @@ -45,7 +51,7 @@ type Routes []Route
// ReqContext is passed to each of the handlers below via wrapCtx, allowing
// handlers to interact with the node
type ReqContext struct {
Node *node.AlgorandFullNode
Node NodeInterface
Log logging.Logger
Context echo.Context
Shutdown <-chan struct{}
Expand Down
28 changes: 23 additions & 5 deletions daemon/algod/api/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/algorand/go-algorand/daemon/algod/api/server/lib/middlewares"
"github.com/algorand/go-algorand/daemon/algod/api/server/v1/routes"
v2 "github.com/algorand/go-algorand/daemon/algod/api/server/v2"
"github.com/algorand/go-algorand/daemon/algod/api/server/v2/generated/data"
"github.com/algorand/go-algorand/daemon/algod/api/server/v2/generated/experimental"
npprivate "github.com/algorand/go-algorand/daemon/algod/api/server/v2/generated/nonparticipating/private"
nppublic "github.com/algorand/go-algorand/daemon/algod/api/server/v2/generated/nonparticipating/public"
Expand All @@ -40,6 +41,12 @@ import (
"github.com/algorand/go-algorand/util/tokens"
)

// APINodeInterface describes all the node methods required by common and v2 APIs, and the server/router
type APINodeInterface interface {
lib.NodeInterface
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
v2.NodeInterface
}

const (
apiV1Tag = "/v1"
// TokenHeader is the header where we put the token.
Expand All @@ -63,7 +70,7 @@ func registerHandlers(router *echo.Echo, prefix string, routes lib.Routes, ctx l
}

// NewRouter builds and returns a new router with our REST handlers registered.
func NewRouter(logger logging.Logger, node *node.AlgorandFullNode, shutdown <-chan struct{}, apiToken string, adminAPIToken string, listener net.Listener, numConnectionsLimit uint64) *echo.Echo {
func NewRouter(logger logging.Logger, node APINodeInterface, shutdown <-chan struct{}, apiToken string, adminAPIToken string, listener net.Listener, numConnectionsLimit uint64) *echo.Echo {
if err := tokens.ValidateAPIToken(apiToken); err != nil {
logger.Errorf("Invalid apiToken was passed to NewRouter ('%s'): %v", apiToken, err)
}
Expand Down Expand Up @@ -104,7 +111,7 @@ func NewRouter(logger logging.Logger, node *node.AlgorandFullNode, shutdown <-ch

// Registering v2 routes
v2Handler := v2.Handlers{
Node: apiNode{node},
Node: node,
Log: logger,
Shutdown: shutdown,
}
Expand All @@ -113,14 +120,25 @@ func NewRouter(logger logging.Logger, node *node.AlgorandFullNode, shutdown <-ch
ppublic.RegisterHandlers(e, &v2Handler, apiAuthenticator)
pprivate.RegisterHandlers(e, &v2Handler, adminAuthenticator)

if node.Config().EnableFollowMode {
data.RegisterHandlers(e, &v2Handler, apiAuthenticator)
}

if node.Config().EnableExperimentalAPI {
experimental.RegisterHandlers(e, &v2Handler, apiAuthenticator)
}

return e
}

// apiNode wraps the AlgorandFullNode to provide v2.NodeInterface.
type apiNode struct{ *node.AlgorandFullNode }
// FollowerNode wraps the AlgorandFollowerNode to provide v2.NodeInterface.
type FollowerNode struct{ *node.AlgorandFollowerNode }

// LedgerForAPI implements the v2.Handlers interface
func (n FollowerNode) LedgerForAPI() v2.LedgerForAPI { return n.Ledger() }

// APINode wraps the AlgorandFullNode to provide v2.NodeInterface.
type APINode struct{ *node.AlgorandFullNode }

func (n apiNode) LedgerForAPI() v2.LedgerForAPI { return n.Ledger() }
// LedgerForAPI implements the v2.Handlers interface
func (n APINode) LedgerForAPI() v2.LedgerForAPI { return n.Ledger() }
2 changes: 2 additions & 0 deletions daemon/algod/api/server/v2/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,8 @@ func (v2 *Handlers) startCatchup(ctx echo.Context, catchpoint string) error {
code = http.StatusOK
case *node.CatchpointUnableToStartError:
return badRequest(ctx, err, err.Error(), v2.Log)
case *node.CatchpointSyncRoundFailure:
return badRequest(ctx, err, fmt.Sprintf(errFailedToStartCatchup, err), v2.Log)
default:
return internalError(ctx, err, fmt.Sprintf(errFailedToStartCatchup, err), v2.Log)
}
Expand Down
10 changes: 5 additions & 5 deletions daemon/algod/api/server/v2/test/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,11 +706,7 @@ func startCatchupTest(t *testing.T, catchpoint string, nodeError error, expected
defer releasefunc()
dummyShutdownChan := make(chan struct{})
mockNode := makeMockNode(mockLedger, t.Name(), nodeError, false)
handler := v2.Handlers{
Node: mockNode,
Log: logging.Base(),
Shutdown: dummyShutdownChan,
}
handler := v2.Handlers{Node: mockNode, Log: logging.Base(), Shutdown: dummyShutdownChan}
e := echo.New()
req := httptest.NewRequest(http.MethodPost, "/", nil)
rec := httptest.NewRecorder()
Expand All @@ -737,6 +733,10 @@ func TestStartCatchup(t *testing.T) {

badCatchPoint := "bad catchpoint"
startCatchupTest(t, badCatchPoint, nil, 400)

// Test that a catchup fails w/ 400 when the catchpoint round is > syncRound (while syncRound is set)
syncRoundError := node.MakeCatchpointSyncRoundFailure(goodCatchPoint, 1)
startCatchupTest(t, goodCatchPoint, syncRoundError, 400)
}

func abortCatchupTest(t *testing.T, catchpoint string, expectedCode int) {
Expand Down
23 changes: 20 additions & 3 deletions daemon/algod/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ import (

var server http.Server

// ServerNode is the required methods for any node the server fronts
type ServerNode interface {
apiServer.APINodeInterface
ListeningAddress() (string, bool)
Start()
Stop()
}

// Server represents an instance of the REST API HTTP server
type Server struct {
RootPath string
Expand All @@ -57,7 +65,7 @@ type Server struct {
netFile string
netListenFile string
log logging.Logger
node *node.AlgorandFullNode
node ServerNode
metricCollector *metrics.MetricService
metricServiceStarted bool
stopping chan struct{}
Expand Down Expand Up @@ -171,14 +179,23 @@ func (s *Server) Initialize(cfg config.Local, phonebookAddresses []string, genes
NodeExporterPath: cfg.NodeExporterPath,
})

s.node, err = node.MakeFull(s.log, s.RootPath, cfg, phonebookAddresses, s.Genesis)
var serverNode ServerNode
if cfg.EnableFollowMode {
var followerNode *node.AlgorandFollowerNode
followerNode, err = node.MakeFollower(s.log, s.RootPath, cfg, phonebookAddresses, s.Genesis)
serverNode = apiServer.FollowerNode{AlgorandFollowerNode: followerNode}
} else {
var fullNode *node.AlgorandFullNode
fullNode, err = node.MakeFull(s.log, s.RootPath, cfg, phonebookAddresses, s.Genesis)
serverNode = apiServer.APINode{AlgorandFullNode: fullNode}
}
if os.IsNotExist(err) {
return fmt.Errorf("node has not been installed: %s", err)
}
if err != nil {
return fmt.Errorf("couldn't initialize the node: %s", err)
}

s.node = serverNode
return nil
}

Expand Down
1 change: 1 addition & 0 deletions installer/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"EnableCatchupFromArchiveServers": false,
"EnableDeveloperAPI": false,
"EnableExperimentalAPI": false,
"EnableFollowMode": false,
"EnableGossipBlockService": true,
"EnableIncomingMessageFilter": false,
"EnableLedgerService": false,
Expand Down
27 changes: 27 additions & 0 deletions libgoal/libgoal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1287,3 +1287,30 @@ func (c *Client) LightBlockHeaderProof(round uint64) (resp model.LightBlockHeade
}
return
}

// SetSyncRound sets the sync round on a node w/ EnableFollowMode
func (c *Client) SetSyncRound(round uint64) (err error) {
algod, err := c.ensureAlgodClient()
if err == nil {
return algod.SetSyncRound(round)
}
return
}

// GetSyncRound gets the sync round on a node w/ EnableFollowMode
func (c *Client) GetSyncRound() (rep model.GetSyncRoundResponse, err error) {
algod, err := c.ensureAlgodClient()
if err == nil {
return algod.GetSyncRound()
}
return
}

// GetLedgerStateDelta gets the LedgerStateDelta on a node w/ EnableFollowMode
func (c *Client) GetLedgerStateDelta(round uint64) (rep model.LedgerStateDeltaResponse, err error) {
algod, err := c.ensureAlgodClient()
if err == nil {
return algod.GetLedgerStateDelta(round)
}
return
}
21 changes: 21 additions & 0 deletions node/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,24 @@ func (e *CatchpointUnableToStartError) Error() string {
e.catchpointRequested,
e.catchpointRunning)
}

// CatchpointSyncRoundFailure indicates that the requested catchpoint is beyond the currently set sync round
type CatchpointSyncRoundFailure struct {
catchpoint string
syncRound uint64
}

// MakeCatchpointSyncRoundFailure creates the error type
func MakeCatchpointSyncRoundFailure(catchpoint string, syncRound uint64) *CatchpointSyncRoundFailure {
return &CatchpointSyncRoundFailure{
catchpoint: catchpoint,
syncRound: syncRound,
}
}

// Error satisfies the builtin `error` interface
func (e *CatchpointSyncRoundFailure) Error() string {
return fmt.Sprintf(
"unable to start catchpoint catchup for '%s' - resulting round is beyond current sync round '%v'",
e.catchpoint, e.syncRound)
}
Loading