"""cocotb testbench for the ``spi_masterslave`` VHDL top level.

The module serves two purposes:

* when loaded inside a running simulator it provides the cocotb tests,
* when executed as a script it builds the design and launches the tests
  through the cocotb runner (see :func:`spi_tests_runner`).
"""

import os
import sys
from pathlib import Path
import logging
import random
from dataclasses import dataclass

import cocotb
import cocotb.handle
from cocotb.queue import Queue
from cocotb.clock import Clock
from cocotb.triggers import Trigger, First, Event, Timer, RisingEdge, Edge, FallingEdge
from cocotb_tools.runner import get_runner

# The SPI models are only imported when a simulator is running; the runner
# entry point below does not use them.
if cocotb.simulator.is_running():
    from spi_models import SpiInterface, SpiConfig, SpiSlave


async def init(dut, master: int = 1, tx_en: int = 1):
    """Drive every DUT input to its default value and pulse the reset.

    Args:
        dut: cocotb handle of the design under test.
        master: value driven on ``master_i`` (defaults to 1).
        tx_en: value driven on ``tx_en_i`` (defaults to 1).
    """
    dut._log.info("Init started!")

    dut.miso_i.value = 0
    dut.rst_in.value = 0
    dut.lsbfirst_i.value = 0
    dut.clock_polarity_i.value = 0
    dut.clock_phase_i.value = 0
    dut.size_sel_i.value = 0
    dut.div_sel_i.value = 0
    dut.pulse_csn_i.value = 0
    dut.rx_block_on_full_i.value = 0
    dut.rx_ready_i.value = 0
    dut.tx_valid_i.value = 0
    dut.clear_lost_rx_data_i.value = 0
    dut.rx_en_i.value = 1
    # Honour the caller's choice instead of hard-coding 1; the defaults keep
    # the previous behaviour for callers that pass nothing.
    dut.tx_en_i.value = tx_en
    dut.master_i.value = master
    dut.en_i.value = 1

    # Keep rst_in at 0 for two clock cycles.
    await FallingEdge(dut.clk_i)
    await FallingEdge(dut.clk_i)

    # Release reset
    dut.rst_in.value = 1

    await FallingEdge(dut.clk_i)
    await FallingEdge(dut.clk_i)

    dut._log.info("Init done!")


