From 1ec3bd540331c03c3828cbb1a49509421fe9cf03 Mon Sep 17 00:00:00 2001 From: Taea Vogel Date: Thu, 19 Dec 2024 12:31:20 -0500 Subject: [PATCH] wip additions --- packages/backend/package.json | 48 ++- .../backend/src/nest/libp2p/libp2p.auth.ts | 408 ++++++++++++++++++ .../backend/src/nest/libp2p/libp2p.module.ts | 2 + .../backend/src/nest/libp2p/libp2p.service.ts | 4 +- .../backend/src/nest/libp2p/libp2p.types.ts | 39 ++ 5 files changed, 478 insertions(+), 23 deletions(-) create mode 100644 packages/backend/src/nest/libp2p/libp2p.auth.ts diff --git a/packages/backend/package.json b/packages/backend/package.json index f2a2b96482..7085b3ebb5 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -78,8 +78,8 @@ "jest": "^29.4.2", "lint-staged": "^15.2.2", "mock-fs": "^5.1.2", - "tmp": "^0.2.1", "pvutils": "^1.1.3", + "tmp": "^0.2.1", "ts-jest": "^29.0.3", "ts-loader": "9.4.2", "ts-node": "10.9.1", @@ -93,11 +93,32 @@ "@chainsafe/libp2p-gossipsub": "13.2.0", "@chainsafe/libp2p-noise": "13.0.2", "@chainsafe/libp2p-yamux": "6.0.2", + "@helia/block-brokers": "2.0.3", + "@helia/unixfs": "3.0.1", + "@ipld/dag-cbor": "^9.2.1", + "@ipld/dag-pb": "^4.1.2", + "@libp2p/crypto": "^3.0.4", + "@libp2p/echo": "^1.1.1", + "@libp2p/identify": "^2.1.5", + "@libp2p/interface": "^1.7.0", + "@libp2p/interface-internal": "^2.2.1", + "@libp2p/kad-dht": "^12.1.5", + "@libp2p/keychain": "4.1.6", + "@libp2p/mplex": "^10.1.5", + "@libp2p/peer-id": "^4.2.4", + "@libp2p/peer-id-factory": "^4.2.4", + "@libp2p/ping": "1.1.6", + "@libp2p/pnet": "1.0.0-3c8dd5bbf", + "@libp2p/utils": "^5.4.9", + "@libp2p/websockets": "^8.2.0", "@localfirst/auth": "file:../../3rd-party/auth/packages/auth/dist", "@localfirst/crdx": "file:../../3rd-party/auth/packages/crdx/dist", + "@multiformats/multiaddr": "^12.3.0", + "@multiformats/multiaddr-to-uri": "^10.1.0", "@nestjs/common": "^10.2.10", "@nestjs/core": "^10.2.10", "@nestjs/platform-express": "^10.2.10", + "@orbitdb/core": "^2.2.0", "@paralleldrive/cuid2": "^2.2.2", "@peculiar/webcrypto": "1.4.3", "@quiet/common": "^2.0.2-alpha.1", @@ -123,42 +144,24 @@ "fastq": "^1.17.1", "fetch-retry": "^6.0.0", "get-port": "^5.1.1", - "helia": "4.0.2", - "@helia/unixfs": "3.0.1", - "@helia/block-brokers": "2.0.3", "go-ipfs": "npm:mocked-go-ipfs@0.17.0", + "helia": "4.0.2", "http-server": "^0.12.3", "https-proxy-agent": "^7.0.5", "image-size": "^1.0.1", "interface-datastore": "8.3.1", - "@ipld/dag-cbor": "^9.2.1", - "@ipld/dag-pb": "^4.1.2", "it-drain": "^3.0.7", "it-first": "^3.0.6", + "it-length-prefixed": "^9.1.0", "it-pipe": "^3.0.1", + "it-pushable": "^3.2.3", "it-ws": "^6.1.5", "joi": "^17.8.1", "level": "^8.0.1", "libp2p": "^1.7.0", - "@libp2p/crypto": "^3.0.4", - "@libp2p/echo": "^1.1.1", - "@libp2p/identify": "^2.1.5", - "@libp2p/interface": "^1.7.0", - "@libp2p/kad-dht": "^12.1.5", - "@libp2p/keychain": "4.1.6", - "@libp2p/mplex": "^10.1.5", - "@libp2p/peer-id": "^4.2.4", - "@libp2p/peer-id-factory": "^4.2.4", - "@libp2p/ping": "1.1.6", - "@libp2p/pnet": "1.0.0-3c8dd5bbf", - "@libp2p/utils": "^5.4.9", - "@libp2p/websockets": "^8.2.0", "luxon": "^3.4.4", "multiformats": "13.1.0", - "@multiformats/multiaddr": "^12.3.0", - "@multiformats/multiaddr-to-uri": "^10.1.0", "node-fetch": "^3.3.0", - "@orbitdb/core": "^2.2.0", "p-defer": "^4.0.1", "p-queue": "7.3.4", "pkijs": "3.0.15", @@ -171,6 +174,7 @@ "socket.io-client": "^4.7.5", "string-replace-loader": "3.1.0", "ts-jest-resolver": "^2.0.0", + "uint8arraylist": "^2.4.8", "uint8arrays": "^5.1.0", "utf-8-validate": "^5.0.2", "validator": "^13.11.0" diff --git a/packages/backend/src/nest/libp2p/libp2p.auth.ts b/packages/backend/src/nest/libp2p/libp2p.auth.ts new file mode 100644 index 0000000000..ccdd151803 --- /dev/null +++ b/packages/backend/src/nest/libp2p/libp2p.auth.ts @@ -0,0 +1,408 @@ +// ISLA: This is what needs to be added to Quiet to make it work Libp2p and LFA work together +import { ComponentLogger, Connection, PeerId, PeerStore, Stream, Topology } from '@libp2p/interface' +import type { ConnectionManager, IncomingStreamData, Registrar } from '@libp2p/interface-internal' +import * as Auth from '@localfirst/auth' +import { pushable, type Pushable } from 'it-pushable' +import { Uint8ArrayList } from 'uint8arraylist' +import { pipe } from 'it-pipe' +import { encode, decode } from 'it-length-prefixed' + +import { SigChainService } from '../auth/sigchain.service.js' +import { SigChain } from '../auth/sigchain.js' +import { AuthEvents, QuietAuthEvents } from './libp2p.types.js' +import { createLogger } from '../common/logger.js' +import { createQuietLogger } from '@quiet/logger' + +export interface Libp2pAuthComponents { + peerId: PeerId + peerStore: PeerStore + registrar: Registrar + connectionManager: ConnectionManager + logger: ComponentLogger +} + +interface PushableStream { + stream: Stream + pushable: Pushable +} + +enum JoinStatus { + PENDING = 'PENDING', + JOINING = 'JOINING', + JOINED = 'JOINED', +} + +const createLFALogger = createQuietLogger('localfirst:') + +// Implementing local-first-auth as a service just to get started. I think we +// likely want to integrate it in a custom Transport/Muxer. +export class Libp2pAuth { + private readonly protocol: string + private readonly components: Libp2pAuthComponents + private sigChainService: SigChainService + private authConnections: Record + private outboundStreamQueue: Pushable<{ peerId: PeerId; connection: Connection }> + private outboundStreams: Record + private inboundStreams: Record + private restartableAuthConnections: Map + private bufferedConnections: { peerId: PeerId; connection: Connection }[] + private events: QuietAuthEvents + private peerId: PeerId + private joining: boolean = false + private restartInterval: NodeJS.Timeout + private unblockInterval: NodeJS.Timeout + private joinStatus: JoinStatus + private logger: ReturnType + private authContext: Auth.Context + + constructor( + peerId: PeerId, + sigChainService: SigChainService, + components: Libp2pAuthComponents, + events: QuietAuthEvents + ) { + this.protocol = '/local-first-auth/1.0.0' + this.peerId = peerId + this.components = components + this.sigChainService = sigChainService + this.authConnections = {} + this.restartableAuthConnections = new Map() + this.outboundStreamQueue = pushable<{ peerId: PeerId; connection: Connection }>({ objectMode: true }) + this.outboundStreams = {} + this.inboundStreams = {} + this.bufferedConnections = [] + this.joinStatus = JoinStatus.PENDING + this.events = events + this.logger = createLogger(`libp2p:auth:${peerId}`) + + pipe(this.outboundStreamQueue, async source => { + for await (const { peerId, connection } of source) { + await this.openOutboundStream(peerId, connection) + } + }).catch(e => { + this.logger.error('Outbound stream queue error', e) + }) + + this.restartInterval = setInterval( + this.restartStoppedConnections, + 45_000, + this.restartableAuthConnections, + this.logger + ) + this.unblockInterval = setInterval( + this.unblockConnections, + 5_000, + this.bufferedConnections, + this.joinStatus, + this.logger + ) + } + + private restartStoppedConnections(restartableAuthConnections: Map) { + this.logger.info(`Attempting to restart stopped auth connections`) + for (const [ms, connection] of restartableAuthConnections.entries()) { + if (ms >= Date.now()) { + connection.start() + restartableAuthConnections.delete(ms) + } + } + } + + private async unblockConnections( + conns: { peerId: PeerId; connection: Connection }[], + status: JoinStatus, + logger: ReturnType + ) { + if (status !== JoinStatus.JOINED) return + + logger.info(`Unblocking ${conns.length} connections now that we've joined the chain`) + while (conns.length > 0) { + const conn = conns.pop() + if (conn != null) { + await this.onPeerConnected(conn.peerId, conn.connection) + } + } + } + + async start() { + this.logger.info('Auth service starting') + + const topology: Topology = { + onConnect: this.onPeerConnected.bind(this), + onDisconnect: this.onPeerDisconnected.bind(this), + notifyOnTransient: false, + } + + const registrar = this.components.registrar + await registrar.register(this.protocol, topology) + await registrar.handle(this.protocol, this.onIncomingStream.bind(this), { + runOnTransientConnection: false, + }) + } + + async stop() { + // TODO + } + + private async openOutboundStream(peerId: PeerId, connection: Connection) { + if (peerId.toString() in this.outboundStreams) { + return + } + + this.logger.info('Opening outbound stream for peer', peerId.toString()) + const outboundStream = await connection.newStream(this.protocol, { + runOnTransientConnection: false, + negotiateFully: true, + }) + const outboundPushable: Pushable = pushable() + this.outboundStreams[peerId.toString()] = { + stream: outboundStream, + pushable: outboundPushable, + } + + pipe(outboundPushable, outboundStream).catch((e: Error) => + this.logger.error(`Error opening outbound stream to ${peerId}`, e) + ) + + if (connection.direction === 'outbound') { + await this.openInboundStream(peerId, connection) + } + + this.authConnections[peerId.toString()].start() + } + + private async openInboundStream(peerId: PeerId, connection: Connection) { + if (peerId.toString() in this.inboundStreams) { + return + } + + this.logger.info('Opening new inbound stream for peer', peerId.toString()) + const inboundStream = await connection.newStream(this.protocol, { + runOnTransientConnection: false, + negotiateFully: true, + }) + + this.handleIncomingMessages(peerId, inboundStream) + this.inboundStreams[peerId.toString()] = inboundStream + } + + private async onIncomingStream({ stream, connection }: IncomingStreamData) { + const peerId = connection.remotePeer + this.logger.info(`Handling existing incoming stream ${peerId.toString()}`) + + const oldStream = this.inboundStreams[peerId.toString()] + if (oldStream) { + this.logger.info(`Old inbound stream found!`) + await this.closeInboundStream(peerId, true) + } + + this.handleIncomingMessages(peerId, stream) + + this.inboundStreams[peerId.toString()] = stream + } + + private handleIncomingMessages(peerId: PeerId, stream: Stream) { + pipe( + stream, + source => decode(source), + async source => { + for await (const data of source) { + try { + if (!(peerId.toString() in this.authConnections)) { + this.logger.error(`No auth connection established for ${peerId.toString()}`) + } else { + this.authConnections[peerId.toString()].deliver(data.subarray()) + } + } catch (e) { + this.logger.error(`Error while delivering message to ${peerId}`, e) + } + } + } + ) + } + + private sendMessage(peerId: PeerId, message: Uint8Array) { + try { + this.outboundStreams[peerId.toString()]?.pushable.push( + // length-prefix encoded + encode.single(message) + ) + } catch (e) { + this.logger.error(`Error while sending auth message over stream to ${peerId.toString()}`, e) + } + } + + // NOTE: This is not awaited by the registrar + private async onPeerConnected(peerId: PeerId, connection: Connection) { + if (this.joinStatus === JoinStatus.JOINING) { + this.logger.warn(`Connection to ${peerId.toString()} will be buffered due to a concurrent join`) + this.bufferedConnections.push({ peerId, connection }) + return + } + + if (this.joinStatus === JoinStatus.PENDING) { + this.joinStatus = JoinStatus.JOINING + } + + this.logger.info(`Peer connected (direction = ${connection.direction})!`) + + // https://github.com/ChainSafe/js-libp2p-gossipsub/issues/398 + if (connection.status !== 'open') { + this.logger.warn(`The connection with ${peerId.toString()} was not in an open state!`) + return + } + + const context = this.authContext + this.logger.info( + `Context with ${peerId.toString()} is a member context?: ${(context as Auth.InviteeMemberContext).invitationSeed == null}` + ) + if (!context) { + throw new Error('Auth context required to connect to peer') + } + + if (peerId.toString() in this.authConnections) { + this.logger.info( + `A connection with ${peerId.toString()} was already available, skipping connection initialization!` + ) + return + } + + const authConnection = new Auth.Connection({ + context, + sendMessage: (message: Uint8Array) => { + this.sendMessage(peerId, message) + }, + createLogger: createLFALogger, + }) + + // TODO: Listen for updates to context and update context in storage + authConnection.on('joined', payload => { + const { team, user, teamKeyring } = payload + const sigChain = SigChain.createFromTeam(team, context) + + this.logger.info(`${sigChain.context.user.userId}: Joined team ${team.teamName} (userid: ${user.userId})!`) + if (this.sigChainService.getActiveChain() == null && !this.joining) { + this.joining = true + this.logger.info( + `${user.userId}: Creating SigChain for user with name ${user.userName} and team name ${team.teamName}` + ) + const context = sigChain.context + this.storage.setSigChain(SigChain.createFromTeam(team, context).sigChain) + this.logger.info(`${user.userId}: Updating auth context`) + this.storage.setAuthContext({ + user: context.user, + device: context.device, + team, + }) + this.joining = false + } + if (this.joinStatus === JoinStatus.JOINING) { + this.joinStatus = JoinStatus.JOINED + this.unblockConnections(this.bufferedConnections, this.joinStatus, this.logger) + } + this.events.emit(AuthEvents.INITIALIZED_CHAIN) + }) + + const handleAuthConnErrors = (error: Auth.ConnectionErrorPayload, remoteUsername: string | undefined) => { + this.logger.error(`Got an error while handling auth connection with ${remoteUsername}`, JSON.stringify(error)) + if (error.type === 'TIMEOUT') { + this.events.emit(AuthEvents.AUTH_TIMEOUT, { peerId, remoteUsername }) + } else if (error.type === 'DEVICE_UNKNOWN') { + this.events.emit(AuthEvents.MISSING_DEVICE, { peerId, remoteUsername }) + } + } + + authConnection.on('localError', error => { + handleAuthConnErrors(error, authConnection._context.userName) + }) + + authConnection.on('remoteError', error => { + handleAuthConnErrors(error, authConnection._context.userName) + }) + + authConnection.on('connected', () => { + this.logger.info(`LFA Connected!`) + if (this.authContext != null) { + this.logger.debug(`Sending sync message because our chain is intialized`) + const team = this.storage.getSigChain()!.team + const user = this.storage.getContext()!.user + authConnection.emit('sync', { team, user }) + } + }) + + authConnection.on('disconnected', event => { + this.logger.info(`LFA Disconnected!`, event) + authConnection.stop() + this.restartableAuthConnections.set(Date.now() + 30_000, authConnection) + }) + + this.authConnections[peerId.toString()] = authConnection + + this.outboundStreamQueue.push({ peerId, connection }) + } + + private async onPeerDisconnected(peerId: PeerId) { + this.logger.warn(`Disconnecting auth connection with peer ${peerId.toString()}`) + await this.closeAuthConnection(peerId) + } + + private async closeOutboundStream(peerId: PeerId, deleteRecord?: boolean) { + this.logger.warn(`Closing outbound stream with ${peerId.toString()}`) + const outboundStream = this.outboundStreams[peerId.toString()] + + if (outboundStream == null) { + this.logger.warn(`Can't close outbound stream with ${peerId.toString()} as it doesn't exist`) + return + } + + await outboundStream.pushable.end().onEmpty() + await outboundStream.stream.close().catch(e => { + outboundStream.stream.abort(e) + }) + + if (deleteRecord) { + delete this.outboundStreams[peerId.toString()] + } + } + + private async closeInboundStream(peerId: PeerId, deleteRecord?: boolean) { + this.logger.warn(`Closing inbound stream with ${peerId.toString()}`) + const inboundStream = this.inboundStreams[peerId.toString()] + + if (inboundStream == null) { + this.logger.warn(`Can't close inbound stream with ${peerId.toString()} as it doesn't exist`) + return + } + + await inboundStream.close().catch(e => { + inboundStream.abort(e) + }) + + if (deleteRecord) { + delete this.inboundStreams[peerId.toString()] + } + } + + private async closeAuthConnection(peerId: PeerId) { + this.logger.warn(`Closing auth connection with ${peerId.toString()}`) + const connection = this.authConnections[peerId.toString()] + + if (connection == null) { + this.logger.warn(`Can't close auth connection with ${peerId.toString()} as it doesn't exist`) + } else { + connection.stop() + delete this.authConnections[peerId.toString()] + } + + await this.closeOutboundStream(peerId, true) + await this.closeInboundStream(peerId, true) + } +} + +export const libp2pAuth = ( + peerId: PeerId, + sigChainService: SigChainService, + events: QuietAuthEvents +): ((components: Libp2pAuthComponents) => Libp2pAuth) => { + return (components: Libp2pAuthComponents) => new Libp2pAuth(peerId, sigChainService, components, events) +} diff --git a/packages/backend/src/nest/libp2p/libp2p.module.ts b/packages/backend/src/nest/libp2p/libp2p.module.ts index e010bb8b55..b91a80eb44 100644 --- a/packages/backend/src/nest/libp2p/libp2p.module.ts +++ b/packages/backend/src/nest/libp2p/libp2p.module.ts @@ -1,8 +1,10 @@ import { Module } from '@nestjs/common' import { Libp2pService } from './libp2p.service' import { ProcessInChunksService } from './process-in-chunks.service' +import { SigChainService } from '../auth/sigchain.service' @Module({ + imports: [SigChainService], providers: [Libp2pService, ProcessInChunksService], exports: [Libp2pService], }) diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index 986f9d8bc8..43e18e7176 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -29,9 +29,10 @@ import { getUsersAddresses } from '../common/utils' import { LIBP2P_DB_PATH, SERVER_IO_PROVIDER, SOCKS_PROXY_AGENT } from '../const' import { ServerIoProviderTypes } from '../types' import { webSockets } from '../websocketOverTor' -import { Libp2pConnectedPeer, Libp2pEvents, Libp2pNodeParams, Libp2pPeerInfo } from './libp2p.types' +import { Libp2pConnectedPeer, Libp2pEvents, Libp2pNodeParams, Libp2pPeerInfo, QuietAuthEvents } from './libp2p.types' import { createLogger } from '../common/logger' import { Libp2pDatastore } from './libp2p.datastore' +import { libp2pAuth } from './libp2p.auth' const KEY_LENGTH = 32 export const LIBP2P_PSK_METADATA = '/key/swarm/psk/1.0.0/\n/base16/\n' @@ -278,6 +279,7 @@ export class Libp2pService extends EventEmitter { pubsub: gossipsub({ // neccessary to run a single peer allowPublishToZeroTopicPeers: true, + debugName: params.peerId.toString(), fallbackToFloodsub: true, emitSelf: true, }), diff --git a/packages/backend/src/nest/libp2p/libp2p.types.ts b/packages/backend/src/nest/libp2p/libp2p.types.ts index 6626307486..bc35c184b8 100644 --- a/packages/backend/src/nest/libp2p/libp2p.types.ts +++ b/packages/backend/src/nest/libp2p/libp2p.types.ts @@ -1,5 +1,7 @@ import { PeerId } from '@libp2p/interface' import { Agent } from 'http' +import { EventEmitter } from 'stream' +import { createLogger } from '../common/logger' export enum Libp2pEvents { PEER_CONNECTED = 'peerConnected', @@ -30,3 +32,40 @@ export type Libp2pDatastoreOptions = { inMemory: boolean datastorePath?: string } + +export enum AuthEvents { + INITIALIZED_CHAIN = 'INITIALIZED_CHAIN', + DIAL_FINISHED = 'DIAL_FINISHED', + AUTH_TIMEOUT = 'AUTH_TIMEOUT', + MISSING_DEVICE = 'MISSING_DEVICE', +} +export class QuietAuthEvents { + private _events: EventEmitter + private _LOGGER: ReturnType + + constructor(identifier: string) { + this._events = new EventEmitter() + this._LOGGER = createLogger(`quietAuthEvents:${identifier}`) + } + + public emit(event: AuthEvents, ...args: any[]) { + this._LOGGER.debug(`emit ${event}`) + this._events.emit(event, ...args) + } + + public on(event: AuthEvents, listener: (...args: any[]) => void) { + this._events.on( + event, + // this.appendLogToListener(event, listener) + listener + ) + } + + public once(event: AuthEvents, listener: (...args: any[]) => void) { + this._events.once( + event, + // this.appendLogToListener(event, listener) + listener + ) + } +}