From 3d1c611fcc77144659c6e510169d4a6c79792986 Mon Sep 17 00:00:00 2001 From: Rutherther Date: Tue, 31 Dec 2024 23:39:51 +0100 Subject: [PATCH] tests: add spi_peripheral 'application' testcase --- hdl_spi/tests/test_spi_peripheral.py | 100 +++++++++++++++++++++++++-- 1 file changed, 96 insertions(+), 4 deletions(-) diff --git a/hdl_spi/tests/test_spi_peripheral.py b/hdl_spi/tests/test_spi_peripheral.py index 3b1e89f..8b51087 100644 --- a/hdl_spi/tests/test_spi_peripheral.py +++ b/hdl_spi/tests/test_spi_peripheral.py @@ -82,6 +82,7 @@ class DutDriver: (master << CTRL_MASTER) | (tx_en << CTRL_TX_EN) | (rx_en << CTRL_RX_EN) | + (pulse_csn << CTRL_PULSE_CSN) | (clock_polarity << CTRL_CLOCK_POLARITY) | (clock_phase << CTRL_CLOCK_PHASE) | (lsbfirst << CTRL_LSBFIRST) | @@ -106,7 +107,7 @@ class DutDriver: self.dut.write_i.value = 0 def register_interrupt_handler(self, handler): - self.irq_handlers.push(handler) + self.irq_handlers.append(handler) self._handle_interrupt = True async def coroutine(self): @@ -116,13 +117,13 @@ class DutDriver: if not self._handle_interrupt: continue - if self.irq_handlers.len() == 0: + if len(self.irq_handlers) == 0: raise Exception("Got interrupt but there is no irq handler. This would be a dead loop.") # The handlers are called until interrupt goes down! while self.dut.interrupt_o.value == 1: - for irq_handler in irq_handlers: - irq_handler(self) + for irq_handler in self.irq_handlers: + await irq_handler(self) @cocotb.test() async def single_transission(dut): @@ -266,6 +267,97 @@ async def interrupt(dut): assert int(await slave.received_data()) & 0xFF == rx assert int(await slave.received_data()) & 0xFF == rx +@cocotb.test +async def application(dut): + clk = Clock(dut.clk_i, 5, "ns") + interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o) + config = SpiConfig(16, RisingEdge, FallingEdge, 10, "ns", csn_pulse = True) + slave = SpiSlave(interface, config) + driver = DutDriver(dut) + + await cocotb.start(clk.start()) + + await init(dut) + + await cocotb.start(slave.coroutine()) + await cocotb.start(driver.coroutine()) + + await driver.configure( + en = 1, + master = 1, + tx_en = 1, + rx_en = 1, + clock_polarity = 0, + clock_phase = 0, + pulse_csn = 1, + lsbfirst = 0, + size_sel = 1, + div_sel = 0) + + driver.pending_transactions = 0 + dut_received = [] + async def interrupt_handler(driver): + await FallingEdge(driver.dut.clk_i) + status = await driver.read(ADDR_STATUS) + if int(status) & STATUS_RX_BUFFER_FULL != 0: + dut_received.append(await driver.read(ADDR_DATA)) + dut._log.info("Received data") + + if int(status) & STATUS_TX_BUFFER_EMPTY != 0 and driver.pending_transactions > 0: + driver.pending_transactions -= 1 + await driver.write(ADDR_DATA, random.randint(0, 65535)) + dut._log.info("Sent another transaction") + + if int(status) & STATUS_TX_BUFFER_EMPTY != 0 and driver.pending_transactions == 0: + await driver.write(ADDR_INTMASK, INTMASK_RX_BUFFER_FULL) + dut._log.info("Turning off tx buffer empty intmask.") + + driver.register_interrupt_handler(interrupt_handler) + + transmitted = [] + + # Two transactions. + driver.pending_transactions = 2 + for i in range(0, driver.pending_transactions): + data = random.randint(0, 65535) + transmitted.append(data) + await slave.send_data(data, 16) + await slave.expect_transaction_in(50, "ns") + await driver.write(ADDR_INTMASK, INTMASK_RX_BUFFER_FULL | INTMASK_TX_BUFFER_EMPTY) + + await slave.wait_all() + assert driver.pending_transactions == 0 + + # Wait + await Timer(100, "ns") + + # Five transactions + driver.pending_transactions = 5 + for i in range(0, driver.pending_transactions): + data = random.randint(0, 65535) + transmitted.append(data) + await slave.send_data(data, 16) + await slave.expect_transaction_in(50, "ns") + await driver.write(ADDR_INTMASK, INTMASK_RX_BUFFER_FULL | INTMASK_TX_BUFFER_EMPTY) + + await slave.wait_all() + assert driver.pending_transactions == 0 + # Wait + await Timer(100, "ns") + # Ten transactions + driver.pending_transactions = 10 + for i in range(0, driver.pending_transactions): + data = random.randint(0, 65535) + transmitted.append(data) + await slave.send_data(data, 16) + await slave.expect_transaction_in(100, "ns") + await driver.write(ADDR_INTMASK, INTMASK_RX_BUFFER_FULL | INTMASK_TX_BUFFER_EMPTY) + + await slave.wait_all() + assert driver.pending_transactions == 0 + + # Everything handled with interrupts + def spi_peripheral_tests_runner(): hdl_toplevel_lang = "vhdl" -- 2.48.1