Skip to content

Commit

Permalink
fix: Fixes download issues on cancellation, download status logs, con…
Browse files Browse the repository at this point in the history
…nection stability (#2687)

* Fix download issues and improve connections

* Update e2e-linux.yml

* Some mild updates to help with initial connections and connection stability

* Mild dependency update

* Mild abort improvements and make the file component more legible

* Fix snapshots
  • Loading branch information
islathehut authored Jan 2, 2025
1 parent 5986fc3 commit 1c32ef5
Show file tree
Hide file tree
Showing 14 changed files with 170 additions and 95 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/e2e-linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
DISPLAY: ":99.0"
TEST_MODE: true
IS_CI: true
SKIP_BACK_COMPAT_TEST_BRANCHES: '["update-orbitdb", "chore/upgrade-orbitdb-2_4_3"]'
SKIP_BACK_COMPAT_TEST_BRANCHES: '["update-orbitdb", "chore/upgrade-orbitdb-2_4_3", "fix/2679-2680-2682-3_0-fixes"]'

steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
Expand Down
20 changes: 20 additions & 0 deletions packages/backend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
"@types/cors": "2.8.17",
"@types/crypto-js": "^4.0.2",
"@types/express": "^4.17.9",
"@types/get-port": "4.2.0",
"@types/jest": "28.1.8",
"@types/luxon": "^3.4.2",
"@types/mock-fs": "^4.13.1",
Expand Down
1 change: 0 additions & 1 deletion packages/backend/src/nest/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { UserData } from '@quiet/types'
import { HttpsProxyAgent } from 'https-proxy-agent'
import { generateKeyPair } from '@libp2p/crypto/keys'
import { peerIdFromPrivateKey } from '@libp2p/peer-id'
import { type PeerId } from '@libp2p/interface'
import tmp from 'tmp'
import crypto from 'crypto'
import { type PermsData } from '@quiet/types'
Expand Down
177 changes: 108 additions & 69 deletions packages/backend/src/nest/ipfs-file-manager/ipfs-file-manager.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import sizeOf from 'image-size'
import { CID } from 'multiformats/cid'
import { DownloadProgress, DownloadState, DownloadStatus, FileMetadata, imagesExtensions } from '@quiet/types'
import { QUIET_DIR } from '../const'
import { ExportProgress, FilesData, IpfsFilesManagerEvents } from './ipfs-file-manager.types'
import { ExportProgress, ExportWalk, FilesData, IpfsFilesManagerEvents } from './ipfs-file-manager.types'
import { StorageEvents, UnixFSEvents } from '../storage/storage.types'
import { MAX_EVENT_LISTENERS, TRANSFER_SPEED_SPAN, UPDATE_STATUS_INTERVAL } from './ipfs-file-manager.const'
import { sleep } from '../common/sleep'
Expand Down Expand Up @@ -238,27 +238,26 @@ export class IpfsFileManagerService extends EventEmitter {

private async cancelDownload(cid: string) {
const _logger = createLogger(`${IpfsFileManagerService.name}:cancel:${cid}`)
const abortController = this.controllers.get(cid)
let abortController = this.controllers.get(cid)
const downloadInProgress = this.files.get(cid)
if (!downloadInProgress) return
// In case download is cancelled right after start and queue is not yet initialized.
if (!abortController) {
while (abortController == null) {
_logger.info(`Waiting for abort controller to be created...`)
await sleep(1000)
await this.cancelDownload(cid)
} else {
_logger.info(`Aborting download`)
const controller = abortController.controller
this.cancelledDownloads.add(cid)
controller.abort()
abortController = this.controllers.get(cid)
}

_logger.info(`Aborting download`)
const controller = abortController.controller
controller.abort()
}

public async downloadFile(fileMetadata: FileMetadata) {
const _logger = createLogger(`${IpfsFileManagerService.name}:download:${fileMetadata.cid.toString()}`)

const fileCid: CID = CID.parse(fileMetadata.cid)
const downloadedBlocks: Set<string> = new Set()
let downloadedBlocks: number = 0
const pendingBlocks: Set<string> = new Set()
const controller = new AbortController()

Expand All @@ -267,7 +266,7 @@ export class IpfsFileManagerService extends EventEmitter {
this.controllers.set(fileMetadata.cid, { controller })

// Add try catch and return downloadBlocks with timeout
const initialStat = await this.ufs.stat(fileCid)
const initialStat = await this.ufs.stat(fileCid, { signal: controller.signal })
const fileSize = initialStat.fileSize
const localSize = initialStat.localFileSize
if (fileMetadata.size && !compare(fileMetadata.size, fileSize, 0.05)) {
Expand Down Expand Up @@ -304,7 +303,9 @@ export class IpfsFileManagerService extends EventEmitter {
// Transfer speed
const blocksStats: BlockStat[] = []

const handleDownloadProgressEvents = async (event: GetEvents | CustomProgressEvent<CID>) => {
const handleDownloadProgressEvents = async (
event: GetEvents | GetBlockProgressEvents | CustomProgressEvent<any>
) => {
// if we don't have an event type there's nothing useful to do
if (event.type === null) {
return
Expand All @@ -324,19 +325,13 @@ export class IpfsFileManagerService extends EventEmitter {
}

_logger.info(`Getting block ${cidStr} from local blockstore`)
if (downloadedBlocks.has(cidStr)) {
_logger.info(`Already downloaded block ${cidStr}`)
return
}

downloadedBlocks.add(cidStr)
}

// handler for events where we are walking the file to get all child blocks
// NOTE: this happens at the beginning of the download process AND when we have all of the blocks are we are walking through them to get the contents
const handleWalkFile = async (cid: CID) => {
const cidStr = cid.toString()
if (downloadedBlocks.size === 0 && pendingBlocks.size === 0) {
const handleWalkFile = async (event: CustomProgressEvent<ExportWalk>) => {
const cidStr = event.detail.cid.toString()
if (downloadedBlocks === 0 && pendingBlocks.size === 0) {
// this is the first time we've seen this event so it means we are just starting the download process
_logger.info(`Download started, walking`)
await this.updateStatus(cidStr, DownloadState.Downloading)
Expand All @@ -356,6 +351,7 @@ export class IpfsFileManagerService extends EventEmitter {
byteLength: Number(totalBytes) - Number(bytesRead),
}
blocksStats.push(blockStat)
downloadedBlocks += 1
}

// handler for events where we are asking for the block on the network because we don't have it stored locally
Expand All @@ -370,10 +366,20 @@ export class IpfsFileManagerService extends EventEmitter {
pendingBlocks.add(cidStr)
}

const handlePutBlock = async (event: GetBlockProgressEvents) => {
const cidStr = event.detail.toString()
if (pendingBlocks.has(cidStr)) {
pendingBlocks.delete(cidStr)
}

_logger.info(`Putting block ${cidStr} into local blockstore`)
}

this.logger.info(`Event with type`, event.type)
switch (event.type) {
case UnixFSEvents.WALK_FILE:
// this event has a different format for how it stores the CID on the detail
await handleWalkFile((event as any).detail.cid as CID)
await handleWalkFile(event as CustomProgressEvent<ExportWalk>)
break
case UnixFSEvents.GET_BLOCK_PROVIDERS:
case UnixFSEvents.WANT_BLOCK:
Expand All @@ -385,50 +391,77 @@ export class IpfsFileManagerService extends EventEmitter {
case UnixFSEvents.DOWNLOAD_BLOCK:
await handleDownloadBlock(event as CustomProgressEvent<ExportProgress>)
break
case UnixFSEvents.PUT_BLOCK:
await handlePutBlock(event as GetBlockProgressEvents)
break
default:
break
}

return
}

const updateDownloadStatusWithTransferSpeed = setInterval(async () => {
const totalDownloadedBytes = Number((await this.ufs.stat(fileCid)).localFileSize)
let recentlyDownloadedBytes = 0
const thresholdTimestamp = Math.floor(Date.now() / 1000) - TRANSFER_SPEED_SPAN
blocksStats.forEach((blockStat: BlockStat) => {
if (blockStat.fetchTime >= thresholdTimestamp) {
recentlyDownloadedBytes += blockStat.byteLength
const updateDownloadStatusWithTransferSpeed = setInterval(
async () => {
if (controller.signal.aborted) {
_logger.warn(`Cancelling update status interval due to cancellation`)
clearInterval(updateDownloadStatusWithTransferSpeed)
return
}
})
this.logger.info(`Current downloaded bytes`, recentlyDownloadedBytes, totalDownloadedBytes)

const transferSpeed = recentlyDownloadedBytes === 0 ? 0 : recentlyDownloadedBytes / TRANSFER_SPEED_SPAN
const fileState = this.files.get(fileMetadata.cid)
if (!fileState) {
this.logger.error(`No saved data for file cid ${fileMetadata.cid}`)
return
}
this.files.set(fileMetadata.cid, {
...fileState,
transferSpeed: transferSpeed,
downloadedBytes: totalDownloadedBytes,
})
await this.updateStatus(fileMetadata.cid, DownloadState.Downloading)
}, UPDATE_STATUS_INTERVAL * 1000)
const totalDownloadedBytes = Number((await this.ufs.stat(fileCid)).localFileSize)
let recentlyDownloadedBytes = 0
const thresholdTimestamp = Math.floor(Date.now() / 1000) - TRANSFER_SPEED_SPAN
blocksStats.forEach((blockStat: BlockStat) => {
if (blockStat.fetchTime >= thresholdTimestamp) {
recentlyDownloadedBytes += blockStat.byteLength
}
})
this.logger.info(`Current downloaded bytes`, recentlyDownloadedBytes, totalDownloadedBytes)

const downloadCompletedOrCanceled = new Promise((resolve, reject) => {
const interval = setInterval(() => {
const transferSpeed = recentlyDownloadedBytes === 0 ? 0 : recentlyDownloadedBytes / TRANSFER_SPEED_SPAN
const fileState = this.files.get(fileMetadata.cid)
this.ufs.stat(fileCid).then(({ fileSize, localFileSize }) => {
if (this.cancelledDownloads.has(fileMetadata.cid) || !fileState || localFileSize === fileSize) {
clearInterval(interval)
resolve('No more blocks to fetch, download is completed or canceled')
} else {
_logger.info(`Downloaded ${downloadedBlocks.size} blocks (${pendingBlocks.size} blocks pending)`)
}
if (!fileState) {
this.logger.error(`No saved data for file cid ${fileMetadata.cid}`)
return
}
this.files.set(fileMetadata.cid, {
...fileState,
transferSpeed: transferSpeed,
downloadedBytes: totalDownloadedBytes,
})
}, 1000)
await this.updateStatus(fileMetadata.cid, DownloadState.Downloading)
},
UPDATE_STATUS_INTERVAL * 1000,
controller
)

const downloadCompletedOrCanceled = new Promise((resolve, reject) => {
const interval = setInterval(
() => {
const fileState = this.files.get(fileMetadata.cid)
this.ufs
.stat(fileCid)
.then(({ fileSize, localFileSize }) => {
if (controller.signal.aborted || !fileState || localFileSize === fileSize) {
clearInterval(interval)
resolve('No more blocks to fetch, download is completed or canceled')
} else {
_logger.info(`Downloaded ${downloadedBlocks} blocks (${pendingBlocks.size} blocks pending)`)
}
})
.catch(e => {
clearInterval(interval)
if (controller.signal.aborted) {
resolve('No more blocks to fetch, download is completed or canceled')
} else {
reject(e)
}
})
},
1000,
controller
)
})

let downloading = fileSize !== initialStat.localFileSize
Expand All @@ -438,7 +471,7 @@ export class IpfsFileManagerService extends EventEmitter {
signal: controller.signal,
}

while (downloading) {
while (downloading && !controller.signal.aborted) {
const stat = await this.ufs.stat(fileCid)
const totalSize = Number(stat.fileSize)
const downloadedSize = Number(stat.localFileSize)
Expand All @@ -465,7 +498,7 @@ export class IpfsFileManagerService extends EventEmitter {
_logger.info(`Got block with size (in bytes)`, entry.byteLength)
}
} catch (e) {
if (this.cancelledDownloads.has(fileCid.toString())) {
if (controller.signal.aborted) {
_logger.warn(`Cancelling download`)
downloading = false
break
Expand All @@ -476,7 +509,7 @@ export class IpfsFileManagerService extends EventEmitter {

// I don't love that I'm doing this but just writing the files straight from the cat operation above ends up giving you a corrupt final file
// This gives us all blocks as they are
if (!this.cancelledDownloads.has(fileCid.toString())) {
if (!controller.signal.aborted) {
try {
const entries = this.ufs.cat(fileCid, baseCatOptions)
for await (const entry of entries) {
Expand All @@ -493,39 +526,45 @@ export class IpfsFileManagerService extends EventEmitter {
})
}
} catch (e) {
if (this.cancelledDownloads.has(fileCid.toString())) {
if (controller.signal.aborted) {
_logger.warn(`Cancelling download`)
}
}
}

writeStream.end()

await downloadCompletedOrCanceled
try {
await downloadCompletedOrCanceled
} catch (e) {
this.logger.error(`Error while waiting for download to be completed or canceled`, e)
}

clearInterval(updateDownloadStatusWithTransferSpeed)

const fileState = this.files.get(fileMetadata.cid)
if (!fileState) {
if (!fileState && !controller.signal.aborted) {
this.logger.error(`No saved data for file cid ${fileMetadata.cid}`)
return
}

if (this.cancelledDownloads.has(fileMetadata.cid)) {
this.files.set(fileMetadata.cid, {
...fileState,
downloadedBytes: 0,
transferSpeed: 0,
})
if (controller.signal.aborted) {
if (fileState != null) {
this.files.set(fileMetadata.cid, {
...fileState,
downloadedBytes: 0,
transferSpeed: 0,
})
}

await this.updateStatus(fileMetadata.cid, DownloadState.Canceled)
this.cancelledDownloads.delete(fileMetadata.cid)
this.controllers.delete(fileMetadata.cid)
this.files.delete(fileMetadata.cid)
this.controllers.delete(fileMetadata.cid)
return
}

this.files.set(fileMetadata.cid, {
...fileState,
...fileState!,
transferSpeed: 0,
downloadedBytes: Number((await this.ufs.stat(fileCid)).localFileSize),
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { CID } from 'multiformats'

export enum IpfsFilesManagerEvents {
// Incoming evetns
DOWNLOAD_FILE = 'downloadFile',
Expand Down Expand Up @@ -38,3 +40,7 @@ export interface ExportProgress {
*/
fileSize: bigint
}

export interface ExportWalk {
cid: CID
}
Loading

0 comments on commit 1c32ef5

Please sign in to comment.