~ruther/vhdl-spi-2

6d0d2eed312340d8e59a96719438d502d0c982fb — Rutherther 3 months ago 19cab45
feat: add tests for clock phase, polarity
1 files changed, 113 insertions(+), 48 deletions(-)

M hdl_spi/tests/test.py
M hdl_spi/tests/test.py => hdl_spi/tests/test.py +113 -48
@@ 70,7 70,7 @@ class SpiSlave:
        self._idle.clear()
        self._wakeup.clear()

        self.log = logging.getLogger("SpiSlave")
        self._log = logging.getLogger("cocotb.SpiSlave")

        self.config = config
        self.transactions = Queue()


@@ 109,7 109,7 @@ class SpiSlave:
            #     self.log.error("CSN fell when transaction was unexpected.")
            #     continue

            self.log.info("Got new transaction expectation.")
            self._log.info("Got new transaction expectation.")

            self._wakeup.clear()
            max, unit = self.data


@@ 120,18 120,21 @@ class SpiSlave:
            first = await First(csn_falling, Timer(max, unit))

            if first != csn_falling:
                self.log.error(f"CSN did not fall in time ({max} {unit})!")
                self._log.error(f"CSN did not fall in time ({max} {unit})!")
                continue

            # csn fell

            first = True
            while not self.transactions.empty():
                # expect clock
                data, len = await self.transactions.get()
                total_len = len
                self._transaction.set()
                self.tx.value = cocotb.handle.Force((data >> (total_len - 1)) & 1)
                data = data << 1
                if first:
                    self.tx.value = cocotb.handle.Force((data >> (total_len - 1)) & 1)
                    data = data << 1

                received = 0

                got_sampling = False


@@ 143,28 146,33 @@ class SpiSlave:
                    res = await First(sampling, shifting, timeout)

                    if res == timeout:
                        self.log.error("Got no sck edge in time!")
                        self._log.error("Got no sck edge in time!")
                        continue

                    if res == shifting:
                        if got_sampling:
                        if got_sampling or not first:
                            self.tx.value = cocotb.handle.Force((data >> (total_len - 1)) & 1)
                            data = data << 1
                    else:
                    elif res == sampling:
                        got_sampling = True
                        received = received << 1
                        received |= int(self.rx.value)
                        len -= 1

                    await Timer(self.config.sck_period / 4, self.config.sck_period_unit)
                await self.received.put(received)
                self.log.info(f"Received {received}")

                self._log.info(f"Received {received}")
                first = False

            # now wait for csn rising
            timeout = Timer(self.config.sck_period * 2, self.config.sck_period_unit)
            csn_rising = RisingEdge(self.csn)
            res = await First(csn_rising, timeout)

            self._log.error("TEST fs")
            if res == timeout:
                self.log.error("Got no rising edge on csn")
                self._log.error("Got no rising edge on csn")
                continue

            self.tx.value = cocotb.handle.Release()


@@ 207,7 215,7 @@ async def init(dut, master: int = 1, tx_en: int = 1):
class DutDriver:
    def __init__(self, dut):
        self.dut = dut
        self._log = logging.getLogger("DutDriver")
        self._log = logging.getLogger("cocotb.DutDriver")
        self._received = Queue()
        self._sync = Event()
        self._auto_receive = False


@@ 340,25 348,8 @@ async def single_transmit(dut):

    await Timer(100, "ns")

@cocotb.test()
async def multiple_transmits(dut):
    clk = Clock(dut.clk_i, 5, "ns")
    interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
    config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
    slave = SpiSlave(interface, config)
    driver = DutDriver(dut)

    await cocotb.start(clk.start())

    await init(dut)

    await cocotb.start(slave.coroutine())
    await cocotb.start(driver.coroutine())

    await driver.auto_receive()

    count = 5

async def perform_multiple_transmits(count, dut, slave, driver):
    tx_data = [random.randint(0, 255) for i in range(count)]
    rx_data = [random.randint(0, 255) for i in range(count)]



@@ 384,6 375,26 @@ async def multiple_transmits(dut):
        received = await slave.received_data()
        assert received & 0xFF == rx

