Skip to content

Commit

Permalink
Merge pull request #143 from authorizon/fetching_config
Browse files Browse the repository at this point in the history
Add config to control fetching engine
  • Loading branch information
asafc authored Sep 22, 2021
2 parents 3e04a8e + 3d73484 commit 8148d6d
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 29 deletions.
15 changes: 10 additions & 5 deletions opal_client/data/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Dict, Any, List, Tuple

from opal_client.config import opal_client_config
from opal_common.config import opal_common_config
from opal_common.utils import get_authorization_header
from opal_client.utils import tuple_to_dict
from opal_common.fetcher import FetchingEngine
Expand All @@ -24,11 +25,15 @@ def __init__(self,
default_data_url (str, optional): The URL used to fetch data if no specific url is given in a fetch request. Defaults to DEFAULT_DATA_URL.
token (str, optional): default auth token. Defaults to CLIENT_TOKEN.
"""
# defaults
# defaults
default_data_url: str = default_data_url or opal_client_config.DEFAULT_DATA_URL
token: str = token or opal_client_config.CLIENT_TOKEN
# The underlying fetching engine
self._engine = FetchingEngine()
self._engine = FetchingEngine(
worker_count=opal_common_config.FETCHING_WORKER_COUNT,
callback_timeout=opal_common_config.FETCHING_CALLBACK_TIMEOUT,
enqueue_timeout=opal_common_config.FETCHING_ENQUEUE_TIMEOUT
)
self._data_url = default_data_url
self._token = token
self._auth_headers = tuple_to_dict(get_authorization_header(token))
Expand Down Expand Up @@ -77,7 +82,7 @@ async def handle_urls(self, urls: List[Tuple[str, FetcherConfig]] = None) -> Lis
Returns:
List[Tuple[str,FetcherConfig, Any]]: urls mapped to their resulting fetched data
"""

# tasks
tasks = []
# if no url provided - default to the builtin route
Expand All @@ -90,7 +95,7 @@ async def handle_urls(self, urls: List[Tuple[str, FetcherConfig]] = None) -> Lis
results = await asyncio.gather(*tasks, return_exceptions=True)

# Map results with their matching urls and config
results_with_url_and_config = [ (url, config, result) for (url, config), result in zip(urls, results)]
results_with_url_and_config = [(url, config, result) for (url, config), result in zip(urls, results)]

# return results
return results_with_url_and_config
17 changes: 12 additions & 5 deletions opal_common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from sys import prefix
from .confi import Confi, confi


class OpalCommonConfig(Confi):
ALLOWED_ORIGINS = confi.list("ALLOWED_ORIGINS", ["*"])
    # Process name to show in logs - Not confi-controllable on purpose
    # Process name to show in logs - Not confi-controllable on purpose
PROCESS_NAME = ""
# Logging
# - Log formatting
Expand Down Expand Up @@ -35,21 +36,27 @@ class OpalCommonConfig(Confi):
# - where to load providers from
FETCH_PROVIDER_MODULES = confi.list("FETCH_PROVIDER_MODULES", ["opal_common.fetcher.providers"])

# Fetching engine
# Max number of worker tasks handling fetch events concurrently
FETCHING_WORKER_COUNT = confi.int("FETCHING_WORKER_COUNT", 5)
# Time in seconds to wait on the queued fetch task.
FETCHING_CALLBACK_TIMEOUT = confi.int("FETCHING_CALLBACK_TIMEOUT", 10)
# Time in seconds to wait for queuing a new task (if the queue is full)
FETCHING_ENQUEUE_TIMEOUT = confi.int("FETCHING_ENQUEUE_TIMEOUT", 10)

GIT_SSH_KEY_FILE = confi.str("GIT_SSH_KEY_FILE", str(Path.home() / ".ssh/opal_repo_ssh_key"))

# Trust self signed certificates (Advanced Usage - only affects OPAL client) -----------------------------
# DO NOT change these defaults unless you absolutely know what you are doing!
    # By default, OPAL client only trusts SSL certificates that are signed by a publicly recognized CA (certificate authority).
# However, sometimes (mostly in on-prem setups or in dev environments) users setup their own self-signed certificates.
# We allow OPAL client to trust these certificates, by changing the following config vars.
CLIENT_SELF_SIGNED_CERTIFICATES_ALLOWED=confi.bool(
CLIENT_SELF_SIGNED_CERTIFICATES_ALLOWED = confi.bool(
"CLIENT_SELF_SIGNED_CERTIFICATES_ALLOWED", False,
description="Whether or not OPAL Client will trust HTTPs connections protected by self signed certificates. DO NOT USE THIS IN PRODUCTION!")
CLIENT_SSL_CONTEXT_TRUSTED_CA_FILE=confi.str(
CLIENT_SSL_CONTEXT_TRUSTED_CA_FILE = confi.str(
"CLIENT_SSL_CONTEXT_TRUSTED_CA_FILE", None,
description="A path to your own CA public certificate file (usually a .crt or a .pem file). Certifcates signed by this issuer will be trusted by OPAL Client. DO NOT USE THIS IN PRODUCTION!")


opal_common_config = OpalCommonConfig(prefix="OPAL_")


47 changes: 28 additions & 19 deletions opal_common/fetcher/engine/fetching_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .fetch_worker import fetch_worker
from .core_callbacks import OnFetchFailureCallback


logger = get_logger("engine")


Expand All @@ -31,18 +32,25 @@ class FetchingEngine(BaseFetchingEngine):
def gen_uid():
return uuid.uuid4().hex

def __init__(self, register_config:Dict[str, BaseFetchProvider]=None, worker_count:int=DEFAULT_WORKER_COUNT) -> None:
def __init__(self, register_config: Dict[str, BaseFetchProvider] = None,
worker_count: int = DEFAULT_WORKER_COUNT,
callback_timeout: int = DEFAULT_CALLBACK_TIMEOUT,
enqueue_timeout: int = DEFAULT_ENQUEUE_TIMEOUT
) -> None:
# The internal task queue (created at start_workers)
self._queue:asyncio.Queue = None
self._queue: asyncio.Queue = None
# Worker working the queue
self._tasks:List[asyncio.Task] = []
self._tasks: List[asyncio.Task] = []
# register of the fetch providers workers can use
self._fetcher_register = FetcherRegister(register_config)
        # core event callback registers
self._failure_handlers:List[OnFetchFailureCallback] = []
self._failure_handlers: List[OnFetchFailureCallback] = []
# how many workers to run
self._worker_count:int = worker_count

self._worker_count: int = worker_count
# time in seconds before timeout on a fetch callback
self._callback_timeout = callback_timeout
# time in seconds before time out on adding a task to queue (when full)
self._enqueue_timeout = enqueue_timeout

def start_workers(self):
if self._queue is None:
Expand All @@ -52,7 +60,7 @@ def start_workers(self):
self.create_worker()

@property
def register(self)->FetcherRegister:
def register(self) -> FetcherRegister:
return self._fetcher_register

async def __aenter__(self):
Expand All @@ -76,24 +84,26 @@ async def terminate_workers(self):
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*self._tasks, return_exceptions=True)
# reset queue
# reset queue
self._queue = None

async def handle_url(self, url:str, timeout: float=DEFAULT_CALLBACK_TIMEOUT, **kwargs ):
async def handle_url(self, url: str, timeout: float = None, **kwargs):
"""
Same as self.queue_url but instead of using a callback, you can wait on this coroutine for the result as a return value
Args:
url (str):
timeout (float, optional): time in seconds to wait on the queued fetch task. Defaults to DEFAULT_CALLBACK_TIMEOUT.
timeout (float, optional): time in seconds to wait on the queued fetch task. Defaults to self._callback_timeout.
kwargs: additional args passed to self.queue_url
Raises:
asyncio.TimeoutError: if the given timeout has expired
also - @see self.queue_fetch_event
"""
timeout = self._callback_timeout if timeout is None else timeout
wait_event = asyncio.Event()
data = {'result': None}
        # Callback to wait for and retrieve data

async def waiter_callback(answer):
data['result'] = answer
# Signal callback is done
Expand All @@ -108,8 +118,7 @@ async def waiter_callback(answer):
# return saved result value from callback
return data['result']


async def queue_url(self, url: str, callback: Coroutine, config: Union[FetcherConfig,dict] = None, fetcher="HttpFetchProvider")->FetchEvent:
async def queue_url(self, url: str, callback: Coroutine, config: Union[FetcherConfig, dict] = None, fetcher="HttpFetchProvider") -> FetchEvent:
"""
Simplified default fetching handler for queuing a fetch task
Expand All @@ -134,7 +143,7 @@ async def queue_url(self, url: str, callback: Coroutine, config: Union[FetcherCo
event = FetchEvent(url=url, fetcher=fetcher, config=config)
return await self.queue_fetch_event(event, callback)

async def queue_fetch_event(self, event: FetchEvent, callback: Coroutine, enqueue_timeout=DEFAULT_ENQUEUE_TIMEOUT)->FetchEvent:
async def queue_fetch_event(self, event: FetchEvent, callback: Coroutine, enqueue_timeout=None) -> FetchEvent:
"""
Basic handler to queue a fetch event for a fetcher class.
Waits if the queue is full until enqueue_timeout seconds; if enqueue_timeout is None returns immediately or raises QueueFull
Expand All @@ -151,7 +160,8 @@ async def queue_fetch_event(self, event: FetchEvent, callback: Coroutine, enqueu
asyncio.QueueFull: if the queue is full and enqueue_timeout is set as None
asyncio.TimeoutError: if enqueue_timeout is not None, and the queue is full and hasn't cleared by the timeout time
"""
# Assign a unique identifier for the event
enqueue_timeout = enqueue_timeout if enqueue_timeout is not None else self._enqueue_timeout
# Assign a unique identifier for the event
event.id = self.gen_uid()
# add to the queue for handling
# if no timeout we return immediately or raise QueueFull
Expand All @@ -171,8 +181,7 @@ def create_worker(self) -> asyncio.Task:
self._tasks.append(task)
return task


def register_failure_handler(self, callback : OnFetchFailureCallback):
def register_failure_handler(self, callback: OnFetchFailureCallback):
"""
Register a callback to be called with exception and original event in case of failure
Expand All @@ -181,15 +190,15 @@ def register_failure_handler(self, callback : OnFetchFailureCallback):
"""
self._failure_handlers.append(callback)

async def _management_event_handler(self, handlers:List[Coroutine], *args, **kwargs):
async def _management_event_handler(self, handlers: List[Coroutine], *args, **kwargs):
"""
Generic management event subscriber caller
Args:
handlers (List[Coroutine]): callback coroutines
"""
await asyncio.gather(*(callback(*args, **kwargs) for callback in handlers))
await asyncio.gather(*(callback(*args, **kwargs) for callback in handlers))

async def _on_failure(self, error:Exception, event: FetchEvent):
async def _on_failure(self, error: Exception, event: FetchEvent):
"""
Call event failure subscribers
Expand Down

0 comments on commit 8148d6d

Please sign in to comment.