Skip to content

Commit

Permalink
Merge pull request #79 from irohalab/master
Browse files Browse the repository at this point in the history
Persistent task management (#78)
  • Loading branch information
lordfriend authored Jul 4, 2020
2 parents 1c72c03 + 7900f46 commit 05049d6
Show file tree
Hide file tree
Showing 20 changed files with 537 additions and 157 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "indexer",
"version": "0.3.4",
"version": "0.4.0",
"scripts": {
"clean:dist": "rimraf dist/*",
"build": "npm run clean:dist && tsc && cp package.json dist",
Expand Down
29 changes: 16 additions & 13 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,56 +21,59 @@ import { BangumiMoe } from './scraper/bangumi-moe';
import { DmhyScraper } from './scraper/dmhy';
import { NyaaScraper } from './scraper/nyaa';
import { RESTServer } from './server';
import { MongodbStore } from './storage/mongodb-store';
import { DatabaseService } from './service/database-service';
import { MongodbItemStore } from './storage/mongodb-item-store';
import { MongodbTaskStore } from './storage/mongodb-task-store';
import { TaskOrchestra } from './task/task-orchestra';
import { TaskTiming } from './task/task-timing';
import { ConfigLoader, PersistentStorage, Scraper, TYPES } from './types';
import './service/items-query';
import { ConfigLoader, ItemStorage, Scraper, TaskStorage, TYPES } from './types';
import './rest-api/items-query';
import { captureMessage } from './utils/sentry';

/* Initialize container */
const container = new Container();
container.bind<ConfigLoader>(TYPES.ConfigLoader).to(ConfigManager).inSingletonScope();
const config = container.get<ConfigLoader>(TYPES.ConfigLoader);
config.load();

/* bind TaskStorage */
container.bind<DatabaseService>(DatabaseService).toSelf().inSingletonScope();
container.bind<TaskStorage>(TYPES.TaskStorage).to(MongodbTaskStore).inSingletonScope();

/* bind TaskOrchestra */
container.bind<interfaces.Factory<number>>(TYPES.TaskTimingFactory).toFactory<number>(TaskTiming);
container.bind<TaskOrchestra>(TaskOrchestra).toSelf().inTransientScope();

let store: PersistentStorage<number|string>;

switch (config.mode) {
case ConfigManager.DMHY:
container.bind<PersistentStorage<number>>(TYPES.PersistentStorage).to(MongodbStore).inSingletonScope();
container.bind<ItemStorage<number>>(TYPES.ItemStorage).to(MongodbItemStore).inSingletonScope();
container.bind<Scraper>(TYPES.Scraper).to(DmhyScraper).inSingletonScope();
store = container.get<PersistentStorage<number>>(TYPES.PersistentStorage);
break;
case ConfigManager.BANGUMI_MOE:
container.bind<PersistentStorage<string>>(TYPES.PersistentStorage).to(MongodbStore).inSingletonScope();
container.bind<ItemStorage<string>>(TYPES.ItemStorage).to(MongodbItemStore).inSingletonScope();
container.bind<Scraper>(TYPES.Scraper).to(BangumiMoe).inSingletonScope();
store = container.get<PersistentStorage<string>>(TYPES.PersistentStorage);
break;
case ConfigManager.NYAA:
container.bind<PersistentStorage<number>>(TYPES.PersistentStorage).to(MongodbStore).inSingletonScope();
container.bind<ItemStorage<number>>(TYPES.ItemStorage).to(MongodbItemStore).inSingletonScope();
container.bind<Scraper>(TYPES.Scraper).to(NyaaScraper).inSingletonScope();
store = container.get<PersistentStorage<number>>(TYPES.PersistentStorage);
break;
default:
throw new Error('Mode is not supported yet');
}

const scraper = container.get<Scraper>(TYPES.Scraper);
const databaseService = container.get<DatabaseService>(DatabaseService);

// catches Ctrl+C event
process.on('SIGINT', async () => {
console.log('stopping scrapper and store...');
await scraper.end();
await store.onEnd();
await databaseService.onEnd();
process.exit();
});

(async () => {
await store.onStart();
await databaseService.onStart();
await scraper.start();
})();

Expand Down
4 changes: 2 additions & 2 deletions src/service/items-query.ts → src/rest-api/items-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import { inject } from 'inversify';
import { controller, httpGet, queryParam, BaseHttpController } from 'inversify-express-utils';
import { interfaces } from 'inversify-express-utils/dts/interfaces';
import { PersistentStorage, TYPES } from '../types';
import { ItemStorage, TYPES } from '../types';

@controller('/item')
export class ItemsQuery<T> extends BaseHttpController {

constructor(@inject(TYPES.PersistentStorage) private _storage: PersistentStorage<T>) {
constructor(@inject(TYPES.ItemStorage) private _storage: ItemStorage<T>) {
super();
}

Expand Down
64 changes: 13 additions & 51 deletions src/scraper/abstract/base-scraper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,23 @@ import { Item } from '../../entity/Item';
import { MainTask } from '../../task/main-task';
import { SubTask } from '../../task/sub-task';
import { TaskOrchestra } from '../../task/task-orchestra';
import { TaskStatus } from '../../task/task-status';
import { Task, TaskType } from '../../task/task-types';
import { ConfigLoader, PersistentStorage, Scraper } from '../../types';
import { captureMessage } from '../../utils/sentry';

const MAX_TASK_RETRIED_TIMES = 10;
import { ConfigLoader, ItemStorage, Scraper } from '../../types';

@injectable()
export abstract class BaseScraper<T> implements Scraper {
protected className: string;
protected _taskRetriedTimes: Map<number, number>;

protected constructor(protected _taskOrchestra: TaskOrchestra,
protected _config: ConfigLoader,
protected _store: PersistentStorage<T>) {
protected _store: ItemStorage<T>) {
this.className = this.constructor.name;
}

public abstract executeMainTask(pageNo?: number): Promise<{items: Array<Item<T>>, hasNext: boolean}>;
public abstract executeSubTask(item: Item<T>): Promise<number>;
public async executeTask(task: Task): Promise<any> {
public async executeTask(task: Task): Promise<TaskStatus> {
if (task.type === TaskType.MAIN) {
let result;
if (task instanceof MainTask) {
Expand All @@ -47,13 +44,10 @@ export abstract class BaseScraper<T> implements Scraper {
result = await this.executeMainTask();
}
if (!result) {
this.retryTask(task);
return;
} else if (this._taskRetriedTimes.has(task.id)) {
this._taskRetriedTimes.delete(task.id);
return TaskStatus.NeedRetry;
}
for (let item of result.items) {
this._taskOrchestra.queue(new SubTask<T>(TaskType.SUB, item));
await this._taskOrchestra.queue(new SubTask<T>(TaskType.SUB, item));
}
if (result.hasNext && (task as MainTask).pageNo < this._config.maxPageNo) {
let newTask = new MainTask(TaskType.MAIN);
Expand All @@ -63,60 +57,28 @@ export abstract class BaseScraper<T> implements Scraper {
} else {
newTask.pageNo = 2;
}
this._taskOrchestra.queue(newTask);
await this._taskOrchestra.queue(newTask);
}
return TaskStatus.Success;
} else {
let item = (task as SubTask<T>).item;
let httpStatusCode = await this.executeSubTask(item);
if (httpStatusCode === 200) {
if (this._taskRetriedTimes.has(task.id)) {
this._taskRetriedTimes.delete(task.id);
}
return await this._store.putItem(item);
await this._store.putItem(item);
return TaskStatus.Success;
} else if (httpStatusCode !== 404) {
this.retryTask(task);
return;
}
if (this._taskRetriedTimes.has(task.id)) {
this._taskRetriedTimes.delete(task.id);
return TaskStatus.NeedRetry;
}
return TaskStatus.Fail;
}
}

public async start(): Promise<any> {
this._taskOrchestra.queue(new MainTask(TaskType.MAIN));
await this._taskOrchestra.queue(new MainTask(TaskType.MAIN));
this._taskOrchestra.start(this);
}

public async end(): Promise<any> {
this._taskOrchestra.stop();
this._taskRetriedTimes.clear();
}

private checkMaxRetriedTime(task: Task): boolean {
if (this._taskRetriedTimes.has(task.id)) {
if (this._taskRetriedTimes.get(task.id) > MAX_TASK_RETRIED_TIMES) {
return true;
}
this._taskRetriedTimes.set(task.id, this._taskRetriedTimes.get(task.id) + 1);
} else {
this._taskRetriedTimes.set(task.id, 1);
}
return false;
}

private retryTask(task: Task): void {
// retry this task
if (this.checkMaxRetriedTime(task)) {
if (task instanceof MainTask && task.type === TaskType.MAIN) {
captureMessage(`${this.className}, maximum retries reached, Main Task (${task.pageNo})`);
} else if (task instanceof SubTask && task.type === TaskType.SUB) {
captureMessage(`${this.className}, maximum retries reached, Sub Task (${JSON.stringify(task.item)})`);
} else {
captureMessage(`${this.className}, maximum retries reached, Common Task`);
}
return;
}
this._taskOrchestra.queue(task);
}
}
5 changes: 2 additions & 3 deletions src/scraper/bangumi-moe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,18 @@ import { MediaFile } from '../entity/media-file';
import { Publisher } from '../entity/publisher';
import { Team } from '../entity/Team';
import { TaskOrchestra } from '../task/task-orchestra';
import { ConfigLoader, PersistentStorage, TYPES } from '../types';
import { ConfigLoader, ItemStorage, TYPES } from '../types';
import { captureException } from '../utils/sentry';
import { BaseScraper } from './abstract/base-scraper';

@injectable()
export class BangumiMoe extends BaseScraper<string> {
private static _host = 'https://bangumi.moe';

constructor(@inject(TYPES.PersistentStorage) store: PersistentStorage<string>,
constructor(@inject(TYPES.ItemStorage) store: ItemStorage<string>,
@inject(TYPES.ConfigLoader) config: ConfigLoader,
@inject(TaskOrchestra) taskOrchestra: TaskOrchestra) {
super(taskOrchestra, config, store);
this._taskRetriedTimes = new Map<number, number>();
}

public async executeMainTask(pageNo: number = 1): Promise<{items: Array<Item<string>>, hasNext: boolean}> {
Expand Down
5 changes: 2 additions & 3 deletions src/scraper/dmhy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { MediaFile } from '../entity/media-file';
import { Publisher } from '../entity/publisher';
import { Team } from '../entity/Team';
import { TaskOrchestra } from '../task/task-orchestra';
import { ConfigLoader, PersistentStorage, TYPES } from '../types';
import { ConfigLoader, ItemStorage, TYPES } from '../types';
import { toUTCDate, trimDomain } from '../utils/normalize';
import { captureException } from '../utils/sentry';
import { BaseScraper } from './abstract/base-scraper';
Expand Down Expand Up @@ -73,11 +73,10 @@ export class DmhyScraper extends BaseScraper<number> {
private static _host = 'https://share.dmhy.org';
private _browser: Browser;

constructor(@inject(TYPES.PersistentStorage) store: PersistentStorage<number>,
constructor(@inject(TYPES.ItemStorage) store: ItemStorage<number>,
@inject(TaskOrchestra) taskOrchestra: TaskOrchestra,
@inject(TYPES.ConfigLoader) config: ConfigLoader) {
super(taskOrchestra, config, store);
this._taskRetriedTimes = new Map<number, number>();
}

public async start(): Promise<any> {
Expand Down
5 changes: 2 additions & 3 deletions src/scraper/nyaa.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { ItemType } from '../entity/item-type';
import { MediaFile } from '../entity/media-file';
import { Publisher } from '../entity/publisher';
import { TaskOrchestra } from '../task/task-orchestra';
import { ConfigLoader, PersistentStorage, TYPES } from '../types';
import { ConfigLoader, ItemStorage, TYPES } from '../types';
import { captureException } from '../utils/sentry';
import { BaseScraper } from './abstract/base-scraper';
import cheerio = require('cheerio');
Expand All @@ -32,12 +32,11 @@ export class NyaaScraper extends BaseScraper<number> {
private static _host = 'https://nyaa.si';

constructor(
@inject(TYPES.PersistentStorage) store: PersistentStorage<number>,
@inject(TYPES.ItemStorage) store: ItemStorage<number>,
@inject(TaskOrchestra) taskOrchestra: TaskOrchestra,
@inject(TYPES.ConfigLoader) config: ConfigLoader
) {
super(taskOrchestra, config, store);
this._taskRetriedTimes = new Map<number, number>();
}

public async executeMainTask(pageNo?: number): Promise<{ items: Array<Item<number>>, hasNext: boolean }> {
Expand Down
53 changes: 53 additions & 0 deletions src/service/database-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2020 IROHA LAB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { inject, injectable } from 'inversify';
import { Db, MongoClient } from 'mongodb';
import { ConfigLoader, TYPES } from '../types';

@injectable()
export class DatabaseService {

private _db: Db;
private _client: MongoClient;

public get isStarted(): boolean {
return this._client && this._client.isConnected();
}

public get db(): Db {
return this._db;
}

constructor(@inject(TYPES.ConfigLoader) private _config: ConfigLoader) {
}

public async onEnd(): Promise<void> {
await this._client.close();
}

public async onStart(): Promise<void> {
if (this.isStarted) {
return;
}
const url = `mongodb://${this._config.dbUser}:${this._config.dbPass}@${
this._config.dbHost
}:${this._config.dbPort}?authSource=${this._config.authSource}`;
this._client = await MongoClient.connect(url, { useNewUrlParser: true, useUnifiedTopology: true });
this._db = this._client.db(this._config.dbName);
return Promise.resolve();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,22 @@
* limitations under the License.
*/

import { Expect, Ignore, Setup, SetupFixture, Teardown, TeardownFixture, Test, TestCase, TestFixture } from 'alsatian';
import { Expect, Setup, SetupFixture, Teardown, Test, TestCase, TestFixture } from 'alsatian';
import { fail } from 'assert';
import { Container } from 'inversify';
import { MongoClient } from 'mongodb';
import { DatabaseService } from '../service/database-service';
import { FakeConfigManager } from '../test/fake-config';
import { ConfigLoader, PersistentStorage, TYPES } from '../types';
import { items } from '../test/test-samples';
import { MongoClient } from 'mongodb';
import { MongodbStore } from './mongodb-store';
import { ConfigLoader, ItemStorage, TYPES } from '../types';
import { MongodbItemStore } from './mongodb-item-store';

@TestFixture('MongodbStore test spec')
export class MongodbStoreSpec {
private _store: MongodbStore<number>;
export class MongodbItemStoreSpec {
private _store: MongodbItemStore<number>;
private _config: ConfigLoader;
private _container: Container;
private _databaseService: DatabaseService;

private _collectionName: string = 'items';

Expand All @@ -38,7 +40,8 @@ export class MongodbStoreSpec {
}
this._container = new Container();
this._container.bind<ConfigLoader>(TYPES.ConfigLoader).to(FakeConfigManager).inSingletonScope();
this._container.bind<PersistentStorage<number>>(TYPES.PersistentStorage).to(MongodbStore).inTransientScope();
this._container.bind<DatabaseService>(DatabaseService).toSelf().inSingletonScope();
this._container.bind<ItemStorage<number>>(TYPES.ItemStorage).to(MongodbItemStore).inTransientScope();
this._config = this._container.get<ConfigLoader>(TYPES.ConfigLoader);
this._config.load();
// this._config.dbHost = 'mongo';
Expand All @@ -48,13 +51,14 @@ export class MongodbStoreSpec {
@Setup
public async databaseInit(): Promise<void> {
// Cast to PostgresStore<number>
this._store = this._container.get<PersistentStorage<number>>(TYPES.PersistentStorage) as MongodbStore<number>;
await this._store.onStart();
this._databaseService = await this._container.get<DatabaseService>(DatabaseService);
await this._databaseService.onStart();
this._store = this._container.get<ItemStorage<number>>(TYPES.ItemStorage) as MongodbItemStore<number>;
}

@Teardown
public async databaseCleanUp(): Promise<void> {
await this._store.onEnd();
await this._databaseService.onEnd();
const client = await this._createClient();
await client.db(this._config.dbName).collection(this._collectionName).drop();
}
Expand Down
Loading

0 comments on commit 05049d6

Please sign in to comment.