Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caspian Migration #48

Merged
merged 15 commits into from
Feb 13, 2024
Merged
9 changes: 9 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ export interface ConfigFile {
BIGQUERY_CLIENT_EMAIL?: string;
BIGQUERY_PROJECT_ID?: string;
BIGQUERY_PRIVATE_KEY?: string;


//
CASPIAN_ENDPOINT?: string;
CASPIAN_API_KEY?: string;
CASPIAN_PRODUCER_NAME?: string;
CASPIAN_ENTITY_NAME?: string;
CASPIAN_SCHEMA_TYPE?: string;
CASPIAN_SCHEMA_VERSION?: number;
}

export interface Config extends Required<ConfigFile> {}
Expand Down
126 changes: 77 additions & 49 deletions src/routes/accounts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { config } from "../config";
import { getTicket } from "../ticket-queue";
import rTracer from "cls-rtracer";
import { incrementTxRequestCount, incrementTxCount } from "../index";
const https = require("https");

export default async function (req: Request, res: Response) {
incrementTxRequestCount();
Expand Down Expand Up @@ -90,7 +91,7 @@ export default async function (req: Request, res: Response) {
amount: Number(amount),
};

if (wallet.seed) {
if (wallet && wallet.seed) {
response.seed = wallet.seed;
}

Expand All @@ -101,12 +102,12 @@ export default async function (req: Request, res: Response) {
} with ${amount} XRP (${status})`
);

if (config.BIGQUERY_PROJECT_ID) {
if (config.CASPIAN_API_KEY) {
try {
await insertIntoBigQuery(account, amount, req.body);
console.log("inserted big query");
await insertIntoCaspian(account, Number(amount), req.body, client);
console.log("Data sent to Caspian successfully");
} catch (error) {
console.warn(`Failed to insert into BigQuery: ${error}`);
console.warn("Caspian Insertion Error:", error);
}
}
incrementTxCount();
Expand All @@ -120,50 +121,6 @@ export default async function (req: Request, res: Response) {
});
}
}
async function insertIntoBigQuery(
account: Account,
amount: string,
reqBody: any
): Promise<void> {
const { userAgent = "", usageContext = "" } = reqBody;
const memos = reqBody.memos
? reqBody.memos.map((memo: any) => ({ memo }))
: [];
const rows = [
{
user_agent: userAgent,
usage_context: usageContext,
memos: memos,
account: account.xAddress,
amount: amount,
},
];
const bigquery = new BigQuery({
projectId: config.BIGQUERY_PROJECT_ID,
credentials: {
client_email: config.BIGQUERY_CLIENT_EMAIL,
private_key: config.BIGQUERY_PRIVATE_KEY,
},
});

return new Promise((resolve, reject) => {
bigquery
.dataset(config.BIGQUERY_DATASET_ID)
.table(config.BIGQUERY_TABLE_ID)
.insert(rows, (error) => {
if (error) {
console.warn(
"WARNING: Failed to insert into BigQuery",
JSON.stringify(error, null, 2)
);
reject(error);
} else {
console.log(`Inserted ${rows.length} rows`);
resolve();
}
});
});
}

async function submitPaymentWithTicket(
payment: Payment,
Expand All @@ -190,3 +147,74 @@ async function submitPaymentWithTicket(

return result;
}

async function insertIntoCaspian(
account: Account,
amount: number,
reqBody: any,
client: Client
) {
const dataPayload = [
{
user_agent: reqBody.userAgent || "",
usage_context: reqBody.usageContext || "",
memos: reqBody.memos || [],
account: account.xAddress,
amount: amount,
network: client.networkID,
},
];

const postData = JSON.stringify({
producerName: config.CASPIAN_PRODUCER_NAME,
entityName: config.CASPIAN_ENTITY_NAME,
schemaType: config.CASPIAN_SCHEMA_TYPE,
schemaVersion: Math.round(config.CASPIAN_SCHEMA_VERSION),
data: dataPayload,
timestamp: Date.now(),
});
console.log(postData);

const options = {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-api-key": config.CASPIAN_API_KEY,
},
};

return new Promise((resolve, reject) => {
const req = https.request(
config.CASPIAN_ENDPOINT,
options,
(res: Response) => {
let data = "";

res.on("data", (chunk) => {
data += chunk;
});

res.on("end", () => {
if (res.statusCode === 200 || res.statusCode === 201) {
resolve(data);
} else {
reject(`Failed to send data to Caspian: ${data}`);
}
});
}
);

req.on("error", (error: any) => {
console.error("Request Error:", error);
reject({
message: `Failed to send data to Caspian: ${
error.message || "Unknown error"
}`,
attemptedData: postData, // Here, we include the data that was being sent
});
});

req.write(postData);
req.end();
});
}