~ruther/vhdl-spi-2

94410b0e9f2ca2e0fdf37df9c565150784b99655 — Rutherther 3 months ago e9f10c1
feat: tests for multiple transmissions, rx lost
1 files changed, 214 insertions(+), 26 deletions(-)

M hdl_spi/tests/test.py
M hdl_spi/tests/test.py => hdl_spi/tests/test.py +214 -26
@@ 5,6 5,7 @@ import cocotb.handle
from cocotb.clock import Clock
from cocotb.triggers import Trigger, First, Event, Timer, RisingEdge, FallingEdge
import logging
import random

@dataclass
class SpiInterface:


@@ 154,6 155,8 @@ class SpiSlave:
                        received = received << 1
                        received |= int(self.rx.value)
                        len -= 1
                await self.received.put(received)
                self.log.info(f"Received {received}")

            # now wait for csn rising
            timeout = Timer(self.config.sck_period * 2, self.config.sck_period_unit)


@@ 165,8 168,6 @@ class SpiSlave:
                continue

            self.tx.value = cocotb.handle.Release()
            await self.received.put(received)
            self.log.info(f"Received {received}")

            # good, continue
            self._transaction.set()


@@ 203,54 204,241 @@ async def init(dut, master: int = 1, tx_en: int = 1):

    dut._log.info("Init done!")

class DutDriver:
    def __init__(self, dut):
        self.dut = dut
        self._log = logging.getLogger("DutDriver")
        self._received = Queue()
        self._sync = Event()
        self._auto_receive = False
        self._set_ready = False

    async def receive_data(self):
        if int(self.dut.rx_valid_o.value) != 1:
          self._log.error("RX is not valid when receiving data was requested")
        await FallingEdge(self.dut.clk_i)
        self.dut.rx_ready_i.value = 1
        data = self.dut.rx_data_o.value

        await FallingEdge(self.dut.clk_i)
        if int(self.dut.rx_valid_o.value) != 0:
          self._log.error("RX data stayed valid after receiving!")
        self.dut.rx_ready_i.value = 0

        return data

    async def send_data(self, data):
        if int(self.dut.tx_ready_o.value) != 1:
          self._log.error("TX is not ready when sending data was requested")
        await FallingEdge(self.dut.clk_i)
        self.dut.tx_valid_i.value = 1
        self.dut.tx_data_i.value = data
        await FallingEdge(self.dut.clk_i)
        self.dut.tx_valid_i.value = 0
        if int(self.dut.tx_ready_o.value) != 0:
          self._log.error("TX stayed ready after requesting send of data!")

    async def send_data_wait(self, data):
        await FallingEdge(self.dut.clk_i)
        self.dut.tx_valid_i.value = 1
        self.dut.tx_data_i.value = data
        clk_falling = FallingEdge(self.dut.clk_i)
        tx_ready_falling = FallingEdge(self.dut.tx_ready_o)
        res = await First(clk_falling, tx_ready_falling)

        if res == clk_falling:
            # TODO timeout
            await RisingEdge(self.dut.tx_ready_o)

        await RisingEdge(self.dut.clk_i)
            # now the data were registered
            # (note that it's ready, not confirmation
            # so data are sampled only after it actually is ready
            # that means the next clock cycle from when ready went to 1)

        self.dut.tx_valid_i.value = 0

    async def auto_receive(self, receive: bool = True, set_ready: bool = True):
        self._auto_receive = receive
        self._set_ready = set_ready
        self._sync.set()

    async def received_data(self):
        if self._received.empty():
            return None

        return await self._received.get()

    async def coroutine(self):
        while True:
            if not self._auto_receive:
                self.dut.rx_ready_i.value = 0
                await self._sync.wait()
                self._sync.clear()
                if not self._auto_receive:
                    continue

            if self._set_ready:
                self.dut.rx_ready_i.value = 1

            await RisingEdge(self.dut.rx_valid_o)
            if int(self.dut.rx_valid_o.value) == 1:
                await self._received.put(self.dut.rx_data_o.value)

            should_lose_data = not self._set_ready and self.dut.tx_valid_i.value == 1
            await RisingEdge(self.dut.clk_i)
            await RisingEdge(self.dut.clk_i)
            if should_lose_data and int(self.dut.err_lost_rx_data_o.value) != 1:
                self._log.error("Didn't get err lost rx data after rx valid")

            if should_lose_data:
                self.dut.clear_lost_rx_data_i.value = 1
                await RisingEdge(self.dut.clk_i)
                await FallingEdge(self.dut.clk_i)
                self.dut.clear_lost_rx_data_i.value = 0
                if int(self.dut.err_lost_rx_data_o.value) != 0:
                    self._log.error("Could not clear err lost rx data")


