import cocotb
import cocotb.handle
from dataclasses import dataclass
from cocotb.queue import Queue
from cocotb.clock import Clock
from cocotb.triggers import Trigger, First, Event, Timer, Edge, RisingEdge, FallingEdge
import logging
@dataclass
class SpiInterface:
csn: cocotb.handle.LogicObject
sck: cocotb.handle.LogicObject
miso: cocotb.handle.LogicObject
mosi: cocotb.handle.LogicObject
# class SignalMonitor:
# def expect_change_to_after() -> SignalMonitor:
# def expect_stable_for() -> SignalMonitor:
# def expect_value() -> SignalMonitor:
# def step() -> SignalMonitor:
# async def coroutine():
# class ClockedSignalMonitor:
# def __init__(clock, signal, sampling = RisingEdge):
# self.signal = signal
# self.sampling = sampling
# self.events = [ [] ]
# def expect_change_to_after() -> SignalMonitor:
# def expect_stable_for() -> SignalMonitor:
# def expect_value() -> SignalMonitor:
# def set_value() -> SignalMonitor:
# def set_callback() -> SignalMonitor:
# def step() -> SignalMonitor:
# self.events.append([])
# async def coroutine():
# while True:
# await self.sampling
# if self.events.len() == 0:
# continue
# if self.events[0].len() == 0:
# continue
# events = self.events.pop(0)
# for event in events:
@dataclass
class SpiConfig:
transaction_len: int
sampling: Trigger
shifting: Trigger
sck_period: int
sck_period_unit: str
csn_pulse: bool = False
class SpiSlave:
def __init__(self, spi: SpiInterface, config: SpiConfig):
self._transaction = Event("spi_slave_transaction")
self._idle = Event("spi_idle")
self._wakeup = Event("spi_slave_wakeup")
self._transaction.clear()
self._idle.clear()
self._wakeup.clear()
self._log = logging.getLogger("cocotb.SpiSlave")
self.config = config
self.transactions = Queue()
self.received = Queue()
self.sck = spi.sck
self.csn = spi.csn
self.tx = spi.miso
self.rx = spi.mosi
async def send_data(self, data: int, len: int):
item = (data, len)
await self.transactions.put(item)
async def received_data(self) -> int:
return await self.received.get()
async def expect_transaction_in(self, max: int, unit: str = "ns"):
self.data = (max, unit)
self._wakeup.set()
async def wait_one(self):
self._transaction.clear()
await self._transaction.wait()
async def wait_all(self):
await self._idle.wait()
async def coroutine(self):
while True:
if self.transactions.empty():
self._idle.set()
await self._wakeup.wait()
# first = await First(wakeup, csn_falling)
# if first == csn_falling:
# self.log.error("CSN fell when transaction was unexpected.")
# continue
self._log.info("Got new transaction expectation.")
self._wakeup.clear()
self._idle.clear()
max, unit = self.data
csn_falling = FallingEdge(self.csn)
first = await First(csn_falling, Timer(max, unit))
if first != csn_falling:
self._log.error(f"CSN did not fall in time ({max} {unit})!")
raise Exception(f"CSN did not fall in time ({max} {unit})!")
continue
# csn fell
first = True
while not self.transactions.empty():
# expect clock
data, len = await self.transactions.get()
total_len = len
self._transaction.set()
if first:
self.tx.value = cocotb.handle.Force((data >> (total_len - 1)) & 1)
data = data << 1
received = 0
got_sampling = False
while len > 0:
sampling = self.config.sampling(self.sck)
shifting = self.config.shifting(self.sck)
timeout = Timer(self.config.sck_period * 2, self.config.sck_period_unit)
res = await First(sampling, shifting, timeout)
if res == timeout:
self._log.error("Got no sck edge in time!")
raise Exception("Got not sck edge in time")
continue
if res == shifting:
if got_sampling or not first:
self.tx.value = cocotb.handle.Force((data >> (total_len - 1)) & 1)
data = data << 1
elif res == sampling:
got_sampling = True
received = received << 1
try: # if z or something, just leave it be
received |= int(self.rx.value)
except Exception:
pass
len -= 1
await Timer(1, "ns")
await self.received.put(received)
self._log.info(f"Received {received}")
first = False
if self.config.csn_pulse:
break
# now wait for csn rising
timeout = Timer(self.config.sck_period * 2, self.config.sck_period_unit)
# sck_edge = Edge(self.sck)
csn_rising = RisingEdge(self.csn)
res = await First(csn_rising, timeout)
if res == timeout:
self._log.error("Got no rising edge on csn")
raise Exception("Got no rising edge on csn")
continue
# elif res == sck_edge:
# self._log.error("Got sck edge when csn rising was expected")
self._log.info("csn is rising")
self.tx.value = cocotb.handle.Release()
# good, continue
self._transaction.set()