diff --git a/amaranth_stdio/serial.py b/amaranth_stdio/serial.py index 65373c6..be2748a 100644 --- a/amaranth_stdio/serial.py +++ b/amaranth_stdio/serial.py @@ -1,110 +1,198 @@ from amaranth import * +from amaranth.lib import enum, data, wiring +from amaranth.lib.wiring import In, Out, connect, flipped from amaranth.lib.cdc import FFSynchronizer from amaranth.utils import bits_for -__all__ = ["AsyncSerialRX", "AsyncSerialTX", "AsyncSerial"] +__all__ = ["Parity", "AsyncSerialRX", "AsyncSerialTX", "AsyncSerial"] + + +class Parity(enum.Enum): + """Asynchronous serial parity mode.""" + NONE = "none" + MARK = "mark" + SPACE = "space" + EVEN = "even" + ODD = "odd" + + def _compute_bit(self, data): + cast_data = Value.cast(data) + if self == self.NONE: + return Const(0, 0) + if self == self.MARK: + return Const(1, 1) + if self == self.SPACE: + return Const(0, 1) + if self == self.EVEN: + return cast_data.xor() + if self == self.ODD: + return ~cast_data.xor() + assert False # :nocov: + + +class _FrameLayout(data.StructLayout): + def __init__(self, data_bits, parity): + super().__init__({ + "start": unsigned(1), + "data": unsigned(data_bits), + "parity": unsigned(0 if parity == Parity.NONE else 1), + "stop": unsigned(1), + }) + + +class AsyncSerialRX(wiring.Component): + class Signature(wiring.Signature): + """Asynchronous serial receiver signature. + + Parameters + ---------- + divisor : int + Clock divisor initial value. Should be set to ``int(clk_frequency // baudrate)``. + divisor_bits : int + Clock divisor width. Optional. If omitted, ``bits_for(divisor)`` is used instead. + data_bits : int + Data bits per frame. + parity : :class:`Parity` + Parity mode. + + Interface attributes + -------------------- + divisor : Signal, in + Clock divisor. + data : Signal, out + Read data. Valid only when ``rdy`` is asserted. + err.overflow : Signal, out + Error flag. A new frame has been received, but the previous one was not acknowledged. + err.frame : Signal, out + Error flag. The received bits do not fit in a frame. + err.parity : Signal, out + Error flag. The parity check has failed. + rdy : Signal, out + Read strobe. + ack : Signal, in + Read acknowledge. Must be held asserted while data can be read out of the receiver. + i : Signal, in + Serial input. + + Raises + ------ + See :meth:`AsyncSerialRX.Signature.check_parameters`. + """ + def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none"): + self.check_parameters(divisor=divisor, divisor_bits=divisor_bits, data_bits=data_bits, + parity=parity) + self._divisor = divisor + self._divisor_bits = divisor_bits if divisor_bits is not None else bits_for(divisor) + self._data_bits = data_bits + self._parity = Parity(parity) + + super().__init__({ + "divisor": In(unsigned(self._divisor_bits), reset=self._divisor), + "data": Out(unsigned(self._data_bits)), + "err": Out(data.StructLayout({"overflow": 1, "frame": 1, "parity": 1})), + "rdy": Out(unsigned(1)), + "ack": In(unsigned(1)), + "i": In(unsigned(1), reset=1), + }) + + @classmethod + def check_parameters(cls, *, divisor, divisor_bits=None, data_bits=8, parity="none"): + """Validate signature parameters. + + Raises + ------ + :exc:`TypeError` + If ``divisor`` is not an integer greater than or equal to 5. + :exc:`TypeError` + If ``divisor_bits`` is not `None` and not an integer greater than or equal to + ``bits_for(divisor)``. + :exc:`TypeError` + If ``data_bits`` is not an integer greater than or equal to 0. + :exc:`ValueError` + If ``parity`` is not a :class:`Parity` member. + """ + # The clock divisor must be >= 5 to keep the receiver FSM synchronized with its input + # during a DONE->IDLE->BUSY transition. + AsyncSerial.Signature._check_divisor(divisor, divisor_bits, min_divisor=5) + if not isinstance(data_bits, int) or data_bits < 0: + raise TypeError(f"Data bits must be a non-negative integer, not {data_bits!r}") + # Raise a ValueError if parity is invalid. + Parity(parity) + + @property + def divisor(self): + return self._divisor + + @property + def divisor_bits(self): + return self._divisor_bits + + @property + def data_bits(self): + return self._data_bits + + @property + def parity(self): + return self._parity + + def __eq__(self, other): + """Compare signatures. + + Two signatures are equal if they have the same divisor value, divisor bits, + data bits, and parity. + """ + return (isinstance(other, AsyncSerialRX.Signature) and + self.divisor == other.divisor and + self.divisor_bits == other.divisor_bits and + self.data_bits == other.data_bits and + self.parity == other.parity) + + def __repr__(self): + return f"AsyncSerialRX.Signature({self.members!r})" - -def _check_divisor(divisor, bound): - if divisor < bound: - raise ValueError("Invalid divisor {!r}; must be greater than or equal to {}" - .format(divisor, bound)) - - -def _check_parity(parity): - choices = ("none", "mark", "space", "even", "odd") - if parity not in choices: - raise ValueError("Invalid parity {!r}; must be one of {}" - .format(parity, ", ".join(choices))) - - -def _compute_parity_bit(data, parity): - if parity == "none": - return C(0, 0) - if parity == "mark": - return C(1, 1) - if parity == "space": - return C(0, 1) - if parity == "even": - return data.xor() - if parity == "odd": - return ~data.xor() - assert False - - -def _wire_layout(data_bits, parity="none"): - return [ - ("start", 1), - ("data", data_bits), - ("parity", 0 if parity == "none" else 1), - ("stop", 1), - ] - - -class AsyncSerialRX(Elaboratable): """Asynchronous serial receiver. Parameters ---------- divisor : int - Clock divisor reset value. Should be set to ``int(clk_frequency // baudrate)``. + Clock divisor initial value. Should be set to ``int(clk_frequency // baudrate)``. divisor_bits : int - Optional. Clock divisor width. If omitted, ``bits_for(divisor)`` is used instead. + Clock divisor width. Optional. If omitted, ``bits_for(divisor)`` is used instead. data_bits : int - Data width. - parity : ``"none"``, ``"mark"``, ``"space"``, ``"even"``, ``"odd"`` + Data bits per frame. + parity : :class:`Parity` Parity mode. pins : :class:`amaranth.lib.io.Pin` - Optional. UART pins. See :class:`amaranth_boards.resources.UARTResource` for layout. + UART pins. Optional. See :class:`amaranth_boards.resources.UARTResource` for layout. + If provided, the ``i`` port of the receiver is internally connected to ``pins.rx.i``. - Attributes - ---------- - divisor : Signal, in - Clock divisor. - data : Signal, out - Read data. Valid only when ``rdy`` is asserted. - err.overflow : Signal, out - Error flag. A new frame has been received, but the previous one was not acknowledged. - err.frame : Signal, out - Error flag. The received bits do not fit in a frame. - err.parity : Signal, out - Error flag. The parity check has failed. - rdy : Signal, out - Read strobe. - ack : Signal, in - Read acknowledge. Must be held asserted while data can be read out of the receiver. - i : Signal, in - Serial input. If ``pins`` has been specified, ``pins.rx.i`` drives it. + Raises + ------ + See :meth:`AsyncSerialRX.Signature.check_parameters`. """ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pins=None): - _check_parity(parity) - self._parity = parity + self.Signature.check_parameters(divisor=divisor, divisor_bits=divisor_bits, + data_bits=data_bits, parity=parity) + self._divisor = divisor + self._divisor_bits = divisor_bits if divisor_bits is not None else bits_for(divisor) + self._data_bits = data_bits + self._parity = Parity(parity) + self._pins = pins - # The clock divisor must be at least 5 to keep the FSM synchronized with the serial input - # during a DONE->IDLE->BUSY transition. - _check_divisor(divisor, 5) - self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor) + super().__init__() - self.data = Signal(data_bits) - self.err = Record([ - ("overflow", 1), - ("frame", 1), - ("parity", 1), - ]) - self.rdy = Signal() - self.ack = Signal() - - self.i = Signal(reset=1) - - self._pins = pins + @property + def signature(self): + return self.Signature(divisor=self._divisor, divisor_bits=self._divisor_bits, + data_bits=self._data_bits, parity=self._parity) def elaborate(self, platform): m = Module() timer = Signal.like(self.divisor) - shreg = Record(_wire_layout(len(self.data), self._parity)) - bitno = Signal(range(len(shreg))) + shreg = Signal(_FrameLayout(self._data_bits, self._parity)) + bitno = Signal(range(len(shreg.as_value()))) if self._pins is not None: m.submodules += FFSynchronizer(self._pins.rx.i, self.i, reset=1) @@ -113,7 +201,7 @@ def elaborate(self, platform): with m.State("IDLE"): with m.If(~self.i): m.d.sync += [ - bitno.eq(len(shreg) - 1), + bitno.eq(len(shreg.as_value()) - 1), timer.eq(self.divisor >> 1), ] m.next = "BUSY" @@ -123,7 +211,7 @@ def elaborate(self, platform): m.d.sync += timer.eq(timer - 1) with m.Else(): m.d.sync += [ - shreg.eq(Cat(shreg[1:], self.i)), + shreg.eq(Cat(shreg.as_value()[1:], self.i)), bitno.eq(bitno - 1), timer.eq(self.divisor - 1), ] @@ -136,7 +224,7 @@ def elaborate(self, platform): self.data.eq(shreg.data), self.err.frame .eq(~((shreg.start == 0) & (shreg.stop == 1))), self.err.parity.eq(~(shreg.parity == - _compute_parity_bit(shreg.data, self._parity))), + self._parity._compute_bit(shreg.data))), ] m.d.sync += self.err.overflow.eq(~self.ack) m.next = "IDLE" @@ -147,56 +235,149 @@ def elaborate(self, platform): return m -class AsyncSerialTX(Elaboratable): +class AsyncSerialTX(wiring.Component): + class Signature(wiring.Signature): + """Asynchronous serial transmitter signature. + + Parameters + ---------- + divisor : int + Clock divisor initial value. Should be set to ``int(clk_frequency // baudrate)``. + divisor_bits : int + Clock divisor width. Optional. If omitted, ``bits_for(divisor)`` is used instead. + data_bits : int + Data bits per frame. + parity : :class:`Parity` + Parity mode. + + Interface attributes + -------------------- + divisor : Signal, in + Clock divisor. + data : Signal, in + Write data. Valid only when ``ack`` is asserted. + rdy : Signal, out + Write ready. Asserted when the transmitter is ready to transmit data. + ack : Signal, in + Write strobe. Data gets transmitted when both ``rdy`` and ``ack`` are asserted. + o : Signal, out + Serial output. + + Raises + ------ + See :meth:`AsyncSerialTX.Signature.check_parameters`. + """ + def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none"): + self.check_parameters(divisor=divisor, divisor_bits=divisor_bits, data_bits=data_bits, + parity=parity) + self._divisor = divisor + self._divisor_bits = divisor_bits if divisor_bits is not None else bits_for(divisor) + self._data_bits = data_bits + self._parity = Parity(parity) + + super().__init__({ + "divisor": In(unsigned(self._divisor_bits), reset=self._divisor), + "data": In(unsigned(self._data_bits)), + "rdy": Out(unsigned(1)), + "ack": In(unsigned(1)), + "o": Out(unsigned(1), reset=1), + }) + + @classmethod + def check_parameters(cls, *, divisor, divisor_bits=None, data_bits=8, parity="none"): + """Validate signature parameters. + + Raises + ------ + :exc:`TypeError` + If ``divisor`` is not an integer greater than or equal to 1. + :exc:`TypeError` + If ``divisor_bits`` is not `None` and not an integer greater than or equal to + ``bits_for(divisor)``. + :exc:`TypeError` + If ``data_bits`` is not an integer greater than or equal to 0. + :exc:`ValueError` + If ``parity`` is not a :class:`Parity` member. + """ + AsyncSerial.Signature._check_divisor(divisor, divisor_bits, min_divisor=1) + if not isinstance(data_bits, int) or data_bits < 0: + raise TypeError(f"Data bits must be a non-negative integer, not {data_bits!r}") + # Raise a ValueError if parity is invalid. + Parity(parity) + + @property + def divisor(self): + return self._divisor + + @property + def divisor_bits(self): + return self._divisor_bits + + @property + def data_bits(self): + return self._data_bits + + @property + def parity(self): + return self._parity + + def __eq__(self, other): + """Compare signatures. + + Two signatures are equal if they have the same divisor value, divisor bits, + data bits, and parity. + """ + return (isinstance(other, AsyncSerialTX.Signature) and + self.divisor == other.divisor and + self.divisor_bits == other.divisor_bits and + self.data_bits == other.data_bits and + self.parity == other.parity) + + def __repr__(self): + return f"AsyncSerialTX.Signature({self.members!r})" + """Asynchronous serial transmitter. Parameters ---------- divisor : int - Clock divisor reset value. Should be set to ``int(clk_frequency // baudrate)``. + Clock divisor initial value. Should be set to ``int(clk_frequency // baudrate)``. divisor_bits : int - Optional. Clock divisor width. If omitted, ``bits_for(divisor)`` is used instead. + Clock divisor width. Optional. If omitted, ``bits_for(divisor)`` is used instead. data_bits : int - Data width. - parity : ``"none"``, ``"mark"``, ``"space"``, ``"even"``, ``"odd"`` + Data bits per frame. + parity : :class:`Parity` Parity mode. pins : :class:`amaranth.lib.io.Pin` - Optional. UART pins. See :class:`amaranth_boards.resources.UARTResource` for layout. + UART pins. Optional. See :class:`amaranth_boards.resources.UARTResource` for layout. + If provided, the ``o`` port of the transmitter is internally connected to ``pins.tx.o``. - Attributes - ---------- - divisor : Signal, in - Clock divisor. - data : Signal, in - Write data. Valid only when ``ack`` is asserted. - rdy : Signal, out - Write ready. Asserted when the transmitter is ready to transmit data. - ack : Signal, in - Write strobe. Data gets transmitted when both ``rdy`` and ``ack`` are asserted. - o : Signal, out - Serial output. If ``pins`` has been specified, it drives ``pins.tx.o``. + Raises + ------ + See :class:`AsyncSerialTX.Signature.check_parameters`. """ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pins=None): - _check_parity(parity) - self._parity = parity - - _check_divisor(divisor, 1) - self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor) + self.Signature.check_parameters(divisor=divisor, divisor_bits=divisor_bits, + data_bits=data_bits, parity=parity) + self._divisor = divisor + self._divisor_bits = divisor_bits if divisor_bits is not None else bits_for(divisor) + self._data_bits = data_bits + self._parity = Parity(parity) + self._pins = pins - self.data = Signal(data_bits) - self.rdy = Signal() - self.ack = Signal() + super().__init__() - self.o = Signal(reset=1) - - self._pins = pins + @property + def signature(self): + return self.Signature(divisor=self._divisor, divisor_bits=self._divisor_bits, + data_bits=self._data_bits, parity=self._parity) def elaborate(self, platform): m = Module() timer = Signal.like(self.divisor) - shreg = Record(_wire_layout(len(self.data), self._parity)) - bitno = Signal(range(len(shreg))) + shreg = Signal(_FrameLayout(len(self.data), self._parity)) + bitno = Signal(range(len(shreg.as_value()))) if self._pins is not None: m.d.comb += self._pins.tx.o.eq(self.o) @@ -208,9 +389,9 @@ def elaborate(self, platform): m.d.sync += [ shreg.start .eq(0), shreg.data .eq(self.data), - shreg.parity.eq(_compute_parity_bit(self.data, self._parity)), + shreg.parity.eq(self._parity._compute_bit(self.data)), shreg.stop .eq(1), - bitno.eq(len(shreg) - 1), + bitno.eq(len(shreg.as_value()) - 1), timer.eq(self.divisor - 1), ] m.next = "BUSY" @@ -230,43 +411,170 @@ def elaborate(self, platform): return m -class AsyncSerial(Elaboratable): +class AsyncSerial(wiring.Component): + class Signature(wiring.Signature): + """Asynchronous serial transceiver signature. + + Parameters + ---------- + divisor : int + Clock divisor initial value. Should be set to ``int(clk_frequency // baudrate)``. + divisor_bits : int + Clock divisor width. Optional. If omitted, ``bits_for(divisor)`` is used instead. + data_bits : int + Data bits per frame. + parity : :class:`Parity` + Parity mode. + + Interface attributes + -------------------- + divisor : Signal, in + Clock divisor. It is internally connected to ``rx.divisor`` and ``tx.divisor``. + rx : :class:`wiring.Interface` + Receiver interface. See :class:`AsyncSerialRX.Signature`. + tx : :class:`wiring.Interface` + Transmitter interface. See :class:`AsyncSerialTX.Signature`. + + Raises + ------ + See :meth:`AsyncSerial.Signature.check_parameters`. + """ + def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none"): + rx_sig = AsyncSerialRX.Signature(divisor=divisor, divisor_bits=divisor_bits, + data_bits=data_bits, parity=parity) + tx_sig = AsyncSerialTX.Signature(divisor=divisor, divisor_bits=divisor_bits, + data_bits=data_bits, parity=parity) + + assert rx_sig.members["divisor"] == tx_sig.members["divisor"] + divisor_shape = rx_sig.members["divisor"].shape + divisor_reset = rx_sig.members["divisor"].reset + + super().__init__({ + "divisor": In(divisor_shape, reset=divisor_reset), + "rx": Out(rx_sig), + "tx": Out(tx_sig), + }) + + @classmethod + def _check_divisor(cls, divisor, divisor_bits, min_divisor=1): + if not isinstance(divisor, int) or divisor < min_divisor: + raise TypeError(f"Divisor initial value must be an integer greater than or equal " + f"to {min_divisor}, not {divisor!r}") + if divisor_bits is not None: + min_divisor_bits = bits_for(divisor) + if not isinstance(divisor_bits, int) or divisor_bits < min_divisor_bits: + raise TypeError(f"Divisor bits must be an integer greater than or equal to " + f"{min_divisor_bits}, not {divisor_bits!r}") + + @classmethod + def check_parameters(cls, *, divisor, divisor_bits=None, data_bits=8, parity="none"): + """Validate signature parameters. + + Raises + ------ + :exc:`TypeError` + If ``divisor`` is not an integer greater than or equal to 5. + :exc:`TypeError` + If ``divisor_bits`` is not `None` and not an integer greater than or equal to + ``bits_for(divisor)``. + :exc:`TypeError` + If ``data_bits`` is not an integer greater than or equal to 0. + :exc:`ValueError` + If ``parity`` is not a :class:`Parity` member. + """ + AsyncSerialRX.Signature.check_parameters(divisor=divisor, divisor_bits=divisor_bits, + data_bits=data_bits, parity=parity) + AsyncSerialTX.Signature.check_parameters(divisor=divisor, divisor_bits=divisor_bits, + data_bits=data_bits, parity=parity) + + @property + def divisor(self): + return self.members["rx"].signature.divisor + + @property + def divisor_bits(self): + return self.members["rx"].signature.divisor_bits + + @property + def data_bits(self): + return self.members["rx"].signature.data_bits + + @property + def parity(self): + return self.members["rx"].signature.parity + + def __eq__(self, other): + """Compare signatures. + + Two signatures are equal if they have the same divisor value, divisor bits, + data bits, and parity. + """ + return (isinstance(other, AsyncSerial.Signature) and + self.divisor == other.divisor and + self.divisor_bits == other.divisor_bits and + self.data_bits == other.data_bits and + self.parity == other.parity) + + def __repr__(self): + return f"AsyncSerial.Signature({self.members!r})" + """Asynchronous serial transceiver. Parameters ---------- divisor : int - Clock divisor reset value. Should be set to ``int(clk_frequency // baudrate)``. + Clock divisor initial value. Should be set to ``int(clk_frequency // baudrate)``. divisor_bits : int - Optional. Clock divisor width. If omitted, ``bits_for(divisor)`` is used instead. + Clock divisor width. Optional. If omitted, ``bits_for(divisor)`` is used instead. data_bits : int - Data width. - parity : ``"none"``, ``"mark"``, ``"space"``, ``"even"``, ``"odd"`` + Data bits per frame. + parity : :class:`Parity` Parity mode. pins : :class:`amaranth.lib.io.Pin` - Optional. UART pins. See :class:`amaranth_boards.resources.UARTResource` for layout. + UART pins. Optional. See :class:`amaranth_boards.resources.UARTResource` for layout. + If provided, the ``rx.i`` and ``tx.o`` ports of the transceiver are internally connected + to ``pins.rx.i`` and ``pins.tx.o``, respectively. - Attributes - ---------- - divisor : Signal, in - Clock divisor. - rx : :class:`AsyncSerialRX` - See :class:`AsyncSerialRX`. - tx : :class:`AsyncSerialTX` - See :class:`AsyncSerialTX`. + Raises + ------ + See :meth:`AsyncSerial.Signature.check_parameters`. """ - def __init__(self, *, divisor, divisor_bits=None, **kwargs): - self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor) + def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pins=None): + self.Signature.check_parameters(divisor=divisor, divisor_bits=divisor_bits, + data_bits=data_bits, parity=parity) + self._divisor = divisor + self._divisor_bits = divisor_bits if divisor_bits is not None else bits_for(divisor) + self._data_bits = data_bits + self._parity = Parity(parity) + self._pins = pins + + super().__init__() - self.rx = AsyncSerialRX(divisor=divisor, divisor_bits=divisor_bits, **kwargs) - self.tx = AsyncSerialTX(divisor=divisor, divisor_bits=divisor_bits, **kwargs) + @property + def signature(self): + return self.Signature(divisor=self._divisor, divisor_bits=self._divisor_bits, + data_bits=self._data_bits, parity=self._parity) def elaborate(self, platform): m = Module() - m.submodules.rx = self.rx - m.submodules.tx = self.tx + + rx = AsyncSerialRX(divisor=self._divisor, divisor_bits=self._divisor_bits, + data_bits=self._data_bits, parity=self._parity) + tx = AsyncSerialTX(divisor=self._divisor, divisor_bits=self._divisor_bits, + data_bits=self._data_bits, parity=self._parity) + m.submodules.rx = rx + m.submodules.tx = tx + m.d.comb += [ self.rx.divisor.eq(self.divisor), self.tx.divisor.eq(self.divisor), ] + + if self._pins is not None: + m.submodules += FFSynchronizer(self._pins.rx.i, self.rx.i, reset=1) + m.d.comb += self._pins.tx.o.eq(self.tx.o) + + connect(m, flipped(self.rx), rx) + connect(m, flipped(self.tx), tx) + return m diff --git a/tests/test_serial.py b/tests/test_serial.py index ca4f83e..a5ca755 100644 --- a/tests/test_serial.py +++ b/tests/test_serial.py @@ -1,10 +1,12 @@ # amaranth: UnusedElaboratable=no -import unittest +from unittest import TestCase from amaranth import * +from amaranth.lib.data import StructLayout from amaranth.lib.fifo import SyncFIFO from amaranth.lib.io import pin_layout +from amaranth.lib.wiring import * from amaranth.sim import * from amaranth_stdio.serial import * @@ -18,21 +20,114 @@ def simulation_test(dut, process): sim.run() -class AsyncSerialRXTestCase(unittest.TestCase): +class _DummyPins: + def __init__(self): + self.rx = Signal(StructLayout({"i": 1}), reset={"i": 1}) + self.tx = Signal(StructLayout({"o": 1}), reset={"o": 1}) + + +class AsyncSerialRXSignatureTestCase(TestCase): + def test_simple(self): + sig = AsyncSerialRX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even") + self.assertEqual(sig.divisor, 10) + self.assertEqual(sig.divisor_bits, 8) + self.assertEqual(sig.data_bits, 7) + self.assertEqual(sig.parity, Parity.EVEN) + self.assertEqual(sig.members, Signature({ + "divisor": In(unsigned(8), reset=10), + "data": Out(unsigned(7)), + "err": Out(StructLayout({"overflow": 1, "frame": 1, "parity": 1})), + "rdy": Out(unsigned(1)), + "ack": In(unsigned(1)), + "i": In(unsigned(1), reset=1), + }).members) + + def test_defaults(self): + sig = AsyncSerialRX.Signature(divisor=10) + self.assertEqual(sig.divisor, 10) + self.assertEqual(sig.divisor_bits, 4) + self.assertEqual(sig.data_bits, 8) + self.assertEqual(sig.parity, Parity.NONE) + + def test_eq(self): + self.assertEqual(AsyncSerialRX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even"), + AsyncSerialRX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even")) + # different divisor + self.assertNotEqual(AsyncSerialRX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even"), + AsyncSerialRX.Signature(divisor= 8, divisor_bits=8, data_bits=7, parity="even")) + # different divisor_bits + self.assertNotEqual(AsyncSerialRX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even"), + AsyncSerialRX.Signature(divisor=10, divisor_bits=4, data_bits=7, parity="even")) + # different data_bits + self.assertNotEqual(AsyncSerialRX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even"), + AsyncSerialRX.Signature(divisor=10, divisor_bits=8, data_bits=8, parity="even")) + # different parity + self.assertNotEqual(AsyncSerialRX.Signature(divisor=10, divisor_bits=8, data_bits=8, parity="even"), + AsyncSerialRX.Signature(divisor=10, divisor_bits=8, data_bits=8, parity="none")) + + def test_repr(self): + sig = AsyncSerialRX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even") + self.assertEqual(repr(sig), "AsyncSerialRX.Signature(SignatureMembers({" + "'divisor': In(unsigned(8), reset=10), " + "'data': Out(unsigned(7)), " + "'err': Out(StructLayout({" + "'overflow': 1, " + "'frame': 1, " + "'parity': 1})), " + "'rdy': Out(unsigned(1)), " + "'ack': In(unsigned(1)), " + "'i': In(unsigned(1), reset=1)}))") + + def test_wrong_divisor(self): + with self.assertRaisesRegex(TypeError, + r"Divisor initial value must be an integer greater than or equal to 5, not 4"): + AsyncSerialRX.Signature(divisor=4) + with self.assertRaisesRegex(TypeError, + r"Divisor initial value must be an integer greater than or equal to 5, not 4"): + AsyncSerialRX.Signature.check_parameters(divisor=4) + + def test_wrong_divisor_bits(self): + with self.assertRaisesRegex(TypeError, + r"Divisor bits must be an integer greater than or equal to 4, not 3"): + AsyncSerialRX.Signature(divisor=8, divisor_bits=3) + with self.assertRaisesRegex(TypeError, + r"Divisor bits must be an integer greater than or equal to 4, not 3"): + AsyncSerialRX.Signature.check_parameters(divisor=8, divisor_bits=3) + + def test_wrong_data_bits(self): + with self.assertRaisesRegex(TypeError, + r"Data bits must be a non-negative integer, not -1"): + AsyncSerialRX.Signature(divisor=10, data_bits=-1) + with self.assertRaisesRegex(TypeError, + r"Data bits must be a non-negative integer, not -1"): + AsyncSerialRX.Signature.check_parameters(divisor=10, data_bits=-1) + + def test_wrong_parity(self): + with self.assertRaisesRegex(ValueError, r"'foo' is not a valid Parity"): + AsyncSerialRX.Signature(divisor=10, parity="foo") + with self.assertRaisesRegex(ValueError, r"'foo' is not a valid Parity"): + AsyncSerialRX.Signature.check_parameters(divisor=10, parity="foo") + + +class AsyncSerialRXTestCase(TestCase): def tx_period(self): for _ in range((yield self.dut.divisor)): yield - def tx_bits(self, bits): + def tx_bits(self, bits, pins=None): + if pins is not None: + rx_i = pins.rx.i + else: + rx_i = self.dut.i for bit in bits: yield from self.tx_period() - yield self.dut.i.eq(bit) + yield rx_i.eq(bit) - def rx_test(self, bits, *, data=None, errors=None): + def rx_test(self, bits, *, data=None, errors=None, pins=None): def process(): self.assertFalse((yield self.dut.rdy)) yield self.dut.ack.eq(1) - yield from self.tx_bits(bits) + yield from self.tx_bits(bits, pins) while not (yield self.dut.rdy): yield if data is not None: @@ -45,15 +140,20 @@ def process(): self.assertTrue((yield getattr(self.dut.err, error))) simulation_test(self.dut, process) - def test_invalid_divisor(self): - with self.assertRaisesRegex(ValueError, - r"Invalid divisor 1; must be greater than or equal to 5"): - self.dut = AsyncSerialRX(divisor=1) - - def test_invalid_parity(self): - with self.assertRaisesRegex(ValueError, - r"Invalid parity 'bad'; must be one of none, mark, space, even, odd"): - self.dut = AsyncSerialRX(divisor=5, parity="bad") + def test_signature(self): + dut = AsyncSerialRX(divisor=10, divisor_bits=8, data_bits=7, parity="even") + self.assertIsInstance(dut.signature, AsyncSerialRX.Signature) + self.assertEqual(dut.signature.divisor, 10) + self.assertEqual(dut.signature.divisor_bits, 8) + self.assertEqual(dut.signature.data_bits, 7) + self.assertEqual(dut.signature.parity, Parity.EVEN) + + def test_defaults(self): + dut = AsyncSerialRX(divisor=10) + self.assertEqual(dut.signature.divisor, 10) + self.assertEqual(dut.signature.divisor_bits, 4) + self.assertEqual(dut.signature.data_bits, 8) + self.assertEqual(dut.signature.parity, Parity.NONE) def test_8n1(self): self.dut = AsyncSerialRX(divisor=5, data_bits=8, parity="none") @@ -88,6 +188,11 @@ def test_8o1(self): self.rx_test([0, 1,0,1,0,1,1,0,0, 1, 1], data=0b00110101) self.rx_test([0, 1,0,1,0,1,1,1,0, 1, 1], errors={"parity"}) + def test_8n1_pins(self): + pins = _DummyPins() + self.dut = AsyncSerialRX(divisor=5, data_bits=8, parity="none", pins=pins) + self.rx_test([0, 1,0,1,0,1,1,1,0, 1], data=0b01110101, pins=pins) + def test_err_frame(self): self.dut = AsyncSerialRX(divisor=5) self.rx_test([0, 0,0,0,0,0,0,0,0, 0], errors={"frame"}) @@ -130,12 +235,94 @@ def process(): simulation_test(m, process) -class AsyncSerialTXTestCase(unittest.TestCase): +class AsyncSerialTXSignatureTestCase(TestCase): + def test_simple(self): + sig = AsyncSerialTX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even") + self.assertEqual(sig.divisor, 10) + self.assertEqual(sig.divisor_bits, 8) + self.assertEqual(sig.data_bits, 7) + self.assertEqual(sig.parity, Parity.EVEN) + self.assertEqual(sig.members, Signature({ + "divisor": In(unsigned(8), reset=10), + "data": In(unsigned(7)), + "rdy": Out(unsigned(1)), + "ack": In(unsigned(1)), + "o": Out(unsigned(1), reset=1), + }).members) + + def test_defaults(self): + sig = AsyncSerialTX.Signature(divisor=10) + self.assertEqual(sig.divisor, 10) + self.assertEqual(sig.divisor_bits, 4) + self.assertEqual(sig.data_bits, 8) + self.assertEqual(sig.parity, Parity.NONE) + + def test_eq(self): + self.assertEqual(AsyncSerialTX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even"), + AsyncSerialTX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even")) + # different divisor + self.assertNotEqual(AsyncSerialTX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even"), + AsyncSerialTX.Signature(divisor= 8, divisor_bits=8, data_bits=7, parity="even")) + # different divisor_bits + self.assertNotEqual(AsyncSerialTX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even"), + AsyncSerialTX.Signature(divisor=10, divisor_bits=4, data_bits=7, parity="even")) + # different data_bits + self.assertNotEqual(AsyncSerialTX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even"), + AsyncSerialTX.Signature(divisor=10, divisor_bits=8, data_bits=8, parity="even")) + # different parity + self.assertNotEqual(AsyncSerialTX.Signature(divisor=10, divisor_bits=8, data_bits=8, parity="even"), + AsyncSerialTX.Signature(divisor=10, divisor_bits=8, data_bits=8, parity="none")) + + def test_repr(self): + sig = AsyncSerialTX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even") + self.assertEqual(repr(sig), "AsyncSerialTX.Signature(SignatureMembers({" + "'divisor': In(unsigned(8), reset=10), " + "'data': In(unsigned(7)), " + "'rdy': Out(unsigned(1)), " + "'ack': In(unsigned(1)), " + "'o': Out(unsigned(1), reset=1)}))") + + def test_wrong_divisor(self): + with self.assertRaisesRegex(TypeError, + r"Divisor initial value must be an integer greater than or equal to 1, not 0"): + AsyncSerialTX.Signature(divisor=0) + with self.assertRaisesRegex(TypeError, + r"Divisor initial value must be an integer greater than or equal to 1, not 0"): + AsyncSerialTX.Signature.check_parameters(divisor=0) + + def test_wrong_divisor_bits(self): + with self.assertRaisesRegex(TypeError, + r"Divisor bits must be an integer greater than or equal to 4, not 3"): + AsyncSerialTX.Signature(divisor=8, divisor_bits=3) + with self.assertRaisesRegex(TypeError, + r"Divisor bits must be an integer greater than or equal to 4, not 3"): + AsyncSerialTX.Signature.check_parameters(divisor=8, divisor_bits=3) + + def test_wrong_data_bits(self): + with self.assertRaisesRegex(TypeError, + r"Data bits must be a non-negative integer, not -1"): + AsyncSerialTX.Signature(divisor=10, data_bits=-1) + with self.assertRaisesRegex(TypeError, + r"Data bits must be a non-negative integer, not -1"): + AsyncSerialTX.Signature.check_parameters(divisor=10, data_bits=-1) + + def test_wrong_parity(self): + with self.assertRaisesRegex(ValueError, r"'foo' is not a valid Parity"): + AsyncSerialTX.Signature(divisor=10, parity="foo") + with self.assertRaisesRegex(ValueError, r"'foo' is not a valid Parity"): + AsyncSerialTX.Signature.check_parameters(divisor=10, parity="foo") + + +class AsyncSerialTXTestCase(TestCase): def tx_period(self): for _ in range((yield self.dut.divisor)): yield - def tx_test(self, data, *, bits): + def tx_test(self, data, *, bits, pins=None): + if pins is not None: + tx_o = pins.tx.o + else: + tx_o = self.dut.o def process(): self.assertTrue((yield self.dut.rdy)) yield self.dut.data.eq(data) @@ -144,18 +331,23 @@ def process(): yield for bit in bits: yield from self.tx_period() - self.assertEqual((yield self.dut.o), bit) + self.assertEqual((yield tx_o), bit) simulation_test(self.dut, process) - def test_invalid_divisor(self): - with self.assertRaisesRegex(ValueError, - r"Invalid divisor 0; must be greater than or equal to 1"): - self.dut = AsyncSerialTX(divisor=0) - - def test_invalid_parity(self): - with self.assertRaisesRegex(ValueError, - r"Invalid parity 'bad'; must be one of none, mark, space, even, odd"): - self.dut = AsyncSerialTX(divisor=1, parity="bad") + def test_signature(self): + dut = AsyncSerialTX(divisor=10, divisor_bits=8, data_bits=7, parity="even") + self.assertIsInstance(dut.signature, AsyncSerialTX.Signature) + self.assertEqual(dut.signature.divisor, 10) + self.assertEqual(dut.signature.divisor_bits, 8) + self.assertEqual(dut.signature.data_bits, 7) + self.assertEqual(dut.signature.parity, Parity.EVEN) + + def test_defaults(self): + dut = AsyncSerialTX(divisor=10) + self.assertEqual(dut.signature.divisor, 10) + self.assertEqual(dut.signature.divisor_bits, 4) + self.assertEqual(dut.signature.data_bits, 8) + self.assertEqual(dut.signature.parity, Parity.NONE) def test_8n1(self): self.dut = AsyncSerialTX(divisor=1, data_bits=8, parity="none") @@ -185,6 +377,11 @@ def test_8o1(self): self.tx_test(0b01110101, bits=[0, 1,0,1,0,1,1,1,0, 0, 1]) self.tx_test(0b00110101, bits=[0, 1,0,1,0,1,1,0,0, 1, 1]) + def test_8n1_pins(self): + pins = _DummyPins() + self.dut = AsyncSerialTX(divisor=1, data_bits=8, parity="none", pins=pins) + self.tx_test(0b01110101, bits=[0, 1,0,1,0,1,1,1,0, 1], pins=pins) + def test_fifo(self): self.dut = AsyncSerialTX(divisor=1) self.fifo = SyncFIFO(width=8, depth=4) @@ -216,10 +413,130 @@ def process(): simulation_test(m, process) -class AsyncSerialTestCase(unittest.TestCase): +class AsyncSerialSignatureTestCase(TestCase): + def test_simple(self): + sig = AsyncSerial.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even") + self.assertEqual(sig.divisor, 10) + self.assertEqual(sig.divisor_bits, 8) + self.assertEqual(sig.data_bits, 7) + self.assertEqual(sig.parity, Parity.EVEN) + self.assertEqual(sig.members, Signature({ + "divisor": In(unsigned(8), reset=10), + "rx": Out(AsyncSerialRX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even")), + "tx": Out(AsyncSerialTX.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even")), + }).members) + + def test_defaults(self): + sig = AsyncSerial.Signature(divisor=10) + self.assertEqual(sig.divisor, 10) + self.assertEqual(sig.divisor_bits, 4) + self.assertEqual(sig.data_bits, 8) + self.assertEqual(sig.parity, Parity.NONE) + + def test_eq(self): + self.assertEqual(AsyncSerial.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even"), + AsyncSerial.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even")) + # different divisor + self.assertNotEqual(AsyncSerial.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even"), + AsyncSerial.Signature(divisor= 8, divisor_bits=8, data_bits=7, parity="even")) + # different divisor_bits + self.assertNotEqual(AsyncSerial.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even"), + AsyncSerial.Signature(divisor=10, divisor_bits=4, data_bits=7, parity="even")) + # different data_bits + self.assertNotEqual(AsyncSerial.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even"), + AsyncSerial.Signature(divisor=10, divisor_bits=8, data_bits=8, parity="even")) + # different parity + self.assertNotEqual(AsyncSerial.Signature(divisor=10, divisor_bits=8, data_bits=8, parity="even"), + AsyncSerial.Signature(divisor=10, divisor_bits=8, data_bits=8, parity="none")) + + def test_repr(self): + sig = AsyncSerial.Signature(divisor=10, divisor_bits=8, data_bits=7, parity="even") + self.assertEqual(repr(sig), "AsyncSerial.Signature(SignatureMembers({" + "'divisor': In(unsigned(8), reset=10), " + "'rx': Out(AsyncSerialRX.Signature(SignatureMembers({" + "'divisor': In(unsigned(8), reset=10), " + "'data': Out(unsigned(7)), " + "'err': Out(StructLayout({" + "'overflow': 1, " + "'frame': 1, " + "'parity': 1})), " + "'rdy': Out(unsigned(1)), " + "'ack': In(unsigned(1)), " + "'i': In(unsigned(1), reset=1)}))), " + "'tx': Out(AsyncSerialTX.Signature(SignatureMembers({" + "'divisor': In(unsigned(8), reset=10), " + "'data': In(unsigned(7)), " + "'rdy': Out(unsigned(1)), " + "'ack': In(unsigned(1)), " + "'o': Out(unsigned(1), reset=1)})))}))") + + def test_wrong_divisor(self): + with self.assertRaisesRegex(TypeError, + r"Divisor initial value must be an integer greater than or equal to 5, not 4"): + AsyncSerial.Signature(divisor=4) + with self.assertRaisesRegex(TypeError, + r"Divisor initial value must be an integer greater than or equal to 5, not 4"): + AsyncSerial.Signature.check_parameters(divisor=4) + + def test_wrong_divisor_bits(self): + with self.assertRaisesRegex(TypeError, + r"Divisor bits must be an integer greater than or equal to 4, not 3"): + AsyncSerial.Signature(divisor=8, divisor_bits=3) + with self.assertRaisesRegex(TypeError, + r"Divisor bits must be an integer greater than or equal to 4, not 3"): + AsyncSerial.Signature.check_parameters(divisor=8, divisor_bits=3) + + def test_wrong_data_bits(self): + with self.assertRaisesRegex(TypeError, + r"Data bits must be a non-negative integer, not -1"): + AsyncSerial.Signature(divisor=10, data_bits=-1) + with self.assertRaisesRegex(TypeError, + r"Data bits must be a non-negative integer, not -1"): + AsyncSerial.Signature.check_parameters(divisor=10, data_bits=-1) + + def test_wrong_parity(self): + with self.assertRaisesRegex(ValueError, r"'foo' is not a valid Parity"): + AsyncSerial.Signature(divisor=10, parity="foo") + with self.assertRaisesRegex(ValueError, r"'foo' is not a valid Parity"): + AsyncSerial.Signature.check_parameters(divisor=10, parity="foo") + + +class AsyncSerialTestCase(TestCase): + def test_signature(self): + dut = AsyncSerial(divisor=10, divisor_bits=8, data_bits=7, parity="even") + self.assertIsInstance(dut.signature, AsyncSerial.Signature) + self.assertEqual(dut.signature.divisor, 10) + self.assertEqual(dut.signature.divisor_bits, 8) + self.assertEqual(dut.signature.data_bits, 7) + self.assertEqual(dut.signature.parity, Parity.EVEN) + + def test_defaults(self): + dut = AsyncSerial(divisor=10) + self.assertEqual(dut.signature.divisor, 10) + self.assertEqual(dut.signature.divisor_bits, 4) + self.assertEqual(dut.signature.data_bits, 8) + self.assertEqual(dut.signature.parity, Parity.NONE) + def test_loopback(self): - pins = Record([("rx", pin_layout(1, dir="i")), - ("tx", pin_layout(1, dir="o"))]) + self.dut = AsyncSerial(divisor=5) + m = Module() + m.submodules.serial = self.dut + m.d.comb += self.dut.rx.i.eq(self.dut.tx.o) + def process(): + self.assertTrue((yield self.dut.tx.rdy)) + yield self.dut.tx.data.eq(0xAA) + yield self.dut.tx.ack.eq(1) + yield + yield self.dut.tx.ack.eq(0) + yield self.dut.rx.ack.eq(1) + yield + while not (yield self.dut.rx.rdy): + yield + self.assertEqual((yield self.dut.rx.data), 0xAA) + simulation_test(m, process) + + def test_loopback_pins(self): + pins = _DummyPins() self.dut = AsyncSerial(divisor=5, pins=pins) m = Module() m.submodules.serial = self.dut