From bd70a2bb6a3414d329ad8f6b6147507a543d96d7 Mon Sep 17 00:00:00 2001 From: Rutherther Date: Tue, 31 Dec 2024 22:46:17 +0100 Subject: [PATCH] tests: add test for interrupts spi_peripheral --- hdl_spi/tests/test_spi_peripheral.py | 135 +++++++++++++++++++++++---- 1 file changed, 118 insertions(+), 17 deletions(-) diff --git a/hdl_spi/tests/test_spi_peripheral.py b/hdl_spi/tests/test_spi_peripheral.py index 4e55430..3b1e89f 100644 --- a/hdl_spi/tests/test_spi_peripheral.py +++ b/hdl_spi/tests/test_spi_peripheral.py @@ -21,14 +21,14 @@ ADDR_INTMASK = 1 ADDR_STATUS = 2 ADDR_DATA = 3 -INTMASK_RX_BUFFER_FULL = 0 -INTMASK_TX_BUFFER_EMPTY = 1 -INTMASK_LOST_RX_DATA = 3 +INTMASK_RX_BUFFER_FULL = 1 +INTMASK_TX_BUFFER_EMPTY = 2 +INTMASK_LOST_RX_DATA = 4 STATUS_RX_BUFFER_FULL = 0 -STATUS_TX_BUFFER_EMPTY = 1 -STATUS_BUSY = 2 -STATUS_LOST_RX_DATA = 3 +STATUS_TX_BUFFER_EMPTY = 2 +STATUS_BUSY = 8 +STATUS_LOST_RX_DATA = 16 CTRL_EN = 0 CTRL_MASTER = 1 @@ -65,6 +65,7 @@ class DutDriver: def __init__(self, dut): self.dut = dut self.irq_handlers = [] + self._handle_interrupt = False async def clear_lost_rx_data(self): await self.write(ADDR_INTMASK, INTMASK_LOST_RX_DATA) @@ -88,12 +89,13 @@ class DutDriver: (div_sel << CTRL_DIV_SEL)) async def read(self, address): - await FallingEdge(self.dut.clk_i) self.dut.raddress_i.value = address + await FallingEdge(self.dut.clk_i) + value = self.dut.rdata_o.value self.dut.read_i.value = 1 await FallingEdge(self.dut.clk_i) self.dut.read_i.value = 0 - return self.dut.rdata_o.value + return value async def write(self, address, data): await FallingEdge(self.dut.clk_i) @@ -105,17 +107,22 @@ class DutDriver: def register_interrupt_handler(self, handler): self.irq_handlers.push(handler) + self._handle_interrupt = True async def coroutine(self): - await RisingEdge(self.dut.interrupt_o) + while True: + await RisingEdge(self.dut.interrupt_o) - if self.irq_handlers.len() == 0: - raise Exception("Got interrupt but there is no irq handler. This would be a dead loop.") + if not self._handle_interrupt: + continue - # The handlers are called until interrupt goes down! - while self.dut.interrupt_o.value == 1: - for irq_handler in irq_handlers: - irq_handler(self) + if self.irq_handlers.len() == 0: + raise Exception("Got interrupt but there is no irq handler. This would be a dead loop.") + + # The handlers are called until interrupt goes down! + while self.dut.interrupt_o.value == 1: + for irq_handler in irq_handlers: + irq_handler(self) @cocotb.test() async def single_transission(dut): @@ -165,6 +172,100 @@ async def single_transission(dut): await Timer(100, "ns") +@cocotb.test +async def interrupt(dut): + clk = Clock(dut.clk_i, 5, "ns") + interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o) + config = SpiConfig(16, RisingEdge, FallingEdge, 10, "ns") + slave = SpiSlave(interface, config) + driver = DutDriver(dut) + + await cocotb.start(clk.start()) + + await init(dut) + + await cocotb.start(slave.coroutine()) + await cocotb.start(driver.coroutine()) + + await driver.configure( + en = 1, + master = 1, + tx_en = 1, + rx_en = 1, + clock_polarity = 1, + clock_phase = 1, + pulse_csn = 0, + lsbfirst = 0, + size_sel = 0, + div_sel = 0) + + rx = random.randint(0, 255) + tx = random.randint(0, 255) + + # No interrupt as it's masked + assert int(dut.interrupt_o.value) == 0 + # TX buffer is empty + assert int((await driver.read(ADDR_STATUS))) == INTMASK_TX_BUFFER_EMPTY + await driver.write(ADDR_INTMASK, INTMASK_RX_BUFFER_FULL | INTMASK_TX_BUFFER_EMPTY) + assert int(dut.interrupt_o.value) == 1 + + await slave.send_data(tx, 8) + await slave.send_data(tx, 8) + await slave.expect_transaction_in(50, "ns") + + await driver.write(ADDR_DATA, rx) + + async def check_status(value): + assert int((await driver.read(ADDR_STATUS))) == value + await cocotb.start(check_status(0)) + + await FallingEdge(dut.clk_i) + + assert int(dut.interrupt_o.value) == 0 + + timeout = Timer(20, "ns") + res = await First(RisingEdge(dut.interrupt_o), timeout) + + if res == timeout: + raise Exception("Timeout on interrupt rising") + + # Check that the buffer is empty, as one transmission should be happening + assert int((await driver.read(ADDR_STATUS))) == INTMASK_TX_BUFFER_EMPTY + await driver.write(ADDR_INTMASK, 0) + assert int(dut.interrupt_o.value) == 0 + assert int((await driver.read(ADDR_STATUS))) == STATUS_TX_BUFFER_EMPTY | STATUS_BUSY + + # Send second data byte + await driver.write(ADDR_DATA, rx) + assert int(dut.interrupt_o.value) == 0 + assert int((await driver.read(ADDR_STATUS))) == STATUS_BUSY + + # Now unmask only rx buffer full + await driver.write(ADDR_INTMASK, INTMASK_RX_BUFFER_FULL) + + timeout = Timer(50, "ns") + res = await First(timeout, RisingEdge(dut.interrupt_o)) + + if res == timeout: + raise Exception("Timeout on interrupt rising") + + data = await driver.read(ADDR_DATA) + assert int(data) & 0xFF == tx + + assert int(dut.interrupt_o.value) == 1 + + await FallingEdge(dut.clk_i) + + assert int(dut.interrupt_o.value) == 0 + assert int((await driver.read(ADDR_STATUS))) == STATUS_TX_BUFFER_EMPTY | STATUS_BUSY + + await slave.wait_all() + assert int(dut.interrupt_o.value) == 1 + assert int((await driver.read(ADDR_STATUS))) == STATUS_TX_BUFFER_EMPTY | STATUS_RX_BUFFER_FULL + + assert int(await slave.received_data()) & 0xFF == rx + assert int(await slave.received_data()) & 0xFF == rx + def spi_peripheral_tests_runner(): hdl_toplevel_lang = "vhdl" @@ -206,9 +307,9 @@ def spi_peripheral_tests_runner(): build_args=build_args + extra_args, ) runner.test( + testcase = "interrupt", hdl_toplevel="spi_peripheral", hdl_toplevel_lang=hdl_toplevel_lang, - test_module="test_spi_peripheral", test_args = extra_args, - gui = True + test_module="test_spi_peripheral", test_args = extra_args, gui = True ) -- 2.48.1