Skip to content
18 changes: 18 additions & 0 deletions src/mx_bluesky/beamlines/i04/callbacks/murko_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dodal.log import LOGGER
from event_model.documents import Event, RunStart, RunStop
from redis import StrictRedis
from redis.exceptions import ConnectionError

FORWARDING_COMPLETE_MESSAGE = "image_forwarding_complete"

Expand Down Expand Up @@ -57,7 +58,20 @@ def __init__(self, redis_host: str, redis_password: str, redis_db: int = 0):
self.last_uuid = None
self.previous_omegas: list[OmegaReading] = []

def _check_redis_connection(self):
try:
self.redis_client.ping()
return True
except ConnectionError:
LOGGER.warning(
f"Failed to connect to redis: {self.redis_client}. Murko callback will not run"
)
return False

def start(self, doc: RunStart) -> RunStart | None:
self.redis_connected = self._check_redis_connection()
if not self.redis_connected:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could: It might be better to re-check the connection on start. For other callbacks we only create them once when we start up and never recreate them. I know we currently don't for MurkoCallback but if we did want to this would mean that we would have to restart the whole env to pick up redis coming back up

return doc
self.murko_metadata: dict = {"sample_id": doc.get("sample_id")}
self.last_uuid = None
self.previous_omegas = []
Expand All @@ -67,6 +81,8 @@ def start(self, doc: RunStart) -> RunStart | None:
return doc

def event(self, doc: Event) -> Event:
if not self.redis_connected:
return doc
data = doc["data"]
for prefix in ("oav", "oav_full_screen"):
if f"{prefix}-beam_centre_j" in data:
Expand Down Expand Up @@ -114,6 +130,8 @@ def call_murko(self, uuid: str, omega: float):
self.redis_client.publish("murko", json.dumps(metadata))

