feat(ble_log_console): add backend with frame parser, stats, and data models

Modular backend for the BLE log console rewrite:

- Frame parser with sync state machine and checksum auto-detection
  (4 modes: XOR/Sum x Full/Header-only); handles incomplete frames
  during re-sync search when previously synced
- Internal frame decoder (INIT_DONE, ENH_STAT, FLUSH, INFO)
- Data models: SourceCode, FrameByteCount, FunnelSnapshot, LossType
- Stats package with composition-root StatsAccumulator orchestrating:
  - TransportMetrics (RX bytes, lifetime-average throughput)
  - FirmwareLossTracker / FirmwareWrittenTracker (ENH_STAT deltas
    with first-report absolute value initialization)
  - SNGapTracker (sliding window reorder-tolerant SN gap detection)
  - PeakBurstTracker (per-source sliding window burst density)
  - TrafficSpikeDetector (wire utilization spike detection)
  - Wall-clock burst tracker for non-timestamped sources (REDIR)
- Torn-read guard on ENH_STAT reports (baudrate-based plausibility)
  with prev-state update on discard to prevent cascading drops
- Console-local metrics (TransportMetrics, PeakBurstTracker) preserved
  across INIT_DONE resets; only ENH_STAT-coupled components reset
- UART transport with port validation and exclusive serial access
- Comprehensive test suite (17 test files, 223 tests)
This commit is contained in:
Zhou Xiao
2026-03-23 02:10:29 +08:00
parent 42f2aeedb7
commit 9046c77e52
33 changed files with 3993 additions and 0 deletions
+7
View File
@@ -0,0 +1,7 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
# Prepend this directory to sys.path so the test suite can resolve the
# project's `src.*` absolute imports without installing the package.
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))
+5
View File
@@ -0,0 +1,5 @@
# Runtime dependencies (textual, pyserial, click) are managed by the ESP-IDF
# virtual environment via tools/requirements/requirements.core.txt, not here.
[tool.pytest.ini_options]
testpaths = ["tests"]
@@ -0,0 +1,2 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
@@ -0,0 +1,43 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Checksum implementations matching BLE Log firmware (ble_log_util.c).
Two algorithms:
- sum_checksum: byte-by-byte sum
- xor_checksum: 32-bit word XOR matching firmware ble_log_fast_checksum()
The firmware's ror32 alignment compensation makes the XOR checksum
alignment-independent — simple word-by-word XOR produces the same result
regardless of the original buffer alignment.
"""
import struct
def sum_checksum(data: bytes) -> int:
    """Byte-by-byte sum checksum, truncated to 32 bits."""
    total = 0
    for byte in data:
        total += byte
    return total & 0xFFFFFFFF
def xor_checksum(data: bytes) -> int:
    """Compute XOR checksum matching firmware ble_log_fast_checksum().

    XORs consecutive 4-byte little-endian words; a partial last word is
    zero-padded. Alignment-independent due to firmware's ror32 compensation.
    """
    if not data:
        return 0
    # Pad once up front so the whole buffer is a whole number of words.
    padded = data + b'\x00' * (-len(data) % 4)
    result = 0
    for (word,) in struct.iter_unpack('<I', padded):
        result ^= word
    return result & 0xFFFFFFFF
@@ -0,0 +1,272 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Frame sync state machine with checksum auto-detection.
Parses BLE Log module frames from a raw byte stream.
See Spec Sections 7, 8.
"""
from collections.abc import Callable
from src.backend.checksum import sum_checksum
from src.backend.checksum import xor_checksum
from src.backend.models import CHECKSUM_STRUCT
from src.backend.models import FRAME_HEADER_SIZE
from src.backend.models import FRAME_OVERHEAD
from src.backend.models import HEADER_STRUCT
from src.backend.models import MAX_FRAME_SIZE
from src.backend.models import MAX_REMAINDER_SIZE
from src.backend.models import ChecksumAlgorithm
from src.backend.models import ChecksumMode
from src.backend.models import ChecksumScope
from src.backend.models import ParsedFrame
from src.backend.models import SyncState
# (algorithm, scope, checksum function) triple used while probing for sync.
ChecksumProbe = tuple[ChecksumAlgorithm, ChecksumScope, Callable[[bytes], int]]

# All four firmware checksum configurations; probes are tried in listed
# order and the first match wins (see FrameParser._try_parse_with_probe).
_CHECKSUM_PROBES: list[ChecksumProbe] = [
    (ChecksumAlgorithm.XOR, ChecksumScope.FULL, xor_checksum),
    (ChecksumAlgorithm.XOR, ChecksumScope.HEADER_ONLY, xor_checksum),
    (ChecksumAlgorithm.SUM, ChecksumScope.FULL, sum_checksum),
    (ChecksumAlgorithm.SUM, ChecksumScope.HEADER_ONLY, sum_checksum),
]

SYNC_CONFIRM_THRESHOLD = 3  # N consecutive valid frames to confirm sync
LOSS_TOLERANCE = 3  # M consecutive failures before resync
class FrameParser:
    """Stateful frame parser with sync state machine and checksum auto-detection."""

    def __init__(self) -> None:
        self._remained = b''  # unconsumed bytes carried over between feed() calls
        self._sync_state = SyncState.SEARCHING
        self._checksum_mode: ChecksumMode | None = None  # locked mode; None while still probing
        self._confirm_count = 0  # consecutive valid frames while CONFIRMING_SYNC
        self._loss_count = 0  # consecutive checksum failures while CONFIRMING_LOSS
        self._ascii_buffer = ''  # partial ASCII log line awaiting a newline or flush
        self._ever_synced = False  # once True, unparseable bytes are treated as frame data, not text

    @property
    def sync_state(self) -> SyncState:
        """Current state of the sync state machine."""
        return self._sync_state

    @property
    def checksum_mode(self) -> ChecksumMode | None:
        """Detected checksum mode, or None while still searching."""
        return self._checksum_mode

    def feed(self, data: bytes) -> list[ParsedFrame | str]:
        """Feed raw bytes into the parser.

        Returns a list of:
        - ParsedFrame for successfully parsed frames
        - str for ASCII log lines or warning messages
        """
        self._remained += data
        results: list[ParsedFrame | str] = []
        # Bounded buffer check (Review Correction #2): if re-sync never
        # succeeds the remainder would grow without bound, so drop it all
        # and restart the search from scratch.
        if len(self._remained) > MAX_REMAINDER_SIZE:
            self._remained = b''
            self._transition_to(SyncState.SEARCHING)
            results.append('[WARN] Buffer overflow — discarded remainder, resync')
            return results
        offset = 0
        buf = self._remained
        while offset < len(buf):
            if self._sync_state in (SyncState.SEARCHING, SyncState.CONFIRMING_SYNC):
                # No locked mode yet: probe all four checksum combinations.
                result = self._try_parse_with_probe(buf, offset)
                if result is not None:
                    frame, next_offset, mode = result
                    self._flush_ascii(results)
                    results.append(frame)
                    offset = next_offset
                    self._on_frame_found(mode)
                elif self._sync_state == SyncState.CONFIRMING_SYNC and self._might_be_incomplete_frame(buf, offset):
                    # Tail may be a partial frame — wait for more data.
                    break
                elif (
                    self._sync_state == SyncState.SEARCHING
                    and self._ever_synced
                    and self._might_be_incomplete_frame(buf, offset)
                ):
                    # After a loss-triggered resync, a frame-shaped tail is
                    # likely a partial frame; don't consume it byte-by-byte.
                    break
                else:
                    if not self._ever_synced:
                        # Before the first sync, unparseable bytes are
                        # treated as boot-log text and collected.
                        self._collect_ascii(buf[offset : offset + 1], results)
                    offset += 1
            else:
                # SYNCED or CONFIRMING_LOSS: use locked checksum mode
                result_locked = self._try_parse_locked(buf, offset)
                if result_locked is not None:
                    frame, next_offset = result_locked
                    self._flush_ascii(results)
                    results.append(frame)
                    offset = next_offset
                    self._on_frame_valid()
                else:
                    # Check if we might have incomplete data at the end
                    if self._might_be_incomplete_frame(buf, offset):
                        break
                    self._on_frame_invalid()
                    if self._sync_state == SyncState.SEARCHING:
                        # Full resync — reprocess from current offset
                        continue
                    # Silently discard — do NOT collect ASCII here.
                    # In CONFIRMING_LOSS, failed bytes are corrupt frame data,
                    # not readable text. Collecting them would leak binary
                    # payload bytes that happen to be printable (0x20-0x7E).
                    offset += 1
        # Save remainder
        self._remained = buf[offset:] if offset < len(buf) else b''
        self._flush_ascii(results)
        return results

    def _try_parse_at(
        self,
        buf: bytes,
        offset: int,
        checksum_fn: Callable[[bytes], int],
        scope: ChecksumScope,
    ) -> tuple[ParsedFrame, int] | None:
        """Try to parse a frame at the given offset with specific checksum params.

        Returns (frame, offset_past_frame) on success; None when there are
        not enough bytes, the length is implausible, or the checksum fails.
        """
        if offset + FRAME_HEADER_SIZE > len(buf):
            return None
        payload_len, frame_meta = HEADER_STRUCT.unpack_from(buf, offset)
        # Sanity checks
        if payload_len > MAX_FRAME_SIZE:
            return None
        if offset + FRAME_OVERHEAD + payload_len > len(buf):
            return None
        header = buf[offset : offset + FRAME_HEADER_SIZE]
        payload = buf[offset + FRAME_HEADER_SIZE : offset + FRAME_HEADER_SIZE + payload_len]
        checksum_offset = offset + FRAME_HEADER_SIZE + payload_len
        stored_checksum = CHECKSUM_STRUCT.unpack_from(buf, checksum_offset)[0]
        # Compute checksum over header+payload or header only, per scope.
        if scope == ChecksumScope.FULL:
            checksum_data = header + payload
        else:
            checksum_data = header
        computed = checksum_fn(checksum_data)
        if computed != stored_checksum:
            return None
        # frame_meta packs source code (low byte) and frame SN (upper 24 bits).
        source_code = frame_meta & 0xFF
        frame_sn = frame_meta >> 8
        # Extract os_ts from first 4 bytes of payload; only meaningful for
        # sources that carry the prefix (see has_os_ts in models).
        os_ts_ms = 0
        if payload_len >= 4:
            os_ts_ms = int.from_bytes(payload[:4], 'little')
        frame = ParsedFrame(
            source_code=source_code,
            frame_sn=frame_sn,
            payload=payload,
            os_ts_ms=os_ts_ms,
        )
        next_offset = offset + FRAME_OVERHEAD + payload_len
        return frame, next_offset

    def _try_parse_with_probe(self, buf: bytes, offset: int) -> tuple[ParsedFrame, int, ChecksumMode] | None:
        """Try all checksum combinations at the given offset (SEARCHING mode)."""
        for algo, scope, fn in _CHECKSUM_PROBES:
            result = self._try_parse_at(buf, offset, fn, scope)
            if result is not None:
                frame, next_offset = result
                mode = ChecksumMode(algo, scope)
                return frame, next_offset, mode
        return None

    def _try_parse_locked(self, buf: bytes, offset: int) -> tuple[ParsedFrame, int] | None:
        """Try to parse with the locked checksum mode."""
        if self._checksum_mode is None:
            return None
        fn = xor_checksum if self._checksum_mode.algorithm == ChecksumAlgorithm.XOR else sum_checksum
        return self._try_parse_at(buf, offset, fn, self._checksum_mode.scope)

    def _on_frame_found(self, mode: ChecksumMode) -> None:
        """Called when a frame is found during SEARCHING/CONFIRMING_SYNC."""
        if self._sync_state == SyncState.SEARCHING:
            # First hit: tentatively lock this mode and start confirmation.
            self._checksum_mode = mode
            self._confirm_count = 1
            self._transition_to(SyncState.CONFIRMING_SYNC)
        elif self._sync_state == SyncState.CONFIRMING_SYNC:
            # Review Correction #3: verify same checksum mode
            if (
                self._checksum_mode is not None
                and mode.algorithm == self._checksum_mode.algorithm
                and mode.scope == self._checksum_mode.scope
            ):
                self._confirm_count += 1
                if self._confirm_count >= SYNC_CONFIRM_THRESHOLD:
                    self._transition_to(SyncState.SYNCED)
            else:
                # Mode mismatch — restart confirmation with new mode
                self._checksum_mode = mode
                self._confirm_count = 1

    def _on_frame_valid(self) -> None:
        """Called when a frame passes checksum in SYNCED/CONFIRMING_LOSS."""
        self._loss_count = 0
        if self._sync_state == SyncState.CONFIRMING_LOSS:
            self._transition_to(SyncState.SYNCED)

    def _on_frame_invalid(self) -> None:
        """Called when checksum fails in SYNCED/CONFIRMING_LOSS."""
        if self._sync_state == SyncState.SYNCED:
            self._loss_count = 1
            self._transition_to(SyncState.CONFIRMING_LOSS)
        elif self._sync_state == SyncState.CONFIRMING_LOSS:
            self._loss_count += 1
            # Tolerance exceeded: drop the locked mode and start over.
            if self._loss_count > LOSS_TOLERANCE:
                self._transition_to(SyncState.SEARCHING)
                self._checksum_mode = None
                self._confirm_count = 0
                self._loss_count = 0

    def _might_be_incomplete_frame(self, buf: bytes, offset: int) -> bool:
        """Check if remaining data could be a partial frame waiting for more data."""
        remaining = len(buf) - offset
        # Fewer bytes than the smallest possible frame: always maybe-partial.
        if remaining < FRAME_OVERHEAD:
            return True
        if remaining >= FRAME_HEADER_SIZE:
            payload_len, _ = HEADER_STRUCT.unpack_from(buf, offset)
            # Header claims a plausible length we don't fully have yet.
            if payload_len <= MAX_FRAME_SIZE and remaining < FRAME_OVERHEAD + payload_len:
                return True
        return False

    def _transition_to(self, new_state: SyncState) -> None:
        # _ever_synced latches permanently on the first SYNCED transition;
        # it changes how unparseable bytes are handled in feed().
        if new_state == SyncState.SYNCED:
            self._ever_synced = True
        self._sync_state = new_state

    def _collect_ascii(self, byte_data: bytes, results: list[ParsedFrame | str]) -> None:
        """Collect bytes for ASCII line assembly.

        Only printable ASCII (0x20-0x7E) and newline (0x0A) are collected.
        Carriage return (0x0D) and other control characters are silently
        dropped, which normalises \\r\\n line endings to \\n for display.
        """
        for b in byte_data:
            if 0x20 <= b <= 0x7E:
                self._ascii_buffer += chr(b)
            elif b == 0x0A:  # newline
                if self._ascii_buffer:
                    results.append(self._ascii_buffer)
                self._ascii_buffer = ''

    def _flush_ascii(self, results: list[ParsedFrame | str]) -> None:
        """Flush any pending ASCII buffer."""
        if self._ascii_buffer:
            results.append(self._ascii_buffer)
            self._ascii_buffer = ''
@@ -0,0 +1,77 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Decode BLE_LOG_SRC_INTERNAL(0) frame payloads.
Payload format on wire: [4B os_ts][1B int_src_code][variable sub-payload]
See Spec Section 9.
"""
import struct
from src.backend.models import EnhStatResult
from src.backend.models import InfoResult
from src.backend.models import InternalDecoderResult
from src.backend.models import InternalSource
# Minimum payload size: 4B os_ts + 1B int_src_code
_MIN_PAYLOAD_SIZE = 5
# ble_log_info_t: [1B int_src_code][1B version] — used by INIT_DONE, INFO, FLUSH
_INFO_STRUCT = struct.Struct('<BB')
# ble_log_enh_stat_t: [1B int_src_code][1B src_code][4B written][4B lost][4B written_bytes][4B lost_bytes]
# Structs are pre-compiled once at import time and reused by decode_internal_frame().
_ENH_STAT_STRUCT = struct.Struct('<BBIIII')
def decode_internal_frame(payload: bytes) -> InternalDecoderResult | None:
    """Decode an INTERNAL frame payload.

    Args:
        payload: Full frame payload including the 4-byte os_ts prefix.

    Returns:
        Typed dict with decoded fields, or None if the frame should be ignored (TS) or is malformed.
    """
    if len(payload) < _MIN_PAYLOAD_SIZE:
        return None
    (os_ts_ms,) = struct.unpack_from('<I', payload, 0)
    body = payload[4:]  # sub-payload, starting at int_src_code
    try:
        source = InternalSource(body[0])
    except ValueError:
        # Unknown internal source code from a newer/older firmware.
        return None
    if source == InternalSource.TS:
        return None  # Ignored per spec
    if source == InternalSource.ENH_STAT:
        if len(body) < _ENH_STAT_STRUCT.size:
            return None
        fields = _ENH_STAT_STRUCT.unpack_from(body, 0)
        return EnhStatResult(
            int_src=source,
            src_code=fields[1],
            written_frame_cnt=fields[2],
            lost_frame_cnt=fields[3],
            written_bytes_cnt=fields[4],
            lost_bytes_cnt=fields[5],
            os_ts_ms=os_ts_ms,
        )
    if source in (InternalSource.INIT_DONE, InternalSource.INFO, InternalSource.FLUSH):
        if len(body) < _INFO_STRUCT.size:
            return None
        version = _INFO_STRUCT.unpack_from(body, 0)[1]
        return InfoResult(
            int_src=source,
            version=version,
            os_ts_ms=os_ts_ms,
        )
    return None
@@ -0,0 +1,337 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import struct
from dataclasses import dataclass
from dataclasses import field
from enum import Enum
from pathlib import Path
from typing import TypedDict
from textual.message import Message
# --- Frame constants (Spec Section 7) ---
FRAME_HEADER_SIZE = 6  # 2B payload_len + 4B frame_meta
FRAME_TAIL_SIZE = 4  # 4B checksum
FRAME_OVERHEAD = FRAME_HEADER_SIZE + FRAME_TAIL_SIZE  # 10
MAX_FRAME_SIZE = 2048  # Max payload_len sanity check
MAX_REMAINDER_SIZE = 131072  # 128KB bounded buffer (FrameParser discards + resyncs beyond this)
HEADER_FMT = '<HI'  # payload_len (uint16), frame_meta (uint32)
CHECKSUM_FMT = '<I'  # checksum (uint32)
# Pre-compiled structs so the hot parsing path never re-parses the format strings.
HEADER_STRUCT = struct.Struct(HEADER_FMT)
CHECKSUM_STRUCT = struct.Struct(CHECKSUM_FMT)
# --- Formatting helpers ---
def format_bytes(cnt: int) -> str:
    """Format byte count as human-readable string (B / KB / MB)."""
    kib = cnt / 1024
    if cnt < 1024:
        return f'{cnt} B'
    if kib < 1024:
        return f'{kib:.1f} KB'
    return f'{kib / 1024:.2f} MB'
def format_throughput(bytes_per_sec: float) -> str:
    """Format throughput as a human-readable string, switching KB/s to MB/s at 1024 KB/s."""
    rate_kb = bytes_per_sec / 1024
    if rate_kb >= 1024:
        return f'{rate_kb / 1024:.2f} MB/s'
    return f'{rate_kb:.1f} KB/s'
# --- Enums ---
class SyncState(str, Enum):
    """Frame-sync state machine states (string-valued for display)."""

    SEARCHING = 'SEARCHING'  # probing every checksum mode at each byte offset
    CONFIRMING_SYNC = 'CONFIRMING'  # one valid frame seen; awaiting N consecutive confirmations
    SYNCED = 'SYNCED'  # checksum mode locked
    CONFIRMING_LOSS = 'CONFIRMING_LOSS'  # checksum failures while synced; tolerating up to M


class ChecksumAlgorithm(str, Enum):
    """Checksum algorithm used by the firmware (auto-detected)."""

    XOR = 'XOR'
    SUM = 'Sum'


class ChecksumScope(str, Enum):
    """Which frame bytes the checksum covers."""

    FULL = 'Header+Payload'
    HEADER_ONLY = 'Header'
class BleLogSource(int, Enum):
    """Frame source codes carried in the low byte of frame_meta."""

    INTERNAL = 0  # control frames (INIT_DONE, ENH_STAT, FLUSH, INFO)
    CUSTOM = 1
    LL_TASK = 2
    LL_HCI = 3
    LL_ISR = 4
    HOST = 5
    HCI = 6
    ENCODE = 7
    REDIR = 8  # BLE_LOG_SRC_REDIR in firmware ble_log.h (UART PORT 0 only)


# Type alias for source code values (BleLogSource member or unknown firmware code).
SourceCode = int

# Sources written via ble_log_write_hex_ll() or stream_write -- no 4-byte os_ts prefix.
_NO_OS_TS_SOURCES: frozenset[int] = frozenset(
    {BleLogSource.LL_TASK, BleLogSource.LL_HCI, BleLogSource.LL_ISR, BleLogSource.REDIR}
)
# Link Layer sources — these carry a controller timestamp (lc_ts) in the payload instead.
_LL_SOURCES: frozenset[int] = frozenset({BleLogSource.LL_TASK, BleLogSource.LL_HCI, BleLogSource.LL_ISR})

LL_TS_OFFSET = 2  # lc_ts starts at payload[2:6]
LL_TS_SIZE = 4
def has_os_ts(source_code: int) -> bool:
    """Return True if frames from this source carry a valid os_ts prefix."""
    lacks_os_ts = source_code in _NO_OS_TS_SOURCES
    return not lacks_os_ts
def is_ll_source(source_code: int) -> bool:
    """Return True if this is a Link Layer source with lc_ts timestamp."""
    is_ll = source_code in _LL_SOURCES
    return is_ll
def resolve_source_name(src_code: int) -> str:
    """Resolve source code to BleLogSource name, with fallback for unknown codes."""
    try:
        member = BleLogSource(src_code)
    except ValueError:
        # Firmware newer than this console may emit codes we don't know.
        return f'SRC_{src_code}'
    return str(member.name)
class InternalSource(int, Enum):
    """Sub-source codes inside BLE_LOG_SRC_INTERNAL(0) frame payloads."""

    INIT_DONE = 0  # firmware (re)initialized — triggers a stats reset (reason 'init')
    TS = 1  # ignored by the decoder (see decode_internal_frame)
    ENH_STAT = 2  # per-source written/lost counter report
    INFO = 3
    FLUSH = 4  # triggers baseline-only stats reset (reason 'flush')
# --- Data classes ---
@dataclass(slots=True)
class ChecksumMode:
    """Locked checksum configuration: algorithm plus coverage scope."""

    algorithm: ChecksumAlgorithm
    scope: ChecksumScope


@dataclass(slots=True)
class ParsedFrame:
    """One successfully parsed wire frame."""

    source_code: int
    frame_sn: int
    payload: bytes  # includes os_ts prefix for ble_log_write_hex() frames
    os_ts_ms: int  # extracted from first 4 bytes of payload; only valid when has_os_ts(source_code) is True


@dataclass(slots=True)
class SourcePeakWrite:
    """Peak write burst for a single source within a 1ms window."""

    peak_frames: int = 0  # max frame count in any 1ms window
    peak_bytes: int = 0  # total bytes in that same window


@dataclass(slots=True)
class SourceStats:
    """Console-side accumulated per-source statistics (resilient to firmware counter resets)."""

    written_frames: int = 0
    written_bytes: int = 0
    lost_frames: int = 0
    lost_bytes: int = 0


@dataclass(slots=True)
class TransportSnapshot:
    """Snapshot of transport-layer metrics for the current stats interval."""

    rx_bytes: int = 0  # raw bytes received
    bps: float = 0.0  # bytes per second
    max_bps: float = 0.0  # highest bps observed
    fps: float = 0.0  # frames per second


@dataclass(slots=True)
class LossSnapshot:
    """Snapshot of firmware-reported cumulative loss."""

    total_frames: int = 0
    total_bytes: int = 0


@dataclass(slots=True)
class PeakBurstSnapshot:
    """Peak write burst metrics for a single clock domain (os_ts or lc_ts)."""

    per_source: dict[SourceCode, SourcePeakWrite] | None = None
    max_per_source: dict[SourceCode, SourcePeakWrite] | None = None


class LossType(str, Enum):
    """Classification of a detected frame-loss event."""

    BUFFER = 'buffer'  # firmware buffer full, frame dropped
    TRANSPORT = 'transport'  # UART/link loss


@dataclass(frozen=True)
class FrameByteCount:
    """A (frames, bytes) pair."""

    frames: int
    bytes: int


@dataclass(frozen=True)
class ThroughputInfo:
    """Rate metrics (frames/s and bytes/s)."""

    # NOTE(review): StatsAccumulator.funnel_snapshot fills these two with a
    # lifetime average (total / total elapsed), not a rolling 1s window —
    # reconcile this comment with the producer.
    throughput_fps: float  # current console receive rate (rolling 1s window)
    throughput_bps: float  # current console receive byte rate
    peak_write_frames: int  # raw frame count in densest burst window
    peak_write_bytes: int  # raw byte count in that burst window
    peak_window_ms: int  # burst window size in ms


@dataclass(frozen=True)
class FunnelSnapshot:
    """Per-source three-layer funnel snapshot."""

    source: int  # SourceCode
    # Three-layer funnel
    produced: FrameByteCount  # Layer 0: written + buffer_loss
    written: FrameByteCount  # Layer 1: from ENH_STAT
    received: FrameByteCount  # Layer 2: console-side counting
    # Loss breakdown
    buffer_loss: FrameByteCount  # from ENH_STAT lost counts
    transport_loss: FrameByteCount  # max(0, written - received)
    # Rate
    throughput: ThroughputInfo


@dataclass(slots=True)
class LaunchConfig:
    """Configuration returned by the Launch Screen."""

    port: str
    baudrate: int
    log_dir: Path


@dataclass(slots=True)
class FrameStats:
    """Periodic stats snapshot with metrics grouped by dimension."""

    transport: TransportSnapshot = field(default_factory=TransportSnapshot)
    loss: LossSnapshot = field(default_factory=LossSnapshot)
    os_peak: PeakBurstSnapshot = field(default_factory=PeakBurstSnapshot)
    ll_peak: PeakBurstSnapshot = field(default_factory=PeakBurstSnapshot)
    per_source_rx_bytes: dict[SourceCode, int] | None = None
    sync_state: SyncState = SyncState.SEARCHING
    # Detected checksum mode; None while the parser is still searching.
    checksum_algorithm: ChecksumAlgorithm | None = None
    checksum_scope: ChecksumScope | None = None
# --- TypedDicts for internal decoder results ---
class InfoResult(TypedDict):
    """Decoded INIT_DONE / INFO / FLUSH sub-payload (ble_log_info_t)."""

    int_src: InternalSource
    version: int
    os_ts_ms: int


class EnhStatResult(TypedDict):
    """Decoded ENH_STAT sub-payload (ble_log_enh_stat_t)."""

    int_src: InternalSource
    src_code: int
    written_frame_cnt: int
    lost_frame_cnt: int
    written_bytes_cnt: int
    lost_bytes_cnt: int
    os_ts_ms: int


# Union of everything decode_internal_frame() can return (besides None).
InternalDecoderResult = InfoResult | EnhStatResult
# --- Textual Messages (backend -> frontend) ---
class SyncStateChanged(Message):
    """Posted when the parser's sync state machine changes state."""

    def __init__(self, state: SyncState) -> None:
        super().__init__()
        self.state = state


