diff --git a/packages/backend/package-lock.json b/packages/backend/package-lock.json index 2697f66df2..2700dbf028 100644 --- a/packages/backend/package-lock.json +++ b/packages/backend/package-lock.json @@ -16,6 +16,7 @@ "@nestjs/platform-express": "^10.2.10", "@peculiar/webcrypto": "1.4.3", "abortable-iterator": "^3.0.0", + "bufferutil": "^4.0.8", "class-transformer": "^0.5.1", "class-validator": "^0.13.1", "cli-table": "^0.3.6", @@ -57,6 +58,7 @@ "socks-proxy-agent": "^5.0.0", "string-replace-loader": "3.1.0", "ts-jest-resolver": "^2.0.0", + "utf-8-validate": "^6.0.4", "validator": "^13.11.0" }, "devDependencies": { @@ -1513,6 +1515,20 @@ "ws": "7.4.6" } }, + "node_modules/@ethersproject/providers/node_modules/utf-8-validate": { + "version": "5.0.10", + "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.10.tgz", + "integrity": "sha512-Z6czzLq4u8fPOyx7TU6X3dvUZVvoJmxSQ+IcrlmagKhilxlhZgxPK6C5Jqbkw1IDUmFTM+cz9QDnnLTwDz/2gQ==", + "hasInstallScript": true, + "optional": true, + "peer": true, + "dependencies": { + "node-gyp-build": "^4.3.0" + }, + "engines": { + "node": ">=6.14.2" + } + }, "node_modules/@ethersproject/providers/node_modules/ws": { "version": "7.4.6", "license": "MIT", @@ -8743,6 +8759,18 @@ "version": "1.0.3", "license": "MIT" }, + "node_modules/bufferutil": { + "version": "4.0.8", + "resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.8.tgz", + "integrity": "sha512-4T53u4PdgsXqKaIctwF8ifXlRTTmEPJ8iEPWFdGZvcf7sbwYo6FKFEX9eNNAnzFZ7EzJAQ3CJeOtCRA4rDp7Pw==", + "hasInstallScript": true, + "dependencies": { + "node-gyp-build": "^4.3.0" + }, + "engines": { + "node": ">=6.14.2" + } + }, "node_modules/busboy": { "version": "1.6.0", "dependencies": { @@ -10373,6 +10401,20 @@ "xmlhttprequest-ssl": "~2.0.0" } }, + "node_modules/engine.io-client/node_modules/utf-8-validate": { + "version": "5.0.10", + "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.10.tgz", + "integrity": "sha512-Z6czzLq4u8fPOyx7TU6X3dvUZVvoJmxSQ+IcrlmagKhilxlhZgxPK6C5Jqbkw1IDUmFTM+cz9QDnnLTwDz/2gQ==", + "hasInstallScript": true, + "optional": true, + "peer": true, + "dependencies": { + "node-gyp-build": "^4.3.0" + }, + "engines": { + "node": ">=6.14.2" + } + }, "node_modules/engine.io-client/node_modules/ws": { "version": "8.2.3", "license": "MIT", @@ -22890,6 +22932,18 @@ "node": ">=4" } }, + "node_modules/utf-8-validate": { + "version": "6.0.4", + "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-6.0.4.tgz", + "integrity": "sha512-xu9GQDeFp+eZ6LnCywXN/zBancWvOpUMzgjLPSjy4BRHSmTelvn2E0DG0o1sTiw5hkCKBHo8rwSKncfRfv2EEQ==", + "hasInstallScript": true, + "dependencies": { + "node-gyp-build": "^4.3.0" + }, + "engines": { + "node": ">=6.14.2" + } + }, "node_modules/utf8-byte-length": { "version": "1.0.4", "license": "WTFPL" @@ -24434,6 +24488,16 @@ "ws": "7.4.6" }, "dependencies": { + "utf-8-validate": { + "version": "5.0.10", + "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.10.tgz", + "integrity": "sha512-Z6czzLq4u8fPOyx7TU6X3dvUZVvoJmxSQ+IcrlmagKhilxlhZgxPK6C5Jqbkw1IDUmFTM+cz9QDnnLTwDz/2gQ==", + "optional": true, + "peer": true, + "requires": { + "node-gyp-build": "^4.3.0" + } + }, "ws": { "version": "7.4.6", "requires": {} @@ -29428,6 +29492,14 @@ "buffer-xor": { "version": "1.0.3" }, + "bufferutil": { + "version": "4.0.8", + "resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.8.tgz", + "integrity": "sha512-4T53u4PdgsXqKaIctwF8ifXlRTTmEPJ8iEPWFdGZvcf7sbwYo6FKFEX9eNNAnzFZ7EzJAQ3CJeOtCRA4rDp7Pw==", + "requires": { + "node-gyp-build": "^4.3.0" + } + }, "busboy": { "version": "1.6.0", "requires": { @@ -30512,6 +30584,16 @@ "xmlhttprequest-ssl": "~2.0.0" }, "dependencies": { + "utf-8-validate": { + "version": "5.0.10", + "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.10.tgz", + "integrity": "sha512-Z6czzLq4u8fPOyx7TU6X3dvUZVvoJmxSQ+IcrlmagKhilxlhZgxPK6C5Jqbkw1IDUmFTM+cz9QDnnLTwDz/2gQ==", + "optional": true, + "peer": true, + "requires": { + "node-gyp-build": "^4.3.0" + } + }, "ws": { "version": "8.2.3", "requires": {} @@ -38336,6 +38418,14 @@ "nan": "^2.14.2" } }, + "utf-8-validate": { + "version": "6.0.4", + "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-6.0.4.tgz", + "integrity": "sha512-xu9GQDeFp+eZ6LnCywXN/zBancWvOpUMzgjLPSjy4BRHSmTelvn2E0DG0o1sTiw5hkCKBHo8rwSKncfRfv2EEQ==", + "requires": { + "node-gyp-build": "^4.3.0" + } + }, "utf8-byte-length": { "version": "1.0.4" }, diff --git a/packages/backend/package.json b/packages/backend/package.json index e663a57080..6cee5e72c3 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -98,6 +98,7 @@ "@quiet/logger": "^2.0.2-alpha.0", "@quiet/types": "^2.0.2-alpha.1", "abortable-iterator": "^3.0.0", + "bufferutil": "^4.0.8", "class-transformer": "^0.5.1", "class-validator": "^0.13.1", "cli-table": "^0.3.6", @@ -139,6 +140,7 @@ "socks-proxy-agent": "^5.0.0", "string-replace-loader": "3.1.0", "ts-jest-resolver": "^2.0.0", + "utf-8-validate": "^6.0.4", "validator": "^13.11.0" }, "overrides": { diff --git a/packages/backend/src/nest/app.module.ts b/packages/backend/src/nest/app.module.ts index 01b4215930..e575fb302f 100644 --- a/packages/backend/src/nest/app.module.ts +++ b/packages/backend/src/nest/app.module.ts @@ -99,8 +99,9 @@ export class AppModule { allowedHeaders: ['authorization'], credentials: true, }, - pingInterval: 1000_000, - pingTimeout: 1000_000, + pingInterval: 10_000, + pingTimeout: 2_000, + connectTimeout: 60_000, }) io.engine.use((req, res, next) => { const authHeader = req.headers['authorization'] diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts index 4506ed02d8..1504f51d13 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts @@ -31,19 +31,18 @@ import waitForExpect from 'wait-for-expect' import { Libp2pEvents } from '../libp2p/libp2p.types' import { sleep } from '../common/sleep' import { createLibp2pAddress } from '@quiet/common' -import { lib } from 'crypto-js' -jest.setTimeout(100_000) +jest.setTimeout(120_000) let tmpDir: DirResult let tmpAppDataPath: string -let module: TestingModule -let connectionsManagerService: ConnectionsManagerService -let tor: Tor -let localDbService: LocalDbService -let registrationService: RegistrationService -let libp2pService: Libp2pService +let module: TestingModule | undefined +let connectionsManagerService: ConnectionsManagerService | undefined +let tor: Tor | undefined +let localDbService: LocalDbService | undefined +let registrationService: RegistrationService | undefined +let libp2pService: Libp2pService | undefined let lazyModuleLoader: LazyModuleLoader let quietDir: string let store: Store @@ -55,6 +54,7 @@ let peerId: PeerId let torControl: TorControl beforeEach(async () => { + console.log('Starting test') jest.clearAllMocks() tmpDir = createTmpDir() tmpAppDataPath = tmpQuietDirPath(tmpDir.name) @@ -87,15 +87,15 @@ beforeEach(async () => { }) .compile() - connectionsManagerService = await module.resolve(ConnectionsManagerService) - localDbService = await module.resolve(LocalDbService) - registrationService = await module.resolve(RegistrationService) - tor = await module.resolve(Tor) - await tor.init() - const torPassword = crypto.randomBytes(16).toString('hex') torControl = await module.resolve(TorControl) torControl.authString = 'AUTHENTICATE ' + torPassword + '\r\n' + tor = await module.resolve(Tor) + await tor!.init() + + connectionsManagerService = await module.resolve(ConnectionsManagerService) + localDbService = await module.resolve(LocalDbService) + registrationService = await module.resolve(RegistrationService) lazyModuleLoader = await module.resolve(LazyModuleLoader) const { Libp2pModule: Module } = await import('../libp2p/libp2p.module') @@ -105,20 +105,26 @@ beforeEach(async () => { const params = await libp2pInstanceParams() peerId = params.peerId - connectionsManagerService.libp2pService = libp2pService + connectionsManagerService!.libp2pService = libp2pService! quietDir = await module.resolve(QUIET_DIR) const pskBase64 = Libp2pService.generateLibp2pPSK().psk - await localDbService.put(LocalDBKeys.PSK, pskBase64) + await localDbService!.put(LocalDBKeys.PSK, pskBase64) }) afterEach(async () => { + await tor?.kill() await libp2pService?.libp2pInstance?.stop() - if (connectionsManagerService) { - await connectionsManagerService.closeAllServices() - } + await connectionsManagerService?.closeAllServices() removeFilesFromDir(quietDir) + + tor = undefined + connectionsManagerService = undefined + libp2pService = undefined + registrationService = undefined + localDbService = undefined + module = undefined }) describe('Connections manager', () => { @@ -134,15 +140,15 @@ describe('Connections manager', () => { return this.peerId } } - const emitSpy = jest.spyOn(libp2pService, 'emit') + const emitSpy = jest.spyOn(libp2pService!, 'emit') // Peer connected - await connectionsManagerService.init() - await connectionsManagerService.launchCommunity({ + await connectionsManagerService!.init() + await connectionsManagerService!.launchCommunity({ community, network: { peerId: userIdentity.peerId, hiddenService: userIdentity.hiddenService }, }) - libp2pService.connectedPeers.set(peerId.toString(), { + libp2pService!.connectedPeers.set(peerId.toString(), { connectedAtSeconds: DateTime.utc().valueOf(), address: peerId.toString(), }) @@ -153,15 +159,15 @@ describe('Connections manager', () => { remotePeer: new RemotePeerEventDetail(peerId.toString()), remoteAddr: new RemotePeerEventDetail(remoteAddr), } - libp2pService.libp2pInstance?.dispatchEvent( + libp2pService!.libp2pInstance?.dispatchEvent( new CustomEvent('peer:disconnect', { detail: peerDisconectEventDetail }) ) - expect(libp2pService.connectedPeers.size).toEqual(0) + expect(libp2pService!.connectedPeers.size).toEqual(0) await waitForExpect(async () => { - expect(await localDbService.get(LocalDBKeys.PEERS)).not.toBeNull() + expect(await localDbService!.get(LocalDBKeys.PEERS)).not.toBeNull() }, 2000) - const peerStats: Record = await localDbService.get(LocalDBKeys.PEERS) + const peerStats: Record = await localDbService!.get(LocalDBKeys.PEERS) expect(Object.keys(peerStats)[0]).toEqual(remoteAddr) expect(emitSpy).toHaveBeenCalledWith(Libp2pEvents.PEER_DISCONNECTED, { peer: peerStats[remoteAddr].peerId, @@ -170,18 +176,6 @@ describe('Connections manager', () => { }) }) - it('creates network', async () => { - const spyOnDestroyHiddenService = jest.spyOn(tor, 'destroyHiddenService') - await connectionsManagerService.init() - const network = await connectionsManagerService.getNetwork() - console.log('network', network) - expect(network.hiddenService.onionAddress.split('.')[0]).toHaveLength(56) - expect(network.hiddenService.privateKey).toHaveLength(99) - const peerId = await PeerId.createFromJSON(network.peerId) - expect(PeerId.isPeerId(peerId)).toBeTruthy() - expect(await spyOnDestroyHiddenService.mock.results[0].value).toBeTruthy() - }) - it('dials many peers on start', async () => { const store = prepareStore().store const factory = await getFactory(store) @@ -208,8 +202,9 @@ describe('Connections manager', () => { hiddenService: userIdentity.hiddenService, }, } - await connectionsManagerService.init() - await connectionsManagerService.launchCommunity(launchCommunityPayload) + + await connectionsManagerService!.init() + await connectionsManagerService!.launchCommunity(launchCommunityPayload) await sleep(5000) // It looks LibP2P dials peers initially when it's started and // then IPFS service dials peers again when started, thus @@ -219,13 +214,25 @@ describe('Connections manager', () => { await sleep(5000) }) + it('creates network', async () => { + const spyOnDestroyHiddenService = jest.spyOn(tor!, 'destroyHiddenService') + await connectionsManagerService!.init() + const network = await connectionsManagerService!.getNetwork() + console.log('network', network) + expect(network.hiddenService.onionAddress.split('.')[0]).toHaveLength(56) + expect(network.hiddenService.privateKey).toHaveLength(99) + const peerId = await PeerId.createFromJSON(network.peerId) + expect(PeerId.isPeerId(peerId)).toBeTruthy() + expect(await spyOnDestroyHiddenService.mock.results[0].value).toBeTruthy() + }) + it.skip('Bug reproduction - iOS app crashing because lack of data server', async () => { const store = prepareStore().store const factory = await getFactory(store) const community = await factory.create('Community', { rootCa: 'rootCa' }) const userIdentity = await factory.create('Identity', { id: community.id, nickname: 'john' }) - await connectionsManagerService.init() + await connectionsManagerService!.init() const spyOnDial = jest.spyOn(WebSockets.prototype, 'dial') const peerList: string[] = [] @@ -247,13 +254,13 @@ describe('Connections manager', () => { }, } - await connectionsManagerService.launchCommunity(launchCommunityPayload) + await connectionsManagerService!.launchCommunity(launchCommunityPayload) expect(spyOnDial).toHaveBeenCalledTimes(peersCount) - await connectionsManagerService.closeAllServices() + await connectionsManagerService!.closeAllServices() await sleep(5000) - const launchSpy = jest.spyOn(connectionsManagerService, 'launch') - await connectionsManagerService.init() + const launchSpy = jest.spyOn(connectionsManagerService!, 'launch') + await connectionsManagerService!.init() expect(launchSpy).toBeCalledTimes(1) // Temporary fix for hanging test - websocketOverTor doesn't have abortController await sleep(5000) diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index 3c992fa1e1..3aa96d676e 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -66,6 +66,7 @@ import { emitError } from '../socket/socket.errors' import { createLibp2pAddress, isPSKcodeValid } from '@quiet/common' import { CertFieldsTypes, createRootCA, getCertFieldValue, loadCertificate } from '@quiet/identity' import { DateTime } from 'luxon' +import { platform } from 'os' @Injectable() export class ConnectionsManagerService extends EventEmitter implements OnModuleInit { @@ -75,6 +76,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI private ports: GetPorts isTorInit: TorInitState = TorInitState.NOT_STARTED private peerInfo: Libp2pPeerInfo | undefined = undefined + private initializationInterval: NodeJS.Timer private readonly logger = Logger(ConnectionsManagerService.name) constructor( @@ -93,6 +95,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI } async onModuleInit() { + this.logger('Initializing connection manager') process.on('unhandledRejection', error => { console.error(error) throw new Error() @@ -264,6 +267,11 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.logger('Resuming!') await this.openSocket() const peersToDial = await this.getPeersOnResume() + const callback = async () => { + this.logger('Bootstrapping is finished') + this.libp2pService?.resume(peersToDial) + } + if (await this.runOnTorBootstrap(callback)) return this.libp2pService?.resume(peersToDial) } @@ -578,7 +586,8 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI peers: peers ? peers.slice(1) : [], psk: Libp2pService.generateLibp2pPSK(community.psk).fullKey, } - await this.libp2pService.createInstance(params) + const startDialImmediately = this.tor.isTorInitialized + await this.libp2pService.createInstance(params, startDialImmediately) // Libp2p event listeners this.libp2pService.on(Libp2pEvents.PEER_CONNECTED, async (payload: { peers: string[] }) => { @@ -634,6 +643,14 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.CONNECTING_TO_COMMUNITY ) + + const callback = async () => { + console.log(`Sending ${SocketActionTypes.TOR_INITIALIZED}`) + this.serverIoProvider.io.emit(SocketActionTypes.TOR_INITIALIZED) + console.log(`Sending ${SocketActionTypes.INITIAL_DIAL}`) + this.libp2pService?.emit(Libp2pEvents.INITIAL_DIAL) + } + await this.runOnTorBootstrap(callback) } private attachTorEventsListeners() { @@ -642,10 +659,9 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.tor.on(SocketActionTypes.CONNECTION_PROCESS_INFO, data => { this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, data) }) - this.tor.on(SocketActionTypes.REDIAL_PEERS, async data => { - this.logger(`Socket - ${SocketActionTypes.REDIAL_PEERS}`) - const peerInfo = this.libp2pService?.getCurrentPeerInfo() - await this.libp2pService?.redialPeers([...peerInfo.connected, ...peerInfo.dialed]) + this.tor.on(SocketActionTypes.INITIAL_DIAL, async () => { + this.logger(`Socket - ${SocketActionTypes.INITIAL_DIAL}`) + this.libp2pService?.emit(Libp2pEvents.INITIAL_DIAL) }) this.socketService.on(SocketActionTypes.CONNECTION_PROCESS_INFO, data => { this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, data) @@ -839,4 +855,20 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.serverIoProvider.io.emit(SocketActionTypes.USER_PROFILES_STORED, payload) }) } + + private async runOnTorBootstrap(callback: () => Promise, intervalTimerMs: number = 2500): Promise { + if (!this.tor.isTorServiceUsed) { + this.logger(`We aren't using the tor service in this client, checking bootstrap status in connection manager`) + this.initializationInterval = setInterval(async () => { + console.log('Checking bootstrap interval') + const bootstrapDone = await this.tor.isBootstrappingFinished() + if (bootstrapDone) { + clearInterval(this.initializationInterval) + await callback() + } + }, intervalTimerMs) + return true + } + return false + } } diff --git a/packages/backend/src/nest/libp2p/libp2p.service.spec.ts b/packages/backend/src/nest/libp2p/libp2p.service.spec.ts index 91d9d177c4..5822a1502a 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.spec.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.spec.ts @@ -9,6 +9,7 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import validator from 'validator' import waitForExpect from 'wait-for-expect' import { DEFAULT_NUM_TRIES, ProcessInChunksService } from './process-in-chunks.service' +import { sleep } from '../common/sleep' describe('Libp2pService', () => { let module: TestingModule @@ -16,7 +17,7 @@ describe('Libp2pService', () => { let params: Libp2pNodeParams let processInChunks: ProcessInChunksService - beforeAll(async () => { + beforeEach(async () => { module = await Test.createTestingModule({ imports: [TestModule, Libp2pModule], }).compile() @@ -26,7 +27,7 @@ describe('Libp2pService', () => { params = await libp2pInstanceParams() }) - afterAll(async () => { + afterEach(async () => { await libp2pService.libp2pInstance?.stop() await module.close() }) @@ -71,14 +72,19 @@ describe('Libp2pService', () => { libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString()), libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()), ] - await libp2pService.createInstance(params) - // @ts-expect-error processItem is private - const spyOnProcessItem = jest.spyOn(processInChunks, 'processItem') + await libp2pService.createInstance(params, false) expect(libp2pService.libp2pInstance).not.toBeNull() + + // @ts-expect-error processItem is private + const processItemSpy = jest.spyOn(processInChunks, 'processItem') + const dialSpy = jest.spyOn(libp2pService.libp2pInstance!, 'dial') + libp2pService.emit(Libp2pEvents.INITIAL_DIAL, addresses) libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses) + await waitForExpect(async () => { - expect(spyOnProcessItem).toBeCalledTimes(addresses.length) - }) + expect(processItemSpy).toBeCalledTimes(6) + expect(dialSpy).toBeCalledTimes(3) + }, 30000) }) it(`Do not dial peer on '${Libp2pEvents.DIAL_PEERS}' event if peer was already dialed`, async () => { @@ -90,15 +96,18 @@ describe('Libp2pService', () => { alreadyDialedAddress, libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()), ] - await libp2pService.createInstance(params) + await libp2pService.createInstance(params, false) expect(libp2pService.libp2pInstance).not.toBeNull() + // @ts-expect-error processItem is private const processItemSpy = jest.spyOn(processInChunks, 'processItem') const dialSpy = jest.spyOn(libp2pService.libp2pInstance!, 'dial') + libp2pService.emit(Libp2pEvents.INITIAL_DIAL, addresses) libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses) + await waitForExpect(async () => { - expect(processItemSpy).toBeCalledTimes(2 * DEFAULT_NUM_TRIES) - expect(dialSpy).toBeCalledTimes(1) - }) + expect(processItemSpy).toBeCalledTimes(4) + expect(dialSpy).toBeCalledTimes(2) + }, 30000) }) }) diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index 47a90e1487..5b552217e6 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -41,13 +41,22 @@ export class Libp2pService extends EventEmitter { super() } - private dialPeer = async (peerAddress: string) => { - if (this.dialedPeers.has(peerAddress)) { + private dialPeer = async (peerAddress: string): Promise => { + const ma = multiaddr(peerAddress) + const peerId = peerIdFromString(ma.getPeerId()!) + + const libp2pHasPeer = await this.libp2pInstance?.peerStore.has(peerId as any) + const weHaveDialedPeer = this.dialedPeers.has(peerAddress) + + if (weHaveDialedPeer || libp2pHasPeer) { this.logger(`Skipping dial of ${peerAddress} because its already been dialed`) - return + return true } this.dialedPeers.add(peerAddress) - await this.libp2pInstance?.dial(multiaddr(peerAddress)) + const connection = await this.libp2pInstance?.dial(multiaddr(peerAddress)) + + if (connection) return true + return false } public getCurrentPeerInfo = (): Libp2pPeerInfo => { @@ -153,21 +162,22 @@ export class Libp2pService extends EventEmitter { this.processInChunksService.updateQueue(toDial) } - public async createInstance(params: Libp2pNodeParams): Promise { + public async createInstance(params: Libp2pNodeParams, startDialImmediately: boolean = false): Promise { if (this.libp2pInstance) { return this.libp2pInstance } let libp2p: Libp2p + const maxParallelDials = 2 try { libp2p = await createLibp2p({ start: false, connectionManager: { - minConnections: 3, // TODO: increase? + minConnections: 5, // TODO: increase? maxConnections: 20, // TODO: increase? - dialTimeout: 120_000, - maxParallelDials: 10, + dialTimeout: 120000, + maxParallelDials, autoDial: true, // It's a default but let's set it to have explicit information }, peerId: params.peerId, @@ -198,18 +208,26 @@ export class Libp2pService extends EventEmitter { }), ], dht: kadDHT(), - pubsub: gossipsub({ allowPublishToZeroPeers: true }), + pubsub: gossipsub({ + allowPublishToZeroPeers: true, + doPX: true, + }), }) } catch (err) { this.logger.error('Create libp2p:', err) throw err } this.libp2pInstance = libp2p - await this.afterCreation(params.peers, params.peerId) + await this.afterCreation(params.peers, params.peerId, maxParallelDials, startDialImmediately) return libp2p } - private async afterCreation(peers: string[], peerId: PeerId) { + private async afterCreation( + peers: string[], + peerId: PeerId, + maxParallelDials: number, + startDialImmediately: boolean + ) { if (!this.libp2pInstance) { this.logger.error('libp2pInstance was not created') throw new Error('libp2pInstance was not created') @@ -222,9 +240,21 @@ export class Libp2pService extends EventEmitter { this.processInChunksService.updateQueue(nonDialedAddresses) }) + this.on(Libp2pEvents.INITIAL_DIAL, async () => { + this.logger('Starting initial dial') + this.processInChunksService.resume() + }) + this.logger(`Initializing libp2p for ${peerId.toString()}, bootstrapping with ${peers.length} peers`) this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.INITIALIZING_LIBP2P) - this.processInChunksService.init([], this.dialPeer) + + this.logger(`Initializing processInChunksService and adding ${peers.length} peers to dial initially`) + this.processInChunksService.init({ + initialData: peers, + processItem: this.dialPeer, + startImmediately: startDialImmediately, + chunkSize: maxParallelDials, + }) this.libp2pInstance.addEventListener('peer:discovery', peer => { this.logger(`${peerId.toString()} discovered ${peer.detail.id}`) @@ -278,8 +308,6 @@ export class Libp2pService extends EventEmitter { this.emit(Libp2pEvents.PEER_DISCONNECTED, peerStat) }) - this.processInChunksService.updateQueue(peers) - this.logger(`Initialized libp2p for peer ${peerId.toString()}`) } diff --git a/packages/backend/src/nest/libp2p/libp2p.types.ts b/packages/backend/src/nest/libp2p/libp2p.types.ts index 34b7056db2..3047a3278b 100644 --- a/packages/backend/src/nest/libp2p/libp2p.types.ts +++ b/packages/backend/src/nest/libp2p/libp2p.types.ts @@ -6,6 +6,7 @@ export enum Libp2pEvents { PEER_DISCONNECTED = 'peerDisconnected', NETWORK_STATS = 'networkStats', DIAL_PEERS = 'dialPeers', + INITIAL_DIAL = 'initialDial', } export interface Libp2pNodeParams { diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts index f83fd8d218..97c194f176 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts @@ -2,7 +2,7 @@ import { EventEmitter } from 'events' import fastq, { queueAsPromised } from 'fastq' import Logger from '../common/logger' -import { randomUUID } from 'crypto' +import CryptoJS from 'crypto-js' const DEFAULT_CHUNK_SIZE = 10 export const DEFAULT_NUM_TRIES = 2 @@ -13,24 +13,39 @@ type ProcessTask = { taskId: string } +export type ProcessInChunksServiceOptions = { + initialData: T[] + processItem: (arg: T) => Promise + chunkSize?: number | undefined + startImmediately?: boolean +} + export class ProcessInChunksService extends EventEmitter { private isActive: boolean private chunkSize: number private taskQueue: queueAsPromised> private deadLetterQueue: ProcessTask[] = [] - private processItem: (arg: T) => Promise + private runningTaskIds: Set = new Set() + private processItem: (arg: T) => Promise private readonly logger = Logger(ProcessInChunksService.name) constructor() { super() } - public init(data: T[], processItem: (arg: T) => Promise, chunkSize: number = DEFAULT_CHUNK_SIZE) { - this.logger(`Initializing process-in-chunks.service with peers ${JSON.stringify(data, null, 2)}`) - this.processItem = processItem - this.chunkSize = chunkSize + public init(options: ProcessInChunksServiceOptions) { + this.logger(`Initializing process-in-chunks.service with peers ${JSON.stringify(options.initialData, null, 2)}`) + this.processItem = options.processItem + this.chunkSize = options.chunkSize ?? DEFAULT_CHUNK_SIZE this.taskQueue = fastq.promise(this, this.processOneItem, this.chunkSize) - this.isActive = true - this.updateQueue(data) + const startImmediately = options.startImmediately ?? true + if (startImmediately) { + this.logger(`Starting processing immediately`) + this.isActive = true + } else { + this.logger(`Deferring processing`) + this.pause() + } + this.updateQueue(options.initialData) } public updateQueue(items: T[]) { @@ -51,7 +66,11 @@ export class ProcessInChunksService extends EventEmitter { task = itemOrTask as ProcessTask } else { this.logger(`Creating new task for ${itemOrTask}`) - task = { data: itemOrTask as T, tries: 0, taskId: randomUUID() } + task = { data: itemOrTask as T, tries: 0, taskId: this.generateTaskId(itemOrTask as T) } + } + + if (this.isTaskDuplicate(task.taskId)) { + return } if (!this.isActive) { @@ -79,8 +98,7 @@ export class ProcessInChunksService extends EventEmitter { let success: boolean = false try { this.logger(`Processing task ${task.taskId} with data ${task.data}`) - await this.processItem(task.data) - success = true + success = await this.processItem(task.data) } catch (e) { this.logger.error(`Processing task ${task.taskId} with data ${task.data} failed`, e) } finally { @@ -93,7 +111,9 @@ export class ProcessInChunksService extends EventEmitter { this.logger( `Pushing task ${task.taskId} to queue, there will now be ${this.taskQueue.length() + 1} items in the queue` ) + this.runningTaskIds.add(task.taskId) const success = await this.taskQueue.push(task) + this.runningTaskIds.delete(task.taskId) if (success) { this.logger(`Task ${task.taskId} completed successfully`) } else { @@ -102,6 +122,33 @@ export class ProcessInChunksService extends EventEmitter { return success } + private isTaskDuplicate(taskId: string): boolean { + if (!this.isActive) { + this.logger( + 'ProcessInChunksService is not active, adding tasks to the dead letter queue!\n\nWARNING: You must call "resume" on the ProcessInChunksService to process the dead letter queue!!!' + ) + return this.deadLetterQueue.find(thisTask => thisTask.taskId === taskId) != null + } + + if (this.runningTaskIds.has(taskId)) { + this.logger(`Skipping task with ID ${taskId} because there is another task with the same ID currently running.`) + return true + } + + if (this.taskQueue.getQueue().find(thisTask => thisTask.taskId === taskId)) { + this.logger( + `Skipping task with ID ${taskId} because there is another task with the same ID already in the task queue.` + ) + return true + } + + return false + } + + private generateTaskId(data: T): string { + return CryptoJS.MD5(JSON.stringify(data)).toString(CryptoJS.enc.Hex) + } + public resume() { if (this.isActive) { this.logger('ProcessInChunksService is already active') diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts index 24d05dd42f..0075ba3371 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts @@ -20,12 +20,13 @@ describe('ProcessInChunks', () => { const mockProcessItem = jest .fn(async a => { console.log('processing', a) + return true }) - .mockResolvedValueOnce() + .mockResolvedValueOnce(true) .mockRejectedValueOnce(new Error('Rejected 1')) - .mockResolvedValueOnce() + .mockResolvedValueOnce(true) .mockRejectedValueOnce(new Error('Rejected 2')) - processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem) + processInChunks.init({ initialData: ['a', 'b', 'c', 'd'], processItem: mockProcessItem, chunkSize: 10 }) await waitForExpect(() => { expect(mockProcessItem).toBeCalledTimes(6) }) @@ -35,10 +36,11 @@ describe('ProcessInChunks', () => { const mockProcessItem = jest .fn(async a => { console.log('processing', a) + return true }) - .mockResolvedValueOnce() + .mockResolvedValueOnce(true) .mockRejectedValueOnce(new Error('Rejected 1')) - processInChunks.init(['a', 'b'], mockProcessItem) + processInChunks.init({ initialData: ['a', 'b'], processItem: mockProcessItem, chunkSize: 10 }) processInChunks.updateQueue(['e', 'f']) await waitForExpect(() => { expect(mockProcessItem).toBeCalledTimes(5) @@ -49,13 +51,14 @@ describe('ProcessInChunks', () => { const mockProcessItem = jest .fn(async a => { console.log('processing', a) + return true }) - .mockResolvedValueOnce() + .mockResolvedValueOnce(true) .mockRejectedValueOnce(new Error('Rejected 1')) - .mockResolvedValueOnce() + .mockResolvedValueOnce(true) .mockRejectedValueOnce(new Error('Rejected 2')) const chunkSize = 2 - processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem, chunkSize) + processInChunks.init({ initialData: ['a', 'b', 'c', 'd'], processItem: mockProcessItem, chunkSize }) await sleep(10000) await waitForExpect(() => { expect(mockProcessItem).toBeCalledTimes(6) @@ -63,16 +66,20 @@ describe('ProcessInChunks', () => { }) it('does not process more data if stopped', async () => { - const mockProcessItem = jest.fn(async () => {}) - processInChunks.init([], mockProcessItem) + const mockProcessItem = jest.fn(async () => { + return true + }) + processInChunks.init({ initialData: [], processItem: mockProcessItem, chunkSize: 10 }) processInChunks.pause() processInChunks.updateQueue(['a', 'b', 'c', 'd']) expect(mockProcessItem).not.toBeCalled() }) it('processes tasks after resuming from pause', async () => { - const mockProcessItem = jest.fn(async () => {}) - processInChunks.init([], mockProcessItem) + const mockProcessItem = jest.fn(async () => { + return true + }) + processInChunks.init({ initialData: [], processItem: mockProcessItem, chunkSize: 10 }) processInChunks.pause() processInChunks.updateQueue(['a', 'b', 'c', 'd']) processInChunks.resume() @@ -80,4 +87,23 @@ describe('ProcessInChunks', () => { expect(mockProcessItem).toBeCalledTimes(4) }) }) + + it('processes tasks when deferred', async () => { + const mockProcessItem = jest.fn(async () => { + return true + }) + processInChunks.init({ + initialData: ['a', 'b', 'c', 'd'], + processItem: mockProcessItem, + startImmediately: false, + chunkSize: 10, + }) + await waitForExpect(() => { + expect(mockProcessItem).toBeCalledTimes(0) + }) + processInChunks.resume() + await waitForExpect(() => { + expect(mockProcessItem).toBeCalledTimes(4) + }) + }) }) diff --git a/packages/backend/src/nest/tor/tor.service.tor.spec.ts b/packages/backend/src/nest/tor/tor.service.tor.spec.ts index 1689d1ad21..a6859d0663 100644 --- a/packages/backend/src/nest/tor/tor.service.tor.spec.ts +++ b/packages/backend/src/nest/tor/tor.service.tor.spec.ts @@ -14,9 +14,9 @@ import { sleep } from '../common/sleep' jest.setTimeout(200_000) describe('TorControl', () => { - let module: TestingModule - let torService: Tor - let torControl: TorControl + let module: TestingModule | undefined + let torService: Tor | undefined + let torControl: TorControl | undefined let tmpDir: DirResult let tmpAppDataPath: string @@ -31,47 +31,56 @@ describe('TorControl', () => { imports: [TestModule, TorModule], }) .overrideProvider(TOR_PASSWORD_PROVIDER) - .useValue({ torPassword, torHashedPassword }) - .overrideProvider(TOR_PARAMS_PROVIDER) .useValue({ - torPath: torBinForPlatform(), - options: { - env: { - LD_LIBRARY_PATH: torDirForPlatform(), - HOME: tmpAppDataPath, - }, - detached: true, - }, + torPassword, + torHashedPassword, }) - .overrideProvider(TOR_CONTROL_PARAMS) - .useValue({ - port: defaultConfigForTest.torControlPort, - host: 'localhost', - auth: { - value: torPassword, - type: TorControlAuthType.PASSWORD, - }, - }) - .overrideProvider(QUIET_DIR) - .useValue(tmpAppDataPath) .compile() - - torService = await module.resolve(Tor) - torControl = await module.resolve(TorControl) - torControl.authString = 'AUTHENTICATE ' + torPassword + '\r\n' + // .overrideProvider(TOR_PASSWORD_PROVIDER) + // .useValue({ torPassword, torHashedPassword }) + // .overrideProvider(TOR_PARAMS_PROVIDER) + // .useValue({ + // torPath: torBinForPlatform(), + // options: { + // env: { + // LD_LIBRARY_PATH: torDirForPlatform(), + // HOME: tmpAppDataPath, + // }, + // detached: true, + // }, + // }) + // .overrideProvider(TOR_CONTROL_PARAMS) + // .useValue({ + // port: defaultConfigForTest.torControlPort, + // host: 'localhost', + // auth: { + // value: torPassword, + // type: TorControlAuthType.PASSWORD, + // }, + // }) + // .overrideProvider(QUIET_DIR) + // .useValue(tmpAppDataPath) + // .compile() + + torService = await module!.resolve(Tor) + torControl = await module!.resolve(TorControl) + torControl!.authString = 'AUTHENTICATE ' + torPassword + '\r\n' }) afterEach(async () => { - await torService.kill() tmpDir.removeCallback() removeFilesFromDir(tmpAppDataPath) - torService.clearHangingTorProcess() - await module.close() + torService!.clearHangingTorProcess() + await torService!.kill() + await module!.close() + torService = undefined + torControl = undefined + module = undefined }) it('Init tor', async () => { expect(torService).toBeDefined() - await torService.init() + await torService!.init() }) // it('should detect and kill old tor process before new tor is spawned', async () => { @@ -110,14 +119,14 @@ describe('TorControl', () => { // }) it('spawns new hidden service', async () => { - await torService.init() - const hiddenService = await torService.createNewHiddenService({ targetPort: 4343 }) + await torService!.init() + const hiddenService = await torService!.createNewHiddenService({ targetPort: 4343 }) expect(hiddenService.onionAddress.split('.')[0]).toHaveLength(56) }) it('spawns hidden service using private key', async () => { - await torService.init() - const hiddenServiceOnionAddress = await torService.spawnHiddenService({ + await torService!.init() + const hiddenServiceOnionAddress = await torService!.spawnHiddenService({ targetPort: 4343, privKey: 'ED25519-V3:uCr5t3EcOCwig4cu7pWY6996whV+evrRlI0iIIsjV3uCz4rx46sB3CPq8lXEWhjGl2jlyreomORirKcz9mmcdQ==', }) @@ -125,36 +134,32 @@ describe('TorControl', () => { }) it('tor spawn repeats', async () => { - const spyOnInit = jest.spyOn(torService, 'init') - await torService.init(1000) + const spyOnInit = jest.spyOn(torService!, 'init') + await torService!.init(10000) await sleep(4000) expect(spyOnInit).toHaveBeenCalledTimes(2) }) - it('tor is initializing correctly with 40 seconds timeout', async () => { - await torService.init() - }) - it('creates and destroys hidden service', async () => { - await torService.init() - const hiddenService = await torService.createNewHiddenService({ targetPort: 4343 }) + await torService!.init() + const hiddenService = await torService!.createNewHiddenService({ targetPort: 4343 }) const serviceId = hiddenService.onionAddress.split('.')[0] - const status = await torService.destroyHiddenService(serviceId) + const status = await torService!.destroyHiddenService(serviceId) expect(status).toBe(true) }) it('attempt destroy nonexistent hidden service', async () => { - await torService.init() + await torService!.init() - const status = await torService.destroyHiddenService('u2rg2direy34dj77375h2fbhsc2tvxj752h4tlso64mjnlevcv54oaad') + const status = await torService!.destroyHiddenService('u2rg2direy34dj77375h2fbhsc2tvxj752h4tlso64mjnlevcv54oaad') expect(status).toBe(false) }) it('should find hanging tor processes and kill them', async () => { const processKill = jest.spyOn(process, 'kill') - await torService.init() - const torIds = torService.getTorProcessIds() - torService.clearHangingTorProcess() + await torService!.init() + const torIds = torService!.getTorProcessIds() + torService!.clearHangingTorProcess() expect(processKill).toHaveBeenCalledTimes(torIds.length) // Spawning with {shell:true} starts 2 processes so we need to kill 2 processes }) @@ -162,9 +167,9 @@ describe('TorControl', () => { tmpDir = createTmpDir('quietTest Tmp_') // On MacOS quiet data lands in '(...)/Application Support/(...)' which caused problems with grep tmpAppDataPath = tmpQuietDirPath(tmpDir.name) const processKill = jest.spyOn(process, 'kill') - await torService.init() - const torIds = torService.getTorProcessIds() - torService.clearHangingTorProcess() + await torService!.init() + const torIds = torService!.getTorProcessIds() + torService!.clearHangingTorProcess() expect(processKill).toHaveBeenCalledTimes(torIds.length) // Spawning with {shell:true} starts 2 processes so we need to kill 2 processes }) }) diff --git a/packages/backend/src/nest/tor/tor.service.ts b/packages/backend/src/nest/tor/tor.service.ts index 6b739f0572..0bffcd4e81 100644 --- a/packages/backend/src/nest/tor/tor.service.ts +++ b/packages/backend/src/nest/tor/tor.service.ts @@ -12,9 +12,10 @@ import { TorControl } from './tor-control.service' import { GetInfoTorSignal, HiddenServiceData, TorParams, TorParamsProvider, TorPasswordProvider } from './tor.types' import Logger from '../common/logger' +import { sleep } from '../common/sleep' export class Tor extends EventEmitter implements OnModuleInit { - socksPort: number + socksPort: number | undefined process: child_process.ChildProcessWithoutNullStreams | null = null torDataDirectory: string torPidPath: string @@ -22,9 +23,12 @@ export class Tor extends EventEmitter implements OnModuleInit { controlPort: number | undefined interval: any initTimeout: any + public isTorInitialized: boolean = false + public isTorServiceUsed: boolean = false private readonly logger = Logger(Tor.name) private hiddenServices: Map = new Map() private initializedHiddenServices: Map = new Map() + constructor( @Inject(CONFIG_OPTIONS) public configOptions: ConfigOptions, @Inject(QUIET_DIR) public readonly quietDir: string, @@ -36,11 +40,16 @@ export class Tor extends EventEmitter implements OnModuleInit { super() this.controlPort = configOptions.torControlPort - console.log('QUIET DIR', this.quietDir) + this.logger('Created tor service') + this.logger('QUIET DIR', this.quietDir) } async onModuleInit() { - if (!this.torParamsProvider.torPath) return + this.logger('Running onModuleInit in tor.service') + if (!this.torParamsProvider.torPath) { + console.warn('No tor binary path, not running the tor service') + return + } await this.init() } @@ -59,7 +68,7 @@ export class Tor extends EventEmitter implements OnModuleInit { return Array.from(Object.entries(this.extraTorProcessParams)).flat() } - private async isBootstrappingFinished(): Promise { + public async isBootstrappingFinished(): Promise { this.logger('Checking bootstrap status') const output = await this.torControl.sendCommand('GETINFO status/bootstrap-phase') if (output.messages[0] === '250-status/bootstrap-phase=NOTICE BOOTSTRAP PROGRESS=100 TAG=done SUMMARY="Done"') { @@ -69,80 +78,96 @@ export class Tor extends EventEmitter implements OnModuleInit { return false } - public async init(timeout = 120_000): Promise { - if (!this.socksPort) this.socksPort = await getPort() + public async init(timeout: number = 120_000): Promise { + this.isTorServiceUsed = true + if (!this.socksPort) { + this.logger('Getting new socks port') + this.socksPort = await getPort() + } this.logger('Initializing tor...') + await this._init(timeout) + } - return await new Promise((resolve, reject) => { - if (!fs.existsSync(this.quietDir)) { - this.logger("Quiet dir doesn't exist, creating it now") - fs.mkdirSync(this.quietDir) - } - - this.torDataDirectory = path.join.apply(null, [this.quietDir, 'TorDataDirectory']) - this.torPidPath = path.join.apply(null, [this.quietDir, 'torPid.json']) - let oldTorPid: number | null = null - if (fs.existsSync(this.torPidPath)) { - const file = fs.readFileSync(this.torPidPath) - oldTorPid = Number(file.toString()) - this.logger(`${this.torPidPath} exists. Old tor pid: ${oldTorPid}`) - } - - this.initTimeout = setTimeout(async () => { - this.logger('Checking init timeout') - const bootstrapDone = await this.isBootstrappingFinished() - if (!bootstrapDone) { - this.initializedHiddenServices = new Map() - clearInterval(this.interval) - await this.init() + private async _init(timeout: number) { + try { + return await new Promise((resolve, reject) => { + if (!fs.existsSync(this.quietDir)) { + this.logger("Quiet dir doesn't exist, creating it now") + fs.mkdirSync(this.quietDir) } - }, timeout) - const tryToSpawnTor = async () => { - if (oldTorPid != null) { - this.logger(`Clearing out old tor process with pid ${oldTorPid}`) - this.clearOldTorProcess(oldTorPid) + this.torDataDirectory = path.join.apply(null, [this.quietDir, 'TorDataDirectory']) + this.torPidPath = path.join.apply(null, [this.quietDir, 'torPid.json']) + let oldTorPid: number | null = null + if (fs.existsSync(this.torPidPath)) { + const file = fs.readFileSync(this.torPidPath) + oldTorPid = Number(file.toString()) + this.logger(`${this.torPidPath} exists. Old tor pid: ${oldTorPid}`) } - try { - this.logger('Clearing out hanging tor process(es)') - this.clearHangingTorProcess() - } catch (e) { - this.logger('Error occured while trying to clear hanging tor processes', e) - } + this.initTimeout = setTimeout(async () => { + this.logger('Checking init timeout') + const bootstrapDone = await this.isBootstrappingFinished() + if (!bootstrapDone) { + this.initializedHiddenServices = new Map() + clearInterval(this.interval) + reject(new Error(`Failed to initialize in timeout of ${timeout}ms`)) + } + }, timeout) - try { - this.logger('Spawning new tor process(es)') - await this.spawnTor() - - this.interval = setInterval(async () => { - this.logger('Checking bootstrap interval') - const bootstrapDone = await this.isBootstrappingFinished() - if (bootstrapDone) { - this.logger(`Sending ${SocketActionTypes.TOR_INITIALIZED}`) - this.serverIoProvider.io.emit(SocketActionTypes.TOR_INITIALIZED) - // TODO: Figure out how to get redialing (or, ideally, initial dialing) on tor initialization working - // this.logger('Attempting to redial peers (if possible)') - // this.emit(SocketActionTypes.REDIAL_PEERS) - clearInterval(this.interval) - } - }, 2500) + const tryToSpawnTor = async () => { + if (oldTorPid != null) { + this.logger(`Clearing out old tor process with pid ${oldTorPid}`) + this.clearOldTorProcess(oldTorPid) + } + + try { + this.logger('Clearing out hanging tor process(es)') + this.clearHangingTorProcess() + } catch (e) { + this.logger('Error occured while trying to clear hanging tor processes', e) + } - this.logger(`Spawned tor with pid(s): ${this.getTorProcessIds()}`) + try { + this.logger('Spawning new tor process(es)') + await this.spawnTor() + this.logger(`Spawned tor with pid(s): ${this.getTorProcessIds()}`) - resolve() - } catch (e) { - this.logger('Killing tor due to error', e) - this.clearHangingTorProcess() - removeFilesFromDir(this.torDataDirectory) + await sleep(10000) + if (this.interval) { + clearInterval(this.interval) + this.interval = undefined + } + this.interval = setInterval(async () => { + this.logger('Checking bootstrap interval') + const bootstrapDone = await this.isBootstrappingFinished() + if (bootstrapDone) { + this.isTorInitialized = true + this.logger(`Sending ${SocketActionTypes.TOR_INITIALIZED}`) + this.serverIoProvider.io.emit(SocketActionTypes.TOR_INITIALIZED) + this.logger(`Sending ${SocketActionTypes.INITIAL_DIAL}`) + this.emit(SocketActionTypes.INITIAL_DIAL) + clearInterval(this.interval) + resolve() + } + }, 5000) + // resolve() + } catch (e) { + this.logger('Killing tor due to error', e) + this.clearHangingTorProcess() + removeFilesFromDir(this.torDataDirectory) - // eslint-disable-next-line - process.nextTick(tryToSpawnTor) + // eslint-disable-next-line + process.nextTick(tryToSpawnTor) + } } - } - tryToSpawnTor() - }) + tryToSpawnTor() + }) + } catch (e) { + this.logger.error(`Initialization failed due to error: ${e.message}, retrying...`) + await this.init() + } } public resetHiddenServices() { @@ -214,7 +239,11 @@ export class Tor extends EventEmitter implements OnModuleInit { try { process.kill(oldTorPid, 'SIGTERM') } catch (e) { - this.logger.error(`Tried killing old tor process. Failed. Reason: ${e.message}`) + if ((e as Error).message.includes('ESRCH')) { + this.logger(`Tor process with PID ${oldTorPid} was already closed`) + } else { + this.logger.error(`Tried killing old tor process. Failed`, e) + } } } else { this.logger(`Deleting ${this.torPidPath}`) @@ -248,7 +277,7 @@ export class Tor extends EventEmitter implements OnModuleInit { this.torParamsProvider.torPath, [ '--SocksPort', - this.socksPort.toString(), + this.socksPort!.toString(), '--HTTPTunnelPort', this.configOptions.httpTunnelPort?.toString(), '--ControlPort', @@ -381,15 +410,17 @@ export class Tor extends EventEmitter implements OnModuleInit { } public async kill(): Promise { + this.socksPort = undefined return await new Promise((resolve, reject) => { + this.logger('Clearing timeout and interval, if not null') + if (this.initTimeout) clearTimeout(this.initTimeout) + if (this.interval) clearInterval(this.interval) this.logger('Killing tor... with pid', this.process?.pid) if (this.process === null) { this.logger('TOR: Process is not initalized.') resolve() return } - if (this.initTimeout) clearTimeout(this.initTimeout) - if (this.interval) clearInterval(this.interval) this.process?.on('close', () => { this.process = null resolve() diff --git a/packages/backend/src/nest/websocketOverTor/index.ts b/packages/backend/src/nest/websocketOverTor/index.ts index c0b1160214..ef17b19e79 100644 --- a/packages/backend/src/nest/websocketOverTor/index.ts +++ b/packages/backend/src/nest/websocketOverTor/index.ts @@ -18,6 +18,7 @@ import { type ServerOptions, type WebSocketServer as ItWsWebsocketServer } from import { multiaddr } from '@multiformats/multiaddr' import { type MultiaddrConnection, type Connection } from '@libp2p/interface-connection' import logger from '../common/logger' +import { DuplexWebSocket } from 'it-ws/dist/src/duplex' const log = logger('libp2p:websockets') @@ -73,10 +74,11 @@ export class WebSockets extends EventEmitter { async dial(ma: Multiaddr, options: DialOptions) { let conn: Connection - let socket + let socket: DuplexWebSocket let maConn: MultiaddrConnection try { + log(`Connecting socket with ${ma.toString()}`) socket = await this._connect(ma, { websocket: { ...this._websocketOpts, @@ -88,6 +90,7 @@ export class WebSockets extends EventEmitter { throw e } try { + log(`Creating multiaddr connection from socket with ${ma.toString()}`) maConn = socketToMaConn(socket, ma, { signal: options.signal }) log('new outbound connection %s', maConn.remoteAddr) } catch (e) { @@ -96,6 +99,7 @@ export class WebSockets extends EventEmitter { } try { + log(`Upgrading outbound connection with ${maConn.remoteAddr.toString()}`) conn = await options.upgrader.upgradeOutbound(maConn) log('outbound connection %s upgraded', maConn.remoteAddr) return conn @@ -120,6 +124,7 @@ export class WebSockets extends EventEmitter { const myUri = `${toUri(ma)}/?remoteAddress=${encodeURIComponent(this.localAddress)}` + log(`Creating raw socket connection to ${ma.toString()}`) const rawSocket = connect(myUri, Object.assign({ binary: true }, options)) if (rawSocket.socket.on) { @@ -129,6 +134,7 @@ export class WebSockets extends EventEmitter { } if (!options.signal) { + log(`Waiting for socket connection to ${ma.toString()} with no abort signal`) await Promise.race([rawSocket.connected(), errorPromise.promise]) log(`${this.localAddress} connected %s`, ma) @@ -155,6 +161,7 @@ export class WebSockets extends EventEmitter { }) try { + log(`Waiting for socket connection to ${ma.toString()}`) await Promise.race([abort, errorPromise.promise, rawSocket.connected()]) } finally { options.signal.removeEventListener('abort', onAbort) @@ -200,8 +207,10 @@ export class WebSockets extends EventEmitter { if (!query.remoteAddress) return const remoteAddress = query.remoteAddress.toString() + const ma = multiaddr(remoteAddress) try { - maConn = socketToMaConn(stream, multiaddr(remoteAddress)) + log(`Creating multiaddr connection for inbound peer ${ma.toString()}`) + maConn = socketToMaConn(stream, ma) const peer = { id: PeerId.createFromB58String(remoteAddress.split('/p2p/')[1]), multiaddrs: [maConn.remoteAddr], @@ -214,6 +223,7 @@ export class WebSockets extends EventEmitter { } try { + log(`Upgrading inbound connection with ${maConn.remoteAddr.toString()}`) conn = await upgrader.upgradeInbound(maConn) } catch (err) { log.error('inbound connection failed to upgrade', err) diff --git a/packages/desktop/package.json b/packages/desktop/package.json index 7d4907220d..33fc5c7d46 100644 --- a/packages/desktop/package.json +++ b/packages/desktop/package.json @@ -116,7 +116,7 @@ "build:renderer:prod": "webpack --config webpack/webpack.config.renderer.prod.js", "postBuild": "node scripts/postBuild.js", "prestart": "npm run build:main", - "start": "cross-env DEBUG='backend*,quiet*,state-manager*,desktop*,utils*,libp2p:websockets:listener:backend,libp2p:connection-manager:auto-dialler' npm run start:renderer", + "start": "cross-env DEBUG='backend*,quiet*,state-manager*,desktop*,utils*,libp2p:websockets:listener:backend,libp2p:connection-manager:auto-dialler,libp2p:pnet,libp2p:upgrader' npm run start:renderer", "start:main": "cross-env NODE_ENV=development electron .", "start:renderer": "cross-env NODE_ENV=development webpack-dev-server --config webpack/webpack.config.renderer.dev.js", "storybook": "export NODE_OPTIONS=--openssl-legacy-provider && start-storybook -p 6006", diff --git a/packages/mobile/ios/NodeJsMobile/NodeRunner.mm b/packages/mobile/ios/NodeJsMobile/NodeRunner.mm index 35f909e793..8ee169bbf9 100644 --- a/packages/mobile/ios/NodeJsMobile/NodeRunner.mm +++ b/packages/mobile/ios/NodeJsMobile/NodeRunner.mm @@ -205,7 +205,7 @@ - (void) startEngineWithArguments:(NSArray*)arguments:(NSString*)builtinModulesP nodePath = [nodePath stringByAppendingString:builtinModulesPath]; } setenv([@"NODE_PATH" UTF8String], (const char*)[nodePath UTF8String], 1); - setenv([@"DEBUG" UTF8String], "backend:*,state-manager:*,libp2p:pnet", 1); + setenv([@"DEBUG" UTF8String], "backend:*,state-manager:*,libp2p:websockets:listener:backend,libp2p:connection-manager:auto-dialler,libp2p:pnet,libp2p:upgrader", 1); int c_arguments_size=0; diff --git a/packages/mobile/ios/Quiet/Info.plist b/packages/mobile/ios/Quiet/Info.plist index 4cfb2bd721..a274246b08 100644 --- a/packages/mobile/ios/Quiet/Info.plist +++ b/packages/mobile/ios/Quiet/Info.plist @@ -36,7 +36,7 @@ CFBundleVersion 376 ITSAppUsesNonExemptEncryption - + LSRequiresIPhoneOS LSSupportsOpeningDocumentsInPlace @@ -44,15 +44,15 @@ NSAppTransportSecurity NSAllowsArbitraryLoads - + NSAllowsLocalNetworking - + NSExceptionDomains localhost NSExceptionAllowsInsecureHTTPLoads - + @@ -100,6 +100,6 @@ UIInterfaceOrientationLandscapeRight UIViewControllerBasedStatusBarAppearance - + diff --git a/packages/types/src/socket.ts b/packages/types/src/socket.ts index 4fcb44cfdb..07a8b77e1d 100644 --- a/packages/types/src/socket.ts +++ b/packages/types/src/socket.ts @@ -71,7 +71,7 @@ export enum SocketActionTypes { PEER_CONNECTED = 'peerConnected', PEER_DISCONNECTED = 'peerDisconnected', TOR_INITIALIZED = 'torInitialized', - REDIAL_PEERS = 'redialPeers', + INITIAL_DIAL = 'initialDial', // ====== Misc ======