Skip to content

Commit

Permalink
Merge pull request #90 from irohalab/master
Browse files Browse the repository at this point in the history
0.5.0
  • Loading branch information
EverettSummer authored Oct 3, 2020
2 parents f6d7638 + 855ecb3 commit 455ead2
Show file tree
Hide file tree
Showing 16 changed files with 338 additions and 29 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ node_modules
.vscode
coverage
.env
env.sh
env-test.sh
3 changes: 3 additions & 0 deletions docker-compose.prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ services:
DB_NAME: "dmhy_indexer"
SERVER_PORT: "4000"
SENTRY_DSN: "${SENTRY_DSN}"
APPINSIGHTS_INSTRUMENTATIONKEY: "${APPINSIGHTS_INSTRUMENTATIONKEY}"

bangumi_moe:
<< : *web-common
Expand All @@ -36,6 +37,7 @@ services:
DB_NAME: "bangumi_moe_indexer"
SERVER_PORT: "4200"
SENTRY_DSN: "${SENTRY_DSN}"
APPINSIGHTS_INSTRUMENTATIONKEY: "${APPINSIGHTS_INSTRUMENTATIONKEY}"

nyaa:
<< : *web-common
Expand All @@ -51,3 +53,4 @@ services:
DB_NAME: "nyaa_indexer"
SERVER_PORT: "4300"
SENTRY_DSN: "${SENTRY_DSN}"
APPINSIGHTS_INSTRUMENTATIONKEY: "${APPINSIGHTS_INSTRUMENTATIONKEY}"
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ services:
MAX_PAGE_NO: ${MAX_PAGE_NO-5}
MAX_SEARCH_COUNT: ${MAX_SEARCH_COUNT-100}
SENTRY_DSN: ${SENTRY_DSN}
APPINSIGHTS_INSTRUMENTATIONKEY: "${APPINSIGHTS_INSTRUMENTATIONKEY}"
volumes:
- ".:/irohalab/indexer"
- "/irohalab/indexer/node_modules"
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "indexer",
"version": "0.4.2",
"version": "0.5.0",
"scripts": {
"clean:dist": "rimraf dist/*",
"build": "npm run clean:dist && tsc && cp package.json dist",
Expand All @@ -15,6 +15,7 @@
},
"dependencies": {
"@sentry/node": "5.10.2",
"applicationinsights": "^1.8.7",
"axios": "^0.18.1",
"cheerio": "^1.0.0-rc.3",
"express": "^4.17.1",
Expand Down
25 changes: 19 additions & 6 deletions src/scraper/nyaa.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import { MediaFile } from '../entity/media-file';
import { Publisher } from '../entity/publisher';
import { TaskOrchestra } from '../task/task-orchestra';
import { ConfigLoader, ItemStorage, TYPES } from '../types';
import { AzureLogger } from '../utils/azure-logger';
import { captureException } from '../utils/sentry';
import { BaseScraper } from './abstract/base-scraper';
import cheerio = require('cheerio');

const logger = AzureLogger.getInstance();

@injectable()
export class NyaaScraper extends BaseScraper<number> {
private static _host = 'https://nyaa.si';
Expand All @@ -45,8 +48,9 @@ export class NyaaScraper extends BaseScraper<number> {
if (pageNo) {
listPageUrl += '/?p=' + pageNo;
}
const resp = await Axios.get(listPageUrl);
console.log(`Scrapping ${listPageUrl}`);
const resp = await Axios.get(listPageUrl);

const $ = cheerio.load(resp.data);
const trList = Array.from($('table > tbody > tr'));
let items: Array<Item<number>> = [];
Expand All @@ -71,7 +75,10 @@ export class NyaaScraper extends BaseScraper<number> {
return { hasNext: newIds.length === items.length && newIds.length > 0, items: newItems };

} catch (e) {
captureException(e);
if (e.code !== 'ETIMEDOUT') {
captureException(e);
}
logger.log('exception', e.stack, AzureLogger.ERROR, {line: '81'});
console.error(e.stack);
return null;
}
Expand All @@ -80,9 +87,11 @@ export class NyaaScraper extends BaseScraper<number> {
public async executeSubTask(item: Item<number>): Promise<number> {
let statusCode = -1;
try {
const resp = await Axios.get(`${NyaaScraper._host}${item.uri}`);
const subTaskUrl = `${NyaaScraper._host}${item.uri}`;
console.log(`Scrapping ${subTaskUrl}`);
const resp = await Axios.get(subTaskUrl);
statusCode = resp.status;
console.log(`Scrapping ${NyaaScraper._host}${item.uri}`);

const $ = cheerio.load(resp.data);
const panels = $('.container > .panel');
item.title = panels.eq(0).find('.panel-title').text().trim();
Expand Down Expand Up @@ -129,8 +138,12 @@ export class NyaaScraper extends BaseScraper<number> {
} else {
statusCode = -1;
}
captureException(e);
console.error(e.stack);
logger.log('exception', e.stack, AzureLogger.ERROR, {line: '141'});
if (statusCode !== 404 && e.code !== 'ETIMEDOUT') {
captureException(e);
}

console.error(JSON.stringify(e));
}
return statusCode;
}
Expand Down
61 changes: 61 additions & 0 deletions src/service/database-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,24 @@
import { inject, injectable } from 'inversify';
import { Db, MongoClient } from 'mongodb';
import { ConfigLoader, TYPES } from '../types';
import { AzureLogger } from '../utils/azure-logger';
import { captureException } from '../utils/sentry';

@injectable()
export class DatabaseService {

private _db: Db;
private _client: MongoClient;
private _collectionNames: string[] = [];

private _logger: AzureLogger;

public get db(): Db {
return this._db;
}

constructor(@inject(TYPES.ConfigLoader) private _config: ConfigLoader) {
this._logger = AzureLogger.getInstance();
}

public async onEnd(): Promise<void> {
Expand All @@ -41,6 +47,61 @@ export class DatabaseService {
}:${this._config.dbPort}?authSource=${this._config.authSource}`;
this._client = await MongoClient.connect(url, { useNewUrlParser: true, useUnifiedTopology: true });
this._db = this._client.db(this._config.dbName);
await this._doCheckCollection();
return Promise.resolve();
}

public checkCollection(collectionNames: string[]): void {
this._collectionNames = this._collectionNames.concat(collectionNames);
if (this.db) {
this._doCheckCollection()
.then(() => {
console.log('collection checked');
});
}
}

public async transaction(transactionAction: (client: MongoClient) => Promise<void>): Promise<void> {
let session = null;
try {
session = this._client.startSession();
await session.withTransaction(async () => {
await transactionAction(this._client);
});
} catch (e) {
console.error(e);
this._logger.log('transaction_error', e.Message, AzureLogger.ERROR, {
stack: e.stack
});
captureException(e);
} finally {
if (session != null) {
session.endSession();
}
}
}

private async _doCheckCollection(): Promise<void> {
let collectionNames = this._collectionNames;
let existCollections: string[];
let cur = null;
try {
cur = this.db.listCollections({}, {nameOnly: true});
existCollections = await cur.toArray() as string[];
if (existCollections) {
for (let collectionName of collectionNames) {
if (existCollections.indexOf(collectionName) === -1) {
await this.db.createCollection(collectionName);
}
}
}
} catch (e) {
console.error(e);
captureException(e);
} finally {
if (cur != null && !cur.isClosed()) {
cur.close();
}
}
}
}
1 change: 1 addition & 0 deletions src/storage/mongodb-item-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export class MongodbItemStore<T> implements ItemStorage<T> {

constructor(@inject(TYPES.ConfigLoader) private _config: ConfigLoader,
private _databaseService: DatabaseService) {
this._databaseService.checkCollection([this._collectionName]);
}

public deleteItem(id: T): Promise<boolean> {
Expand Down
51 changes: 44 additions & 7 deletions src/storage/mongodb-task-store.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ import { Expect, Setup, SetupFixture, Teardown, Test, TestFixture } from 'alsati
import { fail } from 'assert';
import { Container } from 'inversify';
import { MongoClient } from 'mongodb';
import { Item } from '../entity/Item';
import { DatabaseService } from '../service/database-service';
import { CommonTask, TaskType } from '../task/task-types';
import { SubTask } from '../task/sub-task';
import { TaskType } from '../task/task-types';
import { FakeConfigManager } from '../test/fake-config';
import { ConfigLoader, TaskStorage, TYPES } from '../types';
import { MongodbTaskStore } from './mongodb-task-store';

// noinspection DuplicatedCode
@TestFixture('MongodbStore test spec')
export class MongodbItemStoreSpec {
private _store: MongodbTaskStore;
Expand Down Expand Up @@ -66,17 +69,34 @@ export class MongodbItemStoreSpec {
public async taskQueueOperation(): Promise<void> {
const tasks = [];
for (let i = 0; i < 10; i++) {
tasks.push(new CommonTask(TaskType.SUB));
let item = new Item<number>();
item.id = i;
item.uri = `/item/${i}`;
tasks.push(new SubTask(TaskType.SUB, item));
}

/* make a duplicate task */
let dupItem = new Item();
dupItem.id = 0;
dupItem.uri = `/item/0`;
tasks.push(new SubTask(TaskType.SUB, dupItem));
/* --- */

for (let task of tasks) {
await this._store.offerTask(task);
}

let idx = 0;
let idx = 1;
let dupCount = 0;
while (await this._store.hasTask()) {
let task = await this._store.pollTask();
Expect(task.id).toBe(tasks[idx].id);
if ((task as SubTask<number>).item.uri === '/item/0') {
Expect(task.id).toBe(tasks[tasks.length - 1].id);
Expect(dupCount).toBe(0);
dupCount++;
} else {
Expect(task.id).toBe(tasks[idx].id);
}
let sleepTime = Math.round(Math.random() * 20);
await this.sleep(sleepTime);
idx++;
Expand All @@ -91,17 +111,34 @@ export class MongodbItemStoreSpec {
public async failedTaskQueueOperation(): Promise<void> {
const tasks = [];
for (let i = 0; i < 10; i++) {
tasks.push(new CommonTask(TaskType.SUB));
let item = new Item<number>();
item.id = i;
item.uri = `/item/${i}`;
tasks.push(new SubTask(TaskType.SUB, item));
}

/* make a duplicate task */
let dupItem = new Item();
dupItem.id = 0;
dupItem.uri = `/item/0`;
tasks.push(new SubTask(TaskType.SUB, dupItem));
/* --- */

for (let task of tasks) {
await this._store.offerFailedTask(task);
}

let idx = 0;
let idx = 1;
let dupCount = 0;
while (await this._store.hasFailedTask()) {
let task = await this._store.pollFailedTask();
Expect(task.id).toBe(tasks[idx].id);
if ((task as SubTask<number>).item.uri === '/item/0') {
Expect(task.id).toBe(tasks[tasks.length - 1].id);
Expect(dupCount).toBe(0);
dupCount++;
} else {
Expect(task.id).toBe(tasks[idx].id);
}
Expect(task.retryCount).toBeGreaterThan(0);
let sleepTime = Math.round(Math.random() * 20);
await this.sleep(sleepTime);
Expand Down
43 changes: 36 additions & 7 deletions src/storage/mongodb-task-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
* limitations under the License.
*/

import { injectable } from 'inversify';
import { inject, injectable } from 'inversify';
import { Db } from 'mongodb';
import { DatabaseService } from '../service/database-service';
import { Task } from '../task/task-types';
import { TaskStorage } from '../types';
import { MainTask } from '../task/main-task';
import { SubTask } from '../task/sub-task';
import { Task, TaskType } from '../task/task-types';
import { ConfigLoader, TaskStorage, TYPES } from '../types';
import { AzureLogger } from '../utils/azure-logger';

@injectable()
export class MongodbTaskStore implements TaskStorage {
Expand All @@ -29,40 +32,66 @@ export class MongodbTaskStore implements TaskStorage {

private _taskCollectionName = 'task';
private _failedTaskCollectionName = 'failed_task';
private _logger: AzureLogger;

constructor(private _databaseService: DatabaseService) {
constructor(private _databaseService: DatabaseService,
@inject(TYPES.ConfigLoader) private _config: ConfigLoader) {
this._logger = AzureLogger.getInstance();
this._databaseService.checkCollection([this._taskCollectionName, this._failedTaskCollectionName]);
}

public async pollFailedTask(): Promise<Task> {
let task = await this.poll(this._failedTaskCollectionName);
task.retryCount = task.retryCount ? task.retryCount++ : 1;
this._logger.log('pollFailedTask', `task#${task.id} has been polled from queue`,
AzureLogger.INFO, {task: JSON.stringify(task)});
return task;
}

public pollTask(): Promise<Task> {
return this.poll(this._taskCollectionName);
public async pollTask(): Promise<Task> {
const task = await this.poll(this._taskCollectionName);
console.log(`poll task#${task.id}`);
return task;
}

public offerFailedTask(task: Task): Promise<boolean> {
this._logger.log('offerFailedTask', `task#${task.id} has been offered into queue`,
AzureLogger.INFO, {task: JSON.stringify(task)});
return this.push(this._failedTaskCollectionName, task);
}

public offerTask(task: Task): Promise<boolean> {
console.log(`push task#${task.id}, type ${task.type.toString()}`);
return this.push(this._taskCollectionName, task);
}

public async hasTask(): Promise<boolean> {
let count = await this.db.collection(this._taskCollectionName).estimatedDocumentCount();
console.log('hasTask ', count);
return count > 0;
}

public async hasFailedTask(): Promise<boolean> {
let count = await this.db.collection(this._failedTaskCollectionName).estimatedDocumentCount();
console.log('hasFailedTask', count);
return count > 0;
}

private async push(collection: string, task: Task): Promise<boolean> {
await this.db.collection(collection).insertOne(Object.assign({}, task, {updateTime: Date.now()}));
await this._databaseService.transaction(async (client) => {
const taskCollection = client.db(this._config.dbName).collection(collection);
let filterObj: any = {type: task.type};
if (task.type === TaskType.MAIN) {
if (task instanceof MainTask) {
filterObj.pageNo = task.pageNo;
}
} else {
filterObj['item.uri'] = (task as SubTask<any>).item.uri;
}
await taskCollection.deleteMany(filterObj);
await taskCollection.insertOne(Object.assign({}, task, {updateTime: Date.now()}));
});
console.log(`push task#${task.id}`, `task type ${task.type}`);
return Promise.resolve(true);
}

Expand Down
Loading

0 comments on commit 455ead2

Please sign in to comment.