From 564126f2bf23f87bc80d9d02c7a9ed227dd1bb4e Mon Sep 17 00:00:00 2001
From: Jean Lauliac
Date: Mon, 20 Feb 2017 08:36:01 -0800
Subject: [PATCH] packager: BatchProcessor: use Promise for processBatch()

Reviewed By: cpojer

Differential Revision: D4572495

fbshipit-source-id: 4a18b6ae16ea588104c337f2085707be07609005
---
 package.json                                 |  1 +
 packager/src/lib/BatchProcessor.js           | 56 +++++++++----------
 packager/src/lib/GlobalTransformCache.js     | 49 ++++++----------
 .../src/lib/__tests__/BatchProcessor-test.js | 30 +++++-----
 4 files changed, 61 insertions(+), 75 deletions(-)

diff --git a/package.json b/package.json
index b224cdbdff50a8..a2825abd13d47d 100644
--- a/package.json
+++ b/package.json
@@ -151,6 +151,7 @@
     "event-target-shim": "^1.0.5",
     "fbjs": "^0.8.9",
     "fbjs-scripts": "^0.7.0",
+    "form-data": "^2.1.1",
     "fs-extra": "^0.26.2",
     "glob": "^5.0.15",
     "graceful-fs": "^4.1.3",
diff --git a/packager/src/lib/BatchProcessor.js b/packager/src/lib/BatchProcessor.js
index 586fd397026fd9..c26b36d518aea5 100644
--- a/packager/src/lib/BatchProcessor.js
+++ b/packager/src/lib/BatchProcessor.js
@@ -13,10 +13,7 @@

 const invariant = require('fbjs/lib/invariant');

-type ProcessBatch<TItem, TResult> = (
-  batch: Array<TItem>,
-  callback: (error?: Error, orderedResults?: Array<TResult>) => mixed,
-) => mixed;
+type ProcessBatch<TItem, TResult> = (batch: Array<TItem>) => Promise<Array<TResult>>;

 type BatchProcessorOptions = {
   maximumDelayMs: number,
@@ -45,10 +42,7 @@ class BatchProcessor<TItem, TResult> {
   _queue: Array<QueueItem<TItem, TResult>>;
   _timeoutHandle: ?number;

-  constructor(
-    options: BatchProcessorOptions,
-    processBatch: ProcessBatch<TItem, TResult>,
-  ) {
+  constructor(options: BatchProcessorOptions, processBatch: ProcessBatch<TItem, TResult>) {
     this._options = options;
     this._processBatch = processBatch;
     this._queue = [];
@@ -57,30 +51,36 @@ class BatchProcessor<TItem, TResult> {
     (this: any)._processQueue = this._processQueue.bind(this);
   }

+  _onBatchFinished() {
+    this._currentProcessCount--;
+    this._processQueueOnceReady();
+  }
+
+  _onBatchResults(jobs: Array<QueueItem<TItem, TResult>>, results: Array<TResult>) {
+    invariant(results.length === jobs.length, 'Not enough results returned.');
+    for (let i = 0; i < jobs.length; ++i) {
+      jobs[i].resolve(results[i]);
+    }
+    this._onBatchFinished();
+  }
+
+  _onBatchError(jobs: Array<QueueItem<TItem, TResult>>, error: mixed) {
+    for (let i = 0; i < jobs.length; ++i) {
+      jobs[i].reject(error);
+    }
+    this._onBatchFinished();
+  }
+
   _processQueue() {
     this._timeoutHandle = null;
-    while (
-      this._queue.length > 0 &&
-      this._currentProcessCount < this._options.concurrency
-    ) {
+    const {concurrency} = this._options;
+    while (this._queue.length > 0 && this._currentProcessCount < concurrency) {
       this._currentProcessCount++;
       const jobs = this._queue.splice(0, this._options.maximumItems);
-      const items = jobs.map(job => job.item);
-      this._processBatch(items, (error, results) => {
-        if (error != null) {
-          for (let i = 0; i < jobs.length; ++i) {
-            jobs[i].reject(error);
-          }
-        } else {
-          invariant(results != null, 'Neither results or error were returned.');
-          invariant(results.length === items.length, 'Not enough results returned.');
-          for (let i = 0; i < jobs.length; ++i) {
-            jobs[i].resolve(results[i]);
-          }
-        }
-        this._currentProcessCount--;
-        this._processQueueOnceReady();
-      });
+      this._processBatch(jobs.map(job => job.item)).then(
+        this._onBatchResults.bind(this, jobs),
+        this._onBatchError.bind(this, jobs),
+      );
     }
   }

diff --git a/packager/src/lib/GlobalTransformCache.js b/packager/src/lib/GlobalTransformCache.js
index 2aa831bcb0a08e..ce132b21aed888 100644
--- a/packager/src/lib/GlobalTransformCache.js
+++ b/packager/src/lib/GlobalTransformCache.js
@@ -23,15 +23,8 @@
 import type {Options as TransformOptions} from '../JSTransformer/worker/worker';
 import type {CachedResult, GetTransformCacheKey} from './TransformCache';
 import type {Reporter} from './reporting';

-type FetchResultURIs = (
-  keys: Array<string>,
-  callback: (error?: Error, results?: Map<string, string>) => void,
-) => mixed;
-
-type StoreResults = (
-  resultsByKey: Map<string, CachedResult>,
-  callback: (error?: Error) => void,
-) => mixed;
+type FetchResultURIs = (keys: Array<string>) => Promise<Map<string, string>>;
+type StoreResults = (resultsByKey: Map<string, CachedResult>) => Promise<void>;

 type FetchProps = {
   filePath: string,
@@ -60,17 +53,15 @@ class KeyURIFetcher {
    * and we proceed as if there were no result for these keys instead. That way
    * a build will not fail just because of the cache.
    */
-  _processKeys(
-    keys: Array<string>,
-    callback: (error?: Error, keyURIs: Array<?string>) => mixed,
-  ) {
-    this._fetchResultURIs(keys, (error, URIsByKey) => {
-      if (error != null) {
-        this._processError(error);
-      }
-      const URIs = keys.map(key => URIsByKey && URIsByKey.get(key));
-      callback(undefined, URIs);
-    });
+  async _processKeys(keys: Array<string>): Promise<Array<?string>> {
+    let URIsByKey;
+    try {
+      URIsByKey = await this._fetchResultURIs(keys);
+    } catch (error) {
+      this._processError(error);
+      return new Array(keys.length);
+    }
+    return keys.map(key => URIsByKey.get(key));
   }

   fetch(key: string, callback: FetchURICallback) {
@@ -92,21 +83,17 @@

 }

+type KeyedResult = {key: string, result: CachedResult};
+
 class KeyResultStore {

   _storeResults: StoreResults;
-  _batchProcessor: BatchProcessor<{key: string, result: CachedResult}, void>;
+  _batchProcessor: BatchProcessor<KeyedResult, void>;

-  _processResults(
-    keyResults: Array<{key: string, result: CachedResult}>,
-    callback: (error?: Error) => mixed,
-  ) {
-    const resultsByKey = new Map(
-      keyResults.map(pair => [pair.key, pair.result]),
-    );
-    this._storeResults(resultsByKey, error => {
-      callback(error);
-    });
+  async _processResults(keyResults: Array<KeyedResult>): Promise<Array<void>> {
+    const resultsByKey = new Map(keyResults.map(pair => [pair.key, pair.result]));
+    await this._storeResults(resultsByKey);
+    return new Array(keyResults.length);
   }

   store(key: string, result: CachedResult) {
diff --git a/packager/src/lib/__tests__/BatchProcessor-test.js b/packager/src/lib/__tests__/BatchProcessor-test.js
index facebd143936ed..9e75f3841899c3 100644
--- a/packager/src/lib/__tests__/BatchProcessor-test.js
+++ b/packager/src/lib/__tests__/BatchProcessor-test.js
@@ -9,7 +9,9 @@

 'use strict';

-jest.dontMock('../BatchProcessor');
+jest
+  .useRealTimers()
+  .dontMock('../BatchProcessor');

 const BatchProcessor = require('../BatchProcessor');

@@ -21,29 +23,27 @@ describe('BatchProcessor', () => {
     concurrency: 2,
   };

-  it('aggregate items concurrently', () => {
+  it('aggregate items concurrently', async () => {
     const input = [...Array(9).keys()].slice(1);
     const transform = e => e * 10;
     const batches = [];
     let concurrency = 0;
     let maxConcurrency = 0;
-    const bp = new BatchProcessor(options, (items, callback) => {
+    const bp = new BatchProcessor(options, (items) => new Promise(resolve => {
       ++concurrency;
       expect(concurrency).toBeLessThanOrEqual(options.concurrency);
       maxConcurrency = Math.max(maxConcurrency, concurrency);
       batches.push(items);
       setTimeout(() => {
-        callback(null, items.map(transform));
+        resolve(items.map(transform));
         --concurrency;
       }, 0);
-    });
+    }));
     const results = [];
-    input.forEach(e => bp.queue(e).then(
+    await Promise.all(input.map(e => bp.queue(e).then(
       res => results.push(res),
       error => process.nextTick(() => { throw error; }),
-    ));
-    jest.runAllTimers();
-    jest.runAllTicks();
+    )));
     expect(batches).toEqual([
       [1, 2, 3],
       [4, 5, 6],
@@ -53,17 +53,15 @@ describe('BatchProcessor', () => {
     expect(results).toEqual(input.map(transform));
   });

-  it('report errors', () => {
+  it('report errors', async () => {
     const error = new Error('oh noes');
-    const bp = new BatchProcessor(options, (items, callback) => {
-      setTimeout(callback.bind(null, error), 0);
-    });
+    const bp = new BatchProcessor(options, (items) => new Promise((_, reject) => {
+      setTimeout(reject.bind(null, error), 0);
+    }));
     let receivedError;
-    bp.queue('foo').catch(
+    await bp.queue('foo').catch(
       err => { receivedError = err; },
     );
-    jest.runAllTimers();
-    jest.runAllTicks();
     expect(receivedError).toBe(error);
   });
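
For readers skimming the diff, here is a minimal usage sketch of the new contract (illustrative only, not part of the patch): processBatch now returns a Promise of the ordered results instead of taking a node-style callback, and BatchProcessor.queue() resolves each queued item with its matching result. The uploadBatch helper and the option values below are made up for the example; only BatchProcessor, its options fields, and queue() come from the code above, and the require path mirrors the test file.

    const BatchProcessor = require('../BatchProcessor');

    // Hypothetical async batch operation: returns one result per item, in order.
    async function uploadBatch(items) {
      return items.map(item => item.toUpperCase());
    }

    const bp = new BatchProcessor(
      {maximumDelayMs: 300, maximumItems: 3, concurrency: 2},
      // New-style processBatch: no callback, just return a Promise of results.
      items => uploadBatch(items),
    );

    bp.queue('foo').then(
      result => console.log(result), // logs 'FOO' once the batch has been processed
      error => console.error(error),
    );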