Skip to content

Commit

Permalink
packager: BatchProcessor: use Promise for processBatch()
Browse files Browse the repository at this point in the history
Reviewed By: cpojer

Differential Revision: D4572495

fbshipit-source-id: 4a18b6ae16ea588104c337f2085707be07609005
  • Loading branch information
Jean Lauliac authored and facebook-github-bot committed Feb 20, 2017
1 parent bac576c commit 564126f
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 75 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
"event-target-shim": "^1.0.5",
"fbjs": "^0.8.9",
"fbjs-scripts": "^0.7.0",
"form-data": "^2.1.1",
"fs-extra": "^0.26.2",
"glob": "^5.0.15",
"graceful-fs": "^4.1.3",
Expand Down
56 changes: 28 additions & 28 deletions packager/src/lib/BatchProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@

const invariant = require('fbjs/lib/invariant');

type ProcessBatch<TItem, TResult> = (
batch: Array<TItem>,
callback: (error?: Error, orderedResults?: Array<TResult>) => mixed,
) => mixed;
type ProcessBatch<TItem, TResult> = (batch: Array<TItem>) => Promise<Array<TResult>>;

type BatchProcessorOptions = {
maximumDelayMs: number,
Expand Down Expand Up @@ -45,10 +42,7 @@ class BatchProcessor<TItem, TResult> {
_queue: Array<QueueItem<TItem, TResult>>;
_timeoutHandle: ?number;

constructor(
options: BatchProcessorOptions,
processBatch: ProcessBatch<TItem, TResult>,
) {
constructor(options: BatchProcessorOptions, processBatch: ProcessBatch<TItem, TResult>) {
this._options = options;
this._processBatch = processBatch;
this._queue = [];
Expand All @@ -57,30 +51,36 @@ class BatchProcessor<TItem, TResult> {
(this: any)._processQueue = this._processQueue.bind(this);
}

_onBatchFinished() {
this._currentProcessCount--;
this._processQueueOnceReady();
}

_onBatchResults(jobs: Array<QueueItem<TItem, TResult>>, results: Array<TResult>) {
invariant(results.length === jobs.length, 'Not enough results returned.');
for (let i = 0; i < jobs.length; ++i) {
jobs[i].resolve(results[i]);
}
this._onBatchFinished();
}

_onBatchError(jobs: Array<QueueItem<TItem, TResult>>, error: mixed) {
for (let i = 0; i < jobs.length; ++i) {
jobs[i].reject(error);
}
this._onBatchFinished();
}

_processQueue() {
this._timeoutHandle = null;
while (
this._queue.length > 0 &&
this._currentProcessCount < this._options.concurrency
) {
const {concurrency} = this._options;
while (this._queue.length > 0 && this._currentProcessCount < concurrency) {
this._currentProcessCount++;
const jobs = this._queue.splice(0, this._options.maximumItems);
const items = jobs.map(job => job.item);
this._processBatch(items, (error, results) => {
if (error != null) {
for (let i = 0; i < jobs.length; ++i) {
jobs[i].reject(error);
}
} else {
invariant(results != null, 'Neither results or error were returned.');
invariant(results.length === items.length, 'Not enough results returned.');
for (let i = 0; i < jobs.length; ++i) {
jobs[i].resolve(results[i]);
}
}
this._currentProcessCount--;
this._processQueueOnceReady();
});
this._processBatch(jobs.map(job => job.item)).then(
this._onBatchResults.bind(this, jobs),
this._onBatchError.bind(this, jobs),
);
}
}

Expand Down
49 changes: 18 additions & 31 deletions packager/src/lib/GlobalTransformCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,8 @@ import type {Options as TransformOptions} from '../JSTransformer/worker/worker';
import type {CachedResult, GetTransformCacheKey} from './TransformCache';
import type {Reporter} from './reporting';

type FetchResultURIs = (
keys: Array<string>,
callback: (error?: Error, results?: Map<string, string>) => void,
) => mixed;

type StoreResults = (
resultsByKey: Map<string, CachedResult>,
callback: (error?: Error) => void,
) => mixed;
type FetchResultURIs = (keys: Array<string>) => Promise<Map<string, string>>;
type StoreResults = (resultsByKey: Map<string, CachedResult>) => Promise<void>;

type FetchProps = {
filePath: string,
Expand Down Expand Up @@ -60,17 +53,15 @@ class KeyURIFetcher {
* and we proceed as if there were no result for these keys instead. That way
* a build will not fail just because of the cache.
*/
_processKeys(
keys: Array<string>,
callback: (error?: Error, keyURIs: Array<?URI>) => mixed,
) {
this._fetchResultURIs(keys, (error, URIsByKey) => {
if (error != null) {
this._processError(error);
}
const URIs = keys.map(key => URIsByKey && URIsByKey.get(key));
callback(undefined, URIs);
});
async _processKeys(keys: Array<string>): Promise<Array<?URI>> {
let URIsByKey;
try {
URIsByKey = await this._fetchResultURIs(keys);
} catch (error) {
this._processError(error);
return new Array(keys.length);
}
return keys.map(key => URIsByKey.get(key));
}

fetch(key: string, callback: FetchURICallback) {
Expand All @@ -92,21 +83,17 @@ class KeyURIFetcher {

}

type KeyedResult = {key: string, result: CachedResult};

class KeyResultStore {

_storeResults: StoreResults;
_batchProcessor: BatchProcessor<{key: string, result: CachedResult}, void>;
_batchProcessor: BatchProcessor<KeyedResult, void>;

_processResults(
keyResults: Array<{key: string, result: CachedResult}>,
callback: (error?: Error) => mixed,
) {
const resultsByKey = new Map(
keyResults.map(pair => [pair.key, pair.result]),
);
this._storeResults(resultsByKey, error => {
callback(error);
});
async _processResults(keyResults: Array<KeyedResult>): Promise<Array<void>> {
const resultsByKey = new Map(keyResults.map(pair => [pair.key, pair.result]));
await this._storeResults(resultsByKey);
return new Array(keyResults.length);
}

store(key: string, result: CachedResult) {
Expand Down
30 changes: 14 additions & 16 deletions packager/src/lib/__tests__/BatchProcessor-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

'use strict';

jest.dontMock('../BatchProcessor');
jest
.useRealTimers()
.dontMock('../BatchProcessor');

const BatchProcessor = require('../BatchProcessor');

Expand All @@ -21,29 +23,27 @@ describe('BatchProcessor', () => {
concurrency: 2,
};

it('aggregate items concurrently', () => {
it('aggregate items concurrently', async () => {
const input = [...Array(9).keys()].slice(1);
const transform = e => e * 10;
const batches = [];
let concurrency = 0;
let maxConcurrency = 0;
const bp = new BatchProcessor(options, (items, callback) => {
const bp = new BatchProcessor(options, (items) => new Promise(resolve => {
++concurrency;
expect(concurrency).toBeLessThanOrEqual(options.concurrency);
maxConcurrency = Math.max(maxConcurrency, concurrency);
batches.push(items);
setTimeout(() => {
callback(null, items.map(transform));
resolve(items.map(transform));
--concurrency;
}, 0);
});
}));
const results = [];
input.forEach(e => bp.queue(e).then(
await Promise.all(input.map(e => bp.queue(e).then(
res => results.push(res),
error => process.nextTick(() => { throw error; }),
));
jest.runAllTimers();
jest.runAllTicks();
)));
expect(batches).toEqual([
[1, 2, 3],
[4, 5, 6],
Expand All @@ -53,17 +53,15 @@ describe('BatchProcessor', () => {
expect(results).toEqual(input.map(transform));
});

it('report errors', () => {
it('report errors', async () => {
const error = new Error('oh noes');
const bp = new BatchProcessor(options, (items, callback) => {
setTimeout(callback.bind(null, error), 0);
});
const bp = new BatchProcessor(options, (items) => new Promise((_, reject) => {
setTimeout(reject.bind(null, error), 0);
}));
let receivedError;
bp.queue('foo').catch(
await bp.queue('foo').catch(
err => { receivedError = err; },
);
jest.runAllTimers();
jest.runAllTicks();
expect(receivedError).toBe(error);
});

Expand Down

0 comments on commit 564126f

Please sign in to comment.