Skip to content

Commit

Permalink
Merge pull request #18 from jovijovi/dev-ci
Browse files Browse the repository at this point in the history
Release v0.11.2
  • Loading branch information
jovijovi authored Oct 12, 2022
2 parents 49b25ba + 110025b commit fad475c
Show file tree
Hide file tree
Showing 15 changed files with 262 additions and 124 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
# Changelog

## [v0.11.2](https://github.com/jovijovi/ether-goblin/releases/tag/v0.11.2)

### Features

- (module/event/fetcher/db): add `BulkSave`

### Performance

- (module/event/fetcher): improve dump events performance by `BulkSave`

### Refactor

- (module/event/fetcher): refactor callback

### Build

- Bump packages

## [v0.11.0](https://github.com/jovijovi/ether-goblin/releases/tag/v0.11.0)

### BREAKING CHANGES
Expand Down
6 changes: 3 additions & 3 deletions conf/app.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,12 @@ custom:
# Keep running fetcher
keepRunning: false

# Force update database if the data already exists
forceUpdate: true

# Supported Database: postgres, mysql or sqlite
db: postgres

# Chunk size for saving data to the database
chunkSize: 200

# NFT contract owners (TODO)
contractOwners:
- 'CONTRACT_OWNER_ADDRESS_1'
Expand Down
4 changes: 2 additions & 2 deletions devenv/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ networks:

services:
postgres:
container_name: devenv_postgres_pedrojs
container_name: devenv_postgres_goblin
image: postgres:13.8
restart: always
environment:
Expand All @@ -19,7 +19,7 @@ services:
- devenv-network-ether-goblin

mysql:
container_name: devenv_mysql_pedrojs
container_name: devenv_mysql_goblin
image: mysql:8.0.26
command: --default-authentication-plugin=mysql_native_password
restart: always
Expand Down
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ether-goblin",
"version": "0.11.0",
"version": "0.11.2",
"description": "A microservice for the Ethereum ecosystem.",
"author": "jovijovi <mageyul@hotmail.com>",
"license": "MIT",
Expand Down Expand Up @@ -38,10 +38,10 @@
"@jovijovi/express-2fa-token": "^1.1.0",
"@jovijovi/pedrojs-common": "1.1.22",
"@jovijovi/pedrojs-loader": "^1.1.23",
"@jovijovi/pedrojs-mysql": "1.1.22",
"@jovijovi/pedrojs-mysql": "1.1.23",
"@jovijovi/pedrojs-network-http": "1.1.23",
"@jovijovi/pedrojs-pg": "1.1.22",
"@jovijovi/pedrojs-sqlite": "1.1.22",
"@jovijovi/pedrojs-pg": "1.1.23",
"@jovijovi/pedrojs-sqlite": "1.1.23",
"@openzeppelin/contracts": "4.7.3",
"@types/progress": "^2.0.5",
"ethers": "5.7.1",
Expand Down
2 changes: 1 addition & 1 deletion src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ export namespace customConfig {
pushJobIntervals?: number
executeJobConcurrency?: number
keepRunning?: boolean
forceUpdate?: boolean
db: string
chunkSize: number
contractOwners: string[]
}

