Skip to content

Commit

Permalink
Fix change detection condition on KeeperMap read-only step
Browse files Browse the repository at this point in the history
Unfortunately looking at `system.grants` to check whether specific grants have
been successfully revoked in ClickHouse is not perfect.

Using the system table or parsing the output or ``SHOW GRANTS FOR <user>`` makes
the step more brittle, and doesn't necessarily lead to better garantees. The fix
is to remove the watcher.

More details:

Let a user with a set of initial grants, including some ALTER statements. Doing
``REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON <user> FROM <table>`` on this
user creates rows in ``system.grants`` for each of the grants with ``is_partial_revoke=1``.

If the user doesn't have ``ALTER`` statements in their initial grants, then
after executing ``REVOKE`` the ``system.grants`` table simply has no
corresponding rows.
  • Loading branch information
aris-aiven committed Nov 6, 2024
1 parent 86b5089 commit 0a7b163
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 58 deletions.
26 changes: 0 additions & 26 deletions astacus/coordinator/plugins/clickhouse/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,39 +191,13 @@ async def revoke_write_on_table(self, table: Table, user_name: bytes) -> None:
f"REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON {table.escaped_sql_identifier} FROM {escaped_user_name}"
)
await asyncio.gather(*(client.execute(revoke_statement.encode()) for client in self.clients))
await self.wait_for_access_type_grant(user_name=user_name, table=table, expected_count=0)

async def grant_write_on_table(self, table: Table, user_name: bytes) -> None:
escaped_user_name = escape_sql_identifier(user_name)
grant_statement = (
f"GRANT INSERT, ALTER UPDATE, ALTER DELETE ON {table.escaped_sql_identifier} TO {escaped_user_name}"
)
await asyncio.gather(*(client.execute(grant_statement.encode()) for client in self.clients))
await self.wait_for_access_type_grant(user_name=user_name, table=table, expected_count=3)

async def wait_for_access_type_grant(self, *, table: Table, user_name: bytes, expected_count: int) -> None:
escaped_user_name = escape_sql_string(user_name)
escaped_database = escape_sql_string(table.database)
escaped_table = escape_sql_string(table.name)

async def check_function_count(client: ClickHouseClient) -> bool:
statement = (
f"SELECT count() FROM grants "
f"WHERE user_name={escaped_user_name} "
f"AND database={escaped_database} "
f"AND table={escaped_table} "
f"AND access_type IN ('INSERT', 'ALTER UPDATE', 'ALTER DELETE')"
)
num_grants_response = await client.execute(statement.encode())
num_grants = int(cast(str, num_grants_response[0][0]))
return num_grants == expected_count

await wait_for_condition_on_every_node(
clients=self.clients,
condition=check_function_count,
description="access grants changes to be enforced",
timeout_seconds=60,
)

async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
_, tables = context.get_result(RetrieveDatabasesAndTablesStep)
Expand Down
106 changes: 82 additions & 24 deletions tests/integration/coordinator/plugins/clickhouse/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""

