Skip to content

Commit

Permalink
feat: add query queue tolerance
Browse files Browse the repository at this point in the history
  • Loading branch information
SkeLLLa committed Jan 3, 2025
1 parent f359dd0 commit dc73f84
Show file tree
Hide file tree
Showing 21 changed files with 289 additions and 18 deletions.
60 changes: 60 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ Unofficial node.js [Apache Pnot](https://pinot.apache.org/) client. Uses [undici
- [Transport](#transport)
- [Broker client](#broker-client)
- [Constructing and performing queries](#constructing-and-performing-queries)
- [Pools and queues](#pools-and-queues)
- [Queue size](#queue-size)
- [Queue tolerance](#queue-tolerance)
- [Utilities](#utilities)
- [SqlUtils](#sqlutils)
- [stringifyQuery](#stringifyquery)
Expand Down Expand Up @@ -112,6 +115,63 @@ console.log(result.stats);

While using query options like `timeoutMs` they are passed to http request timeouts as well, so they shouldn't run longer than you expect the query should run.

### Pools and queues

#### Queue size

By default pinot transport requests are made via HTTP connection pool with maximum concurrency.
If requests rate exceeds max concurrency value, the requests are queued.
By default the queue is not limited, but it's strongly recommended to set it to prevent high memory consumption.

```typescript
import { PinotBrokerJSONTransport } from 'pinot-noir';

const pinotTransport = new PinotBrokerJSONTransport({
...
maxQueueSize: 1024,
});
```

#### Queue tolerance

Since pinot can contain different data some of requests may not tolerate queuing.
For addressing that issue in query you may specify queue tolerance.
It's a value that represents percent of max queue size that this request could tolerate.

For example if `maxQueueSize` is `1000` and `queueTolerance` is `0.1`, then the query will be performed only if current queue is less than `1000 * 0.1 = 100`.
Otherwise `QUEUE_TOLERANCE_LIMIT` error will be thrown.
For real-time data you can specify `queueTolerance` as `0`, then the requests will be discarded if there's a queue.

```typescript
import { PinotBrokerJSONTransport, PinotBrokerClient, PinotError, EPinotErrorType, EBrokerTransportErrorCode} from 'pinot-noir';
type: EPinotErrorType.TRANSPORT,
code: EBrokerTransportErrorCode.QUEUE_TOLERANCE_LIMIT,

const pinotTransport = new PinotBrokerJSONTransport({
...
maxQueueSize: 1000,
});

const pinotClient = new PinotBrokerClient({ transport: pinotTransport });

const year = 2010;
const query = sql`
select sum(hits) as hits, sum(homeRuns) as homeRuns, sum(numberOfGames) as gamesCount
from baseballStats
where yearID > ${year}`;
try {
const result = await pinotClient.select<IResult>(query, { timeoutMs: 1000, queueTolerance: 0.1 });
// Do something with result
} catch (err) {
if (err instanceof PinotError) {
if (err.type === EPinotErrorType.TRANSPORT && err.code === EBrokerTransportErrorCode.QUEUE_TOLERANCE_LIMIT) {
// Request skipped due to queue size
}
}
throw err
}
```

### Utilities

#### SqlUtils
Expand Down
4 changes: 2 additions & 2 deletions docs/api/pinot-noir.ebrokertransporterrorcode.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ Invalid response from pinot
</td></tr>
<tr><td>

LIMIT_EXCEEDED
QUEUE_TOLERANCE_LIMIT

</td><td>

`3`

</td><td>

Limit exceded
Queue tolerance limit exceded

</td></tr>
<tr><td>
Expand Down
36 changes: 36 additions & 0 deletions docs/api/pinot-noir.ibrokertransportrequestoptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,39 @@ export interface IBrokerTransportRequestOptions extends Pick<Dispatcher.RequestO
```
**Extends:** Pick&lt;Dispatcher.RequestOptions, 'method' \| 'headers' \| 'path' \| 'body' \| 'query' \| 'bodyTimeout' \| 'headersTimeout'&gt;
## Properties
<table><thead><tr><th>
Property
</th><th>
Modifiers
</th><th>
Type
</th><th>
Description
</th></tr></thead>
<tbody><tr><td>
[options?](./pinot-noir.ibrokertransportrequestoptions.options.md)
</td><td>
</td><td>
{ queueTolerance?: number \| undefined; } \| undefined
</td><td>
_(Optional)_
</td></tr>
</tbody></table>
13 changes: 13 additions & 0 deletions docs/api/pinot-noir.ibrokertransportrequestoptions.options.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->

[Home](./index.md) &gt; [pinot-noir](./pinot-noir.md) &gt; [IBrokerTransportRequestOptions](./pinot-noir.ibrokertransportrequestoptions.md) &gt; [options](./pinot-noir.ibrokertransportrequestoptions.options.md)

## IBrokerTransportRequestOptions.options property

**Signature:**

```typescript
options?: {
queueTolerance?: number | undefined;
} | undefined;
```
15 changes: 15 additions & 0 deletions docs/api/pinot-noir.ipinotqueryoptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,21 @@ _(Optional)_ Number of replica groups to query when replica-group based routing
</td></tr>
<tr><td>

[queueTolerance?](./pinot-noir.ipinotqueryoptions.queuetolerance.md)

</td><td>

</td><td>

QueueTolerancePredefined \| number

</td><td>

_(Optional)_ Queue tolerance in percent of `maxQueueSize`<!-- -->. If maxQueueSize \* queueTolerance &lt;<!-- -->= queue size the request is discarded.

</td></tr>
<tr><td>

[skipIndexes?](./pinot-noir.ipinotqueryoptions.skipindexes.md)

</td><td>
Expand Down
13 changes: 13 additions & 0 deletions docs/api/pinot-noir.ipinotqueryoptions.queuetolerance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->

[Home](./index.md) &gt; [pinot-noir](./pinot-noir.md) &gt; [IPinotQueryOptions](./pinot-noir.ipinotqueryoptions.md) &gt; [queueTolerance](./pinot-noir.ipinotqueryoptions.queuetolerance.md)

## IPinotQueryOptions.queueTolerance property

Queue tolerance in percent of `maxQueueSize`<!-- -->. If maxQueueSize \* queueTolerance &lt;<!-- -->= queue size the request is discarded.

**Signature:**

```typescript
queueTolerance?: QueueTolerancePredefined | number;
```
22 changes: 22 additions & 0 deletions docs/api/pinot-noir.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,28 @@ Pinot result table
</td></tr>
</tbody></table>

## Variables

<table><thead><tr><th>

Variable

</th><th>

Description

</th></tr></thead>
<tbody><tr><td>

[NON_PINOT_OPTIONS](./pinot-noir.non_pinot_options.md)

</td><td>

Non pinot options list.

</td></tr>
</tbody></table>

## Type Aliases

<table><thead><tr><th>
Expand Down
13 changes: 13 additions & 0 deletions docs/api/pinot-noir.non_pinot_options.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->

[Home](./index.md) &gt; [pinot-noir](./pinot-noir.md) &gt; [NON_PINOT_OPTIONS](./pinot-noir.non_pinot_options.md)

## NON_PINOT_OPTIONS variable

Non pinot options list.

**Signature:**

```typescript
NON_PINOT_OPTIONS: readonly (keyof IPinotQueryOptions)[]
```
2 changes: 2 additions & 0 deletions docs/api/pinot-noir.pinotbrokerjsontransport.maxqueuesize.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

## PinotBrokerJSONTransport.maxQueueSize property

Maximum query queue size.

**Signature:**

```typescript
Expand Down
8 changes: 7 additions & 1 deletion docs/api/pinot-noir.pinotbrokerjsontransport.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ number \| undefined
</td><td>
Maximum query queue size.
</td></tr>
<tr><td>
Expand All @@ -94,6 +96,8 @@ Pool
</td><td>
HTTP client pool.
</td></tr>
<tr><td>
Expand Down Expand Up @@ -128,6 +132,8 @@ string
</td><td>
Pinot broker auth token.
</td></tr>
</tbody></table>
Expand Down Expand Up @@ -159,7 +165,7 @@ Closes connection to pinot broker
</td></tr>
<tr><td>
[request({ body, headers, method, path, query, })](./pinot-noir.pinotbrokerjsontransport.request.md)
[request({ body, headers, method, path, query, options, })](./pinot-noir.pinotbrokerjsontransport.request.md)
</td><td>
Expand Down
2 changes: 2 additions & 0 deletions docs/api/pinot-noir.pinotbrokerjsontransport.pool.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

## PinotBrokerJSONTransport.pool property

HTTP client pool.

**Signature:**

```typescript
Expand Down
4 changes: 2 additions & 2 deletions docs/api/pinot-noir.pinotbrokerjsontransport.request.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Perform HTTP request to pinot
**Signature:**

```typescript
request<TResponse = unknown>({ body, headers, method, path, query, }: IBrokerTransportRequestOptions): Promise<TResponse>;
request<TResponse = unknown>({ body, headers, method, path, query, options, }: IBrokerTransportRequestOptions): Promise<TResponse>;
```

## Parameters
Expand All @@ -29,7 +29,7 @@ Description
</th></tr></thead>
<tbody><tr><td>

{ body, headers, method, path, query, }
{ body, headers, method, path, query, options, }

</td><td>

Expand Down
2 changes: 2 additions & 0 deletions docs/api/pinot-noir.pinotbrokerjsontransport.token.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

## PinotBrokerJSONTransport.token property

Pinot broker auth token.

**Signature:**

```typescript
Expand Down
23 changes: 18 additions & 5 deletions etc/pinot-noir.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ export const enum EBrokerErrorCode {
// @public
export const enum EBrokerTransportErrorCode {
INVALID_RESPONSE = 1,
QUEUE_TOLERANCE_LIMIT = 3,
TIMEOUT = 2,
UNKNOWN = 0
}

Expand Down Expand Up @@ -137,13 +139,20 @@ export interface IBrokerTransportConfig {
bodyTimeout?: number;
brokerUrl: URL | string;
connections?: number;
connectTimeout?: number;
headersTimeout?: number;
keepAliveMaxTimeout?: number;
keepAliveTimeout?: number;
maxQueueSize?: number;
token: string;
}

// @public
export interface IBrokerTransportRequestOptions extends Pick<Dispatcher.RequestOptions, 'method' | 'headers' | 'path' | 'body' | 'query'> {
export interface IBrokerTransportRequestOptions extends Pick<Dispatcher.RequestOptions, 'method' | 'headers' | 'path' | 'body' | 'query' | 'bodyTimeout' | 'headersTimeout'> {
// (undocumented)
options?: {
queueTolerance?: number | undefined;
} | undefined;
}

// @public
Expand Down Expand Up @@ -199,6 +208,8 @@ export interface IPinotQueryOptions {
minSegmentGroupTrimSize?: number;
minServerGroupTrimSize?: number;
numReplicaGroupsToQuery?: number;
// Warning: (ae-forgotten-export) The symbol "QueueTolerancePredefined" needs to be exported by the entry point index.d.ts
queueTolerance?: QueueTolerancePredefined | number;
skipIndexes?: string;
skipUpsert?: boolean;
timeoutMs?: number;
Expand Down Expand Up @@ -262,6 +273,9 @@ export interface IResultTable {

export { join }

// @public
export const NON_PINOT_OPTIONS: readonly (keyof IPinotQueryOptions)[];

// @public
export class PinotBrokerClient implements IPinotClient {
constructor(deps: {
Expand All @@ -278,13 +292,12 @@ export class PinotBrokerClient implements IPinotClient {

// @public
export class PinotBrokerJSONTransport implements IPinotBrokerTransport {
constructor({ brokerUrl, token, bodyTimeout, connections, keepAliveMaxTimeout, }: IBrokerTransportConfig);
constructor({ bodyTimeout, brokerUrl, connections, headersTimeout, keepAliveMaxTimeout, token, maxQueueSize, }: IBrokerTransportConfig);
close(): Promise<void>;
// (undocumented)
protected readonly maxQueueSize: number | undefined;
protected readonly pool: Pool;
request<TResponse = unknown>({ method, headers, path, body, query, }: IBrokerTransportRequestOptions): Promise<TResponse>;
request<TResponse = unknown>({ body, headers, method, path, query, options, }: IBrokerTransportRequestOptions): Promise<TResponse>;
get stats(): IPinotPoolStats;
// (undocumented)
protected readonly token: string;
}

Expand Down
5 changes: 5 additions & 0 deletions src/client/broker/broker-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
EBrokerErrorCode,
IPinotClient,
IPinotPoolStats,
NON_PINOT_OPTIONS,
type IBrokerResponse,
type IPinotQueryOptions,
type IQueryResult,
Expand Down Expand Up @@ -44,6 +45,9 @@ export class PinotClient implements IPinotClient {
): string | undefined {
return options
? Object.entries(options)
.filter(([k]) => {
return !NON_PINOT_OPTIONS.includes(k as keyof IPinotQueryOptions);
})
.map((kv) => {
return kv.join('=');
})
Expand Down Expand Up @@ -92,6 +96,7 @@ export class PinotClient implements IPinotClient {
method: 'POST',
path: PinotClient.ENDPOINTS.sql,
...PinotClient.getTimeouts(options?.timeoutMs),
options,
body: JSON.stringify({
sql,
queryOptions,
Expand Down
Loading

0 comments on commit dc73f84

Please sign in to comment.