feat(ble_log_console): add buffer utilization tracker and display

Add BufUtilTracker to record per-LBM inflight peak data, wire it
through StatsAccumulator and StatsUpdated message for thread-safe
delivery to the UI. Buffer utilization is displayed in a dedicated
BufUtilScreen accessible via the 'm' keybinding, separate from
the frame stats screen ('d').

Also reject false INIT_DONE frames with version==0 caused by
misaligned data during parser sync, preventing spurious stats resets.
This commit is contained in:
Zhou Xiao
2026-03-23 18:29:37 +08:00
parent a11bde6fd0
commit 3236123cb4
5 changed files with 208 additions and 3 deletions
+31 -2
View File
@@ -25,6 +25,8 @@ from src.backend.models import FRAME_OVERHEAD
from src.backend.models import LL_TS_OFFSET
from src.backend.models import BackendStopped
from src.backend.models import BleLogSource
from src.backend.models import BufUtilEntry
from src.backend.models import BufUtilResult
from src.backend.models import EnhStatResult
from src.backend.models import FrameLossDetected
from src.backend.models import FunnelSnapshot
@@ -49,6 +51,7 @@ from src.backend.uart_transport import open_serial
from src.frontend.launch_screen import LaunchScreen
from src.frontend.log_view import LogView
from src.frontend.shortcut_screen import ShortcutScreen
from src.frontend.stats_screen import BufUtilScreen
from src.frontend.stats_screen import StatsScreen
from src.frontend.status_panel import StatusPanel
@@ -72,6 +75,8 @@ class BLELogApp(App):
Binding('S', 'toggle_scroll', show=False),
Binding('d', 'dump_stats', 'Stats'),
Binding('D', 'dump_stats', show=False),
Binding('m', 'show_buf_util', 'BufUtil'),
Binding('M', 'show_buf_util', show=False),
Binding('h', 'show_help', 'Help'),
Binding('H', 'show_help', show=False),
Binding('r', 'reset_chip', 'Reset'),
@@ -96,6 +101,7 @@ class BLELogApp(App):
# Console-side per-source received bytes (from StatsUpdated snapshots)
self._per_source_rx_bytes: dict[int, int] | None = None
self._funnel_snapshots: list[FunnelSnapshot] = []
self._buf_util_snapshots: list[BufUtilEntry] = []
# Wall-clock capture start (set when backend loop begins)
self._capture_start_time: float = 0.0
self._serial_lock = threading.Lock()
@@ -113,9 +119,12 @@ class BLELogApp(App):
@property
def funnel_snapshots(self) -> list[FunnelSnapshot]:
"""Public accessor for funnel snapshots (used by StatsScreen)."""
return self._funnel_snapshots
@property
def buf_util_snapshots(self) -> list[BufUtilEntry]:
return self._buf_util_snapshots
def _on_launch_result(self, config: LaunchConfig | None) -> None:
"""Handle Launch Screen dismissal."""
if config is None:
@@ -150,7 +159,8 @@ class BLELogApp(App):
checksum_mode=parser.checksum_mode,
)
funnel = stats.funnel_snapshot(elapsed)
self._post(StatsUpdated(snapshot, funnel))
buf_util = stats.buf_util_snapshot()
self._post(StatsUpdated(snapshot, funnel, buf_util))
return now
def _backend_loop(self) -> None:
@@ -231,6 +241,14 @@ class BLELogApp(App):
decoded = decode_internal_frame(item.payload)
if decoded:
int_src = decoded['int_src']
# Reject false INIT_DONE from misaligned data:
# real firmware always has version >= 1.
if int_src == InternalSource.INIT_DONE:
info = cast(InfoResult, decoded)
if info['version'] == 0:
continue
self._post(InternalFrameDecoded(int_src, decoded))
if int_src in (InternalSource.INIT_DONE, InternalSource.INFO):
@@ -260,6 +278,13 @@ class BLELogApp(App):
lost_bytes=new_bytes,
)
)
elif int_src == InternalSource.BUF_UTIL:
buf = cast(BufUtilResult, decoded)
stats.record_buf_util(
lbm_id=buf['lbm_id'],
trans_cnt=buf['trans_cnt'],
inflight_peak=buf['inflight_peak'],
)
# Decode UART redirect frames (raw ASCII, no os_ts prefix).
# A single log line may span multiple frames due to
@@ -318,6 +343,7 @@ class BLELogApp(App):
panel = self.query_one(StatusPanel)
panel.stats = msg.stats
self._funnel_snapshots = msg.funnel_snapshots
self._buf_util_snapshots = msg.buf_util_snapshots
# Preserve all-time per-source peak for the stats screen
if msg.stats.os_peak.max_per_source is not None:
self._max_per_source_peak = msg.stats.os_peak.max_per_source
@@ -373,6 +399,9 @@ class BLELogApp(App):
def action_dump_stats(self) -> None:
self.push_screen(StatsScreen(start_time=self._capture_start_time))
def action_show_buf_util(self) -> None:
self.push_screen(BufUtilScreen())
def action_show_help(self) -> None:
self.push_screen(ShortcutScreen())
@@ -6,6 +6,7 @@
from __future__ import annotations
from src.backend.models import BleLogSource
from src.backend.models import BufUtilEntry
from src.backend.models import ChecksumMode
from src.backend.models import FrameByteCount
from src.backend.models import FrameStats
@@ -13,10 +14,11 @@ from src.backend.models import FunnelSnapshot
from src.backend.models import SourceCode
from src.backend.models import SyncState
from src.backend.models import ThroughputInfo
from src.backend.stats.buf_util import BufUtilTracker
from src.backend.stats.firmware_loss import FirmwareLossTracker
from src.backend.stats.firmware_written import FirmwareWrittenTracker
from src.backend.stats.peak_burst import PeakBurstTracker
from src.backend.stats.peak_burst import WRITE_RATE_WINDOW_MS
from src.backend.stats.peak_burst import PeakBurstTracker
from src.backend.stats.sn_gap import SNGapTracker
from src.backend.stats.traffic_spike import TrafficSpikeDetector
from src.backend.stats.traffic_spike import TrafficSpikeResult
@@ -38,6 +40,7 @@ class StatsAccumulator:
self._fw_written = FirmwareWrittenTracker()
self._sn_gap = SNGapTracker()
self._traffic = TrafficSpikeDetector()
self._buf_util = BufUtilTracker()
self._per_source_received_frames: dict[SourceCode, int] = {}
self._per_source_received_bytes: dict[SourceCode, int] = {}
self._enh_stat_prev: dict[SourceCode, tuple[int, int, int, int]] = {}
@@ -85,6 +88,14 @@ class StatsAccumulator:
def check_traffic(self) -> TrafficSpikeResult | None:
return self._traffic.check()
# -- Buffer utilization ------------------------------------------------------
def record_buf_util(self, lbm_id: int, trans_cnt: int, inflight_peak: int) -> None:
self._buf_util.record(lbm_id, trans_cnt, inflight_peak)
def buf_util_snapshot(self) -> list[BufUtilEntry]:
return self._buf_util.snapshot() # type: ignore[no-any-return]
# -- Firmware ENH_STAT -------------------------------------------------------
def record_enh_stat(
@@ -131,6 +142,7 @@ class StatsAccumulator:
self._fw_written.reset()
self._enh_stat_prev.clear()
self._prev_written.clear()
self._buf_util.reset()
elif reason == 'flush':
# ENH_STAT-coupled: reset baselines only
self._fw_loss.reset_baselines()
@@ -0,0 +1,28 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
from src.backend.models import BufUtilEntry
class BufUtilTracker:
def __init__(self) -> None:
self._entries: dict[int, BufUtilEntry] = {}
def record(self, lbm_id: int, trans_cnt: int, inflight_peak: int) -> None:
pool = (lbm_id >> 4) & 0x0F
index = lbm_id & 0x0F
self._entries[lbm_id] = BufUtilEntry(
lbm_id=lbm_id,
pool=pool,
index=index,
trans_cnt=trans_cnt,
inflight_peak=inflight_peak,
)
def reset(self) -> None:
self._entries.clear()
def snapshot(self) -> list[BufUtilEntry]:
return sorted(self._entries.values(), key=lambda e: (e.pool, e.index))
@@ -19,9 +19,12 @@ from textual.containers import Vertical
from textual.screen import ModalScreen
from textual.widgets import Static
from src.backend.models import BufUtilEntry
from src.backend.models import FunnelSnapshot
from src.backend.models import format_bytes
from src.backend.models import format_throughput
from src.backend.models import resolve_lbm_name
from src.backend.models import resolve_pool_name
from src.backend.models import resolve_source_name
if TYPE_CHECKING:
@@ -66,6 +69,34 @@ def _build_firmware_table(snapshots: list[FunnelSnapshot]) -> Table:
return table
def _build_buf_util_table(entries: list[BufUtilEntry]) -> Table:
table = Table(title='Buffer Utilization (since chip init)', expand=True)
table.add_column('Pool', style='cyan', no_wrap=True, min_width=12, max_width=16)
table.add_column('Idx', justify='right', min_width=4, max_width=6)
table.add_column('Name', style='cyan', no_wrap=True, min_width=10, max_width=14)
table.add_column('Peak', justify='right', min_width=6, max_width=8)
table.add_column('Total', justify='right', min_width=6, max_width=8)
table.add_column('Util%', justify='right', min_width=6, max_width=8)
for entry in entries:
if entry.trans_cnt > 0:
pct = entry.inflight_peak / entry.trans_cnt * 100
pct_text = Text(f'{pct:.0f}%', style='red' if pct >= 100 else '')
else:
pct_text = Text('-')
table.add_row(
resolve_pool_name(entry.pool),
str(entry.index),
resolve_lbm_name(entry.pool, entry.index),
str(entry.inflight_peak),
str(entry.trans_cnt),
pct_text,
)
return table
def _build_console_table(snapshots: list[FunnelSnapshot]) -> Table:
table = Table(title='Console Measurements (since console start)', expand=True)
table.add_column('Source', style='cyan', no_wrap=True, min_width=12, max_width=16)
@@ -155,3 +186,51 @@ class StatsScreen(ModalScreen):
fw.update(_build_firmware_table(snapshots))
cs.update(_build_console_table(snapshots))
class BufUtilScreen(ModalScreen):
DEFAULT_CSS = """
BufUtilScreen {
align: center middle;
}
#buf-util-container {
width: 80%;
max-width: 100;
height: auto;
max-height: 60%;
overflow-y: auto;
background: $surface;
padding: 1 2;
border: thick $accent;
}
#buf-util-container > Static {
height: auto;
}
"""
BINDINGS = [
Binding('escape', 'dismiss', 'Close'),
Binding('m', 'dismiss', 'Close'),
]
def _get_app(self) -> BLELogApp:
return self.app # type: ignore[return-value]
def compose(self) -> ComposeResult:
with Vertical(id='buf-util-container'):
yield Static(id='buf-util-table')
yield Static('[dim]Press Escape to return -- refreshes every 1s[/dim]')
def on_mount(self) -> None:
self._refresh_table()
self.set_interval(REFRESH_INTERVAL_SEC, self._refresh_table)
def _refresh_table(self) -> None:
entries = self._get_app().buf_util_snapshots
widget = self.query_one('#buf-util-table', Static)
if not entries:
widget.update('No buffer utilization data yet.\n\nPress Escape to return.')
return
widget.update(_build_buf_util_table(entries))
@@ -0,0 +1,57 @@
# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
from src.backend.stats.buf_util import BufUtilTracker
class TestRecordAndSnapshot:
def test_single_entry(self) -> None:
tracker = BufUtilTracker()
tracker.record(lbm_id=0x00, trans_cnt=4, inflight_peak=3)
entries = tracker.snapshot()
assert len(entries) == 1
assert entries[0].lbm_id == 0x00
assert entries[0].pool == 0
assert entries[0].index == 0
assert entries[0].trans_cnt == 4
assert entries[0].inflight_peak == 3
class TestUpdateOverwrites:
def test_same_lbm_id_overwrites(self) -> None:
tracker = BufUtilTracker()
tracker.record(lbm_id=0x01, trans_cnt=4, inflight_peak=1)
tracker.record(lbm_id=0x01, trans_cnt=4, inflight_peak=3)
entries = tracker.snapshot()
assert len(entries) == 1
assert entries[0].inflight_peak == 3
class TestResetClears:
def test_reset_empties_tracker(self) -> None:
tracker = BufUtilTracker()
tracker.record(lbm_id=0x00, trans_cnt=4, inflight_peak=2)
tracker.record(lbm_id=0x10, trans_cnt=4, inflight_peak=1)
tracker.reset()
assert tracker.snapshot() == []
class TestMultipleLbms:
def test_multiple_lbm_ids_coexist(self) -> None:
tracker = BufUtilTracker()
tracker.record(lbm_id=0x00, trans_cnt=4, inflight_peak=1)
tracker.record(lbm_id=0x01, trans_cnt=4, inflight_peak=2)
tracker.record(lbm_id=0x10, trans_cnt=4, inflight_peak=3)
tracker.record(lbm_id=0x20, trans_cnt=4, inflight_peak=4)
entries = tracker.snapshot()
assert len(entries) == 4
def test_snapshot_sorted_by_pool_then_index(self) -> None:
tracker = BufUtilTracker()
tracker.record(lbm_id=0x21, trans_cnt=4, inflight_peak=2)
tracker.record(lbm_id=0x00, trans_cnt=4, inflight_peak=1)
tracker.record(lbm_id=0x20, trans_cnt=4, inflight_peak=4)
tracker.record(lbm_id=0x10, trans_cnt=4, inflight_peak=3)
entries = tracker.snapshot()
pools_and_indices = [(e.pool, e.index) for e in entries]
assert pools_and_indices == [(0, 0), (1, 0), (2, 0), (2, 1)]