Enhancement: update indexer table schema #1143

Closed
wants to merge 13 commits
36 changes: 18 additions & 18 deletions idb/postgres/internal/schema/setup_postgres.sql
@@ -3,7 +3,7 @@
-- TODO? replace all 'addr bytea' with 'addr_id bigint' and a mapping table? makes addrs an 8 byte int that fits in a register instead of a 32 byte string

CREATE TABLE IF NOT EXISTS block_header (
-  round bigint PRIMARY KEY,
+  round numeric(20) PRIMARY KEY,
Contributor:
What is the rationale behind choosing 20 as opposed to another number?

Contributor Author:
This is because uint64 max is 18446744073709551615, which is 20 digits.
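
For illustration, a minimal psql sketch of that boundary (not part of this diff; the first cast is expected to fail):

-- bigint is a signed 64-bit integer, so uint64 max does not fit
SELECT 18446744073709551615::bigint;       -- ERROR:  bigint out of range
SELECT 18446744073709551615::numeric(20);  -- ok: all 20 digits fit in numeric(20)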

Contributor:
Got it, thanks!

realtime timestamp without time zone NOT NULL,
rewardslevel bigint NOT NULL,
header jsonb NOT NULL
@@ -14,10 +14,10 @@ CREATE TABLE IF NOT EXISTS block_header (
CREATE INDEX IF NOT EXISTS block_header_time ON block_header (realtime);

CREATE TABLE IF NOT EXISTS txn (
-  round bigint NOT NULL,
+  round numeric(20) NOT NULL,
intra integer NOT NULL,
typeenum smallint NOT NULL,
-  asset bigint NOT NULL, -- 0=Algos, otherwise AssetIndex
+  asset numeric(20) NOT NULL, -- 0=Algos, otherwise AssetIndex
txid bytea, -- base32 of [32]byte hash, or NULL for inner transactions.
txn jsonb NOT NULL, -- json encoding of signed txn with apply data; inner txns exclude nested inner txns
extra jsonb NOT NULL,
@@ -32,7 +32,7 @@ CREATE INDEX IF NOT EXISTS txn_by_tixid ON txn ( txid );

CREATE TABLE IF NOT EXISTS txn_participation (
addr bytea NOT NULL,
-  round bigint NOT NULL,
+  round numeric(20) NOT NULL,
intra integer NOT NULL
);

@@ -46,21 +46,21 @@ CREATE TABLE IF NOT EXISTS account (
rewardsbase bigint NOT NULL,
rewards_total bigint NOT NULL,
deleted bool NOT NULL, -- whether or not it is currently deleted
-  created_at bigint NOT NULL, -- round that the account is first used
-  closed_at bigint, -- round that the account was last closed
+  created_at numeric(20) NOT NULL, -- round that the account is first used
+  closed_at numeric(20), -- round that the account was last closed
keytype varchar(8), -- "sig", "msig", "lsig", or NULL if unknown
account_data jsonb NOT NULL -- trimmed ledgercore.AccountData that excludes the fields above; SQL 'NOT NULL' is held though the json string will be "null" iff account is deleted
);

-- data.basics.AccountData Assets[asset id] AssetHolding{}
CREATE TABLE IF NOT EXISTS account_asset (
addr bytea NOT NULL, -- [32]byte
-  assetid bigint NOT NULL,
+  assetid numeric(20) NOT NULL,
amount numeric(20) NOT NULL, -- need the full 18446744073709551615
frozen boolean NOT NULL,
deleted bool NOT NULL, -- whether or not it is currently deleted
-  created_at bigint NOT NULL, -- round that the asset was added to an account
-  closed_at bigint, -- round that the asset was last removed from the account
+  created_at numeric(20) NOT NULL, -- round that the asset was added to an account
+  closed_at numeric(20), -- round that the asset was last removed from the account
PRIMARY KEY (addr, assetid)
);

@@ -72,12 +72,12 @@ CREATE INDEX IF NOT EXISTS account_asset_by_addr_partial ON account_asset(addr)

-- data.basics.AccountData AssetParams[index] AssetParams{}
CREATE TABLE IF NOT EXISTS asset (
-  index bigint PRIMARY KEY,
+  index numeric(20) PRIMARY KEY,
creator_addr bytea NOT NULL,
params jsonb NOT NULL, -- data.basics.AssetParams; json string "null" iff asset is deleted
deleted bool NOT NULL, -- whether or not it is currently deleted
-  created_at bigint NOT NULL, -- round that the asset was created
-  closed_at bigint -- round that the asset was closed; cannot be recreated because the index is unique
+  created_at numeric(20) NOT NULL, -- round that the asset was created
+  closed_at numeric(20) -- round that the asset was closed; cannot be recreated because the index is unique
);

-- For account lookup
@@ -93,12 +93,12 @@ CREATE TABLE IF NOT EXISTS metastate (
-- per app global state
-- roughly go-algorand/data/basics/userBalance.go AppParams
CREATE TABLE IF NOT EXISTS app (
-  index bigint PRIMARY KEY,
+  index numeric(20) PRIMARY KEY,
creator bytea NOT NULL, -- account address
params jsonb NOT NULL, -- json string "null" iff app is deleted
deleted bool NOT NULL, -- whether or not it is currently deleted
-  created_at bigint NOT NULL, -- round that the app was created
-  closed_at bigint -- round that the app was deleted; cannot be recreated because the index is unique
+  created_at numeric(20) NOT NULL, -- round that the app was created
+  closed_at numeric(20) -- round that the app was deleted; cannot be recreated because the index is unique
);

-- For account lookup
@@ -107,11 +107,11 @@ CREATE INDEX IF NOT EXISTS app_by_creator_deleted ON app(creator, deleted);
-- per-account app local state
CREATE TABLE IF NOT EXISTS account_app (
addr bytea,
-  app bigint,
+  app numeric(20),
localstate jsonb NOT NULL, -- json string "null" iff deleted from the account
deleted bool NOT NULL, -- whether or not it is currently deleted
-  created_at bigint NOT NULL, -- round that the app was added to an account
-  closed_at bigint, -- round that the account_app was last removed from the account
+  created_at numeric(20) NOT NULL, -- round that the app was added to an account
+  closed_at numeric(20), -- round that the account_app was last removed from the account
PRIMARY KEY (addr, app)
);

2 changes: 1 addition & 1 deletion idb/postgres/postgres.go
@@ -175,7 +175,7 @@ func (db *IndexerDb) init(opts idb.IndexerDbOptions) (chan struct{}, error) {
}

// see postgres_migrations.go
-	return db.runAvailableMigrations(opts)
+	return db.runAvailableMigrations()
}

// Preload asset and app creators.
31 changes: 23 additions & 8 deletions idb/postgres/postgres_migrations.go
@@ -50,11 +50,12 @@ func init() {
{upgradeNotSupported, true, "notify the user that upgrade is not supported"},
{dropTxnBytesColumn, true, "drop txnbytes column"},
{convertAccountData, true, "convert account.account_data column"},
{convertBigIntType, true, "convert columns from bigint type to numeric(20) for all tables"},
}
}

// A migration function should take care of writing back to metastate migration row
-type postgresMigrationFunc func(*IndexerDb, *types.MigrationState, *idb.IndexerDbOptions) error
+type postgresMigrationFunc func(*IndexerDb, *types.MigrationState) error

type migrationStruct struct {
migrate postgresMigrationFunc
Expand All @@ -67,9 +68,9 @@ type migrationStruct struct {

var migrations []migrationStruct

-func wrapPostgresHandler(handler postgresMigrationFunc, db *IndexerDb, state *types.MigrationState, opts *idb.IndexerDbOptions) migration.Handler {
+func wrapPostgresHandler(handler postgresMigrationFunc, db *IndexerDb, state *types.MigrationState) migration.Handler {
return func() error {
-		return handler(db, state, opts)
+		return handler(db, state)
}
}

@@ -90,7 +91,7 @@ func needsMigration(state types.MigrationState) bool {

// Returns an error object and a channel that gets closed when blocking migrations
// finish running successfully.
-func (db *IndexerDb) runAvailableMigrations(opts idb.IndexerDbOptions) (chan struct{}, error) {
+func (db *IndexerDb) runAvailableMigrations() (chan struct{}, error) {
state, err := db.getMigrationState(context.Background(), nil)
if err == idb.ErrorNotInitialized {
state = types.MigrationState{}
@@ -103,7 +104,7 @@ func (db *IndexerDb) runAvailableMigrations(opts idb.IndexerDbOptions) (chan struct{}, error) {
tasks := make([]migration.Task, 0)
for nextMigration < len(migrations) {
tasks = append(tasks, migration.Task{
-			Handler: wrapPostgresHandler(migrations[nextMigration].migrate, db, &state, &opts),
+			Handler: wrapPostgresHandler(migrations[nextMigration].migrate, db, &state),
MigrationID: nextMigration,
Description: migrations[nextMigration].description,
DBUnavailable: migrations[nextMigration].blocking,
@@ -214,17 +215,17 @@ func disabled(version string) func(db *IndexerDb, migrationState *types.Migratio
}
}

-func upgradeNotSupported(db *IndexerDb, migrationState *types.MigrationState, opts *idb.IndexerDbOptions) error {
+func upgradeNotSupported(db *IndexerDb, migrationState *types.MigrationState) error {
return errors.New(
"upgrading from this version is not supported; create a new database")
}

-func dropTxnBytesColumn(db *IndexerDb, migrationState *types.MigrationState, opts *idb.IndexerDbOptions) error {
+func dropTxnBytesColumn(db *IndexerDb, migrationState *types.MigrationState) error {
return sqlMigration(
db, migrationState, []string{"ALTER TABLE txn DROP COLUMN txnbytes"})
}

-func convertAccountData(db *IndexerDb, migrationState *types.MigrationState, opts *idb.IndexerDbOptions) error {
+func convertAccountData(db *IndexerDb, migrationState *types.MigrationState) error {
newMigrationState := *migrationState
newMigrationState.NextMigration++

@@ -249,3 +250,17 @@ func convertAccountData(db *IndexerDb, migrationState *types.MigrationState, opt
*migrationState = newMigrationState
return nil
}

func convertBigIntType(db *IndexerDb, migrationState *types.MigrationState) error {
	return sqlMigration(
		db, migrationState, []string{
			"ALTER TABLE block_header ALTER COLUMN round SET DATA TYPE numeric(20)",
			"ALTER TABLE txn ALTER COLUMN round SET DATA TYPE numeric(20), ALTER COLUMN asset SET DATA TYPE numeric(20)",
			"ALTER TABLE txn_participation ALTER COLUMN round SET DATA TYPE numeric(20)",
			"ALTER TABLE account ALTER COLUMN created_at SET DATA TYPE numeric(20), ALTER COLUMN closed_at SET DATA TYPE numeric(20)",
			"ALTER TABLE account_asset ALTER COLUMN assetid SET DATA TYPE numeric(20), ALTER COLUMN created_at SET DATA TYPE numeric(20), ALTER COLUMN closed_at SET DATA TYPE numeric(20)",
			"ALTER TABLE asset ALTER COLUMN index SET DATA TYPE numeric(20), ALTER COLUMN created_at SET DATA TYPE numeric(20), ALTER COLUMN closed_at SET DATA TYPE numeric(20)",
			"ALTER TABLE app ALTER COLUMN index SET DATA TYPE numeric(20), ALTER COLUMN created_at SET DATA TYPE numeric(20), ALTER COLUMN closed_at SET DATA TYPE numeric(20)",
			"ALTER TABLE account_app ALTER COLUMN app SET DATA TYPE numeric(20), ALTER COLUMN created_at SET DATA TYPE numeric(20), ALTER COLUMN closed_at SET DATA TYPE numeric(20)",
		})
}
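
For reference, one way to spot-check that the conversion landed (a psql sketch against a migrated database, not part of this diff):

-- expect data_type 'numeric' with numeric_precision 20 for the converted columns
SELECT table_name, column_name, data_type, numeric_precision
FROM information_schema.columns
WHERE column_name IN ('round', 'asset', 'assetid', 'created_at', 'closed_at')
ORDER BY table_name, column_name;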
35 changes: 34 additions & 1 deletion idb/postgres/postgres_migrations_test.go
@@ -24,11 +24,44 @@ func TestConvertAccountDataIncrementsMigrationNumber(t *testing.T) {
err := db.setMigrationState(nil, &migrationState)
require.NoError(t, err)

-	err = convertAccountData(&db, &migrationState, nil)
+	err = convertAccountData(&db, &migrationState)
require.NoError(t, err)

migrationState, err = db.getMigrationState(context.Background(), nil)
require.NoError(t, err)

assert.Equal(t, types.MigrationState{NextMigration: 6}, migrationState)
}

func TestConvertBigIntType(t *testing.T) {
Contributor:
Not sure if this is necessary, but should we add a test case that asserts the existing values aren't changed by the type conversion?

i.e., insert round 1,000,000, convert the type, and assert that the round is still 1,000,000?

Contributor Author:
Good point. I think this can be covered by running the block validator.
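
For reference, the suggested round-trip can be sketched directly in psql (table and value here are illustrative):

-- insert under the old bigint type, convert, and confirm the value survives
CREATE TEMP TABLE t (round bigint);
INSERT INTO t VALUES (1000000);
ALTER TABLE t ALTER COLUMN round SET DATA TYPE numeric(20);
SELECT round FROM t;  -- expect 1000000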

pdb, _, shutdownFunc := pgtest.SetupPostgresWithSchema(t)
defer shutdownFunc()

db := IndexerDb{db: pdb}
defer db.Close()

	// old table schema: round 18446744073709551613 does not fit in bigint
	query := "INSERT INTO txn_participation(addr,round,intra) VALUES('\\x013d7d16d7ad4fefb61bd95b765c8ceb'::bytea,18446744073709551613,1)"
	_, err := pdb.Exec(context.Background(), query)
	require.Error(t, err)
	assert.Contains(t, err.Error(), "ERROR: bigint out of range")

// run type conversion
migrationState := types.MigrationState{
NextMigration: 6,
}
err = db.setMigrationState(nil, &migrationState)
require.NoError(t, err)

err = convertBigIntType(&db, &migrationState)
require.NoError(t, err)

migrationState, err = db.getMigrationState(context.Background(), nil)
require.NoError(t, err)

assert.Equal(t, types.MigrationState{NextMigration: 7}, migrationState)

	// after the type conversion, the same insert should succeed
	_, err = pdb.Exec(context.Background(), query)
	assert.NoError(t, err)
}