Skip to content

Commit

Permalink
fix: close GRPC channel when we dispose of clients (#779)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian authored Dec 15, 2019
1 parent 5000b2d commit 22ef0d0
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 18 deletions.
5 changes: 3 additions & 2 deletions dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ export class Firestore {

this._clientPool = new ClientPool(
MAX_CONCURRENT_REQUESTS_PER_CLIENT,
() => {
/* clientFactory= */ () => {
let client: GapicClient;

if (this._settings.ssl === false) {
Expand All @@ -387,7 +387,8 @@ export class Firestore {

logger('Firestore', null, 'Initialized Firestore GAPIC Client');
return client;
}
},
/* clientDestructor= */ (client: GapicClient) => client.close()
);

logger('Firestore', null, 'Initialized Firestore');
Expand Down
27 changes: 17 additions & 10 deletions dev/src/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ export class ClientPool<T> {
* can handle.
* @param clientFactory A factory function called as needed when new clients
* are required.
* @param clientDestructor A cleanup function that is called when a client is
* disposed of.
*/
constructor(
private readonly concurrentOperationLimit: number,
private readonly clientFactory: () => T
private readonly clientFactory: () => T,
private readonly clientDestructor: (client: T) => Promise<void> = () =>
Promise.resolve()
) {}

/**
Expand Down Expand Up @@ -88,15 +92,15 @@ export class ClientPool<T> {
* removing it from the pool of active clients.
* @private
*/
private release(requestTag: string, client: T): void {
private async release(requestTag: string, client: T): Promise<void> {
let requestCount = this.activeClients.get(client) || 0;
assert(requestCount > 0, 'No active request');

requestCount = requestCount! - 1;
this.activeClients.set(client, requestCount);

if (requestCount === 0) {
const deletedCount = this.garbageCollect();
const deletedCount = await this.garbageCollect();
if (deletedCount) {
logger(
'ClientPool.release',
Expand Down Expand Up @@ -147,12 +151,12 @@ export class ClientPool<T> {
const client = this.acquire(requestTag);

return op(client)
.catch(err => {
this.release(requestTag, client);
.catch(async err => {
await this.release(requestTag, client);
return Promise.reject(err);
})
.then(res => {
this.release(requestTag, client);
.then(async res => {
await this.release(requestTag, client);
return res;
});
}
Expand All @@ -164,17 +168,20 @@ export class ClientPool<T> {
* @return Number of clients deleted.
* @private
*/
private garbageCollect(): number {
private async garbageCollect(): Promise<number> {
let idleClients = 0;
this.activeClients.forEach((requestCount, client) => {
const cleanUpTasks: Array<Promise<void>> = [];
for (const [client, requestCount] of this.activeClients) {
if (requestCount === 0) {
++idleClients;

if (idleClients > 1) {
this.activeClients.delete(client);
cleanUpTasks.push(this.clientDestructor(client));
}
}
});
}
await Promise.all(cleanUpTasks);
return idleClients - 1;
}
}
4 changes: 2 additions & 2 deletions dev/src/v1/firestore_admin_client_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
"initial_retry_delay_millis": 100,
"retry_delay_multiplier": 1.3,
"max_retry_delay_millis": 60000,
"initial_rpc_timeout_millis": 20000,
"initial_rpc_timeout_millis": 60000,
"rpc_timeout_multiplier": 1,
"max_rpc_timeout_millis": 20000,
"max_rpc_timeout_millis": 60000,
"total_timeout_millis": 600000
}
},
Expand Down
4 changes: 2 additions & 2 deletions dev/src/v1/firestore_client_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
"initial_retry_delay_millis": 100,
"retry_delay_multiplier": 1.3,
"max_retry_delay_millis": 60000,
"initial_rpc_timeout_millis": 20000,
"initial_rpc_timeout_millis": 60000,
"rpc_timeout_multiplier": 1,
"max_rpc_timeout_millis": 20000,
"max_rpc_timeout_millis": 60000,
"total_timeout_millis": 600000
}
},
Expand Down
4 changes: 2 additions & 2 deletions dev/src/v1beta1/firestore_client_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
"initial_retry_delay_millis": 100,
"retry_delay_multiplier": 1.3,
"max_retry_delay_millis": 60000,
"initial_rpc_timeout_millis": 20000,
"initial_rpc_timeout_millis": 60000,
"rpc_timeout_multiplier": 1,
"max_rpc_timeout_millis": 20000,
"max_rpc_timeout_millis": 60000,
"total_timeout_millis": 600000
}
},
Expand Down
22 changes: 22 additions & 0 deletions dev/test/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,28 @@ describe('Client pool', () => {
);
});

it('garbage collection calls destructor', () => {
const garbageCollect = new Deferred();

const clientPool = new ClientPool<{}>(
1,
() => {
return {};
},
() => Promise.resolve(garbageCollect.resolve())
);

const operationPromises = deferredPromises(2);

// Create two pending operations that each spawn their own client
clientPool.run(REQUEST_TAG, () => operationPromises[0].promise);
clientPool.run(REQUEST_TAG, () => operationPromises[1].promise);

operationPromises.forEach(deferred => deferred.resolve());

return garbageCollect.promise;
});

it('forwards success', () => {
const clientPool = new ClientPool<{}>(1, () => {
return {};
Expand Down

0 comments on commit 22ef0d0

Please sign in to comment.