Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix change detection condition on KeeperMap read-only step #256

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 0 additions & 26 deletions astacus/coordinator/plugins/clickhouse/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,39 +191,13 @@ async def revoke_write_on_table(self, table: Table, user_name: bytes) -> None:
f"REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON {table.escaped_sql_identifier} FROM {escaped_user_name}"
)
await asyncio.gather(*(client.execute(revoke_statement.encode()) for client in self.clients))
await self.wait_for_access_type_grant(user_name=user_name, table=table, expected_count=0)

async def grant_write_on_table(self, table: Table, user_name: bytes) -> None:
escaped_user_name = escape_sql_identifier(user_name)
grant_statement = (
f"GRANT INSERT, ALTER UPDATE, ALTER DELETE ON {table.escaped_sql_identifier} TO {escaped_user_name}"
)
await asyncio.gather(*(client.execute(grant_statement.encode()) for client in self.clients))
await self.wait_for_access_type_grant(user_name=user_name, table=table, expected_count=3)

async def wait_for_access_type_grant(self, *, table: Table, user_name: bytes, expected_count: int) -> None:
escaped_user_name = escape_sql_string(user_name)
escaped_database = escape_sql_string(table.database)
escaped_table = escape_sql_string(table.name)

async def check_function_count(client: ClickHouseClient) -> bool:
statement = (
f"SELECT count() FROM grants "
f"WHERE user_name={escaped_user_name} "
f"AND database={escaped_database} "
f"AND table={escaped_table} "
f"AND access_type IN ('INSERT', 'ALTER UPDATE', 'ALTER DELETE')"
)
num_grants_response = await client.execute(statement.encode())
num_grants = int(cast(str, num_grants_response[0][0]))
return num_grants == expected_count

await wait_for_condition_on_every_node(
clients=self.clients,
condition=check_function_count,
description="access grants changes to be enforced",
timeout_seconds=60,
)

async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
_, tables = context.get_result(RetrieveDatabasesAndTablesStep)
Expand Down
106 changes: 82 additions & 24 deletions tests/integration/coordinator/plugins/clickhouse/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""

