Skip to content

Commit

Permalink
Add job queue infrastructure
Browse files Browse the repository at this point in the history
We want to process bulk uploads asynchronously, and so this means we
need a worker queue.  Since this initial task is going to be infrequent,
we felt it did not warrant adding a new dependency to our stack (e.g.
reddis or rabbitmq), and so we're going to just leverage postgres as our
backend.

I explored two tools for this purpose: pg-boss, and graphile-worker.
Both packages appeared to meet our technical needs, and both seem to be
maintained and similarly popular.  I decided to go with graphile-worker
as it appears to be part of a suite of related tools and the primary
developer appears to have built a community around that suite of tools
(and earns some income to support the development of the project).

Issue #559 Set up a queue / processing system for handling bulk proposal uploads
  • Loading branch information
slifty committed Nov 30, 2023
1 parent 8f82c68 commit 00bc5ce
Show file tree
Hide file tree
Showing 14 changed files with 501 additions and 165 deletions.
502 changes: 337 additions & 165 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"dotenv": "^16.3.1",
"express": "^4.18.2",
"express-jwt": "^8.4.1",
"graphile-worker": "^0.15.1",
"jwks-rsa": "^3.1.0",
"pino": "^8.16.2",
"pino-http": "^8.5.1",
Expand Down
2 changes: 2 additions & 0 deletions src/database/migrate.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import path from 'path';
import { migrate as pgMigrate } from 'postgres-schema-migrations';
import { runJobQueueMigrations } from '../jobQueue';
import { db } from './db';

export const migrate = async (schema = 'public'): Promise<void> => {
Expand All @@ -10,6 +11,7 @@ export const migrate = async (schema = 'public'): Promise<void> => {
path.resolve(__dirname, 'migrations'),
{ schema },
);
await runJobQueueMigrations();
} finally {
client.release();
}
Expand Down
1 change: 1 addition & 0 deletions src/errors/JobQueueStateError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export class JobQueueStateError extends Error {}

Check warning on line 1 in src/errors/JobQueueStateError.ts

View check run for this annotation

Codecov / codecov/patch

src/errors/JobQueueStateError.ts#L1

Added line #L1 was not covered by tests
8 changes: 8 additions & 0 deletions src/handlers/bulkUploadsHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import {
import {
DatabaseError,
InputValidationError,
NotFoundError,
} from '../errors';
import {
extractPaginationParameters,
} from '../queryParameters';
import { addProcessBulkUploadJob } from '../jobQueue';
import type {
Request,
Response,
Expand Down Expand Up @@ -46,6 +48,12 @@ const createBulkUpload = (
status: BulkUploadStatus.PENDING,
});
const bulkUpload = bulkUploadsQueryResult.rows[0];
if (!bulkUpload) {
throw new NotFoundError('The database did not return an entity after bulk upload creation.');

Check warning on line 52 in src/handlers/bulkUploadsHandlers.ts

View check run for this annotation

Codecov / codecov/patch

src/handlers/bulkUploadsHandlers.ts#L52

Added line #L52 was not covered by tests
}
await addProcessBulkUploadJob({
bulkUploadId: bulkUpload.id,
});
res.status(201)
.contentType('application/json')
.send(bulkUpload);
Expand Down
5 changes: 5 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { app } from './app';
import { startJobQueue } from './jobQueue';

Check warning on line 2 in src/index.ts

View check run for this annotation

Codecov / codecov/patch

src/index.ts#L2

Added line #L2 was not covered by tests
import { getLogger } from './logger';

const logger = getLogger(__filename);
Expand All @@ -16,3 +17,7 @@ app.listen(
logger.info(`Server running on http://${host}:${port}`);
},
);

startJobQueue().catch((err) => {
logger.error(err, 'Job queue failed to start');

Check warning on line 22 in src/index.ts

View check run for this annotation

Codecov / codecov/patch

src/index.ts#L21-L22

Added lines #L21 - L22 were not covered by tests
});
79 changes: 79 additions & 0 deletions src/jobQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import {
Logger,
quickAddJob,
run,
runMigrations,
} from 'graphile-worker';
import { getLogger } from './logger';
import { db } from './database/db';
import { processBulkUpload } from './tasks';
import type { ProcessBulkUploadJobPayload } from './types';

const logger = getLogger(__filename);

enum JobType {
PROCESS_BULK_UPLOAD = 'processBulkUpload',
}

export const jobQueueLogger = new Logger((scope) => (
(level, message, meta) => {

Check warning on line 19 in src/jobQueue.ts

View check run for this annotation

Codecov / codecov/patch

src/jobQueue.ts#L19

Added line #L19 was not covered by tests
switch (level.valueOf()) {
case 'error':
logger.error({ meta, scope }, message);
break;
case 'warn':
logger.warn({ meta, scope }, message);
break;
case 'info':
logger.info({ meta, scope }, message);
break;
case 'debug':
logger.debug({ meta, scope }, message);
break;
default:
logger.info({ meta, scope }, message);

Check warning on line 34 in src/jobQueue.ts

View check run for this annotation

Codecov / codecov/patch

src/jobQueue.ts#L21-L34

Added lines #L21 - L34 were not covered by tests
}
}
));

