Skip to content

Commit

Permalink
Merge pull request #71 from authorizon/policy_fetcher_handle_errors
Browse files Browse the repository at this point in the history
policy fetcher can handle error response codes
  • Loading branch information
asafc authored Jun 2, 2021
2 parents accc408 + d48b77a commit e281c12
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 66 deletions.
6 changes: 3 additions & 3 deletions opal_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from opal_client.opa.runner import OpaRunner
from opal_client.opa.options import OpaServerOptions
from opal_client.policy.api import init_policy_router
from opal_client.policy.updater import PolicyUpdater, update_policy
from opal_client.policy.updater import PolicyUpdater


class OpalClient:
Expand Down Expand Up @@ -67,7 +67,7 @@ def __init__(
options=inline_opa_options,
rehydration_callbacks=[
# refetches policy code (e.g: rego) and static data from server
functools.partial(update_policy, policy_store=self.policy_store, force_full_update=True),
functools.partial(self.policy_updater.update_policy, force_full_update=True),
functools.partial(self.data_updater.get_base_policy_data, data_fetch_reason="policy store rehydration"),
]
)
Expand Down Expand Up @@ -99,7 +99,7 @@ def _configure_api_routes(self, app: FastAPI):
mounts the api routes on the app object
"""
# Init api routers with required dependencies
policy_router = init_policy_router(policy_store=self.policy_store)
policy_router = init_policy_router(policy_updater=self.policy_updater)
data_router = init_data_router(data_updater=self.data_updater)

# mount the api routes on the app object
Expand Down
2 changes: 0 additions & 2 deletions opal_client/data/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from opal_common.schemas.data import DataUpdate
from fastapi import APIRouter, status
from starlette.status import HTTP_200_OK

from opal_common.logger import logger
from opal_client.data.updater import DataUpdater
Expand Down
9 changes: 3 additions & 6 deletions opal_client/policy/api.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
from fastapi import APIRouter, status
from starlette.status import HTTP_200_OK

from opal_common.logger import logger
from opal_client.policy.updater import update_policy
from opal_client.policy_store import BasePolicyStoreClient, DEFAULT_POLICY_STORE_GETTER
from opal_client.policy.updater import PolicyUpdater

def init_policy_router(policy_store:BasePolicyStoreClient=None):
policy_store = policy_store or DEFAULT_POLICY_STORE_GETTER()
def init_policy_router(policy_updater: PolicyUpdater):
router = APIRouter()

@router.post("/policy-updater/trigger", status_code=status.HTTP_200_OK)
async def trigger_policy_update():
logger.info("triggered policy update from api")
await update_policy(policy_store)
await policy_updater.update_policy(force_full_update=True)
return {"status": "ok"}

return router
46 changes: 40 additions & 6 deletions opal_client/policy/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import List, Optional
from pydantic import ValidationError
from fastapi import status
from tenacity import retry, wait, stop

from opal_common.utils import get_authorization_header
from opal_common.schemas.policy import PolicyBundle
Expand All @@ -11,19 +12,33 @@
from opal_client.config import opal_client_config


def policy_bundle_or_none(bundle) -> Optional[PolicyBundle]:
def force_valid_bundle(bundle) -> PolicyBundle:
try:
return PolicyBundle(**bundle)
except ValidationError as e:
logger.warning("server returned invalid bundle: {err}", bundle=bundle, err=e)
return None
raise

async def throw_if_bad_status_code(response: aiohttp.ClientResponse, expected: List[int]) -> aiohttp.ClientResponse:
if response.status in expected:
return response

# else, bad status code
details = await response.json()
logger.warning("Unexpected response code {status}: {details}", status=response.status, details=details)
raise ValueError(f"unexpected response code while fetching bundle: {response.status}")

class PolicyFetcher:
"""
fetches policy from backend
"""
def __init__(self, backend_url=None, token=None):
DEFAULT_RETRY_CONFIG = {
'wait': wait.wait_random_exponential(max=10),
'stop': stop.stop_after_attempt(5),
'reraise': True,
}

def __init__(self, backend_url=None, token=None, retry_config = None):
"""
Args:
backend_url ([type], optional): Defaults to opal_client_config.SERVER_URL.
Expand All @@ -32,12 +47,28 @@ def __init__(self, backend_url=None, token=None):
self._backend_url = backend_url or opal_client_config.SERVER_URL
self._token = token or opal_client_config.CLIENT_TOKEN
self._auth_headers = tuple_to_dict(get_authorization_header(self._token))
self._retry_config = retry_config if retry_config is not None else self.DEFAULT_RETRY_CONFIG

