Skip to content

Commit

Permalink
Testing changes
Browse files Browse the repository at this point in the history
  • Loading branch information
islathehut committed Jan 9, 2025
1 parent ddd7213 commit 3e52413
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 10 deletions.
2 changes: 1 addition & 1 deletion packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"webpack": "webpack --env mode=development && cp ./lib/bundle.cjs ../backend-bundle/bundle.cjs",
"webpack:prod": "webpack --env mode=production && cp ./lib/bundle.cjs ../backend-bundle/bundle.cjs",
"postinstall": "npm run applyPatches",
"applyPatches": "patch -f -p0 < ./patch/electron-fetch.patch || true && patch -f -p0 --forward --binary < ./patch/parse-duration.patch || true && patch -f -p0 --forward --binary < ./patch/parse-duration-esm.patch || true && patch -f -p0 < ./patch/ipfs-pubsub-peer-monitor.patch || true",
"applyPatches": "patch -f -p0 < ./patch/electron-fetch.patch || true && patch -f -p0 --forward --binary < ./patch/parse-duration.patch || true && patch -f -p0 --forward --binary < ./patch/parse-duration-esm.patch || true && patch -f -p0 < ./patch/ipfs-pubsub-peer-monitor.patch || true && patch --forward -p0 node_modules/@libp2p/kad-dht/dist/src/providers.js < ./patch/libp2p-kaddht-providers.patch || true && patch --forward -p0 node_modules/@helia/block-brokers/node_modules/ipfs-bitswap/dist/src/bitswap.js < ./patch/bitswap.patch || true",
"prepare": "npm run webpack",
"version": "git add -A src",
"lint:no-fix": "eslint --ext .jsx,.js,.ts,.tsx ./src/",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import fs from 'fs'
import path from 'path'
import crypto from 'crypto'
import { GetBlockProgressEvents, type Helia } from 'helia'
import { AddEvents, CatOptions, GetEvents, unixfs, type UnixFS } from '@helia/unixfs'
import { AddEvents, CatOptions, GetEvents, unixfs, UnixFSStats, type UnixFS } from '@helia/unixfs'
import { promisify } from 'util'
import sizeOf from 'image-size'
import { CID } from 'multiformats/cid'
Expand All @@ -19,6 +19,7 @@ const { createPaths, compare } = await import('../common/utils')
import { createLogger } from '../common/logger'
import { IpfsService } from '../ipfs/ipfs.service'
import { CustomProgressEvent } from 'progress-events'
import { file } from 'mock-fs/lib/filesystem'