class DutDriver:
    """Drives the parallel TX/RX handshake signals of the DUT.

    The driver can be used directly (``send_data``, ``send_data_wait``,
    ``receive_data``) or, once ``coroutine()`` has been started and
    ``auto_receive()`` called, it collects received words in the background
    into a queue read with ``received_data()``.
    """

    def __init__(self, dut):
        self.dut = dut
        self._log = logging.getLogger("cocotb.DutDriver")
        # Words captured by the background coroutine.
        self._received = Queue()
        # Wakes the background coroutine when auto_receive() is called.
        self._sync = Event()
        self._auto_receive = False
        self._set_ready = False
        # Initialised here so the attribute always exists, even before the
        # first auto_receive() call.
        self._waiting_clocks = 0

    async def receive_data(self):
        """Acknowledge the word currently presented on the RX interface.

        Returns:
            The value read from ``rx_data_o``.

        Raises:
            Exception: if ``rx_valid_o`` is not 1 on entry, or if it is still
                non-zero one clock after ``rx_ready_i`` was asserted.
        """
        if int(self.dut.rx_valid_o.value) != 1:
            self._log.error("RX is not valid when receiving data was requested")
            raise Exception("RX is not valid when receiving data was requested")

        await FallingEdge(self.dut.clk_i)
        self.dut.rx_ready_i.value = 1
        data = self.dut.rx_data_o.value
        await FallingEdge(self.dut.clk_i)

        if int(self.dut.rx_valid_o.value) != 0:
            self._log.error("RX data stayed valid after receiving!")
            raise Exception("RX data stayed valid after receiving!")

        self.dut.rx_ready_i.value = 0
        return data

    async def send_data(self, data):
        """Present ``data`` on the TX interface for exactly one clock cycle.

        An unexpected ``tx_ready_o`` level before or after the request is only
        logged as an error; no exception is raised.
        """
        if int(self.dut.tx_ready_o.value) != 1:
            self._log.error("TX is not ready when sending data was requested")

        await FallingEdge(self.dut.clk_i)
        self.dut.tx_valid_i.value = 1
        self.dut.tx_data_i.value = data
        await FallingEdge(self.dut.clk_i)
        self.dut.tx_valid_i.value = 0

        if int(self.dut.tx_ready_o.value) != 0:
            self._log.error("TX stayed ready after requesting send of data!")

    async def send_data_wait(self, data):
        """Present ``data`` on the TX interface and hold it until accepted.

        ``tx_valid_i`` is kept asserted until ``tx_ready_o`` falls within the
        first clock cycle or, failing that, until ``tx_ready_o`` rises and one
        further rising clock edge has passed.
        """
        await FallingEdge(self.dut.clk_i)
        self.dut.tx_valid_i.value = 1
        self.dut.tx_data_i.value = data

        clk_falling = FallingEdge(self.dut.clk_i)
        tx_ready_falling = FallingEdge(self.dut.tx_ready_o)
        res = await First(clk_falling, tx_ready_falling)

        if res == clk_falling:
            # tx_ready_o did not fall during this cycle.
            # TODO timeout
            await RisingEdge(self.dut.tx_ready_o)
            await RisingEdge(self.dut.clk_i)

        # now the data were registered
        # (note that it's ready, not confirmation
        # so data are sampled only after it actually is ready
        # that means the next clock cycle from when ready went to 1)
        self.dut.tx_valid_i.value = 0

    async def auto_receive(self, receive: bool = True, set_ready: bool = True, waiting_clocks: int = 0):
        """Configure the background receiver and wake it up.

        Args:
            receive: enable collecting of received words by ``coroutine()``.
            set_ready: whether ``rx_ready_i`` is kept asserted while waiting
                for data (only applied when ``waiting_clocks`` is 0).
            waiting_clocks: when greater than 0, the number of clock cycles
                to wait after ``rx_valid_o`` rises before pulsing
                ``rx_ready_i`` for one cycle.
        """
        self._auto_receive = receive
        self._set_ready = set_ready
        self._waiting_clocks = waiting_clocks
        self._sync.set()

    async def received_data(self):
        """Return the oldest auto-received word, or ``None`` if there is none."""
        if self._received.empty():
            return None
        return await self._received.get()

    async def coroutine(self):
        """Background task that captures RX words while auto receive is on.

        Intended to be started once per test with ``cocotb.start()``; it
        never returns.
        """
        # NOTE(review): the nesting below was rebuilt from whitespace-collapsed
        # source, where the exact extent of the if-bodies is ambiguous; this is
        # the reading consistent with receiving several words in a row --
        # confirm against the project history.
        while True:
            if not self._auto_receive:
                # Idle: deassert ready and sleep until auto_receive() is called.
                self.dut.rx_ready_i.value = 0
                await self._sync.wait()
                self._sync.clear()

            if not self._auto_receive:
                continue

            if self._set_ready and self._waiting_clocks == 0:
                self.dut.rx_ready_i.value = 1

            await RisingEdge(self.dut.rx_valid_o)

            if int(self.dut.rx_valid_o.value) == 1:
                await self._received.put(self.dut.rx_data_o.value)

            if self._waiting_clocks > 0:
                # Delay the acknowledge, then pulse ready for one cycle.
                for _ in range(self._waiting_clocks):
                    await FallingEdge(self.dut.clk_i)
                self.dut.rx_ready_i.value = 1
                await FallingEdge(self.dut.clk_i)
                self.dut.rx_ready_i.value = 0
            else:
                await FallingEdge(self.dut.clk_i)

            # With ready never asserted and tx_valid_i high, the driver
            # expects err_lost_rx_data_o to be raised and then clearable.
            should_lose_data = not self._set_ready and self.dut.tx_valid_i.value == 1

            await RisingEdge(self.dut.clk_i)
            await RisingEdge(self.dut.clk_i)

            if should_lose_data and int(self.dut.err_lost_rx_data_o.value) != 1:
                self._log.error("Didn't get err lost rx data after rx valid")

            if should_lose_data:
                self.dut.clear_lost_rx_data_i.value = 1
                await RisingEdge(self.dut.clk_i)
                await FallingEdge(self.dut.clk_i)
                self.dut.clear_lost_rx_data_i.value = 0

                if int(self.dut.err_lost_rx_data_o.value) != 0:
                    self._log.error("Could not clear err lost rx data")


async def _setup_testbench(dut, config):
    """Start the clock, build the SPI slave model and DUT driver, reset the DUT.

    Returns:
        A ``(slave, driver)`` tuple. Their background coroutines are not
        started yet so the caller can still adjust DUT configuration inputs.
    """
    clk = Clock(dut.clk_i, 5, "ns")
    interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
    slave = SpiSlave(interface, config)
    driver = DutDriver(dut)

    await cocotb.start(clk.start())
    await init(dut)

    return slave, driver


