Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch from fullNode catchup to catchpoint catchup service. #1076

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 28 additions & 27 deletions processor/blockprocessor/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,20 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"time"

log "github.com/sirupsen/logrus"

"github.com/algorand/go-algorand-sdk/client/v2/algod"
algodConfig "github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/node"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/rpcs"
log "github.com/sirupsen/logrus"

"github.com/algorand/indexer/fetcher"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/processor"
"github.com/algorand/indexer/processor/blockprocessor/internal"
)

// InitializeLedgerSimple executes the migration core functionality.
Expand Down Expand Up @@ -67,6 +64,9 @@ func InitializeLedgerSimple(ctx context.Context, logger *log.Logger, round uint6
return nil
}

/*
// TODO: When we are happy with the state of ledger initialization, remove this code.
// If we add "stop at round" to the node, it may be useful to change back to this.
func fullNodeCatchup(ctx context.Context, logger *log.Logger, round basics.Round, catchpoint, dataDir string, genesis bookkeeping.Genesis) error {
ctx, cf := context.WithCancel(ctx)
defer cf()
Expand Down Expand Up @@ -111,29 +111,10 @@ func fullNodeCatchup(ctx context.Context, logger *log.Logger, round basics.Round
logger.Infof("Catchpoint Catchup Total Blocks %d ", status.CatchpointCatchupTotalBlocks)
logger.Infof("Catchpoint Catchup Acquired Blocks %d ", status.CatchpointCatchupAcquiredBlocks)
}

}
logger.Infof("fast catchup completed in %v", status.CatchupTime.Seconds())
return nil
}

// InitializeLedgerFastCatchup executes the migration core functionality.
func InitializeLedgerFastCatchup(ctx context.Context, logger *log.Logger, catchpoint, dataDir string, genesis bookkeeping.Genesis) error {
if dataDir == "" {
return fmt.Errorf("InitializeLedgerFastCatchup() err: indexer data directory missing")
}
// catchpoint round
round, _, err := ledgercore.ParseCatchpointLabel(catchpoint)
if err != nil {
return fmt.Errorf("InitializeLedgerFastCatchup() err: %w", err)
}
logger.Infof("fast catchup completed in %v, moving files to data directory", status.CatchupTime.Seconds())

// TODO: switch to catchup service catchup.
//err = internal.CatchupServiceCatchup(logger, round, catchpoint, dataDir, genesis)
err = fullNodeCatchup(ctx, logger, round, catchpoint, dataDir, genesis)
if err != nil {
return fmt.Errorf("fullNodeCatchup() err: %w", err)
}
// remove node directory after fast catchup completes
defer os.RemoveAll(filepath.Join(dataDir, genesis.ID()))
// move ledger to indexer directory
Expand All @@ -148,11 +129,31 @@ func InitializeLedgerFastCatchup(ctx context.Context, logger *log.Logger, catchp
for _, f := range ledgerFiles {
err = os.Rename(filepath.Join(dataDir, genesis.ID(), f), filepath.Join(dataDir, f))
if err != nil {
return fmt.Errorf("InitializeLedgerFastCatchup() err: %w", err)
return fmt.Errorf("fullNodeCatchup() err: %w", err)
}
}
return nil
}
*/

// InitializeLedgerFastCatchup initializes the ledger under dataDir from the
// given catchpoint label by delegating to internal.CatchupServiceCatchup.
//
// An error is returned when dataDir is empty, when the catchpoint label cannot
// be parsed, or when the catchup service fails. Parse and catchup failures are
// wrapped, so the underlying cause stays reachable through errors.Is/As.
func InitializeLedgerFastCatchup(ctx context.Context, logger *log.Logger, catchpoint, dataDir string, genesis bookkeeping.Genesis) error {
	// Shared wrapping format for errors bubbling up from the steps below.
	const wrapFormat = "InitializeLedgerFastCatchup() err: %w"

	if len(dataDir) == 0 {
		return fmt.Errorf("InitializeLedgerFastCatchup() err: indexer data directory missing")
	}

	// The round is encoded in the catchpoint label; the second return value
	// of the parser is not needed here.
	catchpointRound, _, parseErr := ledgercore.ParseCatchpointLabel(catchpoint)
	if parseErr != nil {
		return fmt.Errorf(wrapFormat, parseErr)
	}

	// The catchup service replaces the earlier fullNodeCatchup approach.
	if catchupErr := internal.CatchupServiceCatchup(ctx, logger, catchpointRound, catchpoint, dataDir, genesis); catchupErr != nil {
		return fmt.Errorf(wrapFormat, catchupErr)
	}
	return nil
}

// blockHandler creates a handler complying to the fetcher block handler interface. In case of a failure it keeps
// attempting to add the block until the fetcher shuts down.
Expand Down
50 changes: 50 additions & 0 deletions processor/blockprocessor/initialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,23 @@ import (
"fmt"
"os"
"testing"
"time"

"github.com/jarcoal/httpmock"
"github.com/sirupsen/logrus"
test2 "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand-sdk/client/v2/algod"
"github.com/algorand/go-algorand-sdk/encoding/json"
"github.com/algorand/go-algorand-sdk/encoding/msgpack"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/rpcs"

"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/processor/blockprocessor/internal"
"github.com/algorand/indexer/util"
"github.com/algorand/indexer/util/test"
)
Expand Down Expand Up @@ -83,3 +87,49 @@ func TestRunMigration(t *testing.T) {
assert.Equal(t, uint64(6), uint64(l.Latest()))
l.Close()
}