@cocotb.test()
async def simple_test(dut):
    clk = Clock(dut.clk_i, 5, "ns")
async def single_transmit(dut):
    clk = Clock(dut.clk_i, 5, "ns", impl = "py")
    interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
    config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
    slave = SpiSlave(interface, config)
    driver = DutDriver(dut)

    await cocotb.start(clk.start())

    await init(dut)

    await cocotb.start(slave.coroutine())
    await cocotb.start(driver.coroutine())

    await slave.send_data(123, 8)
    # From slave point of view
    rx = random.randint(0, 255)
    tx = random.randint(0, 255)

    await slave.send_data(tx, 8)
    await slave.expect_transaction_in(15, "ns")

    await FallingEdge(dut.clk_i)
    dut.tx_valid_i.value = 1
    dut.tx_data_i.value = 100
    await driver.send_data(rx)

    await FallingEdge(dut.clk_i)
    dut.tx_valid_i.value = 0
    await slave.wait_all()

    await slave.wait_one()
    dut_received = await driver.receive_data()
    assert int(dut_received) & 0xFF == tx

    assert int(dut.rx_valid_o.value) == 1
    assert int(dut.rx_data_o.value) & 0xFF == 123
    # Wait a few clocks, rx data should still stay valid!
    await FallingEdge(dut.clk_i)
    await FallingEdge(dut.clk_i)
    await FallingEdge(dut.clk_i)
    await FallingEdge(dut.clk_i)

    received = await slave.received_data()
    assert received & 0xFF == rx

    await Timer(100, "ns")

@cocotb.test()
async def multiple_transmits(dut):
    clk = Clock(dut.clk_i, 5, "ns")
    interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
    config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
    slave = SpiSlave(interface, config)
    driver = DutDriver(dut)

    await cocotb.start(clk.start())

    await init(dut)

    await cocotb.start(slave.coroutine())
    await cocotb.start(driver.coroutine())

    await driver.auto_receive()

    assert received & 0xFF == 100
    count = 5

    tx_data = [random.randint(0, 255) for i in range(count)]
    rx_data = [random.randint(0, 255) for i in range(count)]

    for tx in tx_data:
        await slave.send_data(tx, 8)
        dut._log.info(f"Sending Data from slave: {tx}")

    dut._log.info("To expect transaction")
    await slave.expect_transaction_in(15, "ns")

    for rx in rx_data:
        dut._log.info(f"Sending Data from master: {rx}")
        await driver.send_data_wait(rx)

    await slave.wait_all()

    # Checks
    for tx in tx_data:
        dut_received = await driver.received_data()
        assert int(dut_received) & 0xFF == tx

    for rx in rx_data:
        received = await slave.received_data()
        assert received & 0xFF == rx

    await Timer(100, "ns")

@cocotb.test()
async def lost_rx_data(dut):
    clk = Clock(dut.clk_i, 5, "ns")
    interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
    config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
    slave = SpiSlave(interface, config)
    driver = DutDriver(dut)

    await cocotb.start(clk.start())

    await init(dut)

    await cocotb.start(slave.coroutine())
    await cocotb.start(driver.coroutine())

    # Do not confirm reception of data. That will auto check
    # if lost is asserted and cleared appropriately.
    await driver.auto_receive(True, False)

    count = 5

    tx_data = [random.randint(0, 255) for i in range(count)]
    rx_data = [random.randint(0, 255) for i in range(count)]

    for tx in tx_data:
        await slave.send_data(tx, 8)

    await slave.expect_transaction_in(15, "ns")

    for rx in rx_data:
        await driver.send_data_wait(rx)

    await slave.wait_all()

    # Checks
    # Even though the DUT thinks data were lost, they were actually
    # read and should still be valid data.
    for tx in tx_data:
        dut_received = await driver.received_data()
        assert int(dut_received) & 0xFF == tx

    for rx in rx_data:
        received = await slave.received_data()
        assert received & 0xFF == rx

    await Timer(100, "ns")

# Simple one receive, transmit
  # Check csn goes low
  # Check 8 sck pulses
# All clock phases and polarities
# All sizes, divisors
# Rx blocking - Can't go to another transmission until data confirmed.
  # When data read a bit later, and csn pulsing is enabled, the csn should still pulse, before data are obtained

# Multiple times receive, transmit without pulse
  # All clock phases and polarities

# Multiple times receive, transmit with pulse

# Only transmission
  # Rx is not valid
# Only reception
  # Z on mosi
# All clock phases and polarities
# csn pulse
# rx_en off - miso should be ignored, no rx_valid is always 0. Tx works fine
# tx_en off - mosi should be Z, tx_ready is always 0. Rx works fine

Do not follow this link