Skip to content

Commit

Permalink
Merge pull request #183 from permitio/omer/per-10768-fix-pdp-datasync…
Browse files Browse the repository at this point in the history
…-issues

Fix PDP not checking if datasync is enabled everywhere
  • Loading branch information
omer9564 authored Sep 22, 2024
2 parents 4826a5a + 0427c05 commit 8410d25
Showing 1 changed file with 49 additions and 36 deletions.
85 changes: 49 additions & 36 deletions horizon/data_manager/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,29 +108,31 @@ def __init__(
offline_mode_enabled: bool = False,
shard_id: Optional[str] = None,
):
self._data_manager_runner = DataManagerRunner(
data_manager_url=sidecar_config.DATA_MANAGER_SERVICE_URL,
data_manager_binary_path=sidecar_config.DATA_MANAGER_BINARY_PATH,
data_manager_token=opal_client_config.CLIENT_TOKEN,
data_manager_remote_backup_url=sidecar_config.DATA_MANAGER_REMOTE_BACKUP_URL,
engine_token=sidecar_config.API_KEY,
piped_logs_format=EngineLogFormat.FULL,
)
policy_store = policy_store or DataManagerPolicyStoreClient(
data_manager_client=lambda: self._data_manager_runner.client,
opa_server_url=opal_client_config.POLICY_STORE_URL,
opa_auth_token=opal_client_config.POLICY_STORE_AUTH_TOKEN,
auth_type=opal_client_config.POLICY_STORE_AUTH_TYPE,
oauth_client_id=opal_client_config.POLICY_STORE_AUTH_OAUTH_CLIENT_ID,
oauth_client_secret=opal_client_config.POLICY_STORE_AUTH_OAUTH_CLIENT_SECRET,
oauth_server=opal_client_config.POLICY_STORE_AUTH_OAUTH_SERVER,
data_updater_enabled=opal_client_config.DATA_UPDATER_ENABLED,
policy_updater_enabled=opal_client_config.POLICY_UPDATER_ENABLED,
cache_policy_data=opal_client_config.OFFLINE_MODE_ENABLED,
tls_client_cert=opal_client_config.POLICY_STORE_TLS_CLIENT_CERT,
tls_client_key=opal_client_config.POLICY_STORE_TLS_CLIENT_KEY,
tls_ca=opal_client_config.POLICY_STORE_TLS_CA,
)
self._enable_external_data_manager = sidecar_config.ENABLE_EXTERNAL_DATA_MANAGER
if self._enable_external_data_manager:
self._data_manager_runner = DataManagerRunner(
data_manager_url=sidecar_config.DATA_MANAGER_SERVICE_URL,
data_manager_binary_path=sidecar_config.DATA_MANAGER_BINARY_PATH,
data_manager_token=opal_client_config.CLIENT_TOKEN,
data_manager_remote_backup_url=sidecar_config.DATA_MANAGER_REMOTE_BACKUP_URL,
engine_token=sidecar_config.API_KEY,
piped_logs_format=EngineLogFormat.FULL,
)
policy_store = policy_store or DataManagerPolicyStoreClient(
data_manager_client=lambda: self._data_manager_runner.client,
opa_server_url=opal_client_config.POLICY_STORE_URL,
opa_auth_token=opal_client_config.POLICY_STORE_AUTH_TOKEN,
auth_type=opal_client_config.POLICY_STORE_AUTH_TYPE,
oauth_client_id=opal_client_config.POLICY_STORE_AUTH_OAUTH_CLIENT_ID,
oauth_client_secret=opal_client_config.POLICY_STORE_AUTH_OAUTH_CLIENT_SECRET,
oauth_server=opal_client_config.POLICY_STORE_AUTH_OAUTH_SERVER,
data_updater_enabled=opal_client_config.DATA_UPDATER_ENABLED,
policy_updater_enabled=opal_client_config.POLICY_UPDATER_ENABLED,
cache_policy_data=opal_client_config.OFFLINE_MODE_ENABLED,
tls_client_cert=opal_client_config.POLICY_STORE_TLS_CLIENT_CERT,
tls_client_key=opal_client_config.POLICY_STORE_TLS_CLIENT_KEY,
tls_ca=opal_client_config.POLICY_STORE_TLS_CA,
)
super().__init__(
policy_store_type=policy_store_type,
policy_store=policy_store,
Expand Down Expand Up @@ -168,29 +170,40 @@ async def stop_data_manager_runner(self):
await self._data_manager_runner.stop()

async def check_healthy(self) -> bool:
opal_health = await super().check_healthy()
if not opal_health:
try:
opal_health = await super().check_healthy()
if not opal_health:
return False
if self._enable_external_data_manager:
return await self._data_manager_runner.is_healthy()
except Exception as e:
logger.exception("Error checking health: {e}", e=e)
return False
return await self._data_manager_runner.is_healthy()
else:
return True

async def check_ready(self) -> bool:
opal_ready = await super().check_ready()
if not opal_ready:
try:
opal_ready = await super().check_ready()
if not opal_ready:
return False
if self._enable_external_data_manager:
return await self._data_manager_runner.is_ready()
except Exception as e:
logger.exception("Error checking ready: {e}", e=e)
return False
return await self._data_manager_runner.is_ready()
else:
return True

async def start_client_background_tasks(
self,
*,
data_manager_runner_enabled: bool = sidecar_config.ENABLE_EXTERNAL_DATA_MANAGER
):
async def start_client_background_tasks(self):
tasks = [super().start_client_background_tasks()]
if data_manager_runner_enabled:
if self._enable_external_data_manager:
logger.info("Starting Data Manager runner")
tasks.append(self.start_data_manager_runner())
await asyncio.gather(*tasks)

async def stop_client_background_tasks(self):
"""stops all background tasks (called on shutdown event)"""
await super().stop_client_background_tasks()
await self.stop_data_manager_runner()
if self._enable_external_data_manager:
await self.stop_data_manager_runner()

0 comments on commit 8410d25

Please sign in to comment.