Skip to content

Commit

Permalink
Init athena querying
Browse files Browse the repository at this point in the history
  • Loading branch information
cavis committed May 2, 2024
1 parent db0f361 commit 3fb4df1
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 13 deletions.
4 changes: 3 additions & 1 deletion .eslintrc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ parserOptions:
ecmaVersion: latest
sourceType: module
rules:
import/no-extraneous-dependencies: off
import/prefer-default-export: off
no-await-in-loop: off
no-console: off
no-restricted-syntax: off
no-use-before-define: [error, { functions: false }]
import/prefer-default-export: off
object-shorthand: off
8 changes: 5 additions & 3 deletions env-example
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
DT_LOG_EXPIRATION_DAYS=14
MAX_DAYS_TO_ROLLUP=1
BQ_DATASET=development
ATHENA_DB=
ATHENA_TABLE=
BQ_DATASET=
DT_LOG_EXPIRATION_DAYS=
MAX_DAYS_TO_ROLLUP=
26 changes: 17 additions & 9 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
const log = require("lambda-log");

const athena = require("./lib/athena");
const bigquery = require("./lib/bigquery");
const util = require("./lib/util");

/**
* Rollup a single day
*/
async function rollup(day) {
const dayStr = day.toISOString().split("T").shift();
log.info(`rolling up ${dayStr}`);
}

/**
* Entrypoint
*/
exports.handler = async (event = {}) => {
if (event.day) {
await rollup(event.day);
await rollup(util.parseDate(event.day));
} else {
const start = await bigquery.latestRollupDay();
const days = util.daysToProcess(start);
Expand All @@ -24,3 +18,17 @@ exports.handler = async (event = {}) => {
}
}
};

/**
 * Rollup a single day.
 *
 * Queries Athena for the day's usage rows, parses them, and logs how long
 * the query took along with the raw/parsed row counts and total bytes.
 *
 * @param {Date} day - the day to roll up
 * @returns {Promise<void>}
 */
async function rollup(day) {
  const [dayStr] = day.toISOString().split("T");
  log.info(`rolling up ${dayStr}`);

  const [elapsed, rows] = await util.elapsed(() => athena.queryUsage(day));
  const parsed = util.parseUsage(day, rows);

  // add up the bytes across every successfully parsed row
  let total = 0;
  for (const row of parsed) {
    total += row.bytes;
  }

  log.info(`athena ${dayStr}`, {
    elapsed,
    rows: rows.length,
    parsed: parsed.length,
    total,
  });
}
82 changes: 82 additions & 0 deletions lib/athena.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
const log = require("lambda-log");
const {
AthenaClient,
StartQueryExecutionCommand,
GetQueryExecutionCommand,
GetQueryResultsCommand,
} = require("@aws-sdk/client-athena");

/**
 * Helper to start a query.
 *
 * @param {object} client - anything with a `send(command)` method (an AthenaClient)
 * @param {string} QueryString - SQL text to execute
 * @param {string[]|null} [ExecutionParameters] - values for `?` placeholders, or null
 * @returns {Promise<string>} the QueryExecutionId from the response
 */
async function startQuery(client, QueryString, ExecutionParameters = null) {
  const params = {
    QueryString,
    ExecutionParameters,
    QueryExecutionContext: {
      Catalog: "AwsDataCatalog",
      Database: process.env.ATHENA_DB,
    },
    // NOTE(review): the Athena StartQueryExecution input appears to take the
    // output location as `ResultConfiguration.OutputLocation`, not as a
    // top-level key, which would explain why results never land here.
    // Left untouched so this change does not redirect where results are
    // written; confirm the intended bucket before moving it.
    OutputLocation: "s3://prx-ryan/why-does-it-not-put-results-here/",
  };

  // building a command is synchronous; only send() needs to be awaited
  const command = new StartQueryExecutionCommand(params);
  const response = await client.send(command);
  return response.QueryExecutionId;
}

