import os
import sys
from pathlib import Path
import logging
import random
from dataclasses import dataclass
import cocotb
import cocotb.handle
from cocotb.queue import Queue
from cocotb.clock import Clock
from cocotb.triggers import Trigger, First, Event, Timer, RisingEdge, FallingEdge
from cocotb_tools.runner import get_runner
if cocotb.simulator.is_running():
from spi_models import SpiInterface, SpiConfig, SpiSlave
async def init(dut, master: int = 1, tx_en: int = 1):
dut._log.info("Init started!")
dut.miso_io.value = 0;
dut.rst_in.value = 0;
dut.clock_polarity_i.value = 0;
dut.clock_phase_i.value = 0;
dut.size_sel_i.value = 0;
dut.div_sel_i.value = 0;
dut.pulse_csn_i.value = 0;
dut.rx_block_on_full_i.value = 0;
dut.rx_ready_i.value = 0;
dut.tx_valid_i.value = 0;
dut.clear_lost_rx_data_i.value = 0;
dut.rx_en_i.value = 1;
dut.tx_en_i.value = 1;
dut.master_i.value = 1;
dut.en_i.value = 1;
await FallingEdge(dut.clk_i)
await FallingEdge(dut.clk_i)
# Release reset
dut.rst_in.value = 1;
await FallingEdge(dut.clk_i)
await FallingEdge(dut.clk_i)
dut._log.info("Init done!")
class DutDriver:
def __init__(self, dut):
self.dut = dut
self._log = logging.getLogger("cocotb.DutDriver")
self._received = Queue()
self._sync = Event()
self._auto_receive = False
self._set_ready = False
async def receive_data(self):
if int(self.dut.rx_valid_o.value) != 1:
self._log.error("RX is not valid when receiving data was requested")
await FallingEdge(self.dut.clk_i)
self.dut.rx_ready_i.value = 1
data = self.dut.rx_data_o.value
await FallingEdge(self.dut.clk_i)
if int(self.dut.rx_valid_o.value) != 0:
self._log.error("RX data stayed valid after receiving!")
self.dut.rx_ready_i.value = 0
return data
async def send_data(self, data):
if int(self.dut.tx_ready_o.value) != 1:
self._log.error("TX is not ready when sending data was requested")
await FallingEdge(self.dut.clk_i)
self.dut.tx_valid_i.value = 1
self.dut.tx_data_i.value = data
await FallingEdge(self.dut.clk_i)
self.dut.tx_valid_i.value = 0
if int(self.dut.tx_ready_o.value) != 0:
self._log.error("TX stayed ready after requesting send of data!")
async def send_data_wait(self, data):
await FallingEdge(self.dut.clk_i)
self.dut.tx_valid_i.value = 1
self.dut.tx_data_i.value = data
clk_falling = FallingEdge(self.dut.clk_i)
tx_ready_falling = FallingEdge(self.dut.tx_ready_o)
res = await First(clk_falling, tx_ready_falling)
if res == clk_falling:
# TODO timeout
await RisingEdge(self.dut.tx_ready_o)
await RisingEdge(self.dut.clk_i)
# now the data were registered
# (note that it's ready, not confirmation
# so data are sampled only after it actually is ready
# that means the next clock cycle from when ready went to 1)
self.dut.tx_valid_i.value = 0
async def auto_receive(self, receive: bool = True, set_ready: bool = True, waiting_clocks: int = 0):
self._auto_receive = receive
self._set_ready = set_ready
self._waiting_clocks = waiting_clocks
self._sync.set()
async def received_data(self):
if self._received.empty():
return None
return await self._received.get()
async def coroutine(self):
while True:
if not self._auto_receive:
self.dut.rx_ready_i.value = 0
await self._sync.wait()
self._sync.clear()
if not self._auto_receive:
continue
if self._set_ready and self._waiting_clocks == 0:
self.dut.rx_ready_i.value = 1
await RisingEdge(self.dut.rx_valid_o)
if int(self.dut.rx_valid_o.value) == 1:
await self._received.put(self.dut.rx_data_o.value)
if self._waiting_clocks > 0:
for i in range(self._waiting_clocks):
await FallingEdge(self.dut.clk_i)
self.dut.rx_ready_i.value = 1
await FallingEdge(self.dut.clk_i)
self.dut.rx_ready_i.value = 0
should_lose_data = not self._set_ready and self.dut.tx_valid_i.value == 1
await RisingEdge(self.dut.clk_i)
await RisingEdge(self.dut.clk_i)
if should_lose_data and int(self.dut.err_lost_rx_data_o.value) != 1:
self._log.error("Didn't get err lost rx data after rx valid")
if should_lose_data:
self.dut.clear_lost_rx_data_i.value = 1
await RisingEdge(self.dut.clk_i)
await FallingEdge(self.dut.clk_i)
self.dut.clear_lost_rx_data_i.value = 0
if int(self.dut.err_lost_rx_data_o.value) != 0:
self._log.error("Could not clear err lost rx data")
@cocotb.test()
async def single_transmit(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
# From slave point of view
rx = random.randint(0, 255)
tx = random.randint(0, 255)
await slave.send_data(tx, 8)
await slave.expect_transaction_in(15, "ns")
await driver.send_data(rx)
await slave.wait_all()
dut_received = await driver.receive_data()
assert int(dut_received) & 0xFF == tx
# Wait a few clocks, rx data should still stay valid!
await FallingEdge(dut.clk_i)
await FallingEdge(dut.clk_i)
await FallingEdge(dut.clk_i)
await FallingEdge(dut.clk_i)
received = await slave.received_data()
assert received & 0xFF == rx
await Timer(100, "ns")
async def perform_multiple_transmits(count, dut, slave, driver, size = 8):
tx_data = [random.randint(0, 2**size - 1) for i in range(count)]
rx_data = [random.randint(0, 2**size - 1) for i in range(count)]
for tx in tx_data:
await slave.send_data(tx, size)
dut._log.info(f"Sending Data from slave: {tx}")
dut._log.info("To expect transaction")
await slave.expect_transaction_in(15, "ns")
for rx in rx_data:
dut._log.info(f"Sending Data from master: {rx}")
await driver.send_data_wait(rx)
await slave.wait_all()
mask = 0
for i in range(size):
mask |= 1 << i
# Checks
for tx in tx_data:
dut_received = await driver.received_data()
assert int(dut_received) & mask == tx
for rx in rx_data:
received = await slave.received_data()
assert received & mask == rx
@cocotb.test()
async def multiple_transmits(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
await driver.auto_receive()
count = 5
await perform_multiple_transmits(count, dut, slave, driver)
await Timer(100, "ns")
@cocotb.test()
async def lost_rx_data(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
# Do not confirm reception of data. That will auto check
# if lost is asserted and cleared appropriately.
await driver.auto_receive(True, False)
count = 5
await perform_multiple_transmits(count, dut, slave, driver)
await Timer(100, "ns")
@cocotb.test()
async def different_clock(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
config = SpiConfig(8, RisingEdge, FallingEdge, 20, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
dut.div_sel_i.value = 1 # divide by 4
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
await driver.auto_receive()
count = 5
await perform_multiple_transmits(count, dut, slave, driver)
await Timer(100, "ns")
@cocotb.test()
async def inverted_clock(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
config = SpiConfig(8, RisingEdge, FallingEdge, 20, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
dut.clock_phase_i.value = 1
dut.clock_polarity_i.value = 1
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
await driver.auto_receive()
count = 5
await perform_multiple_transmits(count, dut, slave, driver)
await Timer(100, "ns")
@cocotb.test()
async def shifted_inverted_clock(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
config = SpiConfig(8, FallingEdge, RisingEdge, 20, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
dut.clock_phase_i.value = 0
dut.clock_polarity_i.value = 1
await FallingEdge(dut.clk_i)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
await driver.auto_receive()
count = 3
await perform_multiple_transmits(count, dut, slave, driver)
dut.clock_phase_i.value = 1
dut.clock_polarity_i.value = 0
await FallingEdge(dut.clk_i)
await perform_multiple_transmits(count, dut, slave, driver)
await Timer(100, "ns")
@cocotb.test()
async def sixteen_bits(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
config = SpiConfig(16, FallingEdge, RisingEdge, 20, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
dut.clock_phase_i.value = 0
dut.clock_polarity_i.value = 1
dut.size_sel_i.value = 1
await FallingEdge(dut.clk_i)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
await driver.auto_receive()
count = 5
await perform_multiple_transmits(count, dut, slave, driver, 16)
await Timer(100, "ns")
@cocotb.test()
async def rx_blocking_tx(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
config = SpiConfig(16, RisingEdge, FallingEdge, 20, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
dut.size_sel_i.value = 1
dut.rx_block_on_full_i.value = 1
await FallingEdge(dut.clk_i)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
await driver.auto_receive(waiting_clocks = 5)
count = 5
await perform_multiple_transmits(count, dut, slave, driver, 16)
await Timer(100, "ns")
# Rx blocking - Can't go to another transmission until data confirmed.
# When data read a bit later, and csn pulsing is enabled, the csn should still pulse, before data are obtained
# All clock phases and polarities
# csn pulse
# rx_en off - miso should be ignored, no rx_valid is always 0. Tx works fine
# tx_en off - mosi should be Z, tx_ready is always 0. Rx works fine
def spi_tests_runner():
hdl_toplevel_lang = "vhdl"
sim = os.getenv("SIM", "questa")
proj_path = Path(__file__).resolve().parent.parent
# equivalent to setting the PYTHONPATH environment variable
sys.path.append(str(proj_path / "models"))
sources = [
proj_path / "src" / "spi_pkg.vhd",
proj_path / "src" / "rs_latch.vhd",
proj_path / "src" / "register.vhd",
proj_path / "src" / "shift_register.vhd",
proj_path / "src" / "spi_clkgen.vhd",
proj_path / "src" / "spi_clkmon.vhd",
proj_path / "src" / "spi_multiplexor.vhd",
proj_path / "src" / "spi_slave_ctrl.vhd",
proj_path / "src" / "spi_master_ctrl.vhd",
proj_path / "src" / "spi_master.vhd",
proj_path / "src" / "spi_masterslave.vhd",
proj_path / "src" / "spi_peripheral.vhd"
]
build_args = []
extra_args = []
if sim == "ghdl":
extra_args = ["--std=08"]
elif sim == "questa" or sim == "modelsim":
build_args = ["-2008"]
# equivalent to setting the PYTHONPATH environment variable
sys.path.append(str(proj_path / "tests"))
runner = get_runner(sim)
runner.build(
sources=sources,
hdl_toplevel="spi_masterslave",
always=True,
build_args=build_args + extra_args,
)
runner.test(
hdl_toplevel="spi_masterslave", hdl_toplevel_lang=hdl_toplevel_lang,
test_module="test_spi_masterslave", test_args = extra_args
)
if __name__ == "__main__":
spi_tests_runner()