diff --git a/components/app_trace/host_file_io.c b/components/app_trace/host_file_io.c index 6c0d2742ef..5221b3f4dc 100644 --- a/components/app_trace/host_file_io.c +++ b/components/app_trace/host_file_io.c @@ -15,9 +15,10 @@ // * Operation arguments. See file operation helper structures below. #include +#include #include "esp_app_trace.h" - #include "esp_log.h" + const static char *TAG = "esp_host_file_io"; #define ESP_APPTRACE_FILE_CMD_FOPEN 0x0 @@ -77,25 +78,56 @@ typedef struct { void *file; } esp_apptrace_ftell_args_t; +#define ESP_APPTRACE_FILE_RECV_TIMEOUT_US 100000 // 100ms +#define ESP_APPTRACE_FRAME_STX 0x02 +#define ESP_APPTRACE_FRAME_ETX 0x03 +/** + * Send a file command to the host. + * For the UART destination, the command is wrapped in a frame protocol. + * The frame protocol is as follows: + * - STX (0x02) + * - Length of the command and arguments (2 bytes) + * - Command ID (1 byte) + * - Arguments (variable length) + * - ETX (0x03) + */ static esp_err_t esp_apptrace_file_cmd_send(uint8_t cmd, void (*prep_args)(uint8_t *, void *), void *args, uint32_t args_len) { - esp_err_t ret; - esp_apptrace_fcmd_hdr_t *hdr; + const bool is_uart = esp_apptrace_get_destination() == ESP_APPTRACE_DEST_UART; + const size_t payload_off = is_uart ? 3u : 0u; // STX + LEN + const size_t payload_len = sizeof(cmd) + (size_t)args_len; // CMD + ARGS + const size_t total_len = payload_off + payload_len + (is_uart ? 1u /*ETX*/ : 0u); + size_t inx = 0; - ESP_EARLY_LOGV(TAG, "%s %d", __func__, cmd); - uint8_t *ptr = esp_apptrace_buffer_get(sizeof(*hdr) + args_len, ESP_APPTRACE_TMO_INFINITE); //TODO: finite tmo - if (ptr == NULL) { + if (is_uart && payload_len > 0xFFFFu) { + return ESP_ERR_INVALID_SIZE; + } + + ESP_EARLY_LOGV(TAG, "%s cmd:%d len:%d", __func__, cmd, total_len); + uint8_t *buf = esp_apptrace_buffer_get(total_len, ESP_APPTRACE_TMO_INFINITE); + if (!buf) { return ESP_ERR_NO_MEM; } - hdr = (esp_apptrace_fcmd_hdr_t *)ptr; - hdr->cmd = cmd; - if (prep_args) { - prep_args(ptr + sizeof(hdr->cmd), args); + if (is_uart) { + buf[inx++] = ESP_APPTRACE_FRAME_STX; + buf[inx++] = (uint8_t)(payload_len & 0xFF); + buf[inx++] = (uint8_t)((payload_len >> 8) & 0xFF); + } + + // Payload: CMD then ARGS + buf[inx++] = cmd; + if (args_len) { + prep_args(&buf[inx], args); + inx += args_len; + } + + if (is_uart) { + buf[inx++] = ESP_APPTRACE_FRAME_ETX; } // now indicate that this buffer is ready to be sent off to host - ret = esp_apptrace_buffer_put(ptr, ESP_APPTRACE_TMO_INFINITE);//TODO: finite tmo + esp_err_t ret = esp_apptrace_buffer_put(buf, ESP_APPTRACE_TMO_INFINITE);//TODO: finite tmo if (ret != ESP_OK) { ESP_EARLY_LOGE(TAG, "Failed to put apptrace buffer (%d)!", ret); return ret; @@ -115,7 +147,7 @@ static esp_err_t esp_apptrace_file_rsp_recv(uint8_t *buf, uint32_t buf_len) uint32_t tot_rd = 0; while (tot_rd < buf_len) { uint32_t rd_size = buf_len - tot_rd; - esp_err_t ret = esp_apptrace_read(buf + tot_rd, &rd_size, ESP_APPTRACE_TMO_INFINITE); //TODO: finite tmo + esp_err_t ret = esp_apptrace_read(buf + tot_rd, &rd_size, ESP_APPTRACE_FILE_RECV_TIMEOUT_US); if (ret != ESP_OK) { ESP_EARLY_LOGE(TAG, "Failed to read (%d)!", ret); return ret; diff --git a/components/app_trace/port/port_uart.c b/components/app_trace/port/port_uart.c index 3cc84c1b70..ee73f29111 100644 --- a/components/app_trace/port/port_uart.c +++ b/components/app_trace/port/port_uart.c @@ -260,8 +260,6 @@ static esp_err_t esp_apptrace_uart_init(void *hw_data, const esp_apptrace_config /* Initialize UART HAL (sets default 8N1 mode) */ uart_hal_init(&uart_data->hal_ctx, uart_config->uart_num); - ESP_LOGI(TAG, "uart_hal_init: %d", uart_config->uart_num); - HP_UART_SRC_CLK_ATOMIC() { uart_hal_set_sclk(&uart_data->hal_ctx, UART_SCLK_DEFAULT); uart_hal_set_baudrate(&uart_data->hal_ctx, uart_config->baud_rate, sclk_hz); @@ -429,13 +427,25 @@ static uint8_t *esp_apptrace_uart_down_buffer_get(void *hw_data, uint32_t *size, return NULL; } - uint32_t rx_len = uart_ll_get_rxfifo_len(uart_data->hal_ctx.dev); - int to_read = MIN(rx_len, MIN(uart_data->rx_msg_buff_size, *size)); - if (to_read) { - uart_hal_read_rxfifo(&uart_data->hal_ctx, uart_data->rx_msg_buff, &to_read); - } - *size = to_read; + /* Read until we get the requested number of bytes (or timeout) */ + const uint32_t req_size = MIN(uart_data->rx_msg_buff_size, *size); + uint32_t total_read = 0; + while (total_read < req_size) { + uint32_t rx_len = uart_hal_get_rxfifo_len(&uart_data->hal_ctx); + int to_read = MIN((uint32_t)(req_size - total_read), rx_len); + if (to_read > 0) { + uart_hal_read_rxfifo(&uart_data->hal_ctx, uart_data->rx_msg_buff + total_read, &to_read); + total_read += to_read; + continue; + } + if (esp_apptrace_tmo_check(tmo) != ESP_OK) { + break; + } + esp_rom_delay_us(50); + } + + *size = total_read; esp_apptrace_uart_unlock(uart_data); return (*size > 0) ? uart_data->rx_msg_buff : NULL; diff --git a/conftest.py b/conftest.py index 1797992b00..91cd649514 100644 --- a/conftest.py +++ b/conftest.py @@ -222,13 +222,15 @@ class OpenOCD: else: raise ConnectionRefusedError - def write(self, s: str) -> t.Any: + def write(self, s: str, timeout: int = 30) -> t.Any: if self.telnet is None: logging.error('Telnet connection is not established.') return '' resp = self.telnet.read_very_eager() self.telnet.write(to_bytes(s, '\n')) - resp += self.telnet.read_until(b'>') + resp += self.telnet.read_until(b'>', timeout=timeout) + if not resp.endswith(b'>'): + return '' return to_str(resp) def apptrace_wait_stop(self, timeout: int = 30) -> None: @@ -244,6 +246,17 @@ class OpenOCD: raise pexpect.TIMEOUT('Failed to wait for apptrace stop!') time.sleep(1) + def gcov_dump(self, on_the_fly: bool = True) -> t.Any: + cmd = 'esp gcov' + if not on_the_fly: + cmd += ' dump' + cmd_out = self.write(cmd) + if 'Targets connected.' not in cmd_out: + raise pexpect.TIMEOUT('Failed to start gcov dump!') + if 'Targets disconnected.' not in cmd_out: + raise pexpect.TIMEOUT('Failed to stop gcov dump!') + return cmd_out + def kill(self) -> None: # Check if the process is still running if self.proc and self.proc.isalive(): diff --git a/examples/system/gcov/main/gcov_example_main.c b/examples/system/gcov/main/gcov_example_main.c index 5a754a25b9..2eddb7392d 100644 --- a/examples/system/gcov/main/gcov_example_main.c +++ b/examples/system/gcov/main/gcov_example_main.c @@ -24,6 +24,20 @@ static const char *TAG = "example"; void blink_dummy_func(void); void some_dummy_func(void); +#if !CONFIG_APPTRACE_DEST_JTAG +#include "soc/uart_pins.h" +#include "esp_app_trace.h" +/* Override default uart config to use console pins as a uart channel */ +esp_apptrace_config_t esp_apptrace_get_user_params(void) +{ + esp_apptrace_config_t config = APPTRACE_UART_CONFIG_DEFAULT(); + config.dest_cfg.uart.uart_num = 0; + config.dest_cfg.uart.tx_pin_num = U0TXD_GPIO_NUM; + config.dest_cfg.uart.rx_pin_num = U0RXD_GPIO_NUM; + return config; +} +#endif + static void blink_task(void *pvParameter) { ESP_LOGI(TAG, "Ready for OpenOCD connection"); diff --git a/examples/system/gcov/pytest_gcov.py b/examples/system/gcov/pytest_gcov.py index 49e4be994e..d32398835e 100644 --- a/examples/system/gcov/pytest_gcov.py +++ b/examples/system/gcov/pytest_gcov.py @@ -1,6 +1,7 @@ -# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2022-2026 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Unlicense OR CC0-1.0 import os.path +import sys import time import typing @@ -11,13 +12,58 @@ from pytest_embedded_idf.utils import idf_parametrize if typing.TYPE_CHECKING: from conftest import OpenOCD +try: + from gcov_capture import UartGcovCapture + from gcov_capture import get_coverage_data +except ImportError: + idf_path = os.getenv('IDF_PATH') + if not idf_path: + raise RuntimeError('IDF_PATH not found. Please run `source $IDF_PATH/export.sh`') + esp_app_trace_dir = os.path.join(idf_path, 'tools', 'esp_app_trace') + sys.path.insert(0, esp_app_trace_dir) + from gcov_capture import UartGcovCapture + from gcov_capture import get_coverage_data -def _test_gcov(openocd_dut: 'OpenOCD', dut: IdfDut) -> None: - # create the generated .gcda folder, otherwise would have error: failed to open file. - # normally this folder would be created via `idf.py build`. but in CI the non-related files would not be preserved + +def get_expected_gcda_paths(dut: IdfDut) -> list: + """Get list of expected .gcda file paths for this example.""" + return [ + os.path.join( + dut.app.binary_path, 'esp-idf', 'main', 'CMakeFiles', '__idf_main.dir', 'gcov_example_main.c.gcda' + ), + os.path.join( + dut.app.binary_path, 'esp-idf', 'main', 'CMakeFiles', '__idf_main.dir', 'gcov_example_func.c.gcda' + ), + os.path.join(dut.app.binary_path, 'esp-idf', 'sample', 'CMakeFiles', '__idf_sample.dir', 'some_funcs.c.gcda'), + ] + + +def prepare_test(dut: IdfDut) -> list: + """Prepare test environment: create directories and clean up old .gcda files. + + Returns list of expected .gcda file paths. + """ + # Create the generated .gcda folders + # Normally created via `idf.py build`, but in CI non-related files aren't preserved os.makedirs(os.path.join(dut.app.binary_path, 'esp-idf', 'main', 'CMakeFiles', '__idf_main.dir'), exist_ok=True) os.makedirs(os.path.join(dut.app.binary_path, 'esp-idf', 'sample', 'CMakeFiles', '__idf_sample.dir'), exist_ok=True) + # Get expected paths and clean up old files + expected_gcda_paths = get_expected_gcda_paths(dut) + for gcda_path in expected_gcda_paths: + if os.path.isfile(gcda_path): + try: + os.remove(gcda_path) + print(f'Removed old .gcda file: {os.path.basename(gcda_path)}') + except OSError as e: + print(f'Warning: Could not remove {gcda_path}: {e}') + + return expected_gcda_paths + + +def _test_gcov(openocd_dut: 'OpenOCD', dut: IdfDut) -> None: + expected_gcda_paths = prepare_test(dut) + def expect_counter_output(loop: int, timeout: int = 10) -> None: dut.expect_exact( [f'blink_dummy_func: Counter = {loop}', f'some_dummy_func: Counter = {loop * 2}'], @@ -25,8 +71,8 @@ def _test_gcov(openocd_dut: 'OpenOCD', dut: IdfDut) -> None: timeout=timeout, ) - def dump_coverage(cmd: str) -> None: - response = openocd.write(cmd) + def dump_coverage(on_the_fly: bool, expected_counts: dict | None = None) -> None: + response = openocd.gcov_dump(on_the_fly=on_the_fly) expect_lines = [ 'Targets connected.', @@ -47,6 +93,21 @@ def _test_gcov(openocd_dut: 'OpenOCD', dut: IdfDut) -> None: assert len(expect_lines) == 0 + # Verify execution counts if expected and coverage data is available + if expected_counts: + coverage = get_coverage_data(dut.app.binary_path, expected_gcda_paths) + if coverage: # Only verify if detailed data is available + print(f'Coverage data: {coverage}') + for func, expected_count in expected_counts.items(): + actual_count = coverage.get(func) + assert actual_count == expected_count, f'Expected {func}={expected_count}, got {actual_count}' + else: + # Backup verification: ensure .gcda files exist + print('Coverage data not available (gcov tool not found)') + for gcda_path in expected_gcda_paths: + assert os.path.isfile(gcda_path), f'Expected .gcda file not found: {gcda_path}' + print('Basic verification passed (all .gcda files exist)') + dut.expect_exact('example: Ready for OpenOCD connection', timeout=5) with openocd_dut.run() as openocd: openocd.write('reset run') @@ -55,32 +116,86 @@ def _test_gcov(openocd_dut: 'OpenOCD', dut: IdfDut) -> None: expect_counter_output(0) dut.expect('Ready to dump GCOV data...', timeout=5) - # Test two hard-coded dumps - dump_coverage('esp gcov dump') + # Test two hard-coded dumps with verification + dump_coverage(False, {'gcov_example_func.c:blink_dummy_func': 1, 'some_funcs.c:some_dummy_func': 1}) dut.expect('GCOV data have been dumped.', timeout=5) expect_counter_output(1) dut.expect('Ready to dump GCOV data...', timeout=5) - dump_coverage('esp gcov dump') + + dump_coverage(False, {'gcov_example_func.c:blink_dummy_func': 2, 'some_funcs.c:some_dummy_func': 2}) dut.expect('GCOV data have been dumped.', timeout=5) for i in range(2, 6): expect_counter_output(i) - for _ in range(3): + # Test instant run-time dumps with verification + expected_runtime_counts = [ + {'gcov_example_func.c:blink_dummy_func': 7, 'some_funcs.c:some_dummy_func': 7}, + {'gcov_example_func.c:blink_dummy_func': 8, 'some_funcs.c:some_dummy_func': 8}, + {'gcov_example_func.c:blink_dummy_func': 10, 'some_funcs.c:some_dummy_func': 10}, + ] + + for expected in expected_runtime_counts: time.sleep(1) - # Test instant run-time dump - dump_coverage('esp gcov') + dump_coverage(True, expected) @pytest.mark.jtag +@idf_parametrize('config', ['gcov_jtag'], indirect=['config']) @idf_parametrize('target', ['esp32', 'esp32c2', 'esp32s2'], indirect=['target']) def test_gcov(openocd_dut: 'OpenOCD', dut: IdfDut) -> None: _test_gcov(openocd_dut, dut) @pytest.mark.usb_serial_jtag +@idf_parametrize('config', ['gcov_jtag'], indirect=['config']) @idf_parametrize( 'target', ['esp32s3', 'esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32h2', 'esp32p4'], indirect=['target'] ) def test_gcov_usj(openocd_dut: 'OpenOCD', dut: IdfDut) -> None: _test_gcov(openocd_dut, dut) + + +def _test_gcov_uart(dut: IdfDut) -> None: + # Close console port to free up the port for the gcov capture + dut.serial.close() + + expected_gcda_paths = prepare_test(dut) + log_file = os.path.join(dut.logdir, 'gcov_uart.log') + uart_port = dut.serial.port + baud = dut.app.sdkconfig.get('APPTRACE_UART_BAUDRATE') + with UartGcovCapture(port=uart_port, baudrate=baud, log_file=log_file, log_level=1) as uart_capture: + uart_capture.run(background=True) + time.sleep(0.5) + + # Expected execution counts + expected_counts = [ + {'gcov_example_func.c:blink_dummy_func': 1, 'some_funcs.c:some_dummy_func': 1}, # First dump + {'gcov_example_func.c:blink_dummy_func': 2, 'some_funcs.c:some_dummy_func': 2}, # Second dump + ] + + # Verify each dump + for dump_num, expected in enumerate(expected_counts, start=1): + if uart_capture.wait_for_fstop(timeout=10.0): + coverage = get_coverage_data(dut.app.binary_path, expected_gcda_paths) + + if coverage: # Only verify details if coverage data is available + print(f'Coverage data: {coverage}') + for func, expected_count in expected.items(): + actual_count = coverage.get(func) + assert actual_count == expected_count, f'Expected {func}={expected_count}, got {actual_count}' + else: + # Backup verification: ensure .gcda files exist + print('Coverage data not available (gcov tool not found)') + for gcda_path in expected_gcda_paths: + assert os.path.isfile(gcda_path), f'Expected .gcda file not found: {gcda_path}' + print('Basic verification passed (all .gcda files exist)') + else: + assert False, f'esp_gcov_dump {dump_num} timeout' + + +@pytest.mark.generic +@idf_parametrize('config', ['gcov_uart'], indirect=['config']) +@idf_parametrize('target', ['supported_targets'], indirect=['target']) +def test_gcov_uart(dut: IdfDut) -> None: + _test_gcov_uart(dut) diff --git a/examples/system/gcov/sdkconfig.ci b/examples/system/gcov/sdkconfig.ci deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/examples/system/gcov/sdkconfig.ci.gcov_jtag b/examples/system/gcov/sdkconfig.ci.gcov_jtag new file mode 100644 index 0000000000..2e6a2c905a --- /dev/null +++ b/examples/system/gcov/sdkconfig.ci.gcov_jtag @@ -0,0 +1,4 @@ +CONFIG_APPTRACE_DEST_JTAG=y +CONFIG_APPTRACE_LOCK_ENABLE=y +CONFIG_APPTRACE_ONPANIC_HOST_FLUSH_TMO=-1 +CONFIG_APPTRACE_POSTMORTEM_FLUSH_THRESH=0 diff --git a/examples/system/gcov/sdkconfig.ci.gcov_uart b/examples/system/gcov/sdkconfig.ci.gcov_uart new file mode 100644 index 0000000000..287038190e --- /dev/null +++ b/examples/system/gcov/sdkconfig.ci.gcov_uart @@ -0,0 +1,5 @@ +CONFIG_ESP_CONSOLE_NONE=y +CONFIG_APPTRACE_DEST_UART=y +CONFIG_APPTRACE_DEST_UART_NUM=0 +CONFIG_APPTRACE_UART_BAUDRATE=1000000 +CONFIG_APPTRACE_UART_TX_MSG_SIZE=256 diff --git a/examples/system/gcov/sdkconfig.defaults b/examples/system/gcov/sdkconfig.defaults index bd9ffe56ca..3274cb0945 100644 --- a/examples/system/gcov/sdkconfig.defaults +++ b/examples/system/gcov/sdkconfig.defaults @@ -1,9 +1,4 @@ CONFIG_ESP_TRACE_ENABLE=y CONFIG_ESP_TRACE_LIB_NONE=y CONFIG_ESP_TRACE_TRANSPORT_APPTRACE=y -CONFIG_APPTRACE_DEST_JTAG=y -CONFIG_APPTRACE_LOCK_ENABLE=y -CONFIG_APPTRACE_ONPANIC_HOST_FLUSH_TMO=-1 -CONFIG_APPTRACE_POSTMORTEM_FLUSH_THRESH=0 - CONFIG_ESP_GCOV_ENABLE=y diff --git a/tools/ci/executable-list.txt b/tools/ci/executable-list.txt index 82763cedea..154e6940c4 100644 --- a/tools/ci/executable-list.txt +++ b/tools/ci/executable-list.txt @@ -86,6 +86,7 @@ tools/ci/test_autocomplete/test_autocomplete.py tools/ci/test_configure_ci_environment.sh tools/docker/entrypoint.sh tools/docs/gen_version_specific_includes.py +tools/esp_app_trace/gcov_capture.py tools/esp_app_trace/logtrace_proc.py tools/esp_app_trace/sysviewtrace_proc.py tools/esp_app_trace/test/logtrace/test.sh diff --git a/tools/esp_app_trace/gcov_capture.py b/tools/esp_app_trace/gcov_capture.py new file mode 100755 index 0000000000..ca31487828 --- /dev/null +++ b/tools/esp_app_trace/gcov_capture.py @@ -0,0 +1,837 @@ +#!/usr/bin/env python +# +# SPDX-FileCopyrightText: 2025-2026 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 +# +# GCOV Data Capture Tool +# Captures data from UART serial port, logs raw bytes in hex format, and parses/executes +# file I/O commands (gcov host file protocol) on the fly. +# +# Usage: +# CLI mode: +# python gcov_capture.py -p /dev/tty.usbserial-101 -b 115200 -o gcov.log -l1 +# +# Python API for pytest (single dump): +# from gcov_capture import gcov_capture_from_uart +# +# def test_my_gcov(dut): +# dut.serial.close() # Free up the port if you use the same port with the dut.serial.port +# assert gcov_capture_from_uart( +# port=dut.serial.port, +# baudrate=dut.app.sdkconfig.get('APPTRACE_UART_BAUDRATE'), +# log_file='gcov.log' +# ), "GCOV dump timeout" +# # Create report with idf.py gcovr-report +# # idf.py gcovr-report # Uses default build dir +# # idf.py -B build_esp32c5_gcov_uart gcovr-report +# +# Python API for pytest (multiple dumps): +# from gcov_capture import UartGcovCapture +# +# def test_multiple_gcov_dumps(dut): +# dut.serial.close() # Free up the port if you use the same port with the dut.serial.port +# with UartGcovCapture( +# port=dut.serial.port, +# baudrate=dut.app.sdkconfig.get('APPTRACE_UART_BAUDRATE'), +# log_file='gcov.log', +# log_level=1 +# ) as capture: +# capture.run(background=True) +# time.sleep(0.5) +# # Wait for multiple dumps +# for i in range(3): +# assert capture.wait_for_fstop(timeout=30.0), f"Dump {i+1} timeout" +# print(f"Dump {i+1} complete, total: {capture.get_fstop_count()}") +# # Create report with idf.py gcovr-report +# # idf.py gcovr-report # Uses default build dir +# # idf.py -B build_esp32c5_gcov_uart gcovr-report +# +# Coverage verification helper (works for both JTAG and UART): +# from gcov_capture import get_coverage_data +# expected_gcda_paths = [...] # List of .gcda files under build dir +# coverage = get_coverage_data(dut.app.binary_path, expected_gcda_paths) +# print(coverage) + +import argparse +import gzip +import json +import logging +import os +import subprocess +import sys +import threading +import time +from collections.abc import Callable +from dataclasses import dataclass +from typing import Any +from typing import BinaryIO +from typing import TextIO +from typing import cast + +import serial + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description='Capture GCOV UART data and process file I/O commands in real-time', + formatter_class=argparse.RawDescriptionHelpFormatter, + allow_abbrev=False, + ) + + parser.add_argument('-p', '--port', required=True, help='Serial port (e.g., /dev/ttyUSB0, COM3)') + parser.add_argument('-b', '--baudrate', type=int, required=True, help='Serial baudrate (e.g., 115200)') + parser.add_argument('-o', '--output', help='Optional log file path (only used if log-level > 0)') + parser.add_argument( + '-l', + '--log-level', + type=int, + default=0, + choices=[0, 1, 2], + help='Log level: 0=no logging, 1=REQ+RES+SUMMARY, 2=1+CONSOLE', + ) + + return parser.parse_args() + + +def _le_u32(data: bytes, offset: int) -> int: + return int.from_bytes(data[offset : offset + 4], byteorder='little', signed=False) + + +def _le_i32(data: bytes, offset: int) -> int: + return int.from_bytes(data[offset : offset + 4], byteorder='little', signed=True) + + +def _normalize_mode(mode: str) -> str | None: + s = set(mode) + base = s & {'r', 'w', 'a', 'x'} + + if len(base) != 1 or s - {'r', 'w', 'a', 'x', '+', 'b', 't'}: + return None + + return next(iter(base)) + 'b' + ('+' if '+' in s else '') + + +@dataclass +class CommandInfo: + """Metadata for a file I/O command.""" + + code: int + name: str + min_payload_len: int + needs_fd_validation: bool + handler: Callable | None = None + arg_extractor: Callable | None = None + + +class FileOpProcessor: + """Emulates the gcov file protocol operations like esp_gcov_process_data.""" + + # Common response codes (little-endian) + RESP_ZERO = (0).to_bytes(4, 'little') # Zero (success for most ops, error for fd/size) + RESP_ERROR_NEG1 = (-1).to_bytes(4, 'little', signed=True) # -1 (error for signed ops) + RESP_FWRITE_OK = (1).to_bytes(4, 'little') # 1 (FWRITE success) + + def __init__(self) -> None: + self.files: dict[int, BinaryIO] = {} + self.max_files = 1024 + self.next_fd = 1 + self.running = True + + # Unified command table with all metadata + self._commands: dict[int, CommandInfo] = { + 0x0: CommandInfo(0x0, 'FOPEN', 0, False, None, None), # Special handling + 0x1: CommandInfo(0x1, 'FCLOSE', 4, True, self._close, lambda p: (_le_u32(p, 0),)), + 0x2: CommandInfo(0x2, 'FWRITE', 4, True, self._write, lambda p: (_le_u32(p, 0), p[4:])), + 0x3: CommandInfo(0x3, 'FREAD', 8, True, self._read, lambda p: (_le_u32(p, 0), _le_u32(p, 4))), + 0x4: CommandInfo( + 0x4, 'FSEEK', 12, True, self._seek, lambda p: (_le_u32(p, 0), _le_i32(p, 4), _le_i32(p, 8)) + ), + 0x5: CommandInfo(0x5, 'FTELL', 4, True, self._tell, lambda p: (_le_u32(p, 0),)), + 0x6: CommandInfo(0x6, 'FSTOP', 0, False, None, None), # Special handling + 0x7: CommandInfo(0x7, 'FEOF', 4, True, self._feof, lambda p: (_le_u32(p, 0),)), + } + + def _alloc_fd(self) -> int: + if self.next_fd > self.max_files: + return 0 + fd = self.next_fd + self.next_fd += 1 + return fd + + def _get_file(self, fd: int) -> BinaryIO | None: + return self.files.get(fd) # type: ignore[return-value] + + def _validate_fd(self, fd: int) -> bool: + """Validate that FD is in valid range and exists.""" + if fd < 1 or fd > self.max_files: + return False + return fd in self.files + + def _close_all_files(self) -> None: + """Close all open files and reset the files dictionary.""" + for fd, handle in list(self.files.items()): + try: + handle.close() + except OSError: + pass # Ignore errors when closing + self.files.clear() + self.next_fd = 1 # Reset FD counter + + def _open(self, path: str, mode: str) -> tuple[str, bytes]: + if len(self.files) >= self.max_files: + return 'FOPEN failed: max files reached', self.RESP_ZERO + + fd = self._alloc_fd() + if fd == 0: + return 'FOPEN failed: fd alloc error', self.RESP_ZERO + + normalized = _normalize_mode(mode) + if not normalized: + return f"FOPEN path='{path}' invalid mode='{mode}'", self.RESP_ZERO + + try: + # Create parent dir for write/append/create modes + if normalized[0] in ('w', 'a', 'x'): + dir_path = os.path.dirname(path) + if dir_path: + try: + os.makedirs(dir_path, exist_ok=True) + except OSError: + pass # let open() raise the real error if it's still a problem + + # Open in binary mode (gcov files are always binary) + # Always use binary mode since gcov protocol works with binary data + if 'b' in normalized: + handle = cast(BinaryIO, open(path, normalized, buffering=0)) + else: + # Force binary mode even if not specified + handle = cast(BinaryIO, open(path, normalized[0] + 'b' + normalized[2:], buffering=0)) + self.files[fd] = handle + return f"FOPEN path='{path}' mode='{mode}' normalized='{normalized}' -> fd={fd}", fd.to_bytes(4, 'little') + except (OSError, ValueError) as e: + return f"FOPEN path='{path}' mode='{mode}' failed: {e}", self.RESP_ZERO + + def _close(self, fd: int) -> tuple[str, bytes]: + handle = self._get_file(fd) + if not handle: + return f'FCLOSE fd={fd} failed: not open', self.RESP_ERROR_NEG1 + try: + handle.close() + del self.files[fd] + return f'FCLOSE fd={fd} -> 0', self.RESP_ZERO + except OSError as e: + return f'FCLOSE fd={fd} failed: {e}', self.RESP_ERROR_NEG1 + + def _write(self, fd: int, payload: bytes) -> tuple[str, bytes]: + handle = self._get_file(fd) + if not handle: + return f'FWRITE fd={fd} failed: not open', self.RESP_ZERO + try: + written = handle.write(payload) + handle.flush() + fret = self.RESP_FWRITE_OK if written else self.RESP_ZERO + return ( + f'FWRITE fd={fd} len={len(payload)} -> {1 if written else 0}', + fret, + ) + except OSError as e: + return f'FWRITE fd={fd} failed: {e}', self.RESP_ZERO + + def _read(self, fd: int, length: int) -> tuple[str, bytes]: + handle = self._get_file(fd) + if not handle: + return f'FREAD fd={fd} failed: not open', self.RESP_ZERO + try: + data_bytes = handle.read(length) + if not isinstance(data_bytes, bytes): + # Should not happen with BinaryIO, but be defensive + data_bytes = bytes(data_bytes) + fret = len(data_bytes) + resp = fret.to_bytes(4, 'little') + data_bytes + preview = data_bytes[:16].hex(' ') + suffix = '' if len(data_bytes) <= 16 else ' ...' + return f'FREAD fd={fd} len={length} -> {fret} bytes "{preview}{suffix}"', resp + except OSError as e: + return f'FREAD fd={fd} failed: {e}', self.RESP_ZERO + + def _seek(self, fd: int, offset: int, whence: int) -> tuple[str, bytes]: + handle = self._get_file(fd) + if not handle: + return f'FSEEK fd={fd} failed: not open', self.RESP_ERROR_NEG1 + try: + fret = handle.seek(offset, whence) + return f'FSEEK fd={fd} off={offset} whence={whence} -> {fret}', fret.to_bytes(4, 'little', signed=True) + except OSError as e: + return f'FSEEK fd={fd} off={offset} whence={whence} failed: {e}', self.RESP_ERROR_NEG1 + + def _tell(self, fd: int) -> tuple[str, bytes]: + handle = self._get_file(fd) + if not handle: + return f'FTELL fd={fd} failed: not open', self.RESP_ERROR_NEG1 + try: + pos = handle.tell() + return f'FTELL fd={fd} -> {pos}', pos.to_bytes(4, 'little', signed=True) + except OSError as e: + return f'FTELL fd={fd} failed: {e}', self.RESP_ERROR_NEG1 + + def _feof(self, fd: int) -> tuple[str, bytes]: + handle = self._get_file(fd) + if not handle: + return f'FEOF fd={fd} failed: not open', self.RESP_ERROR_NEG1 + try: + cur = handle.tell() + handle.seek(0, 2) # end + end = handle.tell() + handle.seek(cur, 0) + eof = int(cur >= end) + return f'FEOF fd={fd} -> {eof}', eof.to_bytes(4, 'little', signed=True) + except OSError as e: + return f'FEOF fd={fd} failed: {e}', self.RESP_ERROR_NEG1 + + def process(self, data: bytes) -> tuple[str, bytes | None]: + """Process a single command buffer. Returns summary and optional response bytes.""" + if not data: + return 'Empty data', None + + cmd = data[0] + payload = data[1:] + + # Look up command info + cmd_info = self._commands.get(cmd) + if cmd_info is None: + return f'Unknown command 0x{cmd:02X} (len={len(data)})', None + + # Handle FOPEN specially. Extract path and mode from payload. + if cmd_info.name == 'FOPEN': + null_pos = payload.find(0) + if null_pos <= 0: + return 'FOPEN invalid (empty or missing path)', self.RESP_ZERO + path = payload[:null_pos].decode('utf-8', errors='replace') + mode_start = payload[null_pos + 1 :] + null_pos = mode_start.find(0) + mode = mode_start[:null_pos].decode('utf-8', errors='replace') if null_pos != -1 else '' + return self._open(path, mode) + + # Handle FSTOP specially (close all files, no response) + if cmd_info.name == 'FSTOP': + self._close_all_files() + return 'FSTOP (all files closed, continuing capture)', None + + # Handle other commands via unified command table + if len(payload) < cmd_info.min_payload_len: + return f'{cmd_info.name} invalid (too short)', None + + # Extract arguments using the extractor function + if cmd_info.arg_extractor is None: + return f'{cmd_info.name} has no arg extractor', None + args = cmd_info.arg_extractor(payload) + + if cmd_info.needs_fd_validation: + fd: int = args[0] # Type annotation for mypy + if not self._validate_fd(fd): + return f'{cmd_info.name} invalid fd={fd} (not open or out of range)', None + + # Call handler function + if cmd_info.handler is None: + return f'{cmd_info.name} has no handler', None + result: tuple[str, bytes | None] = cmd_info.handler(*args) + return result + + def _get_cmd_name(self, cmd: int) -> str: + """Get command name for error messages.""" + cmd_info = self._commands.get(cmd) + return cmd_info.name if cmd_info else f'0x{cmd:02X}' + + +def _parse_command(data: bytes, proc: FileOpProcessor) -> tuple[str, bytes | None]: + """Process a command buffer and return summary plus optional response bytes.""" + return proc.process(data) + + +class FrameParser: + """State machine parser for STX|LEN(2)|payload|ETX frames.""" + + STX = 0x02 + ETX = 0x03 + + # states + WAIT_STX = 0 + WAIT_LENL = 1 + WAIT_LENH = 2 + WAIT_PAYLOAD = 3 + WAIT_ETX = 4 + + # Timeout in seconds for incomplete frames (100ms) + FRAME_TIMEOUT = 0.1 + + def __init__(self) -> None: + self.state = self.WAIT_STX + self.length = 0 + self.payload = bytearray() + self.frame = bytearray() + self.state_timestamp = time.time() # Track when we entered current state + + def reset(self) -> None: + """Reset parser to initial state.""" + self.state = self.WAIT_STX + self.length = 0 + self.payload.clear() + self.frame.clear() + self.state_timestamp = time.time() + + def check_timeout(self) -> bytes | None: + """Check if parser has been stuck in a non-STX state for too long. + + Returns the incomplete frame bytes if timeout occurred (and resets parser), + or None if no timeout. + """ + # Only check timeout if we're not in WAIT_STX state + if self.state != self.WAIT_STX: + elapsed = time.time() - self.state_timestamp + if elapsed > self.FRAME_TIMEOUT: + # Timeout - save incomplete frame for caller to log + incomplete = bytes(self.frame) if self.frame else None + self.reset() + return incomplete + return None + + def feed(self, data: bytes) -> list[tuple[bytes, bytes]]: + """Feed bytes to the parser. + + Returns list of (frame, payload) for each complete frame found. + """ + out: list[tuple[bytes, bytes]] = [] + for byte in data: + result = self._process_byte(byte) + if result is not None: + out.append(result) + return out + + def _process_byte(self, byte: int) -> tuple[bytes, bytes] | None: + """Process a single byte. Returns (frame, payload) when frame is complete.""" + self.frame.append(byte) + + if self.state == self.WAIT_STX: + if byte == self.STX: + self.state = self.WAIT_LENL + self.payload.clear() + self.state_timestamp = time.time() # Update timestamp on state change + else: + # Not STX, reset and wait for next STX + self.reset() + + elif self.state == self.WAIT_LENL: + self.length = byte + self.state = self.WAIT_LENH + self.state_timestamp = time.time() + + elif self.state == self.WAIT_LENH: + self.length |= byte << 8 + self.payload.clear() + if self.length == 0: + # There is no payload, go directly to ETX + self.state = self.WAIT_ETX + else: + self.state = self.WAIT_PAYLOAD + self.state_timestamp = time.time() + + elif self.state == self.WAIT_PAYLOAD: + self.payload.append(byte) + if len(self.payload) >= self.length: + self.state = self.WAIT_ETX + self.state_timestamp = time.time() + + elif self.state == self.WAIT_ETX: + if byte == self.ETX: + # Frame complete! Save the result and reset for next frame + result = (bytes(self.frame), bytes(self.payload)) + self.reset() + return result + else: + # Expected ETX but got something else, reset + self.reset() + + return None + + +def _log_frame_to_file(file_desc: TextIO | None, log_level: int, frame: bytes, resp: bytes | None, summary: str) -> str: + """Write frame data to log file based on log level and return console response string. + + Args: + file_desc: Open file descriptor (or None to skip logging) + log_level: Log level (1=REQ+RES+summary, 2=1+CONSOLE) + frame: Complete frame bytes including STX/LEN/payload/ETX + resp: Response bytes (or None) + summary: Parse summary string + + Returns: + Console response string to append to print output + """ + if not file_desc or log_level <= 0: + return '' + + hex_chunk = ' '.join(f'{byte:02X}' for byte in frame) + resp_hex = resp.hex(' ').upper() if resp else '' + console_resp = '' + + if log_level >= 1: + file_desc.write(f'REQ: {hex_chunk}\r\n') + if resp: + file_desc.write(f'RES: {resp_hex}\n') + if summary: + file_desc.write(f' : {summary}\r\n') + + if log_level >= 2 and resp: + console_resp = f' | RES: {resp_hex}' + + file_desc.flush() + return console_resp + + +class UartGcovCapture: + """UART-based GCOV capture handler for both CLI and pytest-embedded-idf integration. + + Can run in either blocking mode (CLI) or background thread mode (pytest). + Processes gcov file I/O commands in real-time, sending responses immediately + to avoid blocking gcov rtio operations. + + Usage in CLI: + capture = UartGcovCapture(port='/dev/ttyUSB0', baudrate=115200, log_file='gcov.log', log_level=2) + capture.run(background=False) # Blocks until Ctrl+C + + Usage in pytest: + with UartGcovCapture(port='/dev/ttyUSB0', baudrate=115200, log_file='gcov.log', log_level=2) as capture: + capture.run(background=True) + # ... wait for FSTOP or perform test actions ... + if capture.wait_for_fstop(timeout=30): + print(f'Dump completed, count: {capture.get_fstop_count()}') + """ + + def __init__(self, port: str, baudrate: int = 115200, log_file: str | None = None, log_level: int = 2) -> None: + """Initialize UART GCOV capture. + + Args: + port: Serial port path (e.g., '/dev/ttyUSB0') + baudrate: Serial baud rate (default: 115200) + log_file: Optional path to log file for REQ/RES/SUMMARY output (default: None) + log_level: Log level: 0=nothing, 1=REQ+RES+SUMMARY, 2=1+CONSOLE (default: 0) + """ + self.port = port + self.baudrate = baudrate + self.log_file = log_file + self.log_level = log_level + self.ser: serial.Serial | None = None + self.capture_thread: threading.Thread | None = None + self.stop_event = threading.Event() + self.proc = FileOpProcessor() + self.parser = FrameParser() + self.logger = logging.getLogger(__name__) + self.file_desc: TextIO | None = None + self.fstop_count = 0 # Track number of FSTOP commands received + self.fstop_event = threading.Event() # Event to signal FSTOP received + self.total_bytes = 0 # Track total bytes captured + + def __enter__(self) -> 'UartGcovCapture': + """Context manager entry.""" + return self + + def __exit__(self, exception_type: Any, exception_value: Any, exception_traceback: Any) -> None: + """Context manager exit - stop capture and cleanup.""" + self.stop() + + def run(self, background: bool = True) -> 'UartGcovCapture': + """Start UART capture. + + Args: + background: If True, run in background thread (pytest mode). + If False, run in foreground blocking mode (CLI mode). + + Returns: + self for chaining + """ + if background: + return self._run_background() + else: + return self._run_foreground() + + def _run_background(self) -> 'UartGcovCapture': + if self.capture_thread is not None and self.capture_thread.is_alive(): + self.logger.warning('Capture thread already running') + return self + + self._open_serial_and_log() + + self.stop_event.clear() + self.capture_thread = threading.Thread(target=self._capture_loop, daemon=True, name='UartGcovCapture') + self.capture_thread.start() + self.logger.info('Started UART capture thread') + return self + + def _run_foreground(self) -> 'UartGcovCapture': + self._open_serial_and_log() + + self.logger.info('Started gcov data capture and processing... Press Ctrl+C to exit') + + try: + self._capture_loop() + except KeyboardInterrupt: + self.logger.info(f'Data capture interrupted. Captured {self.total_bytes} bytes.') + if self.log_file: + self.logger.info(f'Data saved to: {self.log_file}') + finally: + self.stop() + + return self + + def _open_serial_and_log(self) -> None: + """Open serial port and log file (common for both modes).""" + try: + self.ser = serial.Serial( + self.port, + self.baudrate, + timeout=0.1, # Short timeout for non-blocking reads + write_timeout=1, + inter_byte_timeout=None, # No timeout between bytes + ) + self.logger.info(f'Opened serial port: {self.port} at {self.baudrate} baud') + except serial.SerialException as e: + self.logger.error(f'Failed to open serial port {self.port}: {e}') + raise + + if self.log_file and self.log_level > 0: + try: + self.file_desc = open(self.log_file, 'w', encoding='ascii', buffering=8192) + self.logger.info(f'Opened log file: {self.log_file} (log level: {self.log_level})') + except OSError as e: + self.logger.warning(f'Failed to open log file {self.log_file}: {e}') + + def wait_for_fstop(self, timeout: float = 10.0) -> bool: + """Wait for an FSTOP command to be received. + + Args: + timeout: Maximum time to wait in seconds + + Returns: + True if FSTOP was received, False if timeout + """ + result = self.fstop_event.wait(timeout=timeout) + if result: + self.fstop_event.clear() # Clear after successful wait + return result + + def get_fstop_count(self) -> int: + """Get the number of FSTOP commands received so far.""" + return self.fstop_count + + def stop(self) -> None: + """Stop capture thread and close serial port.""" + if self.stop_event.is_set(): + return + + self.stop_event.set() + + # Close serial port first to unblock any blocking read in the capture thread + if self.ser is not None and self.ser.is_open: + try: + self.ser.close() + self.logger.info('Closed serial port') + except OSError: + pass + self.ser = None + + # Now wait for the thread to finish (should exit quickly after serial close) + if self.capture_thread is not None and self.capture_thread.is_alive(): + self.logger.info('Waiting for capture thread to stop...') + self.capture_thread.join(timeout=2.0) + if self.capture_thread.is_alive(): + self.logger.warning('Capture thread did not stop within timeout') + + # Close log file + if self.file_desc is not None: + try: + self.file_desc.flush() + self.file_desc.close() + self.logger.info('Closed log file') + except OSError: + pass + self.file_desc = None + + def _capture_loop(self) -> None: + """Main capture loop (runs in either foreground or background).""" + if self.ser is None: + return + + self.logger.info('Capture loop started') + try: + while not self.stop_event.is_set(): + # Check if serial port was closed (e.g., by stop()) + if self.ser is None or not self.ser.is_open: + break + + # Check for parser timeout (in case of incomplete frames) + incomplete_frame = self.parser.check_timeout() + if incomplete_frame is not None: + hex_bytes = incomplete_frame.hex(' ').upper() if incomplete_frame else '(empty)' + self.logger.warning(f'Parser timeout - incomplete frame discarded: {hex_bytes}') + + if self.ser.in_waiting > 0: + chunk = self.ser.read(self.ser.in_waiting) + frames = self.parser.feed(chunk) # Returns List[Tuple[bytes, bytes]] + + for frame, payload in frames: + summary, resp = _parse_command(payload, self.proc) + + # Check if this is an FSTOP command + if payload and payload[0] == 0x6: + self.fstop_count += 1 + self.fstop_event.set() # Signal that FSTOP was received + self.fstop_event.clear() # Clear for next FSTOP + + if resp and self.ser is not None and self.ser.is_open: + self.ser.write(resp) + self.ser.flush() + + self.total_bytes += len(frame) + + console_resp = _log_frame_to_file(self.file_desc, self.log_level, frame, resp, summary) + + if self.log_level > 1: + self.logger.info(f'Captured {self.total_bytes} bytes | {summary}{console_resp}') + else: + time.sleep(0.001) + + except serial.SerialException: + # Expected when serial port is closed during stop() + pass + except Exception as e: + self.logger.error(f'Unexpected error in capture loop: {e}') + finally: + self.logger.info('Capture loop ended') + + +def main() -> None: + logging.basicConfig( + level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' + ) + + args = parse_args() + + if args.log_level > 0 and not args.output: + print('Error: -o/--output is required when log-level > 0') + sys.exit(1) + + print(f'Opening serial port: {args.port} at {args.baudrate} baud') + if args.output: + print(f'Log file: {args.output} (log level: {args.log_level})') + else: + print('Log level 0: No file logging') + + try: + capture = UartGcovCapture( + port=args.port, baudrate=args.baudrate, log_file=args.output, log_level=args.log_level + ) + capture.run(background=False) + except Exception as e: + print(f'Unexpected error: {e}') + sys.exit(1) + + +def get_coverage_data(binary_path: str, expected_gcda_paths: list) -> dict: + """Extract coverage data from .gcda files using gcov tool. + + Returns a dictionary with function execution counts for verification. + If gcov tool is not available, returns empty dict after verifying files exist. + """ + desc_path = os.path.join(binary_path, 'project_description.json') + with open(desc_path, encoding='utf-8') as f: + project_desc = json.load(f) + + toolchain_prefix = project_desc.get('monitor_toolprefix', '') + gcov_tool = toolchain_prefix + 'gcov' + + try: + subprocess.run([gcov_tool, '--version'], capture_output=True, check=True, timeout=3) + except (subprocess.SubprocessError, FileNotFoundError): + print(f'gcov tool is not available ({gcov_tool}), skipping detailed coverage data processing') + for gcda_path in expected_gcda_paths: + if os.path.isfile(gcda_path): + print(f'✓ .gcda file exists: {os.path.basename(gcda_path)}') + return {} + + coverage_data = {} + + for gcda_path in expected_gcda_paths: + if not os.path.isfile(gcda_path): + continue + + gcda_dir = os.path.dirname(gcda_path) + + try: + subprocess.run( + [gcov_tool, '-j', os.path.basename(gcda_path)], + cwd=gcda_dir, + capture_output=True, + text=True, + check=True, + timeout=3, + ) + + basename = os.path.basename(gcda_path).replace('.gcda', '') + json_gz_file = os.path.join(gcda_dir, f'{basename}.gcov.json.gz') + + if os.path.isfile(json_gz_file): + with gzip.open(json_gz_file, 'rt', encoding='utf-8') as f: + gcov_data = json.load(f) + + for file_data in gcov_data.get('files', []): + source_file = os.path.basename(file_data.get('file', '')) + for function in file_data.get('functions', []): + func_name = function.get('name', '') + exec_count = function.get('execution_count', 0) + coverage_data[f'{source_file}:{func_name}'] = exec_count + + os.remove(json_gz_file) + + except (subprocess.SubprocessError, FileNotFoundError, json.JSONDecodeError) as e: + print(f'Warning: Could not extract coverage from {gcda_path}: {e}') + + return coverage_data + + +def gcov_capture_from_uart( + port: str, baudrate: int = 115200, log_file: str | None = None, log_level: int = 1, timeout: float = 10.0 +) -> bool: + """Convenience function to capture a single GCOV dump from UART and wait for FSTOP. + + This is a simple helper for pytest scripts that need to capture GCOV data via UART. + It starts capture, waits for one FSTOP command, then stops. + + Args: + port: Serial port path (e.g., '/dev/ttyUSB0') + baudrate: Serial baud rate (default: 115200) + log_file: Optional path to log file for REQ/RES/SUMMARY output + log_level: Log level: 0=nothing, 1=REQ+RES+SUMMARY, 2=1+CONSOLE (default: 1) + timeout: Maximum time to wait for FSTOP in seconds (default: 10.0) + + Returns: + True if FSTOP was received, False if timeout + + Example usage in pytest: + >>> from gcov_capture import gcov_capture_from_uart + >>> # Close DUT console + >>> dut.serial.close() + >>> # Trigger gcov dump on target via esp_gcov_dump() + >>> # Wait for dump to complete + >>> assert gcov_capture_from_uart( + ... port=dut.serial.port, baudrate=115200, log_file='gcov_dump.log', timeout=30.0 + ... ), 'GCOV dump timeout' + >>> # Check .gcda files were created + """ + with UartGcovCapture(port=port, baudrate=baudrate, log_file=log_file, log_level=log_level) as capture: + capture.run(background=True) + time.sleep(0.1) # Give capture a moment to start + return capture.wait_for_fstop(timeout=timeout) + + +if __name__ == '__main__': + main()