Skip to content

Commit

Permalink
Merge pull request #184 from spotify/lynn/add-dft-metrics-pt1
Browse files Browse the repository at this point in the history
[lib, docs] Add default metrics for batch IO transforms, and retry & timeout decorator
  • Loading branch information
econchick authored Mar 30, 2021
2 parents b08c1fd + d82945e commit 333646c
Show file tree
Hide file tree
Showing 17 changed files with 749 additions and 109 deletions.
1 change: 1 addition & 0 deletions docs/src/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ def find_version(*file_paths):
linkcheck_anchors_ignore = [
"changelog-format",
"update-changelog",
"matplotlib.figure.Figure",
]

# -- Options for HTML output -------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions docs/src/reference/lib/api/transforms/helpers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Helpers

.. currentmodule:: klio.transforms.helpers

.. autoclass:: KlioMessageCounter()
.. autoclass:: KlioGcsCheckInputExists()
.. autoclass:: KlioGcsCheckOutputExists()
.. autoclass:: KlioFilterPing()
Expand Down
1 change: 1 addition & 0 deletions docs/src/reference/lib/api/transforms/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
.. autosummary::
:nosignatures:

KlioMessageCounter
KlioGcsCheckInputExists
KlioGcsCheckOutputExists
KlioFilterPing
Expand Down
2 changes: 1 addition & 1 deletion exec/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def get_long_description(package_dir):
"pytest-mock",
],
"debug": [
"line_profiler", # wall time profiling
"line_profiler<3.2", # wall time profiling
"matplotlib", # needed for plotting mem/CPU usage
"numpy", # needed for plotting
"memory_profiler",
Expand Down
2 changes: 1 addition & 1 deletion integration/audio-spectrograms/expected_job_output.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
INFO:root:Found worker image: integration-klio-audio:audio-spectrograms
INFO:matplotlib.font_manager:Generating new fontManager, this may take some time...
INFO:matplotlib.font_manager:generated new fontManager
DEBUG:klio:Loading config file from /usr/local/klio-job-run-effective.yaml.
DEBUG:klio:KlioMessage full audit log - Entity ID: - Path: fluffy-zelda-glitch-toki-kobe::klio-audio (current job)
DEBUG:klio:Process 'battleclip_daq': Ping mode OFF.
Expand Down
1 change: 1 addition & 0 deletions integration/audio-spectrograms/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"apache_beam.internal.gcp.auth",
"oauth2client.transport",
"oauth2client.client",
"klio.metrics",
# The concurrency logs may be different for every machine, so let's
# just turn them off
"klio.concurrency",
Expand Down
36 changes: 34 additions & 2 deletions lib/src/klio/transforms/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,15 @@ class _KlioInputDataMixin(object):

DIRECTION_PFX = KlioIODirection.INPUT

def setup(self, *args, **kwargs):
super(_KlioInputDataMixin, self).setup(*args, **kwargs)
self.found_ctr = self._klio.metrics.counter(
"kmsg-data-found-input", transform=self._transform_name
)
self.not_found_ctr = self._klio.metrics.counter(
"kmsg-data-not-found-input", transform=self._transform_name
)

@property
def _data_config(self):
# TODO: figure out how to support multiple inputs
Expand Down Expand Up @@ -198,6 +207,11 @@ def _data_config(self):
)
return self._klio.config.job_config.data.inputs[0]

@property
def _transform_name(self):
# grab the child class name that inherits this class, if any
return self.__class__.__name__


