Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add x/marketmap module #20

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,5 @@ Other example subscription events:
{ "type": "subscribe", "channel": "v4_markets" }
{ "type": "subscribe", "channel": "v4_orderbook", "id": "BTC-USD" }
{ "type": "subscribe", "channel": "v4_subaccounts", "id": "address/0" }
{ "type": "subscribe", "channel": "v4_block_height" }
```
3 changes: 2 additions & 1 deletion indexer/docker-compose-local-deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ services:
to-websockets-subaccounts:1:1,\
to-websockets-trades:1:1,\
to-websockets-markets:1:1,\
to-websockets-candles:1:1"
to-websockets-candles:1:1,\
to-websockets-block-height:1:1"
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL_SAME_HOST://:29092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT
Expand Down
5 changes: 3 additions & 2 deletions indexer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ services:
environment:
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS:
KAFKA_CREATE_TOPICS:
"to-ender:1:1,\
to-vulcan:1:1,\
to-websockets-orderbooks:1:1,\
to-websockets-subaccounts:1:1,\
to-websockets-trades:1:1,\
to-websockets-markets:1:1,\
to-websockets-candles:1:1"
to-websockets-candles:1:1,\
to-websockets-block-height:1:1"
postgres-test:
build:
context: .
Expand Down
1 change: 1 addition & 0 deletions indexer/packages/kafka/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export const SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION: string = '3.0.0';
export const TRADES_WEBSOCKET_MESSAGE_VERSION: string = '2.1.0';
export const MARKETS_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
export const CANDLES_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
export const BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
2 changes: 2 additions & 0 deletions indexer/packages/kafka/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export enum WebsocketTopics {
TO_WEBSOCKETS_TRADES = 'to-websockets-trades',
TO_WEBSOCKETS_MARKETS = 'to-websockets-markets',
TO_WEBSOCKETS_CANDLES = 'to-websockets-candles',
TO_WEBSOCKETS_BLOCK_HEIGHT = 'to-websockets-block-height',
}

export enum KafkaTopics {
Expand All @@ -14,4 +15,5 @@ export enum KafkaTopics {
TO_WEBSOCKETS_TRADES = 'to-websockets-trades',
TO_WEBSOCKETS_MARKETS = 'to-websockets-markets',
TO_WEBSOCKETS_CANDLES = 'to-websockets-candles',
TO_WEBSOCKETS_BLOCK_HEIGHT = 'to-websockets-block-height',
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,30 @@ export interface CandleMessageSDKType {

version: string;
}
/** Message to be sent through the 'to-websockets-block-height` kafka topic. */

export interface BlockHeightMessage {
/** Block height where the contents occur. */
blockHeight: string;
/** ISO formatted time of the block height. */

time: string;
/** Version of the websocket message. */

version: string;
}
/** Message to be sent through the 'to-websockets-block-height` kafka topic. */

export interface BlockHeightMessageSDKType {
/** Block height where the contents occur. */
block_height: string;
/** ISO formatted time of the block height. */

time: string;
/** Version of the websocket message. */

version: string;
}

function createBaseOrderbookMessage(): OrderbookMessage {
return {
Expand Down Expand Up @@ -629,4 +653,69 @@ export const CandleMessage = {
return message;
}

};

function createBaseBlockHeightMessage(): BlockHeightMessage {
return {
blockHeight: "",
time: "",
version: ""
};
}

export const BlockHeightMessage = {
encode(message: BlockHeightMessage, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.blockHeight !== "") {
writer.uint32(10).string(message.blockHeight);
}

if (message.time !== "") {
writer.uint32(18).string(message.time);
}

if (message.version !== "") {
writer.uint32(26).string(message.version);
}

return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): BlockHeightMessage {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseBlockHeightMessage();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
message.blockHeight = reader.string();
break;

case 2:
message.time = reader.string();
break;

case 3:
message.version = reader.string();
break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<BlockHeightMessage>): BlockHeightMessage {
const message = createBaseBlockHeightMessage();
message.blockHeight = object.blockHeight ?? "";
message.time = object.time ?? "";
message.version = object.version ?? "";
return message;
}

};
109 changes: 108 additions & 1 deletion indexer/packages/v4-protos/src/codegen/dydxprotocol/vault/genesis.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,55 @@
import { Params, ParamsSDKType } from "./params";
import { VaultId, VaultIdSDKType, NumShares, NumSharesSDKType } from "./vault";
import { OwnerShare, OwnerShareSDKType } from "./query";
import * as _m0 from "protobufjs/minimal";
import { DeepPartial } from "../../helpers";
/** GenesisState defines `x/vault`'s genesis state. */

export interface GenesisState {
/** The parameters of the module. */
params?: Params;
/** The vaults. */

vaults: Vault[];
}
/** GenesisState defines `x/vault`'s genesis state. */

export interface GenesisStateSDKType {
/** The parameters of the module. */
params?: ParamsSDKType;
/** The vaults. */

vaults: VaultSDKType[];
}
/** Vault defines the total shares and owner shares of a vault. */

export interface Vault {
/** The ID of the vault. */
vaultId?: VaultId;
/** The total number of shares in the vault. */

totalShares?: NumShares;
/** The shares of each owner in the vault. */

ownerShares: OwnerShare[];
}
/** Vault defines the total shares and owner shares of a vault. */

export interface VaultSDKType {
/** The ID of the vault. */
vault_id?: VaultIdSDKType;
/** The total number of shares in the vault. */

total_shares?: NumSharesSDKType;
/** The shares of each owner in the vault. */

owner_shares: OwnerShareSDKType[];
}