class StatsUpdated(Message):
    """Periodic stats snapshot, optionally with per-source funnel data."""

    def __init__(self, stats: FrameStats, funnel_snapshots: list[FunnelSnapshot] | None = None) -> None:
        super().__init__()
        self.stats = stats
        self.funnel_snapshots = funnel_snapshots or []


class InternalFrameDecoded(Message):
    """A BLE_LOG_SRC_INTERNAL frame was decoded."""

    def __init__(self, int_src: InternalSource, payload: InternalDecoderResult) -> None:
        super().__init__()
        self.int_src = int_src
        self.payload = payload


class LogLine(Message):
    """One ASCII log line recovered from the byte stream."""

    def __init__(self, text: str) -> None:
        super().__init__()
        self.text = text


class FrameLossDetected(Message):
    """A frame-loss event (firmware buffer drop or transport gap)."""

    def __init__(
        self,
        source_name: str,
        loss_type: LossType,
        lost_frames: int,
        lost_bytes: int,
        sn_range: tuple[int, int] | None = None,  # SN range of the gap, when known
    ) -> None:
        super().__init__()
        self.source_name = source_name
        self.loss_type = loss_type
        self.lost_frames = lost_frames
        self.lost_bytes = lost_bytes
        self.sn_range = sn_range


class BackendStopped(Message):
    """Backend worker stopped; reason is free-form text (may be empty)."""

    def __init__(self, reason: str = '') -> None:
        super().__init__()
        self.reason = reason


class TrafficSpikeDetected(Message):
    """Wire-utilization spike detected by TrafficSpikeDetector."""

    def __init__(
        self,
        throughput_kbs: float,
        wire_max_kbs: float,
        utilization_pct: float,
        duration_ms: float,
        per_source: dict[int, float],  # per-source breakdown — presumably KB/s; confirm at emit site
    ) -> None:
        super().__init__()
        self.throughput_kbs = throughput_kbs
        self.wire_max_kbs = wire_max_kbs
        self.utilization_pct = utilization_pct
        self.duration_ms = duration_ms
        self.per_source = per_source
