diff --git a/.eslintrc.yml b/.eslintrc.yml index 28f6059..f025cc3 100644 --- a/.eslintrc.yml +++ b/.eslintrc.yml @@ -7,8 +7,10 @@ parserOptions: ecmaVersion: latest sourceType: module rules: + import/no-extraneous-dependencies: off + import/prefer-default-export: off no-await-in-loop: off no-console: off no-restricted-syntax: off no-use-before-define: [error, { functions: false }] - import/prefer-default-export: off + object-shorthand: off diff --git a/env-example b/env-example index 9db16b4..7b3e4a3 100644 --- a/env-example +++ b/env-example @@ -1,3 +1,5 @@ -DT_LOG_EXPIRATION_DAYS=14 -MAX_DAYS_TO_ROLLUP=1 -BQ_DATASET=development +ATHENA_DB= +ATHENA_TABLE= +BQ_DATASET= +DT_LOG_EXPIRATION_DAYS= +MAX_DAYS_TO_ROLLUP= diff --git a/index.js b/index.js index 1db3232..0b7e752 100644 --- a/index.js +++ b/index.js @@ -1,21 +1,15 @@ const log = require("lambda-log"); + +const athena = require("./lib/athena"); const bigquery = require("./lib/bigquery"); const util = require("./lib/util"); -/** - * Rollup a single day - */ -async function rollup(day) { - const dayStr = day.toISOString().split("T").shift(); - log.info(`rolling up ${dayStr}`); -} - /** * Entrypoint */ exports.handler = async (event = {}) => { if (event.day) { - await rollup(event.day); + await rollup(util.parseDate(event.day)); } else { const start = await bigquery.latestRollupDay(); const days = util.daysToProcess(start); @@ -24,3 +18,17 @@ exports.handler = async (event = {}) => { } } }; + +/** + * Rollup a single day + */ +async function rollup(day) { + const dayStr = day.toISOString().split("T").shift(); + log.info(`rolling up ${dayStr}`); + + const [elapsed, rows] = await util.elapsed(() => athena.queryUsage(day)); + const parsed = util.parseUsage(day, rows); + const total = parsed.reduce((sum, row) => sum + row.bytes, 0); + const info = { rows: rows.length, parsed: parsed.length, total: total }; + log.info(`athena ${dayStr}`, { elapsed, ...info }); +} diff --git a/lib/athena.js b/lib/athena.js new file mode 100644 index 0000000..fe59299 --- /dev/null +++ b/lib/athena.js @@ -0,0 +1,82 @@ +const log = require("lambda-log"); +const { + AthenaClient, + StartQueryExecutionCommand, + GetQueryExecutionCommand, + GetQueryResultsCommand, +} = require("@aws-sdk/client-athena"); + +// helper to start a query +async function startQuery(client, QueryString, ExecutionParameters = null) { + const params = { + QueryString, + ExecutionParameters, + QueryExecutionContext: { + Catalog: "AwsDataCatalog", + Database: process.env.ATHENA_DB, + }, + OutputLocation: "s3://prx-ryan/why-does-it-not-put-results-here/", + }; + const command = await new StartQueryExecutionCommand(params); + const response = await client.send(command); + return response.QueryExecutionId; +} + +// helper to wait for query +async function waitQuery(client, QueryExecutionId) { + const command = await new GetQueryExecutionCommand({ QueryExecutionId }); + const response = await client.send(command); + const state = response.QueryExecution.Status.State; + if (state === "SUCCEEDED") { + return true; + } + if (state === "QUEUED" || state === "RUNNING") { + await new Promise((r) => { + setTimeout(r, 1000); + }); + return waitQuery(client, QueryExecutionId); + } + log.error("athena query failed", { err: response }); + throw new Error("query failed"); +} + +// helper to load all pages of results +async function loadResults(client, QueryExecutionId, NextToken = null) { + const params = { QueryExecutionId, NextToken, MaxResults: 1000 }; + const command = new GetQueryResultsCommand(params); + const response = await client.send(command); + const rows = response.ResultSet.ResultRows.map((r) => r.Data); + if (!NextToken) { + rows.shift(); // first row contains headers + } + if (response.NextToken) { + return rows.concat( + await loadResults(client, QueryExecutionId, response.NextToken), + ); + } + return rows; +} + +/** + * Run a query and wait for the result + */ +exports.query = async (sql, params = null) => { + const athena = new AthenaClient(); + const id = await startQuery(athena, sql, params); + await waitQuery(athena, id); + return loadResults(athena, id); +}; + +/** + * Query path/bytes usage information from Athena + */ +exports.queryUsage = async (day) => { + const dayStr = day.toISOString().split("T").shift(); + const removeRegion = "REGEXP_REPLACE(uri, '\\A/(use1/|usw2/)?')"; + const uri = `REGEXP_REPLACE(${removeRegion}, '/[^/]+/[^/]+\\z')`; + const tbl = process.env.ATHENA_TABLE; + const sql = `SELECT ${uri}, SUM(bytes) FROM ${tbl} WHERE date = DATE(?) GROUP BY ${uri}`; + + // NOTE: for some reason, date params must be quoted + return exports.query(sql, [`'${dayStr}'`]); +}; diff --git a/lib/athena.test.js b/lib/athena.test.js new file mode 100644 index 0000000..9f53367 --- /dev/null +++ b/lib/athena.test.js @@ -0,0 +1,7 @@ +describe("athena", () => { + describe("queryUsage", () => { + test("todo", () => { + expect("todo").toEqual("todo"); + }); + }); +}); diff --git a/lib/util.js b/lib/util.js index 065fca0..7e7769b 100644 --- a/lib/util.js +++ b/lib/util.js @@ -1,3 +1,7 @@ +const log = require("lambda-log"); + +const IGNORE_PATHS = [/^$/, /^favicon\.ico$/, /^robots.txt$/]; + /** * Parse string to date array */ @@ -31,3 +35,44 @@ exports.daysToProcess = (startDate) => { } return days; }; + +/** + * Time how long a function takes to execute + */ +exports.elapsed = async (fn) => { + const start = Date.now(); + const result = await fn(); + return [Date.now() - start, result]; +}; + +/** + * Extract metadata from cdn paths. Counted paths must be one of: + * / + * // + */ +exports.parseUsage = (day, rows) => + rows + .map(([uri, byteStr]) => { + const parts = uri.split("/"); + const bytes = parseInt(byteStr, 10); + + if (parts[0].match(/^[0-9]+$/)) { + /* eslint-disable camelcase */ + const podcast_id = parseInt(parts[0], 10); + const feed_slug = parts[1]; + const episode_id = parts[parts.length - 1]; + if (parts.length === 2) { + return { podcast_id, episode_id, bytes, day }; + } + if (parts.length === 3) { + return { podcast_id, feed_slug, episode_id, bytes, day }; + } + /* eslint-enable camelcase */ + } + + if (!IGNORE_PATHS.some((r) => r.test(uri))) { + log.warn(`unrecognized uri: '${uri}' (${bytes} bytes)`); + } + return null; + }) + .filter((r) => r); diff --git a/lib/util.test.js b/lib/util.test.js index 8246354..0062dd4 100644 --- a/lib/util.test.js +++ b/lib/util.test.js @@ -1,3 +1,4 @@ +const log = require("lambda-log"); const util = require("./util"); describe("util", () => { @@ -45,4 +46,60 @@ describe("util", () => { ]); }); }); + + describe("elapsed", () => { + test("times functions", async () => { + const [elapsed, result] = await util.elapsed(() => "foo"); + expect(elapsed).toBeGreaterThanOrEqual(0); + expect(result).toEqual("foo"); + }); + }); + + describe("parseUsage", () => { + test("parses dovetail cdn paths", () => { + const day = new Date("2024-03-04"); + const rows = util.parseUsage(day, [ + ["1234/some-guid", "99"], + ["1234/another-guid", "88"], + ["5678/my-feed/the-guid", "77"], + ]); + + expect(rows.length).toEqual(3); + expect(rows[0]).toEqual({ + podcast_id: 1234, + episode_id: "some-guid", + bytes: 99, + day, + }); + expect(rows[1]).toEqual({ + podcast_id: 1234, + episode_id: "another-guid", + bytes: 88, + day, + }); + expect(rows[2]).toEqual({ + podcast_id: 5678, + feed_slug: "my-feed", + episode_id: "the-guid", + bytes: 77, + day, + }); + }); + + test("warns for unrecognized paths", () => { + jest.spyOn(log, "warn").mockImplementation(() => null); + + const day = new Date("2024-03-04"); + const rows = util.parseUsage(day, [ + ["whatev", "99"], + ["robots.txt", "88"], + ["string/string", "77"], + ]); + + expect(rows.length).toEqual(0); + expect(log.warn).toHaveBeenCalledTimes(2); + expect(log.warn.mock.calls[0][0]).toMatch(/unrecognized uri/); + expect(log.warn.mock.calls[1][0]).toMatch(/unrecognized uri/); + }); + }); });