async def _start_models(slave, driver):
    """Start the background coroutines of the slave model and the DUT driver."""
    await cocotb.start(slave.coroutine())
    await cocotb.start(driver.coroutine())


def _reverse_bits(num, bits):
    """Return the lowest ``bits`` bits of ``num`` in reversed bit order."""
    result = 0
    for _ in range(bits):
        result = (result << 1) | (num & 1)
        num >>= 1
    return result


@cocotb.test()
async def single_transmit(dut):
    """Exchange one random 8-bit word in each direction and check both ends."""
    config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
    slave, driver = await _setup_testbench(dut, config)
    await _start_models(slave, driver)

    # From slave point of view
    rx = random.randint(0, 255)
    tx = random.randint(0, 255)

    await slave.send_data(tx, 8)
    await slave.expect_transaction_in(15, "ns")
    await driver.send_data(rx)
    await slave.wait_all()

    dut_received = await driver.receive_data()
    assert int(dut_received) & 0xFF == tx

    # Wait a few clocks, rx data should still stay valid!
    await FallingEdge(dut.clk_i)
    await FallingEdge(dut.clk_i)
    await FallingEdge(dut.clk_i)
    await FallingEdge(dut.clk_i)

    received = await slave.received_data()
    assert received & 0xFF == rx

    await Timer(100, "ns")


@cocotb.test()
async def lsbfirst(dut):
    """Exchange one 8-bit word with ``lsbfirst_i`` set and expect bit reversal."""
    config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
    slave, driver = await _setup_testbench(dut, config)

    dut.lsbfirst_i.value = 1
    await FallingEdge(dut.clk_i)

    await _start_models(slave, driver)

    # From slave point of view
    rx = random.randint(0, 255)
    tx = random.randint(0, 255)

    rx_reversed = _reverse_bits(rx, 8)
    tx_reversed = _reverse_bits(tx, 8)

    await slave.send_data(tx, 8)
    await slave.expect_transaction_in(15, "ns")
    await driver.send_data(rx)
    await slave.wait_all()

    dut_received = await driver.receive_data()
    # The received byte is checked in bits 15..8 of rx_data_o here.
    assert (int(dut_received) >> 8) & 0xFF == tx_reversed

    # Wait a few clocks, rx data should still stay valid!
    await FallingEdge(dut.clk_i)
    await FallingEdge(dut.clk_i)
    await FallingEdge(dut.clk_i)
    await FallingEdge(dut.clk_i)

    received = await slave.received_data()
    assert received & 0xFF == rx_reversed

    await Timer(100, "ns")


@cocotb.test()
async def rx_tx_disabled(dut):
    """Check transfers with RX disabled first and then with TX disabled."""
    config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
    slave, driver = await _setup_testbench(dut, config)

    dut.rx_en_i.value = 0
    dut.tx_en_i.value = 1
    # Try with rx blocking, it should not matter.
    dut.rx_block_on_full_i.value = 1
    await FallingEdge(dut.clk_i)

    await _start_models(slave, driver)

    # From slave point of view
    rx = random.randint(0, 255)
    tx = random.randint(0, 255)

    await slave.send_data(tx, 8)
    await slave.expect_transaction_in(15, "ns")
    await driver.send_data(rx)
    await slave.wait_all()

    # RX should be idle
    assert int(dut.rx_valid_o.value) == 0

    # TX should be working
    received = await slave.received_data()
    assert received & 0xFF == rx

    # Second transaction should be fine.
    await slave.send_data(tx, 8)
    await slave.expect_transaction_in(15, "ns")
    await driver.send_data(rx)
    await slave.wait_all()

    # RX should be idle
    assert int(dut.rx_valid_o.value) == 0

    # TX should be working
    received = await slave.received_data()
    assert received & 0xFF == rx

    # Now switch to tx disabled. mosi should stay 'Z'
    await FallingEdge(dut.clk_i)
    dut.rx_en_i.value = 1
    dut.tx_en_i.value = 0
    await FallingEdge(dut.clk_i)
    await FallingEdge(dut.clk_i)

    assert int(dut.mosi_t.value) == 1

    await slave.send_data(tx, 8)
    await slave.expect_transaction_in(15, "ns")
    # Although nothing is actually transmitted, still
    # tx valid should be asserted for reception to begin
    await driver.send_data(rx)

    # Watch mosi_t for the duration of the transfer; any change is a failure.
    timeout = Timer(10 * (8 + 5), "ns")
    mosi_event = Edge(dut.mosi_t)
    res = await First(timeout, mosi_event)

    if res == mosi_event:
        raise Exception("Mosi has changed even though tx is disabled!")

    await slave.wait_all()

    # RX should have the data
    data = await driver.receive_data()
    assert int(data) & 0xFF == tx

    await Timer(100, "ns")


