@@ 21,14 21,14 @@ ADDR_INTMASK = 1
ADDR_STATUS = 2
ADDR_DATA = 3
-INTMASK_RX_BUFFER_FULL = 0
-INTMASK_TX_BUFFER_EMPTY = 1
-INTMASK_LOST_RX_DATA = 3
+INTMASK_RX_BUFFER_FULL = 1
+INTMASK_TX_BUFFER_EMPTY = 2
+INTMASK_LOST_RX_DATA = 4
STATUS_RX_BUFFER_FULL = 0
-STATUS_TX_BUFFER_EMPTY = 1
-STATUS_BUSY = 2
-STATUS_LOST_RX_DATA = 3
+STATUS_TX_BUFFER_EMPTY = 2
+STATUS_BUSY = 8
+STATUS_LOST_RX_DATA = 16
CTRL_EN = 0
CTRL_MASTER = 1
@@ 65,6 65,7 @@ class DutDriver:
def __init__(self, dut):
self.dut = dut
self.irq_handlers = []
+ self._handle_interrupt = False
async def clear_lost_rx_data(self):
await self.write(ADDR_INTMASK, INTMASK_LOST_RX_DATA)
@@ 88,12 89,13 @@ class DutDriver:
(div_sel << CTRL_DIV_SEL))
async def read(self, address):
- await FallingEdge(self.dut.clk_i)
self.dut.raddress_i.value = address
+ await FallingEdge(self.dut.clk_i)
+ value = self.dut.rdata_o.value
self.dut.read_i.value = 1
await FallingEdge(self.dut.clk_i)
self.dut.read_i.value = 0
- return self.dut.rdata_o.value
+ return value
async def write(self, address, data):
await FallingEdge(self.dut.clk_i)
@@ 105,17 107,22 @@ class DutDriver:
def register_interrupt_handler(self, handler):
self.irq_handlers.push(handler)
+ self._handle_interrupt = True
async def coroutine(self):
- await RisingEdge(self.dut.interrupt_o)
+ while True:
+ await RisingEdge(self.dut.interrupt_o)
- if self.irq_handlers.len() == 0:
- raise Exception("Got interrupt but there is no irq handler. This would be a dead loop.")
+ if not self._handle_interrupt:
+ continue
- # The handlers are called until interrupt goes down!
- while self.dut.interrupt_o.value == 1:
- for irq_handler in irq_handlers:
- irq_handler(self)
+ if self.irq_handlers.len() == 0:
+ raise Exception("Got interrupt but there is no irq handler. This would be a dead loop.")
+
+ # The handlers are called until interrupt goes down!
+ while self.dut.interrupt_o.value == 1:
+ for irq_handler in irq_handlers:
+ irq_handler(self)
@cocotb.test()
async def single_transission(dut):
@@ 165,6 172,100 @@ async def single_transission(dut):
await Timer(100, "ns")
+@cocotb.test
+async def interrupt(dut):
+ clk = Clock(dut.clk_i, 5, "ns")
+ interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
+ config = SpiConfig(16, RisingEdge, FallingEdge, 10, "ns")
+ slave = SpiSlave(interface, config)
+ driver = DutDriver(dut)
+
+ await cocotb.start(clk.start())
+
+ await init(dut)
+
+ await cocotb.start(slave.coroutine())
+ await cocotb.start(driver.coroutine())
+
+ await driver.configure(
+ en = 1,
+ master = 1,
+ tx_en = 1,
+ rx_en = 1,
+ clock_polarity = 1,
+ clock_phase = 1,
+ pulse_csn = 0,
+ lsbfirst = 0,
+ size_sel = 0,
+ div_sel = 0)
+
+ rx = random.randint(0, 255)
+ tx = random.randint(0, 255)
+
+ # No interrupt as it's masked
+ assert int(dut.interrupt_o.value) == 0
+ # TX buffer is empty
+ assert int((await driver.read(ADDR_STATUS))) == INTMASK_TX_BUFFER_EMPTY
+ await driver.write(ADDR_INTMASK, INTMASK_RX_BUFFER_FULL | INTMASK_TX_BUFFER_EMPTY)
+ assert int(dut.interrupt_o.value) == 1
+
+ await slave.send_data(tx, 8)
+ await slave.send_data(tx, 8)
+ await slave.expect_transaction_in(50, "ns")
+
+ await driver.write(ADDR_DATA, rx)
+
+ async def check_status(value):
+ assert int((await driver.read(ADDR_STATUS))) == value
+ await cocotb.start(check_status(0))
+
+ await FallingEdge(dut.clk_i)
+
+ assert int(dut.interrupt_o.value) == 0
+
+ timeout = Timer(20, "ns")
+ res = await First(RisingEdge(dut.interrupt_o), timeout)
+
+ if res == timeout:
+ raise Exception("Timeout on interrupt rising")
+
+ # Check that the buffer is empty, as one transmission should be happening
+ assert int((await driver.read(ADDR_STATUS))) == INTMASK_TX_BUFFER_EMPTY
+ await driver.write(ADDR_INTMASK, 0)
+ assert int(dut.interrupt_o.value) == 0
+ assert int((await driver.read(ADDR_STATUS))) == STATUS_TX_BUFFER_EMPTY | STATUS_BUSY
+
+ # Send second data byte
+ await driver.write(ADDR_DATA, rx)
+ assert int(dut.interrupt_o.value) == 0
+ assert int((await driver.read(ADDR_STATUS))) == STATUS_BUSY
+
+ # Now unmask only rx buffer full
+ await driver.write(ADDR_INTMASK, INTMASK_RX_BUFFER_FULL)
+
+ timeout = Timer(50, "ns")
+ res = await First(timeout, RisingEdge(dut.interrupt_o))
+
+ if res == timeout:
+ raise Exception("Timeout on interrupt rising")
+
+ data = await driver.read(ADDR_DATA)
+ assert int(data) & 0xFF == tx
+
+ assert int(dut.interrupt_o.value) == 1
+
+ await FallingEdge(dut.clk_i)
+
+ assert int(dut.interrupt_o.value) == 0
+ assert int((await driver.read(ADDR_STATUS))) == STATUS_TX_BUFFER_EMPTY | STATUS_BUSY
+
+ await slave.wait_all()
+ assert int(dut.interrupt_o.value) == 1
+ assert int((await driver.read(ADDR_STATUS))) == STATUS_TX_BUFFER_EMPTY | STATUS_RX_BUFFER_FULL
+
+ assert int(await slave.received_data()) & 0xFF == rx
+ assert int(await slave.received_data()) & 0xFF == rx
+
def spi_peripheral_tests_runner():
hdl_toplevel_lang = "vhdl"
@@ 206,9 307,9 @@ def spi_peripheral_tests_runner():
build_args=build_args + extra_args,
)
runner.test(
+ testcase = "interrupt",
hdl_toplevel="spi_peripheral", hdl_toplevel_lang=hdl_toplevel_lang,
- test_module="test_spi_peripheral", test_args = extra_args,
- gui = True
+ test_module="test_spi_peripheral", test_args = extra_args, gui = True
)