feat(ble_log_console): add Textual frontend and app wiring

TUI frontend built on Textual with thread-safe backend integration:

- Launch screen with port/baud/log-dir selection and auto-refresh
- Log view with scrollable RichLog and color-coded write methods;
  Rich markup escaped to prevent crash on bracket characters
- Stats screen split into two tables by time base:
  - Firmware Counters (since chip init): Written and Buffer Loss
  - Console Measurements (since console start): Average Throughput
    and Peak Write — each with separate frames and bytes sub-columns
  - Column widths constrained with min_width/max_width for stable layout
  - Division-by-zero guard when peak window is zero
- Shortcut screen overlay
- Status panel with sync state, checksum mode, and transport metrics;
  speed display uses named UART_BITS_PER_BYTE constant
- Main app wiring: UART reader thread with threading.Lock for serial
  access, frame parser pipeline, stats emission via StatsUpdated
  messages (including funnel snapshots via public accessor for
  thread-safe delivery), BackendStopped message for disconnect
- BackendStopped posted on open_serial failure (prevents stuck status)
- Select.BLANK guard for baud rate in launch screen
- Explicit None guards replace asserts in backend worker
- Wall-clock burst tracking for REDIR frames (no chip-side timestamp)
- SN gap alerts (FrameLossDetected) and traffic spike notifications
This commit is contained in:
Zhou Xiao
2026-03-23 02:10:45 +08:00
parent cbbc495825
commit e40fd6753d
8 changed files with 923 additions and 0 deletions
+18
View File
@@ -0,0 +1,18 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import sys
if sys.version_info < (3, 10): # noqa: UP036 — runtime guard for users on old Python
print(f'Error: Python 3.10 or later is required.\nCurrent version: {sys.version}')
sys.exit(1)
try:
import textual # noqa: F401
except ImportError:
print(
"Error: 'textual' package is not installed.\n"
"Run 'run.sh' (Linux/macOS) or 'run.bat' (Windows) "
'to launch with auto-setup.'
)
sys.exit(1)
+390
View File
@@ -0,0 +1,390 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Textual App wiring backend Worker to frontend widgets.
See Spec Section 6.
"""
import struct
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import cast
import serial
from textual.app import App
from textual.app import ComposeResult
from textual.binding import Binding
from textual.message import Message
from src.backend.frame_parser import FrameParser
from src.backend.internal_decoder import decode_internal_frame
from src.backend.models import FRAME_OVERHEAD
from src.backend.models import LL_TS_OFFSET
from src.backend.models import BackendStopped
from src.backend.models import BleLogSource
from src.backend.models import EnhStatResult
from src.backend.models import FrameLossDetected
from src.backend.models import FunnelSnapshot
from src.backend.models import InfoResult
from src.backend.models import InternalFrameDecoded
from src.backend.models import InternalSource
from src.backend.models import LaunchConfig
from src.backend.models import LogLine
from src.backend.models import LossType
from src.backend.models import ParsedFrame
from src.backend.models import SourcePeakWrite
from src.backend.models import StatsUpdated
from src.backend.models import SyncState
from src.backend.models import SyncStateChanged
from src.backend.models import TrafficSpikeDetected
from src.backend.models import has_os_ts
from src.backend.models import is_ll_source
from src.backend.models import resolve_source_name
from src.backend.stats import StatsAccumulator
from src.backend.uart_transport import UART_BLOCK_SIZE
from src.backend.uart_transport import open_serial
from src.frontend.launch_screen import LaunchScreen
from src.frontend.log_view import LogView
from src.frontend.shortcut_screen import ShortcutScreen
from src.frontend.stats_screen import StatsScreen
from src.frontend.status_panel import StatusPanel
STATS_INTERVAL = 0.25 # seconds
class BLELogApp(App):
CSS = """
Screen {
layout: vertical;
}
"""
BINDINGS = [
Binding('q', 'quit', 'Quit'),
Binding('Q', 'quit', show=False),
Binding('ctrl+c', 'quit', show=False, priority=True),
Binding('c', 'clear_log', 'Clear'),
Binding('C', 'clear_log', show=False),
Binding('s', 'toggle_scroll', 'Auto-scroll'),
Binding('S', 'toggle_scroll', show=False),
Binding('d', 'dump_stats', 'Stats'),
Binding('D', 'dump_stats', show=False),
Binding('h', 'show_help', 'Help'),
Binding('H', 'show_help', show=False),
Binding('r', 'reset_chip', 'Reset'),
Binding('R', 'reset_chip', show=False),
]
def __init__(
self,
port: str | None = None,
baudrate: int = 3_000_000,
log_dir: Path | None = None,
) -> None:
super().__init__()
self._port = port
self._baudrate = baudrate
self._log_dir = log_dir or Path.cwd()
self._output_path: Path | None = None
self._serial: serial.Serial | None = None
# All-time per-source chip write peak (updated from StatsUpdated messages)
self._max_per_source_peak: dict[int, SourcePeakWrite] | None = None
self._ll_max_per_source_peak: dict[int, SourcePeakWrite] | None = None
# Console-side per-source received bytes (from StatsUpdated snapshots)
self._per_source_rx_bytes: dict[int, int] | None = None
self._funnel_snapshots: list[FunnelSnapshot] = []
# Wall-clock capture start (set when backend loop begins)
self._capture_start_time: float = 0.0
self._serial_lock = threading.Lock()
def compose(self) -> ComposeResult:
yield LogView()
yield StatusPanel()
def on_mount(self) -> None:
if self._port is not None:
self._resolve_output_path()
self.run_worker(self._backend_loop, thread=True, exclusive=True)
else:
self.push_screen(LaunchScreen(default_log_dir=self._log_dir), callback=self._on_launch_result)
@property
def funnel_snapshots(self) -> list[FunnelSnapshot]:
"""Public accessor for funnel snapshots (used by StatsScreen)."""
return self._funnel_snapshots
def _on_launch_result(self, config: LaunchConfig | None) -> None:
"""Handle Launch Screen dismissal."""
if config is None:
self.exit()
return
self._port = config.port
self._baudrate = config.baudrate
self._log_dir = config.log_dir
self._resolve_output_path()
self.run_worker(self._backend_loop, thread=True, exclusive=True)
def _resolve_output_path(self) -> None:
"""Generate timestamped output file path in the log directory."""
self._log_dir.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
self._output_path = self._log_dir / f'ble_log_{ts}.bin'
def _post(self, msg: Message) -> None:
"""Thread-safe message posting from backend worker."""
self.call_from_thread(self.post_message, msg)
def _emit_stats(self, stats: StatsAccumulator, parser: FrameParser, last_time: float) -> float:
"""Emit a stats snapshot if the interval has elapsed. Returns updated timestamp."""
now = time.perf_counter()
if now - last_time < STATS_INTERVAL:
return last_time
elapsed = now - last_time
snapshot = stats.snapshot(
elapsed,
sync_state=parser.sync_state,
checksum_mode=parser.checksum_mode,
)
funnel = stats.funnel_snapshot(elapsed)
self._post(StatsUpdated(snapshot, funnel))
return now
def _backend_loop(self) -> None:
"""Background worker: UART read -> parse -> stats -> messages."""
if self._port is None or self._output_path is None:
self._post(LogLine('Backend started without port/output configuration'))
self._post(BackendStopped('Configuration missing'))
return
parser = FrameParser()
stats = StatsAccumulator()
stats.set_wire_max(self._baudrate)
redir_line_buf = ''
prev_sync_state = SyncState.SEARCHING
last_snapshot_time = time.perf_counter()
try:
self._serial = open_serial(self._port, self._baudrate)
except Exception as e:
self._post(LogLine(f'Failed to open UART: {e}'))
self._post(BackendStopped(f'Failed to open UART: {e}'))
return
self._capture_start_time = time.perf_counter()
ser = self._serial
self._post(LogLine(f'Connected to {self._port} at {self._baudrate} baud'))
# Lazy file handles — created on first data arrival
output_file = None
console_log_file = None
console_log_path = self._output_path.with_name(self._output_path.stem + '_console.log')
try:
while True:
with self._serial_lock:
block = ser.read(UART_BLOCK_SIZE)
if not block:
last_snapshot_time = self._emit_stats(stats, parser, last_snapshot_time)
continue
# 1. Save raw binary (lazy-open on first block)
if output_file is None:
output_file = open(self._output_path, 'wb') # noqa: SIM115
self._post(LogLine(f'Saving to {self._output_path}'))
output_file.write(block)
output_file.flush()
# 2. Track bytes
stats.record_bytes(len(block))
# 3. Parse frames
results = parser.feed(block)
# 4. Check sync state transition
if parser.sync_state != prev_sync_state:
self._post(SyncStateChanged(parser.sync_state))
prev_sync_state = parser.sync_state
# 5. Process results
for item in results:
if isinstance(item, ParsedFrame):
frame_size = len(item.payload) + FRAME_OVERHEAD
if item.source_code != BleLogSource.INTERNAL:
stats.record_frame(frame_size, item.source_code, item.frame_sn)
stats.record_frame_traffic(frame_size, item.source_code)
else:
stats.record_frame() # count frame for transport metrics, no SN tracking
if has_os_ts(item.source_code) and item.source_code != BleLogSource.INTERNAL:
stats.record_frame_ts(item.os_ts_ms, frame_size, item.source_code)
elif is_ll_source(item.source_code) and len(item.payload) >= 6:
(lc_ts_us,) = struct.unpack_from('<I', item.payload, LL_TS_OFFSET)
stats.record_ll_frame_ts(lc_ts_us, frame_size, item.source_code)
elif item.source_code == BleLogSource.REDIR:
wall_ms = int(time.perf_counter() * 1000) & 0xFFFFFFFF
stats.record_frame_wall_ts(wall_ms, frame_size, item.source_code)
# Decode internal frames
if item.source_code == BleLogSource.INTERNAL:
decoded = decode_internal_frame(item.payload)
if decoded:
int_src = decoded['int_src']
self._post(InternalFrameDecoded(int_src, decoded))
if int_src in (InternalSource.INIT_DONE, InternalSource.INFO):
info = cast(InfoResult, decoded)
stats.set_firmware_version(info['version'])
if int_src == InternalSource.INIT_DONE:
stats.reset('init')
elif int_src == InternalSource.FLUSH:
stats.reset('flush')
elif int_src == InternalSource.ENH_STAT:
enh = cast(EnhStatResult, decoded)
new_frames, new_bytes = stats.record_enh_stat(
src_code=enh['src_code'],
written_frames=enh['written_frame_cnt'],
lost_frames=enh['lost_frame_cnt'],
written_bytes=enh['written_bytes_cnt'],
lost_bytes=enh['lost_bytes_cnt'],
baudrate=self._baudrate,
)
if new_frames > 0:
source_name = resolve_source_name(enh['src_code'])
self._post(
FrameLossDetected(
source_name,
loss_type=LossType.BUFFER,
lost_frames=new_frames,
lost_bytes=new_bytes,
)
)
# Decode UART redirect frames (raw ASCII, no os_ts prefix).
# A single log line may span multiple frames due to
# batch sealing, so buffer partial lines until '\n'.
elif item.source_code == BleLogSource.REDIR:
payload_text = item.payload.decode('ascii', errors='replace')
# Write raw payload to console log (independent of line buffering)
if console_log_file is None:
console_log_file = open(console_log_path, 'w') # noqa: SIM115
console_log_file.write(payload_text)
console_log_file.flush()
redir_line_buf += payload_text
while '\n' in redir_line_buf:
line, redir_line_buf = redir_line_buf.split('\n', 1)
if line:
self._post(LogLine(line))
elif isinstance(item, str):
self._post(LogLine(item))
# 6. Traffic spike detection
spike = stats.check_traffic()
if spike is not None:
self._post(
TrafficSpikeDetected(
throughput_kbs=spike.throughput_kbs,
wire_max_kbs=spike.wire_max_kbs,
utilization_pct=spike.utilization_pct,
duration_ms=spike.duration_ms,
per_source=spike.per_source,
)
)
# 7. Periodic stats snapshot
last_snapshot_time = self._emit_stats(stats, parser, last_snapshot_time)
except Exception as e:
self._post(LogLine(f'Error: {e}'))
finally:
ser.close()
if output_file is not None:
output_file.close()
if console_log_file is not None:
console_log_file.close()
self._post(BackendStopped('Serial connection closed'))
# --- Message handlers ---
def on_sync_state_changed(self, msg: SyncStateChanged) -> None:
log_view = self.query_one(LogView)
log_view.write_sync(f'State: {msg.state.value}')
def on_stats_updated(self, msg: StatsUpdated) -> None:
panel = self.query_one(StatusPanel)
panel.stats = msg.stats
self._funnel_snapshots = msg.funnel_snapshots
# Preserve all-time per-source peak for the stats screen
if msg.stats.os_peak.max_per_source is not None:
self._max_per_source_peak = msg.stats.os_peak.max_per_source
if msg.stats.ll_peak.max_per_source is not None:
self._ll_max_per_source_peak = msg.stats.ll_peak.max_per_source
if msg.stats.per_source_rx_bytes is not None:
self._per_source_rx_bytes = msg.stats.per_source_rx_bytes
def on_internal_frame_decoded(self, msg: InternalFrameDecoded) -> None:
if msg.int_src == InternalSource.INIT_DONE:
info = cast(InfoResult, msg.payload)
log_view = self.query_one(LogView)
log_view.write_info(f'BLE Log v{info["version"]} initialized - statistics reset')
elif msg.int_src == InternalSource.FLUSH:
log_view = self.query_one(LogView)
log_view.write_info('Firmware flush - SN counters reset')
def on_log_line(self, msg: LogLine) -> None:
log_view = self.query_one(LogView)
log_view.write_ascii(msg.text)
def on_frame_loss_detected(self, msg: FrameLossDetected) -> None:
log_view = self.query_one(LogView)
log_view.write_warning(
f'Frame loss [{msg.source_name}] ({msg.loss_type.value}): {msg.lost_frames} frames, {msg.lost_bytes} bytes'
)
def on_backend_stopped(self, msg: BackendStopped) -> None:
log_view = self.query_one(LogView)
log_view.write_warning(f'Backend stopped: {msg.reason}')
panel = self.query_one(StatusPanel)
panel.disconnected = True
def on_traffic_spike_detected(self, msg: TrafficSpikeDetected) -> None:
top_sources = sorted(msg.per_source.items(), key=lambda x: x[1], reverse=True)
src_parts = ', '.join(f'{resolve_source_name(s)} {p:.0f}%' for s, p in top_sources if p >= 1.0)
if msg.utilization_pct >= 100.0:
util_str = 'saturated'
else:
util_str = f'{msg.utilization_pct:.0f}% wire'
log_view = self.query_one(LogView)
log_view.write_traffic(f'{msg.throughput_kbs:.0f} KB/s ({util_str}) over {msg.duration_ms:.0f}ms | {src_parts}')
# --- Actions ---
def action_clear_log(self) -> None:
self.query_one(LogView).clear()
def action_toggle_scroll(self) -> None:
log_view = self.query_one(LogView)
log_view.auto_scroll = not log_view.auto_scroll
def action_dump_stats(self) -> None:
self.push_screen(StatsScreen(start_time=self._capture_start_time))
def action_show_help(self) -> None:
self.push_screen(ShortcutScreen())
def action_reset_chip(self) -> None:
"""Reset ESP32 via DTR/RTS toggle (same sequence as esptool)."""
ser = self._serial
if ser is None or not ser.is_open:
return
with self._serial_lock:
ser.dtr = False
ser.rts = True
time.sleep(0.1)
ser.rts = False
log_view = self.query_one(LogView)
log_view.write_info('Chip reset triggered')
@@ -0,0 +1,2 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
@@ -0,0 +1,175 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Launch Screen — interactive setup for port, baud rate, and log directory.
Shown on startup when --port is not provided via CLI.
Dismissed with a LaunchConfig result on Connect, or None on quit.
"""
from pathlib import Path
from textual import on
from textual import work
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Center
from textual.containers import Vertical
from textual.screen import Screen
from textual.widgets import Button
from textual.widgets import Input
from textual.widgets import Label
from textual.widgets import Select
from textual_fspicker import SelectDirectory
from src.backend.models import LaunchConfig
from src.backend.uart_transport import list_serial_ports
BAUD_RATES: list[int] = [115200, 230400, 460800, 921600, 1500000, 2000000, 3000000]
DEFAULT_BAUD_RATE: int = 3000000
class LaunchScreen(Screen[LaunchConfig | None]):
"""Interactive setup screen for BLE Log Console."""
DEFAULT_CSS = """
LaunchScreen {
align: center middle;
}
#launch-container {
width: 60;
max-height: 80%;
background: $surface;
padding: 1 2;
border: thick $accent;
}
#launch-title {
text-align: center;
text-style: bold;
margin-bottom: 1;
}
.field-label {
margin-top: 1;
}
.field-row {
height: auto;
layout: horizontal;
}
#port-select {
width: 1fr;
}
#refresh-btn {
width: auto;
min-width: 12;
}
#dir-input {
width: 1fr;
}
#browse-btn {
width: auto;
min-width: 12;
}
#connect-btn {
margin-top: 2;
}
#no-ports-label {
color: $warning;
}
"""
BINDINGS = [
Binding('q', 'quit', 'Quit'),
Binding('Q', 'quit', show=False),
Binding('ctrl+c', 'quit', show=False, priority=True),
]
def __init__(self, default_log_dir: Path | None = None) -> None:
super().__init__()
self._default_log_dir = default_log_dir or Path.cwd()
def compose(self) -> ComposeResult:
ports = list_serial_ports()
port_options = [(p, p) for p in ports]
baud_options = [(str(b), b) for b in BAUD_RATES]
with Vertical(id='launch-container'):
yield Label('BLE Log Console Setup', id='launch-title')
yield Label('Serial Port', classes='field-label')
with Vertical(classes='field-row'):
if port_options:
yield Select(port_options, value=ports[0], id='port-select')
else:
yield Select([], id='port-select', prompt='No ports detected')
yield Label('No serial ports detected', id='no-ports-label')
yield Button('Refresh', id='refresh-btn')
yield Label('Baud Rate', classes='field-label')
yield Select(baud_options, value=DEFAULT_BAUD_RATE, id='baud-select')
yield Label('Log Directory', classes='field-label')
with Vertical(classes='field-row'):
yield Input(str(self._default_log_dir), id='dir-input')
yield Button('Browse...', id='browse-btn')
with Center():
yield Button('Connect', variant='primary', id='connect-btn')
@on(Button.Pressed, '#refresh-btn')
def refresh_ports(self) -> None:
"""Re-scan serial ports and update the Select widget."""
ports = list_serial_ports()
port_options = [(p, p) for p in ports]
port_select = self.query_one('#port-select', Select)
port_select.set_options(port_options)
if ports:
port_select.value = ports[0]
@on(Button.Pressed, '#browse-btn')
@work
async def browse_directory(self) -> None:
"""Open a directory picker dialog."""
current = self.query_one('#dir-input', Input).value
start = Path(current) if current else Path.cwd()
if not start.is_dir():
start = Path.cwd()
chosen = await self.app.push_screen_wait(SelectDirectory(location=start))
if chosen is not None:
self.query_one('#dir-input', Input).value = str(chosen)
@on(Button.Pressed, '#connect-btn')
def connect(self) -> None:
"""Validate and return config."""
port_select = self.query_one('#port-select', Select)
baud_select = self.query_one('#baud-select', Select)
dir_input = self.query_one('#dir-input', Input)
if port_select.value is Select.BLANK:
self.notify('Please select a serial port', severity='error')
return
if baud_select.value is Select.BLANK:
self.notify('Please select a baud rate', severity='error')
return
log_dir = Path(dir_input.value)
config = LaunchConfig(
port=str(port_select.value),
baudrate=int(baud_select.value), # type: ignore[arg-type] # guarded above
log_dir=log_dir,
)
self.dismiss(config)
def action_quit(self) -> None:
self.dismiss(None)
@@ -0,0 +1,47 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Scrollable log view widget.
See Spec Section 11.
"""
from rich.text import Text
from textual.widgets import RichLog
class LogView(RichLog):
DEFAULT_CSS = """
LogView {
height: 1fr;
}
"""
def __init__(self) -> None:
super().__init__(highlight=False, markup=True, wrap=True, auto_scroll=True)
def _write_tagged(self, tag: str, color: str, text: str) -> None:
t = Text.from_markup(f'[dim][{color}]\\[{tag}][/{color}] [/dim]')
t.append(text)
self.write(t)
def write_info(self, text: str) -> None:
self._write_tagged('INFO', 'green', text)
def write_warning(self, text: str) -> None:
self._write_tagged('WARN', 'yellow', text)
def write_error(self, text: str) -> None:
self._write_tagged('ERROR', 'red', text)
def write_sync(self, text: str) -> None:
self._write_tagged('SYNC', 'cyan', text)
def write_enh_stat(self, text: str) -> None:
self._write_tagged('ENH_STAT', 'cyan', text)
def write_traffic(self, text: str) -> None:
self._write_tagged('TRAFFIC', 'magenta', text)
def write_ascii(self, text: str) -> None:
self.write(Text(text))
@@ -0,0 +1,65 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Modal screen showing available keyboard shortcuts.
Pushed by the 'h' keybinding; dismissed by Escape or 'h' again.
"""
from rich.table import Table
from textual.app import ComposeResult
from textual.binding import Binding
from textual.screen import ModalScreen
from textual.widgets import Static
_SHORTCUTS = [
('q', 'Quit'),
('c', 'Clear log'),
('s', 'Toggle auto-scroll'),
('d', 'Frame statistics'),
('h', 'This help screen'),
('r', 'Reset chip'),
]
def _build_shortcut_table() -> Table:
"""Build a Rich Table listing all keyboard shortcuts."""
table = Table(title='Keyboard Shortcuts', expand=True)
table.add_column('Key', style='cyan', no_wrap=True)
table.add_column('Action')
for key, action in _SHORTCUTS:
table.add_row(key, action)
return table
class ShortcutScreen(ModalScreen):
"""Modal overlay showing available keyboard shortcuts."""
DEFAULT_CSS = """
ShortcutScreen {
align: center middle;
}
ShortcutScreen > Static {
width: 60;
max-height: 80%;
background: $surface;
padding: 1 2;
border: thick $accent;
}
"""
BINDINGS = [
Binding('escape', 'dismiss', 'Close'),
Binding('h', 'dismiss', 'Close'),
Binding('H', 'dismiss', show=False),
]
def compose(self) -> ComposeResult:
table = _build_shortcut_table()
content = Static()
content.update(table)
yield content
yield Static('[dim]Press Escape to return[/dim]')
@@ -0,0 +1,157 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Modal screen for per-source frame statistics display.
Pushed by the 'd' keybinding; dismissed by Escape or 'd' again.
Refreshes every second to show live throughput data.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from rich.table import Table
from rich.text import Text
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Vertical
from textual.screen import ModalScreen
from textual.widgets import Static
from src.backend.models import FunnelSnapshot
from src.backend.models import format_bytes
from src.backend.models import format_throughput
from src.backend.models import resolve_source_name
if TYPE_CHECKING:
from src.app import BLELogApp
REFRESH_INTERVAL_SEC = 1.0
def _fmt_frames(n: int) -> str:
return str(n) if n > 0 else '-'
def _fmt_loss_frames(n: int) -> Text:
if n == 0:
return Text('-')
return Text(str(n), style='red')
def _fmt_loss_bytes(n: int) -> Text:
if n == 0:
return Text('-')
return Text(format_bytes(n), style='red')
def _build_firmware_table(snapshots: list[FunnelSnapshot]) -> Table:
table = Table(title='Firmware Counters (since chip init)', expand=True)
table.add_column('Source', style='cyan', no_wrap=True, min_width=12, max_width=16)
table.add_column('Written\nFrames', justify='right', min_width=10, max_width=12)
table.add_column('Written\nBytes', justify='right', min_width=10, max_width=12)
table.add_column('Buffer Loss\nFrames', justify='right', min_width=12, max_width=14)
table.add_column('Buffer Loss\nBytes', justify='right', min_width=12, max_width=14)
for snap in snapshots:
table.add_row(
resolve_source_name(snap.source),
_fmt_frames(snap.written.frames),
format_bytes(snap.written.bytes) if snap.written.bytes > 0 else '-',
_fmt_loss_frames(snap.buffer_loss.frames),
_fmt_loss_bytes(snap.buffer_loss.bytes),
)
return table
def _build_console_table(snapshots: list[FunnelSnapshot]) -> Table:
table = Table(title='Console Measurements (since console start)', expand=True)
table.add_column('Source', style='cyan', no_wrap=True, min_width=12, max_width=16)
table.add_column('Received\nFrames', justify='right', min_width=10, max_width=12)
table.add_column('Received\nBytes', justify='right', min_width=10, max_width=12)
table.add_column('Average\nFrames/s', justify='right', style='magenta', min_width=10, max_width=12)
table.add_column('Average\nBytes/s', justify='right', style='magenta', min_width=10, max_width=12)
table.add_column('Peak\nFrames/10ms', justify='right', style='magenta', min_width=12, max_width=14)
table.add_column('Peak\nBytes/s', justify='right', style='magenta', min_width=12, max_width=14)
for snap in snapshots:
tp_fps = snap.throughput.throughput_fps
tp_bps = snap.throughput.throughput_bps
pf = snap.throughput.peak_write_frames
pb = snap.throughput.peak_write_bytes
wms = snap.throughput.peak_window_ms
table.add_row(
resolve_source_name(snap.source),
_fmt_frames(snap.received.frames),
format_bytes(snap.received.bytes) if snap.received.bytes > 0 else '-',
f'{tp_fps:.0f}' if tp_fps > 0 else '-',
format_throughput(tp_bps) if tp_bps > 0 else '-',
f'{pf}' if pf > 0 else '-',
format_throughput(pb * 1000 / wms) if pf > 0 and wms > 0 else '-',
)
return table
class StatsScreen(ModalScreen):
"""Modal overlay showing per-source frame statistics with live refresh."""
DEFAULT_CSS = """
StatsScreen {
align: center middle;
}
#stats-container {
width: 90%;
max-width: 140;
height: auto;
max-height: 80%;
overflow-y: auto;
background: $surface;
padding: 1 2;
border: thick $accent;
}
#stats-container > Static {
height: auto;
}
"""
BINDINGS = [
Binding('escape', 'dismiss', 'Close'),
Binding('d', 'dismiss', 'Close'),
]
def __init__(self, start_time: float) -> None:
super().__init__()
self._start_time = start_time
def _get_app(self) -> BLELogApp:
return self.app # type: ignore[return-value]
def compose(self) -> ComposeResult:
with Vertical(id='stats-container'):
yield Static(id='firmware-table')
yield Static(id='console-table')
yield Static('[dim]Press Escape to return — refreshes every 1s[/dim]')
def on_mount(self) -> None:
self._refresh_table()
self.set_interval(REFRESH_INTERVAL_SEC, self._refresh_table)
def _refresh_table(self) -> None:
app = self._get_app()
snapshots = app.funnel_snapshots
fw = self.query_one('#firmware-table', Static)
cs = self.query_one('#console-table', Static)
if not snapshots:
fw.update('No data received yet.\n\nPress Escape to return.')
cs.update('')
return
fw.update(_build_firmware_table(snapshots))
cs.update(_build_console_table(snapshots))
@@ -0,0 +1,69 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
"""Status panel widget — docked to bottom, shows live stats.
See Spec Section 11.
"""
from rich.text import Text
from textual.reactive import reactive
from textual.widget import Widget
from src.backend.models import FrameStats
from src.backend.models import SyncState
from src.backend.models import format_bytes
from src.backend.models import format_throughput
from src.backend.stats import UART_BITS_PER_BYTE
def _format_speed(bps: float) -> str:
return format_throughput(bps / UART_BITS_PER_BYTE)
_SYNC_COLORS = {
SyncState.SEARCHING: 'yellow',
SyncState.CONFIRMING_SYNC: 'cyan',
SyncState.SYNCED: 'green',
SyncState.CONFIRMING_LOSS: 'red',
}
class StatusPanel(Widget):
DEFAULT_CSS = """
StatusPanel {
dock: bottom;
height: 3;
border-top: solid $accent;
padding: 0 1;
}
"""
stats: reactive[FrameStats] = reactive(FrameStats)
disconnected: reactive[bool] = reactive(False)
def render(self) -> Text:
s = self.stats
if self.disconnected:
line1 = '[bold red]DISCONNECTED[/bold red]'
line2 = 'Backend stopped — serial connection closed'
return Text.from_markup(f'{line1}\n{line2}')
sync_color = _SYNC_COLORS.get(s.sync_state, 'white')
if s.checksum_algorithm and s.checksum_scope:
cksum_str = f' | Checksum: {s.checksum_algorithm.value} / {s.checksum_scope.value}'
else:
cksum_str = ''
line1 = f'Sync: [{sync_color}]{s.sync_state.value}[/{sync_color}]{cksum_str} | Press [bold]h[/bold] for help'
t = s.transport
loss = s.loss
loss_style = 'red' if loss.total_frames > 0 else 'yellow'
line2 = (
f'RX: {format_bytes(t.rx_bytes)} '
f'Speed: {_format_speed(t.bps)} '
f'Max: {_format_speed(t.max_bps)} '
f'Rate: {t.fps:.0f} fps '
f'[{loss_style}]Lost: {loss.total_frames} frames, {format_bytes(loss.total_bytes)}[/{loss_style}]'
)
return Text.from_markup(f'{line1}\n{line2}')