class _KlioOutputDataMixin(object):
"""Mixin to add output-specific logic for a data existence check.
Expand All @@ -207,6 +221,15 @@ class _KlioOutputDataMixin(object):

DIRECTION_PFX = KlioIODirection.OUTPUT

def setup(self, *args, **kwargs):
super(_KlioOutputDataMixin, self).setup(*args, **kwargs)
self.found_ctr = self._klio.metrics.counter(
"kmsg-data-found-output", transform=self._transform_name
)
self.not_found_ctr = self._klio.metrics.counter(
"kmsg-data-not-found-output", transform=self._transform_name
)

@property
def _data_config(self):
# TODO: figure out how to support multiple outputs
Expand Down Expand Up @@ -235,6 +258,11 @@ def _data_config(self):
)
return self._klio.config.job_config.data.outputs[0]

@property
def _transform_name(self):
# grab the child class name that inherits this class, if any
return self.__class__.__name__


class _KlioGcsDataExistsMixin(object):
"""Mixin for GCS-specific data existence check logic.
Expand All @@ -243,7 +271,8 @@ class _KlioGcsDataExistsMixin(object):
_KlioInputDataMixin or _KlioOutputDataMixin
"""

def setup(self):
def setup(self, *args, **kwargs):
super(_KlioGcsDataExistsMixin, self).setup(*args, **kwargs)
self.client = gcsio.GcsIO()

def exists(self, path):
Expand All @@ -260,9 +289,12 @@ def process(self, kmsg):
item_path = self._get_absolute_path(item)
item_exists = self.exists(item_path)

state = DataExistState.FOUND
if not item_exists:
self.not_found_ctr.inc()
state = DataExistState.NOT_FOUND
else:
self.found_ctr.inc()
state = DataExistState.FOUND

self._klio.logger.info(
"%s %s at %s"
Expand Down
9 changes: 9 additions & 0 deletions lib/src/klio/transforms/_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(
self,
function,
tries,
klio_context,
delay=None,
exception=None,
raise_exception=None,
Expand All @@ -54,6 +55,12 @@ def __init__(
self._retry_exception = raise_exception or KlioRetriesExhausted
self._exception_message = exception_message
self._logger = logging.getLogger("klio")
self._retry_ctr = klio_context.metrics.counter(
"kmsg-retry-attempt", transform=self._func_name
)
self._retry_error_ctr = klio_context.metrics.counter(
"kmsg-drop-retry-error", transform=self._func_name
)

def __call__(self, *args, **kwargs):
tries = self._tries
Expand All @@ -68,9 +75,11 @@ def __call__(self, *args, **kwargs):
except self._exception as e:
tries -= 1
if not tries:
self._retry_error_ctr.inc()
self._raise_exception(e)
break

self._retry_ctr.inc()
msg = self._format_log_message(tries, e)
self._logger.warning(msg)

Expand Down
11 changes: 10 additions & 1 deletion lib/src/klio/transforms/_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,21 @@ class KlioTimeoutWrapper(object):
DEFAULT_EXC_MSG = "Function '{}' timed out after {} seconds."

def __init__(
self, function, seconds, timeout_exception=None, exception_message=None
self,
function,
seconds,
klio_context,
timeout_exception=None,
exception_message=None,
):
self._function = function
self._func_name = getattr(function, "__qualname__", function.__name__)
self._seconds = seconds
self._timeout_exception = timeout_exception or KlioTimeoutError
self._exception_message = exception_message
self._timeout_ctr = klio_context.metrics.counter(
"klio-drop-timed-out", transform=self._func_name
)

def __call__(self, *args, **kwargs):
self._queue = multiprocessing.Queue(maxsize=1)
Expand Down Expand Up @@ -123,6 +131,7 @@ def cancel(self):
def ready(self):
"""Manage the status of "value" property."""
if self._timeout < time.monotonic():
self._timeout_ctr.inc()
self.cancel()
return self._queue.full() and not self._queue.empty()

Expand Down
32 changes: 18 additions & 14 deletions lib/src/klio/transforms/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,12 +456,14 @@ def _timeout(seconds=None, exception=None, exception_message=None):
)

def inner(func_or_meth):
timeout_wrapper = ktimeout.KlioTimeoutWrapper(
function=func_or_meth,
seconds=seconds,
timeout_exception=exception,
exception_message=exception_message,
)
with _klio_context() as kctx:
timeout_wrapper = ktimeout.KlioTimeoutWrapper(
function=func_or_meth,
seconds=seconds,
timeout_exception=exception,
exception_message=exception_message,
klio_context=kctx,
)

# Unfortunately these two wrappers can't be abstracted into
# one wrapper - the `self` arg apparently can not be abstracted
Expand Down Expand Up @@ -528,14 +530,16 @@ def _retry(
)

def inner(func_or_meth):
retry_wrapper = kretry.KlioRetryWrapper(
function=func_or_meth,
tries=tries,
delay=delay,
exception=exception,
raise_exception=raise_exception,
exception_message=exception_message,
)
with _klio_context() as kctx:
retry_wrapper = kretry.KlioRetryWrapper(
function=func_or_meth,
tries=tries,
delay=delay,
exception=exception,
raise_exception=raise_exception,
exception_message=exception_message,
klio_context=kctx,
)

# Unfortunately these two wrappers can't be abstracted into
# one wrapper - the `self` arg apparently can not be abstracted
Expand Down
Loading

0 comments on commit 333646c

Please sign in to comment.