From 9c617e8424dd512cbd89a48ce6066336809be86a Mon Sep 17 00:00:00 2001 From: Rutherther Date: Sat, 28 Dec 2024 20:58:16 +0100 Subject: [PATCH] chore: move spi models to separate file --- hdl_spi/models/__init__.py | 0 hdl_spi/models/spi_models.py | 180 +++++++++++++ hdl_spi/tests/Makefile | 4 +- .../{test.py => test_spi_masterslave.py} | 238 +++++------------- 4 files changed, 244 insertions(+), 178 deletions(-) create mode 100644 hdl_spi/models/__init__.py create mode 100644 hdl_spi/models/spi_models.py rename hdl_spi/tests/{test.py => test_spi_masterslave.py} (65%) diff --git a/hdl_spi/models/__init__.py b/hdl_spi/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hdl_spi/models/spi_models.py b/hdl_spi/models/spi_models.py new file mode 100644 index 0000000..d75ff82 --- /dev/null +++ b/hdl_spi/models/spi_models.py @@ -0,0 +1,180 @@ +import cocotb +import cocotb.handle +from dataclasses import dataclass +from cocotb.queue import Queue +from cocotb.clock import Clock +from cocotb.triggers import Trigger, First, Event, Timer, RisingEdge, FallingEdge + +import logging + +@dataclass +class SpiInterface: + csn: cocotb.handle.LogicObject + sck: cocotb.handle.LogicObject + miso: cocotb.handle.LogicObject + mosi: cocotb.handle.LogicObject + +# class SignalMonitor: +# def expect_change_to_after() -> SignalMonitor: +# def expect_stable_for() -> SignalMonitor: +# def expect_value() -> SignalMonitor: +# def step() -> SignalMonitor: + +# async def coroutine(): + +# class ClockedSignalMonitor: +# def __init__(clock, signal, sampling = RisingEdge): +# self.signal = signal +# self.sampling = sampling +# self.events = [ [] ] + +# def expect_change_to_after() -> SignalMonitor: +# def expect_stable_for() -> SignalMonitor: +# def expect_value() -> SignalMonitor: +# def set_value() -> SignalMonitor: + +# def set_callback() -> SignalMonitor: + +# def step() -> SignalMonitor: +# self.events.append([]) + +# async def coroutine(): +# while True: +# await self.sampling + +# if self.events.len() == 0: +# continue + +# if self.events[0].len() == 0: +# continue + +# events = self.events.pop(0) + +# for event in events: + +@dataclass +class SpiConfig: + transaction_len: int + sampling: Trigger + shifting: Trigger + sck_period: int + sck_period_unit: str + +class SpiSlave: + def __init__(self, spi: SpiInterface, config: SpiConfig): + self._transaction = Event("spi_slave_transaction") + self._idle = Event("spi_idle") + self._wakeup = Event("spi_slave_wakeup") + + self._transaction.clear() + self._idle.clear() + self._wakeup.clear() + + self._log = logging.getLogger("cocotb.SpiSlave") + + self.config = config + self.transactions = Queue() + self.received = Queue() + + self.sck = spi.sck + self.csn = spi.csn + self.tx = spi.miso + self.rx = spi.mosi + + async def send_data(self, data: int, len: int): + item = (data, len) + await self.transactions.put(item) + + async def received_data(self) -> int: + return await self.received.get() + + async def expect_transaction_in(self, max: int, unit: str = "ns"): + self.data = (max, unit) + self._wakeup.set() + + async def wait_one(self): + self._transaction.clear() + await self._transaction.wait() + + async def wait_all(self): + await self._idle.wait() + + async def coroutine(self): + while True: + self._idle.set() + await self._wakeup.wait() + + # first = await First(wakeup, csn_falling) + # if first == csn_falling: + # self.log.error("CSN fell when transaction was unexpected.") + # continue + + self._log.info("Got new transaction expectation.") + + self._wakeup.clear() + max, unit = self.data + self._idle.clear() + + csn_falling = FallingEdge(self.csn) + + first = await First(csn_falling, Timer(max, unit)) + + if first != csn_falling: + self._log.error(f"CSN did not fall in time ({max} {unit})!") + continue + + # csn fell + + first = True + while not self.transactions.empty(): + # expect clock + data, len = await self.transactions.get() + total_len = len + self._transaction.set() + if first: + self.tx.value = cocotb.handle.Force((data >> (total_len - 1)) & 1) + data = data << 1 + + received = 0 + + got_sampling = False + + while len > 0: + sampling = self.config.sampling(self.sck) + shifting = self.config.shifting(self.sck) + timeout = Timer(self.config.sck_period * 4, self.config.sck_period_unit) + res = await First(sampling, shifting, timeout) + + if res == timeout: + self._log.error("Got no sck edge in time!") + continue + + if res == shifting: + if got_sampling or not first: + self.tx.value = cocotb.handle.Force((data >> (total_len - 1)) & 1) + data = data << 1 + elif res == sampling: + got_sampling = True + received = received << 1 + received |= int(self.rx.value) + len -= 1 + + await Timer(1, "ns") + await self.received.put(received) + + self._log.info(f"Received {received}") + first = False + + # now wait for csn rising + timeout = Timer(self.config.sck_period * 2, self.config.sck_period_unit) + csn_rising = RisingEdge(self.csn) + res = await First(csn_rising, timeout) + + if res == timeout: + self._log.error("Got no rising edge on csn") + continue + + self.tx.value = cocotb.handle.Release() + + # good, continue + self._transaction.set() diff --git a/hdl_spi/tests/Makefile b/hdl_spi/tests/Makefile index ef0031a..b1f3a9a 100644 --- a/hdl_spi/tests/Makefile +++ b/hdl_spi/tests/Makefile @@ -15,7 +15,9 @@ GHDL_ARGS= --std=08 TOPLEVEL = spi_masterslave # MODULE is the basename of the Python test file -MODULE = test +MODULE = test_spi_masterslave + +export PYTHONPATH := $(PWD)/../models:$(PYTHONPATH) # include cocotb's make rules to take care of the simulator setup include $(shell cocotb-config --makefiles)/Makefile.sim diff --git a/hdl_spi/tests/test.py b/hdl_spi/tests/test_spi_masterslave.py similarity index 65% rename from hdl_spi/tests/test.py rename to hdl_spi/tests/test_spi_masterslave.py index ac596f4..858502e 100644 --- a/hdl_spi/tests/test.py +++ b/hdl_spi/tests/test_spi_masterslave.py @@ -1,185 +1,20 @@ +import os +import sys +from pathlib import Path +import logging +import random + from dataclasses import dataclass -from cocotb.queue import Queue + import cocotb import cocotb.handle +from cocotb.queue import Queue from cocotb.clock import Clock from cocotb.triggers import Trigger, First, Event, Timer, RisingEdge, FallingEdge -import logging -import random - -@dataclass -class SpiInterface: - csn: cocotb.handle.LogicObject - sck: cocotb.handle.LogicObject - miso: cocotb.handle.LogicObject - mosi: cocotb.handle.LogicObject - -# class SignalMonitor: -# def expect_change_to_after() -> SignalMonitor: -# def expect_stable_for() -> SignalMonitor: -# def expect_value() -> SignalMonitor: -# def step() -> SignalMonitor: - -# async def coroutine(): - -# class ClockedSignalMonitor: -# def __init__(clock, signal, sampling = RisingEdge): -# self.signal = signal -# self.sampling = sampling -# self.events = [ [] ] - -# def expect_change_to_after() -> SignalMonitor: -# def expect_stable_for() -> SignalMonitor: -# def expect_value() -> SignalMonitor: -# def set_value() -> SignalMonitor: - -# def set_callback() -> SignalMonitor: - -# def step() -> SignalMonitor: -# self.events.append([]) - -# async def coroutine(): -# while True: -# await self.sampling - -# if self.events.len() == 0: -# continue - -# if self.events[0].len() == 0: -# continue - -# events = self.events.pop(0) - -# for event in events: - -@dataclass -class SpiConfig: - transaction_len: int - sampling: Trigger - shifting: Trigger - sck_period: int - sck_period_unit: str - -class SpiSlave: - def __init__(self, spi: SpiInterface, config: SpiConfig): - self._transaction = Event("spi_slave_transaction") - self._idle = Event("spi_idle") - self._wakeup = Event("spi_slave_wakeup") - - self._transaction.clear() - self._idle.clear() - self._wakeup.clear() - - self._log = logging.getLogger("cocotb.SpiSlave") - - self.config = config - self.transactions = Queue() - self.received = Queue() - - self.sck = spi.sck - self.csn = spi.csn - self.tx = spi.miso - self.rx = spi.mosi - - async def send_data(self, data: int, len: int): - item = (data, len) - await self.transactions.put(item) - - async def received_data(self) -> int: - return await self.received.get() - - async def expect_transaction_in(self, max: int, unit: str = "ns"): - self.data = (max, unit) - self._wakeup.set() - - async def wait_one(self): - self._transaction.clear() - await self._transaction.wait() - - async def wait_all(self): - await self._idle.wait() - - async def coroutine(self): - while True: - self._idle.set() - await self._wakeup.wait() - - # first = await First(wakeup, csn_falling) - # if first == csn_falling: - # self.log.error("CSN fell when transaction was unexpected.") - # continue - - self._log.info("Got new transaction expectation.") - - self._wakeup.clear() - max, unit = self.data - self._idle.clear() - - csn_falling = FallingEdge(self.csn) - - first = await First(csn_falling, Timer(max, unit)) - - if first != csn_falling: - self._log.error(f"CSN did not fall in time ({max} {unit})!") - continue - - # csn fell - - first = True - while not self.transactions.empty(): - # expect clock - data, len = await self.transactions.get() - total_len = len - self._transaction.set() - if first: - self.tx.value = cocotb.handle.Force((data >> (total_len - 1)) & 1) - data = data << 1 - - received = 0 - - got_sampling = False - - while len > 0: - sampling = self.config.sampling(self.sck) - shifting = self.config.shifting(self.sck) - timeout = Timer(self.config.sck_period * 4, self.config.sck_period_unit) - res = await First(sampling, shifting, timeout) - - if res == timeout: - self._log.error("Got no sck edge in time!") - continue - - if res == shifting: - if got_sampling or not first: - self.tx.value = cocotb.handle.Force((data >> (total_len - 1)) & 1) - data = data << 1 - elif res == sampling: - got_sampling = True - received = received << 1 - received |= int(self.rx.value) - len -= 1 - - await Timer(self.config.sck_period / 4, self.config.sck_period_unit) - await self.received.put(received) - - self._log.info(f"Received {received}") - first = False - - # now wait for csn rising - timeout = Timer(self.config.sck_period * 2, self.config.sck_period_unit) - csn_rising = RisingEdge(self.csn) - res = await First(csn_rising, timeout) - - self._log.error("TEST fs") - if res == timeout: - self._log.error("Got no rising edge on csn") - continue - - self.tx.value = cocotb.handle.Release() - - # good, continue - self._transaction.set() +from cocotb_tools.runner import get_runner +if cocotb.simulator.is_running(): + from spi_models import SpiInterface, SpiConfig, SpiSlave async def init(dut, master: int = 1, tx_en: int = 1): dut._log.info("Init started!") @@ -310,7 +145,7 @@ class DutDriver: @cocotb.test() async def single_transmit(dut): - clk = Clock(dut.clk_i, 5, "ns", impl = "py") + clk = Clock(dut.clk_i, 5, "ns") interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io) config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns") slave = SpiSlave(interface, config) @@ -507,3 +342,52 @@ async def shifted_inverted_clock(dut): # csn pulse # rx_en off - miso should be ignored, no rx_valid is always 0. Tx works fine # tx_en off - mosi should be Z, tx_ready is always 0. Rx works fine + +def spi_tests_runner(): + hdl_toplevel_lang = "vhdl" + sim = os.getenv("SIM", "questa") + + proj_path = Path(__file__).resolve().parent.parent + # equivalent to setting the PYTHONPATH environment variable + sys.path.append(str(proj_path / "models")) + + sources = [ + proj_path / "src" / "spi_pkg.vhd", + proj_path / "src" / "rs_latch.vhd", + proj_path / "src" / "register.vhd", + proj_path / "src" / "shift_register.vhd", + proj_path / "src" / "spi_clkgen.vhd", + proj_path / "src" / "spi_clkmon.vhd", + proj_path / "src" / "spi_multiplexor.vhd", + proj_path / "src" / "spi_slave_ctrl.vhd", + proj_path / "src" / "spi_master_ctrl.vhd", + proj_path / "src" / "spi_master.vhd", + proj_path / "src" / "spi_masterslave.vhd", + proj_path / "src" / "spi_peripheral.vhd" + ] + + build_args = [] + extra_args = [] + if sim == "ghdl": + extra_args = ["--std=08"] + elif sim == "questa" or sim == "modelsim": + build_args = ["-2008"] + + # equivalent to setting the PYTHONPATH environment variable + sys.path.append(str(proj_path / "tests")) + + runner = get_runner(sim) + runner.build( + sources=sources, + hdl_toplevel="spi_masterslave", + always=True, + build_args=build_args + extra_args, + ) + runner.test( + hdl_toplevel="spi_masterslave", hdl_toplevel_lang=hdl_toplevel_lang, + test_module="test_spi_masterslave", test_args = extra_args + ) + + +if __name__ == "__main__": + spi_tests_runner() -- 2.48.1