Skip to content

Commit

Permalink
Add encodeAsIterator/encodeAsAsyncIterator for support of streaming e…
Browse files Browse the repository at this point in the history
…ncoding, #57
  • Loading branch information
kriszyp committed Dec 14, 2022
1 parent db15202 commit 3f2e2a2
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 3 deletions.
120 changes: 119 additions & 1 deletion encode.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ let extensions, extensionClasses
const hasNodeBuffer = typeof Buffer !== 'undefined'
const ByteArrayAllocate = hasNodeBuffer ? Buffer.allocUnsafeSlow : Uint8Array
const ByteArray = hasNodeBuffer ? Buffer : Uint8Array
const RECORD_INLINE_ID = 0xdfff // temporary first-come first-serve tag // proposed tag: 0x7265 // 're'
const MAX_STRUCTURES = 0x100
const MAX_BUFFER_SIZE = hasNodeBuffer ? 0x100000000 : 0x7fd00000
let serializationId = 1
let throwOnIterator
let target
let targetView
let position = 0
Expand Down Expand Up @@ -175,6 +175,7 @@ export class Encoder extends Decoder {
}
}
}
throwOnIterator = encodeOptions & THROW_ON_ITERATOR;
try {
encode(value)
if (bundledStrings) {
Expand Down Expand Up @@ -536,13 +537,23 @@ export class Encoder extends Decoder {
}
}
if (value[Symbol.iterator]) {
if (throwOnIterator) {
let error = new Error('Iterator should be serialized as iterator')
error.iteratorNotHandled = true;
throw error;
}
target[position++] = 0x9f // indefinite length array
for (let entry of value) {
encode(entry)
}
target[position++] = 0xff // stop-code
return
}
if (value[Symbol.asyncIterator] || constructor === Blob) {
let error = new Error('Iterator/blob should be serialized as iterator')
error.iteratorNotHandled = true;
throw error;
}
// no extension found, write as object
writeObject(value, !value.hasOwnProperty) // if it doesn't have hasOwnProperty, don't do hasOwnProperty checks
}
Expand Down Expand Up @@ -741,6 +752,98 @@ export class Encoder extends Decoder {
safeEnd = newBuffer.length - 10
return target = newBuffer
}
this.encodeAsIterator = function(value) {
if (value && typeof value === 'object') {
return encodeObjectAsIterator.call(encoder, value, encoder.iterateProperties || (encoder.iterateProperties = {}));
}
return encoder.encode(value);
}
function* encodeObjectAsIterator(object, iterateProperties) {
if (object[Symbol.iterator]) {
yield new Uint8Array([0x9f]); // start indefinite array
for (let value of object) {
if (value && typeof value === 'object' && iterateProperties.element)
yield* encodeObjectAsIterator.call(this, value, iterateProperties.element);
else {
try {
yield this.encode(value, THROW_ON_ITERATOR);
} catch (error) {
if (error.iteratorNotHandled) {
yield* encodeObjectAsIterator.call(this, value, iterateProperties.element = {});
} else
throw error;
}
}
}
yield new Uint8Array([0xff]); // stop byte
return;
}
let constructor = object.constructor;
if (constructor === Object) {
yield encodeLength(Object.keys(object).length, 0xa0);
for (let key in object) {
let value = object[key];
yield this.encode(key);
if (value && typeof value === 'object' && iterateProperties[key]) {
yield* encodeObjectAsIterator.call(this, value, iterateProperties[key]);
} else {
try {
yield this.encode(value, THROW_ON_ITERATOR);
} catch (error) {
if (error.iteratorNotHandled) {
yield* encodeObjectAsIterator.call(this, value, iterateProperties[key] = {});
} else
throw error;
}
}
}
} else if (constructor === Array) {
yield encodeLength(object.length, 0x80);
for (let i = 0; i < length; i++) {
let value = object[i];
if (value && typeof value === 'object' && iterateProperties.element)
yield* encodeObjectAsIterator.call(this, value, iterateProperties.element);
else {
try {
yield this.encode(value, THROW_ON_ITERATOR);
} catch (error) {
if (error.iteratorNotHandled) {
yield* encodeObjectAsIterator.call(this, value, iterateProperties.element = {});
} else
throw error;
}
}
}
} else if (constructor === Blob || object[Symbol.asyncIterator]) {
yield object; // directly return blobs and async iterators, they have to be encoded asynchronously
} else {
yield this.encode(object);
}
}
this.encodeAsAsyncIterator = async function*(value) {
for (let encodedValue of encoder.encodeAsIterator(value)) {
let constructor = encodedValue.constructor;
if (constructor === Buffer || constructor === Uint8Array)
yield encodedValue;
else if (constructor === Blob) {
yield encodeLength(encodedValue.size, 0x40); // encode as binary data
let reader = encodedValue.stream().getReader();
let next;
while (!(next = await reader.read()).done) {
yield next.value;
}
} else if (encodedValue[Symbol.asyncIterator]) {
yield new Uint8Array([0x9f]); // start indefinite array
for await (let asyncValue of encodedValue) {
yield encoder.encode(asyncValue); // TODO: need to recursively encode as async iterator
}
yield new Uint8Array([0xff]); // stop byte
} else {
yield encodedValue;
}
}
}

}
useBuffer(buffer) {
// this means we are finished using our own buffer and we can write over it safely
Expand Down Expand Up @@ -776,6 +879,17 @@ export class Encoder extends Decoder {
return saveResults
}
}
function encodeLength(length, majorValue) {
if (length < 0x18) {
return new Uint8Array([majorValue | length]);
} else if (length < 0x100) {
return new Uint8Array([majorValue || 0x18, length]);
} else if (length < 0x10000) {
return new Uint8Array([majorValue || 0x19, length >> 8, length & 0xff]);
} else {
return new Uint8Array([majorValue || 0x1a, length >> 24, (length >> 16) & 0xff, (length >> 8) & 0xff, length & 0xff]);
}
}
class SharedData {
constructor(structures, values, version) {
this.structures = structures
Expand Down Expand Up @@ -1033,9 +1147,13 @@ export function addExtension(extension) {
}
let defaultEncoder = new Encoder({ useRecords: false })
export const encode = defaultEncoder.encode
export const encodeAsIterator = defaultEncoder.encodeAsIterator
export const encodeAsAsyncIterator = defaultEncoder.encodeAsAsyncIterator
export { FLOAT32_OPTIONS } from './decode.js'
import { FLOAT32_OPTIONS } from './decode.js'
export const { NEVER, ALWAYS, DECIMAL_ROUND, DECIMAL_FIT } = FLOAT32_OPTIONS
export const REUSE_BUFFER_MODE = 512
export const RESET_BUFFER_MODE = 1024
export const THROW_ON_ITERATOR = 2048


2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export { Encoder, addExtension, encode, NEVER, ALWAYS, DECIMAL_ROUND, DECIMAL_FIT, REUSE_BUFFER_MODE } from './encode.js'
export { Encoder, addExtension, encode, encodeAsIterator, encodeAsAsyncIterator, NEVER, ALWAYS, DECIMAL_ROUND, DECIMAL_FIT, REUSE_BUFFER_MODE } from './encode.js'
export { Tag, Decoder, decodeMultiple, decode, FLOAT32_OPTIONS, clearSource, roundFloat32, isNativeAccelerationEnabled } from './decode.js'
export { decodeIter, encodeIter } from './iterators.js'
73 changes: 72 additions & 1 deletion tests/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ var EncoderStream = CBOR.EncoderStream
var DecoderStream = CBOR.DecoderStream
var decode = CBOR.decode
var encode = CBOR.encode
var encodeAsIterator = CBOR.encodeAsIterator
var encodeAsAsyncIterator = CBOR.encodeAsAsyncIterator
var DECIMAL_FIT = CBOR.DECIMAL_FIT

var addExtension = CBOR.addExtension
Expand All @@ -50,7 +52,7 @@ suite('CBOR basic tests', function(){
var data = senmlData
let cborSenml = new Encoder({ useRecords: false, keyMap: senmlKeys })
let cborBasic = new Encoder()
var serialized = cborSenml.encode(data)
var serialized = cborSenml.encode(data)
var deserialized = cborSenml.decode(serialized)
assert(serialized.length < cborBasic.encode(data).length)
assert.deepEqual(deserialized, data)
Expand Down Expand Up @@ -700,6 +702,75 @@ suite('CBOR basic tests', function(){
let badInput = Buffer.from('7b2273657269616c6e6f223a2265343a30222c226970223a223139322e3136382e312e3335222c226b6579223a226770735f736563726574227d', 'hex');
assert.throws(function(){ decode(badInput) }) // should throw, not crash
})
test('encode as iterator', function() {
let hasIterators = {
a: 1,
iterator: (function*() {
yield 2;
yield {
b: (function*() {
yield 3;
})(),
};
})()
};
let encodedIterator = encodeAsIterator(hasIterators);
let result = [...encodedIterator];
assert(result.length > 10);
result = Buffer.concat(result);
let deserialized = decode(result);
const expectedResult = {
a: 1,
iterator: [2, { b: [3]}]
};
assert.deepEqual(deserialized, expectedResult);
});
test('encode as iterator with async/blob parts', function() {
let blob = new Blob([Buffer.from([4,5])]);
let hasIterators = {
a: 1,
iterator: (async function*() {
yield 2;
yield {
b: (function*() {
yield 3;
})(),
};
})(),
blob
};
let encodedIterator = encodeAsIterator(hasIterators);
let result = [...encodedIterator];
assert.equal(result[result.length - 1].constructor, Blob);
});
test('encode as async iterator with async/blob parts', async function() {
let blob = new Blob([Buffer.from([4, 5])]);
let hasIterators = {
a: 1,
iterator: (async function* () {
yield 2;
yield {
b: (function* () {
yield 3;
})(),
};
})(),
blob
};
let encodedIterator = encodeAsAsyncIterator(hasIterators);
let result = [];
for await (let encodedPart of encodedIterator) {
result.push(encodedPart)
}
let deserialized = decode(Buffer.concat(result));
const expectedResult = {
a: 1,
iterator: [2, { b: [3]}],
blob: Buffer.from([4,5]),
};
assert.deepEqual(deserialized, expectedResult);
});

})
suite('CBOR performance tests', function(){
test('performance JSON.parse', function() {
Expand Down

0 comments on commit 3f2e2a2

Please sign in to comment.