diff --git a/packages/backend/package.json b/packages/backend/package.json index bd70c5324..b7baac652 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -13,7 +13,7 @@ "webpack": "webpack --env mode=development && cp ./lib/bundle.cjs ../backend-bundle/bundle.cjs", "webpack:prod": "webpack --env mode=production && cp ./lib/bundle.cjs ../backend-bundle/bundle.cjs", "postinstall": "npm run applyPatches", - "applyPatches": "patch -f -p0 < ./patch/electron-fetch.patch || true && patch -f -p0 --forward --binary < ./patch/parse-duration.patch || true && patch -f -p0 --forward --binary < ./patch/parse-duration-esm.patch || true && patch -f -p0 < ./patch/ipfs-pubsub-peer-monitor.patch || true", + "applyPatches": "patch -f -p0 < ./patch/electron-fetch.patch || true && patch -f -p0 --forward --binary < ./patch/parse-duration.patch || true && patch -f -p0 --forward --binary < ./patch/parse-duration-esm.patch || true && patch -f -p0 < ./patch/ipfs-pubsub-peer-monitor.patch || true && patch --forward -p0 node_modules/@libp2p/kad-dht/dist/src/providers.js < ./patch/libp2p-kaddht-providers.patch || true && patch --forward -p0 node_modules/@helia/block-brokers/node_modules/ipfs-bitswap/dist/src/bitswap.js < ./patch/bitswap.patch || true", "prepare": "npm run webpack", "version": "git add -A src", "lint:no-fix": "eslint --ext .jsx,.js,.ts,.tsx ./src/", diff --git a/packages/backend/src/nest/ipfs-file-manager/ipfs-file-manager.service.ts b/packages/backend/src/nest/ipfs-file-manager/ipfs-file-manager.service.ts index 0d56b98ef..1863491fb 100644 --- a/packages/backend/src/nest/ipfs-file-manager/ipfs-file-manager.service.ts +++ b/packages/backend/src/nest/ipfs-file-manager/ipfs-file-manager.service.ts @@ -4,7 +4,7 @@ import fs from 'fs' import path from 'path' import crypto from 'crypto' import { GetBlockProgressEvents, type Helia } from 'helia' -import { AddEvents, CatOptions, GetEvents, unixfs, type UnixFS } from '@helia/unixfs' +import { AddEvents, CatOptions, GetEvents, unixfs, UnixFSStats, type UnixFS } from '@helia/unixfs' import { promisify } from 'util' import sizeOf from 'image-size' import { CID } from 'multiformats/cid' @@ -19,6 +19,7 @@ const { createPaths, compare } = await import('../common/utils') import { createLogger } from '../common/logger' import { IpfsService } from '../ipfs/ipfs.service' import { CustomProgressEvent } from 'progress-events' +import { file } from 'mock-fs/lib/filesystem' // 1048576 is the number of bytes in a block uploaded via unixfs // Reference: packages/backend/node_modules/@helia/unixfs/src/commands/add.ts @@ -265,8 +266,15 @@ export class IpfsFileManagerService extends EventEmitter { this.controllers.set(fileMetadata.cid, { controller }) - // Add try catch and return downloadBlocks with timeout - const initialStat = await this.ufs.stat(fileCid, { signal: controller.signal }) + let initialStat: UnixFSStats | undefined = undefined + try { + initialStat = await this.ufs.stat(fileCid, { signal: controller.signal }) + } catch (e) { + _logger.error(`Error while performing initial stat`, e) + await this.cancelDownload(fileCid.toString()) + return + } + const fileSize = initialStat.fileSize const localSize = initialStat.localFileSize if (fileMetadata.size && !compare(fileMetadata.size, fileSize, 0.05)) { @@ -472,7 +480,14 @@ export class IpfsFileManagerService extends EventEmitter { } while (downloading && !controller.signal.aborted) { - const stat = await this.ufs.stat(fileCid) + let stat: UnixFSStats | undefined = undefined + try { + stat = await this.ufs.stat(fileCid) + } catch (e) { + _logger.error(`Error while running stat`, e) + continue + } + const totalSize = Number(stat.fileSize) const downloadedSize = Number(stat.localFileSize) if (offset >= totalSize) { @@ -503,6 +518,9 @@ export class IpfsFileManagerService extends EventEmitter { downloading = false break } + + _logger.error(`Error while catting file`, e) + continue } offset += DEFAULT_CAT_BLOCK_CHUNK_SIZE } @@ -528,6 +546,9 @@ export class IpfsFileManagerService extends EventEmitter { } catch (e) { if (controller.signal.aborted) { _logger.warn(`Cancelling download`) + } else { + _logger.error(`Error while catting to write blocks out to local file`, e) + await this.cancelDownload(fileCid.toString()) } } } @@ -545,6 +566,8 @@ export class IpfsFileManagerService extends EventEmitter { const fileState = this.files.get(fileMetadata.cid) if (!fileState && !controller.signal.aborted) { this.logger.error(`No saved data for file cid ${fileMetadata.cid}`) + await this.updateStatus(fileMetadata.cid, DownloadState.Canceled) + this.controllers.delete(fileMetadata.cid) return } diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index 16a6be343..92c9576bf 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -260,6 +260,9 @@ export class Libp2pService extends EventEmitter { outboundUpgradeTimeout: 30_000, protocolNegotiationTimeout: 10_000, maxDialQueueLength: 500, + reconnectRetryInterval: 5_000, + reconnectRetries: 50, + reconnectBackoffFactor: 1.5, }, privateKey: params.peerId.privKey, addresses: { listen: params.listenAddresses }, @@ -285,6 +288,7 @@ export class Libp2pService extends EventEmitter { handshakeTimeout: 15_000, ciphers: WEBSOCKET_CIPHER_SUITE, followRedirects: true, + sessionTimeout: 600, }, localAddress: params.localAddress, targetPort: params.targetPort, @@ -292,7 +296,7 @@ export class Libp2pService extends EventEmitter { }), ], services: { - ping: ping({ timeout: 30_000 }), + ping: ping({ timeout: 30_000, runOnLimitedConnection: false, maxInboundStreams: 60, maxOutboundStreams: 60 }), pubsub: gossipsub({ // neccessary to run a single peer allowPublishToZeroTopicPeers: true, @@ -306,8 +310,28 @@ export class Libp2pService extends EventEmitter { keychain: keychain(), dht: kadDHT({ allowQueryWithZeroPeers: true, - clientMode: false, + clientMode: true, initialQuerySelfInterval: 500, + providers: { + cacheSize: 1024, + }, + maxInboundStreams: 128, + maxOutboundStreams: 128, + networkDialTimeout: { + minTimeout: 30_000, + timeoutMultiplier: 1.05, + failureMultiplier: 1.1, + }, + pingNewContactTimeout: { + minTimeout: 30_000, + timeoutMultiplier: 1.05, + failureMultiplier: 1.1, + }, + pingOldContactTimeout: { + minTimeout: 30_000, + timeoutMultiplier: 1.02, + failureMultiplier: 1.15, + }, }), }, }) diff --git a/packages/backend/src/nest/websocketOverTor/constants.ts b/packages/backend/src/nest/websocketOverTor/constants.ts index d6c510687..6d5890786 100644 --- a/packages/backend/src/nest/websocketOverTor/constants.ts +++ b/packages/backend/src/nest/websocketOverTor/constants.ts @@ -10,4 +10,4 @@ export const CODE_WS = 477 export const CODE_WSS = 478 // Time to wait for a connection to close gracefully before destroying it manually -export const CLOSE_TIMEOUT = 500 +export const CLOSE_TIMEOUT = 5000 diff --git a/packages/backend/src/nest/websocketOverTor/socket-to-conn.ts b/packages/backend/src/nest/websocketOverTor/socket-to-conn.ts index 29cdea4a1..f8d6b0aa1 100644 --- a/packages/backend/src/nest/websocketOverTor/socket-to-conn.ts +++ b/packages/backend/src/nest/websocketOverTor/socket-to-conn.ts @@ -6,6 +6,7 @@ import { CLOSE_TIMEOUT } from './constants' import type { AbortOptions, ComponentLogger, CounterGroup, MultiaddrConnection } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { DuplexWebSocket } from 'it-ws/duplex' +import { CloseEvent, ErrorEvent } from 'ws' export interface SocketToConnOptions { localAddr?: Multiaddr @@ -21,7 +22,7 @@ export function socketToMaConn( remoteAddr: Multiaddr, options: SocketToConnOptions ): MultiaddrConnection { - const log = options.logger.forComponent('libp2p:websockets:maconn') + const log = options.logger.forComponent(`libp2p:websockets:maconn:${remoteAddr.getPeerId()}`) const metrics = options.metrics const metricPrefix = options.metricPrefix ?? '' @@ -101,9 +102,14 @@ export function socketToMaConn( }, } + stream.socket.addEventListener('error', (event: ErrorEvent) => { + log.error(`Error on socket: ${event.message}`, event.error) + }) + stream.socket.addEventListener( 'close', - () => { + (event: CloseEvent) => { + log(`Closing socket`, JSON.stringify(event)) metrics?.increment({ [`${metricPrefix}close`]: true }) // In instances where `close` was not explicitly called,