Skip to content

Commit

Permalink
add AbortSignal for websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
xingyu.ren committed Feb 1, 2024
1 parent 5b97700 commit 1b63184
Showing 1 changed file with 32 additions and 11 deletions.
43 changes: 32 additions & 11 deletions src/kube-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,13 @@ class ApiVersionResourceCache {
return resources.find(r => r.kind === kind)!;
}

public async resourceByName(resourcePath: string, name: string) {
public async resourceByName(
resourcePath: string,
name: string,
signal?: AbortSignal
) {
const localVarPath = `${this.basePath}${resourcePath}`;
const { resources } = await this.fetchAndCache(localVarPath);
const { resources } = await this.fetchAndCache(localVarPath, signal);
return resources.find(r => r.name === name)!;
}

Expand All @@ -214,9 +218,11 @@ class ApiVersionResourceCache {
return [this.basePath, api, apiVersion].join('/');
}

private async fetchAndCache(localVarPath: string) {
private async fetchAndCache(localVarPath: string, signal?: AbortSignal) {
if (!this.cache[localVarPath]) {
const resources = await ky.get(localVarPath).json<APIResourceList>();
const resources = await ky
.get(localVarPath, { signal })
.json<APIResourceList>();
this.cache[localVarPath] = resources;
}
return this.cache[localVarPath];
Expand Down Expand Up @@ -299,7 +305,8 @@ export class KubeApi<T extends UnstructuredList> {
query,
onResponse,
onEvent,
})
}),
signal
);
const stop = () => {
stopWatch?.();
Expand All @@ -315,7 +322,8 @@ export class KubeApi<T extends UnstructuredList> {
onResponse: KubeApiListWatchOptions<T>['onResponse'],
onEvent: KubeApiListWatchOptions<T>['onEvent'],
// let listwatch know it needs retry
retry: () => Promise<StopWatchHandler>
retry: () => Promise<StopWatchHandler>,
signal?: AbortSignal
): Promise<StopWatchHandler> {
let { items } = response as unknown as UnstructuredList;
const stops: Array<() => void> = [];
Expand Down Expand Up @@ -384,14 +392,17 @@ export class KubeApi<T extends UnstructuredList> {
// server-side watch
const resource = await this.apiVersionResourceCache.resourceByName(
this.resourceBasePath,
this.resource
this.resource,
signal
);
const verbs = resource?.verbs || [];
const supportServerWatch = verbs.includes('watch');

if (supportServerWatch) {
if (this.watchWsBasePath) {
stops.push(this.watchByWebsocket(url, response, handleEvent, retry));
stops.push(
this.watchByWebsocket(url, response, handleEvent, retry, signal)
);
} else {
stops.push(this.watchByHttp(url, response, handleEvent, retry));
}
Expand Down Expand Up @@ -429,7 +440,8 @@ export class KubeApi<T extends UnstructuredList> {
res: T,
handleEvent: (event: WatchEvent) => void,
// let listwatch know it needs retry
retry: () => Promise<StopWatchHandler>
retry: () => Promise<StopWatchHandler>,
signal?: AbortSignal
) {
const { resourceVersion = '' } = (res as unknown as UnstructuredList)
.metadata;
Expand All @@ -440,6 +452,15 @@ export class KubeApi<T extends UnstructuredList> {
: `${protocol}://${location.host}/${url}?resourceVersion=${resourceVersion}&watch=1`
);
let shouldCloseAfterConnected = false;
const abortListener = () => {
if (socket.readyState === socket.CONNECTING) {
shouldCloseAfterConnected = true;
}
};
const removeAbortListener = () => {
signal?.removeEventListener('abort', abortListener);
};
signal?.addEventListener('abort', abortListener);
let stopWatch: () => void = () => {
if (socket.readyState === socket.OPEN) {
socket.close(3001, 'KUBEAPI_MANUAL_CLOSE');
Expand All @@ -458,6 +479,7 @@ export class KubeApi<T extends UnstructuredList> {
return;
}
heartbeat(socket);
removeAbortListener();
});
socket.addEventListener('message', function (msg) {
const event = JSON.parse(msg.data) as WatchEvent;
Expand All @@ -467,11 +489,10 @@ export class KubeApi<T extends UnstructuredList> {
});
socket.addEventListener('close', evt => {
clearTimeout((socket as ExtendedWebsocketClient).pingTimeout);

if (evt.reason === 'KUBEAPI_MANUAL_CLOSE') {
return;
}

removeAbortListener();
this.retryFunc(async () => {
stopWatch = await retry();
});
Expand Down

0 comments on commit 1b63184

Please sign in to comment.