from .conftest import ClickHouseCommand, create_clickhouse_cluster, get_clickhouse_client, MinioBucket
from .test_plugin import setup_cluster_users
from astacus.coordinator.cluster import Cluster
from astacus.coordinator.plugins.base import StepsContext
from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, ClickHouseClientQueryError, HttpClickHouseClient
Expand Down Expand Up @@ -106,7 +105,47 @@ async def test_retrieve_tables(ports: Ports, clickhouse_command: ClickHouseComma
class KeeperMapInfo(NamedTuple):
context: StepsContext
clickhouse_client: ClickHouseClient
user_client: ClickHouseClient
user_clients: Sequence[ClickHouseClient]


def get_grant_database_usage_queries(user_name: str, database_names: Sequence[str]) -> list[bytes]:
database_privileges = [
"ALTER UPDATE",
"ALTER DELETE",
"ALTER COLUMN",
"ALTER MODIFY COMMENT",
"ALTER INDEX",
"ALTER PROJECTION",
"ALTER CONSTRAINT",
"ALTER TTL",
"ALTER SETTINGS",
"ALTER VIEW",
"CREATE TABLE",
"DROP TABLE",
"INSERT",
"OPTIMIZE",
"SELECT",
"SHOW",
"SYSTEM SYNC REPLICA",
"TRUNCATE",
]
database_privileges_str = ", ".join(database_privileges)
return [
f"GRANT {database_privileges_str} ON {database_name}.* TO {user_name} WITH GRANT OPTION".encode()
for database_name in database_names
]


async def create_user_with_privileges(admin_client: ClickHouseClient, *, username: str, password: str) -> None:
await admin_client.execute(f"CREATE USER {username} IDENTIFIED WITH sha256_password BY '{password}'".encode())
await admin_client.execute(f"GRANT ACCESS MANAGEMENT ON *.* TO {username} WITH GRANT OPTION".encode())
response_rows = cast(
Sequence[tuple[str]],
await admin_client.execute(b"SELECT name FROM system.databases WHERE engine == 'Replicated' ORDER BY name"),
)
database_names = [row[0] for row in response_rows]
for query in get_grant_database_usage_queries(username, database_names):
await admin_client.execute(query)


@pytest.fixture(name="keeper_table_context")
Expand All @@ -119,34 +158,33 @@ async def fixture_keeper_table_context(
):
clickhouse = clickhouse_cluster.services[0]
admin_client = get_clickhouse_client(clickhouse)
await setup_cluster_users([admin_client])
for statement in [
b"CREATE DATABASE `keeperdata` ENGINE = Replicated('/clickhouse/databases/keeperdata', '{my_shard}', '{my_replica}')",
b"CREATE TABLE `keeperdata`.`keepertable` (thekey UInt32, thevalue UInt32) ENGINE = KeeperMap('test', 1000) PRIMARY KEY thekey",
b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(1) FROM numbers(3)",
b"CREATE USER bob IDENTIFIED WITH sha256_password BY 'secret'",
b"GRANT INSERT, SELECT, UPDATE, DELETE ON `keeperdata`.`keepertable` TO `bob`",
]:
await admin_client.execute(statement)
user_client = HttpClickHouseClient(
await admin_client.execute(
b"CREATE DATABASE `keeperdata` ENGINE = Replicated('/clickhouse/databases/keeperdata', '{my_shard}', '{my_replica}')"
)
await create_user_with_privileges(admin_client, username="foobar", password="secret")
foobar_client = HttpClickHouseClient(
host=clickhouse.host,
port=clickhouse.port,
username="bob",
username="foobar",
password="secret",
timeout=10,
)
for statement in [
b"CREATE TABLE `keeperdata`.`keepertable` (thekey UInt32, thevalue UInt32) ENGINE = KeeperMap('test', 1000) PRIMARY KEY thekey",
b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(1) FROM numbers(3)",
b"CREATE USER alice",
b"GRANT SELECT, INSERT, UPDATE, DELETE on `keeperdata`.* TO alice",
]:
await foobar_client.execute(statement)
alice_client = HttpClickHouseClient(host=clickhouse.host, port=clickhouse.port, username="alice", timeout=10)
step = RetrieveDatabasesAndTablesStep(clients=[admin_client])
context = StepsContext()
databases_tables_result = await step.run_step(Cluster(nodes=[]), context=context)
context.set_result(RetrieveDatabasesAndTablesStep, databases_tables_result)
yield KeeperMapInfo(context, admin_client, user_client)
yield KeeperMapInfo(context, admin_client, [foobar_client, alice_client])


async def test_keeper_map_table_select_only_setting_modified(keeper_table_context: KeeperMapInfo) -> None:
steps_context, admin_client, user_client = keeper_table_context
read_only_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=False)
# After the read-only step, the user should only be able to select from the table
await read_only_step.run_step(Cluster(nodes=[]), steps_context)
async def check_read_only(user_client: ClickHouseClient) -> None:
with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
await user_client.execute(
b"INSERT INTO `keeperdata`.`keepertable` SETTINGS wait_for_async_insert=1 SELECT *, materialize(2) FROM numbers(3)"
Expand All @@ -159,14 +197,14 @@ async def test_keeper_map_table_select_only_setting_modified(keeper_table_contex
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(read_only_row_count[0][0]) == 3
# After the read-write step, the user should be able to write, update and delete from the table
read_write_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=True)
await read_write_step.run_step(Cluster(nodes=[]), steps_context)
await user_client.execute(b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(3) FROM numbers(3, 3)")


async def check_read_write(user_client: ClickHouseClient) -> None:
await user_client.execute(b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(10) FROM numbers(3)")
read_write_row_count = cast(
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(read_write_row_count[0][0]) == 6
assert int(read_write_row_count[0][0]) == 3
await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20")
current_values = await user_client.execute(b"SELECT thevalue FROM `keeperdata`.`keepertable` ORDER BY thekey")
assert all(int(cast(str, val[0])) == 3 for val in current_values)
Expand All @@ -175,3 +213,23 @@ async def test_keeper_map_table_select_only_setting_modified(keeper_table_contex
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(post_delete_row_count[0][0]) == 0


async def test_keeper_map_table_read_only_step(keeper_table_context: KeeperMapInfo) -> None:
steps_context, admin_client, user_clients = keeper_table_context
read_only_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=False)
# After the read-only step, users should only be able to select from the table
await read_only_step.run_step(Cluster(nodes=[]), steps_context)
for user_client in user_clients:
await check_read_only(user_client)
# After the read-write step, users should be able to write, update and delete from the table
read_write_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=True)
await read_write_step.run_step(Cluster(nodes=[]), steps_context)
# Clean up table so that each user starts from a clean slate
await admin_client.execute(b"TRUNCATE TABLE `keeperdata`.`keepertable`")
post_delete_row_count = cast(
Sequence[tuple[int]], await admin_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(post_delete_row_count[0][0]) == 0
for user_client in user_clients:
await check_read_write(user_client)
8 changes: 0 additions & 8 deletions tests/unit/coordinator/plugins/clickhouse/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -1299,15 +1299,13 @@ async def test_restore_keeper_map_table_data_step() -> None:
[
b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name",
b"REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON `db-two`.`table-keeper` FROM `alice`",
b"SELECT count() FROM grants WHERE user_name='alice' AND database='db-two' AND table='table-keeper' AND access_type IN ('INSERT', 'ALTER UPDATE', 'ALTER DELETE')",
],
),
(
True,
[
b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name",
b"GRANT INSERT, ALTER UPDATE, ALTER DELETE ON `db-two`.`table-keeper` TO `alice`",
b"SELECT count() FROM grants WHERE user_name='alice' AND database='db-two' AND table='table-keeper' AND access_type IN ('INSERT', 'ALTER UPDATE', 'ALTER DELETE')",
],
),
],
Expand All @@ -1319,12 +1317,6 @@ async def test_keeper_map_table_select_only_setting_modified(allow_writes: bool,
def execute_side_effect(statement: bytes) -> list[Row]:
if statement == b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name":
return [[base64.b64encode(b"alice").decode()]]
elif (
statement
== b"SELECT count() FROM grants WHERE user_name='alice' AND database='db-two' AND table='table-keeper' AND access_type IN ('INSERT', 'ALTER UPDATE', 'ALTER DELETE')"
):
num_expected_grants = 3 if allow_writes else 0
return [[num_expected_grants]]
return []

clickhouse_client.execute.side_effect = execute_side_effect
Expand Down

0 comments on commit 0a7b163

Please sign in to comment.