Skip to content

Commit

Permalink
event: add event management primitives.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jean-François Nguyen committed Mar 31, 2020
1 parent 967a65f commit 8ebe0f6
Show file tree
Hide file tree
Showing 2 changed files with 379 additions and 0 deletions.
189 changes: 189 additions & 0 deletions nmigen_soc/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import enum
from collections import OrderedDict

from nmigen import *


__all__ = ["Source", "EventMap", "Monitor"]


class Source(Record):
class Trigger(enum.Enum):
"""Event trigger mode."""
LEVEL = "level"
RISE = "rise"
FALL = "fall"

"""Event source interface.
Parameters
----------
trigger : :class:`Trigger`
Trigger mode. An event can be edge- or level-triggered by the input line.
name: str
Name of the underlying record.
Attributes
----------
i : Signal()
Input line. Sampled in order to detect an event.
trg : Signal()
Event trigger. Asserted when an event occurs, according to the trigger mode.
"""
def __init__(self, *, trigger="level", name=None, src_loc_at=0):
choices = ("level", "rise", "fall")
if not isinstance(trigger, Source.Trigger) and trigger not in choices:
raise ValueError("Invalid trigger mode {!r}; must be one of {}"
.format(trigger, ", ".join(choices)))
self.trigger = Source.Trigger(trigger)

super().__init__([
("i", 1),
("trg", 1),
], name=name, src_loc_at=1 + src_loc_at)

# FIXME: get rid of this
__hash__ = object.__hash__


class EventMap:
"""Event map.
An event map is a description of a set of events. It is built by adding event sources
and can be queried later to determine their index. Event indexing is done implicitly by
increment, starting at 0.
"""
def __init__(self):
self._sources = OrderedDict()
self._frozen = False

@property
def size(self):
"""Size of the event map.
Return value
------------
The number of event sources in the map.
"""
return len(self._sources)

def freeze(self):
"""Freeze the event map.
Once the event map is frozen, sources cannot be added anymore.
"""
self._frozen = True

def add(self, src):
"""Add an event source.
Arguments
---------
src : :class:`Source`
Event source.
Exceptions
----------
Raises :exn:`ValueError` if the event map is frozen.
"""
if self._frozen:
raise ValueError("Event map has been frozen. Cannot add source.")
if not isinstance(src, Source):
raise TypeError("Event source must be an instance of event.Source, not {!r}"
.format(src))
if src not in self._sources:
self._sources[src] = self.size

def index(self, src):
"""Get the index corresponding to an event source.
Arguments
---------
src : :class:`Source`
Event source.
Return value
------------
The index of the source.
Exceptions
----------
Raises :exn:`KeyError` if the source is not found.
"""
if not isinstance(src, Source):
raise TypeError("Event source must be an instance of event.Source, not {!r}"
.format(src))
return self._sources[src]

def sources(self):
"""Iterate event sources.
Yield values
------------
A tuple ``src, index`` corresponding to an event source and its index.
"""
for src, index in self._sources.items():
yield src, index


class Monitor(Elaboratable):
"""Event monitor.
A monitor for event sources sharing access to an interrupt request line.
Parameters
----------
event_map : :class:`EventMap`
Event map.
Attributes
----------
event_map : :class:`EventMap`
Event map.
enable : Signal(event_map.size), one-hot, in
Enabled events.
pending : Signal(event_map.size), one-hot, out
Pending events.
clear : Signal(event_map.size), one-hot, in
Clear the selected pending events.
irq : Signal(), out
Interrupt request. Asserted when an event is both enabled and pending.
"""
def __init__(self, event_map):
if not isinstance(event_map, EventMap):
raise TypeError("Event map must be an instance of EventMap, not {!r}"
.format(event_map))
event_map.freeze()
self.event_map = event_map

self.enable = Signal(event_map.size)
self.pending = Signal(event_map.size)
self.clear = Signal(event_map.size)

self.irq = Signal()

def elaborate(self, platform):
m = Module()

for src, index in self.event_map.sources():
if src.trigger != Source.Trigger.LEVEL:
src_i_r = Signal.like(src.i, name_suffix="_r")
m.d.sync += src_i_r.eq(src.i)

if src.trigger == Source.Trigger.LEVEL:
m.d.comb += src.trg.eq(src.i)
elif src.trigger == Source.Trigger.RISE:
m.d.comb += src.trg.eq(~src_i_r & src.i)
elif src.trigger == Source.Trigger.FALL:
m.d.comb += src.trg.eq( src_i_r & ~src.i)
else:
assert False # :nocov:

with m.If(src.trg):
m.d.sync += self.pending[index].eq(1)
with m.Elif(self.clear[index]):
m.d.sync += self.pending[index].eq(0)

m.d.comb += self.irq.eq((self.enable & self.pending).any())

return m
190 changes: 190 additions & 0 deletions nmigen_soc/test/test_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
# nmigen: UnusedElaboratable=no

import unittest
from nmigen import *
from nmigen.back.pysim import *

from ..event import *


def simulation_test(dut, process):
with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
sim.add_clock(1e-6)
sim.add_sync_process(process)
sim.run()


class SourceTestCase(unittest.TestCase):
def test_level(self):
src = Source(trigger="level")
self.assertEqual(src.trigger, Source.Trigger.LEVEL)

