Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/core/src/utils/transform/fromDevice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ export const fromDeviceStream: () => TransformStream<Uint8Array, DeviceOutput> =
let byteBuffer = new Uint8Array([]);
const textDecoder = new TextDecoder();
return new TransformStream<Uint8Array, DeviceOutput>({
transform(chunk: Uint8Array, controller): void {
transform(raw: Uint8Array | ArrayBuffer, controller): void {
// onReleaseEvent.subscribe(() => {
// controller.terminate();
// });
let chunk = raw instanceof Uint8Array ? raw : new Uint8Array(raw);
byteBuffer = new Uint8Array([...byteBuffer, ...chunk]);
let processingExhausted = false;
while (byteBuffer.length !== 0 && !processingExhausted) {
Expand Down
32 changes: 32 additions & 0 deletions packages/transport-ws/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# @meshtastic/transport-ws

[![JSR](https://jsr.io/badges/@meshtastic/transport-ws)](https://jsr.io/@meshtastic/transport-ws)
[![CI](https://img.shields.io/github/actions/workflow/status/meshtastic/js/ci.yml?branch=master&label=actions&logo=github&color=yellow)](https://github.com/meshtastic/js/actions/workflows/ci.yml)
[![CLA assistant](https://cla-assistant.io/readme/badge/meshtastic/meshtastic.js)](https://cla-assistant.io/meshtastic/meshtastic.js)
[![Fiscal Contributors](https://opencollective.com/meshtastic/tiers/badge.svg?label=Fiscal%20Contributors&color=deeppink)](https://opencollective.com/meshtastic/)
[![Vercel](https://img.shields.io/static/v1?label=Powered%20by&message=Vercel&style=flat&logo=vercel&color=000000)](https://vercel.com?utm_source=meshtastic&utm_campaign=oss)

## Overview

`@meshtastic/transport-ws` Provides WebSocket transport for Meshtastic
devices. Installation instructions are avaliable at
[JSR](https://jsr.io/@meshtastic/transport-ws)
[NPM](https://www.npmjs.com/package/@meshtastic/transport-ws)

## Usage

```ts
import { MeshDevice } from "@meshtastic/core";
import { TransportWebSocket } from "@meshtastic/transport-ws";

const transport = await TransportWebSocket.createFromUrl(new URL("ws://server:port/endpoint"));
const device = new MeshDevice(transport);
```

## Stats

![Alt](https://repobeats.axiom.co/api/embed/5330641586e92a2ec84676fedb98f6d4a7b25d69.svg "Repobeats analytics image")

### Compatibility

The WebSocket API's used here are available almost everywhere.
1 change: 1 addition & 0 deletions packages/transport-ws/mod.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { TransportWebSocket } from "./src/transport.ts";
50 changes: 50 additions & 0 deletions packages/transport-ws/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"name": "@meshtastic/transport-ws",
"version": "0.2.5",
"description": "A transport layer for Meshtastic applications using Web Serial API.",
"exports": {
".": "./mod.ts"
},
"type": "module",
"main": "./dist/mod.js",
"module": "./dist/mod.js",
"types": "./dist/mod.d.ts",
"files": [
"package.json",
"README.md",
"LICENSE",
"dist"
],
"license": "GPL-3.0-only",
"tsdown": {
"entry": "mod.ts",
"dts": true,
"format": [
"esm"
],
"splitting": false,
"clean": true
},
"jsrInclude": [
"mod.ts",
"src",
"README.md",
"LICENSE"
],
"jsrExclude": [
"src/**/*.test.ts"
],
"scripts": {
"preinstall": "npx only-allow pnpm",
"prepack": "cp ../../LICENSE ./LICENSE",
"clean": "rm -rf dist LICENSE",
"build:npm": "tsdown",
"publish:npm": "pnpm clean && pnpm build:npm && pnpm publish --access public --no-git-checks",
"prepare:jsr": "rm -rf dist && pnpm dlx pkg-to-jsr",
"publish:jsr": "pnpm run prepack && pnpm prepare:jsr && deno publish --allow-dirty --no-check"
},
"dependencies": {
"@meshtastic/core": "workspace:*",
"websocketstream-ponyfill": "^0.1.3"
}
}
113 changes: 113 additions & 0 deletions packages/transport-ws/src/transport.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import { Types, Utils } from "@meshtastic/core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { runTransportContract } from "../../../tests/utils/transportContract.ts";
import { TransportWebSocket } from "./transport.ts";

function stubCoreTransforms() {
const toDevice = () =>
new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});

// maps raw bytes -> DeviceOutput.packet
const fromDeviceFactory = () =>
new TransformStream<Uint8Array, Types.DeviceOutput>({
transform(chunk, controller) {
controller.enqueue({ type: "packet", data: chunk });
},
});

const transform = Utils.toDeviceStream;
const restoreTo = vi
.spyOn(Utils, "toDeviceStream", "get")
.mockReturnValue(toDevice as unknown as typeof transform);

const restoreFrom = vi
.spyOn(Utils, "fromDeviceStream")
.mockImplementation(
() =>
fromDeviceFactory() as unknown as TransformStream<
Uint8Array,
Types.DeviceOutput
>,
);

return {
restore: () => {
restoreTo.mockRestore();
restoreFrom.mockRestore();
},
};
}

class FakeWebSocketStream {
readable: ReadableStream<Uint8Array>;
writable: WritableStream<Uint8Array>;
lastWritten?: Uint8Array;

private _readController!: ReadableStreamDefaultController<Uint8Array>;

constructor() {
this.readable = new ReadableStream<Uint8Array>({
start: (controller) => {
this._readController = controller;
},
});

this.writable = new WritableStream<Uint8Array>({
write: async (chunk) => {
this.lastWritten = chunk;
},
});
}

open(_options?: { baudRate?: number }): Promise<void> {
return Promise.resolve();
}

close(): Promise<void> {
try {
this._readController.close();
} catch { }
return Promise.resolve();
}

pushIncoming(bytes: Uint8Array) {
this._readController.enqueue(bytes);
}
}

describe("TransportWebSocket (contract)", () => {
let transforms: { restore(): void } | undefined;

beforeEach(() => {
transforms = stubCoreTransforms();
});

afterEach(() => {
transforms?.restore();
vi.restoreAllMocks();
});

runTransportContract({
name: "TransportWebSocket",
setup: () => { },
teardown: () => { },
create: async () => {
const fake = new FakeWebSocketStream();
const transport = new TransportWebSocket(fake as any, fake.readable, fake.writable);
(globalThis as any).__ws = { fake };
await Promise.resolve();
return transport;
},
pushIncoming: async (bytes) => {
(globalThis as any).__ws.fake.pushIncoming(bytes);
await Promise.resolve();
},
assertLastWritten: (bytes) => {
expect((globalThis as any).__ws.fake.lastWritten).toEqual(bytes);
},
});
});
174 changes: 174 additions & 0 deletions packages/transport-ws/src/transport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import { Types, Utils } from "@meshtastic/core";
import { WebSocketStream } from "websocketstream-ponyfill";

/**
* Provides Web Serial transport for Meshtastic devices.
*
* Implements the {@link Types.Transport} contract using the Web Serial API.
* Use {@link TransportWebSocket.createFromUrl}
* to construct an instance.
*/
export class TransportWebSocket implements Types.Transport {
private _toDevice: WritableStream<Uint8Array>;
private _fromDevice: ReadableStream<Types.DeviceOutput>;
private fromDeviceController?: ReadableStreamDefaultController<Types.DeviceOutput>;
private connection: WebSocketStream;
private pipePromise: Promise<void> | null = null;
private abortController: AbortController;
private portReadable: ReadableStream<Uint8Array>;

private lastStatus: Types.DeviceStatusEnum =
Types.DeviceStatusEnum.DeviceDisconnected;
private closingByUser = false;

/**
* Creates a new TransportWebSocket instance from an existing, provided {@link SerialPort}.
* Opens it if not already open.
*/
public static async createFromUrl(
url: URL,
): Promise<TransportWebSocket> {
const ws = new WebSocketStream(url.toString());
const { readable, writable } = await ws.opened;
return new TransportWebSocket(ws, readable, writable);
}

/**
* Constructs a transport around a given {@link SerialPort}.
* @throws If the port lacks readable or writable streams.
*/
constructor(connection: WebSocketStream, readable: ReadableStream, writable: WritableStream) {

if (!readable || !writable) {
throw new Error("Stream not accessible");
}

this.connection = connection;
this.portReadable = readable;
this.abortController = new AbortController();
const abortController = this.abortController;

// Set up the pipe with abort signal for clean cancellation
const toDeviceTransform = Utils.toDeviceStream();
this.pipePromise = toDeviceTransform.readable
.pipeTo(writable, { signal: this.abortController.signal })
.catch((err) => {
// Ignore expected rejection when we cancel it via the AbortController.
if (abortController.signal.aborted) {
return;
}
console.error("Error piping data to web socket:", err);
this.connection.close();
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"write-error",
);
});

this._toDevice = toDeviceTransform.writable;

// Wrap + capture controller to inject status packets
this._fromDevice = new ReadableStream<Types.DeviceOutput>({
start: async (ctrl) => {
this.fromDeviceController = ctrl;

this.emitStatus(Types.DeviceStatusEnum.DeviceConnecting);

const transformed = this.portReadable.pipeThrough(
Utils.fromDeviceStream(),
);
const reader = transformed.getReader();

this.emitStatus(Types.DeviceStatusEnum.DeviceConnected);

try {
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
ctrl.enqueue(value);
}
ctrl.close();
} catch (error) {
if (!this.closingByUser) {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"read-error",
);
}
ctrl.error(error instanceof Error ? error : new Error(String(error)));
try {
await transformed.cancel();
} catch { }
} finally {
reader.releaseLock();
}
},
});
}

/** Writable stream of bytes to the device. */
public get toDevice(): WritableStream<Uint8Array> {
return this._toDevice;
}

/** Readable stream of {@link Types.DeviceOutput} from the device. */
public get fromDevice(): ReadableStream<Types.DeviceOutput> {
return this._fromDevice;
}

private emitStatus(next: Types.DeviceStatusEnum, reason?: string): void {
if (next === this.lastStatus) {
return;
}
this.lastStatus = next;
this.fromDeviceController?.enqueue({
type: "status",
data: { status: next, reason },
});
}

/**
* Closes the serial port and emits `DeviceDisconnected("user")`.
*/
public async disconnect(): Promise<void> {
try {
this.closingByUser = true;

// Stop outbound piping
this.abortController.abort();
if (this.pipePromise) {
await this.pipePromise;
}

// Cancel any remaining streams
if (this._fromDevice?.locked) {
try {
await this._fromDevice.cancel();
} catch {
// Stream cancellation might fail if already cancelled
}
}

await this.connection.close();
} catch (error) {
// If we can't close cleanly, let the browser handle cleanup
console.warn("Could not cleanly disconnect web socket:", error);
} finally {
this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "user");
this.closingByUser = false;
}
}

/**
* Reconnects the transport by creating a new AbortController and re-establishing
* the pipe connection. Only call this after disconnect() or if the connection failed.
*/
public async reconnect() {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"reconnect-failed",
);
}
}
Loading