/**
 * Helper to wait for a query.
 *
 * Polls the query execution once per second while its state is QUEUED or
 * RUNNING. There is no upper bound on the number of polls.
 *
 * @param {object} client - anything with a `send(command)` method (an AthenaClient)
 * @param {string} QueryExecutionId - id of the query execution to watch
 * @returns {Promise<boolean>} resolves to true once the state is SUCCEEDED
 * @throws {Error} "query failed" for any state other than
 *   SUCCEEDED/QUEUED/RUNNING (the full response is logged first)
 */
async function waitQuery(client, QueryExecutionId) {
  // poll in a loop rather than recursing, so a long-running query does not
  // build up a chain of nested pending calls
  for (;;) {
    const command = new GetQueryExecutionCommand({ QueryExecutionId });
    const response = await client.send(command);
    const state = response.QueryExecution.Status.State;

    if (state === "SUCCEEDED") {
      return true;
    }
    if (state !== "QUEUED" && state !== "RUNNING") {
      log.error("athena query failed", { err: response });
      throw new Error("query failed");
    }

    // still in progress: pause before asking again
    await new Promise((resolve) => {
      setTimeout(resolve, 1000);
    });
  }
}

/**
 * Helper to load all pages of results.
 *
 * Requests pages of up to 1000 rows, following each response's NextToken
 * until none is returned, and gathers each row's `Data` into one array.
 *
 * @param {object} client - anything with a `send(command)` method (an AthenaClient)
 * @param {string} QueryExecutionId - id of the query execution
 * @param {string|null} [NextToken] - token to resume from; when falsy the
 *   first row of the first page is dropped as the header row
 * @returns {Promise<Array>} the `Data` of every non-header row, in order
 */
async function loadResults(client, QueryExecutionId, NextToken = null) {
  const collected = [];
  let token = NextToken;

  do {
    const request = new GetQueryResultsCommand({
      QueryExecutionId,
      NextToken: token,
      MaxResults: 1000,
    });
    const page = await client.send(request);
    const pageRows = page.ResultSet.ResultRows.map((row) => row.Data);

    // only a page fetched without a token starts with the header row
    if (!token) {
      pageRows.shift();
    }

    collected.push(...pageRows);
    token = page.NextToken;
  } while (token);

  return collected;
}

/**
* Run a query and wait for the result
*/
exports.query = async (sql, params = null) => {
const athena = new AthenaClient();
const id = await startQuery(athena, sql, params);
await waitQuery(athena, id);
return loadResults(athena, id);
};

/**
* Query path/bytes usage information from Athena
*/
exports.queryUsage = async (day) => {
const dayStr = day.toISOString().split("T").shift();
const removeRegion = "REGEXP_REPLACE(uri, '\\A/(use1/|usw2/)?')";
const uri = `REGEXP_REPLACE(${removeRegion}, '/[^/]+/[^/]+\\z')`;
const tbl = process.env.ATHENA_TABLE;
const sql = `SELECT ${uri}, SUM(bytes) FROM ${tbl} WHERE date = DATE(?) GROUP BY ${uri}`;

// NOTE: for some reason, date params must be quoted
return exports.query(sql, [`'${dayStr}'`]);
};
7 changes: 7 additions & 0 deletions lib/athena.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Placeholder suite for lib/athena: it only asserts a constant, so nothing
// in the module is exercised yet.
describe("athena", () => {
describe("queryUsage", () => {
// TODO: replace with a real queryUsage test
test("todo", () => {
expect("todo").toEqual("todo");
});
});
});
45 changes: 45 additions & 0 deletions lib/util.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
const log = require("lambda-log");

// Uris that parseUsage skips without logging an "unrecognized uri" warning.
// The dots are escaped so only the literal file names match.
const IGNORE_PATHS = [/^$/, /^favicon\.ico$/, /^robots\.txt$/];

/**
* Parse string to date array
*/
Expand Down Expand Up @@ -31,3 +35,44 @@ exports.daysToProcess = (startDate) => {
}
return days;
};

/**
* Time how long a function takes to execute
*/
exports.elapsed = async (fn) => {
const start = Date.now();
const result = await fn();
return [Date.now() - start, result];
};

