Skip to content

Commit

Permalink
feat: functional net
Browse files Browse the repository at this point in the history
  • Loading branch information
indietyp committed Dec 1, 2024
1 parent 0066bef commit fa58de8
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 43 deletions.
24 changes: 20 additions & 4 deletions libs/@local/harpc/client/typescript/src/net/Client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Effect } from "effect";
import { Effect, Function } from "effect";

import { createProto } from "../utils.js";
import type * as Connection from "./Connection.js";
import * as Connection from "./Connection.js";
import * as internalTransport from "./internal/transport.js";
import type * as Transport from "./Transport.js";

Expand All @@ -19,9 +19,10 @@ export interface Client {

interface ClientImpl extends Client {
readonly client: internalTransport.Transport;
readonly config?: ClientConfig;
}

const ClientProto: Omit<ClientImpl, "client"> = {
const ClientProto: Omit<ClientImpl, "client" | "config"> = {
[TypeId]: TypeId,
};

Expand All @@ -30,5 +31,20 @@ export const make = (config?: ClientConfig) =>
Effect.gen(function* () {
const client = yield* internalTransport.make(config?.transport);

return createProto(ClientProto, { client }) satisfies ClientImpl as Client;
return createProto(ClientProto, {
client,
config,
}) satisfies ClientImpl as Client;
});

export const connect: {
(
address: Transport.Address,
): (self: Client) => Effect.Effect<Connection.Connection>;
(
self: Client,
address: Transport.Address,
): Effect.Effect<Connection.Connection>;
} = Function.dual(2, (self: ClientImpl, address: Transport.Address) =>
Connection.makeUnchecked(self.client, self.config?.connection ?? {}, address),
);
74 changes: 52 additions & 22 deletions libs/@local/harpc/client/typescript/src/net/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
Deferred,
Duration,
Effect,
Function,
MutableHashMap,
Option,
pipe,
Expand All @@ -14,14 +15,17 @@ import {
Stream,
} from "effect";

import { Buffer, RequestIdProducer } from "../wire-protocol/index.js";
import { createProto } from "../utils.js";
import { Buffer } from "../wire-protocol/index.js";
import type { RequestId } from "../wire-protocol/models/request/index.js";
import { Request as WireRequest } from "../wire-protocol/models/request/index.js";
import type { Response as WireResponse } from "../wire-protocol/models/response/index.js";
import { ResponseFlags } from "../wire-protocol/models/response/index.js";
import { ResponseFromBytesStream } from "../wire-protocol/stream/index.js";
import type { IncompleteResponseError } from "../wire-protocol/stream/ResponseFromBytesStream.js";
import type * as internalTransport from "./internal/transport.js";
import type * as Request from "./Request.js";
import * as Transaction from "./Transaction.js";

const TypeId: unique symbol = Symbol("@local/harpc-client/net/Connection");
export type TypeId = typeof TypeId;
Expand Down Expand Up @@ -74,22 +78,29 @@ export interface Connection {
[TypeId]: TypeId;
}

interface TransactionTask {
interface TransactionContext {
queue: Queue.Enqueue<WireResponse.Response>;
drop: Effect.Effect<void>;
}

interface ConnectionImpl extends Connection {
readonly transactions: MutableHashMap.MutableHashMap<
RequestId.RequestId,
TransactionTask
TransactionContext
>;

readonly duplex: ConnectionDuplex;

readonly config: ConnectionConfig;
}

const ConnectionProto: Omit<
ConnectionImpl,
"transactions" | "duplex" | "config"
> = {
[TypeId]: TypeId,
};

const makeSink = (connection: ConnectionImpl) =>
// eslint-disable-next-line unicorn/no-array-for-each
Sink.forEach((response: WireResponse.Response) =>
Expand Down Expand Up @@ -134,7 +145,6 @@ const makeSink = (connection: ConnectionImpl) =>
}).pipe(Effect.annotateLogs({ id: response.header.requestId })),
);

// TODO: get remove so we can close the transaction, transaction can register an effect that's run when it's closed / dropped
const wrapDrop = (
connection: ConnectionImpl,
id: RequestId.RequestId,
Expand Down Expand Up @@ -218,27 +228,47 @@ export const makeUnchecked = (

const duplex = { read: readStream, write: writeSink } as ConnectionDuplex;

// TODO: task
});

export const send = <E, R>(
connection: Connection,
request: Request.Request<E, R>,
) =>
Effect.gen(function* () {
const impl = connection as ConnectionImpl;
const self: ConnectionImpl = createProto(ConnectionProto, {
transactions: MutableHashMap.empty<
RequestId.RequestId,
TransactionContext
>(),
duplex,
config,
});

const deferredDrop = yield* Deferred.make<void>();
// TODO: we might want to observe the task, for that we would need to have a partial connection that we then patch
yield* Effect.fork(task(self));

const queue = yield* Queue.bounded(impl.config.responseBufferSize ?? 16);
return self as Connection;
});

const transaction = (connection: ConnectionImpl) =>
Effect.gen(function* () {
const producer = yield* RequestIdProducer.RequestIdProducer;
export const send: {
<E, R>(
request: Request.Request<E, R>,
): (self: Connection) => Effect.Effect<Transaction.Transaction>;
<E, R>(
self: Connection,
request: Request.Request<E, R>,
): Effect.Effect<Transaction.Transaction>;
} = Function.dual(
2,
<E, R>(self: ConnectionImpl, request: Request.Request<E, R>) =>
Effect.gen(function* () {
const deferredDrop = yield* Deferred.make<void>();
const drop = wrapDrop(self, request.id, deferredDrop);

const id = yield* RequestIdProducer.next(producer);
});
const queue = yield* Queue.bounded<WireResponse.Response>(
self.config.responseBufferSize ?? 16,
);

const transactionContext: TransactionContext = {
queue,
drop,
};

MutableHashMap.set(self.transactions, request.id, transactionContext);

// TODO: Client that opens a stream to a server, then returns a connection, that connection can then be used to open a transaction!
// TODO: cleanup - what if we finish the stream, we need to drop to close the connection (that we do over scope on make!)
return Transaction.makeUnchecked(request.id, queue, deferredDrop);
}),
);
21 changes: 17 additions & 4 deletions libs/@local/harpc/client/typescript/src/net/Request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
ProtocolVersion,
} from "../wire-protocol/models/index.js";
import type { PayloadTooLargeError } from "../wire-protocol/models/Payload.js";
import type { RequestId } from "../wire-protocol/models/request/index.js";
import {
Request,
RequestBegin,
Expand All @@ -27,6 +28,8 @@ export type TypeId = typeof TypeId;
export interface Request<E, R> {
readonly [TypeId]: TypeId;

readonly id: RequestId.RequestId;

readonly subsystem: SubsystemDescriptor.SubsystemDescriptor;
readonly procedure: ProcedureDescriptor.ProcedureDescriptor;

Expand All @@ -35,7 +38,7 @@ export interface Request<E, R> {

const RequestProto: Omit<
Request<unknown, unknown>,
"subsystem" | "procedure" | "body"
"id" | "subsystem" | "procedure" | "body"
> = {
[TypeId]: TypeId,
};
Expand All @@ -45,7 +48,18 @@ export const make = <E, R>(
procedure: ProcedureDescriptor.ProcedureDescriptor,

body: Stream.Stream<ArrayBuffer, E, R>,
): Request<E, R> => createProto(RequestProto, { subsystem, procedure, body });
) =>
Effect.gen(function* () {
const producer = yield* RequestIdProducer.RequestIdProducer;
const id = yield* RequestIdProducer.next(producer);

return createProto(RequestProto, {
id,
subsystem,
procedure,
body,
}) as Request<E, R>;
});

interface Scratch {
buffer: ArrayBuffer;
Expand Down Expand Up @@ -149,8 +163,7 @@ export interface EncodeOptions {

const encodeImpl = <E, R>(self: Request<E, R>, options?: EncodeOptions) =>
Effect.gen(function* () {
const requestIdProducer = yield* RequestIdProducer.RequestIdProducer;
const requestId = yield* RequestIdProducer.next(requestIdProducer);
const requestId = self.id;

const nonEmpty = yield* Ref.make(false);

Expand Down
36 changes: 24 additions & 12 deletions libs/@local/harpc/client/typescript/src/net/Transaction.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Queue } from "effect";
import { Deferred, Effect, Streamable } from "effect";
import type { Effect, Queue } from "effect";
import { Deferred, Function } from "effect";

import { createProto } from "../utils.js";
import type { RequestId } from "../wire-protocol/models/request/index.js";
import type { Response as WireResponse } from "../wire-protocol/models/response/index.js";
import * as Response from "./Response.js";
Expand All @@ -9,7 +10,7 @@ const TypeId: unique symbol = Symbol("@local/harpc-client/net/Transaction");
export type TypeId = typeof TypeId;

export interface Transaction {
[TypeId]: TypeId;
readonly [TypeId]: TypeId;

readonly id: RequestId.RequestId;
}
Expand All @@ -20,15 +21,26 @@ interface TransactionImpl extends Transaction {
readonly drop: Deferred.Deferred<void>;
}

export const onDrop = (
transaction: Transaction,
execute: Effect.Effect<void>,
) =>
Effect.gen(function* () {
const impl = transaction as TransactionImpl;

return yield* Deferred.completeWith(impl.drop, execute);
});
const TransactionProto: Omit<TransactionImpl, "id" | "read" | "drop"> = {
[TypeId]: TypeId,
};

/** @internal */
export const makeUnchecked = (
id: RequestId.RequestId,
read: Queue.Dequeue<WireResponse.Response>,
drop: Deferred.Deferred<void>,
): Transaction =>
createProto(TransactionProto, { id, read, drop }) as Transaction;

export const registerDestructor: {
(
destructor: Effect.Effect<void>,
): (self: Transaction) => Effect.Effect<boolean>;
(self: Transaction, destructor: Effect.Effect<void>): Effect.Effect<boolean>;
} = Function.dual(2, (self: TransactionImpl, destructor: Effect.Effect<void>) =>
Deferred.completeWith(self.drop, destructor),
);

export const read = (transaction: Transaction) =>
Response.decode((transaction as TransactionImpl).read);
4 changes: 4 additions & 0 deletions libs/@local/harpc/client/typescript/src/net/Transport.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { PeerId } from "@libp2p/interface";
import type { Multiaddr } from "@multiformats/multiaddr";
import { Data } from "effect";

import type { NoiseConfig, TCPConfig, YamuxConfig } from "./Config.js";
Expand All @@ -10,6 +12,8 @@ export class InitializationError extends Data.TaggedError(
}
}

export type Address = PeerId | Multiaddr | Multiaddr[];

export interface TransportConfig {
tcp?: TCPConfig;
yamux?: YamuxConfig;
Expand Down
6 changes: 6 additions & 0 deletions libs/@local/harpc/client/typescript/src/net/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
/* eslint-disable canonical/filename-no-index */

export * as Client from "./Client.js";
export * as Config from "./Config.js";
export * as Connection from "./Connection.js";
export * as NetworkLogger from "./NetworkLogger.js";
export * as Request from "./Request.js";
export * as Response from "./Response.js";
export * as Transaction from "./Transaction.js";
export * as Transport from "./Transport.js";
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {

const makeRequest = <E, R>(stream: Stream.Stream<ArrayBuffer, E, R>) =>
Effect.gen(function* () {
return Request.make(
return yield* Request.make(
SubsystemDescriptor.make(
yield* SubsystemId.make(0x00),
Version.make(0x00, 0x00),
Expand Down

0 comments on commit fa58de8

Please sign in to comment.