Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cat-voices): implement web worker for compression logic #1020

Merged
merged 22 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ final class RegistrationTransactionBuilder {

return _buildUnsignedRbacTx(
auxiliaryData: AuxiliaryData.fromCbor(
x509Envelope.toCbor(serializer: (e) => e.toCbor()),
await x509Envelope.toCbor(serializer: (e) => e.toCbor()),
),
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Future<void> _signAndSubmitRbacTx({
);

final auxiliaryData = AuxiliaryData.fromCbor(
x509Envelope.toCbor(serializer: (e) => e.toCbor()),
await x509Envelope.toCbor(serializer: (e) => e.toCbor()),
);

final unsignedTx = _buildUnsignedRbacTx(
Expand Down Expand Up @@ -122,7 +122,9 @@ Future<X509MetadataEnvelope<RegistrationData>> _buildMetadataEnvelope({

print('unsigned x509 envelope:');
print(
hex.encode(cbor.encode(x509Envelope.toCbor(serializer: (e) => e.toCbor()))),
hex.encode(
cbor.encode(await x509Envelope.toCbor(serializer: (e) => e.toCbor())),
),
);

final signedX509Envelope = await x509Envelope.sign(
Expand All @@ -133,7 +135,9 @@ Future<X509MetadataEnvelope<RegistrationData>> _buildMetadataEnvelope({
print('signed x509 envelope:');
print(
hex.encode(
cbor.encode(signedX509Envelope.toCbor(serializer: (e) => e.toCbor())),
cbor.encode(
await signedX509Envelope.toCbor(serializer: (e) => e.toCbor()),
),
),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,16 @@ final class X509MetadataEnvelope<T> extends Equatable {
///
/// The [deserializer] in most cases is going
/// to be [RegistrationData.fromCbor].
factory X509MetadataEnvelope.fromCbor(
static Future<X509MetadataEnvelope<T>> fromCbor<T>(
CborValue value, {
required ChunkedDataDeserializer<T> deserializer,
}) {
}) async {
final metadata = value as CborMap;
final envelope = metadata[const CborSmallInt(509)]! as CborMap;
final purpose = envelope[const CborSmallInt(0)]! as CborBytes;
final txInputsHash = envelope[const CborSmallInt(1)]!;
final previousTransactionId = envelope[const CborSmallInt(2)];
final chunkedData = _deserializeChunkedData(envelope);
final chunkedData = await _deserializeChunkedData(envelope);
final validationSignature = envelope[const CborSmallInt(99)]!;

return X509MetadataEnvelope(
Expand All @@ -155,10 +155,12 @@ final class X509MetadataEnvelope<T> extends Equatable {
/// Serializes the type as cbor.
///
/// The [serializer] in most cases is going to be [RegistrationData.toCbor].
CborValue toCbor({required ChunkedDataSerializer<T> serializer}) {
Future<CborValue> toCbor({
required ChunkedDataSerializer<T> serializer,
}) async {
final chunkedData = this.chunkedData;
final metadata = chunkedData != null
? _serializeChunkedData(serializer(chunkedData))
? await _serializeChunkedData(serializer(chunkedData))
: null;

return CborMap({
Expand All @@ -182,7 +184,7 @@ final class X509MetadataEnvelope<T> extends Equatable {
required Ed25519PrivateKey privateKey,
required ChunkedDataSerializer<T> serializer,
}) async {
final bytes = cbor.encode(toCbor(serializer: serializer));
final bytes = cbor.encode(await toCbor(serializer: serializer));
final signature = await privateKey.sign(bytes);
return withValidationSignature(signature);
}
Expand All @@ -197,7 +199,7 @@ final class X509MetadataEnvelope<T> extends Equatable {
required ChunkedDataSerializer<T> serializer,
}) async {
final envelope = withValidationSignature(Ed25519Signature.seeded(0));
final bytes = cbor.encode(envelope.toCbor(serializer: serializer));
final bytes = cbor.encode(await envelope.toCbor(serializer: serializer));
return signature.verify(bytes, publicKey: publicKey);
}

Expand All @@ -215,7 +217,7 @@ final class X509MetadataEnvelope<T> extends Equatable {
);
}

static CborValue? _deserializeChunkedData(CborMap map) {
static Future<CborValue?> _deserializeChunkedData(CborMap map) async {
final rawCbor = map[const CborSmallInt(10)] as CborList?;
if (rawCbor != null) {
final bytes = _unchunkCborBytes(rawCbor);
Expand All @@ -226,27 +228,27 @@ final class X509MetadataEnvelope<T> extends Equatable {
if (brotliCbor != null) {
final bytes = _unchunkCborBytes(brotliCbor);
final uncompressedBytes =
CatalystCompression.instance.brotli.decompress(bytes);
await CatalystCompression.instance.brotli.decompress(bytes);
return cbor.decode(uncompressedBytes);
}

final zstdCbor = map[const CborSmallInt(12)] as CborList?;
if (zstdCbor != null) {
final bytes = _unchunkCborBytes(zstdCbor);
final uncompressedBytes =
CatalystCompression.instance.zstd.decompress(bytes);
await CatalystCompression.instance.zstd.decompress(bytes);
return cbor.decode(uncompressedBytes);
}

return null;
}

static MapEntry<CborSmallInt, CborList> _serializeChunkedData(
static Future<MapEntry<CborSmallInt, CborList>> _serializeChunkedData(
CborValue value,
) {
) async {
final rawBytes = cbor.encode(value);
final brotliBytes = _compressBrotli(rawBytes);
final zstdBytes = _compressZstd(rawBytes);
final brotliBytes = await _compressBrotli(rawBytes);
final zstdBytes = await _compressZstd(rawBytes);

final bytesByKey = {
10: rawBytes,
Expand All @@ -266,15 +268,15 @@ final class X509MetadataEnvelope<T> extends Equatable {
);
}

static List<int>? _compressBrotli(List<int> bytes) {
static Future<List<int>?> _compressBrotli(List<int> bytes) async {
try {
return CatalystCompression.instance.brotli.compress(bytes);
return await CatalystCompression.instance.brotli.compress(bytes);
} on CompressionNotSupportedException {
return null;
}
}

static List<int>? _compressZstd(List<int> bytes) {
static Future<List<int>?> _compressZstd(List<int> bytes) async {
try {
return CatalystCompression.instance.zstd.compress(bytes);
} on CompressionNotSupportedException {
apskhem marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ E61E8EE7D77E9F7F9804E03EBC31B458
'''
.replaceAll('\n', '');

void main() {
Future<void> main() async {
final rawBytes = hex.decode(derCertHex);

// brotli
final brotli = CatalystCompression.instance.brotli;
final brotliCompressed = brotli.compress(rawBytes);
final brotliDecompressed = brotli.decompress(brotliCompressed);
final brotliCompressed = await brotli.compress(rawBytes);
final brotliDecompressed = await brotli.decompress(brotliCompressed);

assert(
listEquals(rawBytes, brotliDecompressed),
Expand All @@ -100,8 +100,8 @@ void main() {

// zstd
final zstd = CatalystCompression.instance.zstd;
final zstdCompressed = zstd.compress(rawBytes);
final zstdDecompressed = zstd.decompress(zstdCompressed);
final zstdCompressed = await zstd.compress(rawBytes);
final zstdDecompressed = await zstd.decompress(zstdCompressed);

assert(
listEquals(rawBytes, zstdDecompressed),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ E61E8EE7D77E9F7F9804E03EBC31B458
'''
.replaceAll('\n', '');

void main() {
Future<void> main() async {
final rawBytes = hex.decode(derCertHex);

// brotli
final brotli = CatalystCompression.instance.brotli;
final brotliCompressed = brotli.compress(rawBytes);
final brotliDecompressed = brotli.decompress(brotliCompressed);
final brotliCompressed = await brotli.compress(rawBytes);
final brotliDecompressed = await brotli.decompress(brotliCompressed);

assert(
listEquals(rawBytes, brotliDecompressed),
Expand All @@ -57,8 +57,8 @@ void main() {

// zstd
final zstd = CatalystCompression.instance.zstd;
final zstdCompressed = zstd.compress(rawBytes);
final zstdDecompressed = zstd.decompress(zstdCompressed);
final zstdCompressed = await zstd.compress(rawBytes);
final zstdDecompressed = await zstd.decompress(zstdCompressed);

assert(
listEquals(rawBytes, zstdDecompressed),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ abstract class CatalystCompressor {
///
/// Compressing and then decompressing the [bytes]
/// should yield the original [bytes].
List<int> compress(List<int> bytes);
Future<List<int>> compress(List<int> bytes);

/// Returns the list of decompressed [bytes].
///
/// Compressing and then decompressing the [bytes]
/// should yield the original [bytes].
List<int> decompress(List<int> bytes);
Future<List<int>> decompress(List<int> bytes);
}

/// Exception thrown when [CatalystCompressor.compress] can't compress
Expand Down
Original file line number Diff line number Diff line change
@@ -1,70 +1,86 @@
const brotli = await import("https://unpkg.com/[email protected]/index.web.js?module").then(m => m.default);
const zstd = await import("https://unpkg.com/@oneidentity/[email protected]/wasm/index.js?module");
// initialize a web worker for compression works.
// this is a persistent worker, will last for life of the app.
const compressionWorker = new Worker(new URL('./catalyst_compression_worker.js', import.meta.url));
apskhem marked this conversation as resolved.
Show resolved Hide resolved

// Initializes the zstd module, must be called before it can be used.
await zstd.ZstdInit();
const processingIdsPool = new Set();

/// Compresses hex bytes using brotli compression algorithm and returns compressed hex bytes.
function _brotliCompress(bytesHex) {
const bytes = _hexStringToUint8Array(bytesHex);
const compressedBytes = brotli.compress(bytes);
return _uint8ArrayToHexString(compressedBytes);
}
// A simple id generator function. The generated id must be unique across all the processing ids.
function generateId() {
let id
apskhem marked this conversation as resolved.
Show resolved Hide resolved
do {
const timestamp = Date.now().toString(36);
const randomNum = Math.random().toString(36).substring(2, 8);
id = `${timestamp}-${randomNum}`;
} while (processingIdsPool.has(id));
processingIdsPool.add(id);

/// Decompresses hex bytes using brotli compression algorithm and returns decompressed hex bytes.
function _brotliDecompress(bytesHex) {
const bytes = _hexStringToUint8Array(bytesHex);
const decompressedBytes = brotli.decompress(bytes);
return _uint8ArrayToHexString(decompressedBytes);
return id;
}

/// Compresses hex bytes using zstd compression algorithm and returns compressed hex bytes.
function _zstdCompress(bytesHex) {
const bytes = _hexStringToUint8Array(bytesHex);
const compressedBytes = zstd.ZstdSimple.compress(bytes);
return _uint8ArrayToHexString(compressedBytes);
}
function registerWorkerEventHandler(worker, handleMessage, handleError) {
const wrappedHandleMessage = (event) => handleMessage(event, complete);
const wrappedHandleError = (error) => handleError(error, complete);

/// Decompresses hex bytes using zstd compression algorithm and returns decompressed hex bytes.
function _zstdDecompress(bytesHex) {
const bytes = _hexStringToUint8Array(bytesHex);
const decompressedBytes = zstd.ZstdSimple.decompress(bytes);
return _uint8ArrayToHexString(decompressedBytes);
}

// Converts a hex string into a byte array.
function _hexStringToUint8Array(hexString) {
// Ensure the hex string length is even
if (hexString.length % 2 !== 0) {
throw new Error('Invalid hex string');
function complete() {
worker.removeEventListener("message", wrappedHandleMessage);
worker.removeEventListener("error", wrappedHandleError)
}

// Create a Uint8Array
const byteArray = new Uint8Array(hexString.length / 2);
worker.addEventListener("message", wrappedHandleMessage)
worker.addEventListener("error", wrappedHandleError)
}

// Parse the hex string into byte values
for (let i = 0; i < hexString.length; i += 2) {
byteArray[i / 2] = parseInt(hexString.substr(i, 2), 16);
}
// A function to create a compression function according to its name.
function runCompressionInWorker(fnName) {
return (data) => {
return new Promise((resolve, reject) => {
const id = generateId();

return byteArray;
}
registerWorkerEventHandler(
compressionWorker,
(event, complete) => {
const {
id: responseId,
result,
error,
initialized
} = event.data;

// skip the initializing completion event,
// and the id that is not itself.
if (initialized || responseId !== id) {
return;
}

if (result) {
resolve(result);
} else {
reject(error || 'Unexpected error');
}

processingIdsPool.delete(id);
complete();
},
(error, complete) => {
reject(error);

processingIdsPool.clear();
complete();
}
);

// Converts a byte array into a hex string.
function _uint8ArrayToHexString(uint8Array) {
return Array.from(uint8Array)
.map(byte => byte.toString(16).padStart(2, '0'))
.join('');
compressionWorker.postMessage({ id, action: fnName, bytesHex: data });
});
}
}


// A namespace containing the JS functions that
// can be executed from dart side
const catalyst_compression = {
brotliCompress: _brotliCompress,
brotliDecompress: _brotliDecompress,
zstdCompress: _zstdCompress,
zstdDecompress: _zstdDecompress,
brotliCompress: runCompressionInWorker("brotliCompress"),
brotliDecompress: runCompressionInWorker("brotliDecompress"),
zstdCompress: runCompressionInWorker("zstdCompress"),
zstdDecompress: runCompressionInWorker("zstdDecompress"),
}

// Expose catalyst compression as globally accessible
Expand Down
Loading
Loading