@@ 82,6 82,7 @@ class DutDriver:
(master << CTRL_MASTER) |
(tx_en << CTRL_TX_EN) |
(rx_en << CTRL_RX_EN) |
+ (pulse_csn << CTRL_PULSE_CSN) |
(clock_polarity << CTRL_CLOCK_POLARITY) |
(clock_phase << CTRL_CLOCK_PHASE) |
(lsbfirst << CTRL_LSBFIRST) |
@@ 106,7 107,7 @@ class DutDriver:
self.dut.write_i.value = 0
def register_interrupt_handler(self, handler):
- self.irq_handlers.push(handler)
+ self.irq_handlers.append(handler)
self._handle_interrupt = True
async def coroutine(self):
@@ 116,13 117,13 @@ class DutDriver:
if not self._handle_interrupt:
continue
- if self.irq_handlers.len() == 0:
+ if len(self.irq_handlers) == 0:
raise Exception("Got interrupt but there is no irq handler. This would be a dead loop.")
# The handlers are called until interrupt goes down!
while self.dut.interrupt_o.value == 1:
- for irq_handler in irq_handlers:
- irq_handler(self)
+ for irq_handler in self.irq_handlers:
+ await irq_handler(self)
@cocotb.test()
async def single_transission(dut):
@@ 266,6 267,97 @@ async def interrupt(dut):
assert int(await slave.received_data()) & 0xFF == rx
assert int(await slave.received_data()) & 0xFF == rx
+@cocotb.test
+async def application(dut):
+ clk = Clock(dut.clk_i, 5, "ns")
+ interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
+ config = SpiConfig(16, RisingEdge, FallingEdge, 10, "ns", csn_pulse = True)
+ slave = SpiSlave(interface, config)
+ driver = DutDriver(dut)
+
+ await cocotb.start(clk.start())
+
+ await init(dut)
+
+ await cocotb.start(slave.coroutine())
+ await cocotb.start(driver.coroutine())
+
+ await driver.configure(
+ en = 1,
+ master = 1,
+ tx_en = 1,
+ rx_en = 1,
+ clock_polarity = 0,
+ clock_phase = 0,
+ pulse_csn = 1,
+ lsbfirst = 0,
+ size_sel = 1,
+ div_sel = 0)
+
+ driver.pending_transactions = 0
+ dut_received = []
+ async def interrupt_handler(driver):
+ await FallingEdge(driver.dut.clk_i)
+ status = await driver.read(ADDR_STATUS)
+ if int(status) & STATUS_RX_BUFFER_FULL != 0:
+ dut_received.append(await driver.read(ADDR_DATA))
+ dut._log.info("Received data")
+
+ if int(status) & STATUS_TX_BUFFER_EMPTY != 0 and driver.pending_transactions > 0:
+ driver.pending_transactions -= 1
+ await driver.write(ADDR_DATA, random.randint(0, 65535))
+ dut._log.info("Sent another transaction")
+
+ if int(status) & STATUS_TX_BUFFER_EMPTY != 0 and driver.pending_transactions == 0:
+ await driver.write(ADDR_INTMASK, INTMASK_RX_BUFFER_FULL)
+ dut._log.info("Turning off tx buffer empty intmask.")
+
+ driver.register_interrupt_handler(interrupt_handler)
+
+ transmitted = []
+
+ # Two transactions.
+ driver.pending_transactions = 2
+ for i in range(0, driver.pending_transactions):
+ data = random.randint(0, 65535)
+ transmitted.append(data)
+ await slave.send_data(data, 16)
+ await slave.expect_transaction_in(50, "ns")
+ await driver.write(ADDR_INTMASK, INTMASK_RX_BUFFER_FULL | INTMASK_TX_BUFFER_EMPTY)
+
+ await slave.wait_all()
+ assert driver.pending_transactions == 0
+
+ # Wait
+ await Timer(100, "ns")
+
+ # Five transactions
+ driver.pending_transactions = 5
+ for i in range(0, driver.pending_transactions):
+ data = random.randint(0, 65535)
+ transmitted.append(data)
+ await slave.send_data(data, 16)
+ await slave.expect_transaction_in(50, "ns")
+ await driver.write(ADDR_INTMASK, INTMASK_RX_BUFFER_FULL | INTMASK_TX_BUFFER_EMPTY)
+
+ await slave.wait_all()
+ assert driver.pending_transactions == 0
+ # Wait
+ await Timer(100, "ns")
+ # Ten transactions
+ driver.pending_transactions = 10
+ for i in range(0, driver.pending_transactions):
+ data = random.randint(0, 65535)
+ transmitted.append(data)
+ await slave.send_data(data, 16)
+ await slave.expect_transaction_in(100, "ns")
+ await driver.write(ADDR_INTMASK, INTMASK_RX_BUFFER_FULL | INTMASK_TX_BUFFER_EMPTY)
+
+ await slave.wait_all()
+ assert driver.pending_transactions == 0
+
+ # Everything handled with interrupts
+
def spi_peripheral_tests_runner():
hdl_toplevel_lang = "vhdl"