def stop(self, doc: RunStop) -> RunStop | None:
if not self.redis_connected:
return doc
LOGGER.info(f"Finished streaming {self.murko_metadata['sample_id']} to murko")
LOGGER.info(
f"Publishing forwarding complete message: {FORWARDING_COMPLETE_MESSAGE}"
Expand Down
1 change: 1 addition & 0 deletions src/mx_bluesky/beamlines/i04/thawing_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def thaw_and_murko_centre(
initial_zoom_level = yield from bps.rd(oav_fs.zoom_controller.level)
initial_velocity = yield from bps.rd(smargon.omega.velocity)
new_velocity = abs(rotation / time_to_thaw) * 2.0

murko_callback = MurkoCallback(
RedisConstants.REDIS_HOST,
RedisConstants.REDIS_PASSWORD,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest
from event_model import Event
Expand Down Expand Up @@ -275,3 +275,26 @@ def test_when_murko_called_with_full_screen_and_roi_event_then_metadata_updates_
)
def test_extrapolate_omega(latest_omega, previous_omega, now, expected):
assert extrapolate_omega(latest_omega, previous_omega, now) == expected


@patch(
"mx_bluesky.beamlines.i04.callbacks.murko_callback.MurkoCallback._check_redis_connection"
)
def test_if_redis_connection_fails_then_there_is_no_error(
mock_check_redis_connection: MagicMock,
):
mock_check_redis_connection.return_value = False
callback = MurkoCallback("", "")
doc = {}
callback.start(doc)
callback.event(doc)
callback.stop(doc)


def test_rwarning_is_logged_if_redis_connection_fails(caplog):
callback = MurkoCallback("", "")
doc = {}
callback.start(doc)
log_message = caplog.records[-1]
assert log_message.levelname == "WARNING"
assert "Failed to connect to redis: " in log_message.message
1 change: 1 addition & 0 deletions tests/unit_tests/beamlines/i04/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
def murko_callback() -> MurkoCallback:
callback = MurkoCallback("", "")
callback.redis_client = MagicMock()
callback.redis_connected = True
return callback
61 changes: 54 additions & 7 deletions tests/unit_tests/beamlines/i04/test_thawing.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ def test_thaw_and_stream_adds_murko_callback_and_produces_expected_messages(

@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback.stop")
@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback.call_murko")
@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback._check_redis_connection")
def test_thaw_and_stream_will_produce_events_that_call_murko(
patch_check_redis_connection: MagicMock,
patch_murko_call: MagicMock,
patch_stop_call: MagicMock,
smargon: Smargon,
Expand All @@ -272,6 +274,8 @@ def test_thaw_and_stream_will_produce_events_that_call_murko(
oav_forwarder: OAVToRedisForwarder,
run_engine: RunEngine,
):
patch_check_redis_connection.return_value = True

class StopPlanError(Exception):
pass

Expand Down Expand Up @@ -367,7 +371,7 @@ def _run_thaw_and_stream_and_assert_zoom_changes(

@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback")
def test_given_thaw_succeeds_then_thaw_and_stream_sets_zoom_to_1_and_back(
patch_murko_callback,
patch_murko_callback: MagicMock,
smargon: Smargon,
thawer: Thawer,
oav_forwarder: OAVToRedisForwarder,
Expand All @@ -383,8 +387,8 @@ def test_given_thaw_succeeds_then_thaw_and_stream_sets_zoom_to_1_and_back(
@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback")
@patch("mx_bluesky.beamlines.i04.thawing_plan.bps.monitor")
def test_given_thaw_fails_then_thaw_and_stream_sets_zoom_to_1_and_back(
mock__thaw,
patch_murko_callback,
mock__thaw: MagicMock,
patch_murko_callback: MagicMock,
smargon: Smargon,
thawer: Thawer,
oav_forwarder: OAVToRedisForwarder,
Expand All @@ -403,8 +407,8 @@ def test_given_thaw_fails_then_thaw_and_stream_sets_zoom_to_1_and_back(
"mx_bluesky.beamlines.i04.thawing_plan._rotate_in_one_direction_and_stream_to_redis"
)
def test_thaw_and_murko_centre_stages_and_unstages_murko_results_twice(
mock_rotate_and_stream,
patch_murko_callback,
mock_rotate_and_stream: MagicMock,
patch_murko_callback: MagicMock,
smargon: Smargon,
thawer: Thawer,
oav_forwarder: OAVToRedisForwarder,
Expand All @@ -424,8 +428,8 @@ def test_thaw_and_murko_centre_stages_and_unstages_murko_results_twice(
@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback")
@patch("mx_bluesky.beamlines.i04.thawing_plan.bps.monitor")
def test_given_thaw_and_murko_centre_errors_then_murko_results_still_unstaged(
mock__thaw,
patch_murko_callback,
mock__thaw: MagicMock,
patch_murko_callback: MagicMock,
smargon: Smargon,
thawer: Thawer,
oav_forwarder: OAVToRedisForwarder,
Expand Down Expand Up @@ -583,3 +587,46 @@ def test_thawing_plan_with_murko_callback_puts_correct_metadata_into_redis(
assert publish_call_args_list[1].args[1] == json.dumps(FORWARDING_COMPLETE_MESSAGE)
assert publish_call_args_list[2].args[1] == json.dumps(expected_roi_md)
assert publish_call_args_list[3].args[1] == json.dumps(FORWARDING_COMPLETE_MESSAGE)


@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback._check_redis_connection")
def test_plans_carry_on_thaw_if_redis_connection_check_fails(
patch_callback_check_redis_connection: MagicMock,
smargon: Smargon,
thawer: Thawer,
robot: BartRobot,
oav_forwarder: OAVToRedisForwarder,
run_engine: RunEngine,
):
patch_callback_check_redis_connection.return_value = False
murko_results = MurkoResultsDevice()
murko_results._check_redis_connection = AsyncMock(return_value=False)
for plan in (
thaw_and_murko_centre(
10,
360,
thawer=thawer,
smargon=smargon,
robot=robot,
oav_to_redis_forwarder=oav_forwarder,
murko_results=murko_results,
),
thaw_and_stream_to_redis(
10,
360,
thawer=thawer,
smargon=smargon,
robot=robot,
oav_to_redis_forwarder=oav_forwarder,
),
):
run_engine(plan)

omega_put = get_mock_put(smargon.omega.user_setpoint)

assert omega_put.call_args_list == [
call(360.0, wait=True),
call(0.0, wait=True),
]

omega_put.reset_mock()
Loading