Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve connection speed/reliability and add more logging around connections #2542

Open
wants to merge 13 commits into
base: 2.2.0
Choose a base branch
from
Open
90 changes: 90 additions & 0 deletions packages/backend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
"@quiet/logger": "^2.0.2-alpha.0",
"@quiet/types": "^2.0.2-alpha.1",
"abortable-iterator": "^3.0.0",
"bufferutil": "^4.0.8",
"class-transformer": "^0.5.1",
"class-validator": "^0.13.1",
"cli-table": "^0.3.6",
Expand Down Expand Up @@ -139,6 +140,7 @@
"socks-proxy-agent": "^5.0.0",
"string-replace-loader": "3.1.0",
"ts-jest-resolver": "^2.0.0",
"utf-8-validate": "^6.0.4",
"validator": "^13.11.0"
},
"overrides": {
Expand Down
5 changes: 3 additions & 2 deletions packages/backend/src/nest/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ export class AppModule {
allowedHeaders: ['authorization'],
credentials: true,
},
pingInterval: 1000_000,
pingTimeout: 1000_000,
pingInterval: 10_000,
pingTimeout: 2_000,
connectTimeout: 60_000,
})
io.engine.use((req, res, next) => {
const authHeader = req.headers['authorization']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import { emitError } from '../socket/socket.errors'
import { createLibp2pAddress, isPSKcodeValid } from '@quiet/common'
import { CertFieldsTypes, createRootCA, getCertFieldValue, loadCertificate } from '@quiet/identity'
import { DateTime } from 'luxon'
import { platform } from 'os'

@Injectable()
export class ConnectionsManagerService extends EventEmitter implements OnModuleInit {
Expand All @@ -75,6 +76,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
private ports: GetPorts
isTorInit: TorInitState = TorInitState.NOT_STARTED
private peerInfo: Libp2pPeerInfo | undefined = undefined
private initializationInterval: NodeJS.Timer

private readonly logger = Logger(ConnectionsManagerService.name)
constructor(
Expand All @@ -93,6 +95,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
}

async onModuleInit() {
this.logger('Initializing connection manager')
process.on('unhandledRejection', error => {
console.error(error)
throw new Error()
Expand Down Expand Up @@ -264,6 +267,11 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.logger('Resuming!')
await this.openSocket()
const peersToDial = await this.getPeersOnResume()
const callback = async () => {
this.logger('Bootstrapping is finished')
this.libp2pService?.resume(peersToDial)
}
if (await this.runOnTorBootstrap(callback)) return
this.libp2pService?.resume(peersToDial)
}

Expand Down Expand Up @@ -578,7 +586,8 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
peers: peers ? peers.slice(1) : [],
psk: Libp2pService.generateLibp2pPSK(community.psk).fullKey,
}
await this.libp2pService.createInstance(params)
const startDialImmediately = this.tor.isTorInitialized
await this.libp2pService.createInstance(params, startDialImmediately)

// Libp2p event listeners
this.libp2pService.on(Libp2pEvents.PEER_CONNECTED, async (payload: { peers: string[] }) => {
Expand Down Expand Up @@ -634,6 +643,14 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
SocketActionTypes.CONNECTION_PROCESS_INFO,
ConnectionProcessInfo.CONNECTING_TO_COMMUNITY
)

const callback = async () => {
console.log(`Sending ${SocketActionTypes.TOR_INITIALIZED}`)
this.serverIoProvider.io.emit(SocketActionTypes.TOR_INITIALIZED)
console.log(`Sending ${SocketActionTypes.INITIAL_DIAL}`)
this.libp2pService?.emit(Libp2pEvents.INITIAL_DIAL)
}
await this.runOnTorBootstrap(callback)
}

private attachTorEventsListeners() {
Expand All @@ -642,10 +659,9 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.tor.on(SocketActionTypes.CONNECTION_PROCESS_INFO, data => {
this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, data)
})
this.tor.on(SocketActionTypes.REDIAL_PEERS, async data => {
this.logger(`Socket - ${SocketActionTypes.REDIAL_PEERS}`)
const peerInfo = this.libp2pService?.getCurrentPeerInfo()
await this.libp2pService?.redialPeers([...peerInfo.connected, ...peerInfo.dialed])
this.tor.on(SocketActionTypes.INITIAL_DIAL, async () => {
this.logger(`Socket - ${SocketActionTypes.INITIAL_DIAL}`)
this.libp2pService?.emit(Libp2pEvents.INITIAL_DIAL)
})
this.socketService.on(SocketActionTypes.CONNECTION_PROCESS_INFO, data => {
this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, data)
Expand Down Expand Up @@ -840,4 +856,20 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.serverIoProvider.io.emit(SocketActionTypes.USER_PROFILES_STORED, payload)
})
}

private async runOnTorBootstrap(callback: () => Promise<any>, intervalTimerMs: number = 2500): Promise<boolean> {
if (!this.tor.isTorServiceUsed) {
this.logger(`We aren't using the tor service in this client, checking bootstrap status in connection manager`)
this.initializationInterval = setInterval(async () => {
console.log('Checking bootstrap interval')
const bootstrapDone = await this.tor.isBootstrappingFinished()
if (bootstrapDone) {
clearInterval(this.initializationInterval)
await callback()
}
}, intervalTimerMs)
return true
}
return false
}
}
31 changes: 20 additions & 11 deletions packages/backend/src/nest/libp2p/libp2p.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import validator from 'validator'
import waitForExpect from 'wait-for-expect'
import { DEFAULT_NUM_TRIES, ProcessInChunksService } from './process-in-chunks.service'
import { sleep } from '../common/sleep'

describe('Libp2pService', () => {
let module: TestingModule
let libp2pService: Libp2pService
let params: Libp2pNodeParams
let processInChunks: ProcessInChunksService<string>

beforeAll(async () => {
beforeEach(async () => {
module = await Test.createTestingModule({
imports: [TestModule, Libp2pModule],
}).compile()
Expand All @@ -26,7 +27,7 @@ describe('Libp2pService', () => {
params = await libp2pInstanceParams()
})

afterAll(async () => {
afterEach(async () => {
await libp2pService.libp2pInstance?.stop()
await module.close()
})
Expand Down Expand Up @@ -71,14 +72,19 @@ describe('Libp2pService', () => {
libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString()),
libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()),
]
await libp2pService.createInstance(params)
// @ts-expect-error processItem is private
const spyOnProcessItem = jest.spyOn(processInChunks, 'processItem')
await libp2pService.createInstance(params, false)
expect(libp2pService.libp2pInstance).not.toBeNull()

// @ts-expect-error processItem is private
const processItemSpy = jest.spyOn(processInChunks, 'processItem')
const dialSpy = jest.spyOn(libp2pService.libp2pInstance!, 'dial')
libp2pService.emit(Libp2pEvents.INITIAL_DIAL, addresses)
libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses)

await waitForExpect(async () => {
expect(spyOnProcessItem).toBeCalledTimes(addresses.length)
})
expect(processItemSpy).toBeCalledTimes(6)
expect(dialSpy).toBeCalledTimes(3)
}, 30000)
})

it(`Do not dial peer on '${Libp2pEvents.DIAL_PEERS}' event if peer was already dialed`, async () => {
Expand All @@ -90,15 +96,18 @@ describe('Libp2pService', () => {
alreadyDialedAddress,
libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()),
]
await libp2pService.createInstance(params)
await libp2pService.createInstance(params, false)
expect(libp2pService.libp2pInstance).not.toBeNull()

// @ts-expect-error processItem is private
const processItemSpy = jest.spyOn(processInChunks, 'processItem')
const dialSpy = jest.spyOn(libp2pService.libp2pInstance!, 'dial')
libp2pService.emit(Libp2pEvents.INITIAL_DIAL, addresses)
libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses)

await waitForExpect(async () => {
expect(processItemSpy).toBeCalledTimes(2 * DEFAULT_NUM_TRIES)
expect(dialSpy).toBeCalledTimes(1)
})
expect(processItemSpy).toBeCalledTimes(4)
expect(dialSpy).toBeCalledTimes(2)
}, 30000)
})
})
Loading
Loading