async def fetch_policy_bundle(
self,
directories: List[str] = ['.'],
base_hash: Optional[str] = None
) -> Optional[PolicyBundle]:
attempter = retry(**self._retry_config)(self._fetch_policy_bundle)
try:
return await attempter(directories=directories, base_hash=base_hash)
except Exception as err:
logger.warning("Failed all attempts to fetch bundle, got error: {err}", err=err)
return None

async def _fetch_policy_bundle(
self,
directories: List[str] = ['.'],
base_hash: Optional[str] = None
) -> Optional[PolicyBundle]:
"""
Fetches the bundle. May throw, in which case we retry again.
"""
params = {"path": directories}
if base_hash is not None:
params["base_hash"] = base_hash
Expand All @@ -51,11 +82,14 @@ async def fetch_policy_bundle(
if response.status == status.HTTP_404_NOT_FOUND:
logger.warning("requested paths not found: {paths}", paths=directories)
return None

# may throw ValueError
await throw_if_bad_status_code(response, expected=[status.HTTP_200_OK])

# may throw Validation Error
bundle = await response.json()
return policy_bundle_or_none(bundle)
return force_valid_bundle(bundle)
except aiohttp.ClientError as e:
logger.warning("server connection error: {err}", err=e)
raise


policy_fetcher = PolicyFetcher()
97 changes: 49 additions & 48 deletions opal_client/policy/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,55 +13,11 @@
)
from opal_client.logger import logger
from opal_client.config import opal_client_config
from opal_client.policy.fetcher import policy_fetcher
from opal_client.policy.fetcher import PolicyFetcher
from opal_client.policy_store.base_policy_store_client import BasePolicyStoreClient
from opal_client.policy_store.policy_store_client_factory import DEFAULT_POLICY_STORE_GETTER
from opal_client.policy.topics import default_subscribed_policy_directories


async def update_policy(policy_store: BasePolicyStoreClient, directories: List[str] = None, force_full_update=False):
"""
fetches policy (code, e.g: rego) from backend and stores it in the policy store.
Args:
policy_store (BasePolicyStoreClient, optional): Policy store client to use to store policy code.
directories (List[str], optional): specific source directories we want.
force_full_update (bool, optional): if true, ignore stored hash and fetch full policy bundle.
"""
directories = directories if directories is not None else default_subscribed_policy_directories()
if force_full_update:
logger.info("full update was forced (ignoring stored hash if exists)")
base_hash = None
else:
base_hash = await policy_store.get_policy_version()

if base_hash is None:
logger.info("Refetching policy code (full bundle)")
else:
logger.info("Refetching policy code (delta bundle), base hash: '{base_hash}'", base_hash=base_hash)
bundle: Optional[PolicyBundle] = await policy_fetcher.fetch_policy_bundle(directories, base_hash=base_hash)
if bundle:
if bundle.old_hash is None:
logger.info(
"got policy bundle, commit hash: '{commit_hash}'",
commit_hash=bundle.hash,
manifest=bundle.manifest
)
else:
deleted_files = None if bundle.deleted_files is None else bundle.deleted_files.dict()
logger.info(
"got policy bundle (delta): '{diff_against_hash}' -> '{commit_hash}'",
commit_hash=bundle.hash,
diff_against_hash=bundle.old_hash,
manifest=bundle.manifest,
deleted=deleted_files
)
# store policy bundle in OPA cache
# We wrap our interaction with the policy store with a transaction, so that
# if the write-op fails, we will mark the transaction as failed.
async with policy_store.transaction_context(bundle.hash) as store_transaction:
await store_transaction.set_policies(bundle)

