Skip to content

Commit

Permalink
feat!: new event bus config format (#272)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rebecca Graber authored Oct 6, 2023
1 parent 03a8710 commit 6e7dbd2
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 50 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ Change Log
Unreleased
----------

[9.0.0] - 2023-10-04
--------------------
Changed
~~~~~~~
* Re-licensed this repository from AGPL 3.0 to Apache 2.0
* **Breaking change**: Restructured EVENT_BUS_PRODUCER_CONFIG

[8.9.0] - 2023-10-04
--------------------
Expand Down
65 changes: 65 additions & 0 deletions docs/decisions/0014-new-event-bus-producer-config.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
13. Change event producer config settings
#########################################

Status
******

**Accepted**

Context
*******

In a previous ADR, we set the structure for the event bus producing configuration to a dictionary like the following:

.. code-block:: python
{ 'org.openedx.content_authoring.xblock.published.v1': [
{'topic': 'content-authoring-xblock-lifecycle', 'event_key_field': 'xblock_info.usage_key', 'enabled': True},
{'topic': 'content-authoring-xblock-published', 'event_key_field': 'xblock_info.usage_key', 'enabled': False},
],
'org.openedx.content_authoring.xblock.deleted.v1': [
{'topic': 'content-authoring-xblock-lifecycle', 'event_key_field': 'xblock_info.usage_key', 'enabled': True},
],
}
While attempting to implement this for edx-platform, we came across some problems with using this structure. In particular, it results in ambiguity
because maintainers can accidentally add something like
``{'topic': 'content-authoring-xblock-lifecycle', 'event_key_field': 'xblock_info.usage_key', 'enabled': True}`` and
``{'topic': 'content-authoring-xblock-lifecycle', 'event_key_field': 'xblock_info.usage_key', 'enabled': False}`` to the same event_type.
Moreover, enabling/disabling an existing event/topic pair requires reaching into the structure, searching for the dictionary with the correct topic, and modifying
the existing object, which is awkward.

This ADR aims to propose a new structure that will provide greater flexibility in using this configuration.

Decision
********

The new EVENT_BUS_PRODUCER_CONFIG will have the following configuration format:

.. code-block:: python
# .. setting_name: EVENT_BUS_PRODUCER_CONFIG
# .. setting_default: {}
# .. setting_description: Dictionary of event_types to dictionaries for topic related configuration.
# Each topic configuration dictionary uses the topic as a key and contains a flag called `enabled`
# denoting whether the event will be and `event_key_field` which is a period-delimited string path
# to event data field to use as event key.
# Note: The topic names should not include environment prefix as it will be dynamically added based on
# EVENT_BUS_TOPIC_PREFIX setting.
EVENT_BUS_PRODUCER_CONFIG = {
'org.openedx.content_authoring.xblock.published.v1': {
'content-authoring-xblock-lifecycle': {'event_key_field': 'xblock_info.usage_key', 'enabled': False},
'content-authoring-xblock-published': {'event_key_field': 'xblock_info.usage_key', 'enabled': True}
},
'org.openedx.content_authoring.xblock.deleted.v1': {
'content-authoring-xblock-lifecycle': {'event_key_field': 'xblock_info.usage_key', 'enabled': True},
},
}
A new ``merge_producer_configs`` method will be added to openedx-events.event_bus to make it easier to correctly determine the config map from multiple sources.

Consequences
************

* As long as the implementing IDA calls ``merge_producer_configs``, maintainers can add existing topics to new event_types without having to recreate the whole dictionary
* There is no ambiguity about whether an event/topic pair is enabled or disabled
1 change: 1 addition & 0 deletions docs/decisions/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ Architectural Decision Records (ADRs)
0011-depending-on-multiple-event-bus-implementations
0012-producing-to-event-bus-via-settings
0013-special-exam-submission-and-review-events
0014-new-event-bus-producer-config
2 changes: 1 addition & 1 deletion openedx_events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
more information about the project.
"""

__version__ = "8.9.0"
__version__ = "9.0.0"
55 changes: 39 additions & 16 deletions openedx_events/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,22 @@

def general_signal_handler(sender, signal, **kwargs): # pylint: disable=unused-argument
"""
Signal handler for publishing events to configured event bus.
Signal handler for producing events to configured event bus.
"""
configurations = getattr(settings, "EVENT_BUS_PRODUCER_CONFIG", {}).get(signal.event_type, ())
event_type_producer_configs = getattr(settings, "EVENT_BUS_PRODUCER_CONFIG", {}).get(signal.event_type, {})
# event_type_producer_configs should look something like
# {
# "topic_a": { "event_key_field": "my.key.field", "enabled": True },
# "topic_b": { "event_key_field": "my.key.field", "enabled": False }
# }"
event_data = {key: kwargs.get(key) for key in signal.init_data}
for configuration in configurations:
if configuration["enabled"]:

for topic in event_type_producer_configs.keys():
if event_type_producer_configs[topic]["enabled"] is True:
get_producer().send(
signal=signal,
topic=configuration["topic"],
event_key_field=configuration["event_key_field"],
topic=topic,
event_key_field=event_type_producer_configs[topic]["event_key_field"],
event_data=event_data,
event_metadata=kwargs["metadata"],
)
Expand All @@ -34,47 +40,64 @@ class OpenedxEventsConfig(AppConfig):

name = "openedx_events"

def _get_validated_signal_config(self, event_type, configurations):
def _get_validated_signal_config(self, event_type, configuration):
"""
Validate signal configuration format.
Example expected signal configuration:
{
"topic_a": { "event_key_field": "my.key.field", "enabled": True },
"topic_b": { "event_key_field": "my.key.field", "enabled": False }
}
Raises:
ProducerConfigurationError: If configuration is not valid.
"""
if not isinstance(configurations, list) and not isinstance(configurations, tuple):
if not isinstance(configuration, dict):
raise ProducerConfigurationError(
event_type=event_type,
message="Configuration for event_types should be a list or a tuple of dictionaries"
message="Configuration for event_types should be a dict"
)
try:
signal = OpenEdxPublicSignal.get_signal_by_type(event_type)
except KeyError as exc:
raise ProducerConfigurationError(message=f"No OpenEdxPublicSignal of type: '{event_type}'.") from exc
for configuration in configurations:
if not isinstance(configuration, dict):
for _, topic_configuration in configuration.items():
if not isinstance(topic_configuration, dict):
raise ProducerConfigurationError(
event_type=event_type,
message="One of the configuration object is not a dictionary"
message="One of the configuration objects is not a dictionary"
)
expected_keys = {"topic": str, "event_key_field": str, "enabled": bool}
expected_keys = {"event_key_field": str, "enabled": bool}
for expected_key, expected_type in expected_keys.items():
if expected_key not in configuration:
if expected_key not in topic_configuration.keys():
raise ProducerConfigurationError(
event_type=event_type,
message=f"One of the configuration object is missing '{expected_key}' key."
)
if not isinstance(configuration[expected_key], expected_type):
if not isinstance(topic_configuration[expected_key], expected_type):
raise ProducerConfigurationError(
event_type=event_type,
message=(f"Expected type: {expected_type} for '{expected_key}', "
f"found: {type(configuration[expected_key])}")
f"found: {type(topic_configuration[expected_key])}")
)
return signal

def ready(self):
"""
Read `EVENT_BUS_PRODUCER_CONFIG` setting and connects appropriate handlers to the events based on it.
Example expected configuration:
{
"org.openedx.content_authoring.xblock.deleted.v1" : {
"topic_a": { "event_key_field": "xblock_info.usage_key", "enabled": True },
"topic_b": { "event_key_field": "xblock_info.usage_key", "enabled": False }
},
"org.openedx.content_authoring.course.catalog_info.changed.v1" : {
"topic_c": {"event_key_field": "course_info.course_key", "enabled": True }
}
}
Raises:
ProducerConfigurationError: If `EVENT_BUS_PRODUCER_CONFIG` is not valid.
"""
Expand Down
26 changes: 26 additions & 0 deletions openedx_events/event_bus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
``EVENT_BUS_CONSUMER``.
"""

import copy
import warnings
from abc import ABC, abstractmethod
from functools import lru_cache
Expand Down Expand Up @@ -183,3 +184,28 @@ def make_single_consumer(*, topic: str, group_id: str,
def _reset_state(sender, **kwargs): # pylint: disable=unused-argument
"""Reset caches when settings change during unit tests."""
get_producer.cache_clear()


def merge_producer_configs(producer_config_original, producer_config_overrides):
"""
Merge two EVENT_BUS_PRODUCER_CONFIG maps.
Arguments:
producer_config_original: An EVENT_BUS_PRODUCER_CONFIG-structured map
producer_config_overrides: An EVENT_BUS_PRODUCER_CONFIG-structured map
Returns:
A new EVENT_BUS_PRODUCER_CONFIG map created by combining the two maps. All event_type/topic pairs in
producer_config_overrides are added to the producer_config_original. If there is a conflict on whether a
particular event_type/topic pair is enabled, producer_config_overrides wins out.
"""
combined = copy.deepcopy(producer_config_original)
for event_type, event_type_config_overrides in producer_config_overrides.items():
event_type_config_combined = combined.get(event_type, {})
for topic, topic_config_overrides in event_type_config_overrides.items():
topic_config_combined = event_type_config_combined.get(topic, {})
topic_config_combined['enabled'] = topic_config_overrides['enabled']
topic_config_combined['event_key_field'] = topic_config_overrides['event_key_field']
event_type_config_combined[topic] = topic_config_combined
combined[event_type] = event_type_config_combined
return combined
61 changes: 60 additions & 1 deletion openedx_events/event_bus/tests/test_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
Tests for event bus implementation loader.
"""

import copy
import warnings
from contextlib import contextmanager
from unittest import TestCase

from django.test import override_settings

from openedx_events.data import EventsMetadata
from openedx_events.event_bus import _try_load, get_producer, make_single_consumer
from openedx_events.event_bus import _try_load, get_producer, make_single_consumer, merge_producer_configs
from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED


Expand Down Expand Up @@ -126,3 +127,61 @@ def test_default_does_nothing(self):
with assert_warnings([]):
# Nothing thrown, no warnings.
assert consumer.consume_indefinitely() is None


class TestSettings(TestCase):
def test_merge_configs(self):
dict_a = {
'event_type_0': {
'topic_a': {'event_key_field': 'field', 'enabled': True},
'topic_b': {'event_key_field': 'field', 'enabled': True}
},
'event_type_1': {
'topic_c': {'event_key_field': 'field', 'enabled': True},
}
}
# for ensuring we didn't change the original dict
dict_a_copy = copy.deepcopy(dict_a)
dict_b = {
'event_type_0': {
# disable an existing event/topic pairing
'topic_a': {'event_key_field': 'field', 'enabled': False},
# add a new topic to an existing topic
'topic_d': {'event_key_field': 'field', 'enabled': True},
},
# add a new event_type
'event_type_2': {
'topic_e': {'event_key_field': 'field', 'enabled': True},
}
}
dict_b_copy = copy.deepcopy(dict_b)
result = merge_producer_configs(dict_a, dict_b)
self.assertDictEqual(result, {
'event_type_0': {
'topic_a': {'event_key_field': 'field', 'enabled': False},
'topic_b': {'event_key_field': 'field', 'enabled': True},
'topic_d': {'event_key_field': 'field', 'enabled': True},
},
'event_type_1': {
'topic_c': {'event_key_field': 'field', 'enabled': True},
},
'event_type_2': {
'topic_e': {'event_key_field': 'field', 'enabled': True},
}
})
self.assertDictEqual(dict_a, dict_a_copy)
self.assertDictEqual(dict_b, dict_b_copy)

def test_merge_configs_with_empty(self):
dict_a = {
'event_type_0': {
'topic_a': {'event_key_field': 'field', 'enabled': True},
'topic_b': {'event_key_field': 'field', 'enabled': True}
},
'event_type_1': {
'topic_c': {'event_key_field': 'field', 'enabled': True},
}
}
dict_b = {}
result = merge_producer_configs(dict_a, dict_b)
self.assertDictEqual(result, dict_a)
25 changes: 14 additions & 11 deletions openedx_events/tests/test_producer_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def setUp(self) -> None:
@patch('openedx_events.apps.get_producer')
def test_enabled_disabled_events(self, mock_producer):
"""
Check whether XBLOCK_PUBLISHED is connected to the handler and the handler only publishes enabled events.
Check whether XBLOCK_PUBLISHED is connected to the handler and the handler only produces enabled events.
Args:
mock_producer: mock get_producer to inspect the arguments.
Expand All @@ -46,12 +46,12 @@ def test_enabled_disabled_events(self, mock_producer):
# check that call_args_list only consists of enabled topics.
call_args = mock_send.send.call_args_list[0][1]
self.assertDictContainsSubset(
{'topic': 'content-authoring-xblock-lifecycle', 'event_key_field': 'xblock_info.usage_key'},
{'topic': 'enabled_topic_a', 'event_key_field': 'xblock_info.usage_key'},
call_args
)
call_args = mock_send.send.call_args_list[1][1]
self.assertDictContainsSubset(
{'topic': 'content-authoring-all-status', 'event_key_field': 'xblock_info.usage_key'},
{'topic': 'enabled_topic_b', 'event_key_field': 'xblock_info.usage_key'},
call_args
)

Expand All @@ -78,31 +78,34 @@ def test_configuration_is_validated(self):
with pytest.raises(ProducerConfigurationError, match="should be a dictionary"):
apps.get_app_config("openedx_events").ready()

with override_settings(EVENT_BUS_PRODUCER_CONFIG={"invalid.event.type": []}):
with override_settings(EVENT_BUS_PRODUCER_CONFIG={"invalid.event.type": {}}):
with pytest.raises(ProducerConfigurationError, match="No OpenEdxPublicSignal of type"):
apps.get_app_config("openedx_events").ready()

with override_settings(EVENT_BUS_PRODUCER_CONFIG={"org.openedx.content_authoring.xblock.deleted.v1": ""}):
with pytest.raises(ProducerConfigurationError, match="should be a list or a tuple"):
with pytest.raises(ProducerConfigurationError, match="should be a dict"):
apps.get_app_config("openedx_events").ready()

with override_settings(EVENT_BUS_PRODUCER_CONFIG={"org.openedx.content_authoring.xblock.deleted.v1": [""]}):
with pytest.raises(ProducerConfigurationError, match="object is not a dictionary"):
with override_settings(EVENT_BUS_PRODUCER_CONFIG={"org.openedx.content_authoring.xblock.deleted.v1":
{"topic": ""}}):
with pytest.raises(ProducerConfigurationError, match="One of the configuration objects is not a"
" dictionary"):
apps.get_app_config("openedx_events").ready()

with override_settings(
EVENT_BUS_PRODUCER_CONFIG={
"org.openedx.content_authoring.xblock.deleted.v1": [{"topic": "some", "enabled": True}]
"org.openedx.content_authoring.xblock.deleted.v1": {"topic": {"enabled": True}}
}
):
with pytest.raises(ProducerConfigurationError, match="missing 'event_key_field' key."):
apps.get_app_config("openedx_events").ready()

with override_settings(
EVENT_BUS_PRODUCER_CONFIG={
"org.openedx.content_authoring.xblock.deleted.v1": [
{"topic": "some", "enabled": 1, "event_key_field": "some"}
]
"org.openedx.content_authoring.xblock.deleted.v1":
{
"some": {"enabled": 1, "event_key_field": "some"}
}
}
):
with pytest.raises(
Expand Down
Loading

0 comments on commit 6e7dbd2

Please sign in to comment.