Expand Down
1 change: 1 addition & 0 deletions src/module/event/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export type EventTransfer = {
blockNumber: number // Block number
blockHash: string // Block hash
blockTimestamp?: number // Block timestamp
blockDatetime?: string // Block datetime
transactionHash: string // Tx hash
from: string // From
to: string // To
Expand Down
62 changes: 62 additions & 0 deletions src/module/event/fetcher/callback.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import got from 'got';
import fastq, {queueAsPromised} from 'fastq';
import {auditor, log, util} from '@jovijovi/pedrojs-common';
import {customConfig} from '../../../config';
import {EventTransfer, Response} from '../common/types';
import {DefaultLoopInterval} from './params';

// Callback queue (ASC, FIFO)
const callbackQueue = new util.Queue<EventTransfer>();

// Callback job
const callbackJob: queueAsPromised<util.Queue<EventTransfer>> = fastq.promise(callback, 1);

// Schedule processing job
export async function Run() {
	let queueWasEmpty = true;

	setInterval(() => {
		auditor.Check(callbackQueue, "Callback queue is nil");

		// Nothing queued: log the transition to empty exactly once, then idle
		if (callbackQueue.Length() === 0) {
			if (!queueWasEmpty) {
				log.RequestId().info("All callback finished, queue is empty");
				queueWasEmpty = true;
			}
			return;
		}

		// Hand the shared queue to the single-concurrency callback worker
		queueWasEmpty = false;
		callbackJob.push(callbackQueue).catch((err) => log.RequestId().error(err));
	}, DefaultLoopInterval);
}

// Event callback
async function callback(queue: util.Queue<EventTransfer>): Promise<void> {
	try {
		const conf = customConfig.GetEvents().fetcher;

		// No callback URL configured: nothing to deliver
		if (!conf.callback) {
			return;
		}

		const total = queue.Length();
		if (total === 0) {
			return;
		}

		// Drain the snapshot of the queue in FIFO order, POSTing each event
		// sequentially so delivery order matches event order
		for (let sent = 0; sent < total; sent++) {
			const evt = queue.Shift();

			log.RequestId().debug("Fetcher calling back(%s)... event:", conf.callback, evt);
			const rsp: Response = await got.post(conf.callback, {
				json: evt
			}).json();
			log.RequestId().trace("Fetcher callback response=", rsp);
		}
	} catch (e) {
		// Best-effort delivery: log and give up on this batch
		log.RequestId().error("Fetcher callback failed, error=", e);
	}

	return;
}
21 changes: 16 additions & 5 deletions src/module/event/fetcher/db/interface.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {ModelCtor} from 'sequelize';
import {log, util} from '@jovijovi/pedrojs-common';
import {log} from '@jovijovi/pedrojs-common';
import {EventTransfer} from '../../common/types';
import {IMintEvents} from './model';
import {IQuery} from './types';
Expand All @@ -22,7 +22,7 @@ export class Database implements IDatabase {
block_number: evt.blockNumber.toString(), // Block number
block_hash: evt.blockHash, // Block hash
block_timestamp: evt.blockTimestamp, // Block timestamp
block_datetime: util.time.GetUnixTimestamp(evt.blockTimestamp, 'UTC'), // Block datetime
block_datetime: evt.blockDatetime, // Block datetime
transaction_hash: evt.transactionHash, // Tx hash
from: evt.from, // From
to: evt.to, // To
Expand All @@ -37,6 +37,19 @@ export class Database implements IDatabase {
return;
}

// Save records in bulk, ignore duplicates
// Inserts all given rows with a single sequelize bulkCreate call;
// rows that collide with existing keys are skipped (ignoreDuplicates).
// Errors are logged and swallowed — callers get no failure signal.
// NOTE(review): `records` is untyped (any[]); presumably rows matching
// the ModelEvent schema — confirm against callers.
async BulkSave(records: any[]): Promise<any> {
	try {
		await this.ModelEvent.bulkCreate(records,
			{
				ignoreDuplicates: true,
			}
		);
	} catch (e) {
		log.RequestId().error('BulkSave failed, error=', e.message);
	}
}

// Check if exists
async IsExists(query: IQuery): Promise<boolean> {
try {
Expand All @@ -53,7 +66,7 @@ export class Database implements IDatabase {
}
}

// Query token history (all event type)
// Query token history (all event types) order by 'block_number' ASC
async QueryTokenHistory(address: string, tokenId: string): Promise<any> {
try {
return await this.ModelEvent.findAll(
Expand All @@ -71,5 +84,3 @@ export class Database implements IDatabase {
}
}
}


92 changes: 92 additions & 0 deletions src/module/event/fetcher/dump.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import fastq, {queueAsPromised} from 'fastq';
import {auditor, log, util} from '@jovijovi/pedrojs-common';
import {EventTransfer} from '../common/types';
import {customConfig} from '../../../config';
import {DefaultChunkSize, DefaultLoopInterval} from './params';
import {DB} from './db';
import {NewJobID} from '../utils';

// Event queue (ASC, FIFO)
const eventQueue = new util.Queue<EventTransfer>();

// Dump job
const dumpJob: queueAsPromised<util.Queue<EventTransfer>> = fastq.promise(dump, 1);

// Schedule dump job
export async function Run() {
	let queueWasEmpty = true;

	setInterval(() => {
		auditor.Check(eventQueue, "Event queue is nil");

		// Nothing queued: log the transition to empty exactly once, then idle
		if (eventQueue.Length() === 0) {
			if (!queueWasEmpty) {
				log.RequestId().info("All events dumped, queue is empty");
				queueWasEmpty = true;
			}
			return;
		}

		// Hand the shared queue to the single-concurrency dump worker
		queueWasEmpty = false;
		dumpJob.push(eventQueue).catch((err) => log.RequestId().error(err));
	}, DefaultLoopInterval);
}

// Enqueue a transfer event for the dump job to persist later
export function Push(evt: EventTransfer) {
	eventQueue.Push(evt);
}

// Dump events
async function dump(queue: util.Queue<EventTransfer>): Promise<void> {
try {
const len = queue.Length();
if (len === 0) {
return;
}

const conf = customConfig.GetEvents().fetcher;
const defaultChunkSize = conf.chunkSize ? conf.chunkSize : DefaultChunkSize;
const jobId = NewJobID();

let leftEvents = len;
do {
const chunkSize = leftEvents < defaultChunkSize ? leftEvents : defaultChunkSize;

const events = [];
for (let i = 0; i < chunkSize; i++) {
const evt = queue.Shift();

events.push({
address: evt.address, // NFT Contract address
block_number: evt.blockNumber.toString(), // Block number
block_hash: evt.blockHash, // Block hash
block_timestamp: evt.blockTimestamp, // Block timestamp
block_datetime: evt.blockDatetime, // Block datetime
transaction_hash: evt.transactionHash, // Tx hash
from: evt.from, // From
to: evt.to, // To
token_id: evt.tokenId.toString(), // NFT Token ID
event_type: evt.eventType // Event type
});
}

// Save events in bulk
await DB.Client().BulkSave(events);

// Calc left events
leftEvents -= chunkSize;

log.RequestId().debug("EXEC JOB(Dump|id:%s), %d events dumped, progress=%d%(%d/%d), lastBlockInChunk=%s",
jobId,
events.length,
((len - leftEvents) * 100 / len).toFixed(1),
len - leftEvents,
len,
events[chunkSize - 1].block_number,
);
} while (leftEvents > 0);
} catch (e) {
log.RequestId().error("Dump failed, error=", e);
return;
}

return;
}
Loading

0 comments on commit fad475c

Please sign in to comment.