class PolicyUpdater:
"""
Keeps policy-stores (e.g. OPA) up to date with relevant policy code
Expand Down Expand Up @@ -109,6 +65,8 @@ def __init__(
# The task running the Pub/Sub subcribing client
self._subscriber_task = None
self._stopping = False
# policy fetcher - fetches policy bundles
self._policy_fetcher = PolicyFetcher()

async def __aenter__(self):
await self.start()
Expand Down Expand Up @@ -136,7 +94,7 @@ async def _update_policy_callback(self, data: dict = None, topic: str = "", **kw
directories = default_subscribed_policy_directories()
logger.warning("Received policy updated (invalid topic): {topic}", topic=topic)

await update_policy(self._policy_store, directories)
await self.update_policy(directories)

async def _on_connect(self, client: PubSubClient, channel: RpcChannel):
"""
Expand All @@ -147,7 +105,7 @@ async def _on_connect(self, client: PubSubClient, channel: RpcChannel):
when the connection is lost we assume we need to start from scratch.
"""
logger.info("Connected to server")
await update_policy(policy_store=self._policy_store)
await self.update_policy()

async def _on_disconnect(self, channel: RpcChannel):
"""
Expand Down Expand Up @@ -210,4 +168,47 @@ async def _subscriber(self):
server_uri=self._server_url
)
async with self._client:
await self._client.wait_until_done()
await self._client.wait_until_done()

async def update_policy(self, directories: List[str] = None, force_full_update=False):
"""
fetches policy (code, e.g: rego) from backend and stores it in the policy store.
Args:
policy_store (BasePolicyStoreClient, optional): Policy store client to use to store policy code.
directories (List[str], optional): specific source directories we want.
force_full_update (bool, optional): if true, ignore stored hash and fetch full policy bundle.
"""
directories = directories if directories is not None else default_subscribed_policy_directories()
if force_full_update:
logger.info("full update was forced (ignoring stored hash if exists)")
base_hash = None
else:
base_hash = await self._policy_store.get_policy_version()

if base_hash is None:
logger.info("Refetching policy code (full bundle)")
else:
logger.info("Refetching policy code (delta bundle), base hash: '{base_hash}'", base_hash=base_hash)
bundle: Optional[PolicyBundle] = await self._policy_fetcher.fetch_policy_bundle(directories, base_hash=base_hash)
if bundle:
if bundle.old_hash is None:
logger.info(
"got policy bundle, commit hash: '{commit_hash}'",
commit_hash=bundle.hash,
manifest=bundle.manifest
)
else:
deleted_files = None if bundle.deleted_files is None else bundle.deleted_files.dict()
logger.info(
"got policy bundle (delta): '{diff_against_hash}' -> '{commit_hash}'",
commit_hash=bundle.hash,
diff_against_hash=bundle.old_hash,
manifest=bundle.manifest,
deleted=deleted_files
)
# store policy bundle in OPA cache
# We wrap our interaction with the policy store with a transaction, so that
# if the write-op fails, we will mark the transaction as failed.
async with self._policy_store.transaction_context(bundle.hash) as store_transaction:
await store_transaction.set_policies(bundle)
2 changes: 1 addition & 1 deletion setup/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
Project homepage: https://github.com/authorizon/opal
"""

VERSION = (0, 1, 6)
VERSION = (0, 1, 7)
VERSION_STRING = '.'.join(map(str,VERSION))

__version__ = VERSION_STRING
Expand Down

0 comments on commit e281c12

Please sign in to comment.