// TestInitializeLedgerFastCatchup_Errors exercises the failure paths of
// InitializeLedgerFastCatchup: a missing data directory, an unparsable
// catchpoint label, and cancellation of the context both before the catchup
// starts and shortly after it has started.
func TestInitializeLedgerFastCatchup_Errors(t *testing.T) {
	log, _ := test2.NewNullLogger()
	err := InitializeLedgerFastCatchup(context.Background(), log, "asdf", "", bookkeeping.Genesis{})
	require.EqualError(t, err, "InitializeLedgerFastCatchup() err: indexer data directory missing")

	err = InitializeLedgerFastCatchup(context.Background(), log, "asdf", t.TempDir(), bookkeeping.Genesis{})
	require.EqualError(t, err, "InitializeLedgerFastCatchup() err: catchpoint parsing failed")

	// tryToRun starts a catchup with a bogus (but well-formed) catchpoint and
	// expects it to stop with a cancellation error once ctx is cancelled.
	tryToRun := func(ctx context.Context) {
		t.Helper()
		var addr basics.Address
		genesis := bookkeeping.Genesis{
			SchemaID:    "1",
			Network:     "test",
			Proto:       "future",
			Allocation:  nil,
			RewardsPool: addr.String(),
			FeeSink:     addr.String(),
			Timestamp:   0,
			Comment:     "",
			DevMode:     false,
		}
		// Use a local error so the closure does not write to the outer err.
		runErr := InitializeLedgerFastCatchup(
			ctx,
			logrus.New(),
			"21890000#BOGUSTCNVEDIBNRPNCKWRBQLJ7ILXIJBYKAHF67TLUOYRUGHW7ZA",
			t.TempDir(),
			genesis)
		require.EqualError(t, runErr, "InitializeLedgerFastCatchup() err: context canceled")
	}

	// Run with an immediate cancel
	ctx, cf := context.WithCancel(context.Background())
	cf() // cancel immediately
	tryToRun(ctx)

	// internal.Delay is package-level state; shorten it for this test only and
	// restore it afterwards so other tests see the original value.
	originalDelay := internal.Delay
	defer func() { internal.Delay = originalDelay }()
	internal.Delay = 1 * time.Millisecond

	// This should hit a couple extra branches
	ctx, cf = context.WithCancel(context.Background())
	defer cf() // release the context even if an assertion aborts the test
	// cancel after a short delay; the timer is stopped on exit so nothing
	// outlives the test.
	cancelTimer := time.AfterFunc(1*time.Second, cf)
	defer cancelTimer.Stop()
	tryToRun(ctx)
}
28 changes: 20 additions & 8 deletions processor/blockprocessor/internal/catchupservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/algorand/indexer/util"
)

// Delay is how long CatchupServiceCatchup waits before starting the catchup
// service and between successive checks of the service statistics. It is a
// variable rather than a constant so that tests can shorten it.
var Delay = 5 * time.Second

// makeNodeProvider initializes the node provider.
func makeNodeProvider(ctx context.Context) nodeProvider {
return nodeProvider{
Expand Down Expand Up @@ -50,29 +52,32 @@ func (n nodeProvider) SetCatchpointCatchupMode(enabled bool) (newContextCh <-cha
}

// CatchupServiceCatchup initializes a ledger using the catchup service.
func CatchupServiceCatchup(logger *log.Logger, round basics.Round, catchpoint, dataDir string, genesis bookkeeping.Genesis) error {
func CatchupServiceCatchup(ctx context.Context, logger *log.Logger, round basics.Round, catchpoint, dataDir string, genesis bookkeeping.Genesis) error {
logger.Infof("Starting catchup service with catchpoint: %s", catchpoint)
wrappedLogger := logging.NewWrappedLogger(logger)

start := time.Now()
ctx := context.Background()
cfg := config.AutogenLocal

node := makeNodeProvider(ctx)
l, err := util.MakeLedger(logger, false, &genesis, dataDir)
if err != nil {
return fmt.Errorf("CatchupServiceCatchup() MakeLedger err: %w", err)
}
defer func() {
l.WaitForCommit(l.Latest())
l.Close()
}()

p2pNode, err := network.NewWebsocketNetwork(wrappedLogger, cfg, nil, genesis.ID(), genesis.Network, node)
if err != nil {
return fmt.Errorf("CatchupServiceCatchup() NewWebsocketNetwork err: %w", err)
}
// TODO: Do we need to implement the peer prioritization interface?
//p2pNode.SetPrioScheme(node)
p2pNode.Start()
defer p2pNode.Stop()

// TODO: if the ledger already has a catchpoint, use MakeResumedCatchpointCatchupService
// TODO: Can use MakeResumedCatchpointCatchupService if ledger exists.
// Without this ledger is re-initialized instead of resumed on restart.
service, err := catchup.MakeNewCatchpointCatchupService(
catchpoint,
node,
Expand All @@ -85,12 +90,20 @@ func CatchupServiceCatchup(logger *log.Logger, round basics.Round, catchpoint, d
return fmt.Errorf("CatchupServiceCatchup() MakeNewCatchpointCatchupService err: %w", err)
}

time.Sleep(5 * time.Second)
select {
case <-time.After(Delay):
case <-ctx.Done():
return ctx.Err()
}
service.Start(ctx)

running := true
for running {
time.Sleep(5 * time.Second)
select {
case <-time.After(Delay):
case <-ctx.Done():
return ctx.Err()
}
stats := service.GetStatistics()
running = stats.TotalBlocks == 0 || stats.TotalBlocks != stats.VerifiedBlocks

Expand All @@ -109,6 +122,5 @@ func CatchupServiceCatchup(logger *log.Logger, round basics.Round, catchpoint, d
}

logger.Infof("Catchup finished in %s", time.Since(start))
l.WaitForCommit(l.Latest())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replaced it with l.Close(). I'm not sure if either is needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still need it to make sure all blocks are committed before closing the ledger?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly... I have no idea. I'll go ahead and add it back, it didn't seem to cause any harm.

return nil
}