Skip to content

Commit

Permalink
Log errors when we fall behind
Browse files Browse the repository at this point in the history
  • Loading branch information
cavis committed May 7, 2024
1 parent 455fe3c commit 7e7dc9d
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 9 deletions.
34 changes: 28 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,48 @@ const util = require("./lib/util");
* Entrypoint
*/
exports.handler = async (event = {}) => {
let latestDay;

try {
if (event.day) {
await rollup(util.parseDate(event.day));
await exports.rollup(util.parseDate(event.day));
} else {
const start = await bigquery.latestRollupDay();
const days = util.daysToProcess(start);
latestDay = await bigquery.latestRollupDay();
const days = util.daysToProcess(latestDay);
for (const day of days) {
await rollup(day);
await exports.rollup(day);
latestDay = day;
}
}
} catch (err) {
log.error("error running rollups", { err });
throw err;
} finally {
if (!event.day) {
exports.logFallingBehind(latestDay);
}
}
};

// log errors if we're far behind (or can't connect to BQ to check)
exports.logFallingBehind = (latestDay) => {
if (latestDay) {
const daysBehind = Math.floor(bigquery.logExpirationDays() / 2);
const threshold = new Date();
threshold.setDate(threshold.getDate() - daysBehind);
if (latestDay < threshold) {
const latest = latestDay.toISOString().split("T").shift();
log.error("rollups behind", { latest });
}
} else {
log.error("rollups behind");
}
};

/**
* Rollup a single day
*/
async function rollup(day) {
exports.rollup = async (day) => {
const dayStr = day.toISOString().split("T").shift();
log.info(`rolling up ${dayStr}`);

Expand All @@ -53,4 +75,4 @@ async function rollup(day) {
log.info(`bigquery ${dayStr}`, { elapsed: bqElapsed, rows: bqRows });

return true;
}
};
91 changes: 90 additions & 1 deletion index.test.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,104 @@
const log = require("lambda-log");
const athena = require("./lib/athena");
const bigquery = require("./lib/bigquery");
const index = require("./index");

describe("index", () => {
describe("handler", () => {
it("validates dates", async () => {
jest.spyOn(log, "error").mockImplementation(() => null);
jest.spyOn(log, "error").mockReturnValue();

expect(index.handler({ day: "n/a" })).rejects.toThrow(/invalid date/i);

expect(log.error).toHaveBeenCalledTimes(1);
expect(log.error.mock.calls[0][0]).toMatch(/error running rollups/);
});

it("inserts a day of data", async () => {
const day = new Date("2024-05-07");
const rows = [["1234/abcd", 5678]];

jest.spyOn(log, "info").mockReturnValue();
jest.spyOn(athena, "queryUsage").mockReturnValue(rows);
jest.spyOn(bigquery, "totalBytes").mockReturnValue(0);
jest.spyOn(bigquery, "deleteDay").mockReturnValue();
jest.spyOn(bigquery, "loadDay").mockReturnValue();

await index.handler({ day: "2024-05-07" });

expect(athena.queryUsage).toHaveBeenCalledTimes(1);
expect(athena.queryUsage.mock.calls[0][0]).toEqual(day);
expect(bigquery.totalBytes).toHaveBeenCalledTimes(1);
expect(bigquery.totalBytes.mock.calls[0][0]).toEqual(day);
expect(bigquery.deleteDay).toHaveBeenCalledTimes(1);
expect(bigquery.deleteDay.mock.calls[0][0]).toEqual(day);
expect(bigquery.loadDay).toHaveBeenCalledTimes(1);
expect(bigquery.loadDay.mock.calls[0][0]).toEqual(day);
expect(bigquery.loadDay.mock.calls[0][1]).toEqual([
{
feeder_podcast: 1234,
feeder_episode: "abcd",
bytes: 5678,
day: "2024-05-07",
},
]);
});

it("rolls up the latest day forward", async () => {
const today = new Date("2024-05-07");
const latest = new Date("2024-05-05");

jest.useFakeTimers().setSystemTime(today);
jest.spyOn(bigquery, "latestRollupDay").mockReturnValue(latest);
jest.spyOn(index, "rollup").mockReturnValue(true);

process.env.MAX_DAYS_TO_ROLLUP = 4;
await index.handler();

expect(index.rollup).toHaveBeenCalledTimes(3);
expect(index.rollup.mock.calls[0][0]).toEqual(latest);
expect(index.rollup.mock.calls[1][0]).toEqual(new Date("2024-05-06"));
expect(index.rollup.mock.calls[2][0]).toEqual(today);
});

it("logs an error if we're falling behind", async () => {
jest.spyOn(log, "info").mockReturnValue();
jest.spyOn(log, "error").mockReturnValue();

const today = new Date("2024-05-07");
const latest = new Date("2024-04-24");

jest.useFakeTimers().setSystemTime(today);
jest.spyOn(bigquery, "latestRollupDay").mockReturnValue(latest);
jest.spyOn(index, "rollup").mockReturnValue(true);

process.env.MAX_DAYS_TO_ROLLUP = 4;
await index.handler();

expect(index.rollup).toHaveBeenCalledTimes(4);
expect(log.error).toHaveBeenCalledTimes(1);
expect(log.error.mock.calls[0][0]).toEqual("rollups behind");
expect(log.error.mock.calls[0][1]).toEqual({ latest: "2024-04-27" });
});

it("logs an error if the day totalBytes is larger than what we queried", async () => {
const day = new Date("2024-05-07");
const rows = [["1234/abcd", 5678]];

jest.spyOn(log, "info").mockReturnValue();
jest.spyOn(log, "error").mockReturnValue();
jest.spyOn(athena, "queryUsage").mockReturnValue(rows);
jest.spyOn(bigquery, "totalBytes").mockReturnValue(9999);

await index.handler({ day: "2024-05-07" });

expect(athena.queryUsage).toHaveBeenCalledTimes(1);
expect(athena.queryUsage.mock.calls[0][0]).toEqual(day);
expect(bigquery.totalBytes).toHaveBeenCalledTimes(1);
expect(bigquery.totalBytes.mock.calls[0][0]).toEqual(day);

expect(log.error).toHaveBeenCalledTimes(1);
expect(log.error.mock.calls[0][0]).toMatch(/bigquery total mismatch/);
});
});
});
5 changes: 4 additions & 1 deletion lib/bigquery.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ const { BigQuery } = require("@google-cloud/bigquery");

const gzip = promisify(zlib.gzip);

exports.logExpirationDays = () =>
parseInt(process.env.DT_LOG_EXPIRATION_DAYS, 10) || 14;

// optional: google workload identity federation
exports.opts = () => {
if (process.env.BQ_CLIENT_CONFIG) {
Expand Down Expand Up @@ -37,7 +40,7 @@ exports.query = async (query, params = null) => {
* Get the latest rolled-up-day in the dt_bytes table
*/
exports.latestRollupDay = async () => {
const logExp = parseInt(process.env.DT_LOG_EXPIRATION_DAYS, 10) || 14;
const logExp = exports.logExpirationDays();

// find the earliest day eligibile to rollup
const lower = new Date();
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"watch": "npx jest --watch"
},
"jest": {
"resetMocks": true
"restoreMocks": true
},
"repository": {
"type": "git",
Expand Down

0 comments on commit 7e7dc9d

Please sign in to comment.