Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mds-ingest-service] Add pagination and order support to getEvents via ingest #643

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions packages/mds-ingest-service/@types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import { DomainModelCreate, RecordedColumn } from '@mds-core/mds-repository'
import { RpcRoute, RpcServiceDefinition } from '@mds-core/mds-rpc-common'
import {
ACCESSIBILITY_OPTION,
MODALITY,
Expand All @@ -29,7 +30,6 @@ import {
VEHICLE_STATE,
VEHICLE_TYPE
} from '@mds-core/mds-types'
import { RpcServiceDefinition, RpcRoute } from '@mds-core/mds-rpc-common'

export interface DeviceDomainModel extends RecordedColumn {
device_id: UUID
Expand Down Expand Up @@ -70,6 +70,19 @@ export type TimeRange = {
start: Timestamp
end: Timestamp
}

export const GetVehicleEventsOrderColumn = <const>['timestamp', 'vehicle_id', 'provider_id', 'vehicle_state']
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious as I've seen this in other places as I learn typescript. Is it common to name the value and type name the same? I've seen it where the value name is descriptive and would name it like "GetVehicleEventOrderColumns" (with an s to describe these are all the columns) and the type is then without the "s". Also, is it preferred to use vs "as const"? Are they the same?

Copy link
Author

@cjlynch12 cjlynch12 Jul 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've seen instances of both

export const FooTypes = <const>['bar', 'baz']
export type FooType = typeof FooTypes[number]

and

export const FooType = <const>['bar', 'baz']
export type FooType = typeof FooType[number]

In various places in the code base. I think the latter is the preferred.

My assumption is that the latter just reduces the overhead of knowing which type to choose (FooTypes vs FooTypes) since TS will interpret it as the type declaration. (@drtyh2o feel free to tell me I'm off base here πŸ˜„ ).

As for
<const> foo vs foo as const, they are equivalent except in a .tsx file.


export type GetVehicleEventsOrderColumn = typeof GetVehicleEventsOrderColumn[number]

export const GetVehicleEventsOrderDirection = <const>['ASC', 'DESC']

export type GetVehicleEventsOrderDirection = typeof GetVehicleEventsOrderDirection[number]

export type GetVehicleEventsOrderOption = {
column: GetVehicleEventsOrderColumn
direction?: GetVehicleEventsOrderDirection
}
export interface GetVehicleEventsFilterParams {
vehicle_types?: VEHICLE_TYPE[]
propulsion_types?: PROPULSION_TYPE[]
Expand All @@ -82,6 +95,15 @@ export interface GetVehicleEventsFilterParams {
event_types?: VEHICLE_EVENT[]
geography_ids?: UUID[]
limit?: number
order?: GetVehicleEventsOrderOption
}

export type GetVehicleEventsResponse = {
events: EventDomainModel[]
cursor: {
prev: Nullable<string>
next: Nullable<string>
}
}

export interface EventDomainModel extends RecordedColumn {
Expand All @@ -102,12 +124,14 @@ export type EventDomainCreateModel = DomainModelCreate<Omit<EventDomainModel, ke

export interface IngestService {
name: () => string
getEvents: (params: GetVehicleEventsFilterParams) => EventDomainModel[]
getEventsUsingOptions: (params: GetVehicleEventsFilterParams) => GetVehicleEventsResponse
getEventsUsingCursor: (cursor: string) => GetVehicleEventsResponse
getDevices: (ids: UUID[]) => DeviceDomainModel[]
}

export const IngestServiceDefinition: RpcServiceDefinition<IngestService> = {
name: RpcRoute<IngestService['name']>(),
getEvents: RpcRoute<IngestService['getEvents']>(),
getEventsUsingOptions: RpcRoute<IngestService['getEventsUsingOptions']>(),
getEventsUsingCursor: RpcRoute<IngestService['getEventsUsingCursor']>(),
getDevices: RpcRoute<IngestService['getDevices']>()
}
5 changes: 3 additions & 2 deletions packages/mds-ingest-service/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
* limitations under the License.
*/

import { ServiceClient } from '@mds-core/mds-service-helpers'
import { RpcClient, RpcRequest } from '@mds-core/mds-rpc-common'
import { ServiceClient } from '@mds-core/mds-service-helpers'
import { IngestService, IngestServiceDefinition } from '../@types'

const IngestServiceRpcClient = RpcClient(IngestServiceDefinition, {
Expand All @@ -26,6 +26,7 @@ const IngestServiceRpcClient = RpcClient(IngestServiceDefinition, {
// What the API layer, and any other clients, will invoke.
export const IngestServiceClient: ServiceClient<IngestService> = {
name: (...args) => RpcRequest(IngestServiceRpcClient.name, args),
getEvents: (...args) => RpcRequest(IngestServiceRpcClient.getEvents, args),
getEventsUsingOptions: (...args) => RpcRequest(IngestServiceRpcClient.getEventsUsingOptions, args),
getEventsUsingCursor: (...args) => RpcRequest(IngestServiceRpcClient.getEventsUsingCursor, args),
getDevices: (...args) => RpcRequest(IngestServiceRpcClient.getDevices, args)
}
3 changes: 2 additions & 1 deletion packages/mds-ingest-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"@mds-core/mds-types": "0.1.25",
"@mds-core/mds-utils": "0.1.28",
"joi": "17.4.0",
"typeorm": "0.2.34"
"typeorm": "0.2.34",
"typeorm-cursor-pagination": "0.6.1"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
* limitations under the License.
*/

import { Entity, Column, Index } from 'typeorm'
import { BigintTransformer, IdentityColumn, RecordedColumn } from '@mds-core/mds-repository'
import { Nullable, Timestamp } from '@mds-core/mds-types'
import { Column, Entity, Index } from 'typeorm'
import { EventDomainModel } from '../../@types'
import { TelemetryEntity, TelemetryEntityModel } from './telemetry-entity'
import { Nullable } from '@mds-core/mds-types'

export interface EventEntityModel extends IdentityColumn, RecordedColumn {
device_id: EventDomainModel['device_id']
Expand All @@ -42,7 +42,7 @@ export class EventEntity extends IdentityColumn(RecordedColumn(class {})) implem
provider_id: EventEntityModel['provider_id']

@Column('bigint', { transformer: BigintTransformer, primary: true })
timestamp: EventEntityModel['timestamp']
timestamp: Timestamp
Copy link
Author

@cjlynch12 cjlynch12 Jul 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strange type behavior here with the typeorm-cursor-pagination lib.
When this is left as EventEntityModel['timestamp'], and you pass limit to the buildPaginator(...) function, it results in the following error:

unknown type in cursor: [object]1625149089973

Which corresponds to the time_range: {begin} parameter.

Interestingly, this is only an issue when limit is passed. If the type declaration here is left as EventEntityModel['timestamp'] and limit isn't provided, it works fine. As soon as limit is provided, it breaks.

Changing this to Timestamp directly here fixes the issue in both cases.

This doesn't have any issues afaict, all previous tests are still passing with this change. 🀷

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. We’ve seen this before.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've recently stopped defining entity properties in terms of Entity or (especially) Domain model properties and use the base types instead. Some libraries that use metadata reflection have issues tracing the aliases to the base type. In recent repositories, you will see a pattern something like:

export class SomeEntity {
  @Column(...)
  entity_id: UUID

  @Column(...)
  timestamp: Timestamp
}

/* Create an alias to keep consistent with other services */
export SomeEntityModel = SomeEntity


@Column('varchar', { array: true, length: 31 })
event_types: EventEntityModel['event_types']
Expand Down
89 changes: 78 additions & 11 deletions packages/mds-ingest-service/repository/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
*/

import { InsertReturning, ReadWriteRepository, RepositoryError } from '@mds-core/mds-repository'
import { Any } from 'typeorm'
import { isUUID } from '@mds-core/mds-utils'
import { UUID } from '@mds-core/mds-types'
import { isUUID, ValidationError } from '@mds-core/mds-utils'
import { Any } from 'typeorm'
import { buildPaginator, Cursor } from 'typeorm-cursor-pagination'
import {
EventDomainModel,
EventDomainCreateModel,
TelemetryDomainCreateModel,
DeviceDomainCreateModel,
DeviceDomainModel,
EventDomainCreateModel,
GetVehicleEventsFilterParams,
DeviceDomainModel
GetVehicleEventsResponse,
TelemetryDomainCreateModel
} from '../@types'
import entities from './entities'
import { DeviceEntity } from './entities/device-entity'
Expand All @@ -40,6 +41,8 @@ import {
} from './mappers'
import migrations from './migrations'

type VehicleEventsQueryParams = GetVehicleEventsFilterParams & Cursor

/**
* Aborts execution if not running under a test environment.
*/
Expand All @@ -53,6 +56,17 @@ class IngestReadWriteRepository extends ReadWriteRepository {
super('ingest', { entities, migrations })
}

private buildCursor = (cursor: VehicleEventsQueryParams) =>
Buffer.from(JSON.stringify(cursor), 'utf-8').toString('base64')

private parseCursor = (cursor: string): VehicleEventsQueryParams & { limit: number } => {
try {
return JSON.parse(Buffer.from(cursor, 'base64').toString('utf-8'))
} catch (error) {
throw new ValidationError('Invalid cursor', error)
}
}

public createEvents = async (events: EventDomainCreateModel[]) => {
const { connect } = this
try {
Expand Down Expand Up @@ -114,10 +128,10 @@ class IngestReadWriteRepository extends ReadWriteRepository {
}
}

public getEvents = async (params: GetVehicleEventsFilterParams): Promise<EventDomainModel[]> => {
private getEvents = async (params: VehicleEventsQueryParams): Promise<GetVehicleEventsResponse> => {
const { connect } = this
const {
time_range: { start, end },
time_range,
// geography_ids,
grouping_type = 'latest_per_vehicle',
event_types,
Expand All @@ -127,8 +141,14 @@ class IngestReadWriteRepository extends ReadWriteRepository {
device_ids,
propulsion_types,
provider_ids,
limit
limit,
beforeCursor,
afterCursor,
order
} = params

const { start, end } = time_range

try {
const connection = await connect('ro')

Expand Down Expand Up @@ -199,14 +219,61 @@ class IngestReadWriteRepository extends ReadWriteRepository {
query.andWhere('events.provider_id = ANY(:provider_ids)', { provider_ids })
}

const entities = await query.limit(limit).getMany()
// Use query instead of paginator to manage order if using a joined field
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couldn't figure out how to pass an order column string for a joined field to buildPaginator without it throwing an error, if there's a better way to handle this field happy to change it.

if (order && order.column === 'vehicle_id') {
query.orderBy('devices.vehicle_id', order.direction)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unsure how the query.orderBy and paginationKeys interact, but I assume the pagination keys will get appended to the orderBy. Is timestamp always the secondary sort column?

Copy link
Author

@cjlynch12 cjlynch12 Jul 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call out, I took a look at this and it turns out that paginationKeys always override anything added to the query with orderBy or addOrderBy. If you remove paginationKeys param entirely, it just uses id (the default).

There seems to be an open issue for this (can't sort on relational tables / override paginationKeys) - and the maintainer has ack'd it recently.
benjamin658/typeorm-cursor-pagination#4

So for now, removed the ability to sort by vehicle_id. Will follow up with the open issue. Not worth sinking a bunch of time for this one sort-able field atm.

That being said, sorting on joined tables will most likely come up in the future at some point - so hopefully this gets addressed in typeorm-cursor-pagination shortly.

PS - updated tests since apparently they were false positives for sorting on vehicle_id.

}

const pager = buildPaginator({
entity: EventEntity,
alias: 'events',
paginationKeys: ['timestamp', 'id'],
query: {
limit,
order: order?.direction ?? (order?.column === undefined ? 'DESC' : 'ASC'),
beforeCursor: beforeCursor ?? undefined, // typeorm-cursor-pagination type weirdness
afterCursor: afterCursor ?? undefined // typeorm-cursor-pagination type weirdness
}
})

return entities.map(EventEntityToDomain.map)
const {
data,
cursor: { beforeCursor: nextBeforeCursor, afterCursor: nextAfterCursor }
} = await pager.paginate(query)

const cursor = {
time_range,
// geography_ids,
grouping_type,
event_types,
vehicle_states,
vehicle_types,
vehicle_id,
device_ids,
propulsion_types,
provider_ids,
limit,
order
}

return {
events: data.map(EventEntityToDomain.map),
cursor: {
next: nextAfterCursor && this.buildCursor({ ...cursor, beforeCursor: null, afterCursor: nextAfterCursor }),
prev: nextBeforeCursor && this.buildCursor({ ...cursor, beforeCursor: nextBeforeCursor, afterCursor: null })
}
}
} catch (error) {
throw RepositoryError(error)
}
}

public getEventsUsingOptions = async (options: GetVehicleEventsFilterParams): Promise<GetVehicleEventsResponse> =>
this.getEvents({ ...options, beforeCursor: null, afterCursor: null, limit: options.limit ?? 100 })

public getEventsUsingCursor = async (cursor: string): Promise<GetVehicleEventsResponse> =>
this.getEvents(this.parseCursor(cursor))

/**
* Nukes everything from orbit. Boom.
*/
Expand Down
3 changes: 2 additions & 1 deletion packages/mds-ingest-service/service/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ export const IngestServiceManager = RpcServer(
},
{
name: args => IngestServiceProvider.name(...args),
getEvents: args => IngestServiceProvider.getEvents(...args),
getEventsUsingOptions: args => IngestServiceProvider.getEventsUsingOptions(...args),
getEventsUsingCursor: args => IngestServiceProvider.getEventsUsingCursor(...args),
getDevices: args => IngestServiceProvider.getDevices(...args)
},
{
Expand Down
19 changes: 14 additions & 5 deletions packages/mds-ingest-service/service/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,29 @@
* limitations under the License.
*/

import { ServiceProvider, ProcessController, ServiceResult, ServiceException } from '@mds-core/mds-service-helpers'
import logger from '@mds-core/mds-logger'
import { ProcessController, ServiceException, ServiceProvider, ServiceResult } from '@mds-core/mds-service-helpers'
import { UUID } from '@mds-core/mds-types'
import { IngestService } from '../@types'
import { IngestRepository } from '../repository'
import logger from '@mds-core/mds-logger'
import { validateGetVehicleEventsFilterParams, validateUUIDs } from './validators'
import { UUID } from '@mds-core/mds-types'

export const IngestServiceProvider: ServiceProvider<IngestService> & ProcessController = {
start: IngestRepository.initialize,
stop: IngestRepository.shutdown,
name: async () => ServiceResult('mds-ingest-service'),
getEvents: async params => {
getEventsUsingOptions: async params => {
try {
return ServiceResult(await IngestRepository.getEventsUsingOptions(validateGetVehicleEventsFilterParams(params)))
} catch (error) {
const exception = ServiceException(`Error in getEvents `, error)
logger.error('getEvents exception', { exception, error })
return exception
}
},
getEventsUsingCursor: async cursor => {
try {
return ServiceResult(await IngestRepository.getEvents(validateGetVehicleEventsFilterParams(params)))
return ServiceResult(await IngestRepository.getEventsUsingCursor(cursor))
} catch (error) {
const exception = ServiceException(`Error in getEvents `, error)
logger.error('getEvents exception', { exception, error })
Expand Down
12 changes: 11 additions & 1 deletion packages/mds-ingest-service/service/validators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import {
DeviceDomainModel,
EventDomainModel,
GetVehicleEventsFilterParams,
GetVehicleEventsOrderColumn,
GetVehicleEventsOrderDirection,
GROUPING_TYPES,
TelemetryDomainModel
} from '../@types'
Expand Down Expand Up @@ -163,7 +165,15 @@ export const { validate: validateGetVehicleEventsFilterParams } = SchemaValidato
vehicle_id: { type: 'string' },
device_ids: { type: 'array', items: { type: 'string', format: 'uuid' } },
event_types: { type: 'array', items: { type: 'string', enum: [...new Set(VEHICLE_EVENTS)] } },
limit: { type: 'integer' }
limit: { type: 'integer' },
order: {
type: 'object',
properties: {
column: { type: 'string', enum: [...GetVehicleEventsOrderColumn] },
direction: { type: 'string', enum: [...GetVehicleEventsOrderDirection] }
},
additionalProperties: false
}
},
required: ['time_range']
})
Expand Down
Loading