async def perform_multiple_transmits(count, dut, slave, driver, size = 8):
    """Run ``count`` back-to-back transfers of ``size`` bits and verify them.

    Random words are queued in the slave model and sent from the DUT with
    ``driver.send_data_wait``; afterwards the words collected by the driver's
    auto receive and by the slave model are compared with what was sent.

    Args:
        count: number of words to transfer in each direction.
        dut: cocotb handle of the design under test (used for logging).
        slave: SPI slave model.
        driver: ``DutDriver`` with auto receive enabled.
        size: word width in bits.
    """
    tx_data = [random.randint(0, 2**size - 1) for _ in range(count)]
    rx_data = [random.randint(0, 2**size - 1) for _ in range(count)]

    for tx in tx_data:
        await slave.send_data(tx, size)
        dut._log.info(f"Sending Data from slave: {tx}")

    dut._log.info("To expect transaction")
    await slave.expect_transaction_in(30, "ns")

    for rx in rx_data:
        dut._log.info(f"Sending Data from master: {rx}")
        await driver.send_data_wait(rx)

    await slave.wait_all()

    # Keep only the low ``size`` bits of each received word.
    mask = (1 << size) - 1

    # Checks
    for tx in tx_data:
        dut_received = await driver.received_data()
        # received_data() yields None when the driver captured fewer words
        # than expected; fail with a clear message instead of a TypeError.
        assert dut_received is not None, "DUT driver received fewer words than expected"
        assert int(dut_received) & mask == tx

    for rx in rx_data:
        received = await slave.received_data()
        assert received & mask == rx


@cocotb.test()
async def multiple_transmits(dut):
    """Transfer five 8-bit words back to back with auto receive enabled."""
    config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
    slave, driver = await _setup_testbench(dut, config)
    await _start_models(slave, driver)

    await driver.auto_receive()

    count = 5
    await perform_multiple_transmits(count, dut, slave, driver)

    await Timer(100, "ns")


@cocotb.test()
async def lost_rx_data(dut):
    """Transfer five words without acknowledging any received data."""
    config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
    slave, driver = await _setup_testbench(dut, config)
    await _start_models(slave, driver)

    # Do not confirm reception of data. That will auto check
    # if lost is asserted and cleared appropriately.
    await driver.auto_receive(True, False)

    count = 5
    await perform_multiple_transmits(count, dut, slave, driver)

    await Timer(100, "ns")


@cocotb.test()
async def different_clock(dut):
    """Transfer five words with ``div_sel_i`` set to 1."""
    config = SpiConfig(8, RisingEdge, FallingEdge, 20, "ns")
    slave, driver = await _setup_testbench(dut, config)

    dut.div_sel_i.value = 1  # divide by 4

    await _start_models(slave, driver)

    await driver.auto_receive()

    count = 5
    await perform_multiple_transmits(count, dut, slave, driver)

    await Timer(100, "ns")


@cocotb.test()
async def different_clock_256(dut):
    """Transfer five words with ``div_sel_i`` set to 7."""
    config = SpiConfig(8, RisingEdge, FallingEdge, 2550, "ns")
    slave, driver = await _setup_testbench(dut, config)

    # NOTE(review): presumably the divide-by-256 setting named by this test
    # (this line used to carry a copy-pasted "divide by 4" comment) --
    # confirm against spi_clkgen.
    dut.div_sel_i.value = 7

    await _start_models(slave, driver)

    await driver.auto_receive()

    count = 5
    await perform_multiple_transmits(count, dut, slave, driver)

    await Timer(100, "ns")


@cocotb.test()
async def inverted_clock(dut):
    """Transfer five words with clock polarity and clock phase both set to 1."""
    config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns", clock_polarity=1)
    slave, driver = await _setup_testbench(dut, config)

    dut.clock_phase_i.value = 1
    dut.clock_polarity_i.value = 1

    await _start_models(slave, driver)

    await driver.auto_receive()

    count = 5
    await perform_multiple_transmits(count, dut, slave, driver)

    await Timer(100, "ns")


@cocotb.test()
async def shifted_inverted_clock(dut):
    """Transfer with polarity 1 / phase 0, then switch to polarity 0 / phase 1."""
    config = SpiConfig(8, FallingEdge, RisingEdge, 10, "ns", clock_polarity=1)
    slave, driver = await _setup_testbench(dut, config)

    dut.clock_phase_i.value = 0
    dut.clock_polarity_i.value = 1
    await FallingEdge(dut.clk_i)

    await _start_models(slave, driver)

    await driver.auto_receive()

    count = 3
    await perform_multiple_transmits(count, dut, slave, driver)

    await Timer(100, "ns")

    # Reconfigure both the DUT and the slave model while idle.
    dut.clock_phase_i.value = 1
    dut.clock_polarity_i.value = 0
    slave.config.clock_polarity = 0
    await FallingEdge(dut.clk_i)

    await perform_multiple_transmits(count, dut, slave, driver)

    await Timer(100, "ns")


