Skip to content

Commit

Permalink
stream: iterator helpers synchronous errors
Browse files Browse the repository at this point in the history
`streams/operators/map` is no longer a generator function, instead it returns a called generator
so that validation can happen synchronously and not wait for the first iteration

Fixes: nodejs#41648
  • Loading branch information
iMoses committed Jan 22, 2022
1 parent d01c645 commit 68713a4
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 91 deletions.
185 changes: 94 additions & 91 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const {
const kEmpty = Symbol('kEmpty');
const kEof = Symbol('kEof');

async function * map(fn, options) {
function map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], fn);
Expand All @@ -41,118 +41,121 @@ async function * map(fn, options) {

validateInteger(concurrency, 'concurrency', 1);

const ac = new AbortController();
const stream = this;
const queue = [];
const signal = ac.signal;
const signalOpt = { signal };

const abort = () => ac.abort();
if (options?.signal?.aborted) {
abort();
}

options?.signal?.addEventListener('abort', abort);
return async function* map() {
const ac = new AbortController();
const queue = [];
const signal = ac.signal;
const signalOpt = { signal };

let next;
let resume;
let done = false;

function onDone() {
done = true;
}

async function pump() {
try {
for await (let val of stream) {
if (done) {
return;
}
const abort = () => ac.abort();
if (options?.signal?.aborted) {
abort();
}

if (signal.aborted) {
throw new AbortError();
}
options?.signal?.addEventListener('abort', abort);

try {
val = fn(val, signalOpt);
} catch (err) {
val = PromiseReject(err);
}
let next;
let resume;
let done = false;

if (val === kEmpty) {
continue;
}
function onDone() {
done = true;
}

if (typeof val?.catch === 'function') {
val.catch(onDone);
async function pump() {
try {
for await (let val of stream) {
if (done) {
return;
}

if (signal.aborted) {
throw new AbortError();
}

try {
val = fn(val, signalOpt);
} catch (err) {
val = PromiseReject(err);
}

if (val === kEmpty) {
continue;
}

if (typeof val?.catch === 'function') {
val.catch(onDone);
}

queue.push(val);
if (next) {
next();
next = null;
}

if (!done && queue.length && queue.length >= concurrency) {
await new Promise((resolve) => {
resume = resolve;
});
}
}

queue.push(kEof);
} catch (err) {
const val = PromiseReject(err);
PromisePrototypeCatch(val, onDone);
queue.push(val);
} finally {
done = true;
if (next) {
next();
next = null;
}

if (!done && queue.length && queue.length >= concurrency) {
await new Promise((resolve) => {
resume = resolve;
});
}
options?.signal?.removeEventListener('abort', abort);
}
queue.push(kEof);
} catch (err) {
const val = PromiseReject(err);
PromisePrototypeCatch(val, onDone);
queue.push(val);
} finally {
done = true;
if (next) {
next();
next = null;
}
options?.signal?.removeEventListener('abort', abort);
}
}

pump();

try {
while (true) {
while (queue.length > 0) {
const val = await queue[0];

if (val === kEof) {
return;
}

if (signal.aborted) {
throw new AbortError();
}
pump();

if (val !== kEmpty) {
yield val;
try {
while (true) {
while (queue.length > 0) {
const val = await queue[0];

if (val === kEof) {
return;
}

if (signal.aborted) {
throw new AbortError();
}

if (val !== kEmpty) {
yield val;
}

queue.shift();
if (resume) {
resume();
resume = null;
}
}

queue.shift();
if (resume) {
resume();
resume = null;
}
await new Promise((resolve) => {
next = resolve;
});
}
} finally {
ac.abort();

await new Promise((resolve) => {
next = resolve;
});
}
} finally {
ac.abort();

done = true;
if (resume) {
resume();
resume = null;
done = true;
if (resume) {
resume();
resume = null;
}
}
}
}();
}

async function some(fn, options) {
Expand Down
4 changes: 4 additions & 0 deletions test/parallel/test-stream-map.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ const { setTimeout } = require('timers/promises');

{
// Error cases
assert.rejects(async () => {
// Validation errors thrown synchronously
Readable.from([1]).map(1);
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const unused of Readable.from([1]).map(1));
Expand Down

0 comments on commit 68713a4

Please sign in to comment.