diff --git a/tools/bt/ble_log_console/src/backend/internal_decoder.py b/tools/bt/ble_log_console/src/backend/internal_decoder.py index 01b36c27b3..874211df3f 100644 --- a/tools/bt/ble_log_console/src/backend/internal_decoder.py +++ b/tools/bt/ble_log_console/src/backend/internal_decoder.py @@ -9,6 +9,7 @@ See Spec Section 9. import struct +from src.backend.models import BufUtilResult from src.backend.models import EnhStatResult from src.backend.models import InfoResult from src.backend.models import InternalDecoderResult @@ -23,6 +24,9 @@ _INFO_STRUCT = struct.Struct(' InternalDecoderResult | None: """Decode an INTERNAL frame payload. @@ -74,4 +78,20 @@ def decode_internal_frame(payload: bytes) -> InternalDecoderResult | None: os_ts_ms=os_ts_ms, ) + if int_src == InternalSource.BUF_UTIL: + if len(sub_payload) < _BUF_UTIL_STRUCT.size: + return None + _, lbm_id, trans_cnt, inflight_peak = _BUF_UTIL_STRUCT.unpack_from(sub_payload, 0) + pool = (lbm_id >> 4) & 0x0F + index = lbm_id & 0x0F + return BufUtilResult( + int_src=int_src, + lbm_id=lbm_id, + pool=pool, + index=index, + trans_cnt=trans_cnt, + inflight_peak=inflight_peak, + os_ts_ms=os_ts_ms, + ) + return None diff --git a/tools/bt/ble_log_console/src/backend/models.py b/tools/bt/ble_log_console/src/backend/models.py index 652a82df05..6eb46e6373 100644 --- a/tools/bt/ble_log_console/src/backend/models.py +++ b/tools/bt/ble_log_console/src/backend/models.py @@ -117,6 +117,14 @@ class InternalSource(int, Enum): ENH_STAT = 2 INFO = 3 FLUSH = 4 + BUF_UTIL = 5 + + +class BufUtilPool(int, Enum): + COMMON_TASK = 0 + COMMON_ISR = 1 + LL = 2 + REDIR = 3 # --- Data classes --- @@ -154,6 +162,46 @@ class SourceStats: lost_bytes: int = 0 +@dataclass(frozen=True) +class BufUtilEntry: + """Single LBM buffer utilization snapshot.""" + + lbm_id: int + pool: int + index: int + trans_cnt: int + inflight_peak: int + + +# --- Buffer utilization name resolution --- + +_LBM_NAMES: dict[tuple[int, int], str] = { + (0, 0): 'spin', + (1, 0): 'spin', + (2, 0): 'll_task', + (2, 1): 'll_hci', + (3, 0): 'redir', +} + + +def resolve_pool_name(pool: int) -> str: + """Resolve pool code to BufUtilPool name, with fallback for unknown codes.""" + try: + return BufUtilPool(pool).name + except ValueError: + return f'POOL_{pool}' + + +def resolve_lbm_name(pool: int, index: int) -> str: + """Resolve pool + index to human-readable LBM name.""" + key = (pool, index) + if key in _LBM_NAMES: + return _LBM_NAMES[key] + if pool in (0, 1) and index >= 1: + return f'atomic[{index - 1}]' + return f'lbm_{pool}_{index}' + + @dataclass(slots=True) class TransportSnapshot: """Snapshot of transport-layer metrics for the current stats interval.""" @@ -265,7 +313,17 @@ class EnhStatResult(TypedDict): os_ts_ms: int -InternalDecoderResult = InfoResult | EnhStatResult +class BufUtilResult(TypedDict): + int_src: InternalSource + lbm_id: int + pool: int + index: int + trans_cnt: int + inflight_peak: int + os_ts_ms: int + + +InternalDecoderResult = InfoResult | EnhStatResult | BufUtilResult # --- Textual Messages (backend -> frontend) --- @@ -278,10 +336,16 @@ class SyncStateChanged(Message): class StatsUpdated(Message): - def __init__(self, stats: FrameStats, funnel_snapshots: list[FunnelSnapshot] | None = None) -> None: + def __init__( + self, + stats: FrameStats, + funnel_snapshots: list[FunnelSnapshot] | None = None, + buf_util_snapshots: list[BufUtilEntry] | None = None, + ) -> None: super().__init__() self.stats = stats self.funnel_snapshots = funnel_snapshots or [] + self.buf_util_snapshots = buf_util_snapshots or [] class InternalFrameDecoded(Message): diff --git a/tools/bt/ble_log_console/tests/test_internal_decoder.py b/tools/bt/ble_log_console/tests/test_internal_decoder.py index bbefdef4d3..e48892ef17 100644 --- a/tools/bt/ble_log_console/tests/test_internal_decoder.py +++ b/tools/bt/ble_log_console/tests/test_internal_decoder.py @@ -69,6 +69,38 @@ class TestUnknown: assert result is None +class TestBufUtil: + def test_decode_buf_util(self) -> None: + # lbm_id=0x01 (pool=0, index=1), trans_cnt=4, inflight_peak=3 + sub = b'\x01\x04\x03' + payload = _make_internal_payload(os_ts=7777, int_src=5, sub_payload=sub) + result = decode_internal_frame(payload) + assert result is not None + assert result['int_src'] == InternalSource.BUF_UTIL + assert result['lbm_id'] == 0x01 + assert result['pool'] == 0 + assert result['index'] == 1 + assert result['trans_cnt'] == 4 + assert result['inflight_peak'] == 3 + assert result['os_ts_ms'] == 7777 + + def test_buf_util_pool_index_extraction(self) -> None: + # lbm_id=0x21 -> pool=2 (LL), index=1 (ll_hci) + sub = b'\x21\x04\x02' + payload = _make_internal_payload(os_ts=0, int_src=5, sub_payload=sub) + result = decode_internal_frame(payload) + assert result is not None + assert result['pool'] == 2 + assert result['index'] == 1 + + def test_buf_util_too_short(self) -> None: + # Only 2 bytes of sub_payload (need 3 after int_src_code) + sub = b'\x00\x04' + payload = _make_internal_payload(os_ts=0, int_src=5, sub_payload=sub) + result = decode_internal_frame(payload) + assert result is None + + class TestMalformed: def test_too_short_payload(self) -> None: result = decode_internal_frame(b'\x00\x00\x00')