Skip to content

Commit

Permalink
make pipeline index stable (#562)
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 authored Apr 12, 2024
1 parent 0008958 commit 810e933
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

### Bugfix

* fixes a bug where the pipeline index increases on every restart of a failed pipeline

## 11.0.0
### Breaking
Expand Down
25 changes: 16 additions & 9 deletions logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,26 @@ def _decrease_to_count(self, count: int):

def restart_failed_pipeline(self):
"""Remove one pipeline at a time."""
failed_pipelines = [pipeline for pipeline in self._pipelines if not pipeline.is_alive()]
for failed_pipeline in failed_pipelines:
self._pipelines.remove(failed_pipeline)
failed_pipelines = [
(index, pipeline)
for index, pipeline in enumerate(self._pipelines)
if not pipeline.is_alive()
]

if not failed_pipelines:
return

for index, failed_pipeline in failed_pipelines:
pipeline_index = index + 1
self._pipelines.pop(index)
self.metrics.number_of_failed_pipelines += 1
if self.prometheus_exporter:
self.prometheus_exporter.mark_process_dead(failed_pipeline.pid)

if failed_pipelines:
self.set_count(self._configuration.process_count)
exit_codes = [pipeline.exitcode for pipeline in failed_pipelines]
self._pipelines.insert(index, self._create_pipeline(pipeline_index))
exit_code = failed_pipeline.exitcode
self._logger.warning(
f"Restarted {len(failed_pipelines)} failed pipeline(s), "
f"with exit code(s): {exit_codes}"
f"Restarting failed pipeline on index {pipeline_index} "
f"with exit code: {exit_code}"
)

def stop(self):
Expand Down
40 changes: 31 additions & 9 deletions tests/unit/framework/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def test_remove_failed_pipelines_logs_warning_for_removed_failed_pipelines(self,
ok_pipeline.is_alive = mock.MagicMock(return_value=True)
self.manager._pipelines = [failed_pipeline, ok_pipeline]
self.manager.restart_failed_pipeline()
logger_mock.assert_called_with("Restarted 1 failed pipeline(s), with exit code(s): [-1]")
logger_mock.assert_called_with("Restarting failed pipeline on index 1 with exit code: -1")

def test_stop_terminates_processes_created(self):
self.manager.set_count(3)
Expand All @@ -111,9 +111,10 @@ def test_restart_failed_pipelines_calls_prometheus_cleanup_method(self, tmpdir):
failed_pipeline.is_alive = mock.MagicMock()
failed_pipeline.is_alive.return_value = False
failed_pipeline.pid = 42
self.config.metrics = {"enabled": True, "port": 1234}
self.config.process_count = 2
manager = PipelineManager(self.config)
config = deepcopy(self.config)
config.metrics = {"enabled": True, "port": 1234}
config.process_count = 2
manager = PipelineManager(config)
prometheus_exporter_mock = mock.MagicMock()
manager.prometheus_exporter = prometheus_exporter_mock
manager._pipelines = [failed_pipeline]
Expand All @@ -134,16 +135,17 @@ def test_stop_calls_prometheus_cleanup_method(self, tmpdir):
with mock.patch("os.environ", new={"PROMETHEUS_MULTIPROC_DIR": str(tmpdir)}):
config = deepcopy(self.config)
config.metrics = {"enabled": True, "port": 1234}
self.config.process_count = 2
config.process_count = 2
manager = PipelineManager(config)
prometheus_exporter_mock = mock.MagicMock()
manager.prometheus_exporter = prometheus_exporter_mock
manager.stop()
prometheus_exporter_mock.cleanup_prometheus_multiprocess_dir.assert_called()

def test_prometheus_exporter_is_instanciated_if_metrics_enabled(self):
self.config.metrics = MetricsConfig(enabled=True, port=8000)
manager = PipelineManager(self.config)
config = deepcopy(self.config)
config.metrics = MetricsConfig(enabled=True, port=8000)
manager = PipelineManager(config)
assert isinstance(manager.prometheus_exporter, PrometheusExporter)

def test_stop_stops_queue_listener(self):
Expand Down Expand Up @@ -175,9 +177,29 @@ def test_restart_calls_set_count(self):
assert mock_set_count.call_count == 2

def test_restart_calls_prometheus_exporter_run(self):
self.config.metrics = MetricsConfig(enabled=True, port=666)
pipeline_manager = PipelineManager(self.config)
config = deepcopy(self.config)
config.metrics = MetricsConfig(enabled=True, port=666)
pipeline_manager = PipelineManager(config)
pipeline_manager.prometheus_exporter.is_running = False
with mock.patch.object(pipeline_manager.prometheus_exporter, "run") as mock_run:
pipeline_manager.restart()
mock_run.assert_called()

def test_restart_sets_deterministic_pipline_index(self):
    """Check that ``restart`` calls ``_create_pipeline`` with 1, 2 and 3 in sequence."""
    # Work on a copy so the shared fixture config is not mutated by this test.
    metrics_off_config = deepcopy(self.config)
    metrics_off_config.metrics = MetricsConfig(enabled=False, port=666)
    manager = PipelineManager(metrics_off_config)
    manager.set_count(3)
    with mock.patch.object(manager, "_create_pipeline") as create_pipeline_mock:
        manager.restart()
    # The mock keeps its recorded calls after the patch is undone.
    create_pipeline_mock.assert_has_calls([mock.call(index) for index in (1, 2, 3)])

def test_restart_failed_pipelines_sets_old_pipeline_index(self):
    """Check that a dead pipeline at list position 0 is recreated with index 1."""
    manager = PipelineManager(self.config)
    manager.set_count(3)
    # Replace the first pipeline with a stand-in that reports itself as not alive.
    dead_pipeline = mock.MagicMock()
    dead_pipeline.is_alive.return_value = False
    manager._pipelines[0] = dead_pipeline
    with mock.patch.object(manager, "_create_pipeline") as create_pipeline_mock:
        manager.restart_failed_pipeline()
    # Exactly one re-creation is expected, using the 1-based index of position 0.
    create_pipeline_mock.assert_called_once_with(1)

0 comments on commit 810e933

Please sign in to comment.