From 94410b0e9f2ca2e0fdf37df9c565150784b99655 Mon Sep 17 00:00:00 2001 From: Rutherther Date: Sat, 28 Dec 2024 16:56:15 +0100 Subject: [PATCH] feat: tests for multiple transmissions, rx lost --- hdl_spi/tests/test.py | 240 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 214 insertions(+), 26 deletions(-) diff --git a/hdl_spi/tests/test.py b/hdl_spi/tests/test.py index 7a75b2c..18244e2 100644 --- a/hdl_spi/tests/test.py +++ b/hdl_spi/tests/test.py @@ -5,6 +5,7 @@ import cocotb.handle from cocotb.clock import Clock from cocotb.triggers import Trigger, First, Event, Timer, RisingEdge, FallingEdge import logging +import random @dataclass class SpiInterface: @@ -154,6 +155,8 @@ class SpiSlave: received = received << 1 received |= int(self.rx.value) len -= 1 + await self.received.put(received) + self.log.info(f"Received {received}") # now wait for csn rising timeout = Timer(self.config.sck_period * 2, self.config.sck_period_unit) @@ -165,8 +168,6 @@ class SpiSlave: continue self.tx.value = cocotb.handle.Release() - await self.received.put(received) - self.log.info(f"Received {received}") # good, continue self._transaction.set() @@ -203,54 +204,241 @@ async def init(dut, master: int = 1, tx_en: int = 1): dut._log.info("Init done!") +class DutDriver: + def __init__(self, dut): + self.dut = dut + self._log = logging.getLogger("DutDriver") + self._received = Queue() + self._sync = Event() + self._auto_receive = False + self._set_ready = False + + async def receive_data(self): + if int(self.dut.rx_valid_o.value) != 1: + self._log.error("RX is not valid when receiving data was requested") + await FallingEdge(self.dut.clk_i) + self.dut.rx_ready_i.value = 1 + data = self.dut.rx_data_o.value + + await FallingEdge(self.dut.clk_i) + if int(self.dut.rx_valid_o.value) != 0: + self._log.error("RX data stayed valid after receiving!") + self.dut.rx_ready_i.value = 0 + + return data + + async def send_data(self, data): + if int(self.dut.tx_ready_o.value) != 1: + self._log.error("TX is not ready when sending data was requested") + await FallingEdge(self.dut.clk_i) + self.dut.tx_valid_i.value = 1 + self.dut.tx_data_i.value = data + await FallingEdge(self.dut.clk_i) + self.dut.tx_valid_i.value = 0 + if int(self.dut.tx_ready_o.value) != 0: + self._log.error("TX stayed ready after requesting send of data!") + + async def send_data_wait(self, data): + await FallingEdge(self.dut.clk_i) + self.dut.tx_valid_i.value = 1 + self.dut.tx_data_i.value = data + clk_falling = FallingEdge(self.dut.clk_i) + tx_ready_falling = FallingEdge(self.dut.tx_ready_o) + res = await First(clk_falling, tx_ready_falling) + + if res == clk_falling: + # TODO timeout + await RisingEdge(self.dut.tx_ready_o) + + await RisingEdge(self.dut.clk_i) + # now the data were registered + # (note that it's ready, not confirmation + # so data are sampled only after it actually is ready + # that means the next clock cycle from when ready went to 1) + + self.dut.tx_valid_i.value = 0 + + async def auto_receive(self, receive: bool = True, set_ready: bool = True): + self._auto_receive = receive + self._set_ready = set_ready + self._sync.set() + + async def received_data(self): + if self._received.empty(): + return None + + return await self._received.get() + + async def coroutine(self): + while True: + if not self._auto_receive: + self.dut.rx_ready_i.value = 0 + await self._sync.wait() + self._sync.clear() + if not self._auto_receive: + continue + + if self._set_ready: + self.dut.rx_ready_i.value = 1 + + await RisingEdge(self.dut.rx_valid_o) + if int(self.dut.rx_valid_o.value) == 1: + await self._received.put(self.dut.rx_data_o.value) + + should_lose_data = not self._set_ready and self.dut.tx_valid_i.value == 1 + await RisingEdge(self.dut.clk_i) + await RisingEdge(self.dut.clk_i) + if should_lose_data and int(self.dut.err_lost_rx_data_o.value) != 1: + self._log.error("Didn't get err lost rx data after rx valid") + + if should_lose_data: + self.dut.clear_lost_rx_data_i.value = 1 + await RisingEdge(self.dut.clk_i) + await FallingEdge(self.dut.clk_i) + self.dut.clear_lost_rx_data_i.value = 0 + if int(self.dut.err_lost_rx_data_o.value) != 0: + self._log.error("Could not clear err lost rx data") + + @cocotb.test() -async def simple_test(dut): - clk = Clock(dut.clk_i, 5, "ns") +async def single_transmit(dut): + clk = Clock(dut.clk_i, 5, "ns", impl = "py") interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io) config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns") slave = SpiSlave(interface, config) + driver = DutDriver(dut) await cocotb.start(clk.start()) await init(dut) await cocotb.start(slave.coroutine()) + await cocotb.start(driver.coroutine()) - await slave.send_data(123, 8) + # From slave point of view + rx = random.randint(0, 255) + tx = random.randint(0, 255) + + await slave.send_data(tx, 8) await slave.expect_transaction_in(15, "ns") - await FallingEdge(dut.clk_i) - dut.tx_valid_i.value = 1 - dut.tx_data_i.value = 100 + await driver.send_data(rx) - await FallingEdge(dut.clk_i) - dut.tx_valid_i.value = 0 + await slave.wait_all() - await slave.wait_one() + dut_received = await driver.receive_data() + assert int(dut_received) & 0xFF == tx - assert int(dut.rx_valid_o.value) == 1 - assert int(dut.rx_data_o.value) & 0xFF == 123 + # Wait a few clocks, rx data should still stay valid! + await FallingEdge(dut.clk_i) + await FallingEdge(dut.clk_i) + await FallingEdge(dut.clk_i) + await FallingEdge(dut.clk_i) received = await slave.received_data() + assert received & 0xFF == rx + + await Timer(100, "ns") + +@cocotb.test() +async def multiple_transmits(dut): + clk = Clock(dut.clk_i, 5, "ns") + interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io) + config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns") + slave = SpiSlave(interface, config) + driver = DutDriver(dut) + + await cocotb.start(clk.start()) + + await init(dut) + + await cocotb.start(slave.coroutine()) + await cocotb.start(driver.coroutine()) + + await driver.auto_receive() - assert received & 0xFF == 100 + count = 5 + + tx_data = [random.randint(0, 255) for i in range(count)] + rx_data = [random.randint(0, 255) for i in range(count)] + + for tx in tx_data: + await slave.send_data(tx, 8) + dut._log.info(f"Sending Data from slave: {tx}") + + dut._log.info("To expect transaction") + await slave.expect_transaction_in(15, "ns") + + for rx in rx_data: + dut._log.info(f"Sending Data from master: {rx}") + await driver.send_data_wait(rx) + + await slave.wait_all() + + # Checks + for tx in tx_data: + dut_received = await driver.received_data() + assert int(dut_received) & 0xFF == tx + + for rx in rx_data: + received = await slave.received_data() + assert received & 0xFF == rx + + await Timer(100, "ns") + +@cocotb.test() +async def lost_rx_data(dut): + clk = Clock(dut.clk_i, 5, "ns") + interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io) + config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns") + slave = SpiSlave(interface, config) + driver = DutDriver(dut) + + await cocotb.start(clk.start()) + + await init(dut) + + await cocotb.start(slave.coroutine()) + await cocotb.start(driver.coroutine()) + + # Do not confirm reception of data. That will auto check + # if lost is asserted and cleared appropriately. + await driver.auto_receive(True, False) + + count = 5 + + tx_data = [random.randint(0, 255) for i in range(count)] + rx_data = [random.randint(0, 255) for i in range(count)] + + for tx in tx_data: + await slave.send_data(tx, 8) + + await slave.expect_transaction_in(15, "ns") + + for rx in rx_data: + await driver.send_data_wait(rx) + + await slave.wait_all() + + # Checks + # Even though the DUT thinks data were lost, they were actually + # read and should still be valid data. + for tx in tx_data: + dut_received = await driver.received_data() + assert int(dut_received) & 0xFF == tx + + for rx in rx_data: + received = await slave.received_data() + assert received & 0xFF == rx await Timer(100, "ns") -# Simple one receive, transmit - # Check csn goes low - # Check 8 sck pulses # All clock phases and polarities # All sizes, divisors # Rx blocking - Can't go to another transmission until data confirmed. # When data read a bit later, and csn pulsing is enabled, the csn should still pulse, before data are obtained -# Multiple times receive, transmit without pulse - # All clock phases and polarities - -# Multiple times receive, transmit with pulse - -# Only transmission - # Rx is not valid -# Only reception - # Z on mosi +# All clock phases and polarities +# csn pulse +# rx_en off - miso should be ignored, no rx_valid is always 0. Tx works fine +# tx_en off - mosi should be Z, tx_ready is always 0. Rx works fine -- 2.48.1