-
Notifications
You must be signed in to change notification settings - Fork 193
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #144 from authorizon/opal_client_api
WIP: JWT authentication refactors + new client APIs required by OPToggles
- Loading branch information
Showing
28 changed files
with
659 additions
and
209 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
from typing import List | ||
from fastapi import APIRouter, Depends, status, HTTPException, Response | ||
from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR | ||
|
||
from opal_common.logger import logger | ||
from opal_common.schemas.security import PeerType | ||
from opal_common.schemas.data import CallbackEntry | ||
from opal_common.authentication.deps import JWTAuthenticator | ||
from opal_common.authentication.verifier import Unauthorized | ||
from opal_common.authentication.types import JWTClaims | ||
from opal_common.authentication.authz import require_peer_type | ||
|
||
from opal_client.config import opal_client_config | ||
from opal_client.callbacks.register import CallbacksRegister | ||
|
||
|
||
def init_callbacks_api(authenticator: JWTAuthenticator, register: CallbacksRegister): | ||
async def require_listener_token(claims: JWTClaims = Depends(authenticator)): | ||
try: | ||
require_peer_type(authenticator, claims, PeerType.listener) # may throw Unauthorized | ||
except Unauthorized as e: | ||
logger.error(f"Unauthorized to publish update: {repr(e)}") | ||
raise | ||
|
||
# all the methods in this router requires a valid JWT token with peer_type == listener | ||
router = APIRouter(prefix="/callbacks", dependencies=[Depends(require_listener_token)]) | ||
|
||
@router.get("", response_model=List[CallbackEntry]) | ||
async def list_callbacks(): | ||
""" | ||
list all the callbacks currently registered by OPAL client. | ||
""" | ||
return list(register.all()) | ||
|
||
@router.get("/{key}", response_model=CallbackEntry) | ||
async def get_callback_by_key(key: str): | ||
""" | ||
get a callback by its key (if such callback is indeed registered). | ||
""" | ||
callback = register.get(key) | ||
if callback is None: | ||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="no callback found with this key") | ||
return callback | ||
|
||
@router.post("", response_model=CallbackEntry) | ||
async def register_callback(entry: CallbackEntry): | ||
""" | ||
register a new callback by OPAL client, to be called on OPA state updates. | ||
""" | ||
saved_key = register.put(url=entry.url, config=entry.config, key=entry.key) | ||
saved_entry = register.get(saved_key) | ||
if saved_entry is None: | ||
raise HTTPException(status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail="could not register callback") | ||
return saved_entry | ||
|
||
@router.delete("/{key}", status_code=status.HTTP_204_NO_CONTENT) | ||
async def get_callback_by_key(key: str): | ||
""" | ||
unregisters a callback identified by its key (if such callback is indeed registered). | ||
""" | ||
callback = register.get(key) | ||
if callback is None: | ||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="no callback found with this key") | ||
register.remove(key) | ||
return Response(status_code=status.HTTP_204_NO_CONTENT) | ||
|
||
return router |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
import hashlib | ||
from typing import Dict, Tuple, List, Union, Optional, Generator | ||
|
||
from opal_common.logger import logger | ||
from opal_common.fetcher.providers.http_fetch_provider import HttpFetcherConfig | ||
from opal_common.schemas.data import CallbackEntry | ||
from opal_client.config import opal_client_config | ||
|
||
|
||
CallbackConfig = Tuple[str, HttpFetcherConfig] | ||
|
||
|
||
class CallbacksRegister: | ||
""" | ||
A store for callbacks to other services, invoked on OPA state changes. | ||
Every time OPAL client successfully finishes a transaction to update OPA state, | ||
all the callbacks in this register will be called. | ||
""" | ||
def __init__(self, initial_callbacks: Optional[List[Union[str, CallbackConfig]]] = None) -> None: | ||
self._callbacks: Dict[str, CallbackConfig] = {} | ||
if initial_callbacks is not None: | ||
self._load_initial_callbacks(initial_callbacks) | ||
logger.info("Callbacks register loaded") | ||
|
||
def _load_initial_callbacks(self, initial_callbacks: List[Union[str, CallbackConfig]]) -> None: | ||
normalized_callbacks = self.normalize_callbacks(initial_callbacks) | ||
for callback in normalized_callbacks: | ||
url, config = callback | ||
key = self.calc_hash(url, config) | ||
self._register(key, url, config) | ||
|
||
def normalize_callbacks(self, callbacks: List[Union[str, CallbackConfig]]) -> List[CallbackConfig]: | ||
normalized_callbacks = [] | ||
for callback in callbacks: | ||
if isinstance(callback, str): | ||
url = callback | ||
config = opal_client_config.DEFAULT_UPDATE_CALLBACK_CONFIG | ||
normalized_callbacks.append((url, config)) | ||
elif isinstance(callback, CallbackConfig): | ||
normalized_callbacks.append(callback) | ||
else: | ||
logger.warning(f"Unsupported type for callback config: {type(callback).__name__}") | ||
continue | ||
return normalized_callbacks | ||
|
||
def _register(self, key: str, url: str, config: HttpFetcherConfig): | ||
self._callbacks[key] = (url, config) | ||
|
||
def calc_hash(self, url: str, config: HttpFetcherConfig) -> str: | ||
""" | ||
gets a unique hash key from a callback url and config. | ||
""" | ||
m = hashlib.sha256() | ||
m.update(url.encode()) | ||
m.update(config.json().encode()) | ||
return m.hexdigest() | ||
|
||
def get(self, key: str) -> Optional[CallbackEntry]: | ||
""" | ||
gets a registered callback by its key, or None if no such key found in register. | ||
""" | ||
callback = self._callbacks.get(key, None) | ||
if callback is None: | ||
return None | ||
(url, config) = callback | ||
return CallbackEntry(key=key, url=url, config=config) | ||
|
||
def put(self, url: str, config: Optional[HttpFetcherConfig] = None, key: Optional[str] = None) -> str: | ||
""" | ||
puts a callback in the register. | ||
if no config is provided, the default callback config will be used. | ||
if no key is provided, the key will be calculated by hashing the url and config. | ||
""" | ||
default_config = opal_client_config.DEFAULT_UPDATE_CALLBACK_CONFIG | ||
if isinstance(default_config, dict): | ||
default_config = HttpFetcherConfig(**default_config) | ||
|
||
callback_config = config or default_config | ||
auto_key = self.calc_hash(url, callback_config) | ||
callback_key = key or auto_key | ||
# if the same callback is already registered with another key - remove that callback. | ||
# there is no point in calling the same callback twice. | ||
self.remove(auto_key) | ||
# register the callback under the intended key (auto-generated or provided) | ||
self._register(callback_key, url, callback_config) | ||
return callback_key | ||
|
||
def remove(self, key: str): | ||
""" | ||
removes a callback from the register, if exists. | ||
""" | ||
if key in self._callbacks: | ||
del self._callbacks[key] | ||
|
||
def all(self) -> Generator[CallbackEntry, None, None]: | ||
""" | ||
a generator yielding all the callback configs currently registered. | ||
Yields: | ||
the next callback config found | ||
""" | ||
for key, (url, config) in iter(self._callbacks.items()): | ||
yield CallbackEntry(key=key, url=url, config=config) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
from opal_common.fetcher.providers.http_fetch_provider import HttpFetcherConfig | ||
import aiohttp | ||
import json | ||
|
||
from typing import List, Optional | ||
|
||
from opal_common.http import is_http_error_response | ||
from opal_common.schemas.data import DataUpdateReport | ||
from opal_common.logger import logger | ||
from opal_client.data.fetcher import DataFetcher | ||
from opal_client.callbacks.register import CallbackConfig, CallbacksRegister | ||
|
||
class CallbacksReporter: | ||
""" | ||
can send a report to callbacks registered on the callback register | ||
""" | ||
def __init__(self, register: CallbacksRegister, data_fetcher: DataFetcher) -> None: | ||
self._register = register | ||
self._fetcher = data_fetcher | ||
|
||
async def report_update_results(self, report: DataUpdateReport, extra_callbacks: Optional[List[CallbackConfig]] = None): | ||
try: | ||
# all the urls that will be eventually called by the fetcher | ||
urls = [] | ||
report_data = report.json() | ||
|
||
# first we add the callback urls from the callback register | ||
for entry in self._register.all(): | ||
config = entry.config or HttpFetcherConfig() # should not be None if we got it from the register | ||
config.data = report_data | ||
urls.append((entry.url, config)) | ||
|
||
# next we add the "one time" callbacks from extra_callbacks (i.e: callbacks sent as part of a DataUpdate message) | ||
if extra_callbacks is not None: | ||
for url, config in extra_callbacks: | ||
config.data = report_data | ||
urls.append((url, config)) | ||
|
||
logger.info("Reporting the update to requested callbacks", urls=repr(urls)) | ||
report_results = await self._fetcher.handle_urls(urls) | ||
# log reports which we failed to send | ||
for (url, config, result) in report_results: | ||
if isinstance(result, Exception): | ||
logger.error("Failed to send report to {url}", url=url, exc_info=result) | ||
if isinstance(result, aiohttp.ClientResponse) and is_http_error_response(result): # error responses | ||
try: | ||
error_content = await result.json() | ||
except json.JSONDecodeError: | ||
error_content = await result.text() | ||
logger.error( | ||
"Failed to send report to {url}, got response code {status} with error: {error}", | ||
url=url, | ||
status=result.status, | ||
error=error_content | ||
) | ||
except: | ||
logger.exception("Failed to excute report_update_results") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.