diff --git a/nmigen_soc/event.py b/nmigen_soc/event.py new file mode 100644 index 0000000..aab7ec2 --- /dev/null +++ b/nmigen_soc/event.py @@ -0,0 +1,210 @@ +import enum +from collections import OrderedDict + +from nmigen import * + + +__all__ = ["Source", "EventMap", "Monitor"] + + +class Source(Record): + class Trigger(enum.Enum): + """Event trigger mode.""" + LEVEL = "level" + RISE = "rise" + FALL = "fall" + + """Event source interface. + + Parameters + ---------- + trigger : :class:`Trigger` + Trigger mode. An event can be edge- or level-triggered by the input line. + name: str + Name of the underlying record. + + Attributes + ---------- + i : Signal() + Input line. Sampled in order to detect an event. + trg : Signal() + Event trigger. Asserted when an event occurs, according to the trigger mode. + """ + def __init__(self, *, trigger="level", name=None, src_loc_at=0): + choices = ("level", "rise", "fall") + if not isinstance(trigger, Source.Trigger) and trigger not in choices: + raise ValueError("Invalid trigger mode {!r}; must be one of {}" + .format(trigger, ", ".join(choices))) + self.trigger = Source.Trigger(trigger) + self._map = None + + super().__init__([ + ("i", 1), + ("trg", 1), + ], name=name, src_loc_at=1 + src_loc_at) + + @property + def event_map(self): + """Event map. + + Return value + ------------ + A :class:`EventMap` describing subordinate event sources. + + Exceptions + ---------- + Raises :exn:`NotImplementedError` if the source does not have an event map. + """ + if self._map is None: + raise NotImplementedError("Event source {!r} does not have an event map" + .format(self)) + return self._map + + @event_map.setter + def event_map(self, event_map): + if not isinstance(event_map, EventMap): + raise TypeError("Event map must be an instance of EventMap, not {!r}" + .format(event_map)) + event_map.freeze() + self._map = event_map + + # FIXME: get rid of this + __hash__ = object.__hash__ + + +class EventMap: + """Event map. + + An event map is a description of a set of events. It is built by adding event sources + and can be queried later to determine their index. Event indexing is done implicitly by + increment, starting at 0. + """ + def __init__(self): + self._sources = OrderedDict() + self._frozen = False + + @property + def size(self): + """Size of the event map. + + Return value + ------------ + The number of event sources in the map. + """ + return len(self._sources) + + def freeze(self): + """Freeze the event map. + + Once the event map is frozen, sources cannot be added anymore. + """ + self._frozen = True + + def add(self, src): + """Add an event source. + + Arguments + --------- + src : :class:`Source` + Event source. + + Exceptions + ---------- + Raises :exn:`ValueError` if the event map is frozen. + """ + if self._frozen: + raise ValueError("Event map has been frozen. Cannot add source.") + if not isinstance(src, Source): + raise TypeError("Event source must be an instance of event.Source, not {!r}" + .format(src)) + if src not in self._sources: + self._sources[src] = self.size + + def index(self, src): + """Get the index corresponding to an event source. + + Arguments + --------- + src : :class:`Source` + Event source. + + Return value + ------------ + The index of the source. + + Exceptions + ---------- + Raises :exn:`KeyError` if the source is not found. + """ + if not isinstance(src, Source): + raise TypeError("Event source must be an instance of event.Source, not {!r}" + .format(src)) + return self._sources[src] + + def sources(self): + """Iterate event sources. + + Yield values + ------------ + A tuple ``src, index`` corresponding to an event source and its index. + """ + for src, index in self._sources.items(): + yield src, index + + +class Monitor(Elaboratable): + """Event monitor. + + A monitor for subordinate event sources. + + Parameters + ---------- + event_map : :class:`EventMap` + Event map. + trigger : :class:`Source.Trigger` + Trigger mode. See :class:`Source`. + + Attributes + ---------- + src : :class:`Source` + Event source. Its input is asserted when a subordinate event is enabled and pending. + enable : Signal(event_map.size), bit mask, in + Enabled events. + pending : Signal(event_map.size), bit mask, out + Pending events. + clear : Signal(event_map.size), bit mask, in + Clear selected pending events. + """ + def __init__(self, event_map, *, trigger="level"): + self.src = Source(trigger=trigger) + self.src.event_map = event_map + + self.enable = Signal(event_map.size) + self.pending = Signal(event_map.size) + self.clear = Signal(event_map.size) + + def elaborate(self, platform): + m = Module() + + for sub, index in self.src.event_map.sources(): + if sub.trigger != Source.Trigger.LEVEL: + sub_i_r = Signal.like(sub.i, name_suffix="_r") + m.d.sync += sub_i_r.eq(sub.i) + + if sub.trigger == Source.Trigger.LEVEL: + m.d.comb += sub.trg.eq(sub.i) + elif sub.trigger == Source.Trigger.RISE: + m.d.comb += sub.trg.eq(~sub_i_r & sub.i) + elif sub.trigger == Source.Trigger.FALL: + m.d.comb += sub.trg.eq( sub_i_r & ~sub.i) + else: + assert False # :nocov: + + with m.If(sub.trg): + m.d.sync += self.pending[index].eq(1) + with m.Elif(self.clear[index]): + m.d.sync += self.pending[index].eq(0) + + m.d.comb += self.src.i.eq((self.enable & self.pending).any()) + + return m diff --git a/nmigen_soc/test/test_event.py b/nmigen_soc/test/test_event.py new file mode 100644 index 0000000..9d6852b --- /dev/null +++ b/nmigen_soc/test/test_event.py @@ -0,0 +1,211 @@ +# nmigen: UnusedElaboratable=no + +import unittest +from nmigen import * +from nmigen.back.pysim import * + +from ..event import * + + +def simulation_test(dut, process): + with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim: + sim.add_clock(1e-6) + sim.add_sync_process(process) + sim.run() + + +class SourceTestCase(unittest.TestCase): + def test_level(self): + src = Source(trigger="level") + self.assertEqual(src.trigger, Source.Trigger.LEVEL) + + def test_rise(self): + src = Source(trigger="rise") + self.assertEqual(src.trigger, Source.Trigger.RISE) + + def test_fall(self): + src = Source(trigger="fall") + self.assertEqual(src.trigger, Source.Trigger.FALL) + + def test_trigger_wrong(self): + with self.assertRaisesRegex(ValueError, + r"Invalid trigger mode 'foo'; must be one of level, rise, fall"): + src = Source(trigger="foo") + + def test_get_map_wrong(self): + src = Source() + with self.assertRaisesRegex(NotImplementedError, + r"Event source \(rec src i trg\) does not have an event map"): + src.event_map + + def test_get_map_frozen(self): + src = Source() + src.event_map = EventMap() + with self.assertRaisesRegex(ValueError, + r"Event map has been frozen. Cannot add source."): + src.event_map.add(Source()) + + def test_set_map_wrong(self): + src = Source() + with self.assertRaisesRegex(TypeError, + r"Event map must be an instance of EventMap, not 'foo'"): + src.event_map = "foo" + + +class EventMapTestCase(unittest.TestCase): + def test_add(self): + src_0 = Source() + src_1 = Source() + event_map = EventMap() + event_map.add(src_0) + event_map.add(src_1) + self.assertTrue(src_0 in event_map._sources) + self.assertTrue(src_1 in event_map._sources) + + def test_add_wrong(self): + event_map = EventMap() + with self.assertRaisesRegex(TypeError, + r"Event source must be an instance of event.Source, not 'foo'"): + event_map.add("foo") + + def test_add_wrong_frozen(self): + event_map = EventMap() + event_map.freeze() + with self.assertRaisesRegex(ValueError, + r"Event map has been frozen. Cannot add source."): + event_map.add(Source()) + + def test_size(self): + event_map = EventMap() + event_map.add(Source()) + event_map.add(Source()) + self.assertEqual(event_map.size, 2) + + def test_index(self): + src_0 = Source() + src_1 = Source() + event_map = EventMap() + event_map.add(src_0) + event_map.add(src_1) + self.assertEqual(event_map.index(src_0), 0) + self.assertEqual(event_map.index(src_1), 1) + + def test_index_add_twice(self): + src = Source() + event_map = EventMap() + event_map.add(src) + event_map.add(src) + self.assertEqual(event_map.index(src), 0) + self.assertEqual(event_map.size, 1) + + def test_index_wrong(self): + event_map = EventMap() + with self.assertRaisesRegex(TypeError, + r"Event source must be an instance of event.Source, not 'foo'"): + event_map.index("foo") + + def test_index_not_found(self): + src = Source() + event_map = EventMap() + with self.assertRaises(KeyError): + event_map.index(src) + + def test_iter_sources(self): + src_0 = Source() + src_1 = Source() + event_map = EventMap() + event_map.add(src_0) + event_map.add(src_1) + self.assertEqual(list(event_map.sources()), [ + (src_0, 0), + (src_1, 1), + ]) + + +class MonitorTestCase(unittest.TestCase): + def test_simple(self): + sub_0 = Source() + sub_1 = Source() + event_map = EventMap() + event_map.add(sub_0) + event_map.add(sub_1) + dut = Monitor(event_map, trigger="rise") + self.assertIs(dut.src.event_map, event_map) + self.assertEqual(dut.src.trigger, Source.Trigger.RISE) + self.assertEqual(dut.enable.width, 2) + self.assertEqual(dut.pending.width, 2) + self.assertEqual(dut.clear.width, 2) + + def test_event_map_wrong(self): + with self.assertRaisesRegex(TypeError, + r"Event map must be an instance of EventMap, not 'foo'"): + dut = Monitor("foo") + + def test_events(self): + sub_0 = Source(trigger="level") + sub_1 = Source(trigger="rise") + sub_2 = Source(trigger="fall") + event_map = EventMap() + event_map.add(sub_0) + event_map.add(sub_1) + event_map.add(sub_2) + dut = Monitor(event_map) + + def process(): + yield sub_0.i.eq(1) + yield sub_1.i.eq(0) + yield sub_2.i.eq(1) + yield + self.assertEqual((yield sub_0.trg), 1) + self.assertEqual((yield sub_1.trg), 0) + self.assertEqual((yield sub_2.trg), 0) + yield + self.assertEqual((yield dut.pending), 0b001) + self.assertEqual((yield dut.src.i), 0) + + yield dut.enable.eq(0b111) + yield + self.assertEqual((yield dut.src.i), 1) + + yield dut.clear.eq(0b001) + yield + self.assertEqual((yield dut.pending), 0b001) + self.assertEqual((yield dut.src.i), 1) + + yield sub_0.i.eq(0) + yield + self.assertEqual((yield sub_0.trg), 0) + self.assertEqual((yield sub_1.trg), 0) + self.assertEqual((yield sub_2.trg), 0) + yield + self.assertEqual((yield dut.pending), 0b000) + self.assertEqual((yield dut.src.i), 0) + + yield sub_1.i.eq(1) + yield + self.assertEqual((yield sub_0.trg), 0) + self.assertEqual((yield sub_1.trg), 1) + self.assertEqual((yield sub_2.trg), 0) + yield + self.assertEqual((yield dut.pending), 0b010) + self.assertEqual((yield dut.src.i), 1) + + yield sub_2.i.eq(0) + yield + self.assertEqual((yield sub_0.trg), 0) + self.assertEqual((yield sub_1.trg), 0) + self.assertEqual((yield sub_2.trg), 1) + yield + self.assertEqual((yield dut.pending), 0b110) + self.assertEqual((yield dut.src.i), 1) + + yield dut.clear.eq(0b110) + yield + self.assertEqual((yield sub_0.trg), 0) + self.assertEqual((yield sub_1.trg), 0) + self.assertEqual((yield sub_2.trg), 0) + yield + self.assertEqual((yield dut.pending), 0b000) + self.assertEqual((yield dut.src.i), 0) + + simulation_test(dut, process)