Skip to content

Commit

Permalink
Merge pull request #253 from NTIA/finalize_task_thread
Browse files Browse the repository at this point in the history
Thread task finalization and add configurable timeout to scheduler callback.
  • Loading branch information
jhazentia authored Oct 2, 2023
2 parents 3fffade + 56745ca commit 0782938
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 30 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ services:
- AUTHENTICATION
- CALLBACK_AUTHENTICATION
- CALLBACK_SSL_VERIFICATION
- CALLBACK_TIMEOUT
- CLIENT_ID
- CLIENT_SECRET
- DEBUG
Expand Down
1 change: 1 addition & 0 deletions env.template
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ BASE_IMAGE=ghcr.io/ntia/scos-tekrsa/tekrsa_usb:0.2.3
# Default callback api/results
# Set to OAUTH if using OAuth Password Flow Authentication, callback url needs to be api/v2/results
CALLBACK_AUTHENTICATION=TOKEN
CALLBACK_TIMEOUT=2

CLIENT_ID=sensor01.sms.internal
CLIENT_SECRET=sensor-secret
Expand Down
68 changes: 39 additions & 29 deletions src/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Scheduler(threading.Thread):

def __init__(self):
threading.Thread.__init__(self)

self.task_status_lock = threading.Lock()
self.timefn = utils.timefn
self.delayfn = utils.delayfn

Expand All @@ -44,7 +44,6 @@ def __init__(self):
# Cache the currently running task state
self.entry = None # ScheduleEntry that created the current task
self.task = None # Task object describing current task
self.task_result = None # TaskResult object for current task
self.last_status = ""
self.consecutive_failures = 0

Expand Down Expand Up @@ -126,27 +125,22 @@ def _consume_task_queue(self, pending_task_queue):
entry_name = task.schedule_entry_name
self.task = task
self.entry = ScheduleEntry.objects.get(name=entry_name)
self._initialize_task_result()
task_result = self._initialize_task_result()
started = timezone.now()
status, detail = self._call_task_action()
finished = timezone.now()
self._finalize_task_result(started, finished, status, detail)
if status == "failure" and self.last_status == "failure":
self.consecutive_failures = self.consecutive_failures + 1
elif status == "failure":
self.consecutive_failures = 1
if settings.ASYNC_CALLBACK:
finalize_task_thread = threading.Thread(target=self._finalize_task_result, args=(task_result, started,finished,status,detail), daemon=True)
finalize_task_thread.start()
else:
self.consecutive_failures = 0
if self.consecutive_failures >= settings.MAX_FAILURES:
trigger_api_restart.send(sender=self.__class__)

self.last_status = status
self._finalize_task_result(task_result, started, finished, status, detail)

def _initialize_task_result(self):
def _initialize_task_result(self) -> TaskResult:
"""Initalize an 'in-progress' result so it exists when action runs."""
tid = self.task.task_id
self.task_result = TaskResult(schedule_entry=self.entry, task_id=tid)
self.task_result.save()
task_result = TaskResult(schedule_entry=self.entry, task_id=tid)
task_result.save()
return task_result

def _call_task_action(self):
entry_name = self.task.schedule_entry_name
Expand Down Expand Up @@ -175,19 +169,20 @@ def _call_task_action(self):

return status, detail[:MAX_DETAIL_LEN]

def _finalize_task_result(self, started, finished, status, detail):
tr = self.task_result
tr.started = started
tr.finished = finished
tr.duration = finished - started
tr.status = status
tr.detail = detail
def _finalize_task_result(self, task_result, started, finished, status, detail):
task_result.started = started
task_result.finished = finished
task_result.duration = finished - started
task_result.status = status
task_result.detail = detail
task_result.save()


if self.entry.callback_url:
try:
logger.info("Trying callback to URL: " + self.entry.callback_url)
context = {"request": self.entry.request}
result_json = TaskResultSerializer(tr, context=context).data
result_json = TaskResultSerializer(task_result, context=context).data
verify_ssl = settings.CALLBACK_SSL_VERIFICATION
if settings.CALLBACK_SSL_VERIFICATION:
if settings.PATH_TO_VERIFY_CERT != "":
Expand All @@ -201,8 +196,9 @@ def _finalize_task_result(self, started, finished, status, detail):
data=json.dumps(result_json),
headers=headers,
verify=verify_ssl,
timeout=settings.CALLBACK_TIMEOUT,
)
self._callback_response_handler(response, tr)
self._callback_response_handler(response, task_result)
else:
logger.info("Posting with token")
token = self.entry.owner.auth_token
Expand All @@ -212,15 +208,29 @@ def _finalize_task_result(self, started, finished, status, detail):
json=result_json,
headers=headers,
verify=verify_ssl,
timeout=settings.CALLBACK_TIMEOUT,
)
logger.info("posted")
self._callback_response_handler(response, tr)
self._callback_response_handler(response, task_result)
except Exception as err:
logger.error(str(err))
tr.status = "notification_failed"
tr.save()
task_result.status = "notification_failed"
task_result.save()
else:
tr.save()
task_result.save()