function createBaseGenesisState(): GenesisState {
return {
params: undefined
params: undefined,
vaults: []
};
}

Expand All @@ -26,6 +59,10 @@ export const GenesisState = {
Params.encode(message.params, writer.uint32(10).fork()).ldelim();
}

for (const v of message.vaults) {
Vault.encode(v!, writer.uint32(18).fork()).ldelim();
}

return writer;
},

Expand All @@ -42,6 +79,10 @@ export const GenesisState = {
message.params = Params.decode(reader, reader.uint32());
break;

case 2:
message.vaults.push(Vault.decode(reader, reader.uint32()));
break;

default:
reader.skipType(tag & 7);
break;
Expand All @@ -54,6 +95,72 @@ export const GenesisState = {
fromPartial(object: DeepPartial<GenesisState>): GenesisState {
const message = createBaseGenesisState();
message.params = object.params !== undefined && object.params !== null ? Params.fromPartial(object.params) : undefined;
message.vaults = object.vaults?.map(e => Vault.fromPartial(e)) || [];
return message;
}

};

function createBaseVault(): Vault {
return {
vaultId: undefined,
totalShares: undefined,
ownerShares: []
};
}

export const Vault = {
encode(message: Vault, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.vaultId !== undefined) {
VaultId.encode(message.vaultId, writer.uint32(10).fork()).ldelim();
}

if (message.totalShares !== undefined) {
NumShares.encode(message.totalShares, writer.uint32(18).fork()).ldelim();
}

for (const v of message.ownerShares) {
OwnerShare.encode(v!, writer.uint32(26).fork()).ldelim();
}

return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): Vault {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseVault();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
message.vaultId = VaultId.decode(reader, reader.uint32());
break;

case 2:
message.totalShares = NumShares.decode(reader, reader.uint32());
break;

case 3:
message.ownerShares.push(OwnerShare.decode(reader, reader.uint32()));
break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<Vault>): Vault {
const message = createBaseVault();
message.vaultId = object.vaultId !== undefined && object.vaultId !== null ? VaultId.fromPartial(object.vaultId) : undefined;
message.totalShares = object.totalShares !== undefined && object.totalShares !== null ? NumShares.fromPartial(object.totalShares) : undefined;
message.ownerShares = object.ownerShares?.map(e => OwnerShare.fromPartial(e)) || [];
return message;
}

Expand Down
7 changes: 7 additions & 0 deletions indexer/services/auxo/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ export const BAZOOKA_DB_MIGRATION_PAYLOAD: Uint8Array = new TextEncoder().encode
}),
);

export const BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD: Uint8Array = new TextEncoder().encode(
JSON.stringify({
migrate: true,
create_kafka_topics: true,
}),
);

export const ECS_SERVICE_NAMES: EcsServiceNames[] = [
EcsServiceNames.COMLINK,
EcsServiceNames.ENDER,
Expand Down
16 changes: 11 additions & 5 deletions indexer/services/auxo/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import _ from 'lodash';

import config from './config';
import {
BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD,
BAZOOKA_DB_MIGRATION_PAYLOAD,
BAZOOKA_LAMBDA_FUNCTION_NAME,
ECS_SERVICE_NAMES,
Expand All @@ -40,7 +41,7 @@ import { AuxoEventJson, EcsServiceNames, TaskDefinitionArnMap } from './types';
/**
* Upgrades all services and run migrations
* 1. Upgrade Bazooka
* 2. Run db migration in Bazooka
* 2. Run db migration in Bazooka, and update kafka topics
* 3. Create new ECS Task Definition for ECS Services with new image
* 4. Upgrade all ECS Services (comlink, ender, roundtable, socks, vulcan)
*/
Expand All @@ -66,8 +67,9 @@ export async function handler(
// 1. Upgrade Bazooka
await upgradeBazooka(lambda, ecr, event);

// 2. Run db migration in Bazooka
await runDbMigration(lambda);
// 2. Run db migration in Bazooka,
// boolean flag used to determine if new kafka topics should be created
await runDbAndKafkaMigration(event.addNewKafkaTopics, lambda);

// 3. Create new ECS Task Definition for ECS Services with new image
const taskDefinitionArnMap: TaskDefinitionArnMap = await createNewEcsTaskDefinitions(
Expand Down Expand Up @@ -192,16 +194,20 @@ async function getImageDetail(

}

async function runDbMigration(
async function runDbAndKafkaMigration(
createNewKafkaTopics: boolean,
lambda: ECRClient,
): Promise<void> {
logger.info({
at: 'index#runDbMigration',
message: 'Running db migration',
});
const payload = createNewKafkaTopics
? BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD
: BAZOOKA_DB_MIGRATION_PAYLOAD;
const response: InvokeCommandOutput = await lambda.send(new InvokeCommand({
FunctionName: BAZOOKA_LAMBDA_FUNCTION_NAME,
Payload: BAZOOKA_DB_MIGRATION_PAYLOAD,
Payload: payload,
// RequestResponse means that the lambda is synchronously invoked
InvocationType: 'RequestResponse',
}));
Expand Down
1 change: 1 addition & 0 deletions indexer/services/auxo/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export interface AuxoEventJson {
region: string;
// In our naming we often times use the appreviated region name
regionAbbrev: string;
addNewKafkaTopics: boolean;
}

// EcsServiceName to task definition arn mapping
Expand Down
Loading
Loading