/**
 * Extract metadata from cdn paths. Counted paths must be one of:
 *   <podcast_id>/<episode_id>
 *   <podcast_id>/<feed_slug>/<episode_id>
 *
 * Any other uri is dropped, with a warning unless it matches IGNORE_PATHS.
 *
 * @param {Date} day - copied as-is onto every parsed row
 * @param {Array} rows - `[uri, bytes]` pairs; bytes is parsed as a base-10 integer
 * @returns {Array<object>} one `{ podcast_id, [feed_slug], episode_id, bytes, day }`
 *   object per counted path, in input order
 */
exports.parseUsage = (day, rows) =>
  rows.flatMap(([uri, byteStr]) => {
    const bytes = parseInt(byteStr, 10);

    // a missing/non-string uri would throw on split() and abort the whole
    // day, so report it like any other unrecognized row and move on
    if (typeof uri !== "string") {
      log.warn(`unrecognized uri: '${uri}' (${bytes} bytes)`);
      return [];
    }

    const parts = uri.split("/");

    // the first segment must be an all-digit podcast id
    if (/^[0-9]+$/.test(parts[0])) {
      /* eslint-disable camelcase */
      const podcast_id = parseInt(parts[0], 10);
      if (parts.length === 2) {
        const episode_id = parts[1];
        return [{ podcast_id, episode_id, bytes, day }];
      }
      if (parts.length === 3) {
        const feed_slug = parts[1];
        const episode_id = parts[2];
        return [{ podcast_id, feed_slug, episode_id, bytes, day }];
      }
      /* eslint-enable camelcase */
    }

    if (!IGNORE_PATHS.some((r) => r.test(uri))) {
      log.warn(`unrecognized uri: '${uri}' (${bytes} bytes)`);
    }
    return [];
  });
57 changes: 57 additions & 0 deletions lib/util.test.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const log = require("lambda-log");
const util = require("./util");

describe("util", () => {
Expand Down Expand Up @@ -45,4 +46,60 @@ describe("util", () => {
]);
});
});

// util.elapsed should hand back both a duration and the callback's result
describe("elapsed", () => {
  test("times functions", async () => {
    const timed = await util.elapsed(() => "foo");
    const [millis, value] = timed;

    expect(millis).toBeGreaterThanOrEqual(0);
    expect(value).toEqual("foo");
  });
});

// util.parseUsage: counted path shapes are parsed, everything else is dropped
describe("parseUsage", () => {
  // put back anything replaced with jest.spyOn so a mocked log.warn cannot
  // leak into tests that run later in this file
  afterEach(() => {
    jest.restoreAllMocks();
  });

  test("parses dovetail cdn paths", () => {
    const day = new Date("2024-03-04");
    const rows = util.parseUsage(day, [
      ["1234/some-guid", "99"],
      ["1234/another-guid", "88"],
      ["5678/my-feed/the-guid", "77"],
    ]);

    expect(rows.length).toEqual(3);
    expect(rows[0]).toEqual({
      podcast_id: 1234,
      episode_id: "some-guid",
      bytes: 99,
      day,
    });
    expect(rows[1]).toEqual({
      podcast_id: 1234,
      episode_id: "another-guid",
      bytes: 88,
      day,
    });
    expect(rows[2]).toEqual({
      podcast_id: 5678,
      feed_slug: "my-feed",
      episode_id: "the-guid",
      bytes: 77,
      day,
    });
  });

  test("warns for unrecognized paths", () => {
    // silence the warnings while still recording them
    jest.spyOn(log, "warn").mockImplementation(() => null);

    const day = new Date("2024-03-04");
    const rows = util.parseUsage(day, [
      ["whatev", "99"],
      ["robots.txt", "88"],
      ["string/string", "77"],
    ]);

    // robots.txt is on the ignore list, so only the other two rows warn
    expect(rows.length).toEqual(0);
    expect(log.warn).toHaveBeenCalledTimes(2);
    expect(log.warn.mock.calls[0][0]).toMatch(/unrecognized uri/);
    expect(log.warn.mock.calls[1][0]).toMatch(/unrecognized uri/);
  });
});
});

0 comments on commit 3fb4df1

Please sign in to comment.