Skip to content

Commit

Permalink
[mds-ingest-service] Add pagination and order support to getEvents vi…
Browse files Browse the repository at this point in the history
…a ingest (#643)
  • Loading branch information
cjlynch12 authored Jul 2, 2021
1 parent e87e9dc commit f4500c6
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 46 deletions.
28 changes: 26 additions & 2 deletions packages/mds-ingest-service/@types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ export type TimeRange = {
start: Timestamp
end: Timestamp
}

export const GetVehicleEventsOrderColumn = <const>['timestamp', 'provider_id', 'vehicle_state']

export type GetVehicleEventsOrderColumn = typeof GetVehicleEventsOrderColumn[number]

export const GetVehicleEventsOrderDirection = <const>['ASC', 'DESC']

export type GetVehicleEventsOrderDirection = typeof GetVehicleEventsOrderDirection[number]

export type GetVehicleEventsOrderOption = {
column: GetVehicleEventsOrderColumn
direction?: GetVehicleEventsOrderDirection
}
export interface GetVehicleEventsFilterParams {
vehicle_types?: VEHICLE_TYPE[]
propulsion_types?: PROPULSION_TYPE[]
Expand All @@ -82,6 +95,15 @@ export interface GetVehicleEventsFilterParams {
event_types?: VEHICLE_EVENT[]
geography_ids?: UUID[]
limit?: number
order?: GetVehicleEventsOrderOption
}

export type GetVehicleEventsResponse = {
events: EventDomainModel[]
cursor: {
prev: Nullable<string>
next: Nullable<string>
}
}

export interface EventDomainModel extends RecordedColumn {
Expand All @@ -102,12 +124,14 @@ export type EventDomainCreateModel = DomainModelCreate<Omit<EventDomainModel, ke

export interface IngestService {
name: () => string
getEvents: (params: GetVehicleEventsFilterParams) => EventDomainModel[]
getEventsUsingOptions: (params: GetVehicleEventsFilterParams) => GetVehicleEventsResponse
getEventsUsingCursor: (cursor: string) => GetVehicleEventsResponse
getDevices: (ids: UUID[]) => DeviceDomainModel[]
}

export const IngestServiceDefinition: RpcServiceDefinition<IngestService> = {
name: RpcRoute<IngestService['name']>(),
getEvents: RpcRoute<IngestService['getEvents']>(),
getEventsUsingOptions: RpcRoute<IngestService['getEventsUsingOptions']>(),
getEventsUsingCursor: RpcRoute<IngestService['getEventsUsingCursor']>(),
getDevices: RpcRoute<IngestService['getDevices']>()
}
3 changes: 2 additions & 1 deletion packages/mds-ingest-service/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const IngestServiceRpcClient = RpcClient(IngestServiceDefinition, {
// What the API layer, and any other clients, will invoke.
export const IngestServiceClient: ServiceClient<IngestService> = {
name: (...args) => RpcRequest(IngestServiceRpcClient.name, args),
getEvents: (...args) => RpcRequest(IngestServiceRpcClient.getEvents, args),
getEventsUsingOptions: (...args) => RpcRequest(IngestServiceRpcClient.getEventsUsingOptions, args),
getEventsUsingCursor: (...args) => RpcRequest(IngestServiceRpcClient.getEventsUsingCursor, args),
getDevices: (...args) => RpcRequest(IngestServiceRpcClient.getDevices, args)
}
3 changes: 2 additions & 1 deletion packages/mds-ingest-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"@mds-core/mds-types": "0.1.25",
"@mds-core/mds-utils": "0.1.28",
"joi": "17.4.0",
"typeorm": "0.2.34"
"typeorm": "0.2.34",
"typeorm-cursor-pagination": "0.6.1"
}
}
20 changes: 10 additions & 10 deletions packages/mds-ingest-service/repository/entities/event-entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { BigintTransformer, IdentityColumn, RecordedColumn } from '@mds-core/mds-repository'
import { Nullable } from '@mds-core/mds-types'
import { Nullable, Timestamp, TRIP_STATE, UUID, VEHICLE_EVENT, VEHICLE_STATE } from '@mds-core/mds-types'
import { Column, Entity, Index } from 'typeorm'
import { EventDomainModel } from '../../@types'
import { TelemetryEntity, TelemetryEntityModel } from './telemetry-entity'
Expand All @@ -36,32 +36,32 @@ export interface EventEntityModel extends IdentityColumn, RecordedColumn {
@Entity('events')
export class EventEntity extends IdentityColumn(RecordedColumn(class {})) implements EventEntityModel {
@Column('uuid', { primary: true })
device_id: EventEntityModel['device_id']
device_id: UUID

@Column('uuid')
provider_id: EventEntityModel['provider_id']
provider_id: UUID

@Column('bigint', { transformer: BigintTransformer, primary: true })
timestamp: EventEntityModel['timestamp']
timestamp: Timestamp

@Column('varchar', { array: true, length: 31 })
event_types: EventEntityModel['event_types']
event_types: VEHICLE_EVENT[]

@Column('varchar', { length: 31 })
vehicle_state: EventEntityModel['vehicle_state']
vehicle_state: VEHICLE_STATE

@Column('varchar', { length: 31, nullable: true })
trip_state: EventEntityModel['trip_state']
trip_state: TRIP_STATE

@Column('bigint', { transformer: BigintTransformer, nullable: true })
telemetry_timestamp: EventEntityModel['telemetry_timestamp']
telemetry_timestamp: Nullable<Timestamp>

@Index()
@Column('uuid', { nullable: true })
trip_id: EventEntityModel['trip_id']
trip_id: Nullable<UUID>

@Column('uuid', { nullable: true })
service_area_id: EventEntityModel['service_area_id']
service_area_id: Nullable<UUID>

telemetry?: TelemetryEntity
}
76 changes: 69 additions & 7 deletions packages/mds-ingest-service/repository/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

import { InsertReturning, ReadWriteRepository, RepositoryError } from '@mds-core/mds-repository'
import { UUID } from '@mds-core/mds-types'
import { isUUID } from '@mds-core/mds-utils'
import { isUUID, ValidationError } from '@mds-core/mds-utils'
import { Any } from 'typeorm'
import { buildPaginator, Cursor } from 'typeorm-cursor-pagination'
import {
DeviceDomainCreateModel,
DeviceDomainModel,
EventDomainCreateModel,
EventDomainModel,
GetVehicleEventsFilterParams,
GetVehicleEventsResponse,
TelemetryDomainCreateModel
} from '../@types'
import entities from './entities'
Expand All @@ -40,6 +41,8 @@ import {
} from './mappers'
import migrations from './migrations'

type VehicleEventsQueryParams = GetVehicleEventsFilterParams & Cursor

/**
* Aborts execution if not running under a test environment.
*/
Expand All @@ -53,6 +56,17 @@ class IngestReadWriteRepository extends ReadWriteRepository {
super('ingest', { entities, migrations })
}

private buildCursor = (cursor: VehicleEventsQueryParams) =>
Buffer.from(JSON.stringify(cursor), 'utf-8').toString('base64')

private parseCursor = (cursor: string): VehicleEventsQueryParams & { limit: number } => {
try {
return JSON.parse(Buffer.from(cursor, 'base64').toString('utf-8'))
} catch (error) {
throw new ValidationError('Invalid cursor', error)
}
}

public createEvents = async (events: EventDomainCreateModel[]) => {
const { connect } = this
try {
Expand Down Expand Up @@ -114,10 +128,10 @@ class IngestReadWriteRepository extends ReadWriteRepository {
}
}

public getEvents = async (params: GetVehicleEventsFilterParams): Promise<EventDomainModel[]> => {
private getEvents = async (params: VehicleEventsQueryParams): Promise<GetVehicleEventsResponse> => {
const { connect } = this
const {
time_range: { start, end },
time_range,
// geography_ids,
grouping_type = 'latest_per_vehicle',
event_types,
Expand All @@ -127,8 +141,14 @@ class IngestReadWriteRepository extends ReadWriteRepository {
device_ids,
propulsion_types,
provider_ids,
limit
limit,
beforeCursor,
afterCursor,
order
} = params

const { start, end } = time_range

try {
const connection = await connect('ro')

Expand Down Expand Up @@ -199,14 +219,56 @@ class IngestReadWriteRepository extends ReadWriteRepository {
query.andWhere('events.provider_id = ANY(:provider_ids)', { provider_ids })
}

const entities = await query.limit(limit).getMany()
const pager = buildPaginator({
entity: EventEntity,
alias: 'events',
paginationKeys: [order?.column ?? 'timestamp', 'id'],
query: {
limit,
order: order?.direction ?? (order?.column === undefined ? 'DESC' : 'ASC'),
beforeCursor: beforeCursor ?? undefined, // typeorm-cursor-pagination type weirdness
afterCursor: afterCursor ?? undefined // typeorm-cursor-pagination type weirdness
}
})

return entities.map(EventEntityToDomain.map)
const {
data,
cursor: { beforeCursor: nextBeforeCursor, afterCursor: nextAfterCursor }
} = await pager.paginate(query)

const cursor = {
time_range,
// geography_ids,
grouping_type,
event_types,
vehicle_states,
vehicle_types,
vehicle_id,
device_ids,
propulsion_types,
provider_ids,
limit,
order
}

return {
events: data.map(EventEntityToDomain.map),
cursor: {
next: nextAfterCursor && this.buildCursor({ ...cursor, beforeCursor: null, afterCursor: nextAfterCursor }),
prev: nextBeforeCursor && this.buildCursor({ ...cursor, beforeCursor: nextBeforeCursor, afterCursor: null })
}
}
} catch (error) {
throw RepositoryError(error)
}
}

public getEventsUsingOptions = async (options: GetVehicleEventsFilterParams): Promise<GetVehicleEventsResponse> =>
this.getEvents({ ...options, beforeCursor: null, afterCursor: null, limit: options.limit ?? 100 })

public getEventsUsingCursor = async (cursor: string): Promise<GetVehicleEventsResponse> =>
this.getEvents(this.parseCursor(cursor))

/**
* Nukes everything from orbit. Boom.
*/
Expand Down
3 changes: 2 additions & 1 deletion packages/mds-ingest-service/service/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ export const IngestServiceManager = RpcServer(
},
{
name: args => IngestServiceProvider.name(...args),
getEvents: args => IngestServiceProvider.getEvents(...args),
getEventsUsingOptions: args => IngestServiceProvider.getEventsUsingOptions(...args),
getEventsUsingCursor: args => IngestServiceProvider.getEventsUsingCursor(...args),
getDevices: args => IngestServiceProvider.getDevices(...args)
},
{
Expand Down
13 changes: 11 additions & 2 deletions packages/mds-ingest-service/service/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,18 @@ export const IngestServiceProvider: ServiceProvider<IngestService> & ProcessCont
start: IngestRepository.initialize,
stop: IngestRepository.shutdown,
name: async () => ServiceResult('mds-ingest-service'),
getEvents: async params => {
getEventsUsingOptions: async params => {
try {
return ServiceResult(await IngestRepository.getEvents(validateGetVehicleEventsFilterParams(params)))
return ServiceResult(await IngestRepository.getEventsUsingOptions(validateGetVehicleEventsFilterParams(params)))
} catch (error) {
const exception = ServiceException(`Error in getEvents `, error)
logger.error('getEvents exception', { exception, error })
return exception
}
},
getEventsUsingCursor: async cursor => {
try {
return ServiceResult(await IngestRepository.getEventsUsingCursor(cursor))
} catch (error) {
const exception = ServiceException(`Error in getEvents `, error)
logger.error('getEvents exception', { exception, error })
Expand Down
12 changes: 11 additions & 1 deletion packages/mds-ingest-service/service/validators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import {
DeviceDomainModel,
EventDomainModel,
GetVehicleEventsFilterParams,
GetVehicleEventsOrderColumn,
GetVehicleEventsOrderDirection,
GROUPING_TYPES,
TelemetryDomainModel
} from '../@types'
Expand Down Expand Up @@ -163,7 +165,15 @@ export const { validate: validateGetVehicleEventsFilterParams } = SchemaValidato
vehicle_id: { type: 'string' },
device_ids: { type: 'array', items: { type: 'string', format: 'uuid' } },
event_types: { type: 'array', items: { type: 'string', enum: [...new Set(VEHICLE_EVENTS)] } },
limit: { type: 'integer' }
limit: { type: 'integer' },
order: {
type: 'object',
properties: {
column: { type: 'string', enum: [...GetVehicleEventsOrderColumn] },
direction: { type: 'string', enum: [...GetVehicleEventsOrderDirection] }
},
additionalProperties: false
}
},
required: ['time_range']
})
Expand Down
Loading

0 comments on commit f4500c6

Please sign in to comment.