Skip to content

Commit

Permalink
docs: document most of the typescript code in ipfs-ts and webui
Browse files Browse the repository at this point in the history
  • Loading branch information
bojidar-bg committed Dec 29, 2023
1 parent 36986c2 commit 5464feb
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 19 deletions.
16 changes: 13 additions & 3 deletions pkg/ipfs-ts/connection-gater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,28 @@ import { PeerId, isPeerId } from '@libp2p/interface/peer-id'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { ConnectionGater } from '@libp2p/interface/connection-gater'

/**
* A libp2p connection gater implementation that never denies peers that have been explicitly allow()-ed
* Necessary to connect to peers running on localhost without an external IP address.
*/
export class AllowConnectionGater implements ConnectionGater {
public allowed: Set<string>
public onlyAllowed: boolean

constructor({ onlyAllowed = false, allowed = [] }) {
/**
* @param opts Options
* @param opts.onlyAllowed Only allow explicitly-allowed peers, overriding the default connection-gater behavior
*/
constructor({ onlyAllowed = false }) {
this.onlyAllowed = onlyAllowed
this.allowed = new Set<string>(allowed)
this.allowed = new Set<string>()
}

/**
* Allow dialing the specified peer
*/
allow(peer: PeerId | Multiaddr | Multiaddr[]): void {
if (isPeerId(peer)) {
console.log(peer.toString())
this.allowed.add(peer.toString())
} else {
const addrs = Array.isArray(peer) ? peer : [peer]
Expand Down
14 changes: 14 additions & 0 deletions pkg/ipfs-ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ import { AllowConnectionGater } from './connection-gater'

export { createLibp2pConnectTransport, AllowConnectionGater }

/**
* Connect to the specified peer over the specified protocol using the specified helia node. Thin wrapper around createLibp2pConnectTransport.
*
* @param node the Helia node
* @param peerAddr the address of the peer
* @param protocol the protocol to use
* @returns a connectrpc transport
*/
export function connectTo(
node: Helia,
peerAddr: PeerId | Multiaddr | Multiaddr[],
Expand All @@ -25,6 +33,12 @@ export function connectTo(
})
}

/**
* Creates a helia node and configures it to allow connecting to any peers that are explicitly dialed.
*
* @param opts options
* @param opts.testMode run in test mode, which disables any peer discovery mechnisms and default libp2p background operations
*/
export async function createClient({ testMode = false }): Promise<Helia> {
const connectionGater = new AllowConnectionGater({
onlyAllowed: testMode
Expand Down
60 changes: 46 additions & 14 deletions pkg/ipfs-ts/transport-libp2p-connect.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-3.0

import { Transport } from '@connectrpc/connect'
import { Transport, Interceptor } from '@connectrpc/connect'
import {
UniversalClientRequest,
UniversalClientResponse
Expand All @@ -15,12 +15,15 @@ const eol = encoder.encode('\r\n')

export interface Libp2pTransportOptions {
dialStream: () => Promise<Stream>
interceptors: []
readMaxBytes: 10000
useBinaryFormat: true
writeMaxBytes: 10000
interceptors: Interceptor[]
readMaxBytes: number
useBinaryFormat: boolean
writeMaxBytes: number
}

/**
* Create a connectRPC transport that uses Libp2p streams for communication.
*/
export function createLibp2pConnectTransport(
options: Libp2pTransportOptions
): Transport {
Expand All @@ -30,13 +33,16 @@ export function createLibp2pConnectTransport(
): Promise<UniversalClientResponse> {
const stream = await options.dialStream() // NOTE: keepalive could be nice here?

// Create the request

let requestIsChunked = false
if (!req.header.has('Content-Length') && req.body !== undefined) {
requestIsChunked = true
req.header.append('Transfer-Encoding', 'chunked')
}
req.header.append('Host', '127.0.0.1')

// Encode headers
const requestHeadersBuffer = new Uint8ArrayList()
requestHeadersBuffer.append(
encoder.encode(`${req.method} ${req.url} HTTP/1.2`),
Expand All @@ -47,7 +53,9 @@ export function createLibp2pConnectTransport(
})
requestHeadersBuffer.append(eol)

let signalEnd!: () => Promise<void>
let signalEnd!: () => Promise<void> // Used to signal the end of the response body, so that we can close the request stream

// Send the request (in the background)
const bodyPromise = stream.sink(
writeBody(
new Uint8ArrayList(requestHeadersBuffer),
Expand All @@ -62,6 +70,10 @@ export function createLibp2pConnectTransport(
)
)

// Receive the response

// Decode headers

let isStatusLine = true
let isBody = false
let responseStatus = -1
Expand All @@ -75,7 +87,7 @@ export function createLibp2pConnectTransport(
if (res.done ?? false) {
throw new Error('Invalid HTTP response (ended too early)')
}
buffer.append(res.value)
buffer.append(res.value) // Concatenate chunks so that we can process responses where one part of a response header is in one packet/chunk and the other is in the next chunk
} catch (e) {
console.log(e)
throw e
Expand Down Expand Up @@ -107,6 +119,7 @@ export function createLibp2pConnectTransport(
}
}

// Parse the rest of the response body
let responseContentLength: number
const transferEncoding = responseHeader.get('Transfer-Encoding')
if ((transferEncoding?.indexOf('chunked') ?? -1) !== -1) {
Expand All @@ -121,6 +134,7 @@ export function createLibp2pConnectTransport(
}
const responseTrailer = new Headers()

// Delegate to readBody(), as we need to return the initial response headers right away with the promise
return {
status: responseStatus,
header: responseHeader,
Expand All @@ -137,14 +151,17 @@ export function createLibp2pConnectTransport(
baseUrl: '',
acceptCompression: [],
compressMinBytes: 10,
interceptors: [],
interceptors: options.interceptors,
readMaxBytes: options.readMaxBytes,
sendCompression: null,
useBinaryFormat: options.useBinaryFormat,
writeMaxBytes: options.writeMaxBytes
})
}

/**
* Write a request body out, taking care of chunked encoding
*/
async function* writeBody(
buffer: Uint8ArrayList,
body?: AsyncIterable<Uint8Array>,
Expand Down Expand Up @@ -178,23 +195,35 @@ async function* writeBody(
}
}

/**
* Read response body out, taking care of chunked encoding
*
* @param buffer chunks that have already been read (e.g. while parsing headers)
* @param source
* @param contentLength length of the body, or -1 if using chunked encoding
* @param trailers Headers object to write chunked encoding trailers
* @param signalEnd function to call when the whole body has been received
*/
async function* readBody(
buffer: Uint8ArrayList,
source: AsyncGenerator<Uint8ArrayList>,
contentLength: number,
trailers: Headers,
signalEnd?: () => Promise<void>
): AsyncGenerator<Uint8Array, void, undefined> {
let remainingChunkBytes = 0
let remainingChunkEolBytes = 0
let remainingChunkBytes = 0 // Remaining bytes from the chunk (bytes that we have to read from the buffer/source and output right away)
let remainingChunkEolBytes = 0 // Remaining EOL bytes at the end of the chunk (that we have to consume and ensure == \r\n)
let isTrailers = false

// This while maintains the buffer/source by pulling an extra frame from the source every iteration, while the rest of the code within it tries to parse as much of the buffer as possible
while (true) {
if (contentLength === -1) {
// Chunked encoding
// Best of luck to whoever might end up having to debug this.. I am sorry :/
while (!isTrailers) {
// Best of luck to whoever might end up having to debug this.. I am sorry :/
if (remainingChunkEolBytes === 0) {
if (remainingChunkBytes === 0) {
// We are starting a new chunk!
const eolIndex = buffer.indexOf(eol)
if (eolIndex !== -1) {
const chunkLine = decoder.decode(buffer.subarray(0, eolIndex))
Expand All @@ -207,10 +236,11 @@ async function* readBody(
}
remainingChunkBytes = chunkSize + eol.byteLength
} else {
// It's the last chunk!
break
}
} else {
// (remainingChunkBytes !== 0)
// (remainingChunkBytes !== 0) - we are in the middle of a chunk!
if (buffer.byteLength >= remainingChunkBytes) {
yield buffer.subarray(0, remainingChunkBytes)
buffer.consume(remainingChunkBytes)
Expand All @@ -226,7 +256,7 @@ async function* readBody(
}
}
} else {
// (remainingChunkEolBytes !== 0)
// (remainingChunkEolBytes !== 0) - the chuck is over!
if (buffer.byteLength < eol.byteLength) {
break
}
Expand All @@ -240,7 +270,7 @@ async function* readBody(
}
}
if (isTrailers) {
// Lack of else is important (so we parse the trailers in the buffer right away)
// Lack of else is important (so we parse any trailers present in the buffer right away)
let eolIndex: number
while ((eolIndex = buffer.indexOf(eol)) !== -1) {
const line = decoder.decode(buffer.subarray(0, eolIndex))
Expand All @@ -256,6 +286,7 @@ async function* readBody(
}
}
} else {
// (contentLength !== -1) -- Content-length/unchunked encoding
if (buffer.byteLength >= contentLength) {
yield buffer.subarray(0, contentLength)
buffer.consume(contentLength)
Expand All @@ -270,6 +301,7 @@ async function* readBody(
}
}

// Advance the buffer
const res = await source.next()
if (res.done ?? false) {
break
Expand Down
13 changes: 12 additions & 1 deletion test/e2e/webui/src/connections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,24 @@ import { foundry } from 'viem/chains'
import { privateKeyToAccount } from 'viem/accounts'
import { createClient } from 'apocryph-ipfs-ts'

/**
* Client for a public Ethereum node, used for reading, estimating gas fees, and simulating transactions
*/
export const publicClient = createPublicClient({
chain: foundry,
transport: http()
})

/**
* Client for a private Ethereum node, used for signing transactions
*/
export const walletClient = createWalletClient({
chain: foundry,
account: privateKeyToAccount('0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a'), // TODO= anvil.accounts[2],
account: privateKeyToAccount('0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a'), // TODO= anvil.accounts[2] -- remove hardcode / use metamask!
transport: http() // custom(window.ethereum)
})

/**
* Client for IPFS/libp2p
*/
export const heliaNodePromise = createClient({ testMode: true })
Loading

0 comments on commit 5464feb

Please sign in to comment.