~ruther/vhdl-spi-2

54521aad7da2977d41073a82e32ee5dec7b96a5d — Rutherther 3 months ago e70719e
feat: add csn pulse test and rx, tx disabling test
2 files changed, 134 insertions(+), 13 deletions(-)

M hdl_spi/models/spi_models.py
M hdl_spi/tests/test_spi_masterslave.py
M hdl_spi/models/spi_models.py => hdl_spi/models/spi_models.py +23 -8
@@ 3,7 3,7 @@ import cocotb.handle
from dataclasses import dataclass
from cocotb.queue import Queue
from cocotb.clock import Clock
from cocotb.triggers import Trigger, First, Event, Timer, RisingEdge, FallingEdge
from cocotb.triggers import Trigger, First, Event, Timer, Edge, RisingEdge, FallingEdge

import logging



@@ 59,6 59,7 @@ class SpiConfig:
    shifting: Trigger
    sck_period: int
    sck_period_unit: str
    csn_pulse: bool = False

class SpiSlave:
    def __init__(self, spi: SpiInterface, config: SpiConfig):


@@ 101,19 102,20 @@ class SpiSlave:

    async def coroutine(self):
        while True:
            self._idle.set()
            await self._wakeup.wait()
            if self.transactions.empty():
                self._idle.set()
                await self._wakeup.wait()

            # first = await First(wakeup, csn_falling)
            # if first == csn_falling:
            #     self.log.error("CSN fell when transaction was unexpected.")
            #     continue

            self._log.info("Got new transaction expectation.")
                self._log.info("Got new transaction expectation.")

            self._wakeup.clear()
                self._wakeup.clear()
                self._idle.clear()
            max, unit = self.data
            self._idle.clear()

            csn_falling = FallingEdge(self.csn)



@@ 142,11 144,12 @@ class SpiSlave:
                while len > 0:
                    sampling = self.config.sampling(self.sck)
                    shifting = self.config.shifting(self.sck)
                    timeout = Timer(self.config.sck_period * 4, self.config.sck_period_unit)
                    timeout = Timer(self.config.sck_period * 2, self.config.sck_period_unit)
                    res = await First(sampling, shifting, timeout)

                    if res == timeout:
                        self._log.error("Got no sck edge in time!")
                        raise Exception("Got not sck edge in time")
                        continue

                    if res == shifting:


@@ 156,7 159,10 @@ class SpiSlave:
                    elif res == sampling:
                        got_sampling = True
                        received = received << 1
                        received |= int(self.rx.value)
                        try: # if z or something, just leave it be
                            received |= int(self.rx.value)
                        except Exception:
                            pass
                        len -= 1

                    await Timer(1, "ns")


@@ 165,14 171,23 @@ class SpiSlave:
                self._log.info(f"Received {received}")
                first = False

                if self.config.csn_pulse:
                    break

            # now wait for csn rising
            timeout = Timer(self.config.sck_period * 2, self.config.sck_period_unit)
            # sck_edge = Edge(self.sck)
            csn_rising = RisingEdge(self.csn)
            res = await First(csn_rising, timeout)

            if res == timeout:
                self._log.error("Got no rising edge on csn")
                raise Exception("Got no rising edge on csn")
                continue
            # elif res == sck_edge:
            #     self._log.error("Got sck edge when csn rising was expected")

            self._log.info("csn is rising")

            self.tx.value = cocotb.handle.Release()


M hdl_spi/tests/test_spi_masterslave.py => hdl_spi/tests/test_spi_masterslave.py +111 -5
@@ 10,7 10,7 @@ import cocotb
import cocotb.handle
from cocotb.queue import Queue
from cocotb.clock import Clock
from cocotb.triggers import Trigger, First, Event, Timer, RisingEdge, FallingEdge
from cocotb.triggers import Trigger, First, Event, Timer, RisingEdge, Edge, FallingEdge
from cocotb_tools.runner import get_runner

if cocotb.simulator.is_running():


@@ 59,6 59,7 @@ class DutDriver:
    async def receive_data(self):
        if int(self.dut.rx_valid_o.value) != 1:
          self._log.error("RX is not valid when receiving data was requested")
          raise Exception("RX is not valid when receiving data was requested")
        await FallingEdge(self.dut.clk_i)
        self.dut.rx_ready_i.value = 1
        data = self.dut.rx_data_o.value


@@ 66,6 67,7 @@ class DutDriver:
        await FallingEdge(self.dut.clk_i)
        if int(self.dut.rx_valid_o.value) != 0:
          self._log.error("RX data stayed valid after receiving!")
          raise Exception("RX data stayed valid after receiving!")
        self.dut.rx_ready_i.value = 0

        return data


