Skip to content

Commit

Permalink
Wire up remaining BQ loading
Browse files Browse the repository at this point in the history
  • Loading branch information
cavis committed May 3, 2024
1 parent 3fb4df1 commit d0813e3
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 40 deletions.
38 changes: 30 additions & 8 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const log = require("lambda-log");

const athena = require("./lib/athena");
const bigquery = require("./lib/bigquery");
const util = require("./lib/util");
Expand All @@ -8,14 +7,19 @@ const util = require("./lib/util");
* Entrypoint
*/
exports.handler = async (event = {}) => {
if (event.day) {
await rollup(util.parseDate(event.day));
} else {
const start = await bigquery.latestRollupDay();
const days = util.daysToProcess(start);
for (const day of days) {
await rollup(day);
try {
if (event.day) {
await rollup(util.parseDate(event.day));
} else {
const start = await bigquery.latestRollupDay();
const days = util.daysToProcess(start);
for (const day of days) {
await rollup(day);
}
}
} catch (err) {
log.error("error running rollups", { err });
throw err;
}
};

Expand All @@ -26,9 +30,27 @@ async function rollup(day) {
const dayStr = day.toISOString().split("T").shift();
log.info(`rolling up ${dayStr}`);

// query and parse cloudfront logs via athena
const [elapsed, rows] = await util.elapsed(() => athena.queryUsage(day));
const parsed = util.parseUsage(day, rows);
const total = parsed.reduce((sum, row) => sum + row.bytes, 0);
const info = { rows: rows.length, parsed: parsed.length, total: total };
log.info(`athena ${dayStr}`, { elapsed, ...info });

// abort if bigquery already has more bytes for the day
// (maybe part of this day logs already expired from s3)
const currentTotal = await bigquery.totalBytes(day);
if (currentTotal > total) {
log.error(`bigquery total mismatch ${dayStr}`, { currentTotal, total });
return false;
}

// delete day from bigquery and insert fresh data
await bigquery.deleteDay(day);
const [bqElapsed, bqRows] = await util.elapsed(() =>
bigquery.loadDay(day, parsed),
);
log.info(`bigquery ${dayStr}`, { elapsed: bqElapsed, rows: bqRows });

return true;
}
5 changes: 4 additions & 1 deletion lib/athena.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,14 @@ async function loadResults(client, QueryExecutionId, NextToken = null) {
return rows;
}

// mockable client
exports.client = () => new AthenaClient();

/**
* Run a query and wait for the result
*/
exports.query = async (sql, params = null) => {
const athena = new AthenaClient();
const athena = exports.client();
const id = await startQuery(athena, sql, params);
await waitQuery(athena, id);
return loadResults(athena, id);
Expand Down
45 changes: 43 additions & 2 deletions lib/athena.test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,48 @@
const athena = require("./athena");

describe("athena", () => {
describe("query", () => {
it("jumps through a bunch of hoops to run a query and load results", async () => {
const commands = [];

const client = {
send: (cmd) => {
commands.push(cmd);
if (cmd.constructor.name === "StartQueryExecutionCommand") {
return { QueryExecutionId: 1234 };
}
if (cmd.constructor.name === "GetQueryExecutionCommand") {
return { QueryExecution: { Status: { State: "SUCCEEDED" } } };
}
if (cmd.constructor.name === "GetQueryResultsCommand") {
const header = ["col1", "col2"];
const row1 = ["foo", "bar"];
return {
ResultSet: { ResultRows: [{ Data: header }, { Data: row1 }] },
};
}
throw new Error(`Unexpected command: ${cmd.constructor.name}`);
},
};

jest.spyOn(athena, "client").mockReturnValue(client);

const rows = await athena.query("SELECT foo FROM ?", ["'bar'"]);
expect(rows).toEqual([["foo", "bar"]]);
});
});

describe("queryUsage", () => {
test("todo", () => {
expect("todo").toEqual("todo");
it("queries for cdn usage", async () => {
const rows = [["foo", "bar"]];
jest.spyOn(athena, "query").mockImplementation(async () => rows);

const result = await athena.queryUsage(new Date("2024-05-03"));
expect(result).toEqual(rows);

expect(athena.query).toHaveBeenCalledTimes(1);
expect(athena.query.mock.calls[0][0]).toMatch(/select regexp_replace/i);
expect(athena.query.mock.calls[0][1]).toEqual(["'2024-05-03'"]);
});
});
});
61 changes: 58 additions & 3 deletions lib/bigquery.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
const { promisify } = require("node:util");
const fs = require("fs").promises;
const zlib = require("zlib");
const log = require("lambda-log");
const { BigQuery } = require("@google-cloud/bigquery");

const gzip = promisify(zlib.gzip);

// mockable dataset
exports.dataset = () => {
const bq = new BigQuery();
return bq.dataset(process.env.BQ_DATASET);
};

/**
* Run a query and wait for the result
*/
exports.query = async (query, params = null) => {
const bq = new BigQuery();
const ds = bq.dataset(process.env.BQ_DATASET);
const [job] = await ds.createQueryJob({ query, params });
const [job] = await exports.dataset().createQueryJob({ query, params });
const [rows] = await job.getQueryResults();
return rows;
};
Expand All @@ -31,3 +41,48 @@ exports.latestRollupDay = async () => {

return lower;
};

/**
* Get the current total bytes for a day
*/
exports.totalBytes = async (day) => {
const sql = `SELECT SUM(bytes) AS total_bytes FROM dt_bytes WHERE day = @day`;
const rows = await exports.query(sql, { day });
return rows[0]?.total_bytes || 0;
};

/**
* Delete a day of bytes data
*/
exports.deleteDay = async (day) => {
const sql = `DELETE FROM dt_bytes WHERE day = @day`;
await exports.query(sql, { day });
return true;
};

/**
* Save a day of bytes to a temp file, then load to bigquery
*/
exports.loadDay = async (day, rows) => {
if (rows.length === 0) {
return 0;
}

const file = `/tmp/dt_bytes_${day.toISOString().split("T").shift()}.mjson.gz`;
try {
const mjson = rows.map((r) => JSON.stringify(r)).join("\n");
const gzipped = await gzip(Buffer.from(mjson, "utf8"));
await fs.writeFile(file, gzipped);

const opts = { sourceFormat: "NEWLINE_DELIMITED_JSON" };
await exports.dataset().table("dt_bytes").load(file, opts);

return rows.length;
} finally {
try {
await fs.unlink(file);
} catch {
log.debug("unable to unlink", { file });
}
}
};
58 changes: 54 additions & 4 deletions lib/bigquery.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,66 @@ describe("bigquery", () => {
jest.useFakeTimers().setSystemTime(new Date("2024-04-04"));
});

test("returns the earliest log we haven't expired", async () => {
it("returns the earliest log we haven't expired", async () => {
process.env.DT_LOG_EXPIRATION_DAYS = 2;
jest.spyOn(bigquery, "query").mockImplementation(() => []);
jest.spyOn(bigquery, "query").mockReturnValue([]);
expect(await bigquery.latestRollupDay()).toEqual(new Date("2024-04-02"));
});

test("returns the latest rollup day in bigquery", async () => {
it("returns the latest rollup day in bigquery", async () => {
const row = { day: { value: "2024-03-06" } };
jest.spyOn(bigquery, "query").mockImplementation(() => [row]);
jest.spyOn(bigquery, "query").mockReturnValue([row]);
expect(await bigquery.latestRollupDay()).toEqual(new Date("2024-03-06"));
});
});

describe("totalBytes", () => {
it("returns the total bytes for a day in bigquery", async () => {
const row = { total_bytes: 12345 };
jest.spyOn(bigquery, "query").mockReturnValue([row]);

const total = await bigquery.totalBytes(new Date("2024-05-03"));
expect(total).toEqual(12345);

expect(bigquery.query).toHaveBeenCalledTimes(1);
expect(bigquery.query.mock.calls[0][0]).toMatch(/select sum\(bytes\)/i);
expect(bigquery.query.mock.calls[0][1]).toEqual({
day: new Date("2024-05-03"),
});
});
});

describe("deleteDay", () => {
it("deletes a day of data", async () => {
jest.spyOn(bigquery, "query").mockReturnValue([]);

await bigquery.deleteDay(new Date("2024-05-03"));

expect(bigquery.query).toHaveBeenCalledTimes(1);
expect(bigquery.query.mock.calls[0][0]).toMatch(/delete from dt_bytes/i);
expect(bigquery.query.mock.calls[0][1]).toEqual({
day: new Date("2024-05-03"),
});
});
});

describe("loadDay", () => {
it("gzips and loads a day of data", async () => {
const table = { load: () => true };
const dataset = { table: () => table };
jest.spyOn(table, "load").mockImplementation(async () => true);
jest.spyOn(bigquery, "dataset").mockReturnValue(dataset);

const r1 = { feeder_episode: "foo", bytes: 1234 };
const r2 = { feeder_episode: "bar", bytes: 5678 };
const count = await bigquery.loadDay(new Date("2024-05-03"), [r1, r2]);

expect(count).toEqual(2);
expect(table.load).toHaveBeenCalledTimes(1);
expect(table.load.mock.calls[0][0]).toMatch("2024-05-03.mjson.gz");
expect(table.load.mock.calls[0][1]).toEqual({
sourceFormat: "NEWLINE_DELIMITED_JSON",
});
});
});
});
14 changes: 7 additions & 7 deletions lib/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ exports.elapsed = async (fn) => {

/**
* Extract metadata from cdn paths. Counted paths must be one of:
* <podcast_id>/<episode_id>
* <podcast_id>/<feed_slug>/<episode_id>
* <feeder_podcast>/<feeder_episode>
* <feeder_podcast>/<feeder_feed>/<feeder_episode>
*/
exports.parseUsage = (day, rows) =>
rows
Expand All @@ -58,14 +58,14 @@ exports.parseUsage = (day, rows) =>

if (parts[0].match(/^[0-9]+$/)) {
/* eslint-disable camelcase */
const podcast_id = parseInt(parts[0], 10);
const feed_slug = parts[1];
const episode_id = parts[parts.length - 1];
const feeder_podcast = parseInt(parts[0], 10);
const feeder_feed = parts[1];
const feeder_episode = parts[parts.length - 1];
if (parts.length === 2) {
return { podcast_id, episode_id, bytes, day };
return { feeder_podcast, feeder_episode, bytes, day };
}
if (parts.length === 3) {
return { podcast_id, feed_slug, episode_id, bytes, day };
return { feeder_podcast, feeder_feed, feeder_episode, bytes, day };
}
/* eslint-enable camelcase */
}
Expand Down
30 changes: 15 additions & 15 deletions lib/util.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ const util = require("./util");

describe("util", () => {
describe("parseDate", () => {
test("parses strings", () => {
it("parses strings", () => {
expect(util.parseDate("2022-04-05")).toEqual(new Date("2022-04-05"));
});

test("returns null for blank values", () => {
it("returns null for blank values", () => {
expect(util.parseDate(null)).toBeNull();
expect(util.parseDate("")).toBeNull();
});

test("throws errors for bad dates", () => {
it("throws errors for bad dates", () => {
expect(() => util.parseDate("whatev")).toThrow(/invalid date input/i);
expect(() => util.parseDate("2024-99-99")).toThrow(/invalid date input/i);
});
Expand All @@ -23,7 +23,7 @@ describe("util", () => {
jest.useFakeTimers().setSystemTime(new Date("2024-04-04"));
});

test("returns an array of dates", () => {
it("returns an array of dates", () => {
process.env.MAX_DAYS_TO_ROLLUP = 1;
expect(util.daysToProcess(new Date("2024-03-04T12:34:56Z"))).toEqual([
new Date("2024-03-04"),
Expand All @@ -38,7 +38,7 @@ describe("util", () => {
]);
});

test("stops after the current day", () => {
it("stops after the current day", () => {
process.env.MAX_DAYS_TO_ROLLUP = 4;
expect(util.daysToProcess(new Date("2024-04-03"))).toEqual([
new Date("2024-04-03"),
Expand All @@ -48,15 +48,15 @@ describe("util", () => {
});

describe("elapsed", () => {
test("times functions", async () => {
it("times functions", async () => {
const [elapsed, result] = await util.elapsed(() => "foo");
expect(elapsed).toBeGreaterThanOrEqual(0);
expect(result).toEqual("foo");
});
});

describe("parseUsage", () => {
test("parses dovetail cdn paths", () => {
it("parses dovetail cdn paths", () => {
const day = new Date("2024-03-04");
const rows = util.parseUsage(day, [
["1234/some-guid", "99"],
Expand All @@ -66,27 +66,27 @@ describe("util", () => {

expect(rows.length).toEqual(3);
expect(rows[0]).toEqual({
podcast_id: 1234,
episode_id: "some-guid",
feeder_podcast: 1234,
feeder_episode: "some-guid",
bytes: 99,
day,
});
expect(rows[1]).toEqual({
podcast_id: 1234,
episode_id: "another-guid",
feeder_podcast: 1234,
feeder_episode: "another-guid",
bytes: 88,
day,
});
expect(rows[2]).toEqual({
podcast_id: 5678,
feed_slug: "my-feed",
episode_id: "the-guid",
feeder_podcast: 5678,
feeder_feed: "my-feed",
feeder_episode: "the-guid",
bytes: 77,
day,
});
});

test("warns for unrecognized paths", () => {
it("warns for unrecognized paths", () => {
jest.spyOn(log, "warn").mockImplementation(() => null);

const day = new Date("2024-03-04");
Expand Down
Loading

0 comments on commit d0813e3

Please sign in to comment.