def test_rise(self):
src = Source(trigger="rise")
self.assertEqual(src.trigger, Source.Trigger.RISE)

def test_fall(self):
src = Source(trigger="fall")
self.assertEqual(src.trigger, Source.Trigger.FALL)

def test_trigger_wrong(self):
with self.assertRaisesRegex(ValueError,
r"Invalid trigger mode 'foo'; must be one of level, rise, fall"):
src = Source(trigger="foo")


class EventMapTestCase(unittest.TestCase):
def test_add(self):
src_0 = Source()
src_1 = Source()
event_map = EventMap()
event_map.add(src_0)
event_map.add(src_1)
self.assertTrue(src_0 in event_map._sources)
self.assertTrue(src_1 in event_map._sources)

def test_add_wrong(self):
event_map = EventMap()
with self.assertRaisesRegex(TypeError,
r"Event source must be an instance of event.Source, not 'foo'"):
event_map.add("foo")

def test_add_wrong_frozen(self):
event_map = EventMap()
event_map.freeze()
with self.assertRaisesRegex(ValueError,
r"Event map has been frozen. Cannot add source."):
event_map.add(Source())

def test_size(self):
event_map = EventMap()
event_map.add(Source())
event_map.add(Source())
self.assertEqual(event_map.size, 2)

def test_index(self):
src_0 = Source()
src_1 = Source()
event_map = EventMap()
event_map.add(src_0)
event_map.add(src_1)
self.assertEqual(event_map.index(src_0), 0)
self.assertEqual(event_map.index(src_1), 1)

def test_index_add_twice(self):
src = Source()
event_map = EventMap()
event_map.add(src)
event_map.add(src)
self.assertEqual(event_map.index(src), 0)
self.assertEqual(event_map.size, 1)

def test_index_wrong(self):
event_map = EventMap()
with self.assertRaisesRegex(TypeError,
r"Event source must be an instance of event.Source, not 'foo'"):
event_map.index("foo")

def test_index_not_found(self):
src = Source()
event_map = EventMap()
with self.assertRaises(KeyError):
event_map.index(src)

def test_iter_sources(self):
src_0 = Source()
src_1 = Source()
event_map = EventMap()
event_map.add(src_0)
event_map.add(src_1)
self.assertEqual(list(event_map.sources()), [
(src_0, 0),
(src_1, 1),
])


class MonitorTestCase(unittest.TestCase):
def test_simple(self):
src_0 = Source()
src_1 = Source()
event_map = EventMap()
event_map.add(src_0)
event_map.add(src_1)
dut = Monitor(event_map)
self.assertEqual(dut.enable.width, 2)
self.assertEqual(dut.pending.width, 2)
self.assertEqual(dut.clear.width, 2)

def test_event_map_wrong(self):
with self.assertRaisesRegex(TypeError,
r"Event map must be an instance of EventMap, not 'foo'"):
dut = Monitor("foo")

def test_events(self):
src_0 = Source(trigger="level")
src_1 = Source(trigger="rise")
src_2 = Source(trigger="fall")
event_map = EventMap()
event_map.add(src_0)
event_map.add(src_1)
event_map.add(src_2)
dut = Monitor(event_map)

def process():
yield src_0.i.eq(1)
yield src_1.i.eq(0)
yield src_2.i.eq(1)
yield
self.assertEqual((yield src_0.trg), 1)
self.assertEqual((yield src_1.trg), 0)
self.assertEqual((yield src_2.trg), 0)
yield
self.assertEqual((yield dut.pending), 0b001)
self.assertEqual((yield dut.irq), 0)

yield dut.enable.eq(0b111)
yield
self.assertEqual((yield dut.irq), 1)

yield dut.clear.eq(0b001)
yield
self.assertEqual((yield dut.pending), 0b001)
self.assertEqual((yield dut.irq), 1)

yield src_0.i.eq(0)
yield
self.assertEqual((yield src_0.trg), 0)
self.assertEqual((yield src_1.trg), 0)
self.assertEqual((yield src_2.trg), 0)
yield
self.assertEqual((yield dut.pending), 0b000)
self.assertEqual((yield dut.irq), 0)

yield src_1.i.eq(1)
yield
self.assertEqual((yield src_0.trg), 0)
self.assertEqual((yield src_1.trg), 1)
self.assertEqual((yield src_2.trg), 0)
yield
self.assertEqual((yield dut.pending), 0b010)
self.assertEqual((yield dut.irq), 1)

yield src_2.i.eq(0)
yield
self.assertEqual((yield src_0.trg), 0)
self.assertEqual((yield src_1.trg), 0)
self.assertEqual((yield src_2.trg), 1)
yield
self.assertEqual((yield dut.pending), 0b110)
self.assertEqual((yield dut.irq), 1)

yield dut.clear.eq(0b110)
yield
self.assertEqual((yield src_0.trg), 0)
self.assertEqual((yield src_1.trg), 0)
self.assertEqual((yield src_2.trg), 0)
yield
self.assertEqual((yield dut.pending), 0b000)
self.assertEqual((yield dut.irq), 0)

simulation_test(dut, process)

0 comments on commit 8ebe0f6

Please sign in to comment.