~ruther/vhdl-spi-2

e70719e8cf9dafc7497ce10d233b2251c730b7eb — Rutherther 3 months ago 6883a17
feat: add tests for rx blocking
1 files changed, 71 insertions(+), 9 deletions(-)

M hdl_spi/tests/test_spi_masterslave.py
M hdl_spi/tests/test_spi_masterslave.py => hdl_spi/tests/test_spi_masterslave.py +71 -9
@@ 101,9 101,10 @@ class DutDriver:

        self.dut.tx_valid_i.value = 0

    async def auto_receive(self, receive: bool = True, set_ready: bool = True):
    async def auto_receive(self, receive: bool = True, set_ready: bool = True, waiting_clocks: int = 0):
        self._auto_receive = receive
        self._set_ready = set_ready
        self._waiting_clocks = waiting_clocks
        self._sync.set()

    async def received_data(self):


@@ 121,13 122,20 @@ class DutDriver:
                if not self._auto_receive:
                    continue

            if self._set_ready:
            if self._set_ready and self._waiting_clocks == 0:
                self.dut.rx_ready_i.value = 1

            await RisingEdge(self.dut.rx_valid_o)
            if int(self.dut.rx_valid_o.value) == 1:
                await self._received.put(self.dut.rx_data_o.value)

            if self._waiting_clocks > 0:
                for i in range(self._waiting_clocks):
                    await FallingEdge(self.dut.clk_i)
                self.dut.rx_ready_i.value = 1
                await FallingEdge(self.dut.clk_i)
                self.dut.rx_ready_i.value = 0

            should_lose_data = not self._set_ready and self.dut.tx_valid_i.value == 1
            await RisingEdge(self.dut.clk_i)
            await RisingEdge(self.dut.clk_i)


@@ 184,12 192,12 @@ async def single_transmit(dut):
    await Timer(100, "ns")


async def perform_multiple_transmits(count, dut, slave, driver):
    tx_data = [random.randint(0, 255) for i in range(count)]
    rx_data = [random.randint(0, 255) for i in range(count)]
async def perform_multiple_transmits(count, dut, slave, driver, size = 8):
    tx_data = [random.randint(0, 2**size - 1) for i in range(count)]
    rx_data = [random.randint(0, 2**size - 1) for i in range(count)]

    for tx in tx_data:
        await slave.send_data(tx, 8)
        await slave.send_data(tx, size)
        dut._log.info(f"Sending Data from slave: {tx}")

    dut._log.info("To expect transaction")


@@ 201,14 209,18 @@ async def perform_multiple_transmits(count, dut, slave, driver):

    await slave.wait_all()

    mask = 0
    for i in range(size):
        mask |= 1 << i

    # Checks
    for tx in tx_data:
        dut_received = await driver.received_data()
        assert int(dut_received) & 0xFF == tx
        assert int(dut_received) & mask == tx

    for rx in rx_data:
        received = await slave.received_data()
        assert received & 0xFF == rx
        assert received & mask == rx

@cocotb.test()
async def multiple_transmits(dut):


@@ 334,7 346,57 @@ async def shifted_inverted_clock(dut):

    await Timer(100, "ns")

# All sizes
@cocotb.test()
async def sixteen_bits(dut):
    clk = Clock(dut.clk_i, 5, "ns")
    interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
    config = SpiConfig(16, FallingEdge, RisingEdge, 20, "ns")
    slave = SpiSlave(interface, config)
    driver = DutDriver(dut)

    await cocotb.start(clk.start())

    await init(dut)
    dut.clock_phase_i.value = 0
    dut.clock_polarity_i.value = 1
    dut.size_sel_i.value = 1
    await FallingEdge(dut.clk_i)

    await cocotb.start(slave.coroutine())
    await cocotb.start(driver.coroutine())

    await driver.auto_receive()

    count = 5
    await perform_multiple_transmits(count, dut, slave, driver, 16)

    await Timer(100, "ns")


@cocotb.test()
async def rx_blocking_tx(dut):
    clk = Clock(dut.clk_i, 5, "ns")
    interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
    config = SpiConfig(16, RisingEdge, FallingEdge, 20, "ns")
    slave = SpiSlave(interface, config)
    driver = DutDriver(dut)

    await cocotb.start(clk.start())

    await init(dut)
    dut.size_sel_i.value = 1
    dut.rx_block_on_full_i.value = 1
    await FallingEdge(dut.clk_i)

    await cocotb.start(slave.coroutine())
    await cocotb.start(driver.coroutine())

    await driver.auto_receive(waiting_clocks = 5)

    count = 5
    await perform_multiple_transmits(count, dut, slave, driver, 16)

    await Timer(100, "ns")
# Rx blocking - Can't go to another transmission until data confirmed.
  # When data read a bit later, and csn pulsing is enabled, the csn should still pulse, before data are obtained


Do not follow this link