from .conftest import ClickHouseCommand, create_clickhouse_cluster, get_clickhouse_client, MinioBucket
from .test_plugin import setup_cluster_users
from astacus.coordinator.cluster import Cluster
from astacus.coordinator.plugins.base import StepsContext
from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, ClickHouseClientQueryError, HttpClickHouseClient
Expand Down Expand Up @@ -106,7 +105,47 @@ async def test_retrieve_tables(ports: Ports, clickhouse_command: ClickHouseComma
class KeeperMapInfo(NamedTuple):
context: StepsContext
clickhouse_client: ClickHouseClient
user_client: ClickHouseClient
user_clients: Sequence[ClickHouseClient]


def get_grant_database_usage_queries(user_name: str, database_names: Sequence[str]) -> list[bytes]:
database_privileges = [
"ALTER UPDATE",
"ALTER DELETE",
"ALTER COLUMN",
"ALTER MODIFY COMMENT",
"ALTER INDEX",
"ALTER PROJECTION",
"ALTER CONSTRAINT",
"ALTER TTL",
"ALTER SETTINGS",
"ALTER VIEW",
"CREATE TABLE",
"DROP TABLE",
"INSERT",
"OPTIMIZE",
"SELECT",
"SHOW",
"SYSTEM SYNC REPLICA",
"TRUNCATE",
]
database_privileges_str = ", ".join(database_privileges)
return [
f"GRANT {database_privileges_str} ON {database_name}.* TO {user_name} WITH GRANT OPTION".encode()
for database_name in database_names
]


async def create_user_with_privileges(admin_client: ClickHouseClient, *, username: str, password: str) -> None:
await admin_client.execute(f"CREATE USER {username} IDENTIFIED WITH sha256_password BY '{password}'".encode())
await admin_client.execute(f"GRANT ACCESS MANAGEMENT ON *.* TO {username} WITH GRANT OPTION".encode())
response_rows = cast(
Sequence[tuple[str]],
await admin_client.execute(b"SELECT name FROM system.databases WHERE engine == 'Replicated' ORDER BY name"),
)
database_names = [row[0] for row in response_rows]
for query in get_grant_database_usage_queries(username, database_names):
await admin_client.execute(query)


@pytest.fixture(name="keeper_table_context")
Expand All @@ -119,34 +158,33 @@ async def fixture_keeper_table_context(
):
clickhouse = clickhouse_cluster.services[0]
admin_client = get_clickhouse_client(clickhouse)
await setup_cluster_users([admin_client])
for statement in [
b"CREATE DATABASE `keeperdata` ENGINE = Replicated('/clickhouse/databases/keeperdata', '{my_shard}', '{my_replica}')",
b"CREATE TABLE `keeperdata`.`keepertable` (thekey UInt32, thevalue UInt32) ENGINE = KeeperMap('test', 1000) PRIMARY KEY thekey",
b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(1) FROM numbers(3)",
b"CREATE USER bob IDENTIFIED WITH sha256_password BY 'secret'",
b"GRANT INSERT, SELECT, UPDATE, DELETE ON `keeperdata`.`keepertable` TO `bob`",
]:
await admin_client.execute(statement)
user_client = HttpClickHouseClient(
await admin_client.execute(
b"CREATE DATABASE `keeperdata` ENGINE = Replicated('/clickhouse/databases/keeperdata', '{my_shard}', '{my_replica}')"
)
await create_user_with_privileges(admin_client, username="foobar", password="secret")
foobar_client = HttpClickHouseClient(
host=clickhouse.host,
port=clickhouse.port,
username="bob",
username="foobar",
password="secret",
timeout=10,
)
for statement in [
b"CREATE TABLE `keeperdata`.`keepertable` (thekey UInt32, thevalue UInt32) ENGINE = KeeperMap('test', 1000) PRIMARY KEY thekey",
b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(1) FROM numbers(3)",
b"CREATE USER alice",
b"GRANT SELECT, INSERT, UPDATE, DELETE on `keeperdata`.* TO alice",
]:
await foobar_client.execute(statement)
alice_client = HttpClickHouseClient(host=clickhouse.host, port=clickhouse.port, username="alice", timeout=10)
step = RetrieveDatabasesAndTablesStep(clients=[admin_client])
context = StepsContext()
databases_tables_result = await step.run_step(Cluster(nodes=[]), context=context)
context.set_result(RetrieveDatabasesAndTablesStep, databases_tables_result)
yield KeeperMapInfo(context, admin_client, user_client)
yield KeeperMapInfo(context, admin_client, [foobar_client, alice_client])


async def test_keeper_map_table_select_only_setting_modified(keeper_table_context: KeeperMapInfo) -> None:
steps_context, admin_client, user_client = keeper_table_context
read_only_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=False)
# After the read-only step, the user should only be able to select from the table
await read_only_step.run_step(Cluster(nodes=[]), steps_context)
async def check_read_only(user_client: ClickHouseClient) -> None:
with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
await user_client.execute(
b"INSERT INTO `keeperdata`.`keepertable` SETTINGS wait_for_async_insert=1 SELECT *, materialize(2) FROM numbers(3)"
Expand All @@ -159,14 +197,14 @@ async def test_keeper_map_table_select_only_setting_modified(keeper_table_contex
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(read_only_row_count[0][0]) == 3
# After the read-write step, the user should be able to write, update and delete from the table
read_write_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=True)
await read_write_step.run_step(Cluster(nodes=[]), steps_context)
await user_client.execute(b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(3) FROM numbers(3, 3)")


async def check_read_write(user_client: ClickHouseClient) -> None:
await user_client.execute(b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(10) FROM numbers(3)")
read_write_row_count = cast(
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(read_write_row_count[0][0]) == 6
assert int(read_write_row_count[0][0]) == 3
await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20")
current_values = await user_client.execute(b"SELECT thevalue FROM `keeperdata`.`keepertable` ORDER BY thekey")
assert all(int(cast(str, val[0])) == 3 for val in current_values)
Expand All @@ -175,3 +213,23 @@ async def test_keeper_map_table_select_only_setting_modified(keeper_table_contex
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(post_delete_row_count[0][0]) == 0


async def test_keeper_map_table_read_only_step(keeper_table_context: KeeperMapInfo) -> None:
steps_context, admin_client, user_clients = keeper_table_context
read_only_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=False)
# After the read-only step, users should only be able to select from the table
await read_only_step.run_step(Cluster(nodes=[]), steps_context)
for user_client in user_clients:
await check_read_only(user_client)
# After the read-write step, users should be able to write, update and delete from the table
read_write_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=True)
await read_write_step.run_step(Cluster(nodes=[]), steps_context)
# Clean up table so that each user starts from a clean slate
await admin_client.execute(b"TRUNCATE TABLE `keeperdata`.`keepertable`")
post_delete_row_count = cast(
Sequence[tuple[int]], await admin_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(post_delete_row_count[0][0]) == 0
for user_client in user_clients:
await check_read_write(user_client)
8 changes: 0 additions & 8 deletions tests/unit/coordinator/plugins/clickhouse/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -1299,15 +1299,13 @@ async def test_restore_keeper_map_table_data_step() -> None:
[
b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name",
b"REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON `db-two`.`table-keeper` FROM `alice`",
b"SELECT count() FROM grants WHERE user_name='alice' AND database='db-two' AND table='table-keeper' AND access_type IN ('INSERT', 'ALTER UPDATE', 'ALTER DELETE')",
],
),
(
True,
[
b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name",
b"GRANT INSERT, ALTER UPDATE, ALTER DELETE ON `db-two`.`table-keeper` TO `alice`",
b"SELECT count() FROM grants WHERE user_name='alice' AND database='db-two' AND table='table-keeper' AND access_type IN ('INSERT', 'ALTER UPDATE', 'ALTER DELETE')",
],
),
],
Expand All @@ -1319,12 +1317,6 @@ async def test_keeper_map_table_select_only_setting_modified(allow_writes: bool,
def execute_side_effect(statement: bytes) -> list[Row]:
if statement == b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name":
return [[base64.b64encode(b"alice").decode()]]
elif (
statement
== b"SELECT count() FROM grants WHERE user_name='alice' AND database='db-two' AND table='table-keeper' AND access_type IN ('INSERT', 'ALTER UPDATE', 'ALTER DELETE')"
):
num_expected_grants = 3 if allow_writes else 0
return [[num_expected_grants]]
return []

clickhouse_client.execute.side_effect = execute_side_effect
Expand Down
Loading