Skip to content

Commit

Permalink
Rework to continue serialization in same buffer until it is necessary…
Browse files Browse the repository at this point in the history
… to yield, #57
  • Loading branch information
kriszyp committed Dec 20, 2022
1 parent fb50510 commit 87a9904
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 65 deletions.
162 changes: 99 additions & 63 deletions encode.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ export class Encoder extends Decoder {
}
}
throwOnIterator = encodeOptions & THROW_ON_ITERATOR;
if (throwOnIterator)
return;
try {
encode(value)
if (bundledStrings) {
Expand Down Expand Up @@ -752,79 +754,107 @@ export class Encoder extends Decoder {
safeEnd = newBuffer.length - 10
return target = newBuffer
}
this.encodeAsIterator = function(value) {
if (value && typeof value === 'object') {
return encodeObjectAsIterator.call(encoder, value, encoder.iterateProperties || (encoder.iterateProperties = {}));
}
return encoder.encode(value);
let chunkThreshold = 100;
let continuedChunkThreshold = 1000;
this.encodeAsIterator = function(value, options) {
return startEncoding(value, options, encodeObjectAsIterator);
}
this.encodeAsAsyncIterator = function(value, options) {
return startEncoding(value, options, encodeObjectAsAsyncIterator);
}
function* encodeObjectAsIterator(object, iterateProperties) {

function* encodeObjectAsIterator(object, iterateProperties, finalIterator) {
let constructor = object.constructor;
if (constructor === Object) {
yield encodeLength(Object.keys(object).length, 0xa0);
writeEntityLength(Object.keys(object).length, 0xa0);
for (let key in object) {
let value = object[key];
yield this.encode(key);
if (value && typeof value === 'object' && iterateProperties[key]) {
yield* encodeObjectAsIterator.call(this, value, iterateProperties[key]);
} else {
try {
yield this.encode(value, THROW_ON_ITERATOR);
} catch (error) {
if (error.iteratorNotHandled) {
yield* encodeObjectAsIterator.call(this, value, iterateProperties[key] = {});
} else
throw error;
}
}
encode(key);
if (value && typeof value === 'object') {
if (iterateProperties[key])
yield* encodeObjectAsIterator(value, iterateProperties[key]);
else
yield* tryEncode(value, iterateProperties, key);
} else encode(value);
}
} else if (constructor === Array) {
yield encodeLength(object.length, 0x80);
writeArrayHeader(object.length);
for (let i = 0; i < length; i++) {
let value = object[i];
if (value && typeof value === 'object' && iterateProperties.element)
yield* encodeObjectAsIterator.call(this, value, iterateProperties.element);
else {
try {
yield this.encode(value, THROW_ON_ITERATOR);
} catch (error) {
if (error.iteratorNotHandled) {
yield* encodeObjectAsIterator.call(this, value, iterateProperties.element = {});
} else
throw error;
}
}
if (value && (typeof value === 'object' || position - start > chunkThreshold)) {
if (iterateProperties.element)
yield* encodeObjectAsIterator(value, iterateProperties.element);
else
yield* tryEncode(value, iterateProperties, 'element');
} else encode(value);
}
} else if (object[Symbol.iterator]) {
yield new Uint8Array([0x9f]); // start indefinite array
target[position++] = 0x9f; // start indefinite array
for (let value of object) {
if (value && typeof value === 'object' && iterateProperties.element)
yield* encodeObjectAsIterator.call(this, value, iterateProperties.element);
else {
try {
yield this.encode(value, THROW_ON_ITERATOR);
} catch (error) {
if (error.iteratorNotHandled) {
yield* encodeObjectAsIterator.call(this, value, iterateProperties.element = {});
} else
throw error;
}
}
if (value && (typeof value === 'object' || position - start > chunkThreshold)) {
if (iterateProperties.element)
yield* encodeObjectAsIterator(value, iterateProperties.element);
else
yield* tryEncode(value, iterateProperties, 'element');
} else encode(value);
}
yield new Uint8Array([0xff]); // stop byte
target[position++] = 0xff; // stop byte
} else if (constructor === Blob){
yield encodeLength(object.size, 0x40); // encode as binary data
writeEntityLength(object.size, 0x40); // encode as binary data
yield target.subarray(start, position);
yield object; // directly return blobs, they have to be encoded asynchronously
restartEncoding();
} else if (object[Symbol.asyncIterator]) {
target[position++] = 0x9f; // start indefinite array
yield target.subarray(start, position);
yield object; // directly return async iterators, they have to be encoded asynchronously
restartEncoding();
target[position++] = 0xff; // stop byte
} else {
yield this.encode(object);
encode(object);
}
if (finalIterator && position > start) yield target.subarray(start, position);
else if (position - start > chunkThreshold) {
yield target.subarray(start, position);
restartEncoding();
}
}
function* tryEncode(value, iterateProperties, key) {
let restart = position - start;
try {
encode(value);
if (position - start > chunkThreshold) {
yield target.subarray(start, position);
restartEncoding();
}
} catch (error) {
if (error.iteratorNotHandled) {
iterateProperties[key] = {};
position = start + restart; // restart our position so we don't have partial data from last encode
yield* encodeObjectAsIterator.call(this, value, iterateProperties[key]);
} else throw error;
}
}
function restartEncoding() {
chunkThreshold = continuedChunkThreshold;
encoder.encode(null, THROW_ON_ITERATOR); // restart encoding
}
function startEncoding(value, options, encodeIterator) {
if (options && options.chunkThreshold) // explicitly specified chunk sizes
chunkThreshold = continuedChunkThreshold = options.chunkThreshold;
else // we start with a smaller threshold to get initial bytes sent quickly
chunkThreshold = 100;
if (value && typeof value === 'object') {
encoder.encode(null, THROW_ON_ITERATOR); // start encoding
return encodeIterator(value, encoder.iterateProperties || (encoder.iterateProperties = {}), true);
}
return [encoder.encode(value)];
}
this.encodeAsAsyncIterator = async function*(value) {
for (let encodedValue of encoder.encodeAsIterator(value)) {

async function* encodeObjectAsAsyncIterator(value, iterateProperties) {
for (let encodedValue of encodeObjectAsIterator(value, iterateProperties, true)) {
let constructor = encodedValue.constructor;
if (constructor === Buffer || constructor === Uint8Array)
if (constructor === ByteArray || constructor === Uint8Array)
yield encodedValue;
else if (constructor === Blob) {
let reader = encodedValue.stream().getReader();
Expand All @@ -833,17 +863,17 @@ export class Encoder extends Decoder {
yield next.value;
}
} else if (encodedValue[Symbol.asyncIterator]) {
yield new Uint8Array([0x9f]); // start indefinite array
for await (let asyncValue of encodedValue) {
yield encoder.encode(asyncValue); // TODO: need to recursively encode as async iterator
restartEncoding();
if (asyncValue)
yield* encodeObjectAsAsyncIterator(asyncValue, iterateProperties.async || (iterateProperties.async = {}));
else yield encoder.encode(asyncValue);
}
yield new Uint8Array([0xff]); // stop byte
} else {
yield encodedValue;
}
}
}

}
useBuffer(buffer) {
// this means we are finished using our own buffer and we can write over it safely
Expand Down Expand Up @@ -879,16 +909,22 @@ export class Encoder extends Decoder {
return saveResults
}
}
function encodeLength(length, majorValue) {
if (length < 0x18) {
return new Uint8Array([majorValue | length]);
} else if (length < 0x100) {
return new Uint8Array([majorValue || 0x18, length]);
function writeEntityLength(length, majorValue) {
if (length < 0x18)
target[position++] = majorValue | length
else if (length < 0x100) {
target[position++] = majorValue | 0x18
target[position++] = length
} else if (length < 0x10000) {
return new Uint8Array([majorValue || 0x19, length >> 8, length & 0xff]);
target[position++] = majorValue | 0x19
target[position++] = length >> 8
target[position++] = length & 0xff
} else {
return new Uint8Array([majorValue || 0x1a, length >> 24, (length >> 16) & 0xff, (length >> 8) & 0xff, length & 0xff]);
target[position++] = majorValue | 0x1a
targetView.setUint32(position, length)
position += 4
}

}
class SharedData {
constructor(structures, values, version) {
Expand Down
25 changes: 23 additions & 2 deletions tests/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,6 @@ suite('CBOR basic tests', function(){
};
let encodedIterator = encodeAsIterator(hasIterators);
let result = [...encodedIterator];
assert(result.length > 10);
result = Buffer.concat(result);
let deserialized = decode(result);
const expectedResult = {
Expand Down Expand Up @@ -770,7 +769,29 @@ suite('CBOR basic tests', function(){
};
assert.deepEqual(deserialized, expectedResult);
});

test.skip('encode as iterator performance', async function() {
function* iterator() {
for (let i = 0; i < 1000; i++) {
yield {
a: 1,
b: 'hello, world',
c: true,
sub: {
d: 'inside',
e: 3
}
}
}
}
let result;
let start = performance.now();
for (let i = 0; i < 1000; i++) {
let encodedIterator = encodeAsIterator(iterator());
result = [...encodedIterator];
}
let deserialized = decode(Buffer.concat(result));
console.log(performance.now() - start, result.length);
});
})
suite('CBOR performance tests', function(){
test('performance JSON.parse', function() {
Expand Down

0 comments on commit 87a9904

Please sign in to comment.