@@ -0,0 +1,38 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Stats package -- re-exports for backward-compatible imports."""
from src.backend.stats.accumulator import StatsAccumulator
from src.backend.stats.firmware_loss import FirmwareLossTracker
from src.backend.stats.firmware_written import FirmwareWrittenTracker
from src.backend.stats.peak_burst import WRITE_RATE_WINDOW_MS
from src.backend.stats.peak_burst import PeakBurstTracker
from src.backend.stats.sn_gap import REORDER_WINDOW
from src.backend.stats.sn_gap import SN_MAX
from src.backend.stats.sn_gap import SNGapTracker
from src.backend.stats.traffic_spike import TRAFFIC_ALERT_COOLDOWN_SEC
from src.backend.stats.traffic_spike import TRAFFIC_THRESHOLD_PCT
from src.backend.stats.traffic_spike import TRAFFIC_WINDOW_SEC
from src.backend.stats.traffic_spike import TrafficSpikeDetector
from src.backend.stats.traffic_spike import TrafficSpikeResult
from src.backend.stats.transport import UART_BITS_PER_BYTE
from src.backend.stats.transport import TransportMetrics
# Public API of the stats package; keep in sync with the re-imports above.
__all__ = [
    'FirmwareLossTracker',
    'FirmwareWrittenTracker',
    'PeakBurstTracker',
    'REORDER_WINDOW',
    'SN_MAX',
    'SNGapTracker',
    'StatsAccumulator',
    'TRAFFIC_ALERT_COOLDOWN_SEC',
    'TRAFFIC_THRESHOLD_PCT',
    'TRAFFIC_WINDOW_SEC',
    'TrafficSpikeDetector',
    'TrafficSpikeResult',
    'TransportMetrics',
    'UART_BITS_PER_BYTE',
    'WRITE_RATE_WINDOW_MS',
]
@@ -0,0 +1,233 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Thin composition of stats sub-modules into a single accumulator."""
from __future__ import annotations
from src.backend.models import BleLogSource
from src.backend.models import ChecksumMode
from src.backend.models import FrameByteCount
from src.backend.models import FrameStats
from src.backend.models import FunnelSnapshot
from src.backend.models import SourceCode
from src.backend.models import SyncState
from src.backend.models import ThroughputInfo
from src.backend.stats.firmware_loss import FirmwareLossTracker
from src.backend.stats.firmware_written import FirmwareWrittenTracker
from src.backend.stats.peak_burst import PeakBurstTracker
from src.backend.stats.peak_burst import WRITE_RATE_WINDOW_MS
from src.backend.stats.sn_gap import SNGapTracker
from src.backend.stats.traffic_spike import TrafficSpikeDetector
from src.backend.stats.traffic_spike import TrafficSpikeResult
from src.backend.stats.transport import UART_BITS_PER_BYTE
from src.backend.stats.transport import TransportMetrics
# Shared zero-valued (frames, bytes) pair; FrameByteCount is frozen, so sharing is safe.
# NOTE(review): not referenced in this module's visible code — confirm it is still needed.
_ZERO = FrameByteCount(frames=0, bytes=0)
# SN gap tracking requires firmware report version >= this (see set_firmware_version).
_SN_PRODUCED_MIN_VERSION = 4
class StatsAccumulator:
    """Composition root that routes events to the stats sub-trackers.

    Owns one instance of each metric component, forwards recorded events
    (raw bytes, parsed frames, ENH_STAT reports) to the relevant tracker,
    and assembles the periodic FrameStats / FunnelSnapshot views.
    """

    def __init__(self) -> None:
        # Console-local transport metrics (RX bytes, throughput).
        self._transport = TransportMetrics()
        # One burst tracker per clock domain: chip os_ts, LL lc_ts, host wall clock.
        self._os_burst = PeakBurstTracker()
        self._ll_burst = PeakBurstTracker()
        self._wall_burst = PeakBurstTracker()
        # Firmware ENH_STAT delta trackers and SN-based gap detection.
        self._fw_loss = FirmwareLossTracker()
        self._fw_written = FirmwareWrittenTracker()
        self._sn_gap = SNGapTracker()
        self._traffic = TrafficSpikeDetector()
        # Console-side per-source receive counters.
        self._per_source_received_frames: dict[SourceCode, int] = {}
        self._per_source_received_bytes: dict[SourceCode, int] = {}
        # Last raw ENH_STAT tuple per source:
        # (written_frames, lost_frames, written_bytes, lost_bytes) — torn-read guard baseline.
        self._enh_stat_prev: dict[SourceCode, tuple[int, int, int, int]] = {}
        self._total_elapsed: float = 0.0  # cumulative seconds, drives funnel throughput averages
        # Written totals captured at the previous funnel_snapshot(); the one-interval
        # lag keeps in-flight frames from being counted as transport loss.
        self._prev_written: dict[SourceCode, tuple[int, int]] = {}
        self._sn_gap_enabled = False  # disabled until firmware version >= 4 confirmed

    def set_firmware_version(self, version: int) -> None:
        """Enable SN gap tracking once the firmware report format is new enough."""
        self._sn_gap_enabled = version >= _SN_PRODUCED_MIN_VERSION

    def record_bytes(self, count: int) -> None:
        """Count raw bytes received from the transport."""
        self._transport.record_bytes(count)

    def record_frame(self, frame_size: int = 0, src_code: int = 0, frame_sn: int = -1) -> int:
        """Record a received frame. Returns confirmed SN gap count (0 if SN tracking disabled)."""
        self._transport.record_frame()
        gap = 0
        # src_code > 0 excludes INTERNAL(0) from per-source counting and SN tracking.
        if frame_sn >= 0 and src_code > 0:
            if self._sn_gap_enabled:
                gap = self._sn_gap.record(src_code, frame_sn)
            self._per_source_received_frames[src_code] = self._per_source_received_frames.get(src_code, 0) + 1
            self._per_source_received_bytes[src_code] = self._per_source_received_bytes.get(src_code, 0) + frame_size
        return gap

    # -- Timestamp-based burst tracking ------------------------------------------
    def record_frame_ts(self, os_ts_ms: int, frame_size: int, src_code: SourceCode) -> None:
        """Record a frame in the chip os_ts (millisecond) clock domain."""
        self._os_burst.record(os_ts_ms, frame_size, src_code)

    def record_ll_frame_ts(self, lc_ts_us: int, frame_size: int, src_code: SourceCode) -> None:
        """Record an LL frame; lc_ts is in microseconds, converted to ms here."""
        self._ll_burst.record(lc_ts_us // 1000, frame_size, src_code)

    def record_frame_wall_ts(self, wall_ms: int, frame_size: int, src_code: SourceCode) -> None:
        """Record frame with wall-clock timestamp for sources without chip-side timestamps."""
        self._wall_burst.record(wall_ms, frame_size, src_code)

    # -- Traffic spike -----------------------------------------------------------
    def set_wire_max(self, baudrate: int) -> None:
        """Derive the wire's max byte rate from the UART baudrate."""
        self._traffic.set_wire_max_bps(baudrate / UART_BITS_PER_BYTE)

    def record_frame_traffic(self, frame_size: int, src_code: SourceCode) -> None:
        """Feed one frame into the wire-utilization spike detector."""
        self._traffic.record(frame_size, src_code)

    def check_traffic(self) -> TrafficSpikeResult | None:
        """Poll the spike detector; returns a result only when a spike fires."""
        return self._traffic.check()

    # -- Firmware ENH_STAT -------------------------------------------------------
    def record_enh_stat(
        self,
        src_code: SourceCode,
        written_frames: int,
        lost_frames: int,
        written_bytes: int,
        lost_bytes: int,
        baudrate: int,
    ) -> tuple[int, int]:
        """Record firmware ENH_STAT report. Returns (loss_delta_frames, loss_delta_bytes).

        Torn-read guard: discards reports where byte deltas exceed 2s of wire
        capacity (non-atomic enh_stat_t reads under concurrent ISR/task updates).
        """
        prev = self._enh_stat_prev.get(src_code)
        if prev is not None:
            # Plausibility bound: two seconds of wire throughput.
            max_bytes_delta = baudrate * 2 // UART_BITS_PER_BYTE
            d_written_bytes = written_bytes - prev[2]
            d_lost_bytes = lost_bytes - prev[3]
            if d_written_bytes > max_bytes_delta or d_lost_bytes > max_bytes_delta:
                # Update prev to avoid cascading discards on next report
                self._enh_stat_prev[src_code] = (written_frames, lost_frames, written_bytes, lost_bytes)
                return (0, 0)
        self._enh_stat_prev[src_code] = (written_frames, lost_frames, written_bytes, lost_bytes)
        self._fw_written.record(src_code, written_frames, written_bytes)
        return self._fw_loss.record(src_code, lost_frames, lost_bytes)

    # -- Reset -------------------------------------------------------------------
    def reset(self, reason: str) -> None:
        """Reset components by group.

        reason: "init" (INIT_DONE) or "flush" (FLUSH)
        """
        # SN-coupled: always full reset
        self._sn_gap.reset()
        if reason == 'init':
            # ENH_STAT-coupled: full reset
            self._fw_loss.reset()
            self._fw_written.reset()
            self._enh_stat_prev.clear()
            self._prev_written.clear()
        elif reason == 'flush':
            # ENH_STAT-coupled: reset baselines only
            self._fw_loss.reset_baselines()
            self._fw_written.reset_baselines()
            self._enh_stat_prev.clear()
        # Console-local: preserve (no action)

    # -- Snapshots ---------------------------------------------------------------
    def snapshot(
        self,
        elapsed_sec: float,
        sync_state: SyncState = SyncState.SEARCHING,
        checksum_mode: ChecksumMode | None = None,
    ) -> FrameStats:
        """Assemble the periodic FrameStats snapshot for the frontend."""
        # NOTE(review): harvest() result deliberately discarded — presumably
        # rolls the wall-clock interval window; wall peaks surface via
        # max_peaks() in funnel_snapshot(). Confirm against PeakBurstTracker.
        self._wall_burst.harvest()
        return FrameStats(
            transport=self._transport.harvest(elapsed_sec),
            loss=self._fw_loss.totals(),
            os_peak=self._os_burst.harvest(),
            ll_peak=self._ll_burst.harvest(),
            per_source_rx_bytes=(dict(self._per_source_received_bytes) if self._per_source_received_bytes else None),
            sync_state=sync_state,
            checksum_algorithm=checksum_mode.algorithm if checksum_mode else None,
            checksum_scope=checksum_mode.scope if checksum_mode else None,
        )

    def funnel_snapshot(self, elapsed_sec: float = 0.0) -> list[FunnelSnapshot]:
        """Build per-source funnel snapshots from all component data."""
        written_totals = self._fw_written.totals()
        loss_totals = self._fw_loss.per_source_totals()
        os_max_peaks = self._os_burst.max_peaks()
        ll_max_peaks = self._ll_burst.max_peaks()
        wall_max_peaks = self._wall_burst.max_peaks()
        # Union of every source seen by any component.
        sources: set[int] = set()
        sources.update(written_totals)
        sources.update(loss_totals)
        sources.update(self._per_source_received_frames)
        # Exclude INTERNAL (src_code=0): its transport_loss is inherently
        # unknowable — if INTERNAL frames are lost, the ENH_STAT data inside
        # them never arrives, making the written-vs-received comparison circular.
        sources.discard(BleLogSource.INTERNAL)
        self._total_elapsed += elapsed_sec
        result: list[FunnelSnapshot] = []
        for src in sorted(sources):
            w_frames, w_bytes = written_totals.get(src, (0, 0))
            l_frames, l_bytes = loss_totals.get(src, (0, 0))
            r_frames = self._per_source_received_frames.get(src, 0)
            r_bytes = self._per_source_received_bytes.get(src, 0)
            # Layer 0 (produced) = firmware written + firmware buffer loss.
            produced = FrameByteCount(frames=w_frames + l_frames, bytes=w_bytes + l_bytes)
            written = FrameByteCount(frames=w_frames, bytes=w_bytes)
            received = FrameByteCount(frames=r_frames, bytes=r_bytes)
            buffer_loss = FrameByteCount(frames=l_frames, bytes=l_bytes)
            # Transport loss compares received against the PREVIOUS interval's
            # written totals so frames written-but-still-in-flight are not
            # misclassified as lost.
            pw_frames, pw_bytes = self._prev_written.get(src, (0, 0))
            transport_loss = FrameByteCount(
                frames=max(0, pw_frames - r_frames),
                bytes=max(0, pw_bytes - r_bytes),
            )
            # Lifetime-average receive rates (totals / total elapsed).
            if self._total_elapsed > 0:
                tp_fps = r_frames / self._total_elapsed
                tp_bps = r_bytes / self._total_elapsed
            else:
                tp_fps = 0.0
                tp_bps = 0.0
            # First non-empty peak wins: os_ts, then lc_ts, then wall clock.
            peak = os_max_peaks.get(src) or ll_max_peaks.get(src) or wall_max_peaks.get(src)
            if peak:
                peak_frames = peak.peak_frames
                peak_bytes = peak.peak_bytes
            else:
                peak_frames = 0
                peak_bytes = 0
            result.append(
                FunnelSnapshot(
                    source=src,
                    produced=produced,
                    written=written,
                    received=received,
                    buffer_loss=buffer_loss,
                    transport_loss=transport_loss,
                    throughput=ThroughputInfo(
                        throughput_fps=tp_fps,
                        throughput_bps=tp_bps,
                        peak_write_frames=peak_frames,
                        peak_write_bytes=peak_bytes,
                        peak_window_ms=WRITE_RATE_WINDOW_MS,
                    ),
                )
            )
        # Baseline for the next interval's transport-loss comparison.
        self._prev_written = dict(written_totals)
        return result
@@ -0,0 +1,72 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Firmware ENH_STAT loss tracking with incremental delta accumulation.
Resilient to firmware counter resets from ``ble_log_bench_reset_stat``.
"""
from src.backend.models import LossSnapshot
from src.backend.models import SourceCode
class FirmwareLossTracker:
    """Tracks per-source firmware-reported loss using incremental deltas."""

    def __init__(self) -> None:
        # Last raw counter values per source (delta baselines).
        self._frames_prev: dict[SourceCode, int] = {}
        self._bytes_prev: dict[SourceCode, int] = {}
        # Monotonic cumulative totals per source, resilient to firmware resets.
        self._frames_accum: dict[SourceCode, int] = {}
        self._bytes_accum: dict[SourceCode, int] = {}

    def record(self, src_code: SourceCode, lost_frames: int, lost_bytes: int) -> tuple[int, int]:
        """Record firmware-reported loss.

        Returns (new_frames, new_bytes) delta since last report.
        On first report or counter reset, returns (0, 0) and suppresses alert.
        """
        if src_code not in self._frames_prev:
            # New baseline. Seed the cumulative totals with the absolute
            # values only on the very first report for this source; after a
            # reset_baselines() the totals already exist and must be kept.
            self._frames_prev[src_code] = lost_frames
            self._bytes_prev[src_code] = lost_bytes
            if src_code not in self._frames_accum:
                self._frames_accum[src_code] = lost_frames
                self._bytes_accum[src_code] = lost_bytes
            return (0, 0)
        delta_frames = lost_frames - self._frames_prev[src_code]
        delta_bytes = lost_bytes - self._bytes_prev[src_code]
        self._frames_prev[src_code] = lost_frames
        self._bytes_prev[src_code] = lost_bytes
        if delta_frames < 0 or delta_bytes < 0:
            # Firmware counter reset detected: the raw counters restarted, so
            # treat the reported absolutes as fresh losses since that reset.
            self._frames_accum[src_code] += max(0, lost_frames)
            self._bytes_accum[src_code] += max(0, lost_bytes)
            return (0, 0)
        self._frames_accum[src_code] += delta_frames
        self._bytes_accum[src_code] += delta_bytes
        return (delta_frames, delta_bytes)

    def reset(self) -> None:
        """Forget everything: baselines and cumulative totals."""
        for table in (self._frames_prev, self._bytes_prev, self._frames_accum, self._bytes_accum):
            table.clear()

    def reset_baselines(self) -> None:
        """Drop delta baselines only; cumulative totals are preserved."""
        self._frames_prev.clear()
        self._bytes_prev.clear()

    def per_source_totals(self) -> dict[SourceCode, tuple[int, int]]:
        """Return per-source cumulative loss as {src: (frames, bytes)}."""
        return {
            src: (frames, self._bytes_accum[src])
            for src, frames in self._frames_accum.items()
        }

    def totals(self) -> LossSnapshot:
        """Return cumulative loss across all sources."""
        return LossSnapshot(
            total_frames=sum(self._frames_accum.values()),
            total_bytes=sum(self._bytes_accum.values()),
        )
@@ -0,0 +1,51 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from src.backend.models import SourceCode
class FirmwareWrittenTracker:
    """Accumulates firmware-reported written frame/byte counters per source.

    Converts the firmware's absolute ENH_STAT counters into per-report
    deltas and keeps running totals that survive firmware counter resets.
    """

    def __init__(self) -> None:
        # Delta baselines: last absolute counters reported per source.
        self._frames_prev: dict[SourceCode, int] = {}
        self._bytes_prev: dict[SourceCode, int] = {}
        # Cumulative totals across firmware counter resets.
        self._frames_accum: dict[SourceCode, int] = {}
        self._bytes_accum: dict[SourceCode, int] = {}

    def record(self, src_code: SourceCode, written_frames: int, written_bytes: int) -> tuple[int, int]:
        """Record one report; return (frames, bytes) newly written since last.

        Returns (0, 0) on the first report for a source (baseline setup) and
        when the counters jump backwards (firmware-side reset).
        """
        if src_code not in self._frames_prev:
            self._frames_prev[src_code] = written_frames
            self._bytes_prev[src_code] = written_bytes
            if src_code not in self._frames_accum:
                # Brand-new source: absolute values seed the totals.
                self._frames_accum[src_code] = written_frames
                self._bytes_accum[src_code] = written_bytes
            return (0, 0)

        d_frames = written_frames - self._frames_prev[src_code]
        d_bytes = written_bytes - self._bytes_prev[src_code]
        self._frames_prev[src_code] = written_frames
        self._bytes_prev[src_code] = written_bytes

        if min(d_frames, d_bytes) < 0:
            # Counter reset: fold the new absolute values in, suppress delta.
            self._frames_accum[src_code] += max(0, written_frames)
            self._bytes_accum[src_code] += max(0, written_bytes)
            return (0, 0)

        self._frames_accum[src_code] += d_frames
        self._bytes_accum[src_code] += d_bytes
        return (d_frames, d_bytes)

    def totals(self) -> dict[SourceCode, tuple[int, int]]:
        """Return per-source cumulative written counts as {src: (frames, bytes)}."""
        return {src: (n, self._bytes_accum[src]) for src, n in self._frames_accum.items()}

    def reset(self) -> None:
        """Clear baselines and totals alike."""
        for table in (self._frames_prev, self._bytes_prev, self._frames_accum, self._bytes_accum):
            table.clear()

    def reset_baselines(self) -> None:
        """Clear delta baselines only; totals are preserved."""
        self._frames_prev.clear()
        self._bytes_prev.clear()
@@ -0,0 +1,105 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Sliding-window peak write burst tracker.
Tracks the densest burst of log writes within a configurable time window
over chip-side timestamps. A single instance handles one clock domain
(os_ts or lc_ts); the accumulator holds two instances.
The window uses millisecond-resolution timestamps. Because log writes
happen at microsecond frequency, many frames share the same ms timestamp.
Instead of computing an inaccurate fps, we count frames and bytes within
the densest window.
"""
from collections import deque
from src.backend.models import PeakBurstSnapshot
from src.backend.models import SourceCode
from src.backend.models import SourcePeakWrite
# Sliding window width in chip timestamp space (milliseconds).
WRITE_RATE_WINDOW_MS = 10
# Chip timestamps are 32-bit counters; deltas are computed modulo 2**32.
_UINT32_MAX = 0xFFFF_FFFF
# Modular deltas above this midpoint are treated as backwards jumps, not forward wrap.
_UINT32_HALF = _UINT32_MAX // 2
# Type alias for a single window entry: (ts_ms, frame_size, src_code)
_WindowEntry = tuple[int, int, SourceCode]
def _ts_delta_ms(newer: int, older: int) -> int:
    """Forward distance from ``older`` to ``newer`` in uint32 ms timestamps.

    Returns the wraparound-aware delta, or -1 when ``newer`` is actually
    behind ``older`` (modular distance exceeds half the uint32 range).
    """
    forward = (newer - older) & _UINT32_MAX
    return -1 if forward > _UINT32_HALF else forward
def _window_peak(window: deque[_WindowEntry]) -> dict[SourceCode, SourcePeakWrite]:
    """Aggregate the window's entries into per-source frame/byte counts."""
    frame_counts: dict[SourceCode, int] = {}
    byte_counts: dict[SourceCode, int] = {}
    for _, size, src in window:
        frame_counts[src] = frame_counts.get(src, 0) + 1
        byte_counts[src] = byte_counts.get(src, 0) + size
    return {
        src: SourcePeakWrite(peak_frames=frame_counts[src], peak_bytes=byte_counts[src])
        for src in frame_counts
    }
class PeakBurstTracker:
    """Sliding-window peak frame burst over a chip-timestamp stream."""

    def __init__(self, window_ms: int = WRITE_RATE_WINDOW_MS) -> None:
        # Entries currently inside the sliding window, oldest first.
        self._window: deque[_WindowEntry] = deque()
        self._window_ms = window_ms
        # Best burst per source observed since the last harvest().
        self._per_source_peak: dict[SourceCode, SourcePeakWrite] = {}
        # Best burst per source over the tracker's whole lifetime.
        self._max_per_source_peak: dict[SourceCode, SourcePeakWrite] = {}

    def record(self, ts_ms: int, frame_size: int, src_code: SourceCode) -> None:
        """Add one frame to the window and refresh current-period peaks."""
        newest: _WindowEntry = (ts_ms, frame_size, src_code)
        self._window.append(newest)
        self._trim_window(ts_ms, newest)
        for src, burst in _window_peak(self._window).items():
            best = self._per_source_peak.get(src)
            if best is None or burst.peak_frames > best.peak_frames:
                self._per_source_peak[src] = burst

    def _trim_window(self, ts_ms: int, newest: _WindowEntry) -> None:
        """Drop entries older than the window; reset on timestamp regression."""
        while len(self._window) > 1:
            age = _ts_delta_ms(ts_ms, self._window[0][0])
            if age < 0:
                # Timestamp went backwards (wrap/reset): restart the window
                # with only the newest entry.
                self._window.clear()
                self._window.append(newest)
                return
            if age < self._window_ms:
                return
            self._window.popleft()

    def harvest(self) -> PeakBurstSnapshot:
        """Take current-period peaks, roll them into all-time maxima, reset period."""
        current = self._per_source_peak or None
        for src, burst in self._per_source_peak.items():
            best = self._max_per_source_peak.get(src)
            if best is None or burst.peak_frames > best.peak_frames:
                self._max_per_source_peak[src] = burst
        self._per_source_peak = {}
        all_time = dict(self._max_per_source_peak) or None
        return PeakBurstSnapshot(
            per_source=current,
            max_per_source=all_time,
        )

    def max_peaks(self) -> dict[SourceCode, SourcePeakWrite]:
        """Return a copy of the all-time per-source maxima (no reset)."""
        return dict(self._max_per_source_peak)
@@ -0,0 +1,102 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Sliding receive window gap tracker for per-source frame sequence numbers.
Frames are only declared lost when the receive window advances past their SN
without them being received, tolerating out-of-order delivery up to
REORDER_WINDOW frames.
"""
from src.backend.models import SourceCode
SN_MAX = 1 << 24  # 24-bit SN space; all SN arithmetic below is modulo this value
REORDER_WINDOW = 256  # receive window size: frames may arrive up to this far out of order
class SNGapTracker:
    """Tracks per-source frame gaps using a sliding receive window.

    An SN is only declared lost once the window base advances past it
    without the frame having arrived, tolerating reordering of up to
    REORDER_WINDOW frames.
    """

    def __init__(self) -> None:
        # Lowest SN not yet accounted for (received or expired), per source.
        self._window_base: dict[SourceCode, int] = {}
        # SNs at/above the base that arrived out of order.
        self._received: dict[SourceCode, set[int]] = {}
        # Cumulative confirmed-gap counters.
        self._gap_accum: dict[SourceCode, int] = {}

    def record(self, src_code: SourceCode, frame_sn: int) -> int:
        """Record one received SN; return the count of SNs newly confirmed lost.

        In-order and in-window reordered frames contribute 0.
        """
        base = self._window_base.get(src_code)
        if base is None:
            # First frame for this source establishes the window baseline.
            self._window_base[src_code] = (frame_sn + 1) % SN_MAX
            self._received[src_code] = set()
            self._gap_accum[src_code] = 0
            return 0

        dist = self._distance(frame_sn, base)
        if 0 <= dist < REORDER_WINDOW:
            # In-window arrival: mark and slide the base over any run of
            # contiguously received SNs.
            self._received[src_code].add(frame_sn)
            return self._advance(src_code)
        if dist >= REORDER_WINDOW:
            # Arrival beyond the window: every slot the base skips over to
            # keep the new frame in-window becomes a confirmed gap.
            new_base = (frame_sn - REORDER_WINDOW + 1) % SN_MAX
            confirmed = self._expire_to(src_code, new_base)
            self._received[src_code].add(frame_sn)
            self._advance(src_code)
            return confirmed
        if dist >= -REORDER_WINDOW:
            # Slightly behind the base: late arrival, slot already accounted for.
            return 0
        # Far behind the window: firmware SN restart (FLUSH/INIT_DONE).
        self._window_base[src_code] = (frame_sn + 1) % SN_MAX
        self._received[src_code] = set()
        return 0

    def totals(self) -> dict[SourceCode, int]:
        """Return cumulative confirmed gap count per source."""
        return dict(self._gap_accum)

    def reset(self, src_code: SourceCode | None = None) -> None:
        """Forget state for one source, or for all sources when None."""
        if src_code is None:
            for table in (self._window_base, self._received, self._gap_accum):
                table.clear()
            return
        self._window_base.pop(src_code, None)
        self._received.pop(src_code, None)
        self._gap_accum.pop(src_code, None)

    def _distance(self, sn: int, base: int) -> int:
        """Signed modular distance from base to sn in 24-bit SN space."""
        raw = (sn - base) % SN_MAX
        if raw >= SN_MAX // 2:
            return raw - SN_MAX
        return raw

    def _advance(self, src_code: SourceCode) -> int:
        """Slide the base past contiguous received SNs; always returns 0."""
        base = self._window_base[src_code]
        pending = self._received[src_code]
        while base in pending:
            pending.discard(base)
            base = (base + 1) % SN_MAX
        self._window_base[src_code] = base
        return 0

    def _expire_to(self, src_code: SourceCode, new_base: int) -> int:
        """Advance the base to new_base, counting unreceived SNs as confirmed gaps."""
        base = self._window_base[src_code]
        pending = self._received[src_code]
        confirmed = 0
        while base != new_base:
            if base not in pending:
                confirmed += 1
            pending.discard(base)
            base = (base + 1) % SN_MAX
        self._window_base[src_code] = base
        self._gap_accum[src_code] += confirmed
        return confirmed
@@ -0,0 +1,94 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Real-time traffic spike detection using a sliding window over wall-clock time."""
import time
from collections import deque
from dataclasses import dataclass
from src.backend.models import SourceCode
TRAFFIC_WINDOW_SEC = 0.1  # 100ms wall-clock detection window
TRAFFIC_THRESHOLD_PCT = 0.8  # spike when window throughput reaches 80% of wire max
TRAFFIC_ALERT_COOLDOWN_SEC = 2.0  # minimum interval between alerts
@dataclass(slots=True)
class TrafficSpikeResult:
    """Summary of one completed traffic spike, produced when the spike ends."""

    throughput_kbs: float  # peak window throughput during the spike, scaled by 1/1024
    wire_max_kbs: float  # configured wire capacity, scaled by 1/1024
    utilization_pct: float  # peak throughput as a percentage of wire capacity
    duration_ms: float  # spike duration in wall-clock milliseconds
    per_source: dict[SourceCode, float]  # percentage share of spike bytes per source
class TrafficSpikeDetector:
    """Detects traffic spikes exceeding a percentage of theoretical wire capacity.

    ``record`` feeds frames into a wall-clock sliding window; ``check`` is
    polled to evaluate utilization. A result is only produced once a spike
    *ends*, and at most once per cooldown interval.
    """

    def __init__(self) -> None:
        self._wire_max_bps: float = 0.0
        # (arrival time, frame size, source) entries inside the window.
        self._window: deque[tuple[float, int, SourceCode]] = deque()
        self._spike_active = False
        self._spike_start: float = 0.0
        self._spike_peak_bps: float = 0.0
        # Per-source byte totals accumulated over the active spike.
        self._spike_per_source: dict[SourceCode, int] = {}
        self._last_alert_time: float = 0.0

    def set_wire_max_bps(self, wire_max_bps: float) -> None:
        """Set the wire capacity used as the utilization reference."""
        self._wire_max_bps = wire_max_bps

    def record(self, frame_size: int, src_code: SourceCode) -> None:
        """Register one received frame at the current wall-clock time."""
        self._window.append((time.perf_counter(), frame_size, src_code))
        if self._spike_active:
            self._spike_per_source[src_code] = self._spike_per_source.get(src_code, 0) + frame_size

    def check(self) -> TrafficSpikeResult | None:
        """Evaluate the window; return a result when a spike has just ended."""
        now = time.perf_counter()
        self._prune_window(now)
        if self._wire_max_bps <= 0:
            return None
        throughput_bps = sum(size for _, size, _ in self._window) / TRAFFIC_WINDOW_SEC
        if throughput_bps / self._wire_max_bps >= TRAFFIC_THRESHOLD_PCT:
            self._track_spike(now, throughput_bps)
            return None
        if not self._spike_active:
            return None
        # Spike just ended: report it unless we are inside the cooldown.
        self._spike_active = False
        duration_ms = (now - self._spike_start) * 1000.0
        if now - self._last_alert_time < TRAFFIC_ALERT_COOLDOWN_SEC:
            return None
        self._last_alert_time = now
        return self._build_result(duration_ms)

    def _prune_window(self, now: float) -> None:
        """Drop window entries older than the detection window."""
        horizon = now - TRAFFIC_WINDOW_SEC
        while self._window and self._window[0][0] < horizon:
            self._window.popleft()

    def _track_spike(self, now: float, throughput_bps: float) -> None:
        """Start or continue a spike, updating its peak and per-source bytes."""
        if not self._spike_active:
            self._spike_active = True
            self._spike_start = now
            self._spike_peak_bps = 0.0
            # Seed per-source totals with the frames already in the window.
            self._spike_per_source = {}
            for _, size, src in self._window:
                self._spike_per_source[src] = self._spike_per_source.get(src, 0) + size
        self._spike_peak_bps = max(self._spike_peak_bps, throughput_bps)

    def _build_result(self, duration_ms: float) -> TrafficSpikeResult:
        """Assemble the spike summary from the recorded peak and byte shares."""
        peak_bps = self._spike_peak_bps
        total = max(sum(self._spike_per_source.values()), 1)
        shares = {src: count / total * 100.0 for src, count in self._spike_per_source.items()}
        return TrafficSpikeResult(
            throughput_kbs=peak_bps / 1024.0,
            wire_max_kbs=self._wire_max_bps / 1024.0,
            utilization_pct=peak_bps / self._wire_max_bps * 100.0,
            duration_ms=duration_ms,
            per_source=shares,
        )
@@ -0,0 +1,46 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Transport-layer metrics: RX bytes, throughput (bps), frame rate (fps)."""
from src.backend.models import TransportSnapshot
UART_BITS_PER_BYTE = 10 # 8 data + 1 start + 1 stop
class TransportMetrics:
    """Cumulative RX byte/frame counters with delta-based rate snapshots."""

    def __init__(self) -> None:
        # Lifetime counters.
        self._rx_bytes = 0
        self._frame_count = 0
        # Values of the lifetime counters at the previous harvest().
        self._rx_bytes_snapshot = 0
        self._frame_count_snapshot = 0
        # Highest bps ever observed across harvests.
        self._max_bps = 0.0

    def record_bytes(self, count: int) -> None:
        """Add received raw bytes to the lifetime counter."""
        self._rx_bytes += count

    def record_frame(self) -> None:
        """Count one parsed frame."""
        self._frame_count += 1

    def harvest(self, elapsed_sec: float) -> TransportSnapshot:
        """Compute bps/fps from the deltas since the previous harvest.

        Updates the all-time max bps and resets the deltas; both rates are
        0.0 when ``elapsed_sec`` is not positive.
        """
        if elapsed_sec > 0:
            byte_delta = self._rx_bytes - self._rx_bytes_snapshot
            frame_delta = self._frame_count - self._frame_count_snapshot
            # Convert payload bytes to on-the-wire bits (start/stop framing).
            bps = byte_delta * UART_BITS_PER_BYTE / elapsed_sec
            fps = frame_delta / elapsed_sec
        else:
            bps = 0.0
            fps = 0.0
        self._max_bps = max(self._max_bps, bps)
        self._rx_bytes_snapshot = self._rx_bytes
        self._frame_count_snapshot = self._frame_count
        return TransportSnapshot(
            rx_bytes=self._rx_bytes,
            bps=bps,
            max_bps=self._max_bps,
            fps=fps,
        )
@@ -0,0 +1,33 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""UART read loop with raw binary file writing.
See Spec Sections 6, 12.
"""
import serial
import serial.tools.list_ports
UART_READ_TIMEOUT = 0.1  # seconds; a serial read() returns after at most this long
UART_BLOCK_SIZE = 50 * 1024  # bytes per read request — presumably the read-loop chunk size; confirm against the read loop
def list_serial_ports() -> list[str]:
    """Return the device names of all serial ports currently present."""
    return [entry.device for entry in serial.tools.list_ports.comports()]
def validate_uart_port(port: str) -> str | None:
    """Check that ``port`` exists on this machine.

    Returns a human-readable error message, or None when the port is valid.
    """
    available = list_serial_ports()
    if port in available:
        return None
    return f"UART port '{port}' not found. Available: {available}"
def open_serial(port: str, baudrate: int) -> serial.Serial:
    """Open ``port`` with exclusive access where the platform supports it.

    ``exclusive=True`` prevents a second console instance from interleaving
    reads on the same port. Some platforms / pyserial builds reject the
    ``exclusive`` option with a ValueError; only in that case do we retry a
    plain open. A SerialException (port missing, or already locked by
    another process) is propagated instead of being silently downgraded to
    shared access, which would let two readers corrupt the frame stream.

    Raises:
        serial.SerialException: if the port cannot be opened.
    """
    try:
        return serial.Serial(port, baudrate=baudrate, timeout=UART_READ_TIMEOUT, exclusive=True)
    except ValueError:
        # Platform does not support the exclusive flag; open without it.
        return serial.Serial(port, baudrate=baudrate, timeout=UART_READ_TIMEOUT)
@@ -0,0 +1,2 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
+40
View File
@@ -0,0 +1,40 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import struct
from collections.abc import Callable
from src.backend.models import HEADER_FMT
def build_frame_header(payload_len: int, source_code: int, frame_sn: int) -> bytes:
    """Build a 6-byte BLE Log frame header.

    Args:
        payload_len: Payload length in bytes.
        source_code: BLE Log source code (low 8 bits are used).
        frame_sn: Frame sequence number; wrapped into the 24-bit SN space so
            a caller-side counter may grow past 2**24 without overflowing
            the packed field (the unmasked value would raise struct.error).
    """
    frame_meta = (source_code & 0xFF) | ((frame_sn & 0xFF_FFFF) << 8)
    return struct.pack(HEADER_FMT, payload_len, frame_meta)
def build_frame(
    payload: bytes,
    source_code: int,
    frame_sn: int,
    checksum_fn: Callable[[bytes], int],
    checksum_scope_full: bool = True,
) -> bytes:
    """Build a complete BLE Log frame: header + payload + 4-byte LE checksum.

    Args:
        payload: Frame payload bytes (should include 4B os_ts prefix if applicable)
        source_code: BLE Log source code (0-7)
        frame_sn: 24-bit sequence number
        checksum_fn: Function(data: bytes) -> int
        checksum_scope_full: If True, checksum covers header+payload; else header only
    """
    header = build_frame_header(len(payload), source_code, frame_sn)
    covered = header + payload if checksum_scope_full else header
    trailer = struct.pack('<I', checksum_fn(covered))
    return header + payload + trailer
@@ -0,0 +1,96 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from src.backend.checksum import sum_checksum
from src.backend.checksum import xor_checksum
class TestSumChecksum:
    """sum_checksum: byte-wise sum of the input."""

    def test_empty(self) -> None:
        assert sum_checksum(b'') == 0

    def test_single_byte(self) -> None:
        assert sum_checksum(b'\x42') == 0x42

    def test_multiple_bytes(self) -> None:
        data = bytes([0x01, 0x02, 0x03, 0x04])
        assert sum_checksum(data) == 0x01 + 0x02 + 0x03 + 0x04

    def test_overflow_wraps_u32(self) -> None:
        # 256 repetitions of 0xFF sum to 65280, well inside u32.
        assert sum_checksum(b'\xff' * 256) == 0xFF * 256
class TestXorChecksum:
    """xor_checksum: XOR of consecutive little-endian u32 words, zero-padded tail."""

    def test_empty(self) -> None:
        assert xor_checksum(b'') == 0

    def test_single_word(self) -> None:
        # [0x01, 0x02, 0x03, 0x04] → LE word 0x04030201
        assert xor_checksum(b'\x01\x02\x03\x04') == 0x04030201

    def test_two_words(self) -> None:
        data = b'\x01\x02\x03\x04\x05\x06\x07\x08'
        # word1 = 0x04030201, word2 = 0x08070605
        expected = 0x04030201 ^ 0x08070605
        assert xor_checksum(data) == expected

    def test_unaligned_length(self) -> None:
        """XOR checksum handles non-4-byte-aligned data lengths correctly."""
        # 5 bytes: 1 full word + 1 trailing byte (zero-padded)
        data = b'\x01\x02\x03\x04\x05'
        # word0 = 0x04030201, word1 = 0x00000005 (padded)
        # XOR = 0x04030201 ^ 0x00000005 = 0x04030204
        assert xor_checksum(data) == 0x04030204

    def test_typical_frame_data_produces_valid_result(self) -> None:
        """Verify xor_checksum produces a valid u32 result on typical frame-sized data."""
        # A typical 6-byte header + 10-byte payload
        header = b'\x0a\x00\x00\x01\x00\x00'  # payload_len=10, src=0, sn=256
        payload = b'\x00\x00\x00\x00\x03\x03' + b'\x00' * 4
        data = header + payload
        result = xor_checksum(data)
        assert isinstance(result, int)
        assert 0 <= result < 0x100000000

    def test_matches_ble_log_parser_v2(self) -> None:
        """Verify our implementation matches the proven ble_log_parser_v2 approach.

        Both implementations should produce identical results: simple XOR of
        consecutive 4-byte LE words with zero-padding for partial last word.
        """
        import struct

        def reference_xor(data: bytes) -> int:
            """Reference: ble_log_parser_v2 _validate_xor logic."""
            body_len = len(data)
            if body_len == 0:
                return 0
            checksum_cal = 0
            for i in range(0, body_len, 4):
                remaining = body_len - i
                if remaining >= 4:
                    (word,) = struct.unpack_from('<I', data, i)
                else:
                    # Zero-pad the final partial word before unpacking.
                    last_chunk = data[i:]
                    padded_chunk = last_chunk + b'\x00' * (4 - remaining)
                    (word,) = struct.unpack('<I', padded_chunk)
                if i == 0:
                    checksum_cal = word
                else:
                    checksum_cal ^= word
            return checksum_cal & 0xFFFFFFFF

        # Test various data sizes
        test_vectors = [
            b'',
            b'\x01',
            b'\x01\x02\x03',
            b'\x01\x02\x03\x04',
            b'\x01\x02\x03\x04\x05',
            b'\xff' * 16,
            b'\x0a\x00\x00\x01\x00\x00' + b'\x00' * 10,  # typical frame
        ]
        for data in test_vectors:
            assert xor_checksum(data) == reference_xor(data), f'Mismatch for data length {len(data)}'
@@ -0,0 +1,76 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from src.backend.stats.firmware_loss import FirmwareLossTracker
class TestFirmwareLossTracker:
    """FirmwareLossTracker: absolute firmware counters become per-report deltas plus running totals."""

    def test_first_report_zero_delta(self) -> None:
        # The first report establishes the baseline: no delta, but the
        # absolute values seed the running totals.
        t = FirmwareLossTracker()
        new_f, new_b = t.record(src_code=1, lost_frames=1000, lost_bytes=5000)
        assert new_f == 0
        assert new_b == 0
        totals = t.totals()
        assert totals.total_frames == 1000
        assert totals.total_bytes == 5000
        per_src = t.per_source_totals()
        assert per_src[1] == (1000, 5000)

    def test_incremental_delta(self) -> None:
        t = FirmwareLossTracker()
        t.record(1, 0, 0)
        new_f, new_b = t.record(1, 5, 200)
        assert new_f == 5
        assert new_b == 200
        new_f, new_b = t.record(1, 8, 320)
        assert new_f == 3
        assert new_b == 120

    def test_multi_source(self) -> None:
        # Totals sum across sources: (105 + 52) frames, (1200 + 580) bytes.
        t = FirmwareLossTracker()
        t.record(1, 100, 1000)
        t.record(2, 50, 500)
        t.record(1, 105, 1200)
        t.record(2, 52, 580)
        totals = t.totals()
        assert totals.total_frames == 157
        assert totals.total_bytes == 1780

    def test_counter_reset(self) -> None:
        # A backwards counter jump (firmware-side reset) suppresses the
        # delta but folds the new absolute values into the totals.
        t = FirmwareLossTracker()
        t.record(1, 0, 0)
        t.record(1, 100, 4000)
        new_f, new_b = t.record(1, 30, 1200)
        assert new_f == 0
        totals = t.totals()
        assert totals.total_frames == 130
        assert totals.total_bytes == 5200

    def test_normal_after_reset(self) -> None:
        # After a reset the new absolute values become the delta baseline.
        t = FirmwareLossTracker()
        t.record(1, 0, 0)
        t.record(1, 100, 4000)
        t.record(1, 30, 1200)
        new_f, new_b = t.record(1, 50, 2000)
        assert new_f == 20
        assert new_b == 800

    def test_reset_clears_everything(self) -> None:
        t = FirmwareLossTracker()
        t.record(1, 10, 100)
        t.reset()
        assert t.totals().total_frames == 0
        assert t.totals().total_bytes == 0

    def test_reset_baselines_preserves_accumulators(self) -> None:
        t = FirmwareLossTracker()
        t.record(1, 10, 100)
        d_frames, d_bytes = t.record(1, 15, 150)
        assert d_frames == 5
        t.reset_baselines()
        # Next report is treated as new baseline (no delta)
        d_frames, d_bytes = t.record(1, 20, 200)
        assert d_frames == 0  # baseline re-established
        # Accumulators preserved from before
        totals = t.totals()
        assert totals.total_frames == 15  # initial absolute + pre-reset delta
@@ -0,0 +1,86 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from src.backend.stats.firmware_written import FirmwareWrittenTracker
class TestFirmwareWrittenTracker:
    """FirmwareWrittenTracker: same delta/reset semantics as the loss tracker, per-source totals dict."""

    def test_first_report_zero_delta(self) -> None:
        # First report: baseline only, absolute values seed the totals.
        t = FirmwareWrittenTracker()
        new_f, new_b = t.record(src_code=1, written_frames=1000, written_bytes=5000)
        assert new_f == 0
        assert new_b == 0
        totals = t.totals()
        assert totals[1] == (1000, 5000)

    def test_incremental_delta(self) -> None:
        t = FirmwareWrittenTracker()
        t.record(1, 0, 0)
        new_f, new_b = t.record(1, 5, 200)
        assert new_f == 5
        assert new_b == 200
        new_f, new_b = t.record(1, 8, 320)
        assert new_f == 3
        assert new_b == 120

    def test_multi_source(self) -> None:
        t = FirmwareWrittenTracker()
        t.record(1, 100, 1000)
        t.record(2, 50, 500)
        t.record(1, 105, 1200)
        t.record(2, 52, 580)
        totals = t.totals()
        assert totals[1] == (105, 1200)
        assert totals[2] == (52, 580)

    def test_counter_reset(self) -> None:
        # Backwards jump: delta suppressed, new absolutes folded into totals.
        t = FirmwareWrittenTracker()
        t.record(1, 0, 0)
        t.record(1, 100, 4000)
        new_f, new_b = t.record(1, 30, 1200)
        assert new_f == 0
        assert new_b == 0
        totals = t.totals()
        assert totals[1] == (130, 5200)

    def test_normal_after_reset(self) -> None:
        t = FirmwareWrittenTracker()
        t.record(1, 0, 0)
        t.record(1, 100, 4000)
        t.record(1, 30, 1200)
        new_f, new_b = t.record(1, 50, 2000)
        assert new_f == 20
        assert new_b == 800

    def test_reset_clears_all(self) -> None:
        t = FirmwareWrittenTracker()
        t.record(1, 10, 100)
        t.record(1, 20, 200)
        t.reset()
        assert t.totals() == {}
        new_f, new_b = t.record(1, 50, 500)
        assert new_f == 0
        assert new_b == 0

    def test_reset_baselines_preserves_accum(self) -> None:
        t = FirmwareWrittenTracker()
        t.record(1, 0, 0)
        t.record(1, 100, 4000)
        t.reset_baselines()
        new_f, new_b = t.record(1, 30, 1200)
        assert new_f == 0
        assert new_b == 0
        totals = t.totals()
        assert totals[1] == (100, 4000)

    def test_reset_baselines_then_incremental(self) -> None:
        # Deltas resume from the re-established baseline (10, 400).
        t = FirmwareWrittenTracker()
        t.record(1, 0, 0)
        t.record(1, 50, 2000)
        t.reset_baselines()
        t.record(1, 10, 400)
        new_f, new_b = t.record(1, 25, 1000)
        assert new_f == 15
        assert new_b == 600
        totals = t.totals()
        assert totals[1] == (65, 2600)
@@ -0,0 +1,239 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from src.backend.checksum import sum_checksum
from src.backend.checksum import xor_checksum
from src.backend.frame_parser import FrameParser
from src.backend.models import ChecksumAlgorithm
from src.backend.models import ChecksumScope
from src.backend.models import SyncState
from tests.helpers import build_frame
def _make_sum_frame(payload: bytes, src: int, sn: int) -> bytes:
    """Build a frame whose SUM checksum covers header + payload."""
    return build_frame(payload, src, sn, checksum_fn=sum_checksum, checksum_scope_full=True)
def _make_xor_frame(payload: bytes, src: int, sn: int) -> bytes:
    """Build a frame whose XOR checksum covers header + payload."""
    return build_frame(payload, src, sn, checksum_fn=xor_checksum, checksum_scope_full=True)
class TestFrameParserStateTransitions:
    """Sync state machine: SEARCHING -> CONFIRMING_SYNC -> SYNCED -> CONFIRMING_LOSS."""

    def test_initial_state_is_searching(self) -> None:
        """A fresh parser starts in SEARCHING."""
        parser = FrameParser()
        assert parser.sync_state == SyncState.SEARCHING

    def test_three_valid_frames_reach_synced(self) -> None:
        """N=3 consecutive valid frames should transition SEARCHING -> CONFIRMING -> SYNCED."""
        parser = FrameParser()
        payload = b'\x00' * 8  # 4B os_ts + 4B data
        frames_data = b''
        for sn in range(3):
            frames_data += _make_sum_frame(payload, src=1, sn=sn)
        parser.feed(frames_data)
        assert parser.sync_state == SyncState.SYNCED

    def test_garbage_stays_searching(self) -> None:
        """Pure garbage never leaves SEARCHING."""
        parser = FrameParser()
        garbage = b'\xde\xad\xbe\xef' * 100
        parser.feed(garbage)
        assert parser.sync_state == SyncState.SEARCHING

    def test_mixed_garbage_then_valid_frames(self) -> None:
        """Leading garbage is skipped; the valid frames behind it still reach SYNCED."""
        parser = FrameParser()
        payload = b'\x00' * 8
        garbage = b'\xff' * 50
        frames = b''
        for sn in range(3):
            frames += _make_sum_frame(payload, src=1, sn=sn)
        parser.feed(garbage + frames)
        assert parser.sync_state == SyncState.SYNCED

    def test_checksum_failure_in_synced_triggers_confirming_loss(self) -> None:
        """A corrupted checksum while SYNCED moves the parser to CONFIRMING_LOSS."""
        parser = FrameParser()
        payload = bytes(range(0xA0, 0xA8))
        good_frames = b''
        for sn in range(3):
            good_frames += _make_sum_frame(payload, src=1, sn=sn)
        parser.feed(good_frames)
        assert parser.sync_state == SyncState.SYNCED
        bad_frame = _make_sum_frame(payload, src=1, sn=99)
        corrupt = bytearray(bad_frame)
        corrupt[-1] ^= 0xFF  # flip one checksum byte
        parser.feed(bytes(corrupt))
        assert parser.sync_state == SyncState.CONFIRMING_LOSS

    def test_confirming_loss_recovers_to_synced(self) -> None:
        """After corrupt bytes, enough valid frames should re-establish SYNCED."""
        parser = FrameParser()
        payload = bytes(range(0xA0, 0xA8))
        good_frames = b''
        for sn in range(3):
            good_frames += _make_sum_frame(payload, src=1, sn=sn)
        parser.feed(good_frames)
        assert parser.sync_state == SyncState.SYNCED
        corrupt = b'\xfe' * 20
        recovery_frames = b''
        for sn in range(3, 6):
            recovery_frames += _make_sum_frame(payload, src=1, sn=sn)
        parser.feed(corrupt + recovery_frames)
        assert parser.sync_state == SyncState.SYNCED

    def test_confirming_loss_to_searching_after_m_plus_1_failures(self) -> None:
        """Sustained garbage beyond LOSS_TOLERANCE drops the parser back to SEARCHING."""
        from src.backend.frame_parser import LOSS_TOLERANCE
        parser = FrameParser()
        payload = bytes(range(0xA0, 0xA8))
        good_frames = b''
        for sn in range(3):
            good_frames += _make_sum_frame(payload, src=1, sn=sn)
        parser.feed(good_frames)
        assert parser.sync_state == SyncState.SYNCED
        garbage = b'\xfe' * (LOSS_TOLERANCE + 20)
        parser.feed(garbage)
        assert parser.sync_state == SyncState.SEARCHING

    def test_confirming_sync_rejects_mismatched_mode(self) -> None:
        """Review Correction #3: CONFIRMING_SYNC must verify same checksum mode."""
        parser = FrameParser()
        payload = b'\x00' * 8
        # Feed one SUM frame to enter CONFIRMING_SYNC
        sum_frame = _make_sum_frame(payload, src=1, sn=0)
        parser.feed(sum_frame)
        assert parser.sync_state == SyncState.CONFIRMING_SYNC
        # Feed an XOR frame — mode mismatch should restart confirmation
        xor_frame = _make_xor_frame(payload, src=1, sn=1)
        parser.feed(xor_frame)
        # Should still be in CONFIRMING_SYNC (restarted with new mode), not SYNCED
        assert parser.sync_state == SyncState.CONFIRMING_SYNC
class TestFrameParserOutput:
    """Results returned from feed(): parsed frames and extracted ASCII lines."""

    def test_parsed_frames_returned(self) -> None:
        parser = FrameParser()
        payload = b'\x00\x00\x00\x00\xaa\xbb'  # 4B os_ts + 2B data
        stream = b''.join(_make_sum_frame(payload, src=2, sn=sn) for sn in range(3))
        results = parser.feed(stream)
        parsed = [item for item in results if hasattr(item, 'source_code')]
        assert len(parsed) == 3
        assert all(frame.source_code == 2 for frame in parsed)

    def test_ascii_lines_extracted_from_non_frame_data(self) -> None:
        parser = FrameParser()
        # While SEARCHING, non-frame bytes should surface as ASCII text lines.
        results = parser.feed(b'Hello world\n')
        text_lines = [item for item in results if isinstance(item, str)]
        assert any('Hello world' in line for line in text_lines)
def _make_sum_header_only_frame(payload: bytes, src: int, sn: int) -> bytes:
    """Build a frame whose SUM checksum covers the header only."""
    return build_frame(payload, src, sn, checksum_fn=sum_checksum, checksum_scope_full=False)
def _make_xor_header_only_frame(payload: bytes, src: int, sn: int) -> bytes:
    """Build a frame whose XOR checksum covers the header only."""
    return build_frame(payload, src, sn, checksum_fn=xor_checksum, checksum_scope_full=False)
class TestChecksumAutoDetection:
    """The parser must auto-detect all four checksum modes (SUM/XOR x FULL/HEADER_ONLY)."""

    def test_detects_sum_full(self) -> None:
        parser = FrameParser()
        payload = b'\x00' * 8
        stream = b''.join(_make_sum_frame(payload, src=1, sn=sn) for sn in range(3))
        parser.feed(stream)
        assert parser.sync_state == SyncState.SYNCED
        assert parser.checksum_mode is not None
        assert parser.checksum_mode.algorithm == ChecksumAlgorithm.SUM
        assert parser.checksum_mode.scope == ChecksumScope.FULL

    def test_detects_xor_full(self) -> None:
        parser = FrameParser()
        payload = b'\x00' * 8
        stream = b''.join(_make_xor_frame(payload, src=1, sn=sn) for sn in range(3))
        parser.feed(stream)
        assert parser.sync_state == SyncState.SYNCED
        assert parser.checksum_mode is not None
        assert parser.checksum_mode.algorithm == ChecksumAlgorithm.XOR

    def test_detects_sum_header_only(self) -> None:
        parser = FrameParser()
        payload = b'\x01\x02\x03\x04\xaa\xbb\xcc\xdd'
        stream = b''.join(_make_sum_header_only_frame(payload, src=1, sn=sn) for sn in range(3))
        parser.feed(stream)
        assert parser.sync_state == SyncState.SYNCED
        assert parser.checksum_mode is not None
        assert parser.checksum_mode.algorithm == ChecksumAlgorithm.SUM
        assert parser.checksum_mode.scope == ChecksumScope.HEADER_ONLY

    def test_detects_xor_header_only(self) -> None:
        parser = FrameParser()
        payload = b'\x01\x02\x03\x04\xaa\xbb\xcc\xdd'
        stream = b''.join(_make_xor_header_only_frame(payload, src=1, sn=sn) for sn in range(3))
        parser.feed(stream)
        assert parser.sync_state == SyncState.SYNCED
        assert parser.checksum_mode is not None
        assert parser.checksum_mode.algorithm == ChecksumAlgorithm.XOR
        assert parser.checksum_mode.scope == ChecksumScope.HEADER_ONLY
class TestBoundedBuffer:
    """The re-sync remainder buffer must stay bounded and recover after overflow."""

    def test_remainder_buffer_bounded(self) -> None:
        parser = FrameParser()
        # Exceed MAX_REMAINDER_SIZE with bytes that never form a frame.
        parser.feed(b'\xfe' * (131072 + 1))
        # Buffer should have been reset, state should be SEARCHING.
        assert parser.sync_state == SyncState.SEARCHING
        # After the overflow reset, the parser must still be able to sync.
        payload = b'\x00' * 8
        stream = b''.join(_make_sum_frame(payload, src=1, sn=sn) for sn in range(3))
        parser.feed(stream)
        assert parser.sync_state == SyncState.SYNCED

    def test_buffer_overflow_emits_warning(self) -> None:
        """Review Correction #2: buffer overflow must log warning."""
        parser = FrameParser()
        results = parser.feed(b'\xfe' * (131072 + 1))
        assert any(isinstance(item, str) and 'WARN' in item for item in results)
class TestFrameSplitAcrossChunks:
    """Frames arriving in multiple feed() calls must be reassembled."""

    def test_frame_split_across_chunks(self) -> None:
        """Review Correction #7: partial frames split across feed() calls."""
        parser = FrameParser()
        payload = b'\x00' * 8
        stream = b''.join(_make_sum_frame(payload, src=1, sn=sn) for sn in range(3))
        # Cut the stream in the middle of the second frame.
        cut = len(stream) // 2
        parser.feed(stream[:cut])
        parser.feed(stream[cut:])
        assert parser.sync_state == SyncState.SYNCED
@@ -0,0 +1,75 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import struct
from src.backend.internal_decoder import decode_internal_frame
from src.backend.models import InternalSource
def _make_internal_payload(os_ts: int, int_src: int, sub_payload: bytes) -> bytes:
"""Build a full INTERNAL frame payload (os_ts + int_src_code + sub_payload)."""
return struct.pack('<I', os_ts) + bytes([int_src]) + sub_payload
class TestInitDone:
    def test_decode_init_done(self) -> None:
        """An INIT_DONE frame yields its version and the OS timestamp."""
        frame = _make_internal_payload(os_ts=1234, int_src=0, sub_payload=b'\x03')
        decoded = decode_internal_frame(frame)
        assert decoded is not None
        assert decoded['int_src'] == InternalSource.INIT_DONE
        assert decoded['version'] == 3
        assert decoded['os_ts_ms'] == 1234
class TestInfo:
    def test_decode_info(self) -> None:
        """An INFO frame decodes to the INFO source with its version byte."""
        frame = _make_internal_payload(os_ts=5678, int_src=3, sub_payload=b'\x03')
        decoded = decode_internal_frame(frame)
        assert decoded is not None
        assert decoded['int_src'] == InternalSource.INFO
        assert decoded['version'] == 3
class TestEnhStat:
    def test_decode_enh_stat(self) -> None:
        """ENH_STAT decodes src plus written/lost frame and byte counters."""
        # src=2, written=100 frames / 4096 bytes, lost=5 frames / 256 bytes
        counters = struct.pack('<BIIII', 2, 100, 5, 4096, 256)
        frame = _make_internal_payload(os_ts=9999, int_src=2, sub_payload=counters)
        decoded = decode_internal_frame(frame)
        assert decoded is not None
        assert decoded['int_src'] == InternalSource.ENH_STAT
        assert decoded['src_code'] == 2
        assert decoded['written_frame_cnt'] == 100
        assert decoded['lost_frame_cnt'] == 5
        assert decoded['written_bytes_cnt'] == 4096
        assert decoded['lost_bytes_cnt'] == 256
class TestFlush:
    def test_decode_flush(self) -> None:
        """A FLUSH frame decodes to the FLUSH source with its version byte."""
        frame = _make_internal_payload(os_ts=0, int_src=4, sub_payload=b'\x03')
        decoded = decode_internal_frame(frame)
        assert decoded is not None
        assert decoded['int_src'] == InternalSource.FLUSH
        assert decoded['version'] == 3
class TestTs:
    def test_ts_ignored(self) -> None:
        """TS frames carry only timestamps and are dropped by the decoder."""
        # io_level, lc_ts, esp_ts, os_ts
        timestamps = struct.pack('<BIII', 1, 100, 200, 300)
        frame = _make_internal_payload(os_ts=0, int_src=1, sub_payload=timestamps)
        assert decode_internal_frame(frame) is None  # TS is ignored
class TestUnknown:
    def test_unknown_int_src(self) -> None:
        """An unrecognized internal source code decodes to None."""
        frame = _make_internal_payload(os_ts=0, int_src=99, sub_payload=b'\x00')
        assert decode_internal_frame(frame) is None
class TestMalformed:
    def test_too_short_payload(self) -> None:
        """A payload shorter than the fixed header decodes to None."""
        assert decode_internal_frame(b'\x00\x00\x00') is None
@@ -0,0 +1,259 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from pathlib import Path
from unittest.mock import MagicMock
from unittest.mock import patch
from src.backend.models import LaunchConfig
from src.frontend.launch_screen import BAUD_RATES
from src.frontend.launch_screen import DEFAULT_BAUD_RATE
from src.frontend.launch_screen import LaunchScreen
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
class TestBaudRateConstants:
    """Sanity checks on the BAUD_RATES / DEFAULT_BAUD_RATE module constants."""

    def test_baud_rates_is_list_of_ints(self) -> None:
        assert isinstance(BAUD_RATES, list)
        for rate in BAUD_RATES:
            assert isinstance(rate, int)

    def test_baud_rates_not_empty(self) -> None:
        assert BAUD_RATES

    def test_baud_rates_ascending(self) -> None:
        # Non-decreasing order, same criterion as comparing against sorted().
        assert all(a <= b for a, b in zip(BAUD_RATES, BAUD_RATES[1:]))

    def test_default_baud_rate_in_list(self) -> None:
        assert DEFAULT_BAUD_RATE in BAUD_RATES

    def test_default_baud_rate_value(self) -> None:
        assert DEFAULT_BAUD_RATE == 3_000_000

    def test_common_rates_present(self) -> None:
        """Standard UART baud rates used by ESP-IDF should be available."""
        for common_rate in (115200, 921600):
            assert common_rate in BAUD_RATES
# ---------------------------------------------------------------------------
# LaunchConfig dataclass
# ---------------------------------------------------------------------------
class TestLaunchConfig:
    """LaunchConfig is a plain data carrier; verify field round-tripping."""

    def test_create_with_required_fields(self) -> None:
        config = LaunchConfig(port='/dev/ttyUSB0', baudrate=3000000, log_dir=Path('/tmp'))
        assert config.port == '/dev/ttyUSB0'
        assert config.baudrate == 3000000
        assert config.log_dir == Path('/tmp')

    def test_different_ports(self) -> None:
        ports = ('/dev/ttyUSB0', '/dev/ttyACM0', 'COM3', '/dev/tty.usbserial-1420')
        for port in ports:
            config = LaunchConfig(port=port, baudrate=115200, log_dir=Path('.'))
            assert config.port == port

    def test_various_baud_rates(self) -> None:
        for rate in BAUD_RATES:
            config = LaunchConfig(port='/dev/ttyUSB0', baudrate=rate, log_dir=Path('.'))
            assert config.baudrate == rate

    def test_log_dir_is_path(self) -> None:
        config = LaunchConfig(port='COM1', baudrate=115200, log_dir=Path('/var/log'))
        assert isinstance(config.log_dir, Path)
# ---------------------------------------------------------------------------
# LaunchScreen instantiation
# ---------------------------------------------------------------------------
class TestLaunchScreenInit:
    """Constructor behavior and class-level declarations of LaunchScreen."""

    def test_default_log_dir_is_cwd(self) -> None:
        assert LaunchScreen()._default_log_dir == Path.cwd()

    def test_custom_log_dir(self) -> None:
        custom_dir = Path('/tmp/my_logs')
        assert LaunchScreen(default_log_dir=custom_dir)._default_log_dir == custom_dir

    def test_none_log_dir_falls_back_to_cwd(self) -> None:
        assert LaunchScreen(default_log_dir=None)._default_log_dir == Path.cwd()

    def test_is_screen_subclass(self) -> None:
        from textual.screen import Screen
        assert issubclass(LaunchScreen, Screen)

    def test_bindings_include_quit(self) -> None:
        """LaunchScreen should have a quit binding on 'q'."""
        assert any(binding.key == 'q' for binding in LaunchScreen.BINDINGS)
# ---------------------------------------------------------------------------
# refresh_ports — unit-level (mocked widgets)
# ---------------------------------------------------------------------------
class TestRefreshPorts:
    """refresh_ports() scans serial ports and updates the port Select widget."""

    @patch('src.frontend.launch_screen.list_serial_ports')
    def test_refresh_updates_select_with_ports(self, mock_lsp: MagicMock) -> None:
        """refresh_ports should scan ports and update the Select widget."""
        mock_lsp.return_value = ['/dev/ttyUSB0', '/dev/ttyUSB1']
        screen = LaunchScreen()
        mock_select = MagicMock()
        screen.query_one = MagicMock(return_value=mock_select)  # type: ignore[method-assign]
        screen.refresh_ports()
        mock_lsp.assert_called_once()
        # Options are (label, value) pairs; the first discovered port is selected.
        mock_select.set_options.assert_called_once_with(
            [('/dev/ttyUSB0', '/dev/ttyUSB0'), ('/dev/ttyUSB1', '/dev/ttyUSB1')]
        )
        assert mock_select.value == '/dev/ttyUSB0'

    @patch('src.frontend.launch_screen.list_serial_ports')
    def test_refresh_empty_ports_no_value_set(self, mock_lsp: MagicMock) -> None:
        """When no ports found, set_options is called with empty list and value is not set."""
        mock_lsp.return_value = []
        screen = LaunchScreen()
        mock_select = MagicMock()
        # Pre-set a sentinel so any reassignment of .value is detectable.
        # (The previous check `mock_select.value != '/dev/ttyUSB0'` was vacuous:
        # comparing a fresh MagicMock auto-attribute with != is always True,
        # so the "value not set" property was never actually verified.)
        sentinel = object()
        mock_select.value = sentinel
        screen.query_one = MagicMock(return_value=mock_select)  # type: ignore[method-assign]
        screen.refresh_ports()
        mock_select.set_options.assert_called_once_with([])
        # value should NOT have been reassigned when ports list is empty
        assert mock_select.value is sentinel
# ---------------------------------------------------------------------------
# connect — unit-level (mocked widgets)
# ---------------------------------------------------------------------------
class TestConnect:
    """connect() reads the three form widgets and dismisses with a LaunchConfig."""

    def _make_screen_with_mocks(
        self,
        port_value: object,
        baud_value: int = 3000000,
        dir_value: str = '/tmp/logs',
    ) -> tuple[LaunchScreen, MagicMock, MagicMock, MagicMock]:
        """Helper: create a LaunchScreen with mocked query_one results."""
        screen = LaunchScreen()
        mock_port_select = MagicMock()
        mock_port_select.value = port_value
        mock_baud_select = MagicMock()
        mock_baud_select.value = baud_value
        mock_dir_input = MagicMock()
        mock_dir_input.value = dir_value

        # Route query_one by CSS selector so connect() sees the three mocked widgets.
        def fake_query_one(selector: str, widget_type: type = object) -> MagicMock:
            if selector == '#port-select':
                return mock_port_select
            if selector == '#baud-select':
                return mock_baud_select
            if selector == '#dir-input':
                return mock_dir_input
            raise ValueError(f'Unexpected selector: {selector}')

        screen.query_one = fake_query_one  # type: ignore[assignment]
        screen.dismiss = MagicMock()  # type: ignore[method-assign]
        screen.notify = MagicMock()  # type: ignore[method-assign]
        return screen, mock_port_select, mock_baud_select, mock_dir_input

    def test_connect_with_valid_port(self) -> None:
        """connect() should dismiss with LaunchConfig when port is selected."""
        screen, _, _, _ = self._make_screen_with_mocks(
            port_value='/dev/ttyUSB0',
            baud_value=921600,
            dir_value='/tmp/logs',
        )
        screen.connect()
        screen.dismiss.assert_called_once()
        # First positional argument of dismiss() is the resulting config.
        config = screen.dismiss.call_args[0][0]
        assert isinstance(config, LaunchConfig)
        assert config.port == '/dev/ttyUSB0'
        assert config.baudrate == 921600
        assert config.log_dir == Path('/tmp/logs')

    def test_connect_with_blank_port_shows_error(self) -> None:
        """connect() should notify error and NOT dismiss when port is BLANK."""
        from textual.widgets import Select
        screen, _, _, _ = self._make_screen_with_mocks(port_value=Select.BLANK)
        screen.connect()
        screen.notify.assert_called_once_with('Please select a serial port', severity='error')
        screen.dismiss.assert_not_called()

    def test_connect_log_dir_is_path_object(self) -> None:
        """The log_dir in LaunchConfig should be a Path, not a string."""
        screen, _, _, _ = self._make_screen_with_mocks(port_value='COM3', dir_value='/home/user/logs')
        screen.connect()
        config = screen.dismiss.call_args[0][0]
        assert isinstance(config.log_dir, Path)
        assert str(config.log_dir) == '/home/user/logs'
# ---------------------------------------------------------------------------
# action_quit
# ---------------------------------------------------------------------------
class TestActionQuit:
    def test_action_quit_dismisses_with_none(self) -> None:
        """Quitting dismisses the screen with None (no LaunchConfig)."""
        screen = LaunchScreen()
        screen.dismiss = MagicMock()  # type: ignore[method-assign]
        screen.action_quit()
        screen.dismiss.assert_called_once_with(None)
# ---------------------------------------------------------------------------
# compose — structural checks (no App context required)
# ---------------------------------------------------------------------------
class TestComposeMethod:
    """Structural checks on compose()/CSS without requiring an App context."""

    def test_compose_is_defined(self) -> None:
        """LaunchScreen.compose should be a callable method."""
        compose = getattr(LaunchScreen, 'compose', None)
        assert callable(compose)

    def test_default_css_contains_expected_ids(self) -> None:
        """DEFAULT_CSS should reference the widget IDs used in compose."""
        expected_ids = (
            'launch-container',
            'launch-title',
            'port-select',
            'refresh-btn',
            'dir-input',
            'browse-btn',
            'connect-btn',
            'no-ports-label',
        )
        css = LaunchScreen.DEFAULT_CSS
        for widget_id in expected_ids:
            assert widget_id in css, f'Missing CSS rule for #{widget_id}'

    def test_baud_options_built_correctly(self) -> None:
        """Verify the baud option tuples match the expected (label, value) shape."""
        baud_options = [(str(rate), rate) for rate in BAUD_RATES]
        assert len(baud_options) == len(BAUD_RATES)
        for label, value in baud_options:
            assert isinstance(label, str)
            assert isinstance(value, int)
@@ -0,0 +1,35 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from src.backend.models import FrameByteCount
from src.backend.models import FunnelSnapshot
from src.backend.models import LossType
from src.backend.models import ThroughputInfo
def test_frame_byte_count():
    """FrameByteCount stores its two counters verbatim."""
    count = FrameByteCount(frames=100, bytes=5000)
    assert (count.frames, count.bytes) == (100, 5000)
def test_loss_type_enum():
    """LossType members compare equal to their string values."""
    expected = {LossType.BUFFER: 'buffer', LossType.TRANSPORT: 'transport'}
    for member, text in expected.items():
        assert member == text
def test_funnel_snapshot_structure():
    """A FunnelSnapshot can be assembled from zeroed component values."""
    empty = FrameByteCount(frames=0, bytes=0)
    throughput = ThroughputInfo(
        throughput_fps=0.0, throughput_bps=0.0, peak_write_frames=0, peak_write_bytes=0, peak_window_ms=10
    )
    snapshot = FunnelSnapshot(
        source=0,
        produced=empty,
        written=empty,
        received=empty,
        buffer_loss=empty,
        transport_loss=empty,
        throughput=throughput,
    )
    assert snapshot.produced.frames == 0
@@ -0,0 +1,98 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from src.backend.stats.peak_burst import WRITE_RATE_WINDOW_MS
from src.backend.stats.peak_burst import PeakBurstTracker
from src.backend.stats.peak_burst import _ts_delta_ms
_SRC = 1  # default source code used by single-source tests below
_SRC_B = 2  # second, independent source for multi-source tests
class TestTsDeltaMs:
    """Unit tests for the wraparound-aware 32-bit timestamp delta helper."""

    def test_normal_forward(self) -> None:
        # Plain forward step: 1000 -> 1100 is 100 ms.
        assert _ts_delta_ms(1100, 1000) == 100

    def test_zero_delta(self) -> None:
        assert _ts_delta_ms(5000, 5000) == 0

    def test_wraparound(self) -> None:
        # 0xFFFF_FF00 -> 50 crosses the 32-bit boundary: 256 + 50 = 306 ms.
        assert _ts_delta_ms(50, 0xFFFF_FF00) == 306

    def test_backward_jump_returns_negative(self) -> None:
        # A huge backward jump is reported as negative rather than wrapped.
        assert _ts_delta_ms(1000, 0x8000_0100) == -1
class TestPeakBurstTracker:
    """Sliding-window burst density tracking, per source."""

    def test_single_frame(self) -> None:
        tracker = PeakBurstTracker()
        tracker.record(1000, 100, _SRC)
        snap = tracker.harvest()
        assert snap.per_source is not None
        assert snap.per_source[_SRC].peak_frames == 1

    def test_two_frames_same_ms(self) -> None:
        tracker = PeakBurstTracker()
        for size in (50, 70):
            tracker.record(1000, size, _SRC)
        snap = tracker.harvest()
        assert snap.per_source is not None
        burst = snap.per_source[_SRC]
        assert (burst.peak_frames, burst.peak_bytes) == (2, 120)

    def test_far_apart_are_separate_windows(self) -> None:
        # Two frames exactly one window apart must not share a burst window.
        tracker = PeakBurstTracker()
        tracker.record(100, 60, _SRC)
        tracker.record(100 + WRITE_RATE_WINDOW_MS, 40, _SRC)
        snap = tracker.harvest()
        assert snap.per_source is not None
        assert snap.per_source[_SRC].peak_frames == 1

    def test_multi_source_independent_peaks(self) -> None:
        tracker = PeakBurstTracker()
        # Source A: burst of 5 at t=1000; source B: 1 at t=1000 then 4 at t=2000.
        for _ in range(5):
            tracker.record(1000, 30, _SRC)
        tracker.record(1000, 30, _SRC_B)
        for _ in range(4):
            tracker.record(2000, 30, _SRC_B)
        snap = tracker.harvest()
        assert snap.per_source is not None
        assert snap.per_source[_SRC].peak_frames == 5
        assert snap.per_source[_SRC_B].peak_frames == 4

    def test_max_persists_across_harvests(self) -> None:
        tracker = PeakBurstTracker()
        for _ in range(3):
            tracker.record(1000, 100, _SRC)
        first = tracker.harvest()
        assert first.max_per_source is not None
        assert first.max_per_source[_SRC].peak_frames == 3
        # A smaller burst afterwards must not lower the all-time maximum.
        tracker.record(5000, 200, _SRC)
        second = tracker.harvest()
        assert second.max_per_source is not None
        assert second.max_per_source[_SRC].peak_frames == 3

    def test_harvest_resets_current_period(self) -> None:
        tracker = PeakBurstTracker()
        tracker.record(1000, 100, _SRC)
        tracker.harvest()
        # Nothing recorded since last harvest -> no per-period data.
        assert tracker.harvest().per_source is None

    def test_backward_timestamp_resets_window(self) -> None:
        tracker = PeakBurstTracker()
        tracker.record(5000, 80, _SRC)
        tracker.record(5000, 80, _SRC)
        # Backward jump starts a fresh window; peak stays at the earlier 2.
        tracker.record(100, 80, _SRC)
        snap = tracker.harvest()
        assert snap.per_source is not None
        assert snap.per_source[_SRC].peak_frames == 2

    def test_wraparound_within_window(self) -> None:
        # Timestamps straddling the 32-bit wrap still land in one window.
        tracker = PeakBurstTracker()
        tracker.record(0xFFFF_FFFF, 50, _SRC)
        tracker.record(0, 50, _SRC)
        snap = tracker.harvest()
        assert snap.per_source is not None
        assert snap.per_source[_SRC].peak_frames == 2
@@ -0,0 +1,149 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Reset propagation matrix tests.
Verifies that reset("init") and reset("flush") dispatch correctly per the spec:
| Group | Components | INIT_DONE | FLUSH |
|------------------|---------------------------------------------|--------------------|------------------------------------|
| SN-coupled | SNGapTracker | full reset | full reset |
| ENH_STAT-coupled | FirmwareLossTracker, FirmwareWrittenTracker | full reset | reset baselines, preserve accumulators |
| Console-local | TransportMetrics, PeakBurstTracker, | preserve | preserve |
| | per_source_received, throughput cache | | |
"""
from src.backend.stats import StatsAccumulator
class TestResetPropagation:
    """Verify reset("init") and reset("flush") dispatch correctly per the spec."""

    def _populate(self, stats: StatsAccumulator) -> None:
        """Feed data into all components so we can verify what gets reset."""
        # Transport (console-local)
        stats.record_bytes(1000)
        stats.record_frame(100, 1, 10)  # frame_size=100, src=1, sn=10
        stats.record_frame(100, 1, 11)
        # Peak burst (console-local)
        stats.record_frame_ts(1000, 100, 1)
        # ENH_STAT (firmware-coupled)
        stats.record_enh_stat(
            src_code=1, written_frames=100, lost_frames=5, written_bytes=5000, lost_bytes=250, baudrate=3_000_000
        )
        stats.record_enh_stat(
            src_code=1, written_frames=200, lost_frames=10, written_bytes=10000, lost_bytes=500, baudrate=3_000_000
        )

    # === INIT_DONE Tests ===

    def test_init_resets_sn_gap(self) -> None:
        stats = StatsAccumulator()
        stats.record_frame(100, 1, 0)
        stats.record_frame(100, 1, 1)
        stats.reset('init')
        # A large SN jump right after init must NOT register as a gap.
        stats.record_frame(100, 1, 100)
        funnel = stats.funnel_snapshot()
        for snap in funnel:
            if snap.source == 1:
                # 2 pre-reset + 1 post-reset: received counter is console-local.
                assert snap.received.frames == 3

    def test_init_resets_firmware_loss(self) -> None:
        stats = StatsAccumulator()
        self._populate(stats)
        stats.reset('init')
        # After init reset, loss tracker should be clean
        # First report after reset establishes new baseline
        stats.record_enh_stat(1, 50, 3, 2500, 150, 3_000_000)
        funnel = stats.funnel_snapshot()
        for snap in funnel:
            if snap.source == 1:
                assert snap.buffer_loss.frames == 3  # first report absolute value

    def test_init_resets_firmware_written(self) -> None:
        stats = StatsAccumulator()
        self._populate(stats)
        stats.reset('init')
        stats.record_enh_stat(1, 50, 0, 2500, 0, 3_000_000)
        funnel = stats.funnel_snapshot()
        for snap in funnel:
            if snap.source == 1:
                assert snap.written.frames == 50  # first report absolute value

    def test_init_preserves_transport_metrics(self) -> None:
        stats = StatsAccumulator()
        stats.record_bytes(5000)
        stats.record_frame()
        stats.reset('init')
        snapshot = stats.snapshot(1.0)
        # Console-local transport metrics survive an INIT_DONE reset.
        assert snapshot.transport.rx_bytes == 5000
        assert snapshot.transport.fps == 1.0

    def test_init_preserves_per_source_received(self) -> None:
        stats = StatsAccumulator()
        stats.record_frame(100, 1, 0)
        stats.reset('init')
        funnel = stats.funnel_snapshot()
        assert len(funnel) == 1
        assert funnel[0].received.frames == 1

    # === FLUSH Tests ===

    def test_flush_resets_sn_gap(self) -> None:
        stats = StatsAccumulator()
        stats.record_frame(100, 1, 0)
        stats.record_frame(100, 1, 1)
        stats.reset('flush')
        # After flush, SN tracker is fully reset
        stats.record_frame(100, 1, 0)  # SN restarts from 0
        # Should not count gap from old SN=1 to new SN=0
        # The per_source_received should include the 2 pre-flush frames + 1 post-flush
        funnel = stats.funnel_snapshot()
        for snap in funnel:
            if snap.source == 1:
                # 2 pre-flush + 1 post-flush = 3 total received
                assert snap.received.frames == 3

    def test_flush_preserves_firmware_loss_accumulators(self) -> None:
        stats = StatsAccumulator()
        # Build up some loss: baseline then delta
        stats.record_enh_stat(1, 100, 5, 5000, 250, 3_000_000)
        stats.record_enh_stat(1, 200, 10, 10000, 500, 3_000_000)
        # Now flush
        stats.reset('flush')
        # Next report re-establishes baseline (no additional delta)
        stats.record_enh_stat(1, 50, 3, 2500, 150, 3_000_000)
        funnel = stats.funnel_snapshot()
        for snap in funnel:
            if snap.source == 1:
                # Initial absolute (5) + delta (5) = 10; flush preserves accum
                assert snap.buffer_loss.frames == 10

    def test_flush_preserves_firmware_written_accumulators(self) -> None:
        stats = StatsAccumulator()
        stats.record_enh_stat(1, 100, 0, 5000, 0, 3_000_000)
        stats.record_enh_stat(1, 200, 0, 10000, 0, 3_000_000)
        stats.reset('flush')
        # Baseline-only report after flush: accumulated total must be unchanged.
        stats.record_enh_stat(1, 50, 0, 2500, 0, 3_000_000)
        funnel = stats.funnel_snapshot()
        for snap in funnel:
            if snap.source == 1:
                assert snap.written.frames == 200  # initial absolute + pre-flush delta preserved

    def test_flush_preserves_transport_metrics(self) -> None:
        stats = StatsAccumulator()
        stats.record_bytes(5000)
        stats.record_frame()
        stats.reset('flush')
        snapshot = stats.snapshot(1.0)
        assert snapshot.transport.rx_bytes == 5000  # preserved

    def test_flush_preserves_per_source_received(self) -> None:
        stats = StatsAccumulator()
        stats.record_frame(100, 1, 0)
        stats.record_frame(100, 1, 1)
        stats.reset('flush')
        funnel = stats.funnel_snapshot()
        for snap in funnel:
            if snap.source == 1:
                assert snap.received.frames == 2  # preserved
@@ -0,0 +1,80 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from src.backend.stats.sn_gap import SNGapTracker
class TestSNGapTracker:
    """Reorder-tolerant sequence-number gap detection, per source."""

    def setup_method(self):
        self.tracker = SNGapTracker()

    # --- Baseline ---
    def test_first_frame_establishes_baseline(self):
        # The very first frame can never be a gap.
        assert self.tracker.record(src_code=1, frame_sn=42) == 0

    # --- In-order ---
    def test_sequential_no_gap(self):
        self.tracker.record(1, 0)
        for sn in (1, 2):
            assert self.tracker.record(1, sn) == 0

    # --- Simple reorder (within window) ---
    def test_reorder_no_false_gap(self):
        """SN=8 arrives before SN=5,6,7 — no gaps should be counted."""
        self.tracker.record(1, 5)  # baseline → window_base=6
        for sn in (8, 6, 7):  # 8 is early; 6 and 7 are late fills
            assert self.tracker.record(1, sn) == 0
        assert self.tracker.totals().get(1, 0) == 0

    # --- Confirmed loss ---
    def test_loss_confirmed_when_window_expires(self):
        """Frame beyond window forces expiry of unreceived SNs."""
        self.tracker.record(1, 0)  # baseline → base=1
        # SN=1 never arrives; jump to SN=257 (beyond window of 256)
        assert self.tracker.record(1, 257) > 0  # SN=1 expired as confirmed loss
        assert self.tracker.totals()[1] > 0

    # --- Late arrival behind window ---
    def test_late_arrival_ignored(self):
        self.tracker.record(1, 0)
        self.tracker.record(1, 257)  # force window advance past 0
        assert self.tracker.record(1, 1) == 0  # too late, ignored

    # --- Reset detection ---
    def test_large_backward_jump_resets_baseline(self):
        self.tracker.record(1, 1000)
        # SN jumps back to 5 (far beyond REORDER_WINDOW backward)
        assert self.tracker.record(1, 5) == 0
        # After re-baseline, SN=6 should be normal
        assert self.tracker.record(1, 6) == 0

    # --- Multi-source independence ---
    def test_sources_independent(self):
        self.tracker.record(1, 10)
        self.tracker.record(2, 20)
        for src, sn in ((1, 11), (2, 21)):
            assert self.tracker.record(src, sn) == 0

    # --- 24-bit wraparound ---
    def test_wraparound(self):
        SN_MAX = 1 << 24
        self.tracker.record(1, SN_MAX - 2)  # base = SN_MAX-1
        for sn in (SN_MAX - 1, 0, 1):  # crosses the 24-bit boundary
            assert self.tracker.record(1, sn) == 0

    # --- Reset method ---
    def test_reset_clears_all(self):
        self.tracker.record(1, 10)
        self.tracker.reset()
        # After reset, next frame establishes new baseline
        assert self.tracker.record(1, 0) == 0

    def test_reset_single_source(self):
        self.tracker.record(1, 10)
        self.tracker.record(2, 20)
        self.tracker.reset(src_code=1)
        assert self.tracker.record(1, 0) == 0  # re-baselined
        assert self.tracker.record(2, 21) == 0  # unaffected
@@ -0,0 +1,909 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from unittest.mock import patch
from src.backend.models import BleLogSource
from src.backend.models import has_os_ts
from src.backend.stats import TRAFFIC_ALERT_COOLDOWN_SEC
from src.backend.stats import TRAFFIC_THRESHOLD_PCT
from src.backend.stats import TRAFFIC_WINDOW_SEC
from src.backend.stats import WRITE_RATE_WINDOW_MS
from src.backend.stats import StatsAccumulator
from src.backend.stats import TrafficSpikeResult
from src.backend.stats.peak_burst import _ts_delta_ms
# Convenience: default frame size used in peak write tests (arbitrary but consistent)
_FRAME_SZ = 100
_SRC = 1  # default source code for single-source tests
class TestStatsAccumulator:
    """Core accumulator behavior: transport counters and firmware loss deltas."""

    def test_initial_state(self) -> None:
        # A fresh accumulator reports all-zero transport/loss and no peak data.
        stats = StatsAccumulator()
        snapshot = stats.snapshot(0.25)
        assert snapshot.transport.rx_bytes == 0
        assert snapshot.loss.total_frames == 0
        assert snapshot.loss.total_bytes == 0
        assert snapshot.os_peak.per_source is None
        assert snapshot.os_peak.max_per_source is None

    def test_record_bytes(self) -> None:
        stats = StatsAccumulator()
        stats.record_bytes(1024)
        snapshot = stats.snapshot(1.0)
        assert snapshot.transport.rx_bytes == 1024
        # bps = 1024 * 10 / 1.0 = 10240
        assert snapshot.transport.bps == 10240.0

    def test_record_frame(self) -> None:
        stats = StatsAccumulator()
        stats.record_frame()
        stats.record_frame()
        snapshot = stats.snapshot(1.0)
        assert snapshot.transport.fps == 2.0

    def test_max_bps_tracked(self) -> None:
        # The max_bps high-water mark must survive a lower subsequent reading.
        stats = StatsAccumulator()
        stats.record_bytes(10000)
        stats.snapshot(1.0)  # bps = 100000
        stats.record_bytes(100)
        snap2 = stats.snapshot(1.0)  # bps = 1000
        assert snap2.transport.max_bps == 100000.0

    def _enh_stat_loss(
        self, stats: StatsAccumulator, src_code: int, lost_frames: int, lost_bytes: int
    ) -> tuple[int, int]:
        """Helper: call record_enh_stat with dummy written/baudrate, return loss delta."""
        return stats.record_enh_stat(
            src_code=src_code,
            written_frames=0,
            lost_frames=lost_frames,
            written_bytes=0,
            lost_bytes=lost_bytes,
            baudrate=3_000_000,
        )

    def test_firmware_loss_first_report_zero_delta(self) -> None:
        """First ENH_STAT initializes prev (delta=0); subsequent reports show delta."""
        stats = StatsAccumulator()
        new_frames, new_bytes = self._enh_stat_loss(stats, src_code=1, lost_frames=1000, lost_bytes=5000)
        assert new_frames == 0
        assert new_bytes == 0
        snapshot = stats.snapshot(0.25)
        # The totals still adopt the first report's absolute counters.
        assert snapshot.loss.total_frames == 1000
        assert snapshot.loss.total_bytes == 5000
        new_frames, new_bytes = self._enh_stat_loss(stats, src_code=1, lost_frames=1003, lost_bytes=5128)
        assert new_frames == 3
        assert new_bytes == 128
        snapshot = stats.snapshot(0.25)
        assert snapshot.loss.total_frames == 1003
        assert snapshot.loss.total_bytes == 5128

    def test_firmware_loss_incremental_returns(self) -> None:
        """Incremental return reflects per-report delta, not cumulative."""
        stats = StatsAccumulator()
        self._enh_stat_loss(stats, src_code=1, lost_frames=0, lost_bytes=0)
        new_frames, new_bytes = self._enh_stat_loss(stats, src_code=1, lost_frames=5, lost_bytes=200)
        assert new_frames == 5
        assert new_bytes == 200
        new_frames, new_bytes = self._enh_stat_loss(stats, src_code=1, lost_frames=8, lost_bytes=320)
        assert new_frames == 3
        assert new_bytes == 120

    def test_multi_source_firmware_loss(self) -> None:
        """Firmware loss tracked independently per source code."""
        stats = StatsAccumulator()
        self._enh_stat_loss(stats, src_code=1, lost_frames=100, lost_bytes=1000)
        self._enh_stat_loss(stats, src_code=2, lost_frames=50, lost_bytes=500)
        self._enh_stat_loss(stats, src_code=1, lost_frames=105, lost_bytes=1200)
        self._enh_stat_loss(stats, src_code=2, lost_frames=52, lost_bytes=580)
        snapshot = stats.snapshot(0.25)
        assert snapshot.loss.total_frames == 157  # 100 + 50 + 5 + 2
        assert snapshot.loss.total_bytes == 1780  # 1000 + 500 + 200 + 80

    def test_firmware_loss_counter_reset(self) -> None:
        """Counter reset (bench_reset_stat) detected and handled correctly."""
        stats = StatsAccumulator()
        self._enh_stat_loss(stats, src_code=1, lost_frames=0, lost_bytes=0)
        self._enh_stat_loss(stats, src_code=1, lost_frames=100, lost_bytes=4000)
        # Counter dropped (100 -> 30): a reset report yields zero delta...
        new_frames, new_bytes = self._enh_stat_loss(stats, src_code=1, lost_frames=30, lost_bytes=1200)
        assert new_frames == 0
        assert new_bytes == 0
        snapshot = stats.snapshot(0.25)
        # ...but the post-reset absolute is folded into the running total.
        assert snapshot.loss.total_frames == 130
        assert snapshot.loss.total_bytes == 5200
        new_frames, new_bytes = self._enh_stat_loss(stats, src_code=1, lost_frames=50, lost_bytes=2000)
        assert new_frames == 20
        assert new_bytes == 800
        snapshot = stats.snapshot(0.25)
        assert snapshot.loss.total_frames == 150
        assert snapshot.loss.total_bytes == 6000

    def test_firmware_loss_multiple_resets(self) -> None:
        """Multiple resets accumulate correctly across all cycles."""
        stats = StatsAccumulator()
        self._enh_stat_loss(stats, src_code=1, lost_frames=0, lost_bytes=0)
        self._enh_stat_loss(stats, src_code=1, lost_frames=50, lost_bytes=2000)
        self._enh_stat_loss(stats, src_code=1, lost_frames=10, lost_bytes=400)
        self._enh_stat_loss(stats, src_code=1, lost_frames=30, lost_bytes=1200)
        self._enh_stat_loss(stats, src_code=1, lost_frames=5, lost_bytes=200)
        snapshot = stats.snapshot(0.25)
        assert snapshot.loss.total_frames == 85
        assert snapshot.loss.total_bytes == 3400

    def test_firmware_loss_uint32_overflow_treated_as_reset(self) -> None:
        """uint32 counter overflow is indistinguishable from reset -- handled same way."""
        stats = StatsAccumulator()
        self._enh_stat_loss(stats, src_code=1, lost_frames=0xFFFF_FF00, lost_bytes=0)
        new_frames, _ = self._enh_stat_loss(stats, src_code=1, lost_frames=50, lost_bytes=0)
        assert new_frames == 0
        snapshot = stats.snapshot(0.25)
        assert snapshot.loss.total_frames == 0xFFFF_FF00 + 50
class TestRecordFrameWithSN:
    """record_frame() with the optional per-source / SN bookkeeping arguments."""

    def test_backward_compatible_no_args(self) -> None:
        # Calling with no arguments still feeds the transport fps counter.
        stats = StatsAccumulator()
        stats.record_frame()
        assert stats.snapshot(1.0).transport.fps == 1.0

    def test_tracks_per_source_received(self) -> None:
        stats = StatsAccumulator()
        for size, src, sn in ((100, 1, 0), (200, 1, 1), (50, 2, 0)):
            stats.record_frame(frame_size=size, src_code=src, frame_sn=sn)
        assert stats._per_source_received_frames[1] == 2
        assert stats._per_source_received_bytes[1] == 300
        assert stats._per_source_received_frames[2] == 1
        assert stats._per_source_received_bytes[2] == 50

    def test_sn_gap_tracked(self) -> None:
        stats = StatsAccumulator()
        stats.set_firmware_version(4)  # enable SN gap tracking (requires version >= 4)
        stats.record_frame(frame_size=100, src_code=1, frame_sn=0)
        # SN=257 is beyond the reorder window (256), forcing SN=1 to be confirmed lost
        stats.record_frame(frame_size=100, src_code=1, frame_sn=257)
        assert stats._sn_gap.totals() == {1: 1}

    def test_no_sn_tracking_when_sn_negative(self) -> None:
        # frame_sn=-1 means "no SN": per-source table untouched, fps still counted.
        stats = StatsAccumulator()
        stats.record_frame(frame_size=100, src_code=1, frame_sn=-1)
        assert 1 not in stats._per_source_received_frames
        assert stats.snapshot(1.0).transport.fps == 1.0

    def test_no_sn_tracking_when_src_zero(self) -> None:
        stats = StatsAccumulator()
        stats.record_frame(frame_size=100, src_code=0, frame_sn=5)
        assert 0 not in stats._per_source_received_frames
class TestRecordEnhStat:
    """record_enh_stat(): dual-tracker feeding and the torn-read guard."""

    _BAUD = 3_000_000
    # Plausibility ceiling used by the torn-read guard: bytes per 2 s at 10 bits/byte.
    _MAX_DELTA = _BAUD * 2 // 10

    def _zero_baseline(self, stats: StatsAccumulator) -> None:
        """Record an all-zero report to establish the per-source baseline."""
        stats.record_enh_stat(
            src_code=1, written_frames=0, lost_frames=0, written_bytes=0, lost_bytes=0, baudrate=self._BAUD
        )

    def test_feeds_both_trackers(self) -> None:
        stats = StatsAccumulator()
        self._zero_baseline(stats)
        stats.record_enh_stat(
            src_code=1, written_frames=100, lost_frames=10, written_bytes=5000, lost_bytes=500, baudrate=self._BAUD
        )
        assert stats._fw_written.totals()[1] == (100, 5000)
        assert stats._fw_loss.per_source_totals()[1] == (10, 500)

    def test_returns_loss_delta(self) -> None:
        stats = StatsAccumulator()
        first = stats.record_enh_stat(
            src_code=1, written_frames=0, lost_frames=0, written_bytes=0, lost_bytes=0, baudrate=self._BAUD
        )
        assert first == (0, 0)
        second = stats.record_enh_stat(
            src_code=1, written_frames=50, lost_frames=5, written_bytes=2500, lost_bytes=250, baudrate=self._BAUD
        )
        assert second == (5, 250)

    def test_torn_read_guard_rejects_implausible_written_bytes(self) -> None:
        stats = StatsAccumulator()
        self._zero_baseline(stats)
        # One byte over the plausibility ceiling: the whole report is discarded.
        delta = stats.record_enh_stat(
            src_code=1, written_frames=10, lost_frames=0,
            written_bytes=self._MAX_DELTA + 1, lost_bytes=0, baudrate=self._BAUD
        )
        assert delta == (0, 0)
        assert stats._fw_written.totals()[1] == (0, 0)

    def test_torn_read_guard_rejects_implausible_lost_bytes(self) -> None:
        stats = StatsAccumulator()
        self._zero_baseline(stats)
        delta = stats.record_enh_stat(
            src_code=1, written_frames=10, lost_frames=5,
            written_bytes=500, lost_bytes=self._MAX_DELTA + 1, baudrate=self._BAUD
        )
        assert delta == (0, 0)
        assert stats._fw_loss.per_source_totals()[1] == (0, 0)

    def test_torn_read_guard_accepts_plausible_delta(self) -> None:
        stats = StatsAccumulator()
        self._zero_baseline(stats)
        # Exactly at the ceiling is still accepted.
        d_f, d_b = stats.record_enh_stat(
            src_code=1, written_frames=10, lost_frames=2,
            written_bytes=self._MAX_DELTA, lost_bytes=100, baudrate=self._BAUD
        )
        assert d_f == 2
        assert d_b == 100

    def test_torn_read_recovery_uses_last_good_prev(self) -> None:
        stats = StatsAccumulator()
        self._zero_baseline(stats)
        # Implausible (torn) report in the middle...
        stats.record_enh_stat(
            src_code=1, written_frames=10, lost_frames=0,
            written_bytes=self._MAX_DELTA + 1, lost_bytes=0, baudrate=self._BAUD
        )
        # ...must not corrupt the delta of the next good report.
        d_f, d_b = stats.record_enh_stat(
            src_code=1, written_frames=20, lost_frames=3, written_bytes=1000, lost_bytes=150, baudrate=self._BAUD
        )
        assert d_f == 3
        assert d_b == 150
class TestRecordFrameReturnsGap:
    """record_frame() returns the confirmed SN gap count (0 when disabled)."""

    def test_returns_zero_for_sequential_frames(self) -> None:
        stats = StatsAccumulator()
        stats.set_firmware_version(4)
        for sn in (0, 1):
            assert stats.record_frame(frame_size=100, src_code=1, frame_sn=sn) == 0

    def test_returns_gap_count_for_large_jump(self) -> None:
        stats = StatsAccumulator()
        stats.set_firmware_version(4)
        stats.record_frame(frame_size=100, src_code=1, frame_sn=0)
        assert stats.record_frame(frame_size=100, src_code=1, frame_sn=300) > 0

    def test_returns_zero_when_no_sn_tracking(self) -> None:
        # Missing SN or the reserved src_code=0 both bypass gap detection.
        stats = StatsAccumulator()
        assert stats.record_frame(frame_size=100, src_code=1, frame_sn=-1) == 0
        assert stats.record_frame(frame_size=100, src_code=0, frame_sn=5) == 0

    def test_sn_gap_disabled_for_old_firmware(self) -> None:
        # Version 3 firmware has no reliable SNs: jumps must not count as gaps.
        stats = StatsAccumulator()
        stats.set_firmware_version(3)
        stats.record_frame(frame_size=100, src_code=1, frame_sn=0)
        assert stats.record_frame(frame_size=100, src_code=1, frame_sn=300) == 0

    def test_sn_gap_disabled_by_default(self) -> None:
        # Without set_firmware_version() the tracker stays off.
        stats = StatsAccumulator()
        stats.record_frame(frame_size=100, src_code=1, frame_sn=0)
        assert stats.record_frame(frame_size=100, src_code=1, frame_sn=300) == 0
class TestReset:
    """reset() semantics: 'init' wipes ENH_STAT-coupled state, 'flush' re-baselines."""

    def test_init_clears_firmware_preserves_console(self) -> None:
        acc = StatsAccumulator()
        acc.record_bytes(1000)
        acc.record_frame(frame_size=100, src_code=1, frame_sn=0)
        acc.record_enh_stat(
            src_code=1, written_frames=0, lost_frames=0, written_bytes=0, lost_bytes=0, baudrate=3_000_000
        )
        acc.record_enh_stat(
            src_code=1, written_frames=50, lost_frames=5, written_bytes=2500, lost_bytes=250, baudrate=3_000_000
        )
        acc.reset('init')
        snap = acc.snapshot(1.0)
        # Console-local counters survive an INIT_DONE reset...
        assert snap.transport.rx_bytes == 1000
        assert snap.loss.total_frames == 0
        assert acc._per_source_received_frames == {1: 1}
        assert acc._per_source_received_bytes == {1: 100}
        # ...while firmware-side accumulators are wiped.
        assert acc._fw_written.totals() == {}

    def test_flush_resets_baselines_only(self) -> None:
        acc = StatsAccumulator()
        acc.record_bytes(1000)
        acc.record_enh_stat(
            src_code=1, written_frames=0, lost_frames=0, written_bytes=0, lost_bytes=0, baudrate=3_000_000
        )
        acc.record_enh_stat(
            src_code=1, written_frames=50, lost_frames=5, written_bytes=2500, lost_bytes=250, baudrate=3_000_000
        )
        acc.record_frame(frame_size=100, src_code=1, frame_sn=0)
        acc.reset('flush')
        snap = acc.snapshot(1.0)
        # Console-local data preserved.
        assert snap.transport.rx_bytes == 1000
        assert acc._per_source_received_bytes == {1: 100}
        # Loss accumulators preserved even though baselines were reset.
        assert snap.loss.total_frames == 5
        # The next ENH_STAT re-baselines: a first report yields a zero delta.
        deltas = acc.record_enh_stat(
            src_code=1, written_frames=100, lost_frames=10, written_bytes=5000, lost_bytes=500, baudrate=3_000_000
        )
        assert deltas == (0, 0)
class TestFunnelSnapshot:
    """funnel_snapshot() reconciles firmware write counters with console RX counts."""

    def test_empty(self) -> None:
        """No recorded data -> empty funnel list."""
        stats = StatsAccumulator()
        assert stats.funnel_snapshot() == []

    def test_single_source_full_data(self) -> None:
        """All funnel stages are populated and internally consistent for one source."""
        stats = StatsAccumulator()
        stats.record_enh_stat(
            src_code=1, written_frames=0, lost_frames=0, written_bytes=0, lost_bytes=0, baudrate=3_000_000
        )
        stats.record_enh_stat(
            src_code=1, written_frames=100, lost_frames=10, written_bytes=5000, lost_bytes=500, baudrate=3_000_000
        )
        stats.record_frame(frame_size=80, src_code=1, frame_sn=0)
        stats.record_frame(frame_size=80, src_code=1, frame_sn=1)
        stats.funnel_snapshot()  # establishes prev_written baseline
        funnels = stats.funnel_snapshot()
        assert len(funnels) == 1
        f = funnels[0]
        assert f.source == 1
        assert f.written.frames == 100
        assert f.written.bytes == 5000
        assert f.buffer_loss.frames == 10
        assert f.buffer_loss.bytes == 500
        # produced = written + buffer_loss
        assert f.produced.frames == 110
        assert f.produced.bytes == 5500
        assert f.received.frames == 2
        assert f.received.bytes == 160
        # transport_loss = written - received
        assert f.transport_loss.frames == 98
        assert f.transport_loss.bytes == 4840

    def test_transport_loss_zero_on_first_snapshot(self) -> None:
        """The first snapshot has no written baseline, so transport loss reads 0."""
        stats = StatsAccumulator()
        stats.record_enh_stat(
            src_code=1, written_frames=0, lost_frames=0, written_bytes=0, lost_bytes=0, baudrate=3_000_000
        )
        stats.record_enh_stat(
            src_code=1, written_frames=100, lost_frames=0, written_bytes=5000, lost_bytes=0, baudrate=3_000_000
        )
        stats.record_frame(frame_size=80, src_code=1, frame_sn=0)
        funnels = stats.funnel_snapshot()
        assert funnels[0].transport_loss.frames == 0

    def test_transport_loss_stable_after_written_jump(self) -> None:
        """A written-counter jump between snapshots does not show up as loss."""
        stats = StatsAccumulator()
        stats.record_enh_stat(
            src_code=1, written_frames=0, lost_frames=0, written_bytes=0, lost_bytes=0, baudrate=3_000_000
        )
        stats.record_enh_stat(
            src_code=1, written_frames=50, lost_frames=0, written_bytes=2500, lost_bytes=0, baudrate=3_000_000
        )
        for i in range(50):
            stats.record_frame(frame_size=50, src_code=1, frame_sn=i)
        stats.funnel_snapshot()  # prev_written = {1: (50, 2500)}
        stats.record_enh_stat(
            src_code=1, written_frames=100, lost_frames=0, written_bytes=5000, lost_bytes=0, baudrate=3_000_000
        )
        for i in range(49):
            stats.record_frame(frame_size=50, src_code=1, frame_sn=50 + i)
        funnels = stats.funnel_snapshot()
        assert funnels[0].transport_loss.frames == 0

    def test_multi_source(self) -> None:
        """Each reporting source gets its own funnel row with its own counters."""
        stats = StatsAccumulator()
        stats.record_enh_stat(
            src_code=1, written_frames=0, lost_frames=0, written_bytes=0, lost_bytes=0, baudrate=3_000_000
        )
        stats.record_enh_stat(
            src_code=2, written_frames=0, lost_frames=0, written_bytes=0, lost_bytes=0, baudrate=3_000_000
        )
        stats.record_enh_stat(
            src_code=1, written_frames=50, lost_frames=5, written_bytes=2500, lost_bytes=250, baudrate=3_000_000
        )
        stats.record_enh_stat(
            src_code=2, written_frames=30, lost_frames=2, written_bytes=1500, lost_bytes=100, baudrate=3_000_000
        )
        funnels = stats.funnel_snapshot()
        assert len(funnels) == 2
        assert funnels[0].source == 1
        assert funnels[1].source == 2
        assert funnels[0].written.frames == 50
        assert funnels[1].written.frames == 30

    def test_throughput_lifetime_average(self) -> None:
        """Throughput is the lifetime average: totals divided by elapsed time."""
        stats = StatsAccumulator()
        stats.record_frame(frame_size=100, src_code=1, frame_sn=0)
        stats.record_frame(frame_size=100, src_code=1, frame_sn=1)
        stats.record_frame(frame_size=100, src_code=1, frame_sn=2)
        funnels = stats.funnel_snapshot(elapsed_sec=1.0)
        assert funnels[0].throughput.throughput_fps == 3.0
        assert funnels[0].throughput.throughput_bps == 300.0

    def test_throughput_accumulates_across_snapshots(self) -> None:
        """Lifetime average keeps accumulating: 3 frames over 2 s -> 1.5 fps."""
        stats = StatsAccumulator()
        stats.record_frame(frame_size=100, src_code=1, frame_sn=0)
        stats.record_frame(frame_size=100, src_code=1, frame_sn=1)
        stats.funnel_snapshot(elapsed_sec=1.0)
        stats.record_frame(frame_size=200, src_code=1, frame_sn=2)
        funnels = stats.funnel_snapshot(elapsed_sec=1.0)
        assert funnels[0].throughput.throughput_fps == 1.5
        assert funnels[0].throughput.throughput_bps == 200.0

    def test_peak_write_from_burst_tracker(self) -> None:
        """Peak write figures come from the timestamp-based burst tracker."""
        stats = StatsAccumulator()
        # Fixed: loop variable was `i` but never used; `_` matches file convention.
        for _ in range(5):
            stats.record_frame_ts(1000, 80, 1)
        stats.snapshot(0.25)
        stats.record_frame(frame_size=80, src_code=1, frame_sn=0)
        funnels = stats.funnel_snapshot()
        assert len(funnels) == 1
        assert funnels[0].throughput.peak_write_frames == 5
        assert funnels[0].throughput.peak_write_bytes == 5 * 80
        assert funnels[0].throughput.peak_window_ms == WRITE_RATE_WINDOW_MS

    def test_throughput_zero_without_elapsed(self) -> None:
        """Omitting elapsed_sec must not divide by zero; rates report as 0."""
        stats = StatsAccumulator()
        stats.record_frame(frame_size=100, src_code=1, frame_sn=0)
        funnels = stats.funnel_snapshot()
        assert funnels[0].throughput.throughput_fps == 0.0
        assert funnels[0].throughput.throughput_bps == 0.0
class TestFunnelExcludesInternal:
    """Source 0 (internal) never appears in the funnel output."""

    def test_internal_only_returns_empty(self) -> None:
        acc = StatsAccumulator()
        acc.record_enh_stat(
            src_code=0, written_frames=0, lost_frames=0, written_bytes=0, lost_bytes=0, baudrate=3_000_000
        )
        acc.record_enh_stat(
            src_code=0, written_frames=50, lost_frames=0, written_bytes=2500, lost_bytes=0, baudrate=3_000_000
        )
        assert acc.funnel_snapshot() == []

    def test_internal_excluded_alongside_others(self) -> None:
        acc = StatsAccumulator()
        acc.record_enh_stat(
            src_code=0, written_frames=0, lost_frames=0, written_bytes=0, lost_bytes=0, baudrate=3_000_000
        )
        acc.record_enh_stat(
            src_code=0, written_frames=50, lost_frames=0, written_bytes=2500, lost_bytes=0, baudrate=3_000_000
        )
        acc.record_enh_stat(
            src_code=1, written_frames=0, lost_frames=0, written_bytes=0, lost_bytes=0, baudrate=3_000_000
        )
        acc.record_enh_stat(
            src_code=1, written_frames=100, lost_frames=0, written_bytes=5000, lost_bytes=0, baudrate=3_000_000
        )
        # Only the non-internal source is reported.
        assert [f.source for f in acc.funnel_snapshot()] == [1]
class TestHasOsTs:
    """has_os_ts() classifies which sources carry an OS-domain timestamp."""

    def test_sources_with_os_ts(self) -> None:
        timestamped = (
            BleLogSource.INTERNAL,
            BleLogSource.CUSTOM,
            BleLogSource.HOST,
            BleLogSource.HCI,
            BleLogSource.ENCODE,
        )
        for src in timestamped:
            assert has_os_ts(src) is True

    def test_sources_without_os_ts(self) -> None:
        untimestamped = (BleLogSource.LL_TASK, BleLogSource.LL_HCI, BleLogSource.LL_ISR, BleLogSource.REDIR)
        for src in untimestamped:
            assert has_os_ts(src) is False
class TestTsDeltaMs:
    """_ts_delta_ms() handles forward deltas, uint32 wraparound, and backward jumps."""

    def test_normal_forward(self) -> None:
        assert _ts_delta_ms(1100, 1000) == 100

    def test_zero_delta(self) -> None:
        assert _ts_delta_ms(5000, 5000) == 0

    def test_wraparound(self) -> None:
        # uint32 wrap: newer=50, older=0xFFFFFF00 -> delta = 0x100 + 50 = 306
        assert _ts_delta_ms(50, 0xFFFF_FF00) == 306

    def test_backward_jump_returns_negative(self) -> None:
        # The older timestamp exceeds the newer one by a huge margin, which is
        # classified as a backward jump rather than a wrap.
        assert _ts_delta_ms(1000, 0x8000_0100) == -1
class TestPeakWriteBurst:
    """Sliding-window peak write burst: densest frame count and byte total seen."""

    def test_single_frame_counts_as_peak(self) -> None:
        """One frame alone in the window -> peak of 1 frame."""
        acc = StatsAccumulator()
        acc.record_frame_ts(1000, _FRAME_SZ, _SRC)
        peak = acc.snapshot(0.25).os_peak.per_source[_SRC]
        assert (peak.peak_frames, peak.peak_bytes) == (1, _FRAME_SZ)

    def test_two_frames_same_ms(self) -> None:
        """Two frames sharing one timestamp land in the same window."""
        acc = StatsAccumulator()
        acc.record_frame_ts(1000, 50, _SRC)
        acc.record_frame_ts(1000, 70, _SRC)
        peak = acc.snapshot(0.25).os_peak.per_source[_SRC]
        assert (peak.peak_frames, peak.peak_bytes) == (2, 120)

    def test_adjacent_ms_within_window(self) -> None:
        """ts=100 and ts=101 (delta=1 < WRITE_RATE_WINDOW_MS) share a window."""
        acc = StatsAccumulator()
        acc.record_frame_ts(100, 60, _SRC)
        acc.record_frame_ts(101, 40, _SRC)
        peak = acc.snapshot(0.25).os_peak.per_source[_SRC]
        assert (peak.peak_frames, peak.peak_bytes) == (2, 100)

    def test_far_apart_ms_are_separate_windows(self) -> None:
        """A delta >= WRITE_RATE_WINDOW_MS splits frames into separate windows."""
        acc = StatsAccumulator()
        acc.record_frame_ts(100, 60, _SRC)
        acc.record_frame_ts(100 + WRITE_RATE_WINDOW_MS, 40, _SRC)
        peak = acc.snapshot(0.25).os_peak.per_source[_SRC]
        assert (peak.peak_frames, peak.peak_bytes) == (1, 60)

    def test_burst_same_timestamp(self) -> None:
        """Ten frames at the same millisecond are all counted together."""
        acc = StatsAccumulator()
        for _ in range(10):
            acc.record_frame_ts(5000, 32, _SRC)
        peak = acc.snapshot(0.25).os_peak.per_source[_SRC]
        assert (peak.peak_frames, peak.peak_bytes) == (10, 320)

    def test_peak_captures_densest_burst(self) -> None:
        """The reported peak reflects the dense phase, not the sparse one."""
        acc = StatsAccumulator()
        # Sparse: frames 10 ms apart -- each alone in its 1 ms window.
        for i in range(3):
            acc.record_frame_ts(1000 + i * 10, 50, _SRC)
        # Dense: five frames at the same millisecond.
        for _ in range(5):
            acc.record_frame_ts(2000, 80, _SRC)
        peak = acc.snapshot(0.25).os_peak.per_source[_SRC]
        assert (peak.peak_frames, peak.peak_bytes) == (5, 400)

    def test_max_peak_persists_across_snapshots(self) -> None:
        """max_per_source keeps the all-time best even after a quiet interval."""
        acc = StatsAccumulator()
        for _ in range(3):
            acc.record_frame_ts(1000, 100, _SRC)
        first = acc.snapshot(0.25)
        cur, best = first.os_peak.per_source[_SRC], first.os_peak.max_per_source[_SRC]
        assert (cur.peak_frames, cur.peak_bytes) == (3, 300)
        assert (best.peak_frames, best.peak_bytes) == (3, 300)
        # A later, smaller interval: per-snapshot peak drops...
        acc.record_frame_ts(5000, 200, _SRC)
        second = acc.snapshot(0.25)
        cur, best = second.os_peak.per_source[_SRC], second.os_peak.max_per_source[_SRC]
        assert (cur.peak_frames, cur.peak_bytes) == (1, 200)
        # ...but the all-time maximum stays at the first burst's value.
        assert (best.peak_frames, best.peak_bytes) == (3, 300)

    def test_peak_resets_per_snapshot(self) -> None:
        """An interval with no new frames reports no per-source peak."""
        acc = StatsAccumulator()
        acc.record_frame_ts(1000, _FRAME_SZ, _SRC)
        acc.record_frame_ts(1000, _FRAME_SZ, _SRC)
        acc.snapshot(0.25)
        assert acc.snapshot(0.25).os_peak.per_source is None

    def test_window_evicts_old_entries(self) -> None:
        """An entry older than the window is evicted before the new frame counts."""
        acc = StatsAccumulator()
        acc.record_frame_ts(0, _FRAME_SZ, _SRC)
        acc.record_frame_ts(WRITE_RATE_WINDOW_MS + 5, _FRAME_SZ, _SRC)
        # Each frame sat alone in its window, so the recorded peak stays at 1.
        assert acc.snapshot(0.25).os_peak.per_source[_SRC].peak_frames == 1

    def test_backward_timestamp_resets_window(self) -> None:
        """A backward jump (chip reboot) clears the window but keeps the prior peak."""
        acc = StatsAccumulator()
        acc.record_frame_ts(5000, 80, _SRC)
        acc.record_frame_ts(5000, 80, _SRC)
        acc.record_frame_ts(100, 80, _SRC)  # reboot: timestamp jumps backward
        peak = acc.snapshot(0.25).os_peak.per_source[_SRC]
        assert (peak.peak_frames, peak.peak_bytes) == (2, 160)

    def test_wraparound_same_ms_bucket(self) -> None:
        """A uint32 wrap with delta=0 keeps both frames in the window."""
        acc = StatsAccumulator()
        acc.record_frame_ts(0xFFFF_FFFF, 50, _SRC)
        acc.record_frame_ts(0xFFFF_FFFF, 50, _SRC)
        peak = acc.snapshot(0.25).os_peak.per_source[_SRC]
        assert (peak.peak_frames, peak.peak_bytes) == (2, 100)

    def test_wraparound_within_window(self) -> None:
        """A wrap from 0xFFFFFFFF to 0 (delta=1) stays inside the window."""
        acc = StatsAccumulator()
        acc.record_frame_ts(0xFFFF_FFFF, 50, _SRC)
        acc.record_frame_ts(0, 50, _SRC)
        assert acc.snapshot(0.25).os_peak.per_source[_SRC].peak_frames == 2

    def test_wraparound_far_evicts(self) -> None:
        """A wrap with delta >= WRITE_RATE_WINDOW_MS evicts the pre-wrap entry."""
        acc = StatsAccumulator()
        acc.record_frame_ts(0xFFFF_FFFF, 50, _SRC)
        acc.record_frame_ts(WRITE_RATE_WINDOW_MS, 50, _SRC)
        assert acc.snapshot(0.25).os_peak.per_source[_SRC].peak_frames == 1
class TestPerSourcePeak:
    """Per-source peak write bursts are tracked independently of each other."""

    def test_single_source_peak(self) -> None:
        acc = StatsAccumulator()
        acc.record_frame_ts(1000, 50, 1)
        acc.record_frame_ts(1000, 70, 1)
        per_source = acc.snapshot(0.25).os_peak.per_source
        assert per_source is not None
        assert 1 in per_source
        assert (per_source[1].peak_frames, per_source[1].peak_bytes) == (2, 120)

    def test_multi_source_peak(self) -> None:
        """Interleaved writes from two sources at the same ms stay separate."""
        acc = StatsAccumulator()
        acc.record_frame_ts(1000, 50, 1)
        acc.record_frame_ts(1000, 30, 2)
        acc.record_frame_ts(1000, 60, 1)
        acc.record_frame_ts(1000, 40, 2)
        per_source = acc.snapshot(0.25).os_peak.per_source
        assert per_source is not None
        assert (per_source[1].peak_frames, per_source[1].peak_bytes) == (2, 110)
        assert (per_source[2].peak_frames, per_source[2].peak_bytes) == (2, 70)

    def test_per_source_all_time_max(self) -> None:
        acc = StatsAccumulator()
        # First burst: source 1 writes 3 frames in one ms.
        for _ in range(3):
            acc.record_frame_ts(1000, 40, 1)
        first = acc.snapshot(0.25)
        assert first.os_peak.max_per_source is not None
        assert first.os_peak.max_per_source[1].peak_frames == 3
        # A smaller later burst must not lower the all-time max.
        acc.record_frame_ts(5000, 40, 1)
        second = acc.snapshot(0.25)
        assert second.os_peak.max_per_source is not None
        assert second.os_peak.max_per_source[1].peak_frames == 3

    def test_per_source_peak_none_when_no_data(self) -> None:
        snap = StatsAccumulator().snapshot(0.25)
        assert snap.os_peak.per_source is None
        assert snap.os_peak.max_per_source is None

    def test_per_source_independent_peak_moments(self) -> None:
        """A source's peak may occur at a different moment than the global peak."""
        acc = StatsAccumulator()
        # ts=1000: source 1 bursts 5 frames while source 2 writes a single one.
        for _ in range(5):
            acc.record_frame_ts(1000, 30, 1)
        acc.record_frame_ts(1000, 30, 2)
        # ts=2000: source 2 bursts 4 frames; source 1 is silent.
        for _ in range(4):
            acc.record_frame_ts(2000, 30, 2)
        per_source = acc.snapshot(0.25).os_peak.per_source
        assert per_source is not None
        assert per_source[1].peak_frames == 5  # captured at ts=1000
        assert per_source[2].peak_frames == 4  # captured at ts=2000
class TestLLPeakWriteBurst:
    """LL peak write burst tracking runs in the lc_ts clock domain."""

    def test_ll_single_source_peak(self) -> None:
        """Five LL frames at one lc_ts -> a single 5-frame window."""
        acc = StatsAccumulator()
        for _ in range(5):
            acc.record_ll_frame_ts(1000000, 30, BleLogSource.LL_TASK)
        ll_peak = acc.snapshot(0.25).ll_peak
        assert ll_peak.per_source is not None
        assert ll_peak.per_source[BleLogSource.LL_TASK].peak_frames == 5

    def test_ll_multi_source_peak(self) -> None:
        """LL_TASK and LL_ISR peaks do not bleed into each other."""
        acc = StatsAccumulator()
        for _ in range(3):
            acc.record_ll_frame_ts(2000000, 20, BleLogSource.LL_TASK)
        for _ in range(7):
            acc.record_ll_frame_ts(2000000, 20, BleLogSource.LL_ISR)
        ll_peak = acc.snapshot(0.25).ll_peak
        assert ll_peak.per_source is not None
        assert ll_peak.per_source[BleLogSource.LL_TASK].peak_frames == 3
        assert ll_peak.per_source[BleLogSource.LL_ISR].peak_frames == 7

    def test_ll_all_time_max_persists(self) -> None:
        """The LL all-time max survives a later, smaller burst."""
        acc = StatsAccumulator()
        for _ in range(10):
            acc.record_ll_frame_ts(1000000, 30, BleLogSource.LL_HCI)
        acc.snapshot(0.25)
        for _ in range(3):
            acc.record_ll_frame_ts(2000000, 30, BleLogSource.LL_HCI)
        ll_peak = acc.snapshot(0.25).ll_peak
        assert ll_peak.max_per_source is not None
        assert ll_peak.max_per_source[BleLogSource.LL_HCI].peak_frames == 10

    def test_ll_window_separate_from_os_ts(self) -> None:
        """The os_ts and lc_ts trackers report disjoint source sets."""
        acc = StatsAccumulator()
        acc.record_frame_ts(1000, 30, BleLogSource.CUSTOM)
        acc.record_ll_frame_ts(1000000, 30, BleLogSource.LL_TASK)
        snap = acc.snapshot(0.25)
        assert snap.os_peak.per_source is not None
        assert BleLogSource.CUSTOM in snap.os_peak.per_source
        assert BleLogSource.LL_TASK not in snap.os_peak.per_source
        assert snap.ll_peak.per_source is not None
        assert BleLogSource.LL_TASK in snap.ll_peak.per_source
        assert BleLogSource.CUSTOM not in snap.ll_peak.per_source
class TestTrafficSpikeDetection:
    """Wire-utilization spike detection as wired through StatsAccumulator."""

    def _make_stats(self, baudrate: int = 3_000_000) -> StatsAccumulator:
        """Accumulator with its wire ceiling configured from *baudrate*."""
        acc = StatsAccumulator()
        acc.set_wire_max(baudrate)
        return acc

    def test_no_spike_below_threshold(self) -> None:
        """Utilization at 50% of wire max never enters a spike."""
        acc = self._make_stats(3_000_000)
        wire_max_bps = 3_000_000 / 10  # 300,000 bytes/sec
        bytes_in_window = int(wire_max_bps * 0.5 * TRAFFIC_WINDOW_SEC)
        start = 1000.0
        with patch('src.backend.stats.traffic_spike.time') as mock_time:
            mock_time.perf_counter.return_value = start
            for _ in range(10):
                acc.record_frame_traffic(bytes_in_window // 10, 1)
            mock_time.perf_counter.return_value = start + TRAFFIC_WINDOW_SEC + 0.001
            assert acc.check_traffic() is None

    def test_spike_detected_on_exit(self) -> None:
        """The alert fires only once traffic falls back below the threshold."""
        acc = self._make_stats(3_000_000)
        wire_max_bps = 3_000_000 / 10
        bytes_in_window = int(wire_max_bps * 0.9 * TRAFFIC_WINDOW_SEC)
        start = 1000.0
        with patch('src.backend.stats.traffic_spike.time') as mock_time:
            mock_time.perf_counter.return_value = start
            acc.record_frame_traffic(bytes_in_window, 1)
            mock_time.perf_counter.return_value = start + 0.05
            assert acc.check_traffic() is None  # still in spike, no alert yet
            mock_time.perf_counter.return_value = start + TRAFFIC_WINDOW_SEC + 0.01
            result = acc.check_traffic()
            assert result is not None
            assert result.utilization_pct > TRAFFIC_THRESHOLD_PCT * 100
            assert result.duration_ms > 0
            assert result.throughput_kbs > 0

    def _trigger_spike(
        self, stats: StatsAccumulator, mock_time: object, t: float, hot_bytes: int, src: int = 1
    ) -> TrafficSpikeResult | None:
        """Drive one full spike cycle (inject -> enter -> exit) and return the alert."""
        clock = mock_time.perf_counter  # type: ignore[attr-defined]
        clock.return_value = t
        stats.record_frame_traffic(hot_bytes, src)
        clock.return_value = t + 0.05
        stats.check_traffic()  # enter spike
        clock.return_value = t + TRAFFIC_WINDOW_SEC + 0.01
        return stats.check_traffic()  # exit spike -> alert (or None)

    def test_cooldown_suppresses_rapid_alerts(self) -> None:
        """A second spike inside the cooldown window yields no alert."""
        acc = self._make_stats(3_000_000)
        wire_max_bps = 3_000_000 / 10
        hot_bytes = int(wire_max_bps * 0.9 * TRAFFIC_WINDOW_SEC)
        start = 1000.0
        with patch('src.backend.stats.traffic_spike.time') as mock_time:
            assert self._trigger_spike(acc, mock_time, start, hot_bytes) is not None
            assert self._trigger_spike(acc, mock_time, start + 0.5, hot_bytes) is None

    def test_alert_after_cooldown_expires(self) -> None:
        """Once the cooldown has elapsed, a new spike alerts again."""
        acc = self._make_stats(3_000_000)
        wire_max_bps = 3_000_000 / 10
        hot_bytes = int(wire_max_bps * 0.9 * TRAFFIC_WINDOW_SEC)
        start = 1000.0
        with patch('src.backend.stats.traffic_spike.time') as mock_time:
            assert self._trigger_spike(acc, mock_time, start, hot_bytes) is not None
            later = start + TRAFFIC_ALERT_COOLDOWN_SEC + 1.0
            assert self._trigger_spike(acc, mock_time, later, hot_bytes) is not None

    def test_per_source_breakdown(self) -> None:
        """The alert attributes the spike to its contributing sources."""
        acc = self._make_stats(3_000_000)
        wire_max_bps = 3_000_000 / 10
        hot_bytes = int(wire_max_bps * 0.9 * TRAFFIC_WINDOW_SEC)
        start = 1000.0
        with patch('src.backend.stats.traffic_spike.time') as mock_time:
            mock_time.perf_counter.return_value = start
            acc.record_frame_traffic(int(hot_bytes * 0.7), 1)
            acc.record_frame_traffic(int(hot_bytes * 0.3), 2)
            mock_time.perf_counter.return_value = start + 0.05
            acc.check_traffic()  # enter spike
            mock_time.perf_counter.return_value = start + TRAFFIC_WINDOW_SEC + 0.01
            result = acc.check_traffic()  # exit spike
            assert result is not None
            assert 1 in result.per_source
            assert 2 in result.per_source
            # Source 1 contributed 70% of the bytes, so its share is larger.
            assert result.per_source[1] > result.per_source[2]

    def test_no_wire_max_disables_detection(self) -> None:
        """Without a configured wire max, no amount of traffic triggers a spike."""
        acc = StatsAccumulator()
        start = 1000.0
        with patch('src.backend.stats.traffic_spike.time') as mock_time:
            mock_time.perf_counter.return_value = start
            acc.record_frame_traffic(999999, 1)
            mock_time.perf_counter.return_value = start + TRAFFIC_WINDOW_SEC + 0.01
            assert acc.check_traffic() is None
@@ -0,0 +1,176 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from src.backend.models import FrameByteCount
from src.backend.models import FunnelSnapshot
from src.backend.models import ThroughputInfo
from src.backend.models import format_throughput
from src.backend.stats import StatsAccumulator
from src.frontend.stats_screen import _build_console_table
from src.frontend.stats_screen import _build_firmware_table
_SRC_HOST = 5
_SRC_LL_TASK = 2
_ZERO = FrameByteCount(frames=0, bytes=0)
_ZERO_TP = ThroughputInfo(
throughput_fps=0.0, throughput_bps=0.0, peak_write_frames=0, peak_write_bytes=0, peak_window_ms=10
)
def _snap(
    src: int,
    produced: tuple[int, int] = (0, 0),
    written: tuple[int, int] = (0, 0),
    received: tuple[int, int] = (0, 0),
    buf_loss: tuple[int, int] = (0, 0),
    tx_loss: tuple[int, int] = (0, 0),
    tp_fps: float = 0.0,
    peak_frames: int = 0,
) -> FunnelSnapshot:
    """Build a FunnelSnapshot test fixture from compact (frames, bytes) tuples.

    Each count pair expands into a FrameByteCount; the throughput fields not
    covered by *tp_fps*/*peak_frames* are zeroed, with peak_window_ms fixed
    at 10.
    """
    return FunnelSnapshot(
        source=src,
        produced=FrameByteCount(*produced),
        written=FrameByteCount(*written),
        received=FrameByteCount(*received),
        buffer_loss=FrameByteCount(*buf_loss),
        transport_loss=FrameByteCount(*tx_loss),
        throughput=ThroughputInfo(
            throughput_fps=tp_fps,
            throughput_bps=0.0,
            peak_write_frames=peak_frames,
            peak_write_bytes=0,
            peak_window_ms=10,
        ),
    )
class TestFormatThroughput:
    """format_throughput() renders bytes/sec as KB/s below 1 MiB and MB/s above."""

    def test_zero(self) -> None:
        assert format_throughput(0.0) == '0.0 KB/s'

    def test_small_kb(self) -> None:
        assert format_throughput(512.0) == '0.5 KB/s'

    def test_one_kb(self) -> None:
        assert format_throughput(1024.0) == '1.0 KB/s'

    def test_large_kb(self) -> None:
        assert format_throughput(500 * 1024) == '500.0 KB/s'

    def test_boundary_just_below_mb(self) -> None:
        assert 'KB/s' in format_throughput(1023.9 * 1024)

    def test_boundary_at_mb(self) -> None:
        assert format_throughput(1024 * 1024) == '1.00 MB/s'

    def test_large_mb(self) -> None:
        assert format_throughput(2.5 * 1024 * 1024) == '2.50 MB/s'

    def test_peak_extrapolation_typical(self) -> None:
        # 300 bytes in a 1 ms peak window extrapolates to 300,000 B/s -> KB range.
        assert 'KB/s' in format_throughput(300 * 1000)

    def test_peak_extrapolation_high(self) -> None:
        # 1500 bytes in a 1 ms peak window extrapolates to 1,500,000 B/s -> MB range.
        assert 'MB/s' in format_throughput(1500 * 1000)
class TestBuildFirmwareTable:
    """_build_firmware_table() renders firmware-side funnel rows."""

    def test_empty_returns_no_rows(self) -> None:
        assert _build_firmware_table([]).row_count == 0

    def test_column_headers(self) -> None:
        headers = [str(col.header) for col in _build_firmware_table([]).columns]
        assert 'Source' in headers
        assert any('Written' in h for h in headers)
        assert any('Loss' in h for h in headers)

    def test_single_source(self) -> None:
        table = _build_firmware_table([_snap(_SRC_HOST, written=(120, 6000))])
        assert table.row_count == 1
        assert len(table.columns) == 5

    def test_with_loss_shows_red(self) -> None:
        # A row with buffer loss still renders (styling itself is not asserted).
        table = _build_firmware_table([_snap(_SRC_HOST, written=(110, 5500), buf_loss=(10, 500))])
        assert table.row_count == 1

    def test_multiple_sources(self) -> None:
        table = _build_firmware_table(
            [_snap(_SRC_HOST, written=(100, 5000)), _snap(_SRC_LL_TASK, written=(200, 10000))]
        )
        assert table.row_count == 2
class TestBuildConsoleTable:
    """_build_console_table() renders console-side RX/throughput rows."""

    def test_empty_returns_no_rows(self) -> None:
        assert _build_console_table([]).row_count == 0

    def test_column_headers(self) -> None:
        headers = [str(col.header) for col in _build_console_table([]).columns]
        assert 'Source' in headers
        assert any('Received' in h for h in headers)
        assert any('Average' in h for h in headers)
        assert any('Peak' in h for h in headers)

    def test_single_source(self) -> None:
        table = _build_console_table([_snap(_SRC_HOST, tp_fps=850.0, peak_frames=12)])
        assert table.row_count == 1
        assert len(table.columns) == 7

    def test_zero_throughput_shows_dash(self) -> None:
        # A source with no throughput still produces a row.
        table = _build_console_table([_snap(_SRC_HOST, tp_fps=0.0, peak_frames=0)])
        assert table.row_count == 1
class TestPerSourceRxBytes:
    """snapshot().per_source_rx_bytes accumulates received bytes per source."""

    def test_single_frame(self) -> None:
        acc = StatsAccumulator()
        acc.record_frame(frame_size=100, src_code=_SRC_HOST, frame_sn=0)
        assert acc.snapshot(1.0).per_source_rx_bytes == {_SRC_HOST: 100}

    def test_multiple_frames_same_source(self) -> None:
        acc = StatsAccumulator()
        acc.record_frame(frame_size=100, src_code=_SRC_HOST, frame_sn=0)
        acc.record_frame(frame_size=200, src_code=_SRC_HOST, frame_sn=1)
        assert acc.snapshot(1.0).per_source_rx_bytes == {_SRC_HOST: 300}

    def test_multiple_sources(self) -> None:
        acc = StatsAccumulator()
        acc.record_frame(frame_size=100, src_code=_SRC_HOST, frame_sn=0)
        acc.record_frame(frame_size=200, src_code=_SRC_LL_TASK, frame_sn=0)
        assert acc.snapshot(1.0).per_source_rx_bytes == {_SRC_HOST: 100, _SRC_LL_TASK: 200}

    def test_cumulative_across_snapshots(self) -> None:
        acc = StatsAccumulator()
        acc.record_frame(frame_size=100, src_code=_SRC_HOST, frame_sn=0)
        acc.snapshot(1.0)  # totals are cumulative, not per-interval
        acc.record_frame(frame_size=200, src_code=_SRC_HOST, frame_sn=1)
        assert acc.snapshot(1.0).per_source_rx_bytes == {_SRC_HOST: 300}

    def test_none_when_no_data(self) -> None:
        assert StatsAccumulator().snapshot(1.0).per_source_rx_bytes is None
@@ -0,0 +1,78 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from unittest.mock import patch
from src.backend.stats.traffic_spike import TRAFFIC_ALERT_COOLDOWN_SEC
from src.backend.stats.traffic_spike import TRAFFIC_WINDOW_SEC
from src.backend.stats.traffic_spike import TrafficSpikeDetector
from src.backend.stats.traffic_spike import TrafficSpikeResult
def _make_detector(baudrate: int = 3_000_000) -> TrafficSpikeDetector:
    """Detector with its wire ceiling set to baudrate/10 (bytes per second)."""
    detector = TrafficSpikeDetector()
    detector.set_wire_max_bps(baudrate / 10)
    return detector
def _trigger_spike(
    d: TrafficSpikeDetector, mock_time: object, t: float, hot_bytes: int, src: int = 1
) -> TrafficSpikeResult | None:
    """Drive one spike cycle: inject *hot_bytes* at *t*, enter, then exit.

    Returns the alert produced on spike exit, or None when suppressed.
    """
    clock = mock_time.perf_counter  # type: ignore[attr-defined]
    clock.return_value = t
    d.record(hot_bytes, src)
    clock.return_value = t + 0.05
    d.check()  # enter spike
    clock.return_value = t + TRAFFIC_WINDOW_SEC + 0.01
    return d.check()  # exit spike -> alert (or None)
class TestTrafficSpikeDetector:
    """Unit tests for TrafficSpikeDetector driven by a mocked perf_counter."""

    def test_no_spike_below_threshold(self) -> None:
        detector = _make_detector()
        wire_max_bps = 300_000
        safe_bytes = int(wire_max_bps * 0.5 * TRAFFIC_WINDOW_SEC)
        start = 1000.0
        with patch('src.backend.stats.traffic_spike.time') as mock_time:
            mock_time.perf_counter.return_value = start
            detector.record(safe_bytes, 1)
            mock_time.perf_counter.return_value = start + TRAFFIC_WINDOW_SEC + 0.001
            assert detector.check() is None

    def test_spike_on_exit(self) -> None:
        detector = _make_detector()
        hot_bytes = int(300_000 * 0.9 * TRAFFIC_WINDOW_SEC)
        with patch('src.backend.stats.traffic_spike.time') as mock_time:
            result = _trigger_spike(detector, mock_time, 1000.0, hot_bytes)
        assert result is not None
        assert result.duration_ms > 0

    def test_cooldown(self) -> None:
        detector = _make_detector()
        hot_bytes = int(300_000 * 0.9 * TRAFFIC_WINDOW_SEC)
        start = 1000.0
        with patch('src.backend.stats.traffic_spike.time') as mock_time:
            assert _trigger_spike(detector, mock_time, start, hot_bytes) is not None
            # A second spike 0.5 s later falls inside the cooldown: suppressed.
            assert _trigger_spike(detector, mock_time, start + 0.5, hot_bytes) is None

    def test_alert_after_cooldown(self) -> None:
        detector = _make_detector()
        hot_bytes = int(300_000 * 0.9 * TRAFFIC_WINDOW_SEC)
        start = 1000.0
        with patch('src.backend.stats.traffic_spike.time') as mock_time:
            assert _trigger_spike(detector, mock_time, start, hot_bytes) is not None
            later = start + TRAFFIC_ALERT_COOLDOWN_SEC + 1.0
            assert _trigger_spike(detector, mock_time, later, hot_bytes) is not None

    def test_no_wire_max_disables(self) -> None:
        detector = TrafficSpikeDetector()  # wire max never configured
        start = 1000.0
        with patch('src.backend.stats.traffic_spike.time') as mock_time:
            mock_time.perf_counter.return_value = start
            detector.record(999999, 1)
            mock_time.perf_counter.return_value = start + TRAFFIC_WINDOW_SEC + 0.01
            assert detector.check() is None
@@ -0,0 +1,50 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from src.backend.stats.transport import TransportMetrics
class TestTransportMetrics:
    """harvest() returns interval rates and cumulative totals for the transport."""

    def test_initial_harvest(self) -> None:
        snap = TransportMetrics().harvest(1.0)
        assert snap.rx_bytes == 0
        assert snap.bps == 0.0
        assert snap.fps == 0.0

    def test_record_bytes(self) -> None:
        metrics = TransportMetrics()
        metrics.record_bytes(1024)
        snap = metrics.harvest(1.0)
        assert snap.rx_bytes == 1024
        # NOTE(review): 1024 bytes over 1 s yields bps == 10240 -- looks like
        # wire bits (10 per byte) rather than bytes/sec; confirm the unit
        # against TransportMetrics' contract.
        assert snap.bps == 10240.0

    def test_record_frame(self) -> None:
        metrics = TransportMetrics()
        metrics.record_frame()
        metrics.record_frame()
        assert metrics.harvest(1.0).fps == 2.0

    def test_max_bps_persists(self) -> None:
        metrics = TransportMetrics()
        metrics.record_bytes(10000)
        metrics.harvest(1.0)
        metrics.record_bytes(100)
        # max_bps keeps the all-time high, not the latest interval's rate.
        assert metrics.harvest(1.0).max_bps == 100000.0

    def test_zero_elapsed(self) -> None:
        metrics = TransportMetrics()
        metrics.record_bytes(100)
        snap = metrics.harvest(0.0)
        # Zero elapsed time must not divide by zero; rates report as 0.
        assert snap.bps == 0.0
        assert snap.fps == 0.0

    def test_delta_resets_between_harvests(self) -> None:
        metrics = TransportMetrics()
        metrics.record_bytes(1000)
        metrics.harvest(1.0)
        snap = metrics.harvest(1.0)
        assert snap.bps == 0.0  # no new bytes in this interval
        assert snap.rx_bytes == 1000  # cumulative total unaffected
@@ -0,0 +1,28 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from unittest.mock import patch
from src.backend.uart_transport import validate_uart_port
class TestValidateUartPort:
    """validate_uart_port against a mocked serial-port listing."""

    @patch('src.backend.uart_transport.list_serial_ports', return_value=['/dev/ttyUSB0', '/dev/ttyUSB1'])
    def test_valid_port_returns_none(self, _ports: object) -> None:
        """A port present in the enumerated list validates cleanly (None)."""
        assert validate_uart_port('/dev/ttyUSB0') is None

    @patch('src.backend.uart_transport.list_serial_ports', return_value=['/dev/ttyUSB0'])
    def test_invalid_port_returns_error(self, _ports: object) -> None:
        """An unknown port produces an error message naming the bad port."""
        error = validate_uart_port('/dev/ttyUSB99')
        assert error is not None
        assert '/dev/ttyUSB99' in error

    @patch('src.backend.uart_transport.list_serial_ports', return_value=['COM3', 'COM4'])
    def test_windows_com_port_valid(self, _ports: object) -> None:
        """COM ports don't exist as filesystem paths — must not use Path.exists()."""
        assert validate_uart_port('COM3') is None

    @patch('src.backend.uart_transport.list_serial_ports', return_value=[])
    def test_empty_port_list(self, _ports: object) -> None:
        """With no ports enumerated, every candidate is rejected."""
        assert validate_uart_port('/dev/ttyUSB0') is not None