// 1048576 is the number of bytes in a block uploaded via unixfs
// Reference: packages/backend/node_modules/@helia/unixfs/src/commands/add.ts
Expand Down Expand Up @@ -265,8 +266,15 @@ export class IpfsFileManagerService extends EventEmitter {

this.controllers.set(fileMetadata.cid, { controller })

// Add try catch and return downloadBlocks with timeout
const initialStat = await this.ufs.stat(fileCid, { signal: controller.signal })
let initialStat: UnixFSStats | undefined = undefined
try {
initialStat = await this.ufs.stat(fileCid, { signal: controller.signal })
} catch (e) {
_logger.error(`Error while performing initial stat`, e)
await this.cancelDownload(fileCid.toString())
return
}

const fileSize = initialStat.fileSize
const localSize = initialStat.localFileSize
if (fileMetadata.size && !compare(fileMetadata.size, fileSize, 0.05)) {
Expand Down Expand Up @@ -472,7 +480,14 @@ export class IpfsFileManagerService extends EventEmitter {
}

while (downloading && !controller.signal.aborted) {
const stat = await this.ufs.stat(fileCid)
let stat: UnixFSStats | undefined = undefined
try {
stat = await this.ufs.stat(fileCid)
} catch (e) {
_logger.error(`Error while running stat`, e)
continue
}

const totalSize = Number(stat.fileSize)
const downloadedSize = Number(stat.localFileSize)
if (offset >= totalSize) {
Expand Down Expand Up @@ -503,6 +518,9 @@ export class IpfsFileManagerService extends EventEmitter {
downloading = false
break
}

_logger.error(`Error while catting file`, e)
continue
}
offset += DEFAULT_CAT_BLOCK_CHUNK_SIZE
}
Expand All @@ -528,6 +546,9 @@ export class IpfsFileManagerService extends EventEmitter {
} catch (e) {
if (controller.signal.aborted) {
_logger.warn(`Cancelling download`)
} else {
_logger.error(`Error while catting to write blocks out to local file`, e)
await this.cancelDownload(fileCid.toString())
}
}
}
Expand All @@ -545,6 +566,8 @@ export class IpfsFileManagerService extends EventEmitter {
const fileState = this.files.get(fileMetadata.cid)
if (!fileState && !controller.signal.aborted) {
this.logger.error(`No saved data for file cid ${fileMetadata.cid}`)
await this.updateStatus(fileMetadata.cid, DownloadState.Canceled)
this.controllers.delete(fileMetadata.cid)
return
}

Expand Down
28 changes: 26 additions & 2 deletions packages/backend/src/nest/libp2p/libp2p.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ export class Libp2pService extends EventEmitter {
outboundUpgradeTimeout: 30_000,
protocolNegotiationTimeout: 10_000,
maxDialQueueLength: 500,
reconnectRetryInterval: 5_000,
reconnectRetries: 50,
reconnectBackoffFactor: 1.5,
},
privateKey: params.peerId.privKey,
addresses: { listen: params.listenAddresses },
Expand All @@ -285,14 +288,15 @@ export class Libp2pService extends EventEmitter {
handshakeTimeout: 15_000,
ciphers: WEBSOCKET_CIPHER_SUITE,
followRedirects: true,
sessionTimeout: 600,
},
localAddress: params.localAddress,
targetPort: params.targetPort,
closeOnEnd: true,
}),
],
services: {
ping: ping({ timeout: 30_000 }),
ping: ping({ timeout: 30_000, runOnLimitedConnection: false, maxInboundStreams: 60, maxOutboundStreams: 60 }),
pubsub: gossipsub({
// neccessary to run a single peer
allowPublishToZeroTopicPeers: true,
Expand All @@ -306,8 +310,28 @@ export class Libp2pService extends EventEmitter {
keychain: keychain(),
dht: kadDHT({
allowQueryWithZeroPeers: true,
clientMode: false,
clientMode: true,
initialQuerySelfInterval: 500,
providers: {
cacheSize: 1024,
},
maxInboundStreams: 128,
maxOutboundStreams: 128,
networkDialTimeout: {
minTimeout: 30_000,
timeoutMultiplier: 1.05,
failureMultiplier: 1.1,
},
pingNewContactTimeout: {
minTimeout: 30_000,
timeoutMultiplier: 1.05,
failureMultiplier: 1.1,
},
pingOldContactTimeout: {
minTimeout: 30_000,
timeoutMultiplier: 1.02,
failureMultiplier: 1.15,
},
}),
},
})
Expand Down
2 changes: 1 addition & 1 deletion packages/backend/src/nest/websocketOverTor/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ export const CODE_WS = 477
export const CODE_WSS = 478

// Time to wait for a connection to close gracefully before destroying it manually
export const CLOSE_TIMEOUT = 500
export const CLOSE_TIMEOUT = 5000
10 changes: 8 additions & 2 deletions packages/backend/src/nest/websocketOverTor/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { CLOSE_TIMEOUT } from './constants'
import type { AbortOptions, ComponentLogger, CounterGroup, MultiaddrConnection } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { DuplexWebSocket } from 'it-ws/duplex'
import { CloseEvent, ErrorEvent } from 'ws'

export interface SocketToConnOptions {
localAddr?: Multiaddr
Expand All @@ -21,7 +22,7 @@ export function socketToMaConn(
remoteAddr: Multiaddr,
options: SocketToConnOptions
): MultiaddrConnection {
const log = options.logger.forComponent('libp2p:websockets:maconn')
const log = options.logger.forComponent(`libp2p:websockets:maconn:${remoteAddr.getPeerId()}`)
const metrics = options.metrics
const metricPrefix = options.metricPrefix ?? ''

Expand Down Expand Up @@ -101,9 +102,14 @@ export function socketToMaConn(
},
}

stream.socket.addEventListener('error', (event: ErrorEvent) => {
log.error(`Error on socket: ${event.message}`, event.error)
})

stream.socket.addEventListener(
'close',
() => {
(event: CloseEvent) => {
log(`Closing socket`, JSON.stringify(event))
metrics?.increment({ [`${metricPrefix}close`]: true })

// In instances where `close` was not explicitly called,
Expand Down

0 comments on commit 3e52413

Please sign in to comment.