Skip to content

Commit

Permalink
WIP dumped state of PyRTL process
Browse files Browse the repository at this point in the history
  • Loading branch information
bieganski committed Sep 16, 2023
1 parent 2f3c78c commit 05cdcc8
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 23 deletions.
2 changes: 1 addition & 1 deletion mtkcpu/utils/tests/sim_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def get_state_name(fsm, num):

@dataclass(frozen=True)
class Checkpoint:
deadline : int # must occur before 'deadline' CPU clock cycle
timeout_tck : int
signals : List[Tuple[Signal, Any]] # all sig==Value must hold


Expand Down
85 changes: 63 additions & 22 deletions mtkcpu/utils/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dataclasses import dataclass
from enum import Enum, unique
from itertools import count
from typing import Dict, List, Optional, OrderedDict
from typing import Optional, OrderedDict, Tuple
import pytest
from pathlib import Path
import subprocess
Expand Down Expand Up @@ -385,6 +385,7 @@ def cpu_testbench_test(case : CpuTestbenchCase):
dev_mode=False,
with_debug=False,
pc_reset_value=MEM_START_ADDR,
with_virtual_memory=False,
),
mem_config=mem_cfg
)
Expand Down Expand Up @@ -484,12 +485,7 @@ def assert_mem_test(case: MemTestCase):
)


# Returns dict with keys:
# "sim" - Simulator object
# "frag" - Fragment object
# "vcd_traces" - List of JTAG/DM signals to be traced
# "jtag_fsm" - JTAG FSM
def create_jtag_simulator(monitor: DMI_Monitor, cpu: MtkCpu):
def create_jtag_simulator(monitor: DMI_Monitor, cpu: MtkCpu) -> Tuple[Simulator, list[Signal]]:
# cursed stuff for retrieving jtag FSM state for 'traces=vcd_traces' variable
# https://freenode.irclog.whitequark.org/amaranth/2020-07-26#27592720;
frag = Fragment.get(monitor, platform=None)
Expand Down Expand Up @@ -564,11 +560,8 @@ def create_jtag_simulator(monitor: DMI_Monitor, cpu: MtkCpu):
cpu.gprf_debug_data,
cpu.gprf_debug_addr,
]
return {
"sim": sim,
"vcd_traces": vcd_traces,
# "jtag_fsm": jtag_fsm,
}

return sim, vcd_traces


def run_gdb(
Expand Down Expand Up @@ -694,6 +687,7 @@ def assert_jtag_test(
dev_mode=False,
with_debug=True,
pc_reset_value=0x8000,
with_virtual_memory=False,
)
)

Expand Down Expand Up @@ -730,8 +724,7 @@ def run_gdb_when_ocd_ready():

dmi_monitor = DMI_Monitor(cpu=cpu)

sim_gadgets = create_jtag_simulator(dmi_monitor, cpu)
sim, vcd_traces = [sim_gadgets[k] for k in ["sim", "vcd_traces"]]
sim, vcd_traces = create_jtag_simulator(dmi_monitor, cpu)

processes = [
monitor_cmderr(dmi_monitor),
Expand All @@ -745,22 +738,70 @@ def run_gdb_when_ocd_ready():
get_sim_jtag_controller(cpu=cpu, timeout_cycles=timeout_cycles),
monitor_writes_to_dcsr(dmi_monitor=dmi_monitor),
monitor_abstractauto(dmi_monitor=dmi_monitor),
bus_capture_write_transactions(cpu=dmi_monitor.cpu, output_dict=dict()),
# bus_capture_write_transactions(cpu=dmi_monitor.cpu, output_dict=dict()),
]

with_checkpoints = False # XXX

def timeouted(fn, timeout: int):
def aux():
for i, x in enumerate(fn):
if i == timeout:
fn_name = fn if not hasattr(fn, "__name__") else fn.__name__
raise TimeoutError(f"Timeout of {timeout} TCK ticks expired for function '{fn_name}'!")
yield x
return aux

def ckpt_processses_supervisor(active_processes: list, ckpt_processes: list):
def aux():
while True:
if all([x not in active_processes for x in ckpt_processes]):
from pprint import pformat
from inspect import getmembers
l = lambda x: pformat(getmembers(x))
print("NAJS!", [type(x) for x in active_processes])
raise ValueError(f"OK {l(active_processes.pop())}")
return # success! all ckpt checker finished
print("not yet..")
yield
return aux

if with_checkpoints:
processes.append(get_ocd_checkpoint_checker(cpu))
ckpt_processes = bus_capture_write_transactions(cpu=dmi_monitor.cpu, output_dict=dict()),
pass
# raise ValueError("with_checkpts")
processes += ckpt_processes

# [
# timeouted(dmcontrol_written(dmi_monitor), 10)
# ]





# def dmcontrol_written(dmi_monitor: DMI_Monitor):
# yield Passive()
# while True:
# cmd = yield dmi_monitor.cur_COMMAND.control.write
# if cmd:
# raise ValueError("OK")


for p in processes:
sim.add_sync_process(p)

if with_checkpoints:
sim.add_sync_process(ckpt_processses_supervisor(
active_processes=sim._engine._processes,
ckpt_processes=ckpt_processes)
)

with sim.write_vcd("jtag.vcd", "jtag.gtkw", traces=vcd_traces):
sim.run()
while sim.advance():
pass
# sim.run()

@parametrized
def component_testbench(f, cases: List[ComponentTestbenchCase]):
def component_testbench(f, cases: list[ComponentTestbenchCase]):
@pytest.mark.parametrize("test_case", cases)
@rename(f.__name__)
def aux(test_case):
Expand All @@ -770,7 +811,7 @@ def aux(test_case):


@parametrized
def mem_test(f, cases: List[MemTestCase]):
def mem_test(f, cases: list[MemTestCase]):
@pytest.mark.parametrize("test_case", cases)
@rename(f.__name__)
def aux(test_case):
Expand All @@ -779,7 +820,7 @@ def aux(test_case):
return aux

@parametrized
def cpu_testbench(f, cases: List[CpuTestbenchCase]):
def cpu_testbench(f, cases: list[CpuTestbenchCase]):
@pytest.mark.parametrize("test_case", cases)
@rename(f.__name__)
def aux(test_case):
Expand Down

0 comments on commit 05cdcc8

Please sign in to comment.