@@ -16,6 +16,7 @@ import {
16
16
isBeforeTime ,
17
17
isInTimeRange ,
18
18
toTimezoneDate ,
19
+ AssetsUnderManagement ,
19
20
} from './utils'
20
21
21
22
const logger = makeLogger ( 'Superstate' )
@@ -31,6 +32,8 @@ export interface ResponseSchema {
31
32
32
33
const TZ = 'America/New_York'
33
34
35
+ type ReportValueType = typeof inputParameters . validated . reportValue
36
+
34
37
// Custom transport implementation that takes incoming requests, adds them into a SET, and makes requests to DP
35
38
// on a specific time every day, after receiving a signal from scheduler.
36
39
export class NavTransport implements Transport < BaseEndpointTypes > {
@@ -39,7 +42,7 @@ export class NavTransport implements Transport<BaseEndpointTypes> {
39
42
requester ! : Requester
40
43
settings ! : BaseEndpointTypes [ 'Settings' ]
41
44
endpointName ! : string
42
- fundIdsSet ! : Set < number >
45
+ fundsMap ! : Map < string , [ number , ReportValueType ] >
43
46
44
47
async initialize (
45
48
dependencies : TransportDependencies < BaseEndpointTypes > ,
@@ -52,26 +55,27 @@ export class NavTransport implements Transport<BaseEndpointTypes> {
52
55
this . requester = dependencies . requester
53
56
this . settings = settings
54
57
this . endpointName = endpointName
55
- this . fundIdsSet = new Set ( )
58
+ this . fundsMap = new Map ( )
56
59
this . runScheduler ( )
57
60
}
58
61
59
62
// registerRequest is invoked on every valid request to EA
60
63
// Adds fundId in the request to a SET
61
64
async registerRequest ( req : AdapterRequest < typeof inputParameters . validated > ) {
62
- const { fundId } = req . requestContext . data
63
- if ( ! this . fundIdsSet . has ( fundId ) ) {
64
- this . fundIdsSet . add ( fundId )
65
- logger . info ( `Added new fund id - ${ fundId } ` )
65
+ const { fundId, reportValue } = req . requestContext . data
66
+ const mapKey = `${ fundId } +${ reportValue } `
67
+ if ( ! this . fundsMap . has ( mapKey ) ) {
68
+ this . fundsMap . set ( mapKey , [ fundId , reportValue ] )
69
+ logger . info ( `Added new fund id - ${ fundId } - reportValue ${ reportValue } ` )
66
70
}
67
71
}
68
72
69
73
// foregroundExecute is executed when there is a new request/fundId that is not in the cache
70
74
async foregroundExecute (
71
75
req : AdapterRequest < typeof inputParameters . validated > ,
72
76
) : Promise < AdapterResponse < BaseEndpointTypes [ 'Response' ] > | void > {
73
- const { fundId } = req . requestContext . data
74
- return this . execute ( fundId )
77
+ const { fundId, reportValue } = req . requestContext . data
78
+ return this . execute ( fundId , reportValue )
75
79
}
76
80
77
81
// Runs 'execute' function every day at 9:09 AM ET (if fundIdsSet is not empty)
@@ -83,19 +87,17 @@ export class NavTransport implements Transport<BaseEndpointTypes> {
83
87
84
88
schedule . scheduleJob ( rule , ( ) => {
85
89
logger . info (
86
- `Scheduled execution started at ${ Date . now ( ) } . FundIdSet - ${ [ ...this . fundIdsSet ] . join (
87
- ',' ,
88
- ) } `,
90
+ `Scheduled execution started at ${ Date . now ( ) } . FundsMap - ${ [ ...this . fundsMap ] . join ( ',' ) } ` ,
89
91
)
90
- ; [ ...this . fundIdsSet ] . map ( async ( fundId ) => this . execute ( fundId ) )
92
+ ; [ ...this . fundsMap ] . map ( async ( entry ) => this . execute ( entry [ 1 ] [ 0 ] , entry [ 1 ] [ 1 ] ) )
91
93
} )
92
94
}
93
95
94
96
// execute is either called by scheduler or foregroundExecute.
95
97
// Makes a request to DP and saves the response in the cache.
96
98
// In case the DP returns stale data the function will be executed again several times
97
99
// before finalizing and saving the last returned data to a cache.
98
- async execute ( fundId : number , retryCount = 0 ) {
100
+ async execute ( fundId : number , reportValue : ReportValueType , retryCount = 0 ) {
99
101
const providerDataRequestedUnixMs = Date . now ( )
100
102
const apiResponse = await this . makeRequest ( fundId )
101
103
const providerDataReceivedUnixMs = Date . now ( )
@@ -110,12 +112,15 @@ export class NavTransport implements Transport<BaseEndpointTypes> {
110
112
providerIndicatedTimeUnixMs : undefined ,
111
113
} ,
112
114
}
113
- await this . responseCache . write ( this . name , [ { params : { fundId } , response } ] )
115
+ await this . responseCache . write ( this . name , [ { params : { fundId, reportValue } , response } ] )
114
116
return
115
117
}
116
118
117
119
const data = apiResponse . data [ 0 ]
118
- const result = Number ( data . net_asset_value )
120
+ let result = Number ( data . net_asset_value )
121
+ if ( reportValue == AssetsUnderManagement ) {
122
+ result = Number ( data . assets_under_management )
123
+ }
119
124
120
125
// DP updates previous working day's price on the next working day at 9:09 AM ET
121
126
// If there is no fresh price data, we try to re-fetch the API until 10:30 AM ET
@@ -135,7 +140,10 @@ export class NavTransport implements Transport<BaseEndpointTypes> {
135
140
} ms`,
136
141
)
137
142
retryCount ++
138
- setTimeout ( ( ) => this . execute ( fundId , retryCount ) , this . settings . RETRY_INTERVAL_MS )
143
+ setTimeout (
144
+ ( ) => this . execute ( fundId , reportValue , retryCount ) ,
145
+ this . settings . RETRY_INTERVAL_MS ,
146
+ )
139
147
// We don't `return` here and let the value be stored in cache on purpose.
140
148
// This way the EA will respond with the latest value from DP (even though it's not the value that the EA expects),
141
149
// while it tries to get a fresh update.
@@ -159,7 +167,7 @@ export class NavTransport implements Transport<BaseEndpointTypes> {
159
167
providerIndicatedTimeUnixMs : undefined ,
160
168
} ,
161
169
}
162
- await this . responseCache . write ( this . name , [ { params : { fundId } , response } ] )
170
+ await this . responseCache . write ( this . name , [ { params : { fundId, reportValue } , response } ] )
163
171
return response
164
172
}
165
173
0 commit comments