Skip to content

Commit

Permalink
Add support for Citus
Browse files Browse the repository at this point in the history
We make all queries using pg_stat_activity view templated so that they
can use citus_stat_activity view when the --citus option is passed.
  • Loading branch information
dlax committed Aug 27, 2024
1 parent f728f5b commit c25799c
Show file tree
Hide file tree
Showing 26 changed files with 132 additions and 100 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
configuration. (Requires Python 3.9 or higher.)
* Add a `y` command to copy focused query to the system clipboard, using
OSC 52 escape sequence (#311).
* Support Citus query activity (`citus_stat_activity`) views, through a new
`--citus` command-line option.

### Fixed

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ ex:
--rds Enable support for AWS RDS (implies --no-tempfiles and
filters out the rdsadmin database from space
calculation).
--citus Enable support for Citus.
--output FILEPATH Store running queries as CSV.
--db-size, --no-db-size
Enable/disable total size of DB.
Expand Down
5 changes: 5 additions & 0 deletions docs/man/pg_activity.1
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,11 @@ required by another session. It shows following information:
.Vb 1
\& Enable support for AWS RDS (implies \-\-no\-tempfiles and filters out the rdsadmin database from space calculation).
.Ve
.IP "\fB\-\-citus\fR" 2
.IX Item "--citus"
.Vb 1
\& Enable support for Citus.
.Ve
.IP "\fB\-\-output=FILEPATH\fR" 2
.IX Item "--output=FILEPATH"
.Vb 1
Expand Down
4 changes: 4 additions & 0 deletions docs/man/pg_activity.pod
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ required by another session. It shows following information:

Enable support for AWS RDS (implies --no-tempfiles and filters out the rdsadmin database from space calculation).

=item B<--citus>

Enable support for Citus.

=item B<--output=FILEPATH>

Store running queries as CSV.
Expand Down
7 changes: 7 additions & 0 deletions pgactivity/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ def get_parser() -> argparse.ArgumentParser:
help="Enable support for AWS RDS (implies --no-tempfiles and filters out the rdsadmin database from space calculation).",
default=False,
)
group.add_argument(
"--citus",
dest="citus",
action="store_true",
help="Enable support for Citus.",
default=False,
)
group.add_argument(
"--output",
dest="output",
Expand Down
12 changes: 11 additions & 1 deletion pgactivity/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class Data:
filters: Filters
dsn_parameters: dict[str, str]
failed_queries: FailedQueriesInfo
pg_stat_activity: str

@classmethod
def pg_connect(
Expand All @@ -85,6 +86,7 @@ def pg_connect(
password: str | None = None,
database: str = "postgres",
rds_mode: bool = False,
citus: bool = False,
dsn: str = "",
hide_queries_in_logs: bool = False,
filters: Filters = NO_FILTER,
Expand Down Expand Up @@ -115,6 +117,7 @@ def pg_connect(
failed_queries=FailedQueriesInfo(),
filters=filters,
dsn_parameters=pg.connection_parameters(pg_conn),
pg_stat_activity="citus_stat_activity" if citus else "pg_stat_activity",
)

def try_reconnect(self) -> Data | None:
Expand Down Expand Up @@ -320,7 +323,10 @@ def pg_get_server_information(
else:
query = queries.get("get_server_info_oldest")

qs = sql.SQL(query).format(dbname_filter=self.dbname_filter)
qs = sql.SQL(query).format(
pg_stat_activity=sql.Identifier(self.pg_stat_activity),
dbname_filter=self.dbname_filter,
)
try:
ret = pg.fetchone(
self.pg_conn,
Expand Down Expand Up @@ -410,6 +416,7 @@ def pg_get_activities(self, duration_mode: int = 1) -> list[RunningProcess]:

duration_column = self.get_duration_column(duration_mode)
query = sql.SQL(qs).format(
pg_stat_activity=sql.Identifier(self.pg_stat_activity),
dbname_filter=self.dbname_filter,
duration_column=sql.Identifier(duration_column),
min_duration=sql.Literal(self.min_duration),
Expand Down Expand Up @@ -437,6 +444,7 @@ def pg_get_waiting(self, duration_mode: int = 1) -> list[WaitingProcess]:

duration_column = self.get_duration_column(duration_mode)
query = sql.SQL(qs).format(
pg_stat_activity=sql.Identifier(self.pg_stat_activity),
dbname_filter=self.dbname_filter,
duration_column=sql.Identifier(duration_column),
min_duration=sql.Literal(self.min_duration),
Expand Down Expand Up @@ -466,6 +474,7 @@ def pg_get_blocking(self, duration_mode: int = 1) -> list[BlockingProcess]:

duration_column = self.get_duration_column(duration_mode)
query = sql.SQL(qs).format(
pg_stat_activity=sql.Identifier(self.pg_stat_activity),
dbname_filter=self.dbname_filter,
duration_column=sql.Identifier(duration_column),
min_duration=sql.Literal(self.min_duration),
Expand Down Expand Up @@ -527,6 +536,7 @@ def pg_connect(
password=password,
database=options.dbname,
rds_mode=options.rds,
citus=options.citus,
min_duration=min_duration,
filters=filters,
hide_queries_in_logs=options.hide_queries_in_logs,
Expand Down
48 changes: 24 additions & 24 deletions pgactivity/queries/get_blocking_oldest.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ SELECT
SELECT
blocking.pid,
'<unknown>' AS application_name,
pg_stat_activity.current_query AS query,
{pg_stat_activity}.current_query AS query,
blocking.mode,
pg_stat_activity.datname,
pg_stat_activity.datid,
pg_stat_activity.usename,
pg_stat_activity.client_addr AS client,
{pg_stat_activity}.datname,
{pg_stat_activity}.datid,
{pg_stat_activity}.usename,
{pg_stat_activity}.client_addr AS client,
blocking.locktype,
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
NULL AS state,
blocking.relation::regclass AS relation,
pg_stat_activity.waiting
{pg_stat_activity}.waiting
FROM
pg_locks AS blocking
JOIN pg_locks AS blocked ON (blocking.transactionid = blocked.transactionid AND blocking.locktype = blocked.locktype)
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.procpid)
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.procpid)
WHERE
blocking.granted
AND NOT blocked.granted
Expand All @@ -57,21 +57,21 @@ SELECT
SELECT
blocking.pid,
'<unknown>' AS application_name,
pg_stat_activity.current_query AS query,
{pg_stat_activity}.current_query AS query,
blocking.mode,
pg_stat_activity.datname,
pg_stat_activity.datid,
pg_stat_activity.usename,
pg_stat_activity.client_addr AS client,
{pg_stat_activity}.datname,
{pg_stat_activity}.datid,
{pg_stat_activity}.usename,
{pg_stat_activity}.client_addr AS client,
blocking.locktype,
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
NULL AS state,
blocking.relation::regclass AS relation,
pg_stat_activity.waiting
{pg_stat_activity}.waiting
FROM
pg_locks AS blocking
JOIN pg_locks AS blocked ON (blocking.virtualxid = blocked.virtualxid AND blocking.locktype = blocked.locktype)
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.procpid)
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.procpid)
WHERE
blocking.granted
AND NOT blocked.granted
Expand All @@ -87,21 +87,21 @@ SELECT
SELECT
blocking.pid,
'<unknown>' AS application_name,
pg_stat_activity.current_query AS query,
{pg_stat_activity}.current_query AS query,
blocking.mode,
pg_stat_activity.datname,
pg_stat_activity.datid,
pg_stat_activity.usename,
pg_stat_activity.client_addr AS client,
{pg_stat_activity}.datname,
{pg_stat_activity}.datid,
{pg_stat_activity}.usename,
{pg_stat_activity}.client_addr AS client,
blocking.locktype,
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
NULL AS state,
blocking.relation::regclass AS relation,
pg_stat_activity.waiting
{pg_stat_activity}.waiting
FROM
pg_locks AS blocking
JOIN pg_locks AS blocked ON (blocking.database = blocked.database AND blocking.relation = blocked.relation AND blocking.locktype = blocked.locktype)
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.procpid)
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.procpid)
WHERE
blocking.granted
AND NOT blocked.granted
Expand Down
60 changes: 30 additions & 30 deletions pgactivity/queries/get_blocking_post_090200.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ SELECT
-- Transaction id lock
SELECT
blocking.pid,
pg_stat_activity.application_name,
pg_stat_activity.query,
{pg_stat_activity}.application_name,
{pg_stat_activity}.query,
blocking.mode,
pg_stat_activity.datname,
pg_stat_activity.datid,
pg_stat_activity.usename,
pg_stat_activity.client_addr AS client,
{pg_stat_activity}.datname,
{pg_stat_activity}.datid,
{pg_stat_activity}.usename,
{pg_stat_activity}.client_addr AS client,
blocking.locktype,
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
pg_stat_activity.state as state,
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
{pg_stat_activity}.state as state,
blocking.relation::regclass AS relation,
pg_stat_activity.waiting
{pg_stat_activity}.waiting
FROM
pg_locks AS blocking
JOIN pg_locks AS blocked ON (blocking.transactionid = blocked.transactionid AND blocking.locktype = blocked.locktype)
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.pid)
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.pid)
WHERE
blocking.granted
AND NOT blocked.granted
Expand All @@ -51,22 +51,22 @@ SELECT
-- VirtualXid Lock
SELECT
blocking.pid,
pg_stat_activity.application_name,
pg_stat_activity.query,
{pg_stat_activity}.application_name,
{pg_stat_activity}.query,
blocking.mode,
pg_stat_activity.datname,
pg_stat_activity.datid,
pg_stat_activity.usename,
pg_stat_activity.client_addr AS client,
{pg_stat_activity}.datname,
{pg_stat_activity}.datid,
{pg_stat_activity}.usename,
{pg_stat_activity}.client_addr AS client,
blocking.locktype,
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
pg_stat_activity.state as state,
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
{pg_stat_activity}.state as state,
blocking.relation::regclass AS relation,
pg_stat_activity.waiting
{pg_stat_activity}.waiting
FROM
pg_locks AS blocking
JOIN pg_locks AS blocked ON (blocking.virtualxid = blocked.virtualxid AND blocking.locktype = blocked.locktype)
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.pid)
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.pid)
WHERE
blocking.granted
AND NOT blocked.granted
Expand All @@ -81,22 +81,22 @@ SELECT
-- Relation or tuple Lock
SELECT
blocking.pid,
pg_stat_activity.application_name,
pg_stat_activity.query,
{pg_stat_activity}.application_name,
{pg_stat_activity}.query,
blocking.mode,
pg_stat_activity.datname,
pg_stat_activity.datid,
pg_stat_activity.usename,
pg_stat_activity.client_addr AS client,
{pg_stat_activity}.datname,
{pg_stat_activity}.datid,
{pg_stat_activity}.usename,
{pg_stat_activity}.client_addr AS client,
blocking.locktype,
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
pg_stat_activity.state as state,
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
{pg_stat_activity}.state as state,
blocking.relation::regclass AS relation,
pg_stat_activity.waiting
{pg_stat_activity}.waiting
FROM
pg_locks AS blocking
JOIN pg_locks AS blocked ON (blocking.database = blocked.database AND blocking.relation = blocked.relation AND blocking.locktype = blocked.locktype)
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.pid)
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.pid)
WHERE
blocking.granted
AND NOT blocked.granted
Expand Down
Loading

0 comments on commit c25799c

Please sign in to comment.