Skip to content

Commit

Permalink
fix: continue emitting items after non-raw item encountered on raw st…
Browse files Browse the repository at this point in the history
…ream
  • Loading branch information
andogq committed Jul 5, 2024
1 parent 570d3ec commit 784adb4
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .changeset/heavy-insects-fly.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": patch
---

fix: continue emitting stream items after encountering non-raw item on raw stream
2 changes: 1 addition & 1 deletion src/stream/consumption.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ export class StreamConsumption<T, E> extends StreamBase {
const message = `Stream indicated it would emit raw values but emitted a '${typeof atom.value}' object`;
console.error(message);
s.emit("error", new Error(message));
break;
continue;
}

// Show a warning if any atom value is null
Expand Down
25 changes: 25 additions & 0 deletions test/consumption.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,18 @@ describe.concurrent("stream consumption", () => {
const streamPromise = promisifyStream(stream);
expect(streamPromise).rejects.toBeTruthy();
});

test("continue emitting items after non-raw item in raw stream", async ({ expect }) => {
expect.assertions(2);

const stream = $.from([1, "valid"]).toReadable("raw");

const { data, errors } = await emptyStream(stream);

expect(data).toHaveLength(1);
expect(errors).toHaveLength(1);
});

});
});

Expand All @@ -137,3 +149,16 @@ function promisifyStream(stream: Readable): Promise<unknown[]> {
stream.on("end", () => resolve(data));
});
}

async function emptyStream(stream: Readable): Promise<{ data: unknown[]; errors: unknown[] }> {
const errors: unknown[] = [];
const data: unknown[] = [];

await new Promise<void>((resolve) => {
stream.on("data", (d) => data.push(d));
stream.on("error", (e: string) => errors.push(e));
stream.on("end", () => resolve());
});

return { data, errors };
}

0 comments on commit 784adb4

Please sign in to comment.