Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 74 additions & 48 deletions backend/src/clients/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,67 +44,94 @@ async function registryRequest(
returnRawBody: boolean = false,
extraFetchOptions: Partial<Omit<RequestInit, 'headers' | 'dispatcher' | 'signal'>> = {},
extraHeaders: HeadersInit = {},
traverseLinks: boolean = true,
): Promise<RegistryRequestResult> {
const controller = new AbortController()
let res: Response
try {
// Note that this `fetch` is from `Node` and not `node-fetch` unlike other places in the codebase.
// This is because `node-fetch` was incorrectly closing the stream received from `tar` for some (but not all) entries which meant that not all of the streamed data was sent to the registry
res = await fetch(`${registry}/v2/${endpoint}`, {
headers: {
Authorization: `Bearer ${token}`,
...extraHeaders,
},
dispatcher: agent,
signal: controller.signal,
...extraFetchOptions,
})
} catch (err) {
throw InternalError('Unable to communicate with the registry.', { err })
}

const headersObject = res.headers ? Object.fromEntries(res.headers) : {}
const contentType = res.headers.get('content-type') || ''

let body: unknown
const allRepositories: string[] = []
let paginateParameter = ''
let headersObject: {
[k: string]: string
}
let contentType: string
let res: Response
let body: any
let stream: ReadableStream | Readable | undefined

if (returnRawBody) {
stream = res.body as any
} else if (contentType.endsWith('json')) {
// e.g. 'application/json', 'application/vnd.docker.distribution.manifest.v2+json'
do {
try {
body = await res.json()
// Note that this `fetch` is from `Node` and not `node-fetch` unlike other places in the codebase.
// This is because `node-fetch` was incorrectly closing the stream received from `tar` for some (but not all) entries which meant that not all of the streamed data was sent to the registry
res = await fetch(`${registry}/v2/${endpoint}${paginateParameter}`, {
headers: {
Authorization: `Bearer ${token}`,
...extraHeaders,
},
dispatcher: agent,
signal: controller.signal,
...extraFetchOptions,
})
} catch (err) {
throw InternalError('Unable to parse response body JSON.', { err })
throw InternalError('Unable to communicate with the registry.', { err })
}
} else {
try {
body = await res.text()
} catch (err) {
throw InternalError('Unable to read non-JSON response body.', { err })

headersObject = res.headers ? Object.fromEntries(res.headers) : {}
contentType = res.headers.get('content-type') || ''

if (headersObject.link) {
paginateParameter = headersObject.link.substring(
headersObject.link.indexOf('%'),
headersObject.link.lastIndexOf('>'),
)
}
}

if (!res.ok) {
const context = {
url: res.url,
status: res.status,
statusText: res.statusText,
if (returnRawBody) {
stream = res.body as any
} else if (contentType.endsWith('json')) {
// e.g. 'application/json', 'application/vnd.docker.distribution.manifest.v2+json'
try {
body = await res.json()
} catch (err) {
throw InternalError('Unable to parse response body JSON.', { err })
}
} else {
try {
body = await res.text()
} catch (err) {
throw InternalError('Unable to read non-JSON response body.', { err })
}
}

if (!body && contentType.includes('application/json')) {
// try to get the json if there's an error, even if we wanted the raw body
body = await res.json().catch(() => undefined)
if (!res.ok) {
const context = {
url: res.url,
status: res.status,
statusText: res.statusText,
}

if (!body && contentType.includes('application/json')) {
// try to get the json if there's an error, even if we wanted the raw body
body = await res.json().catch(() => undefined)
}

if (isRegistryErrorResponse(body)) {
throw RegistryError(body, context)
} else {
throw InternalError('Unrecognised response returned by the registry.', {
...context,
body: JSON.stringify(body),
})
}
}

if (isRegistryErrorResponse(body)) {
throw RegistryError(body, context)
} else {
throw InternalError('Unrecognised response returned by the registry.', {
...context,
body: JSON.stringify(body),
})
if (body?.repositories) {
allRepositories.push(...body.repositories)
}
} while (traverseLinks && headersObject?.link)

if (allRepositories.length) {
body = {
repositories: allRepositories,
}
}

Expand All @@ -119,7 +146,6 @@ async function registryRequest(
}
}

// Currently limited to a maximum 100 image names
export async function listModelRepos(token: string, modelId: string) {
const { body } = await registryRequest(token, `_catalog?n=100&last=${modelId}`)
if (!isListModelReposResponse(body)) {
Expand Down
Loading