@cocotb.test()
async def multiple_transmits(dut):
    clk = Clock(dut.clk_i, 5, "ns")
    interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
    config = SpiConfig(8, RisingEdge, FallingEdge, 10, "ns")
    slave = SpiSlave(interface, config)
    driver = DutDriver(dut)

    await cocotb.start(clk.start())

    await init(dut)

    await cocotb.start(slave.coroutine())
    await cocotb.start(driver.coroutine())

    await driver.auto_receive()

    count = 5
    await perform_multiple_transmits(count, dut, slave, driver)

    await Timer(100, "ns")

@cocotb.test()


@@ 406,35 417,89 @@ async def lost_rx_data(dut):
    await driver.auto_receive(True, False)

    count = 5
    await perform_multiple_transmits(count, dut, slave, driver)

    tx_data = [random.randint(0, 255) for i in range(count)]
    rx_data = [random.randint(0, 255) for i in range(count)]
    await Timer(100, "ns")

    for tx in tx_data:
        await slave.send_data(tx, 8)
@cocotb.test()
async def different_clock(dut):
    clk = Clock(dut.clk_i, 5, "ns")
    interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
    config = SpiConfig(8, RisingEdge, FallingEdge, 20, "ns")
    slave = SpiSlave(interface, config)
    driver = DutDriver(dut)

    await slave.expect_transaction_in(15, "ns")
    await cocotb.start(clk.start())

    for rx in rx_data:
        await driver.send_data_wait(rx)
    await init(dut)
    dut.div_sel_i.value = 1 # divide by 4

    await slave.wait_all()
    await cocotb.start(slave.coroutine())
    await cocotb.start(driver.coroutine())

    # Checks
    # Even though the DUT thinks data were lost, they were actually
    # read and should still be valid data.
    for tx in tx_data:
        dut_received = await driver.received_data()
        assert int(dut_received) & 0xFF == tx
    await driver.auto_receive()

    for rx in rx_data:
        received = await slave.received_data()
        assert received & 0xFF == rx
    count = 5
    await perform_multiple_transmits(count, dut, slave, driver)

    await Timer(100, "ns")

# All clock phases and polarities
# All sizes, divisors
@cocotb.test()
async def inverted_clock(dut):
    clk = Clock(dut.clk_i, 5, "ns")
    interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
    config = SpiConfig(8, RisingEdge, FallingEdge, 20, "ns")
    slave = SpiSlave(interface, config)
    driver = DutDriver(dut)

    await cocotb.start(clk.start())

    await init(dut)
    dut.clock_phase_i.value = 1
    dut.clock_polarity_i.value = 1

    await cocotb.start(slave.coroutine())
    await cocotb.start(driver.coroutine())

    await driver.auto_receive()

    count = 5
    await perform_multiple_transmits(count, dut, slave, driver)

    await Timer(100, "ns")

@cocotb.test()
async def shifted_inverted_clock(dut):
    clk = Clock(dut.clk_i, 5, "ns")
    interface = SpiInterface(dut.csn_io, dut.sck_io, dut.miso_io, dut.mosi_io)
    config = SpiConfig(8, FallingEdge, RisingEdge, 20, "ns")
    slave = SpiSlave(interface, config)
    driver = DutDriver(dut)

    await cocotb.start(clk.start())

    await init(dut)
    dut.clock_phase_i.value = 0
    dut.clock_polarity_i.value = 1
    await FallingEdge(dut.clk_i)

    await cocotb.start(slave.coroutine())
    await cocotb.start(driver.coroutine())

    await driver.auto_receive()

    count = 3
    await perform_multiple_transmits(count, dut, slave, driver)

    dut.clock_phase_i.value = 1
    dut.clock_polarity_i.value = 0
    await FallingEdge(dut.clk_i)

    await perform_multiple_transmits(count, dut, slave, driver)

    await Timer(100, "ns")

# All sizes
# Rx blocking - Can't go to another transmission until data confirmed.
  # When data read a bit later, and csn pulsing is enabled, the csn should still pulse, before data are obtained


Do not follow this link