diff --git a/tools/bt/ble_log_console/src/__init__.py b/tools/bt/ble_log_console/src/__init__.py new file mode 100644 index 0000000000..3609c82fad --- /dev/null +++ b/tools/bt/ble_log_console/src/__init__.py @@ -0,0 +1,18 @@ +# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import sys + +if sys.version_info < (3, 10): # noqa: UP036 — runtime guard for users on old Python + print(f'Error: Python 3.10 or later is required.\nCurrent version: {sys.version}') + sys.exit(1) + +try: + import textual # noqa: F401 +except ImportError: + print( + "Error: 'textual' package is not installed.\n" + "Run 'run.sh' (Linux/macOS) or 'run.bat' (Windows) " + 'to launch with auto-setup.' + ) + sys.exit(1) diff --git a/tools/bt/ble_log_console/src/app.py b/tools/bt/ble_log_console/src/app.py new file mode 100644 index 0000000000..116409885d --- /dev/null +++ b/tools/bt/ble_log_console/src/app.py @@ -0,0 +1,390 @@ +# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +"""Textual App wiring backend Worker to frontend widgets. + +See Spec Section 6. +""" + +import struct +import threading +import time +from datetime import datetime +from pathlib import Path +from typing import cast + +import serial +from textual.app import App +from textual.app import ComposeResult +from textual.binding import Binding +from textual.message import Message + +from src.backend.frame_parser import FrameParser +from src.backend.internal_decoder import decode_internal_frame +from src.backend.models import FRAME_OVERHEAD +from src.backend.models import LL_TS_OFFSET +from src.backend.models import BackendStopped +from src.backend.models import BleLogSource +from src.backend.models import EnhStatResult +from src.backend.models import FrameLossDetected +from src.backend.models import FunnelSnapshot +from src.backend.models import InfoResult +from src.backend.models import InternalFrameDecoded +from src.backend.models import InternalSource +from src.backend.models import LaunchConfig +from src.backend.models import LogLine +from src.backend.models import LossType +from src.backend.models import ParsedFrame +from src.backend.models import SourcePeakWrite +from src.backend.models import StatsUpdated +from src.backend.models import SyncState +from src.backend.models import SyncStateChanged +from src.backend.models import TrafficSpikeDetected +from src.backend.models import has_os_ts +from src.backend.models import is_ll_source +from src.backend.models import resolve_source_name +from src.backend.stats import StatsAccumulator +from src.backend.uart_transport import UART_BLOCK_SIZE +from src.backend.uart_transport import open_serial +from src.frontend.launch_screen import LaunchScreen +from src.frontend.log_view import LogView +from src.frontend.shortcut_screen import ShortcutScreen +from src.frontend.stats_screen import StatsScreen +from src.frontend.status_panel import StatusPanel + +STATS_INTERVAL = 0.25 # seconds + + +class BLELogApp(App): + CSS = """ + Screen { + layout: vertical; + } + """ + + BINDINGS = [ + Binding('q', 'quit', 'Quit'), + Binding('Q', 'quit', show=False), + Binding('ctrl+c', 'quit', show=False, priority=True), + Binding('c', 'clear_log', 'Clear'), + Binding('C', 'clear_log', show=False), + Binding('s', 'toggle_scroll', 'Auto-scroll'), + Binding('S', 'toggle_scroll', show=False), + Binding('d', 'dump_stats', 'Stats'), + Binding('D', 'dump_stats', show=False), + Binding('h', 'show_help', 'Help'), + Binding('H', 'show_help', show=False), + Binding('r', 'reset_chip', 'Reset'), + Binding('R', 'reset_chip', show=False), + ] + + def __init__( + self, + port: str | None = None, + baudrate: int = 3_000_000, + log_dir: Path | None = None, + ) -> None: + super().__init__() + self._port = port + self._baudrate = baudrate + self._log_dir = log_dir or Path.cwd() + self._output_path: Path | None = None + self._serial: serial.Serial | None = None + # All-time per-source chip write peak (updated from StatsUpdated messages) + self._max_per_source_peak: dict[int, SourcePeakWrite] | None = None + self._ll_max_per_source_peak: dict[int, SourcePeakWrite] | None = None + # Console-side per-source received bytes (from StatsUpdated snapshots) + self._per_source_rx_bytes: dict[int, int] | None = None + self._funnel_snapshots: list[FunnelSnapshot] = [] + # Wall-clock capture start (set when backend loop begins) + self._capture_start_time: float = 0.0 + self._serial_lock = threading.Lock() + + def compose(self) -> ComposeResult: + yield LogView() + yield StatusPanel() + + def on_mount(self) -> None: + if self._port is not None: + self._resolve_output_path() + self.run_worker(self._backend_loop, thread=True, exclusive=True) + else: + self.push_screen(LaunchScreen(default_log_dir=self._log_dir), callback=self._on_launch_result) + + @property + def funnel_snapshots(self) -> list[FunnelSnapshot]: + """Public accessor for funnel snapshots (used by StatsScreen).""" + return self._funnel_snapshots + + def _on_launch_result(self, config: LaunchConfig | None) -> None: + """Handle Launch Screen dismissal.""" + if config is None: + self.exit() + return + self._port = config.port + self._baudrate = config.baudrate + self._log_dir = config.log_dir + self._resolve_output_path() + self.run_worker(self._backend_loop, thread=True, exclusive=True) + + def _resolve_output_path(self) -> None: + """Generate timestamped output file path in the log directory.""" + self._log_dir.mkdir(parents=True, exist_ok=True) + ts = datetime.now().strftime('%Y%m%d_%H%M%S') + self._output_path = self._log_dir / f'ble_log_{ts}.bin' + + def _post(self, msg: Message) -> None: + """Thread-safe message posting from backend worker.""" + self.call_from_thread(self.post_message, msg) + + def _emit_stats(self, stats: StatsAccumulator, parser: FrameParser, last_time: float) -> float: + """Emit a stats snapshot if the interval has elapsed. Returns updated timestamp.""" + now = time.perf_counter() + if now - last_time < STATS_INTERVAL: + return last_time + + elapsed = now - last_time + snapshot = stats.snapshot( + elapsed, + sync_state=parser.sync_state, + checksum_mode=parser.checksum_mode, + ) + funnel = stats.funnel_snapshot(elapsed) + self._post(StatsUpdated(snapshot, funnel)) + return now + + def _backend_loop(self) -> None: + """Background worker: UART read -> parse -> stats -> messages.""" + if self._port is None or self._output_path is None: + self._post(LogLine('Backend started without port/output configuration')) + self._post(BackendStopped('Configuration missing')) + return + parser = FrameParser() + stats = StatsAccumulator() + stats.set_wire_max(self._baudrate) + redir_line_buf = '' + prev_sync_state = SyncState.SEARCHING + last_snapshot_time = time.perf_counter() + + try: + self._serial = open_serial(self._port, self._baudrate) + except Exception as e: + self._post(LogLine(f'Failed to open UART: {e}')) + self._post(BackendStopped(f'Failed to open UART: {e}')) + return + + self._capture_start_time = time.perf_counter() + ser = self._serial + self._post(LogLine(f'Connected to {self._port} at {self._baudrate} baud')) + + # Lazy file handles — created on first data arrival + output_file = None + console_log_file = None + console_log_path = self._output_path.with_name(self._output_path.stem + '_console.log') + + try: + while True: + with self._serial_lock: + block = ser.read(UART_BLOCK_SIZE) + if not block: + last_snapshot_time = self._emit_stats(stats, parser, last_snapshot_time) + continue + + # 1. Save raw binary (lazy-open on first block) + if output_file is None: + output_file = open(self._output_path, 'wb') # noqa: SIM115 + self._post(LogLine(f'Saving to {self._output_path}')) + output_file.write(block) + output_file.flush() + + # 2. Track bytes + stats.record_bytes(len(block)) + + # 3. Parse frames + results = parser.feed(block) + + # 4. Check sync state transition + if parser.sync_state != prev_sync_state: + self._post(SyncStateChanged(parser.sync_state)) + prev_sync_state = parser.sync_state + + # 5. Process results + for item in results: + if isinstance(item, ParsedFrame): + frame_size = len(item.payload) + FRAME_OVERHEAD + if item.source_code != BleLogSource.INTERNAL: + stats.record_frame(frame_size, item.source_code, item.frame_sn) + stats.record_frame_traffic(frame_size, item.source_code) + else: + stats.record_frame() # count frame for transport metrics, no SN tracking + if has_os_ts(item.source_code) and item.source_code != BleLogSource.INTERNAL: + stats.record_frame_ts(item.os_ts_ms, frame_size, item.source_code) + elif is_ll_source(item.source_code) and len(item.payload) >= 6: + (lc_ts_us,) = struct.unpack_from(' 0: + source_name = resolve_source_name(enh['src_code']) + self._post( + FrameLossDetected( + source_name, + loss_type=LossType.BUFFER, + lost_frames=new_frames, + lost_bytes=new_bytes, + ) + ) + + # Decode UART redirect frames (raw ASCII, no os_ts prefix). + # A single log line may span multiple frames due to + # batch sealing, so buffer partial lines until '\n'. + elif item.source_code == BleLogSource.REDIR: + payload_text = item.payload.decode('ascii', errors='replace') + + # Write raw payload to console log (independent of line buffering) + if console_log_file is None: + console_log_file = open(console_log_path, 'w') # noqa: SIM115 + console_log_file.write(payload_text) + console_log_file.flush() + + redir_line_buf += payload_text + while '\n' in redir_line_buf: + line, redir_line_buf = redir_line_buf.split('\n', 1) + if line: + self._post(LogLine(line)) + + elif isinstance(item, str): + self._post(LogLine(item)) + + # 6. Traffic spike detection + spike = stats.check_traffic() + if spike is not None: + self._post( + TrafficSpikeDetected( + throughput_kbs=spike.throughput_kbs, + wire_max_kbs=spike.wire_max_kbs, + utilization_pct=spike.utilization_pct, + duration_ms=spike.duration_ms, + per_source=spike.per_source, + ) + ) + + # 7. Periodic stats snapshot + last_snapshot_time = self._emit_stats(stats, parser, last_snapshot_time) + + except Exception as e: + self._post(LogLine(f'Error: {e}')) + finally: + ser.close() + if output_file is not None: + output_file.close() + if console_log_file is not None: + console_log_file.close() + self._post(BackendStopped('Serial connection closed')) + + # --- Message handlers --- + + def on_sync_state_changed(self, msg: SyncStateChanged) -> None: + log_view = self.query_one(LogView) + log_view.write_sync(f'State: {msg.state.value}') + + def on_stats_updated(self, msg: StatsUpdated) -> None: + panel = self.query_one(StatusPanel) + panel.stats = msg.stats + self._funnel_snapshots = msg.funnel_snapshots + # Preserve all-time per-source peak for the stats screen + if msg.stats.os_peak.max_per_source is not None: + self._max_per_source_peak = msg.stats.os_peak.max_per_source + if msg.stats.ll_peak.max_per_source is not None: + self._ll_max_per_source_peak = msg.stats.ll_peak.max_per_source + if msg.stats.per_source_rx_bytes is not None: + self._per_source_rx_bytes = msg.stats.per_source_rx_bytes + + def on_internal_frame_decoded(self, msg: InternalFrameDecoded) -> None: + if msg.int_src == InternalSource.INIT_DONE: + info = cast(InfoResult, msg.payload) + log_view = self.query_one(LogView) + log_view.write_info(f'BLE Log v{info["version"]} initialized - statistics reset') + elif msg.int_src == InternalSource.FLUSH: + log_view = self.query_one(LogView) + log_view.write_info('Firmware flush - SN counters reset') + + def on_log_line(self, msg: LogLine) -> None: + log_view = self.query_one(LogView) + log_view.write_ascii(msg.text) + + def on_frame_loss_detected(self, msg: FrameLossDetected) -> None: + log_view = self.query_one(LogView) + log_view.write_warning( + f'Frame loss [{msg.source_name}] ({msg.loss_type.value}): {msg.lost_frames} frames, {msg.lost_bytes} bytes' + ) + + def on_backend_stopped(self, msg: BackendStopped) -> None: + log_view = self.query_one(LogView) + log_view.write_warning(f'Backend stopped: {msg.reason}') + panel = self.query_one(StatusPanel) + panel.disconnected = True + + def on_traffic_spike_detected(self, msg: TrafficSpikeDetected) -> None: + top_sources = sorted(msg.per_source.items(), key=lambda x: x[1], reverse=True) + src_parts = ', '.join(f'{resolve_source_name(s)} {p:.0f}%' for s, p in top_sources if p >= 1.0) + if msg.utilization_pct >= 100.0: + util_str = 'saturated' + else: + util_str = f'{msg.utilization_pct:.0f}% wire' + log_view = self.query_one(LogView) + log_view.write_traffic(f'{msg.throughput_kbs:.0f} KB/s ({util_str}) over {msg.duration_ms:.0f}ms | {src_parts}') + + # --- Actions --- + + def action_clear_log(self) -> None: + self.query_one(LogView).clear() + + def action_toggle_scroll(self) -> None: + log_view = self.query_one(LogView) + log_view.auto_scroll = not log_view.auto_scroll + + def action_dump_stats(self) -> None: + self.push_screen(StatsScreen(start_time=self._capture_start_time)) + + def action_show_help(self) -> None: + self.push_screen(ShortcutScreen()) + + def action_reset_chip(self) -> None: + """Reset ESP32 via DTR/RTS toggle (same sequence as esptool).""" + ser = self._serial + if ser is None or not ser.is_open: + return + with self._serial_lock: + ser.dtr = False + ser.rts = True + time.sleep(0.1) + ser.rts = False + log_view = self.query_one(LogView) + log_view.write_info('Chip reset triggered') diff --git a/tools/bt/ble_log_console/src/frontend/__init__.py b/tools/bt/ble_log_console/src/frontend/__init__.py new file mode 100644 index 0000000000..a9ace4413e --- /dev/null +++ b/tools/bt/ble_log_console/src/frontend/__init__.py @@ -0,0 +1,2 @@ +# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 diff --git a/tools/bt/ble_log_console/src/frontend/launch_screen.py b/tools/bt/ble_log_console/src/frontend/launch_screen.py new file mode 100644 index 0000000000..8fffc199ad --- /dev/null +++ b/tools/bt/ble_log_console/src/frontend/launch_screen.py @@ -0,0 +1,175 @@ +# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +"""Launch Screen — interactive setup for port, baud rate, and log directory. + +Shown on startup when --port is not provided via CLI. +Dismissed with a LaunchConfig result on Connect, or None on quit. +""" + +from pathlib import Path + +from textual import on +from textual import work +from textual.app import ComposeResult +from textual.binding import Binding +from textual.containers import Center +from textual.containers import Vertical +from textual.screen import Screen +from textual.widgets import Button +from textual.widgets import Input +from textual.widgets import Label +from textual.widgets import Select +from textual_fspicker import SelectDirectory + +from src.backend.models import LaunchConfig +from src.backend.uart_transport import list_serial_ports + +BAUD_RATES: list[int] = [115200, 230400, 460800, 921600, 1500000, 2000000, 3000000] +DEFAULT_BAUD_RATE: int = 3000000 + + +class LaunchScreen(Screen[LaunchConfig | None]): + """Interactive setup screen for BLE Log Console.""" + + DEFAULT_CSS = """ + LaunchScreen { + align: center middle; + } + + #launch-container { + width: 60; + max-height: 80%; + background: $surface; + padding: 1 2; + border: thick $accent; + } + + #launch-title { + text-align: center; + text-style: bold; + margin-bottom: 1; + } + + .field-label { + margin-top: 1; + } + + .field-row { + height: auto; + layout: horizontal; + } + + #port-select { + width: 1fr; + } + + #refresh-btn { + width: auto; + min-width: 12; + } + + #dir-input { + width: 1fr; + } + + #browse-btn { + width: auto; + min-width: 12; + } + + #connect-btn { + margin-top: 2; + } + + #no-ports-label { + color: $warning; + } + """ + + BINDINGS = [ + Binding('q', 'quit', 'Quit'), + Binding('Q', 'quit', show=False), + Binding('ctrl+c', 'quit', show=False, priority=True), + ] + + def __init__(self, default_log_dir: Path | None = None) -> None: + super().__init__() + self._default_log_dir = default_log_dir or Path.cwd() + + def compose(self) -> ComposeResult: + ports = list_serial_ports() + port_options = [(p, p) for p in ports] + baud_options = [(str(b), b) for b in BAUD_RATES] + + with Vertical(id='launch-container'): + yield Label('BLE Log Console Setup', id='launch-title') + + yield Label('Serial Port', classes='field-label') + with Vertical(classes='field-row'): + if port_options: + yield Select(port_options, value=ports[0], id='port-select') + else: + yield Select([], id='port-select', prompt='No ports detected') + yield Label('No serial ports detected', id='no-ports-label') + yield Button('Refresh', id='refresh-btn') + + yield Label('Baud Rate', classes='field-label') + yield Select(baud_options, value=DEFAULT_BAUD_RATE, id='baud-select') + + yield Label('Log Directory', classes='field-label') + with Vertical(classes='field-row'): + yield Input(str(self._default_log_dir), id='dir-input') + yield Button('Browse...', id='browse-btn') + + with Center(): + yield Button('Connect', variant='primary', id='connect-btn') + + @on(Button.Pressed, '#refresh-btn') + def refresh_ports(self) -> None: + """Re-scan serial ports and update the Select widget.""" + ports = list_serial_ports() + port_options = [(p, p) for p in ports] + port_select = self.query_one('#port-select', Select) + port_select.set_options(port_options) + if ports: + port_select.value = ports[0] + + @on(Button.Pressed, '#browse-btn') + @work + async def browse_directory(self) -> None: + """Open a directory picker dialog.""" + current = self.query_one('#dir-input', Input).value + start = Path(current) if current else Path.cwd() + if not start.is_dir(): + start = Path.cwd() + chosen = await self.app.push_screen_wait(SelectDirectory(location=start)) + if chosen is not None: + self.query_one('#dir-input', Input).value = str(chosen) + + @on(Button.Pressed, '#connect-btn') + def connect(self) -> None: + """Validate and return config.""" + port_select = self.query_one('#port-select', Select) + baud_select = self.query_one('#baud-select', Select) + dir_input = self.query_one('#dir-input', Input) + + if port_select.value is Select.BLANK: + self.notify('Please select a serial port', severity='error') + return + + if baud_select.value is Select.BLANK: + self.notify('Please select a baud rate', severity='error') + return + + log_dir = Path(dir_input.value) + + config = LaunchConfig( + port=str(port_select.value), + baudrate=int(baud_select.value), # type: ignore[arg-type] # guarded above + log_dir=log_dir, + ) + self.dismiss(config) + + def action_quit(self) -> None: + self.dismiss(None) diff --git a/tools/bt/ble_log_console/src/frontend/log_view.py b/tools/bt/ble_log_console/src/frontend/log_view.py new file mode 100644 index 0000000000..9bd7309f1c --- /dev/null +++ b/tools/bt/ble_log_console/src/frontend/log_view.py @@ -0,0 +1,47 @@ +# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +"""Scrollable log view widget. + +See Spec Section 11. +""" + +from rich.text import Text +from textual.widgets import RichLog + + +class LogView(RichLog): + DEFAULT_CSS = """ + LogView { + height: 1fr; + } + """ + + def __init__(self) -> None: + super().__init__(highlight=False, markup=True, wrap=True, auto_scroll=True) + + def _write_tagged(self, tag: str, color: str, text: str) -> None: + t = Text.from_markup(f'[dim][{color}]\\[{tag}][/{color}] [/dim]') + t.append(text) + self.write(t) + + def write_info(self, text: str) -> None: + self._write_tagged('INFO', 'green', text) + + def write_warning(self, text: str) -> None: + self._write_tagged('WARN', 'yellow', text) + + def write_error(self, text: str) -> None: + self._write_tagged('ERROR', 'red', text) + + def write_sync(self, text: str) -> None: + self._write_tagged('SYNC', 'cyan', text) + + def write_enh_stat(self, text: str) -> None: + self._write_tagged('ENH_STAT', 'cyan', text) + + def write_traffic(self, text: str) -> None: + self._write_tagged('TRAFFIC', 'magenta', text) + + def write_ascii(self, text: str) -> None: + self.write(Text(text)) diff --git a/tools/bt/ble_log_console/src/frontend/shortcut_screen.py b/tools/bt/ble_log_console/src/frontend/shortcut_screen.py new file mode 100644 index 0000000000..db7dda2c5c --- /dev/null +++ b/tools/bt/ble_log_console/src/frontend/shortcut_screen.py @@ -0,0 +1,65 @@ +# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +"""Modal screen showing available keyboard shortcuts. + +Pushed by the 'h' keybinding; dismissed by Escape or 'h' again. +""" + +from rich.table import Table +from textual.app import ComposeResult +from textual.binding import Binding +from textual.screen import ModalScreen +from textual.widgets import Static + +_SHORTCUTS = [ + ('q', 'Quit'), + ('c', 'Clear log'), + ('s', 'Toggle auto-scroll'), + ('d', 'Frame statistics'), + ('h', 'This help screen'), + ('r', 'Reset chip'), +] + + +def _build_shortcut_table() -> Table: + """Build a Rich Table listing all keyboard shortcuts.""" + table = Table(title='Keyboard Shortcuts', expand=True) + table.add_column('Key', style='cyan', no_wrap=True) + table.add_column('Action') + + for key, action in _SHORTCUTS: + table.add_row(key, action) + + return table + + +class ShortcutScreen(ModalScreen): + """Modal overlay showing available keyboard shortcuts.""" + + DEFAULT_CSS = """ + ShortcutScreen { + align: center middle; + } + + ShortcutScreen > Static { + width: 60; + max-height: 80%; + background: $surface; + padding: 1 2; + border: thick $accent; + } + """ + + BINDINGS = [ + Binding('escape', 'dismiss', 'Close'), + Binding('h', 'dismiss', 'Close'), + Binding('H', 'dismiss', show=False), + ] + + def compose(self) -> ComposeResult: + table = _build_shortcut_table() + content = Static() + content.update(table) + yield content + yield Static('[dim]Press Escape to return[/dim]') diff --git a/tools/bt/ble_log_console/src/frontend/stats_screen.py b/tools/bt/ble_log_console/src/frontend/stats_screen.py new file mode 100644 index 0000000000..0170dacea0 --- /dev/null +++ b/tools/bt/ble_log_console/src/frontend/stats_screen.py @@ -0,0 +1,157 @@ +# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +"""Modal screen for per-source frame statistics display. + +Pushed by the 'd' keybinding; dismissed by Escape or 'd' again. +Refreshes every second to show live throughput data. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from rich.table import Table +from rich.text import Text +from textual.app import ComposeResult +from textual.binding import Binding +from textual.containers import Vertical +from textual.screen import ModalScreen +from textual.widgets import Static + +from src.backend.models import FunnelSnapshot +from src.backend.models import format_bytes +from src.backend.models import format_throughput +from src.backend.models import resolve_source_name + +if TYPE_CHECKING: + from src.app import BLELogApp + +REFRESH_INTERVAL_SEC = 1.0 + + +def _fmt_frames(n: int) -> str: + return str(n) if n > 0 else '-' + + +def _fmt_loss_frames(n: int) -> Text: + if n == 0: + return Text('-') + return Text(str(n), style='red') + + +def _fmt_loss_bytes(n: int) -> Text: + if n == 0: + return Text('-') + return Text(format_bytes(n), style='red') + + +def _build_firmware_table(snapshots: list[FunnelSnapshot]) -> Table: + table = Table(title='Firmware Counters (since chip init)', expand=True) + table.add_column('Source', style='cyan', no_wrap=True, min_width=12, max_width=16) + table.add_column('Written\nFrames', justify='right', min_width=10, max_width=12) + table.add_column('Written\nBytes', justify='right', min_width=10, max_width=12) + table.add_column('Buffer Loss\nFrames', justify='right', min_width=12, max_width=14) + table.add_column('Buffer Loss\nBytes', justify='right', min_width=12, max_width=14) + + for snap in snapshots: + table.add_row( + resolve_source_name(snap.source), + _fmt_frames(snap.written.frames), + format_bytes(snap.written.bytes) if snap.written.bytes > 0 else '-', + _fmt_loss_frames(snap.buffer_loss.frames), + _fmt_loss_bytes(snap.buffer_loss.bytes), + ) + + return table + + +def _build_console_table(snapshots: list[FunnelSnapshot]) -> Table: + table = Table(title='Console Measurements (since console start)', expand=True) + table.add_column('Source', style='cyan', no_wrap=True, min_width=12, max_width=16) + table.add_column('Received\nFrames', justify='right', min_width=10, max_width=12) + table.add_column('Received\nBytes', justify='right', min_width=10, max_width=12) + table.add_column('Average\nFrames/s', justify='right', style='magenta', min_width=10, max_width=12) + table.add_column('Average\nBytes/s', justify='right', style='magenta', min_width=10, max_width=12) + table.add_column('Peak\nFrames/10ms', justify='right', style='magenta', min_width=12, max_width=14) + table.add_column('Peak\nBytes/s', justify='right', style='magenta', min_width=12, max_width=14) + + for snap in snapshots: + tp_fps = snap.throughput.throughput_fps + tp_bps = snap.throughput.throughput_bps + pf = snap.throughput.peak_write_frames + pb = snap.throughput.peak_write_bytes + wms = snap.throughput.peak_window_ms + + table.add_row( + resolve_source_name(snap.source), + _fmt_frames(snap.received.frames), + format_bytes(snap.received.bytes) if snap.received.bytes > 0 else '-', + f'{tp_fps:.0f}' if tp_fps > 0 else '-', + format_throughput(tp_bps) if tp_bps > 0 else '-', + f'{pf}' if pf > 0 else '-', + format_throughput(pb * 1000 / wms) if pf > 0 and wms > 0 else '-', + ) + + return table + + +class StatsScreen(ModalScreen): + """Modal overlay showing per-source frame statistics with live refresh.""" + + DEFAULT_CSS = """ + StatsScreen { + align: center middle; + } + + #stats-container { + width: 90%; + max-width: 140; + height: auto; + max-height: 80%; + overflow-y: auto; + background: $surface; + padding: 1 2; + border: thick $accent; + } + + #stats-container > Static { + height: auto; + } + """ + + BINDINGS = [ + Binding('escape', 'dismiss', 'Close'), + Binding('d', 'dismiss', 'Close'), + ] + + def __init__(self, start_time: float) -> None: + super().__init__() + self._start_time = start_time + + def _get_app(self) -> BLELogApp: + return self.app # type: ignore[return-value] + + def compose(self) -> ComposeResult: + with Vertical(id='stats-container'): + yield Static(id='firmware-table') + yield Static(id='console-table') + yield Static('[dim]Press Escape to return — refreshes every 1s[/dim]') + + def on_mount(self) -> None: + self._refresh_table() + self.set_interval(REFRESH_INTERVAL_SEC, self._refresh_table) + + def _refresh_table(self) -> None: + app = self._get_app() + snapshots = app.funnel_snapshots + + fw = self.query_one('#firmware-table', Static) + cs = self.query_one('#console-table', Static) + if not snapshots: + fw.update('No data received yet.\n\nPress Escape to return.') + cs.update('') + return + + fw.update(_build_firmware_table(snapshots)) + cs.update(_build_console_table(snapshots)) diff --git a/tools/bt/ble_log_console/src/frontend/status_panel.py b/tools/bt/ble_log_console/src/frontend/status_panel.py new file mode 100644 index 0000000000..0829639edd --- /dev/null +++ b/tools/bt/ble_log_console/src/frontend/status_panel.py @@ -0,0 +1,69 @@ +# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +"""Status panel widget — docked to bottom, shows live stats. + +See Spec Section 11. +""" + +from rich.text import Text +from textual.reactive import reactive +from textual.widget import Widget + +from src.backend.models import FrameStats +from src.backend.models import SyncState +from src.backend.models import format_bytes +from src.backend.models import format_throughput +from src.backend.stats import UART_BITS_PER_BYTE + + +def _format_speed(bps: float) -> str: + return format_throughput(bps / UART_BITS_PER_BYTE) + + +_SYNC_COLORS = { + SyncState.SEARCHING: 'yellow', + SyncState.CONFIRMING_SYNC: 'cyan', + SyncState.SYNCED: 'green', + SyncState.CONFIRMING_LOSS: 'red', +} + + +class StatusPanel(Widget): + DEFAULT_CSS = """ + StatusPanel { + dock: bottom; + height: 3; + border-top: solid $accent; + padding: 0 1; + } + """ + + stats: reactive[FrameStats] = reactive(FrameStats) + disconnected: reactive[bool] = reactive(False) + + def render(self) -> Text: + s = self.stats + if self.disconnected: + line1 = '[bold red]DISCONNECTED[/bold red]' + line2 = 'Backend stopped — serial connection closed' + return Text.from_markup(f'{line1}\n{line2}') + sync_color = _SYNC_COLORS.get(s.sync_state, 'white') + + if s.checksum_algorithm and s.checksum_scope: + cksum_str = f' | Checksum: {s.checksum_algorithm.value} / {s.checksum_scope.value}' + else: + cksum_str = '' + line1 = f'Sync: [{sync_color}]{s.sync_state.value}[/{sync_color}]{cksum_str} | Press [bold]h[/bold] for help' + t = s.transport + loss = s.loss + loss_style = 'red' if loss.total_frames > 0 else 'yellow' + line2 = ( + f'RX: {format_bytes(t.rx_bytes)} ' + f'Speed: {_format_speed(t.bps)} ' + f'Max: {_format_speed(t.max_bps)} ' + f'Rate: {t.fps:.0f} fps ' + f'[{loss_style}]Lost: {loss.total_frames} frames, {format_bytes(loss.total_bytes)}[/{loss_style}]' + ) + + return Text.from_markup(f'{line1}\n{line2}')