@@ 191,6 193,86 @@ async def single_transmit(dut):

    await Timer(100, "ns")

@cocotb.test()
async def rx_tx_disabled(dut):
    clk = Clock(dut.clk_i, 5, "ns")
    interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
    config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
    slave = SpiSlave(interface, config)
    driver = DutDriver(dut)

    await cocotb.start(clk.start())

    await init(dut)
    dut.rx_en_i.value = 0
    dut.tx_en_i.value = 1
    # Try with rx blocking, it should not matter.
    dut.rx_block_on_full_i.value = 1
    await FallingEdge(dut.clk_i)

    await cocotb.start(slave.coroutine())
    await cocotb.start(driver.coroutine())

    # From slave point of view
    rx = random.randint(0, 255)
    tx = random.randint(0, 255)

    await slave.send_data(tx, 8)
    await slave.expect_transaction_in(15, "ns")

    await driver.send_data(rx)

    await slave.wait_all()

    # RX should be idle
    assert int(dut.rx_valid_o.value) == 0

    # TX should be working
    received = await slave.received_data()
    assert received & 0xFF == rx

    # Second transaction should be fine.
    await slave.send_data(tx, 8)
    await slave.expect_transaction_in(15, "ns")
    await driver.send_data(rx)
    await slave.wait_all()

    # RX should be idle
    assert int(dut.rx_valid_o.value) == 0

    # TX should be working
    received = await slave.received_data()
    assert received & 0xFF == rx

    # Now switch to tx disabled. mosi should stay 'Z'
    await FallingEdge(dut.clk_i)
    dut.rx_en_i.value = 1
    dut.tx_en_i.value = 0
    await FallingEdge(dut.clk_i)
    await FallingEdge(dut.clk_i)

    assert str(dut.mosi_io.value[0]) == "Z"

    await slave.send_data(tx, 8)
    await slave.expect_transaction_in(15, "ns")
    # Although nothing is actually transmitted, still
    # tx valid should be asserted for reception to begin
    await driver.send_data(rx)

    timeout = Timer(10 * (8 + 5), "ns")
    mosi_event = Edge(dut.mosi_io)

    res = await First(timeout, mosi_event)
    if res == mosi_event:
        raise Exception("Mosi has changed even though tx is disabled!")

    await slave.wait_all()

    # RX should have the data
    data = await driver.receive_data()
    assert int(data) & 0xFF == tx

    await Timer(100, "ns")

async def perform_multiple_transmits(count, dut, slave, driver, size = 8):
    tx_data = [random.randint(0, 2**size - 1) for i in range(count)]


@@ 201,7 283,7 @@ async def perform_multiple_transmits(count, dut, slave, driver, size = 8):
        dut._log.info(f"Sending Data from slave: {tx}")

    dut._log.info("To expect transaction")
    await slave.expect_transaction_in(15, "ns")
    await slave.expect_transaction_in(30, "ns")

    for rx in rx_data:
        dut._log.info(f"Sending Data from master: {rx}")


@@ 377,7 459,7 @@ async def sixteen_bits(dut):
async def rx_blocking_tx(dut):
    clk = Clock(dut.clk_i, 5, "ns")
    interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
    config = SpiConfig(16, RisingEdge, FallingEdge, 20, "ns")
    config = SpiConfig(16, RisingEdge, FallingEdge, 10, "ns", csn_pulse = True)
    slave = SpiSlave(interface, config)
    driver = DutDriver(dut)



@@ 397,11 479,35 @@ async def rx_blocking_tx(dut):
    await perform_multiple_transmits(count, dut, slave, driver, 16)

    await Timer(100, "ns")

@cocotb.test()
async def csn_pulse(dut):
    clk = Clock(dut.clk_i, 5, "ns")
    interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
    config = SpiConfig(16, RisingEdge, FallingEdge, 20, "ns", csn_pulse = True)
    slave = SpiSlave(interface, config)
    driver = DutDriver(dut)

    await cocotb.start(clk.start())

    await init(dut)
    dut.pulse_csn_i.value = 1
    dut.size_sel_i.value = 1
    await FallingEdge(dut.clk_i)

    await cocotb.start(slave.coroutine())
    await cocotb.start(driver.coroutine())

    await driver.auto_receive()

    count = 5
    await perform_multiple_transmits(count, dut, slave, driver, 16)

    await Timer(100, "ns")

# Rx blocking - Can't go to another transmission until data confirmed.
  # When data read a bit later, and csn pulsing is enabled, the csn should still pulse, before data are obtained

# All clock phases and polarities
# csn pulse
# rx_en off - miso should be ignored, no rx_valid is always 0. Tx works fine
# tx_en off - mosi should be Z, tx_ready is always 0. Rx works fine


Do not follow this link