Skip to content

Commit

Permalink
Implement RFC 69: Add a lib.io.PortLike object usable in simulation.
Browse files Browse the repository at this point in the history
  • Loading branch information
whitequark committed Jul 19, 2024
1 parent f965624 commit a154b64
Show file tree
Hide file tree
Showing 5 changed files with 744 additions and 65 deletions.
271 changes: 222 additions & 49 deletions amaranth/lib/io.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import enum
import operator
import warnings
from abc import ABCMeta, abstractmethod
from collections.abc import Iterable

Expand All @@ -11,7 +12,7 @@


__all__ = [
"Direction", "PortLike", "SingleEndedPort", "DifferentialPort",
"Direction", "PortLike", "SingleEndedPort", "DifferentialPort", "SimulationPort",
"Buffer", "FFBuffer", "DDRBuffer",
"Pin",
]
Expand Down Expand Up @@ -57,6 +58,12 @@ class PortLike(metaclass=ABCMeta):
:class:`amaranth.hdl.IOPort` is not an instance of :class:`amaranth.lib.io.PortLike`.
"""

# TODO(amaranth-0.6): remove
def __init_subclass__(cls):
if cls.__add__ is PortLike.__add__:
warnings.warn(f"{cls.__module__}.{cls.__qualname__} must override the `__add__` method",
DeprecationWarning, stacklevel=2)

@property
@abstractmethod
def direction(self):
Expand Down Expand Up @@ -108,6 +115,32 @@ def __invert__(self):
"""
raise NotImplementedError # :nocov:

# TODO(amaranth-0.6): make abstract
# @abstractmethod
def __add__(self, other):
"""Concatenates two library I/O ports of the same type.
The direction of the resulting port is:
* The same as the direction of both, if the two ports have the same direction.
* :attr:`Direction.Input` if a bidirectional port is concatenated with an input port.
* :attr:`Direction.Output` if a bidirectional port is concatenated with an output port.
Returns
-------
:py:`type(self)`
A new :py:`type(self)` which contains wires from :py:`self` followed by wires
from :py:`other`, preserving their polarity inversion.
Raises
------
:exc:`ValueError`
If an input port is concatenated with an output port.
:exc:`TypeError`
If :py:`self` and :py:`other` have different types.
"""
raise NotImplementedError # :nocov:


