import os
import sys
from pathlib import Path
import logging
import random
from dataclasses import dataclass
import cocotb
import cocotb.handle
from cocotb.queue import Queue
from cocotb.clock import Clock
from cocotb.triggers import Trigger, First, Event, Timer, RisingEdge, Edge, FallingEdge
from cocotb_tools.runner import get_runner
if cocotb.simulator.is_running():
from spi_models import SpiInterface, SpiConfig, SpiSlave
async def init(dut, master: int = 1, tx_en: int = 1):
dut._log.info("Init started!")
dut.miso_i.value = 0;
dut.rst_in.value = 0;
dut.lsbfirst_i.value = 0;
dut.clock_polarity_i.value = 0;
dut.clock_phase_i.value = 0;
dut.size_sel_i.value = 0;
dut.div_sel_i.value = 0;
dut.pulse_csn_i.value = 0;
dut.rx_block_on_full_i.value = 0;
dut.rx_ready_i.value = 0;
dut.tx_valid_i.value = 0;
dut.clear_lost_rx_data_i.value = 0;
dut.rx_en_i.value = 1;
dut.tx_en_i.value = 1;
dut.master_i.value = 1;
dut.en_i.value = 1;
await FallingEdge(dut.clk_i)
await FallingEdge(dut.clk_i)
# Release reset
dut.rst_in.value = 1;
await FallingEdge(dut.clk_i)
await FallingEdge(dut.clk_i)
dut._log.info("Init done!")
class DutDriver:
def __init__(self, dut):
self.dut = dut
self._log = logging.getLogger("cocotb.DutDriver")
self._received = Queue()
self._sync = Event()
self._auto_receive = False
self._set_ready = False
async def receive_data(self):
if int(self.dut.rx_valid_o.value) != 1:
self._log.error("RX is not valid when receiving data was requested")
raise Exception("RX is not valid when receiving data was requested")
await FallingEdge(self.dut.clk_i)
self.dut.rx_ready_i.value = 1
data = self.dut.rx_data_o.value
await FallingEdge(self.dut.clk_i)
if int(self.dut.rx_valid_o.value) != 0:
self._log.error("RX data stayed valid after receiving!")
raise Exception("RX data stayed valid after receiving!")
self.dut.rx_ready_i.value = 0
return data
async def send_data(self, data):
if int(self.dut.tx_ready_o.value) != 1:
self._log.error("TX is not ready when sending data was requested")
await FallingEdge(self.dut.clk_i)
self.dut.tx_valid_i.value = 1
self.dut.tx_data_i.value = data
await FallingEdge(self.dut.clk_i)
self.dut.tx_valid_i.value = 0
if int(self.dut.tx_ready_o.value) != 0:
self._log.error("TX stayed ready after requesting send of data!")
async def send_data_wait(self, data):
await FallingEdge(self.dut.clk_i)
self.dut.tx_valid_i.value = 1
self.dut.tx_data_i.value = data
clk_falling = FallingEdge(self.dut.clk_i)
tx_ready_falling = FallingEdge(self.dut.tx_ready_o)
res = await First(clk_falling, tx_ready_falling)
if res == clk_falling:
# TODO timeout
await RisingEdge(self.dut.tx_ready_o)
await RisingEdge(self.dut.clk_i)
# now the data were registered
# (note that it's ready, not confirmation
# so data are sampled only after it actually is ready
# that means the next clock cycle from when ready went to 1)
self.dut.tx_valid_i.value = 0
async def auto_receive(self, receive: bool = True, set_ready: bool = True, waiting_clocks: int = 0):
self._auto_receive = receive
self._set_ready = set_ready
self._waiting_clocks = waiting_clocks
self._sync.set()
async def received_data(self):
if self._received.empty():
return None
return await self._received.get()
async def coroutine(self):
while True:
if not self._auto_receive:
self.dut.rx_ready_i.value = 0
await self._sync.wait()
self._sync.clear()
if not self._auto_receive:
continue
if self._set_ready and self._waiting_clocks == 0:
self.dut.rx_ready_i.value = 1
await RisingEdge(self.dut.rx_valid_o)
if int(self.dut.rx_valid_o.value) == 1:
await self._received.put(self.dut.rx_data_o.value)
if self._waiting_clocks > 0:
for i in range(self._waiting_clocks):
await FallingEdge(self.dut.clk_i)
self.dut.rx_ready_i.value = 1
await FallingEdge(self.dut.clk_i)
self.dut.rx_ready_i.value = 0
else:
await FallingEdge(self.dut.clk_i)
should_lose_data = not self._set_ready and self.dut.tx_valid_i.value == 1
await RisingEdge(self.dut.clk_i)
await RisingEdge(self.dut.clk_i)
if should_lose_data and int(self.dut.err_lost_rx_data_o.value) != 1:
self._log.error("Didn't get err lost rx data after rx valid")
if should_lose_data:
self.dut.clear_lost_rx_data_i.value = 1
await RisingEdge(self.dut.clk_i)
await FallingEdge(self.dut.clk_i)
self.dut.clear_lost_rx_data_i.value = 0
if int(self.dut.err_lost_rx_data_o.value) != 0:
self._log.error("Could not clear err lost rx data")
@cocotb.test()
async def single_transmit(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
# From slave point of view
rx = random.randint(0, 255)
tx = random.randint(0, 255)
await slave.send_data(tx, 8)
await slave.expect_transaction_in(15, "ns")
await driver.send_data(rx)
await slave.wait_all()
dut_received = await driver.receive_data()
assert int(dut_received) & 0xFF == tx
# Wait a few clocks, rx data should still stay valid!
await FallingEdge(dut.clk_i)
await FallingEdge(dut.clk_i)
await FallingEdge(dut.clk_i)
await FallingEdge(dut.clk_i)
received = await slave.received_data()
assert received & 0xFF == rx
await Timer(100, "ns")
@cocotb.test()
async def lsbfirst(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
dut.lsbfirst_i.value = 1
await FallingEdge(dut.clk_i)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
# From slave point of view
rx = random.randint(0, 255)
tx = random.randint(0, 255)
def reverse_bits(num, bits):
result = 0
for i in range(bits):
result = (result << 1) | (num & 1)
num >>= 1
return result
rx_reversed = reverse_bits(rx, 8)
tx_reversed = reverse_bits(tx, 8)
await slave.send_data(tx, 8)
await slave.expect_transaction_in(15, "ns")
await driver.send_data(rx)
await slave.wait_all()
dut_received = await driver.receive_data()
assert (int(dut_received) >> 8) & 0xFF == tx_reversed
# Wait a few clocks, rx data should still stay valid!
await FallingEdge(dut.clk_i)
await FallingEdge(dut.clk_i)
await FallingEdge(dut.clk_i)
await FallingEdge(dut.clk_i)
received = await slave.received_data()
assert received & 0xFF == rx_reversed
await Timer(100, "ns")
@cocotb.test()
async def rx_tx_disabled(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
dut.rx_en_i.value = 0
dut.tx_en_i.value = 1
# Try with rx blocking, it should not matter.
dut.rx_block_on_full_i.value = 1
await FallingEdge(dut.clk_i)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
# From slave point of view
rx = random.randint(0, 255)
tx = random.randint(0, 255)
await slave.send_data(tx, 8)
await slave.expect_transaction_in(15, "ns")
await driver.send_data(rx)
await slave.wait_all()
# RX should be idle
assert int(dut.rx_valid_o.value) == 0
# TX should be working
received = await slave.received_data()
assert received & 0xFF == rx
# Second transaction should be fine.
await slave.send_data(tx, 8)
await slave.expect_transaction_in(15, "ns")
await driver.send_data(rx)
await slave.wait_all()
# RX should be idle
assert int(dut.rx_valid_o.value) == 0
# TX should be working
received = await slave.received_data()
assert received & 0xFF == rx
# Now switch to tx disabled. mosi should stay 'Z'
await FallingEdge(dut.clk_i)
dut.rx_en_i.value = 1
dut.tx_en_i.value = 0
await FallingEdge(dut.clk_i)
await FallingEdge(dut.clk_i)
assert int(dut.mosi_t.value) == 1
await slave.send_data(tx, 8)
await slave.expect_transaction_in(15, "ns")
# Although nothing is actually transmitted, still
# tx valid should be asserted for reception to begin
await driver.send_data(rx)
timeout = Timer(10 * (8 + 5), "ns")
mosi_event = Edge(dut.mosi_t)
res = await First(timeout, mosi_event)
if res == mosi_event:
raise Exception("Mosi has changed even though tx is disabled!")
await slave.wait_all()
# RX should have the data
data = await driver.receive_data()
assert int(data) & 0xFF == tx
await Timer(100, "ns")
async def perform_multiple_transmits(count, dut, slave, driver, size = 8):
tx_data = [random.randint(0, 2**size - 1) for i in range(count)]
rx_data = [random.randint(0, 2**size - 1) for i in range(count)]
for tx in tx_data:
await slave.send_data(tx, size)
dut._log.info(f"Sending Data from slave: {tx}")
dut._log.info("To expect transaction")
await slave.expect_transaction_in(30, "ns")
for rx in rx_data:
dut._log.info(f"Sending Data from master: {rx}")
await driver.send_data_wait(rx)
await slave.wait_all()
mask = 0
for i in range(size):
mask |= 1 << i
# Checks
for tx in tx_data:
dut_received = await driver.received_data()
assert int(dut_received) & mask == tx
for rx in rx_data:
received = await slave.received_data()
assert received & mask == rx
@cocotb.test()
async def multiple_transmits(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
await driver.auto_receive()
count = 5
await perform_multiple_transmits(count, dut, slave, driver)
await Timer(100, "ns")
@cocotb.test()
async def lost_rx_data(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
# Do not confirm reception of data. That will auto check
# if lost is asserted and cleared appropriately.
await driver.auto_receive(True, False)
count = 5
await perform_multiple_transmits(count, dut, slave, driver)
await Timer(100, "ns")
@cocotb.test()
async def different_clock(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
config = SpiConfig(8, RisingEdge, FallingEdge, 20, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
dut.div_sel_i.value = 1 # divide by 4
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
await driver.auto_receive()
count = 5
await perform_multiple_transmits(count, dut, slave, driver)
await Timer(100, "ns")
@cocotb.test()
async def different_clock_256(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
config = SpiConfig(8, RisingEdge, FallingEdge, 2550, "ns")
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
dut.div_sel_i.value = 7 # divide by 4
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
await driver.auto_receive()
count = 5
await perform_multiple_transmits(count, dut, slave, driver)
await Timer(100, "ns")
@cocotb.test()
async def inverted_clock(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns", clock_polarity = 1)
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
dut.clock_phase_i.value = 1
dut.clock_polarity_i.value = 1
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
await driver.auto_receive()
count = 5
await perform_multiple_transmits(count, dut, slave, driver)
await Timer(100, "ns")
@cocotb.test()
async def shifted_inverted_clock(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
config = SpiConfig(8, FallingEdge, RisingEdge, 10, "ns", clock_polarity = 1)
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
dut.clock_phase_i.value = 0
dut.clock_polarity_i.value = 1
await FallingEdge(dut.clk_i)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
await driver.auto_receive()
count = 3
await perform_multiple_transmits(count, dut, slave, driver)
await Timer(100, "ns")
dut.clock_phase_i.value = 1
dut.clock_polarity_i.value = 0
slave.config.clock_polarity = 0
await FallingEdge(dut.clk_i)
await perform_multiple_transmits(count, dut, slave, driver)
await Timer(100, "ns")
@cocotb.test()
async def sixteen_bits(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
config = SpiConfig(16, FallingEdge, RisingEdge, 10, "ns", clock_polarity = 1)
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
dut.clock_phase_i.value = 0
dut.clock_polarity_i.value = 1
dut.size_sel_i.value = 1
await FallingEdge(dut.clk_i)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
await driver.auto_receive()
count = 5
await perform_multiple_transmits(count, dut, slave, driver, 16)
await Timer(100, "ns")
@cocotb.test()
async def rx_blocking_tx(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
config = SpiConfig(16, RisingEdge, FallingEdge, 10, "ns", csn_pulse = True)
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
dut.size_sel_i.value = 1
dut.rx_block_on_full_i.value = 1
await FallingEdge(dut.clk_i)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
await driver.auto_receive(waiting_clocks = 5)
count = 5
await perform_multiple_transmits(count, dut, slave, driver, 16)
await Timer(100, "ns")
@cocotb.test()
async def csn_pulse(dut):
clk = Clock(dut.clk_i, 5, "ns")
interface = SpiInterface(dut.csn_o, dut.sck_o, dut.miso_i, dut.mosi_o)
config = SpiConfig(16, RisingEdge, FallingEdge, 10, "ns", csn_pulse = True)
slave = SpiSlave(interface, config)
driver = DutDriver(dut)
await cocotb.start(clk.start())
await init(dut)
dut.pulse_csn_i.value = 1
dut.size_sel_i.value = 1
await FallingEdge(dut.clk_i)
await cocotb.start(slave.coroutine())
await cocotb.start(driver.coroutine())
await driver.auto_receive()
count = 5
await perform_multiple_transmits(count, dut, slave, driver, 16)
await Timer(100, "ns")
def spi_tests_runner():
hdl_toplevel_lang = "vhdl"
sim = os.getenv("SIM", "questa")
test_filter = os.getenv("TESTCASE", None)
gui = True if os.getenv("GUI", "0") == "1" else False
proj_path = Path(__file__).resolve().parent.parent
# equivalent to setting the PYTHONPATH environment variable
sys.path.append(str(proj_path / "models"))
run = True if os.getenv("RUN", "1") == "1" else False
sources = [
proj_path / "src" / "spi_pkg.vhd",
proj_path / "src" / "rs_latch.vhd",
proj_path / "src" / "register.vhd",
proj_path / "src" / "shift_register.vhd",
proj_path / "src" / "spi_clkgen.vhd",
proj_path / "src" / "spi_clkmon.vhd",
proj_path / "src" / "spi_slave_ctrl.vhd",
proj_path / "src" / "spi_master_ctrl.vhd",
proj_path / "src" / "spi_master.vhd",
proj_path / "src" / "spi_masterslave.vhd",
proj_path / "src" / "spi_peripheral.vhd"
]
build_args = []
extra_args = []
if sim == "ghdl":
extra_args = ["--std=08"]
elif sim == "questa" or sim == "modelsim":
build_args = ["-2008"]
# equivalent to setting the PYTHONPATH environment variable
sys.path.append(str(proj_path / "tests"))
runner = get_runner(sim)
runner.build(
sources=sources,
hdl_toplevel="spi_masterslave",
always=True,
build_args=build_args + extra_args,
)
if run:
runner.test(
hdl_toplevel="spi_masterslave", hdl_toplevel_lang=hdl_toplevel_lang,
test_module="test_spi_masterslave", test_args = extra_args,
gui = gui, test_filter = test_filter
)
if __name__ == "__main__":
spi_tests_runner()