From 20b0a8eddf3e93b548fa314c87c8607892d44a81 Mon Sep 17 00:00:00 2001 From: taea Date: Thu, 9 Jan 2025 19:41:04 -0500 Subject: [PATCH] working invite acceptance and initial sync --- packages/backend/package-lock.json | 38 + packages/backend/package.json | 1 + .../backend/src/nest/auth/sigchain.service.ts | 15 +- packages/backend/src/nest/auth/sigchain.ts | 9 +- .../connections-manager.service.ts | 29 +- .../backend/src/nest/libp2p/libp2p.auth.ts | 698 +++++++++--------- .../backend/src/nest/libp2p/libp2p.service.ts | 11 +- .../backend/src/nest/libp2p/libp2p.types.ts | 4 +- .../src/nest/local-db/local-db.service.ts | 4 +- packages/desktop/src/main/main.ts | 2 +- packages/logger/src/index.ts | 4 + .../appConnection/invite/createInvite.saga.ts | 1 + 12 files changed, 428 insertions(+), 388 deletions(-) diff --git a/packages/backend/package-lock.json b/packages/backend/package-lock.json index b82c8d6e0..85b2ac2f8 100644 --- a/packages/backend/package-lock.json +++ b/packages/backend/package-lock.json @@ -45,6 +45,7 @@ "blockstore-fs": "^2.0.2", "blockstore-level": "^2.0.1", "bs58": "^6.0.0", + "bufferutil": "^4.0.9", "class-transformer": "^0.5.1", "class-validator": "^0.14.1", "cli-table": "^0.3.6", @@ -11821,6 +11822,28 @@ "resolved": "https://registry.npmjs.org/interface-store/-/interface-store-6.0.2.tgz", "integrity": "sha512-KSFCXtBlNoG0hzwNa0RmhHtrdhzexp+S+UY2s0rWTBJyfdEIgn6i6Zl9otVqrcFYbYrneBT7hbmHQ8gE0C3umA==" }, + "node_modules/bufferutil": { + "version": "4.0.9", + "resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.9.tgz", + "integrity": "sha512-WDtdLmJvAuNNPzByAYpRo2rF1Mmradw6gvWsQKf63476DDXmomT9zUiGypLcG4ibIM67vhAj8jJRdbmEws2Aqw==", + "hasInstallScript": true, + "dependencies": { + "node-gyp-build": "^4.3.0" + }, + "engines": { + "node": ">=6.14.2" + } + }, + "node_modules/bufferutil/node_modules/node-gyp-build": { + "version": "4.8.4", + "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.8.4.tgz", + "integrity": "sha512-LA4ZjwlnUblHVgq0oBF3Jl/6h/Nvs5fzBLwdEF4nuxnFdsfajde4WfxtJr3CaiH+F6ewcIB/q4jQ4UzPyid+CQ==", + "bin": { + "node-gyp-build": "bin.js", + "node-gyp-build-optional": "optional.js", + "node-gyp-build-test": "build-test.js" + } + }, "node_modules/class-transformer": { "version": "0.5.1", "resolved": "https://registry.npmjs.org/class-transformer/-/class-transformer-0.5.1.tgz", @@ -30451,6 +30474,21 @@ } } }, + "bufferutil": { + "version": "4.0.9", + "resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.9.tgz", + "integrity": "sha512-WDtdLmJvAuNNPzByAYpRo2rF1Mmradw6gvWsQKf63476DDXmomT9zUiGypLcG4ibIM67vhAj8jJRdbmEws2Aqw==", + "requires": { + "node-gyp-build": "^4.3.0" + }, + "dependencies": { + "node-gyp-build": { + "version": "4.8.4", + "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.8.4.tgz", + "integrity": "sha512-LA4ZjwlnUblHVgq0oBF3Jl/6h/Nvs5fzBLwdEF4nuxnFdsfajde4WfxtJr3CaiH+F6ewcIB/q4jQ4UzPyid+CQ==" + } + } + }, "class-transformer": { "version": "0.5.1", "resolved": "https://registry.npmjs.org/class-transformer/-/class-transformer-0.5.1.tgz", diff --git a/packages/backend/package.json b/packages/backend/package.json index 8c5b66137..541fecbe5 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -131,6 +131,7 @@ "blockstore-fs": "^2.0.2", "blockstore-level": "^2.0.1", "bs58": "^6.0.0", + "bufferutil": "^4.0.9", "class-transformer": "^0.5.1", "class-validator": "^0.14.1", "cli-table": "^0.3.6", diff --git a/packages/backend/src/nest/auth/sigchain.service.ts b/packages/backend/src/nest/auth/sigchain.service.ts index 53e1973a2..a71629fab 100644 --- a/packages/backend/src/nest/auth/sigchain.service.ts +++ b/packages/backend/src/nest/auth/sigchain.service.ts @@ -60,13 +60,14 @@ export class SigChainService implements OnModuleInit { * @param setActive Whether to set the chain as active * @returns Whether the chain was set as active */ - addChain(chain: SigChain, setActive: boolean): boolean { - if (this.chains.has(chain.team!.teamName)) { - throw new Error(`Chain for team ${chain.team!.teamName} already exists`) + addChain(chain: SigChain, setActive: boolean, teamName?: string): boolean { + teamName = teamName || chain.team!.teamName + if (this.chains.has(teamName)) { + throw new Error(`Chain for team ${teamName} already exists`) } - this.chains.set(chain.team!.teamName, chain) + this.chains.set(teamName, chain) if (setActive) { - this.setActiveChain(chain.team!.teamName) + this.setActiveChain(teamName) return true } return false @@ -103,9 +104,9 @@ export class SigChainService implements OnModuleInit { return sigChain } - async createChainFromInvite(username: string, seed: string, setActive: boolean): Promise { + async createChainFromInvite(username: string, teamName: string, seed: string, setActive: boolean): Promise { const sigChain = SigChain.createFromInvite(username, seed) - this.addChain(sigChain, setActive) + this.addChain(sigChain, setActive, teamName) return sigChain } diff --git a/packages/backend/src/nest/auth/sigchain.ts b/packages/backend/src/nest/auth/sigchain.ts index 79252cc6f..f6855fddf 100644 --- a/packages/backend/src/nest/auth/sigchain.ts +++ b/packages/backend/src/nest/auth/sigchain.ts @@ -25,8 +25,8 @@ class SigChain { private _invites: InviteService | null = null private _crypto: CryptoService | null = null - private constructor(context: auth.LocalUserContext, team?: auth.Team) { - this.localUserContext = context + private constructor(localUserContext: auth.LocalUserContext, team?: auth.Team) { + this.localUserContext = localUserContext if (team) this.team = team } @@ -41,6 +41,11 @@ class SigChain { const context = UserService.create(username) const team: auth.Team = this.lfa.createTeam(teamName, context) const sigChain = this.init(context, team) + sigChain.context = { + user: context.user, + device: context.device, + team: team, + } as auth.MemberContext // sigChain.roles.createWithMembers(RoleName.ADMIN, [context.user.userId]) sigChain.roles.createWithMembers(RoleName.MEMBER, [context.user.userId]) diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index f6abc7470..3ea150e57 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -588,6 +588,14 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI id: payload.id, } await this.storageService.setIdentity(identity) + + if (!community.name) { + this.logger.error('Community name is required to create sigchain') + return community + } + this.logger.info(`Creating new LFA chain`) + await this.sigChainService.createChain(community.name, identity.nickname, true) + await this.launchCommunity(community) const meta = await this.storageService.updateCommunityMetadata({ @@ -610,14 +618,6 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI await this.storageService.saveCSR({ csr: identity.userCsr.userCsr }) } - // create sigchain - if (!community.name) { - this.logger.error('Community name is required to create sigchain') - return community - } - - this.logger.info(`Creating new LFA chain`) - await this.sigChainService.createChain(community.name, identity.nickname, true) // this is the forever invite that all users get this.logger.info(`Creating long lived LFA invite code`) this.socketService.emit(SocketActionTypes.CREATE_LONG_LIVED_LFA_INVITE) @@ -663,7 +663,12 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI const inviteData = payload.inviteData if (inviteData && inviteData?.version == InvitationDataVersion.v2) { - this.sigChainService.createChainFromInvite(identity.nickname, inviteData.authData.seed, true) + this.sigChainService.createChainFromInvite( + identity.nickname, + inviteData.authData.communityName, + inviteData.authData.seed, + true + ) } if (!metadata.peers || metadata.peers.length === 0) { @@ -954,18 +959,18 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI SocketActionTypes.VALIDATE_OR_CREATE_LONG_LIVED_LFA_INVITE, async ( inviteId: Base58, - callback: (response: { isValid: boolean; newInvite?: InviteResult } | undefined) => void + callback: (response: { valid: boolean; newInvite?: InviteResult } | undefined) => void ) => { this.logger.info(`socketService - ${SocketActionTypes.VALIDATE_OR_CREATE_LONG_LIVED_LFA_INVITE}`) if (this.sigChainService.activeChainTeamName != null) { if (this.sigChainService.getActiveChain().invites.isValidLongLivedUserInvite(inviteId)) { this.logger.info(`Invite is a valid long lived LFA invite code!`) - callback({ isValid: true }) + callback({ valid: true }) } else { this.logger.info(`Invite is an invalid long lived LFA invite code! Generating a new code!`) const newInvite = this.sigChainService.getActiveChain().invites.createLongLivedUserInvite() this.serverIoProvider.io.emit(SocketActionTypes.CREATED_LONG_LIVED_LFA_INVITE, newInvite) - callback({ isValid: false, newInvite }) + callback({ valid: false, newInvite }) } } else { this.logger.warn(`No sigchain configured, skipping long lived LFA invite code validation/generation!`) diff --git a/packages/backend/src/nest/libp2p/libp2p.auth.ts b/packages/backend/src/nest/libp2p/libp2p.auth.ts index bfbabb8ec..4e3c7f7f7 100644 --- a/packages/backend/src/nest/libp2p/libp2p.auth.ts +++ b/packages/backend/src/nest/libp2p/libp2p.auth.ts @@ -1,22 +1,25 @@ // ISLA: This is what needs to be added to Quiet to make it work Libp2p and LFA work together -import { ComponentLogger, Connection, PeerId, PeerStore, Stream, Topology } from '@libp2p/interface' -// import type { ConnectionManager, IncomingStreamData, Registrar } from '@libp2p/interface-internal' +import { ComponentLogger, Connection, NewStreamOptions, PeerId, PeerStore, Stream, Topology } from '@libp2p/interface' +import type { ConnectionManager, IncomingStreamData, Registrar } from '@libp2p/interface-internal' import * as Auth from '@localfirst/auth' import { pushable, type Pushable } from 'it-pushable' import { Uint8ArrayList } from 'uint8arraylist' import { pipe } from 'it-pipe' import { encode, decode } from 'it-length-prefixed' -import { SigChainService } from '../auth/sigchain.service.js' -import { AuthEvents, QuietAuthEvents } from './libp2p.types.js' -import { createLogger } from '../common/logger.js' +import { SigChainService } from '../auth/sigchain.service' +import { AuthEvents, QuietAuthEvents } from './libp2p.types' +import { createLogger } from '../common/logger' import { createQuietLogger } from '@quiet/logger' +import { ConnectionParams } from '3rd-party/auth/packages/auth/dist/connection/Connection' + +const logger = createLogger('libp2p:auth') export interface Libp2pAuthComponents { peerId: PeerId peerStore: PeerStore - // registrar: Registrar - // connectionManager: ConnectionManager + registrar: Registrar + connectionManager: ConnectionManager logger: ComponentLogger } @@ -34,23 +37,21 @@ enum JoinStatus { const createLFALogger = createQuietLogger('localfirst:') export class Libp2pAuth { - // private readonly protocol: string - // private readonly components: Libp2pAuthComponents - // private sigChainService: SigChainService - // private authConnections: Record - // private outboundStreamQueue: Pushable<{ peerId: PeerId; connection: Connection }> - // private outboundStreams: Record - // private inboundStreams: Record - // private restartableAuthConnections: Map - // private bufferedConnections: { peerId: PeerId; connection: Connection }[] - // private events: QuietAuthEvents - // private peerId: PeerId - // private joining: boolean = false - // private restartInterval: any - // private unblockInterval: NodeJS.Timeout - // private joinStatus: JoinStatus - // private logger: ReturnType - // private authContext: Auth.Context + private readonly protocol: string + private readonly components: Libp2pAuthComponents + private sigChainService: SigChainService + private authConnections: Record + private outboundStreamQueue: Pushable<{ peerId: PeerId; connection: Connection }> + private outboundStreams: Record + private inboundStreams: Record + private restartableAuthConnections: Map + private bufferedConnections: { peerId: PeerId; connection: Connection }[] + private events: QuietAuthEvents + private peerId: PeerId + private joining: boolean = false + private restartInterval: any + private unblockInterval: NodeJS.Timeout + private joinStatus: JoinStatus constructor( peerId: PeerId, @@ -58,344 +59,321 @@ export class Libp2pAuth { components: Libp2pAuthComponents, events: QuietAuthEvents ) { - // this.protocol = '/local-first-auth/1.0.0' - // this.peerId = peerId - // this.components = components - // this.sigChainService = sigChainService - // this.authConnections = {} - // this.restartableAuthConnections = new Map() - // this.outboundStreamQueue = pushable<{ peerId: PeerId; connection: Connection }>({ objectMode: true }) - // this.outboundStreams = {} - // this.inboundStreams = {} - // this.bufferedConnections = [] - // this.joinStatus = JoinStatus.PENDING - // this.events = events - // this.logger = createLogger(`libp2p:auth:${peerId}`) + this.protocol = '/local-first-auth/1.0.0' + this.peerId = peerId + this.components = components + this.sigChainService = sigChainService + this.authConnections = {} + this.restartableAuthConnections = new Map() + this.outboundStreamQueue = pushable<{ peerId: PeerId; connection: Connection }>({ objectMode: true }) + this.outboundStreams = {} + this.inboundStreams = {} + this.bufferedConnections = [] + this.joinStatus = JoinStatus.PENDING + this.events = events + + logger.info('Auth service initialized') + logger.info('sigChainService', sigChainService.activeChainTeamName) + + pipe(this.outboundStreamQueue, async source => { + for await (const { peerId, connection } of source) { + await this.openOutboundStream(peerId, connection) + } + }).catch(e => { + logger.error('Outbound stream queue error', e) + }) + + this.restartInterval = setInterval(this.restartStoppedConnections, 45_000, this.restartableAuthConnections) + this.unblockInterval = setInterval(this.unblockConnections, 5_000, this.bufferedConnections, this.joinStatus) + } + private restartStoppedConnections(restartableAuthConnections: Map) { + logger.info(`Attempting to restart stopped auth connections`) + for (const [ms, connection] of restartableAuthConnections.entries()) { + if (ms >= Date.now()) { + connection.start() + restartableAuthConnections.delete(ms) + } + } + } + + private async unblockConnections(conns: { peerId: PeerId; connection: Connection }[], status: JoinStatus) { + if (status !== JoinStatus.JOINED) return + + logger.info(`Unblocking ${conns.length} connections now that we've joined the chain`) + while (conns.length > 0) { + const conn = conns.pop() + if (conn != null) { + await this.onPeerConnected(conn.peerId, conn.connection) + } + } + } + + async start() { + logger.info('Auth service starting') + + const topology: Topology = { + onConnect: this.onPeerConnected.bind(this), + onDisconnect: this.onPeerDisconnected.bind(this), + notifyOnLimitedConnection: false, + } + + const registrar = this.components.registrar + await registrar.register(this.protocol, topology) + await registrar.handle(this.protocol, this.onIncomingStream.bind(this), { + runOnLimitedConnection: false, + }) + } + + async stop() { + // TODO + } + + private async openOutboundStream(peerId: PeerId, connection: Connection) { + if (peerId.toString() in this.outboundStreams) { + return + } + + logger.info('Opening outbound stream for peer', peerId.toString()) + const outboundStream = await connection.newStream(this.protocol, { + runOnLimitedConnection: false, + negotiateFully: true, + }) + const outboundPushable: Pushable = pushable() + this.outboundStreams[peerId.toString()] = { + stream: outboundStream, + pushable: outboundPushable, + } + + pipe(outboundPushable, outboundStream).catch((e: Error) => + logger.error(`Error opening outbound stream to ${peerId}`, e) + ) + + if (connection.direction === 'outbound') { + await this.openInboundStream(peerId, connection) + } + + this.authConnections[peerId.toString()].start() + } + + private async openInboundStream(peerId: PeerId, connection: Connection) { + if (peerId.toString() in this.inboundStreams) { + return + } + + logger.info('Opening new inbound stream for peer', peerId.toString()) + const inboundStream = await connection.newStream(this.protocol, { + runOnLimitedConnection: false, + negotiateFully: true, + } as NewStreamOptions) + + this.handleIncomingMessages(peerId, inboundStream) + this.inboundStreams[peerId.toString()] = inboundStream + } + + private async onIncomingStream({ stream, connection }: IncomingStreamData) { + const peerId = connection.remotePeer + logger.info(`Handling existing incoming stream ${peerId.toString()}`) + + const oldStream = this.inboundStreams[peerId.toString()] + if (oldStream) { + logger.info(`Old inbound stream found!`) + await this.closeInboundStream(peerId, true) + } + + this.handleIncomingMessages(peerId, stream) + + this.inboundStreams[peerId.toString()] = stream + } + + private handleIncomingMessages(peerId: PeerId, stream: Stream) { + pipe( + stream, + source => decode(source), + async source => { + for await (const data of source) { + try { + if (!(peerId.toString() in this.authConnections)) { + logger.error(`No auth connection established for ${peerId.toString()}`) + } else { + this.authConnections[peerId.toString()].deliver(data.subarray()) + } + } catch (e) { + logger.error(`Error while delivering message to ${peerId}`, e) + } + } + } + ) + } + + private sendMessage(peerId: PeerId, message: Uint8Array) { + try { + this.outboundStreams[peerId.toString()]?.pushable.push( + // length-prefix encoded + encode.single(message) + ) + } catch (e) { + logger.error(`Error while sending auth message over stream to ${peerId.toString()}`, e) + } + } + + // NOTE: This is not awaited by the registrar + private async onPeerConnected(peerId: PeerId, connection: Connection) { + if (this.joinStatus === JoinStatus.JOINING) { + logger.warn(`Connection to ${peerId.toString()} will be buffered due to a concurrent join`) + this.bufferedConnections.push({ peerId, connection }) + return + } + + if (this.joinStatus === JoinStatus.PENDING) { + this.joinStatus = JoinStatus.JOINING + } + + logger.info(`Peer connected (direction = ${connection.direction})!`) + + // https://github.com/ChainSafe/js-libp2p-gossipsub/issues/398 + if (connection.status !== 'open') { + logger.warn(`The connection with ${peerId.toString()} was not in an open state!`) + return + } + + const context = this.sigChainService.getActiveChain().context + + if (peerId.toString() in this.authConnections) { + logger.info(`A connection with ${peerId.toString()} was already available, skipping connection initialization!`) + return + } + + const authConnection = new Auth.Connection({ + context, + sendMessage: (message: Uint8Array) => { + this.sendMessage(peerId, message) + }, + createLogger: createLFALogger, + } as ConnectionParams) + + const handleAuthConnErrors = (error: Auth.ConnectionErrorPayload, remoteUsername: string | undefined) => { + logger.error(`Got an error while handling auth connection with ${remoteUsername}`, JSON.stringify(error)) + if (error.type === 'TIMEOUT') { + this.events.emit(AuthEvents.AUTH_TIMEOUT, { peerId, remoteUsername }) + } else if (error.type === 'DEVICE_UNKNOWN') { + this.events.emit(AuthEvents.MISSING_DEVICE, { peerId, remoteUsername }) + } + } + + // TODO: Listen for updates to context and update context in storage + authConnection.on('joined', payload => { + const { team, user } = payload + const sigChain = this.sigChainService.getActiveChain() + logger.info(`${sigChain.localUserContext.user.userId}: Joined team ${team.teamName} (userid: ${user.userId})!`) + if (sigChain.team == null && !this.joining) { + this.joining = true + logger.info( + `${user.userId}: Creating SigChain for user with name ${user.userName} and team name ${team.teamName}` + ) + logger.info(`${user.userId}: Updating auth context`) + + sigChain.context = { + ...sigChain.context, + team, + user, + } as Auth.MemberContext + sigChain.team = team + this.joining = false + } + if (this.joinStatus === JoinStatus.JOINING) { + this.joinStatus = JoinStatus.JOINED + this.unblockConnections(this.bufferedConnections, this.joinStatus) + } + this.events.emit(AuthEvents.INITIALIZED_CHAIN) + }) + + authConnection.on('localError', error => { + handleAuthConnErrors(error, authConnection._context.userName) + }) + + authConnection.on('remoteError', error => { + handleAuthConnErrors(error, authConnection._context.userName) + }) + + authConnection.on('connected', () => { + logger.info(`LFA Connected!`) + if (this.sigChainService.activeChainTeamName != null) { + logger.debug(`Sending sync message because our chain is intialized`) + const sigChain = this.sigChainService.getActiveChain() + const team = sigChain.team + const user = sigChain.localUserContext.user + authConnection.emit('sync', { team, user }) + } + }) + + authConnection.on('disconnected', event => { + logger.info(`LFA Disconnected!`, event) + authConnection.stop() + this.restartableAuthConnections.set(Date.now() + 30_000, authConnection) + }) + + this.authConnections[peerId.toString()] = authConnection + + this.outboundStreamQueue.push({ peerId, connection }) + } + + private async onPeerDisconnected(peerId: PeerId) { + logger.warn(`Disconnecting auth connection with peer ${peerId.toString()}`) + await this.closeAuthConnection(peerId) + } + + private async closeOutboundStream(peerId: PeerId, deleteRecord?: boolean) { + logger.warn(`Closing outbound stream with ${peerId.toString()}`) + const outboundStream = this.outboundStreams[peerId.toString()] + + if (outboundStream == null) { + logger.warn(`Can't close outbound stream with ${peerId.toString()} as it doesn't exist`) + return + } + + await outboundStream.pushable.end().onEmpty() + await outboundStream.stream.close().catch(e => { + outboundStream.stream.abort(e) + }) + + if (deleteRecord) { + delete this.outboundStreams[peerId.toString()] + } + } + + private async closeInboundStream(peerId: PeerId, deleteRecord?: boolean) { + logger.warn(`Closing inbound stream with ${peerId.toString()}`) + const inboundStream = this.inboundStreams[peerId.toString()] + + if (inboundStream == null) { + logger.warn(`Can't close inbound stream with ${peerId.toString()} as it doesn't exist`) + return + } + + await inboundStream.close().catch(e => { + inboundStream.abort(e) + }) + + if (deleteRecord) { + delete this.inboundStreams[peerId.toString()] + } + } + + private async closeAuthConnection(peerId: PeerId) { + logger.warn(`Closing auth connection with ${peerId.toString()}`) + const connection = this.authConnections[peerId.toString()] + + if (connection == null) { + logger.warn(`Can't close auth connection with ${peerId.toString()} as it doesn't exist`) + } else { + connection.stop() + delete this.authConnections[peerId.toString()] + } + + await this.closeOutboundStream(peerId, true) + await this.closeInboundStream(peerId, true) } } -// pipe(this.outboundStreamQueue, async source => { -// for await (const { peerId, connection } of source) { -// await this.openOutboundStream(peerId, connection) -// } -// }).catch(e => { -// this.logger.error('Outbound stream queue error', e) -// }) - -// this.restartInterval = setInterval( -// this.restartStoppedConnections, -// 45_000, -// this.restartableAuthConnections, -// this.logger -// ) -// this.unblockInterval = setInterval( -// this.unblockConnections, -// 5_000, -// this.bufferedConnections, -// this.joinStatus, -// this.logger -// ) -// } - -// private restartStoppedConnections(restartableAuthConnections: Map) { -// this.logger.info(`Attempting to restart stopped auth connections`) -// for (const [ms, connection] of restartableAuthConnections.entries()) { -// if (ms >= Date.now()) { -// connection.start() -// restartableAuthConnections.delete(ms) -// } -// } -// } - -// private async unblockConnections( -// conns: { peerId: PeerId; connection: Connection }[], -// status: JoinStatus, -// logger: ReturnType -// ) { -// if (status !== JoinStatus.JOINED) return - -// logger.info(`Unblocking ${conns.length} connections now that we've joined the chain`) -// while (conns.length > 0) { -// const conn = conns.pop() -// if (conn != null) { -// await this.onPeerConnected(conn.peerId, conn.connection) -// } -// } -// } - -// async start() { -// this.logger.info('Auth service starting') - -// const topology: Topology = { -// onConnect: this.onPeerConnected.bind(this), -// onDisconnect: this.onPeerDisconnected.bind(this), -// notifyOnTransient: false, -// } - -// const registrar = this.components.registrar -// await registrar.register(this.protocol, topology) -// await registrar.handle(this.protocol, this.onIncomingStream.bind(this), { -// runOnTransientConnection: false, -// }) -// } - -// async stop() { -// // TODO -// } - -// private async openOutboundStream(peerId: PeerId, connection: Connection) { -// if (peerId.toString() in this.outboundStreams) { -// return -// } - -// this.logger.info('Opening outbound stream for peer', peerId.toString()) -// const outboundStream = await connection.newStream(this.protocol, { -// runOnTransientConnection: false, -// negotiateFully: true, -// }) -// const outboundPushable: Pushable = pushable() -// this.outboundStreams[peerId.toString()] = { -// stream: outboundStream, -// pushable: outboundPushable, -// } - -// pipe(outboundPushable, outboundStream).catch((e: Error) => -// this.logger.error(`Error opening outbound stream to ${peerId}`, e) -// ) - -// if (connection.direction === 'outbound') { -// await this.openInboundStream(peerId, connection) -// } - -// this.authConnections[peerId.toString()].start() -// } - -// private async openInboundStream(peerId: PeerId, connection: Connection) { -// if (peerId.toString() in this.inboundStreams) { -// return -// } - -// this.logger.info('Opening new inbound stream for peer', peerId.toString()) -// const inboundStream = await connection.newStream(this.protocol, { -// runOnTransientConnection: false, -// negotiateFully: true, -// }) - -// this.handleIncomingMessages(peerId, inboundStream) -// this.inboundStreams[peerId.toString()] = inboundStream -// } - -// private async onIncomingStream({ stream, connection }: IncomingStreamData) { -// const peerId = connection.remotePeer -// this.logger.info(`Handling existing incoming stream ${peerId.toString()}`) - -// const oldStream = this.inboundStreams[peerId.toString()] -// if (oldStream) { -// this.logger.info(`Old inbound stream found!`) -// await this.closeInboundStream(peerId, true) -// } - -// this.handleIncomingMessages(peerId, stream) - -// this.inboundStreams[peerId.toString()] = stream -// } - -// private handleIncomingMessages(peerId: PeerId, stream: Stream) { -// pipe( -// stream, -// source => decode(source), -// async source => { -// for await (const data of source) { -// try { -// if (!(peerId.toString() in this.authConnections)) { -// this.logger.error(`No auth connection established for ${peerId.toString()}`) -// } else { -// this.authConnections[peerId.toString()].deliver(data.subarray()) -// } -// } catch (e) { -// this.logger.error(`Error while delivering message to ${peerId}`, e) -// } -// } -// } -// ) -// } - -// private sendMessage(peerId: PeerId, message: Uint8Array) { -// try { -// this.outboundStreams[peerId.toString()]?.pushable.push( -// // length-prefix encoded -// encode.single(message) -// ) -// } catch (e) { -// this.logger.error(`Error while sending auth message over stream to ${peerId.toString()}`, e) -// } -// } - -// // NOTE: This is not awaited by the registrar -// private async onPeerConnected(peerId: PeerId, connection: Connection) { -// if (this.joinStatus === JoinStatus.JOINING) { -// this.logger.warn(`Connection to ${peerId.toString()} will be buffered due to a concurrent join`) -// this.bufferedConnections.push({ peerId, connection }) -// return -// } - -// if (this.joinStatus === JoinStatus.PENDING) { -// this.joinStatus = JoinStatus.JOINING -// } - -// this.logger.info(`Peer connected (direction = ${connection.direction})!`) - -// // https://github.com/ChainSafe/js-libp2p-gossipsub/issues/398 -// if (connection.status !== 'open') { -// this.logger.warn(`The connection with ${peerId.toString()} was not in an open state!`) -// return -// } - -// const context = this.authContext -// this.logger.info( -// `Context with ${peerId.toString()} is a member context?: ${(context as Auth.InviteeMemberContext).invitationSeed == null}` -// ) -// if (!context) { -// throw new Error('Auth context required to connect to peer') -// } - -// if (peerId.toString() in this.authConnections) { -// this.logger.info( -// `A connection with ${peerId.toString()} was already available, skipping connection initialization!` -// ) -// return -// } - -// const authConnection = new Auth.Connection({ -// context, -// sendMessage: (message: Uint8Array) => { -// this.sendMessage(peerId, message) -// }, -// createLogger: createLFALogger, -// }) - -// const handleAuthConnErrors = (error: Auth.ConnectionErrorPayload, remoteUsername: string | undefined) => { -// this.logger.error(`Got an error while handling auth connection with ${remoteUsername}`, JSON.stringify(error)) -// if (error.type === 'TIMEOUT') { -// this.events.emit(AuthEvents.AUTH_TIMEOUT, { peerId, remoteUsername }) -// } else if (error.type === 'DEVICE_UNKNOWN') { -// this.events.emit(AuthEvents.MISSING_DEVICE, { peerId, remoteUsername }) -// } -// } - -// // TODO: Listen for updates to context and update context in storage -// authConnection.on('joined', payload => { -// const { team, user } = payload -// const sigChain = this.sigChainService.getActiveChain() -// this.logger.info( -// `${sigChain.localUserContext.user.userId}: Joined team ${team.teamName} (userid: ${user.userId})!` -// ) -// if (sigChain.team == null && !this.joining) { -// this.joining = true -// this.logger.info( -// `${user.userId}: Creating SigChain for user with name ${user.userName} and team name ${team.teamName}` -// ) -// this.logger.info(`${user.userId}: Updating auth context`) -// this.sigChainService.getActiveChain().context = { -// ...this.sigChainService.getActiveChain().context, -// team, -// user, -// } as Auth.MemberContext -// this.joining = false -// } -// if (this.joinStatus === JoinStatus.JOINING) { -// this.joinStatus = JoinStatus.JOINED -// this.unblockConnections(this.bufferedConnections, this.joinStatus, this.logger) -// } -// this.events.emit(AuthEvents.INITIALIZED_CHAIN) -// }) - -// authConnection.on('localError', error => { -// handleAuthConnErrors(error, authConnection._context.userName) -// }) - -// authConnection.on('remoteError', error => { -// handleAuthConnErrors(error, authConnection._context.userName) -// }) - -// authConnection.on('connected', () => { -// this.logger.info(`LFA Connected!`) -// if (this.sigChainService.activeChainTeamName != null) { -// this.logger.debug(`Sending sync message because our chain is intialized`) -// const sigChain = this.sigChainService.getActiveChain() -// const team = sigChain.team -// const user = sigChain.localUserContext.user -// authConnection.emit('sync', { team, user }) -// } -// }) - -// authConnection.on('disconnected', event => { -// this.logger.info(`LFA Disconnected!`, event) -// authConnection.stop() -// this.restartableAuthConnections.set(Date.now() + 30_000, authConnection) -// }) - -// this.authConnections[peerId.toString()] = authConnection - -// this.outboundStreamQueue.push({ peerId, connection }) -// } - -// private async onPeerDisconnected(peerId: PeerId) { -// this.logger.warn(`Disconnecting auth connection with peer ${peerId.toString()}`) -// await this.closeAuthConnection(peerId) -// } - -// private async closeOutboundStream(peerId: PeerId, deleteRecord?: boolean) { -// this.logger.warn(`Closing outbound stream with ${peerId.toString()}`) -// const outboundStream = this.outboundStreams[peerId.toString()] - -// if (outboundStream == null) { -// this.logger.warn(`Can't close outbound stream with ${peerId.toString()} as it doesn't exist`) -// return -// } - -// await outboundStream.pushable.end().onEmpty() -// await outboundStream.stream.close().catch(e => { -// outboundStream.stream.abort(e) -// }) - -// if (deleteRecord) { -// delete this.outboundStreams[peerId.toString()] -// } -// } - -// private async closeInboundStream(peerId: PeerId, deleteRecord?: boolean) { -// this.logger.warn(`Closing inbound stream with ${peerId.toString()}`) -// const inboundStream = this.inboundStreams[peerId.toString()] - -// if (inboundStream == null) { -// this.logger.warn(`Can't close inbound stream with ${peerId.toString()} as it doesn't exist`) -// return -// } - -// await inboundStream.close().catch(e => { -// inboundStream.abort(e) -// }) - -// if (deleteRecord) { -// delete this.inboundStreams[peerId.toString()] -// } -// } - -// private async closeAuthConnection(peerId: PeerId) { -// this.logger.warn(`Closing auth connection with ${peerId.toString()}`) -// const connection = this.authConnections[peerId.toString()] - -// if (connection == null) { -// this.logger.warn(`Can't close auth connection with ${peerId.toString()} as it doesn't exist`) -// } else { -// connection.stop() -// delete this.authConnections[peerId.toString()] -// } - -// await this.closeOutboundStream(peerId, true) -// await this.closeInboundStream(peerId, true) -// } -// } export const libp2pAuth = ( peerId: PeerId, diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index 828855473..208bcb6c3 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -35,12 +35,14 @@ import { Libp2pEvents, Libp2pNodeParams, Libp2pPeerInfo, + QuietAuthEvents, } from './libp2p.types' import { createLogger } from '../common/logger' import { Libp2pDatastore } from './libp2p.datastore' import { WEBSOCKET_CIPHER_SUITE, BITSWAP_PROTOCOL } from './libp2p.const' -import { libp2pAuth } from './libp2p.auth' +import { libp2pAuth, Libp2pAuth } from './libp2p.auth' +import { SigChainService } from '../auth/sigchain.service' const KEY_LENGTH = 32 export const LIBP2P_PSK_METADATA = '/key/swarm/psk/1.0.0/\n/base16/\n' @@ -54,16 +56,20 @@ export class Libp2pService extends EventEmitter { public libp2pDatastore: Libp2pDatastore private redialTimeout: NodeJS.Timeout private localAddress: string + private events: QuietAuthEvents private readonly logger = createLogger(Libp2pService.name) constructor( @Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes, @Inject(SOCKS_PROXY_AGENT) public readonly socksProxyAgent: Agent, - @Inject(LIBP2P_DB_PATH) public readonly datastorePath: string + @Inject(LIBP2P_DB_PATH) public readonly datastorePath: string, + private sigchainService: SigChainService ) { super() + this.logger.info('SigChainService', this.sigchainService) + this.events = new QuietAuthEvents() this.dialQueue = [] this.connectedPeers = new Map() this.dialedPeers = new Set() @@ -294,6 +300,7 @@ export class Libp2pService extends EventEmitter { }), ], services: { + auth: libp2pAuth(params.peerId.peerId, this.sigchainService, this.events), ping: ping({ timeout: 30_000 }), pubsub: gossipsub({ // neccessary to run a single peer diff --git a/packages/backend/src/nest/libp2p/libp2p.types.ts b/packages/backend/src/nest/libp2p/libp2p.types.ts index c2fa86d5d..7625c7547 100644 --- a/packages/backend/src/nest/libp2p/libp2p.types.ts +++ b/packages/backend/src/nest/libp2p/libp2p.types.ts @@ -43,9 +43,9 @@ export class QuietAuthEvents { private _events: EventEmitter private _LOGGER: ReturnType - constructor(identifier: string) { + constructor() { this._events = new EventEmitter() - this._LOGGER = createLogger(`quietAuthEvents:${identifier}`) + this._LOGGER = createLogger(`quietAuthEvents`) } public emit(event: AuthEvents, ...args: any[]) { diff --git a/packages/backend/src/nest/local-db/local-db.service.ts b/packages/backend/src/nest/local-db/local-db.service.ts index 514139d20..ca61f505d 100644 --- a/packages/backend/src/nest/local-db/local-db.service.ts +++ b/packages/backend/src/nest/local-db/local-db.service.ts @@ -165,8 +165,8 @@ export class LocalDbService { return await this.get(LocalDBKeys.IDENTITIES) } - public async setSigChain(sigChain: SigChain) { - const teamName = sigChain.team!.teamName + public async setSigChain(sigChain: SigChain, teamName?: string) { + teamName = teamName || sigChain.team!.teamName const key = `${LocalDBKeys.SIGCHAINS}${teamName}` const serializedSigChain: SigChainSaveData = { serializedTeam: Buffer.from(sigChain.save()).toString('base64'), diff --git a/packages/desktop/src/main/main.ts b/packages/desktop/src/main/main.ts index 36745c1b8..554f752b6 100644 --- a/packages/desktop/src/main/main.ts +++ b/packages/desktop/src/main/main.ts @@ -115,7 +115,7 @@ export const applyDevTools = async () => { if (!isDev || isE2Etest) return /* eslint-disable */ require('electron-debug')({ - showDevTools: true, + showDevTools: false, }) const installer = require('electron-devtools-installer') const { REACT_DEVELOPER_TOOLS, REDUX_DEVTOOLS } = require('electron-devtools-installer') diff --git a/packages/logger/src/index.ts b/packages/logger/src/index.ts index 5e44becab..d42c0a47a 100644 --- a/packages/logger/src/index.ts +++ b/packages/logger/src/index.ts @@ -85,6 +85,10 @@ export class QuietLogger { this.isDebug = debug.enabled(name) } + extend(moduleName: string): QuietLogger { + return new QuietLogger(`${this.name}:${moduleName}`, this.parallelConsoleLog) + } + /* Log Level Methods */ diff --git a/packages/state-manager/src/sagas/appConnection/invite/createInvite.saga.ts b/packages/state-manager/src/sagas/appConnection/invite/createInvite.saga.ts index 15a3d7f15..6043cf730 100644 --- a/packages/state-manager/src/sagas/appConnection/invite/createInvite.saga.ts +++ b/packages/state-manager/src/sagas/appConnection/invite/createInvite.saga.ts @@ -24,6 +24,7 @@ export function* createInviteSaga( socket.emitWithAck, applyEmitParams(SocketActionTypes.VALIDATE_OR_CREATE_LONG_LIVED_LFA_INVITE, existingLongLivedInvite?.id) ) + logger.info(`lfaInviteData: ${JSON.stringify(lfaInviteData)}`) if (!lfaInviteData?.valid && lfaInviteData?.newInvite != null) { logger.info(`Existing long-lived invite was invalid, the invite has been replaced`) yield* putResolve(connectionActions.setLongLivedInvite(lfaInviteData.newInvite))