class SingleEndedPort(PortLike):
"""Represents a single-ended library I/O port.
Expand All @@ -124,9 +157,9 @@ class SingleEndedPort(PortLike):
same length as the width of :py:`io`, and the inversion is specified for individual wires.
direction : :class:`Direction` or :class:`str`
Set of allowed buffer directions. A string is converted to a :class:`Direction` first.
If equal to :attr:`Direction.Input` or :attr:`Direction.Output`, this port can only be used
with buffers of matching direction. If equal to :attr:`Direction.Bidir`, this port can be
used with buffers of any direction.
If equal to :attr:`~Direction.Input` or :attr:`~Direction.Output`, this port can only be
used with buffers of matching direction. If equal to :attr:`~Direction.Bidir`, this port
can be used with buffers of any direction.
Attributes
----------
Expand Down Expand Up @@ -176,27 +209,6 @@ def __getitem__(self, index):
direction=self._direction)

def __add__(self, other):
"""Concatenates two single-ended library I/O ports.
The direction of the resulting port is:
* The same as the direction of both, if the two ports have the same direction.
* :attr:`Direction.Input` if a bidirectional port is concatenated with an input port.
* :attr:`Direction.Output` if a bidirectional port is concatenated with an output port.
Returns
-------
:class:`SingleEndedPort`
A new :class:`SingleEndedPort` which contains wires from :py:`self` followed by wires
from :py:`other`, preserving their polarity inversion.
Raises
------
:exc:`ValueError`
If an input port is concatenated with an output port.
:exc:`TypeError`
If :py:`self` and :py:`other` have incompatible types.
"""
if not isinstance(other, SingleEndedPort):
return NotImplemented
return SingleEndedPort(Cat(self._io, other._io), invert=self._invert + other._invert,
Expand Down Expand Up @@ -231,9 +243,9 @@ class DifferentialPort(PortLike):
individual wires.
direction : :class:`Direction` or :class:`str`
Set of allowed buffer directions. A string is converted to a :class:`Direction` first.
If equal to :attr:`Direction.Input` or :attr:`Direction.Output`, this port can only be used
with buffers of matching direction. If equal to :attr:`Direction.Bidir`, this port can be
used with buffers of any direction.
If equal to :attr:`~Direction.Input` or :attr:`~Direction.Output`, this port can only be
used with buffers of matching direction. If equal to :attr:`~Direction.Bidir`, this port
can be used with buffers of any direction.
Attributes
----------
Expand Down Expand Up @@ -293,27 +305,6 @@ def __getitem__(self, index):
direction=self._direction)

def __add__(self, other):
"""Concatenates two differential library I/O ports.
The direction of the resulting port is:
* The same as the direction of both, if the two ports have the same direction.
* :attr:`Direction.Input` if a bidirectional port is concatenated with an input port.
* :attr:`Direction.Output` if a bidirectional port is concatenated with an output port.
Returns
-------
:class:`DifferentialPort`
A new :class:`DifferentialPort` which contains pairs of wires from :py:`self` followed
by pairs of wires from :py:`other`, preserving their polarity inversion.
Raises
------
:exc:`ValueError`
If an input port is concatenated with an output port.
:exc:`TypeError`
If :py:`self` and :py:`other` have incompatible types.
"""
if not isinstance(other, DifferentialPort):
return NotImplemented
return DifferentialPort(Cat(self._p, other._p), Cat(self._n, other._n),
Expand All @@ -331,6 +322,167 @@ def __repr__(self):
f"direction={self._direction})")


class SimulationPort(PortLike):
"""Represents a simulation library I/O port.
Implements the :class:`PortLike` interface.
Parameters
----------
direction : :class:`Direction` or :class:`str`
Set of allowed buffer directions. A string is converted to a :class:`Direction` first.
If equal to :attr:`~Direction.Input` or :attr:`~Direction.Output`, this port can only be
used with buffers of matching direction. If equal to :attr:`~Direction.Bidir`, this port
can be used with buffers of any direction.
width : :class:`int`
Width of the port. The width of each of the attributes :py:`i`, :py:`o`, :py:`oe` (whenever
present) equals :py:`width`.
invert : :class:`bool` or iterable of :class:`bool`
Polarity inversion. If the value is a simple :class:`bool`, it specifies inversion for
the entire port. If the value is an iterable of :class:`bool`, the iterable must have the
same length as the width of :py:`p` and :py:`n`, and the inversion is specified for
individual wires.
name : :class:`str` or :py:`None`
Name of the port. This name is only used to derive the names of the input, output, and
output enable signals.
src_loc_at : :class:`int`
:ref:`Source location <lang-srcloc>`. Used to infer :py:`name` if not specified.
Attributes
----------
i : :class:`Signal`
Input signal. Present if :py:`direction in (Input, Bidir)`.
o : :class:`Signal`
Ouptut signal. Present if :py:`direction in (Output, Bidir)`.
oe : :class:`Signal`
Output enable signal. Present if :py:`direction in (Output, Bidir)`.
invert : :class:`tuple` of :class:`bool`
The :py:`invert` parameter, normalized to specify polarity inversion per-wire.
direction : :class:`Direction`
The :py:`direction` parameter, normalized to the :class:`Direction` enumeration.
"""
def __init__(self, direction, width, *, invert=False, name=None, src_loc_at=0):
if name is not None and not isinstance(name, str):
raise TypeError(f"Name must be a string, not {name!r}")
if name is None:
name = tracer.get_var_name(depth=2 + src_loc_at, default="$port")

if not (isinstance(width, int) and width >= 0):
raise TypeError(f"Width must be a non-negative integer, not {width!r}")

self._direction = Direction(direction)

self._i = self._o = self._oe = None
if self._direction in (Direction.Input, Direction.Bidir):
self._i = Signal(width, name=f"{name}__i")
if self._direction in (Direction.Output, Direction.Bidir):
self._o = Signal(width, name=f"{name}__o")
self._oe = Signal(width, name=f"{name}__oe",
init=~0 if self._direction is Direction.Output else 0)

if isinstance(invert, bool):
self._invert = (invert,) * width
elif isinstance(invert, Iterable):
self._invert = tuple(invert)
if len(self._invert) != width:
raise ValueError(f"Length of 'invert' ({len(self._invert)}) doesn't match "
f"port width ({width})")
if not all(isinstance(item, bool) for item in self._invert):
raise TypeError(f"'invert' must be a bool or iterable of bool, not {invert!r}")
else:
raise TypeError(f"'invert' must be a bool or iterable of bool, not {invert!r}")

@property
def i(self):
if self._i is None:
raise AttributeError(
"Simulation port with output direction does not have an input signal")
return self._i

@property
def o(self):
if self._o is None:
raise AttributeError(
"Simulation port with input direction does not have an output signal")
return self._o

@property
def oe(self):
if self._oe is None:
raise AttributeError(
"Simulation port with input direction does not have an output enable signal")
return self._oe

@property
def invert(self):
return self._invert

@property
def direction(self):
return self._direction

def __len__(self):
if self._direction is Direction.Input:
return len(self._i)
if self._direction is Direction.Output:
assert len(self._o) == len(self._oe)
return len(self._o)
if self._direction is Direction.Bidir:
assert len(self._i) == len(self._o) == len(self._oe)
return len(self._i)
assert False # :nocov:

def __getitem__(self, key):
result = object.__new__(type(self))
result._i = None if self._i is None else self._i [key]
result._o = None if self._o is None else self._o [key]
result._oe = None if self._oe is None else self._oe[key]
if isinstance(key, slice):
result._invert = self._invert[key]
else:
result._invert = (self._invert[key],)
result._direction = self._direction
return result

def __invert__(self):
result = object.__new__(type(self))
result._i = self._i
result._o = self._o
result._oe = self._oe
result._invert = tuple(not invert for invert in self._invert)
result._direction = self._direction
return result

def __add__(self, other):
if not isinstance(other, SimulationPort):
return NotImplemented
direction = self._direction & other._direction
result = object.__new__(type(self))
result._i = None if direction is Direction.Output else Cat(self._i, other._i)
result._o = None if direction is Direction.Input else Cat(self._o, other._o)
result._oe = None if direction is Direction.Input else Cat(self._oe, other._oe)
result._invert = self._invert + other._invert
result._direction = direction
return result

def __repr__(self):
parts = []
if self._i is not None:
parts.append(f"i={self._i!r}")
if self._o is not None:
parts.append(f"o={self._o!r}")
if self._oe is not None:
parts.append(f"oe={self._oe!r}")
if not any(self._invert):
invert = False
elif all(self._invert):
invert = True
else:
invert = self._invert
return (f"SimulationPort({', '.join(parts)}, invert={invert!r}, "
f"direction={self._direction})")


class Buffer(wiring.Component):
"""A combinational I/O buffer component.
Expand Down Expand Up @@ -476,6 +628,18 @@ def elaborate(self, platform):
else:
m.submodules += IOBufferInstance(self._port.p, o=o_inv, oe=self.oe, i=i_inv)
m.submodules += IOBufferInstance(self._port.n, o=~o_inv, oe=self.oe)
elif isinstance(self._port, SimulationPort):
if self.direction is Direction.Bidir:
# Loop back `o` if `oe` is asserted. This frees the test harness from having to
# provide this functionality itself.
for i_inv_bit, oe_bit, o_bit, i_bit in \
zip(i_inv, self._port.oe, self._port.o, self._port.i):
m.d.comb += i_inv_bit.eq(Cat(Mux(oe_bit, o_bit, i_bit)))
if self.direction is Direction.Input:
m.d.comb += i_inv.eq(self._port.i)
if self.direction in (Direction.Output, Direction.Bidir):
m.d.comb += self._port.o.eq(o_inv)
m.d.comb += self._port.oe.eq(self.oe.replicate(len(self._port)))
else:
raise TypeError("Cannot elaborate generic 'Buffer' with port {self._port!r}") # :nocov:

Expand Down Expand Up @@ -719,6 +883,12 @@ class DDRBuffer(wiring.Component):
This limitation may be lifted in the future.
.. warning::
Double data rate I/O buffers are not compatible with :class:`SimulationPort`.
This limitation may be lifted in the future.
Parameters
----------
direction : :class:`Direction`
Expand Down Expand Up @@ -826,6 +996,9 @@ def elaborate(self, platform):
if hasattr(platform, "get_io_buffer"):
return platform.get_io_buffer(self)

if isinstance(self._port, SimulationPort):
raise NotImplementedError(f"DDR buffers are not supported in simulation") # :nocov:

raise NotImplementedError(f"DDR buffers are not supported on {platform!r}") # :nocov:


Expand Down
Loading

0 comments on commit a154b64

Please sign in to comment.