export const startJobQueue = async () => {
const runner = await run({

Check warning on line 40 in src/jobQueue.ts

View check run for this annotation

Codecov / codecov/patch

src/jobQueue.ts#L40

Added line #L40 was not covered by tests
logger: jobQueueLogger,
pgPool: db.pool,
concurrency: 5,
noHandleSignals: false,
pollInterval: 1000,
taskList: {
processBulkUpload,
},
});
await runner.promise;

Check warning on line 50 in src/jobQueue.ts

View check run for this annotation

Codecov / codecov/patch

src/jobQueue.ts#L50

Added line #L50 was not covered by tests
};

export const runJobQueueMigrations = async () => (
runMigrations({
logger: jobQueueLogger,
pgPool: db.pool,
})
);

export const addJob = async (
jobType: JobType,
payload: unknown,
) => (
quickAddJob(
{
logger: jobQueueLogger,
pgPool: db.pool,
},
jobType,
payload,
)
);

export const addProcessBulkUploadJob = async (payload: ProcessBulkUploadJobPayload) => (
addJob(
JobType.PROCESS_BULK_UPLOAD,
payload,
)
);
12 changes: 12 additions & 0 deletions src/tasks/__tests__/processBulkUpload.unit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { getMockJobHelpers } from '../../test/mockGraphileWorker';
import { processBulkUpload } from '../processBulkUpload';
import { InternalValidationError } from '../../errors';

describe('processBulkUpload', () => {
it('should error when passed an invalid payload', async () => {
await expect(processBulkUpload(
{},
getMockJobHelpers(),
)).rejects.toBeInstanceOf(InternalValidationError);
});
});
1 change: 1 addition & 0 deletions src/tasks/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './processBulkUpload';
17 changes: 17 additions & 0 deletions src/tasks/processBulkUpload.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { isProcessBulkUploadJobPayload } from '../types';
import { InternalValidationError } from '../errors';
import type { JobHelpers } from 'graphile-worker';

export const processBulkUpload = async (
payload: unknown,
helpers: JobHelpers,
): Promise<void> => {
if (!isProcessBulkUploadJobPayload(payload)) {
helpers.logger.debug('Malformed bulk upload job payload', { errors: isProcessBulkUploadJobPayload.errors ?? [] });
throw new InternalValidationError(
'The bulk upload job payload is not properly formed',
isProcessBulkUploadJobPayload.errors ?? [],
);
}
helpers.logger.debug(`Started processBulkUpload Job for Bulk Upload ID ${payload.bulkUploadId}`);

Check warning on line 16 in src/tasks/processBulkUpload.ts

View check run for this annotation

Codecov / codecov/patch

src/tasks/processBulkUpload.ts#L16

Added line #L16 was not covered by tests
};
11 changes: 11 additions & 0 deletions src/test/integrationSuiteSetup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import {
} from './harnessFunctions';
import { mockJwks } from './mockJwt';

// This mock prevents our queue manager from actually being invoked.
// It's necessary because of the way we leverage PGOPTIONS to specify
// the schema / search path when preparing the test worker to interact
// with specific schemas.
//
// We may eventually want to be able to write tests that interact with the queue
// and we may eventually face issues with blunt mocking of graphile-worker.
// When that happens, we'll need to remove this mock and change the way we're
// setting the schema / path.
jest.mock('graphile-worker');

beforeAll(async () => {
mockJwks.start();
});
Expand Down
6 changes: 6 additions & 0 deletions src/test/mockGraphileWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { jobQueueLogger } from '../jobQueue';
import type { JobHelpers } from 'graphile-worker';

export const getMockJobHelpers = (): JobHelpers => ({
logger: jobQueueLogger,
} as JobHelpers);
20 changes: 20 additions & 0 deletions src/types/ProcessBulkUploadJobPayload.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { ajv } from '../ajv';
import type { JSONSchemaType } from 'ajv';

export interface ProcessBulkUploadJobPayload {
bulkUploadId: number;
}

export const processBulkUploadJobPayloadSchema: JSONSchemaType<ProcessBulkUploadJobPayload> = {
type: 'object',
properties: {
bulkUploadId: {
type: 'integer',
},
},
required: [
'bulkUploadId',
],
};

export const isProcessBulkUploadJobPayload = ajv.compile(processBulkUploadJobPayloadSchema);
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export * from './PaginationParametersQuery';
export * from './PlatformProviderResponse';
export * from './PostgresErrorCode';
export * from './PresignedPostRequest';
export * from './ProcessBulkUploadJobPayload';
export * from './Proposal';
export * from './ProposalFieldValue';
export * from './ProposalVersion';
Expand Down

0 comments on commit 00bc5ce

Please sign in to comment.