@cocotb.test()
async def sixteen_bits(dut):
    """Transfer five 16-bit words (``size_sel_i`` set to 1)."""
    config = SpiConfig(16, FallingEdge, RisingEdge, 10, "ns", clock_polarity=1)
    slave, driver = await _setup_testbench(dut, config)

    dut.clock_phase_i.value = 0
    dut.clock_polarity_i.value = 1
    dut.size_sel_i.value = 1
    await FallingEdge(dut.clk_i)

    await _start_models(slave, driver)

    await driver.auto_receive()

    count = 5
    await perform_multiple_transmits(count, dut, slave, driver, 16)

    await Timer(100, "ns")


@cocotb.test()
async def rx_blocking_tx(dut):
    """Transfer 16-bit words with RX blocking on and a delayed RX acknowledge."""
    config = SpiConfig(16, RisingEdge, FallingEdge, 10, "ns", csn_pulse=True)
    slave, driver = await _setup_testbench(dut, config)

    dut.size_sel_i.value = 1
    dut.rx_block_on_full_i.value = 1
    await FallingEdge(dut.clk_i)

    await _start_models(slave, driver)

    # Acknowledge each received word only after five clock cycles.
    await driver.auto_receive(waiting_clocks=5)

    count = 5
    await perform_multiple_transmits(count, dut, slave, driver, 16)

    await Timer(100, "ns")


@cocotb.test()
async def csn_pulse(dut):
    """Transfer five 16-bit words with ``pulse_csn_i`` set to 1."""
    config = SpiConfig(16, RisingEdge, FallingEdge, 10, "ns", csn_pulse=True)
    slave, driver = await _setup_testbench(dut, config)

    dut.pulse_csn_i.value = 1
    dut.size_sel_i.value = 1
    await FallingEdge(dut.clk_i)

    await _start_models(slave, driver)

    await driver.auto_receive()

    count = 5
    await perform_multiple_transmits(count, dut, slave, driver, 16)

    await Timer(100, "ns")


def spi_tests_runner():
    """Build the VHDL sources and run this test module with the cocotb runner.

    Environment variables:
        SIM: simulator name passed to ``get_runner`` (default ``questa``).
        TESTCASE: optional test filter passed to ``runner.test``.
        GUI: set to ``1`` to pass ``gui=True`` to ``runner.test``.
        RUN: set to anything other than ``1`` to build without running tests.
    """
    hdl_toplevel_lang = "vhdl"
    sim = os.getenv("SIM", "questa")
    test_filter = os.getenv("TESTCASE", None)
    gui = os.getenv("GUI", "0") == "1"
    run = os.getenv("RUN", "1") == "1"

    proj_path = Path(__file__).resolve().parent.parent
    # equivalent to setting the PYTHONPATH environment variable
    sys.path.append(str(proj_path / "models"))

    # Listed in the order they are handed to the simulator.
    source_names = [
        "spi_pkg.vhd",
        "rs_latch.vhd",
        "register.vhd",
        "shift_register.vhd",
        "spi_clkgen.vhd",
        "spi_clkmon.vhd",
        "spi_slave_ctrl.vhd",
        "spi_master_ctrl.vhd",
        "spi_master.vhd",
        "spi_masterslave.vhd",
        "spi_peripheral.vhd",
    ]
    sources = [proj_path / "src" / name for name in source_names]

    build_args = []
    extra_args = []
    if sim == "ghdl":
        # Passed to both the build and the test step.
        extra_args = ["--std=08"]
    elif sim in ("questa", "modelsim"):
        build_args = ["-2008"]

    # equivalent to setting the PYTHONPATH environment variable
    sys.path.append(str(proj_path / "tests"))

    runner = get_runner(sim)
    runner.build(
        sources=sources,
        hdl_toplevel="spi_masterslave",
        always=True,
        build_args=build_args + extra_args,
    )

    if run:
        runner.test(
            hdl_toplevel="spi_masterslave",
            hdl_toplevel_lang=hdl_toplevel_lang,
            test_module="test_spi_masterslave",
            test_args=extra_args,
            gui=gui,
            test_filter=test_filter,
        )


if __name__ == "__main__":
    spi_tests_runner()