Skip to content

Commit

Permalink
Yanc noop on mdata update (#37)
Browse files Browse the repository at this point in the history
* add noop on metadata update
  • Loading branch information
Charley Yan authored Jun 21, 2019
1 parent bec2d9b commit 1602806
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 31 deletions.
75 changes: 53 additions & 22 deletions falcon/igniter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from falcon import settings

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('falcon.{module_path}'.format(module_path=__name__))
logger = logging.getLogger(f'falcon.{__name__}')


class Igniter(object):
Expand Down Expand Up @@ -47,57 +47,88 @@ def join(self):

def execution_loop(self, handler):
logger.info(
'Igniter | Initializing an igniter with thread ID => {0} | {1}'.format(
get_ident(), datetime.now()
)
f'Igniter | Initializing an igniter with thread ID => {get_ident()} | {datetime.now()}'
)
while True:
self.execution_event(handler)

def execution_event(self, handler):
logger.info(
'Igniter | Igniter thread {0} is warmed up and running. | {1}'.format(
get_ident(), datetime.now()
)
f'Igniter | Igniter thread {get_ident()} is warmed up and running. | {datetime.now()}'
)
try:
workflow = handler.workflow_queue.get(block=False)
self.release_workflow(workflow)
if 'force' not in workflow.labels.keys() and self.workflow_is_duplicate(
workflow
):
logger.info(
'Igniter | Found existing workflow with the same hash-id; '
f'aborting workflow {workflow} | {datetime.now()}'
)
self.abort_workflow(workflow)
else:
self.release_workflow(workflow)
except queue.Empty:
logger.info(
'Igniter | The in-memory queue is empty, go back to sleep and wait for the handler to retrieve '
'workflows. | {0}'.format(datetime.now())
f'workflows. | {datetime.now()}'
)
except (
requests.exceptions.ConnectionError,
requests.exceptions.RequestException,
) as error:
logger.error(
f'Igniter | Failed to query cromwell for existing workflows {error} | {datetime.now()}'
)
finally:
self.sleep_for(self.workflow_start_interval)

def release_workflow(self, workflow):
def simple_cromwell_workflow_action(
self, do_thing, workflow, failure_message, success_message
):
try:
response = CromwellAPI.release_hold(
uuid=workflow.id, auth=self.cromwell_auth
)
response = do_thing(uuid=workflow.id, auth=self.cromwell_auth)
if response.status_code != 200:
logger.warning(
'Igniter | Failed to release a workflow {0} | {1} | {2}'.format(
workflow, response.text, datetime.now()
)
f'Igniter | {failure_message} {workflow} | {response.text} | {datetime.now()}'
)
else:
logger.info(
'Igniter | Released a workflow {0} | {1}'.format(
workflow, datetime.now()
)
f'Igniter | {success_message} {workflow} | {datetime.now()}'
)
except (
requests.exceptions.ConnectionError,
requests.exceptions.RequestException,
) as error:
logger.error(
'Igniter | Failed to release a workflow {0}| {1} | {2}'.format(
workflow, error, datetime.now()
)
f'Igniter | {failure_message} {workflow} | {error} | {datetime.now()}'
)

def release_workflow(self, workflow):
    """Ask Cromwell to release the hold on *workflow*.

    Delegates to the shared action helper with the release-specific
    endpoint and log wording.
    """
    action_kwargs = {
        'do_thing': CromwellAPI.release_hold,
        'workflow': workflow,
        'failure_message': 'Failed to release a workflow',
        'success_message': 'Released a workflow',
    }
    self.simple_cromwell_workflow_action(**action_kwargs)

def abort_workflow(self, workflow):
    """Ask Cromwell to abort *workflow*.

    Delegates to the shared action helper with the abort-specific
    endpoint and log wording.
    """
    action_kwargs = {
        'do_thing': CromwellAPI.abort,
        'workflow': workflow,
        'failure_message': 'Failed to abort a workflow',
        'success_message': 'Aborted a workflow',
    }
    self.simple_cromwell_workflow_action(**action_kwargs)

def workflow_is_duplicate(self, workflow):
    """Return True if Cromwell already has a *different* workflow with the same hash-id.

    Queries Cromwell by the 'hash-id' label and checks whether any result's id
    differs from this workflow's own id (the query will always match the
    workflow itself, so "any other id" means a duplicate exists).

    Args:
        workflow: a Workflow whose labels may carry a 'hash-id' entry.
            NOTE(review): if the label is absent the query becomes
            'hash-id:None' — presumably matches nothing; confirm.

    Raises:
        requests.exceptions.RequestException: propagated from the query when
            the connection fails or the response status is non-2xx
            (raise_for_status=True); callers handle this in execution_event.
    """
    hash_id = workflow.labels.get('hash-id')
    query_dict = {'label': f'hash-id:{hash_id}'}
    response = CromwellAPI.query(
        query_dict, self.cromwell_auth, raise_for_status=True
    )
    results = response.json()['results']
    # Generator (not a list) lets `any` short-circuit on the first foreign id.
    return any(result['id'] != workflow.id for result in results)

@staticmethod
def sleep_for(sleep_time):
    # Thin, stubbable wrapper over time.sleep: tests can patch this method
    # to avoid real delays in the igniter's polling loop.
    time.sleep(sleep_time)
16 changes: 11 additions & 5 deletions falcon/queue_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from datetime import datetime
from queue import Queue
from threading import Thread, get_ident
from copy import deepcopy

import requests
from cromwell_tools.cromwell_api import CromwellAPI
Expand All @@ -19,10 +20,15 @@ class Workflow(object):
Besides the features for de-duplication, this class also utilizes a smaller size of chunk in memory.
"""

def __init__(self, workflow_id, bundle_uuid=None, bundle_version=None):
def __init__(self, workflow_id, bundle_uuid=None, bundle_version=None, labels=None):

self.id = workflow_id
self.bundle_uuid = bundle_uuid
self.bundle_version = bundle_version
if labels is None:
self.labels = {}
else:
self.labels = deepcopy(labels)

def __str__(self):
    # A workflow renders as its Cromwell id; log messages rely on this.
    return str(self.id)
Expand Down Expand Up @@ -287,9 +293,7 @@ def _assemble_workflow(workflow_meta):
Workflow: A concrete `Workflow` instance that has necessary properties.
"""
workflow_id = workflow_meta.get('id')
workflow_labels = workflow_meta.get(
'labels'
) # TODO: Integrate this field into Workflow class
workflow_labels = workflow_meta.get('labels')
workflow_bundle_uuid = (
workflow_labels.get('bundle-uuid')
if isinstance(workflow_labels, dict)
Expand All @@ -300,7 +304,9 @@ def _assemble_workflow(workflow_meta):
if isinstance(workflow_labels, dict)
else None
)
workflow = Workflow(workflow_id, workflow_bundle_uuid, workflow_bundle_version)
workflow = Workflow(
workflow_id, workflow_bundle_uuid, workflow_bundle_version, workflow_labels
)
return workflow

@staticmethod
Expand Down
13 changes: 12 additions & 1 deletion falcon/test/cromwell_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def release_workflow_raises_RequestException(uuid, auth):
raise requests.exceptions.RequestException


def query_workflows_succeed(query_dict, auth):
def query_workflows_succeed(query_dict, auth, raise_for_status=False):
response = Mock(spec=Response)
response.status_code = 200
response.json.return_value = {
Expand All @@ -35,6 +35,17 @@ def query_workflows_succeed(query_dict, auth):
return response


def query_workflows_return_fake_workflow(query_dict, auth, raise_for_status=False):
    """Simulate a 200 Cromwell query whose single hit is 'fake_workflow_id'.

    Used to exercise the duplicate check when the only match is the querying
    workflow itself.
    """
    payload = {
        'results': [
            {'id': 'fake_workflow_id', 'submission': '2018-05-25T19:03:51.736Z'}
        ]
    }
    fake_response = Mock(spec=Response)
    fake_response.status_code = 200
    fake_response.json.return_value = payload
    return fake_response


def query_workflows_fail_with_400(query_dict, auth):
response = Mock(spec=Response)
response.status_code = 400
Expand Down
139 changes: 136 additions & 3 deletions falcon/test/test_igniters.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import timeit
from requests.exceptions import ConnectionError, HTTPError
from queue import Queue
from unittest import mock
from unittest.mock import patch
Expand Down Expand Up @@ -221,15 +222,21 @@ def test_release_workflow_handles_requests_exception(self, caplog):

assert 'Failed to release a workflow fake_workflow_id' in error

def setup_queue_handler(self, workflow=None):
    """Build a mocked QueueHandler whose one-slot queue optionally yields *workflow*.

    When *workflow* is given, the queue's get() is stubbed to return it;
    otherwise the queue stays genuinely empty (get raises queue.Empty).
    """
    fake_queue = Queue(maxsize=1)
    if workflow is not None:
        fake_queue.get = mock.Mock(return_value=workflow)
    fake_handler = mock.MagicMock(spec=queue_handler.QueueHandler)
    fake_handler.workflow_queue = fake_queue
    return fake_handler

def test_execution_event_sleeps_properly_for_empty_queue(self, caplog):
"""
This function asserts the `igniter.execution_event()` goes back to sleep when there is no available entry in the
queue to be processed.
"""
caplog.set_level(logging.INFO)
mock_queue = Queue(maxsize=1)
mock_handler = mock.MagicMock(spec=queue_handler.QueueHandler)
mock_handler.workflow_queue = mock_queue
mock_handler = self.setup_queue_handler()
assert mock_handler.workflow_queue.empty() is True

test_igniter = igniter.Igniter(self.config_path)
Expand All @@ -251,3 +258,129 @@ def test_execution_event_sleeps_properly_for_empty_queue(self, caplog):
<= elapsed
<= test_igniter.workflow_start_interval * 1.5
)

def execution_event_with_mocks(
    self,
    workflow,
    release_calls,
    abort_calls,
    is_dupe_return=None,
    is_dupe_effect=None,
):
    """Drive one igniter.execution_event() with its collaborators mocked out.

    Args:
        workflow: Workflow the mocked queue hands to the igniter.
        release_calls: expected number of release_workflow invocations.
        abort_calls: expected number of abort_workflow invocations.
        is_dupe_return: canned return value for workflow_is_duplicate.
        is_dupe_effect: optional side effect (e.g. an exception-raising mock)
            for workflow_is_duplicate.
    """
    mock_handler = self.setup_queue_handler(workflow=workflow)

    test_igniter = igniter.Igniter(self.config_path)
    test_igniter.workflow_is_duplicate = mock.Mock(
        return_value=is_dupe_return, side_effect=is_dupe_effect
    )
    test_igniter.release_workflow = mock.Mock()
    test_igniter.abort_workflow = mock.Mock()

    test_igniter.execution_event(mock_handler)
    # Compare counts with `==`, not `is`: identity on ints only "works" via
    # CPython's small-integer caching and is not a correctness guarantee.
    assert test_igniter.release_workflow.call_count == release_calls
    assert test_igniter.abort_workflow.call_count == abort_calls

def test_execution_event_aborts_duplicate_workflow(self, caplog):
    """A workflow whose hash-id already exists in Cromwell, and which carries
    no 'force' label, must be aborted and never released."""
    caplog.set_level(logging.INFO)
    dupe_workflow = queue_handler.Workflow('fake_workflow_id')
    self.execution_event_with_mocks(
        workflow=dupe_workflow,
        is_dupe_return=True,
        release_calls=0,
        abort_calls=1,
    )

def test_execution_event_releases_duplicate_workflow_with_force(self, caplog):
    """A 'force'-labeled workflow is released even when another workflow with
    the same key-data hash already exists in Cromwell."""
    caplog.set_level(logging.INFO)
    forced_workflow = queue_handler.Workflow(
        'fake_workflow_id', labels={'force': None}
    )
    self.execution_event_with_mocks(
        workflow=forced_workflow,
        is_dupe_return=True,
        release_calls=1,
        abort_calls=0,
    )

def test_execution_event_releases_non_duplicate_workflow(self, caplog):
    """When no existing workflow shares the hash-id, the workflow is released."""
    caplog.set_level(logging.INFO)
    fresh_workflow = queue_handler.Workflow('fake_workflow_id')
    self.execution_event_with_mocks(
        workflow=fresh_workflow,
        is_dupe_return=False,
        release_calls=1,
        abort_calls=0,
    )

def test_execution_event_does_nothing_on_query_failure(self, caplog):
    """If the duplicate check raises a ConnectionError, the igniter neither
    releases nor aborts — it just goes back to sleep."""
    caplog.set_level(logging.INFO)
    self.execution_event_with_mocks(
        workflow=queue_handler.Workflow('fake_workflow_id'),
        is_dupe_effect=mock.Mock(side_effect=ConnectionError()),
        release_calls=0,
        abort_calls=0,
    )

def test_execution_event_does_nothing_when_query_status_not_200(self, caplog):
    """If the duplicate check raises HTTPError (non-200 response from the
    hash-id query), the igniter neither releases nor aborts the workflow."""
    caplog.set_level(logging.INFO)
    self.execution_event_with_mocks(
        workflow=queue_handler.Workflow('fake_workflow_id'),
        is_dupe_effect=mock.Mock(side_effect=HTTPError()),
        release_calls=0,
        abort_calls=0,
    )

@patch(
    'falcon.igniter.CromwellAPI.query',
    cromwell_simulator.query_workflows_succeed,
    create=True,
)
def test_workflow_is_duplicate_returns_true_when_it_finds_workflow_with_same_hash_id(
    self, caplog
):
    """The simulator reports a workflow with a different id, so the
    duplicate check must come back True."""
    caplog.set_level(logging.INFO)
    test_igniter = igniter.Igniter(self.config_path)
    probe = queue_handler.Workflow('fake_workflow_id', labels={'hash-id': ''})
    assert test_igniter.workflow_is_duplicate(workflow=probe) is True

@patch(
    'falcon.igniter.CromwellAPI.query',
    cromwell_simulator.query_workflows_return_fake_workflow,
    create=True,
)
def test_workflow_is_duplicate_returns_false_when_it_only_finds_input_workflow(
    self, caplog
):
    """The simulator's only result is the querying workflow itself, so the
    duplicate check must come back False."""
    caplog.set_level(logging.INFO)
    test_igniter = igniter.Igniter(self.config_path)
    probe = queue_handler.Workflow('fake_workflow_id', labels={'hash-id': ''})
    assert test_igniter.workflow_is_duplicate(workflow=probe) is False

0 comments on commit 1602806

Please sign in to comment.