with self.task_status_lock:
if status == "failure" and self.last_status == "failure":
self.consecutive_failures = self.consecutive_failures + 1
elif status == "failure":
self.consecutive_failures = 1
else:
self.consecutive_failures = 0
if self.consecutive_failures >= settings.MAX_FAILURES:
trigger_api_restart.send(sender=self.__class__)

self.last_status = status


@staticmethod
def _callback_response_handler(resp, task_result):
Expand Down
3 changes: 3 additions & 0 deletions src/sensor/runtime_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,15 @@
DEBUG = True
ALLOWED_HOSTS = []
ENCRYPTION_KEY = Fernet.generate_key()
ASYNC_CALLBACK = False
else:
SECURE_PROXY_SSL_HEADER = ("HTTP_X_FORWARDED_PROTO", "https")
SECRET_KEY = env.str("SECRET_KEY")
DEBUG = env.bool("DEBUG", default=False)
ALLOWED_HOSTS = env.str("DOMAINS").split() + env.str("IPS").split()
POSTGRES_PASSWORD = env("POSTGRES_PASSWORD")
ENCRYPTION_KEY = env.str("ENCRYPTION_KEY")
ASYNC_CALLBACK = env.bool("ASYNC_CALLBACK", default=True)

SESSION_COOKIE_SECURE = IN_DOCKER
CSRF_COOKIE_SECURE = IN_DOCKER
Expand Down Expand Up @@ -388,6 +390,7 @@
CALLBACK_SSL_VERIFICATION = env.bool("CALLBACK_SSL_VERIFICATION", default=True)
# OAuth Password Flow Authentication
CALLBACK_AUTHENTICATION = env("CALLBACK_AUTHENTICATION", default="")
CALLBACK_TIMEOUT = env.int("CALLBACK_TIMEOUT", default=3)
CLIENT_ID = env("CLIENT_ID", default="")
CLIENT_SECRET = env("CLIENT_SECRET", default="")
USER_NAME = CLIENT_ID
Expand Down
5 changes: 4 additions & 1 deletion src/sensor/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,15 @@
DEBUG = True
ALLOWED_HOSTS = []
ENCRYPTION_KEY = Fernet.generate_key()
ASYNC_CALLBACK = False
else:
SECURE_PROXY_SSL_HEADER = ("HTTP_X_FORWARDED_PROTO", "https")
SECRET_KEY = env.str("SECRET_KEY")
DEBUG = env.bool("DEBUG", default=False)
ALLOWED_HOSTS = env.str("DOMAINS").split() + env.str("IPS").split()
POSTGRES_PASSWORD = env("POSTGRES_PASSWORD")
ENCRYPTION_KEY = env.str("ENCRYPTION_KEY")
ASYNC_CALLBACK = env.bool("ASYNC_CALLBACK", default=True)

SESSION_COOKIE_SECURE = IN_DOCKER
CSRF_COOKIE_SECURE = IN_DOCKER
Expand Down Expand Up @@ -389,6 +391,7 @@
CALLBACK_SSL_VERIFICATION = env.bool("CALLBACK_SSL_VERIFICATION", default=True)
# OAuth Password Flow Authentication
CALLBACK_AUTHENTICATION = env("CALLBACK_AUTHENTICATION", default="")
CALLBACK_TIMEOUT = env.int("CALLBACK_TIMEOUT", default=3)
CLIENT_ID = env("CLIENT_ID", default="")
CLIENT_SECRET = env("CLIENT_SECRET", default="")
USER_NAME = CLIENT_ID
Expand Down Expand Up @@ -423,4 +426,4 @@
)
SIGAN_POWER_CYCLE_STATES = env("SIGAN_POWER_CYCLE_STATES", default=None)
SIGAN_POWER_SWITCH = env("SIGAN_POWER_SWITCH", default=None)
MAX_FAILURES = env("MAX_FAILURES", default=2)
MAX_FAILURES = env("MAX_FAILURES", default=2)

0 comments on commit 0782938

Please sign in to comment.