diff --git a/src/daisy/submit_core.py b/src/daisy/submit_core.py index 87d7932..c208dd7 100644 --- a/src/daisy/submit_core.py +++ b/src/daisy/submit_core.py @@ -19,9 +19,7 @@ import logging import random import socket -from datetime import datetime, timezone -import amqp from cassandra.cqlengine.query import DoesNotExist # from daisy import config @@ -79,34 +77,6 @@ def write_to_swift(fileobj: bytes, oops_id: str): return True -def write_to_amqp(message, arch): - queue = "retrace_%s" % arch - channel = amqp_utils.get_connection().channel() - if not channel: - return False - try: - channel.queue_declare(queue=queue, durable=True, auto_delete=False) - # We'll use this timestamp to measure how long it takes to process a - # retrace, from receiving the core file to writing the data back to - # Cassandra. - body = amqp.Message(message, timestamp=int(datetime.now(timezone.utc).timestamp())) - # Persistent - body.properties["delivery_mode"] = 2 - channel.basic_publish(body, exchange="", routing_key=queue) - msg = "%s added to %s queue" % (message.split(":")[0], queue) - logger.info(msg) - queued = True - except amqp_utils.amqplib_error_types as e: - if amqp_utils.is_amqplib_connection_error(e): - # Could not connect / interrupted connection - queued = False - # Unknown error mode : don't hide it. - raise - finally: - channel.close() - return queued - - def submit_core(request, oopsid, arch, system_token): try: # every OOPS will have a SystemIdentifier @@ -137,7 +107,7 @@ def submit_core(request, oopsid, arch, system_token): # same SAS. return msg, 500 - queued = write_to_amqp(f"{oopsid}:swift", arch) + queued = amqp_utils.enqueue(f"{oopsid}:swift", "retrace_%s" % arch) if not queued: # If not written to amqp then write to log file msg = "Failure to write to amqp retrace queue %s %s" % (arch, message) diff --git a/src/errortracker/amqp_utils.py b/src/errortracker/amqp_utils.py index 193e78f..0507f57 100644 --- a/src/errortracker/amqp_utils.py +++ b/src/errortracker/amqp_utils.py @@ -1,10 +1,13 @@ import socket +from datetime import datetime, timezone import amqp from amqp import ConnectionError as AMQPConnectionException from errortracker import config +logger = config.logger + # From oops-amqp # These exception types always indicate an AMQP connection error/closure. # However you should catch amqplib_error_types and post-filter with @@ -61,3 +64,30 @@ def get_connection(): return None # Unknown error mode : don't hide it. raise + + +def enqueue(message, queue): + channel = get_connection().channel() + if not channel: + return False + try: + channel.queue_declare(queue=queue, durable=True, auto_delete=False) + # We'll use this timestamp to measure how long it takes to process a + # retrace, from receiving the core file to writing the data back to + # Cassandra. + body = amqp.Message(message, timestamp=int(datetime.now(timezone.utc).timestamp())) + # Persistent + body.properties["delivery_mode"] = 2 + channel.basic_publish(body, exchange="", routing_key=queue) + msg = "%s added to %s queue" % (message.split(":")[0], queue) + logger.info(msg) + queued = True + except amqplib_error_types as e: + if is_amqplib_connection_error(e): + # Could not connect / interrupted connection + queued = False + # Unknown error mode : don't hide it. + raise + finally: + channel.close() + return queued diff --git a/src/retracer.py b/src/retracer.py index e6e2634..60d99aa 100755 --- a/src/retracer.py +++ b/src/retracer.py @@ -306,47 +306,10 @@ def setup_cache(self, sandbox_dir, release): self._sandboxes[release] = (sandbox, cache) return self._sandboxes[release] - def move_to_failed_queue(self, msg): - if self.failed: - # It failed its 2nd retrace attempt, admit defeat and don't try - # again. - self.processed(msg) - return - - # We've processed this. Delete it off the MQ. - log("ack'ing message from main queue") - msg.channel.basic_ack(msg.delivery_tag) - # We don't call self.processed here because that would remove the core - # file from the storage provider, and we want to retain it. - - # Add it to the failed to retrace queue. - queue = "failed_retrace_%s" % self.architecture - msg.channel.queue_declare(queue=queue, durable=True, auto_delete=False) - body = amqp.Message(msg.body) - # Persistent - body.properties["delivery_mode"] = 2 - msg.channel.basic_publish(body, exchange="", routing_key=queue) - log("pushed message to failed queue") - def failed_to_process(self, msg, oops_id, old=False): # Try to remove the core file from the storage provider - oops_id, provider = self.msg_body.split(":", 1) - removed = self.remove(oops_id) - if removed: - # We've processed this. Delete it off the MQ. - log("ack'ing message from queue (failed)") - msg.channel.basic_ack(msg.delivery_tag) - self.update_time_to_retrace(msg) - # Removing the core file failed in the processing phase, so requeue - # the crash unless it is an old OOPS then don't requeue it. - elif not removed and not old: - log("Requeued failed to process OOPS (%s)" % oops_id) - self.requeue(msg, oops_id) - # It is old so we should just ack the request to retrace it. - elif not removed and old: - log("Ack'ing message about old missing core.") - msg.channel.basic_ack(msg.delivery_tag) - metrics.meter("retrace.failure.old_missing_core") + self.remove(oops_id) + self.update_time_to_retrace(msg) # Also remove it from the retracing index, if we haven't already. try: addr_sig = cassandra_schema.OOPS.objects.get( @@ -419,6 +382,12 @@ def callback(self, msg): metrics.meter("could_not_find_oops") return + + # ack the message very early, to prevent them from staying forever in + # the queue in case the retracer gets OOM-killed + log("ack'ing message from queue") + msg.channel.basic_ack(msg.delivery_tag) + # There are some items still in amqp queue that have already been # retraced, check for this and ack the message. # N.B.: This only works in some cases because we don't mark a report as @@ -426,11 +395,9 @@ def callback(self, msg): if "RetraceFailureReason" in list(col.keys()) or "RetraceOutdatedPackages" in list( col.keys() ): - log("Ack'ing already retraced OOPS.") - msg.channel.basic_ack(msg.delivery_tag) - # 2016-05-19 - this failed to delete cores and ack'ing of msgs - # Call processed so that we also try to remove the core file - # self.processed(msg) + log("already retraced OOPS.") + # also try to remove the core file + self.remove(oops_id) return # Check to see if there is an UnreportableReason so we can log more @@ -460,7 +427,8 @@ def callback(self, msg): except Exception as e: log("Failed to decompress core: %s" % str(e)) # We couldn't decompress this, so there's no value in trying again. - self.processed(msg) + self.remove(oops_id) + self.update_time_to_retrace(msg) # probably incomplete cores from armhf? metrics.meter("retrace.failed") metrics.meter("retrace.failed.%s" % self.architecture) @@ -475,7 +443,8 @@ def callback(self, msg): (out, err) = proc.communicate() if "is truncated: expected core file size" in err or "not a core dump" in err: # Not a core file, there's no value in trying again. - self.processed(msg) + self.remove(oops_id) + self.update_time_to_retrace(msg) log("Not a core dump per gdb.") if unreportable_reason: log("UnreportableReason is: %s" % unreportable_reason) @@ -529,7 +498,8 @@ def callback(self, msg): if invalid: metrics.meter("retrace.failed.invalid") if not release or invalid or not retraceable: - self.processed(msg) + self.remove(oops_id) + self.update_time_to_retrace(msg) rm_eff(work_path) return @@ -704,14 +674,17 @@ def callback(self, msg): # return m = "Retrace failed (%i), %s" action = "leaving as failed." - if give_up: + # It failed its 2nd retrace attempt, admit defeat and don't try again. + if give_up or self.failed: cassandra_schema.OOPS.objects.create( key=oops_id.encode(), column1="RetraceStatus", value="Failure" ) # we don't want to see this OOPS again so process it - self.processed(msg) + self.remove(oops_id) + self.update_time_to_retrace(msg) else: - self.move_to_failed_queue(msg) + amqp_utils.enqueue(msg.body, "failed_retrace_%s" % self.architecture) + log("pushed message to failed queue") if not self.failed: action = "moving to failed queue." log(m % (proc.returncode, action)) @@ -1109,7 +1082,8 @@ def callback(self, msg): rm_eff(work_path) log("Done processing %s" % work_path) - self.processed(msg) + self.remove(oops_id) + self.update_time_to_retrace(msg) self._processing_callback = False # If stop now has been set then we should stop working. if self._stop_now: @@ -1119,23 +1093,7 @@ def callback(self, msg): self.channel.close() sys.exit() - def processed(self, msg): - oops_id, provider = self.msg_body.split(":", 1) - removed = self.remove(oops_id) - if removed: - # We've processed this. Delete it off the MQ. - log("ack'ing message from queue") - msg.channel.basic_ack(msg.delivery_tag) - self.update_time_to_retrace(msg) - return True - return False - def requeue(self, msg, oops_id): - # RabbitMQ versions from 2.7.0 push basic_reject'ed messages - # back onto the front of the queue: - # http://www.rabbitmq.com/semantics.html - # Build a new message from the old one, publish the new and bin - # the old. ts = msg.properties.get("timestamp") # If we are still unable to find the OOPS after 8 days then # just process it as a failure. @@ -1158,7 +1116,6 @@ def requeue(self, msg, oops_id): body = amqp.Message(self.msg_body, timestamp=ts) body.properties["delivery_mode"] = 2 msg.channel.basic_publish(body, exchange="", routing_key=key) - msg.channel.basic_reject(msg.delivery_tag, False) def update_time_to_retrace(self, msg): """Record how long it took to retrace this crash, from the time we got