mirror of
https://github.com/espressif/esp-idf.git
synced 2026-04-27 19:13:21 +00:00
feat(examples): dump coverage over uart
This commit is contained in:
@@ -15,9 +15,10 @@
|
||||
// * Operation arguments. See file operation helper structures below.
|
||||
|
||||
#include <string.h>
|
||||
#include <stdbool.h>
|
||||
#include "esp_app_trace.h"
|
||||
|
||||
#include "esp_log.h"
|
||||
|
||||
const static char *TAG = "esp_host_file_io";
|
||||
|
||||
#define ESP_APPTRACE_FILE_CMD_FOPEN 0x0
|
||||
@@ -77,25 +78,56 @@ typedef struct {
|
||||
void *file;
|
||||
} esp_apptrace_ftell_args_t;
|
||||
|
||||
#define ESP_APPTRACE_FILE_RECV_TIMEOUT_US 100000 // 100ms
|
||||
#define ESP_APPTRACE_FRAME_STX 0x02
|
||||
#define ESP_APPTRACE_FRAME_ETX 0x03
|
||||
/**
|
||||
* Send a file command to the host.
|
||||
* For the UART destination, the command is wrapped in a frame protocol.
|
||||
* The frame protocol is as follows:
|
||||
* - STX (0x02)
|
||||
* - Length of the command and arguments (2 bytes)
|
||||
* - Command ID (1 byte)
|
||||
* - Arguments (variable length)
|
||||
* - ETX (0x03)
|
||||
*/
|
||||
static esp_err_t esp_apptrace_file_cmd_send(uint8_t cmd, void (*prep_args)(uint8_t *, void *), void *args, uint32_t args_len)
|
||||
{
|
||||
esp_err_t ret;
|
||||
esp_apptrace_fcmd_hdr_t *hdr;
|
||||
const bool is_uart = esp_apptrace_get_destination() == ESP_APPTRACE_DEST_UART;
|
||||
const size_t payload_off = is_uart ? 3u : 0u; // STX + LEN
|
||||
const size_t payload_len = sizeof(cmd) + (size_t)args_len; // CMD + ARGS
|
||||
const size_t total_len = payload_off + payload_len + (is_uart ? 1u /*ETX*/ : 0u);
|
||||
size_t inx = 0;
|
||||
|
||||
ESP_EARLY_LOGV(TAG, "%s %d", __func__, cmd);
|
||||
uint8_t *ptr = esp_apptrace_buffer_get(sizeof(*hdr) + args_len, ESP_APPTRACE_TMO_INFINITE); //TODO: finite tmo
|
||||
if (ptr == NULL) {
|
||||
if (is_uart && payload_len > 0xFFFFu) {
|
||||
return ESP_ERR_INVALID_SIZE;
|
||||
}
|
||||
|
||||
ESP_EARLY_LOGV(TAG, "%s cmd:%d len:%d", __func__, cmd, total_len);
|
||||
uint8_t *buf = esp_apptrace_buffer_get(total_len, ESP_APPTRACE_TMO_INFINITE);
|
||||
if (!buf) {
|
||||
return ESP_ERR_NO_MEM;
|
||||
}
|
||||
|
||||
hdr = (esp_apptrace_fcmd_hdr_t *)ptr;
|
||||
hdr->cmd = cmd;
|
||||
if (prep_args) {
|
||||
prep_args(ptr + sizeof(hdr->cmd), args);
|
||||
if (is_uart) {
|
||||
buf[inx++] = ESP_APPTRACE_FRAME_STX;
|
||||
buf[inx++] = (uint8_t)(payload_len & 0xFF);
|
||||
buf[inx++] = (uint8_t)((payload_len >> 8) & 0xFF);
|
||||
}
|
||||
|
||||
// Payload: CMD then ARGS
|
||||
buf[inx++] = cmd;
|
||||
if (args_len) {
|
||||
prep_args(&buf[inx], args);
|
||||
inx += args_len;
|
||||
}
|
||||
|
||||
if (is_uart) {
|
||||
buf[inx++] = ESP_APPTRACE_FRAME_ETX;
|
||||
}
|
||||
|
||||
// now indicate that this buffer is ready to be sent off to host
|
||||
ret = esp_apptrace_buffer_put(ptr, ESP_APPTRACE_TMO_INFINITE);//TODO: finite tmo
|
||||
esp_err_t ret = esp_apptrace_buffer_put(buf, ESP_APPTRACE_TMO_INFINITE);//TODO: finite tmo
|
||||
if (ret != ESP_OK) {
|
||||
ESP_EARLY_LOGE(TAG, "Failed to put apptrace buffer (%d)!", ret);
|
||||
return ret;
|
||||
@@ -115,7 +147,7 @@ static esp_err_t esp_apptrace_file_rsp_recv(uint8_t *buf, uint32_t buf_len)
|
||||
uint32_t tot_rd = 0;
|
||||
while (tot_rd < buf_len) {
|
||||
uint32_t rd_size = buf_len - tot_rd;
|
||||
esp_err_t ret = esp_apptrace_read(buf + tot_rd, &rd_size, ESP_APPTRACE_TMO_INFINITE); //TODO: finite tmo
|
||||
esp_err_t ret = esp_apptrace_read(buf + tot_rd, &rd_size, ESP_APPTRACE_FILE_RECV_TIMEOUT_US);
|
||||
if (ret != ESP_OK) {
|
||||
ESP_EARLY_LOGE(TAG, "Failed to read (%d)!", ret);
|
||||
return ret;
|
||||
|
||||
@@ -260,8 +260,6 @@ static esp_err_t esp_apptrace_uart_init(void *hw_data, const esp_apptrace_config
|
||||
/* Initialize UART HAL (sets default 8N1 mode) */
|
||||
uart_hal_init(&uart_data->hal_ctx, uart_config->uart_num);
|
||||
|
||||
ESP_LOGI(TAG, "uart_hal_init: %d", uart_config->uart_num);
|
||||
|
||||
HP_UART_SRC_CLK_ATOMIC() {
|
||||
uart_hal_set_sclk(&uart_data->hal_ctx, UART_SCLK_DEFAULT);
|
||||
uart_hal_set_baudrate(&uart_data->hal_ctx, uart_config->baud_rate, sclk_hz);
|
||||
@@ -429,13 +427,25 @@ static uint8_t *esp_apptrace_uart_down_buffer_get(void *hw_data, uint32_t *size,
|
||||
return NULL;
|
||||
}
|
||||
|
||||
uint32_t rx_len = uart_ll_get_rxfifo_len(uart_data->hal_ctx.dev);
|
||||
int to_read = MIN(rx_len, MIN(uart_data->rx_msg_buff_size, *size));
|
||||
if (to_read) {
|
||||
uart_hal_read_rxfifo(&uart_data->hal_ctx, uart_data->rx_msg_buff, &to_read);
|
||||
}
|
||||
*size = to_read;
|
||||
/* Read until we get the requested number of bytes (or timeout) */
|
||||
const uint32_t req_size = MIN(uart_data->rx_msg_buff_size, *size);
|
||||
uint32_t total_read = 0;
|
||||
while (total_read < req_size) {
|
||||
uint32_t rx_len = uart_hal_get_rxfifo_len(&uart_data->hal_ctx);
|
||||
int to_read = MIN((uint32_t)(req_size - total_read), rx_len);
|
||||
if (to_read > 0) {
|
||||
uart_hal_read_rxfifo(&uart_data->hal_ctx, uart_data->rx_msg_buff + total_read, &to_read);
|
||||
total_read += to_read;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (esp_apptrace_tmo_check(tmo) != ESP_OK) {
|
||||
break;
|
||||
}
|
||||
esp_rom_delay_us(50);
|
||||
}
|
||||
|
||||
*size = total_read;
|
||||
esp_apptrace_uart_unlock(uart_data);
|
||||
|
||||
return (*size > 0) ? uart_data->rx_msg_buff : NULL;
|
||||
|
||||
@@ -229,6 +229,8 @@ class OpenOCD:
|
||||
resp = self.telnet.read_very_eager()
|
||||
self.telnet.write(to_bytes(s, '\n'))
|
||||
resp += self.telnet.read_until(b'>', timeout=timeout)
|
||||
if not resp.endswith(b'>'):
|
||||
return ''
|
||||
return to_str(resp)
|
||||
|
||||
def apptrace_wait_stop(self, timeout: int = 30) -> None:
|
||||
|
||||
@@ -24,6 +24,20 @@ static const char *TAG = "example";
|
||||
void blink_dummy_func(void);
|
||||
void some_dummy_func(void);
|
||||
|
||||
#if !CONFIG_APPTRACE_DEST_JTAG
|
||||
#include "soc/uart_pins.h"
|
||||
#include "esp_app_trace.h"
|
||||
/* Override default uart config to use console pins as a uart channel */
|
||||
esp_apptrace_config_t esp_apptrace_get_user_params(void)
|
||||
{
|
||||
esp_apptrace_config_t config = APPTRACE_UART_CONFIG_DEFAULT();
|
||||
config.dest_cfg.uart.uart_num = 0;
|
||||
config.dest_cfg.uart.tx_pin_num = U0TXD_GPIO_NUM;
|
||||
config.dest_cfg.uart.rx_pin_num = U0RXD_GPIO_NUM;
|
||||
return config;
|
||||
}
|
||||
#endif
|
||||
|
||||
static void blink_task(void *pvParameter)
|
||||
{
|
||||
ESP_LOGI(TAG, "Ready for OpenOCD connection");
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-FileCopyrightText: 2022-2026 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Unlicense OR CC0-1.0
|
||||
import os.path
|
||||
import sys
|
||||
import time
|
||||
import typing
|
||||
|
||||
@@ -11,13 +12,58 @@ from pytest_embedded_idf.utils import idf_parametrize
|
||||
if typing.TYPE_CHECKING:
|
||||
from conftest import OpenOCD
|
||||
|
||||
try:
|
||||
from gcov_capture import UartGcovCapture
|
||||
from gcov_capture import get_coverage_data
|
||||
except ImportError:
|
||||
idf_path = os.getenv('IDF_PATH')
|
||||
if not idf_path:
|
||||
raise RuntimeError('IDF_PATH not found. Please run `source $IDF_PATH/export.sh`')
|
||||
esp_app_trace_dir = os.path.join(idf_path, 'tools', 'esp_app_trace')
|
||||
sys.path.insert(0, esp_app_trace_dir)
|
||||
from gcov_capture import UartGcovCapture
|
||||
from gcov_capture import get_coverage_data
|
||||
|
||||
def _test_gcov(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
# create the generated .gcda folder, otherwise would have error: failed to open file.
|
||||
# normally this folder would be created via `idf.py build`. but in CI the non-related files would not be preserved
|
||||
|
||||
def get_expected_gcda_paths(dut: IdfDut) -> list:
|
||||
"""Get list of expected .gcda file paths for this example."""
|
||||
return [
|
||||
os.path.join(
|
||||
dut.app.binary_path, 'esp-idf', 'main', 'CMakeFiles', '__idf_main.dir', 'gcov_example_main.c.gcda'
|
||||
),
|
||||
os.path.join(
|
||||
dut.app.binary_path, 'esp-idf', 'main', 'CMakeFiles', '__idf_main.dir', 'gcov_example_func.c.gcda'
|
||||
),
|
||||
os.path.join(dut.app.binary_path, 'esp-idf', 'sample', 'CMakeFiles', '__idf_sample.dir', 'some_funcs.c.gcda'),
|
||||
]
|
||||
|
||||
|
||||
def prepare_test(dut: IdfDut) -> list:
|
||||
"""Prepare test environment: create directories and clean up old .gcda files.
|
||||
|
||||
Returns list of expected .gcda file paths.
|
||||
"""
|
||||
# Create the generated .gcda folders
|
||||
# Normally created via `idf.py build`, but in CI non-related files aren't preserved
|
||||
os.makedirs(os.path.join(dut.app.binary_path, 'esp-idf', 'main', 'CMakeFiles', '__idf_main.dir'), exist_ok=True)
|
||||
os.makedirs(os.path.join(dut.app.binary_path, 'esp-idf', 'sample', 'CMakeFiles', '__idf_sample.dir'), exist_ok=True)
|
||||
|
||||
# Get expected paths and clean up old files
|
||||
expected_gcda_paths = get_expected_gcda_paths(dut)
|
||||
for gcda_path in expected_gcda_paths:
|
||||
if os.path.isfile(gcda_path):
|
||||
try:
|
||||
os.remove(gcda_path)
|
||||
print(f'Removed old .gcda file: {os.path.basename(gcda_path)}')
|
||||
except OSError as e:
|
||||
print(f'Warning: Could not remove {gcda_path}: {e}')
|
||||
|
||||
return expected_gcda_paths
|
||||
|
||||
|
||||
def _test_gcov(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
expected_gcda_paths = prepare_test(dut)
|
||||
|
||||
def expect_counter_output(loop: int, timeout: int = 10) -> None:
|
||||
dut.expect_exact(
|
||||
[f'blink_dummy_func: Counter = {loop}', f'some_dummy_func: Counter = {loop * 2}'],
|
||||
@@ -25,8 +71,8 @@ def _test_gcov(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
def dump_coverage(cmd: str) -> None:
|
||||
response = openocd.write(cmd)
|
||||
def dump_coverage(on_the_fly: bool, expected_counts: dict | None = None) -> None:
|
||||
response = openocd.gcov_dump(on_the_fly=on_the_fly)
|
||||
|
||||
expect_lines = [
|
||||
'Targets connected.',
|
||||
@@ -47,6 +93,21 @@ def _test_gcov(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
|
||||
assert len(expect_lines) == 0
|
||||
|
||||
# Verify execution counts if expected and coverage data is available
|
||||
if expected_counts:
|
||||
coverage = get_coverage_data(dut.app.binary_path, expected_gcda_paths)
|
||||
if coverage: # Only verify if detailed data is available
|
||||
print(f'Coverage data: {coverage}')
|
||||
for func, expected_count in expected_counts.items():
|
||||
actual_count = coverage.get(func)
|
||||
assert actual_count == expected_count, f'Expected {func}={expected_count}, got {actual_count}'
|
||||
else:
|
||||
# Backup verification: ensure .gcda files exist
|
||||
print('Coverage data not available (gcov tool not found)')
|
||||
for gcda_path in expected_gcda_paths:
|
||||
assert os.path.isfile(gcda_path), f'Expected .gcda file not found: {gcda_path}'
|
||||
print('Basic verification passed (all .gcda files exist)')
|
||||
|
||||
dut.expect_exact('example: Ready for OpenOCD connection', timeout=5)
|
||||
with openocd_dut.run() as openocd:
|
||||
openocd.write('reset run')
|
||||
@@ -55,32 +116,86 @@ def _test_gcov(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
expect_counter_output(0)
|
||||
dut.expect('Ready to dump GCOV data...', timeout=5)
|
||||
|
||||
# Test two hard-coded dumps
|
||||
dump_coverage('esp gcov dump')
|
||||
# Test two hard-coded dumps with verification
|
||||
dump_coverage(False, {'gcov_example_func.c:blink_dummy_func': 1, 'some_funcs.c:some_dummy_func': 1})
|
||||
dut.expect('GCOV data have been dumped.', timeout=5)
|
||||
expect_counter_output(1)
|
||||
dut.expect('Ready to dump GCOV data...', timeout=5)
|
||||
dump_coverage('esp gcov dump')
|
||||
|
||||
dump_coverage(False, {'gcov_example_func.c:blink_dummy_func': 2, 'some_funcs.c:some_dummy_func': 2})
|
||||
dut.expect('GCOV data have been dumped.', timeout=5)
|
||||
|
||||
for i in range(2, 6):
|
||||
expect_counter_output(i)
|
||||
|
||||
for _ in range(3):
|
||||
# Test instant run-time dumps with verification
|
||||
expected_runtime_counts = [
|
||||
{'gcov_example_func.c:blink_dummy_func': 7, 'some_funcs.c:some_dummy_func': 7},
|
||||
{'gcov_example_func.c:blink_dummy_func': 8, 'some_funcs.c:some_dummy_func': 8},
|
||||
{'gcov_example_func.c:blink_dummy_func': 10, 'some_funcs.c:some_dummy_func': 10},
|
||||
]
|
||||
|
||||
for expected in expected_runtime_counts:
|
||||
time.sleep(1)
|
||||
# Test instant run-time dump
|
||||
dump_coverage('esp gcov')
|
||||
dump_coverage(True, expected)
|
||||
|
||||
|
||||
@pytest.mark.jtag
|
||||
@idf_parametrize('config', ['gcov_jtag'], indirect=['config'])
|
||||
@idf_parametrize('target', ['esp32', 'esp32c2', 'esp32s2'], indirect=['target'])
|
||||
def test_gcov(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
_test_gcov(openocd_dut, dut)
|
||||
|
||||
|
||||
@pytest.mark.usb_serial_jtag
|
||||
@idf_parametrize('config', ['gcov_jtag'], indirect=['config'])
|
||||
@idf_parametrize(
|
||||
'target', ['esp32s3', 'esp32c3', 'esp32c5', 'esp32c6', 'esp32c61', 'esp32h2', 'esp32p4'], indirect=['target']
|
||||
)
|
||||
def test_gcov_usj(openocd_dut: 'OpenOCD', dut: IdfDut) -> None:
|
||||
_test_gcov(openocd_dut, dut)
|
||||
|
||||
|
||||
def _test_gcov_uart(dut: IdfDut) -> None:
|
||||
# Close console port to free up the port for the gcov capture
|
||||
dut.serial.close()
|
||||
|
||||
expected_gcda_paths = prepare_test(dut)
|
||||
log_file = os.path.join(dut.logdir, 'gcov_uart.log')
|
||||
uart_port = dut.serial.port
|
||||
baud = dut.app.sdkconfig.get('APPTRACE_UART_BAUDRATE')
|
||||
with UartGcovCapture(port=uart_port, baudrate=baud, log_file=log_file, log_level=1) as uart_capture:
|
||||
uart_capture.run(background=True)
|
||||
time.sleep(0.5)
|
||||
|
||||
# Expected execution counts
|
||||
expected_counts = [
|
||||
{'gcov_example_func.c:blink_dummy_func': 1, 'some_funcs.c:some_dummy_func': 1}, # First dump
|
||||
{'gcov_example_func.c:blink_dummy_func': 2, 'some_funcs.c:some_dummy_func': 2}, # Second dump
|
||||
]
|
||||
|
||||
# Verify each dump
|
||||
for dump_num, expected in enumerate(expected_counts, start=1):
|
||||
if uart_capture.wait_for_fstop(timeout=10.0):
|
||||
coverage = get_coverage_data(dut.app.binary_path, expected_gcda_paths)
|
||||
|
||||
if coverage: # Only verify details if coverage data is available
|
||||
print(f'Coverage data: {coverage}')
|
||||
for func, expected_count in expected.items():
|
||||
actual_count = coverage.get(func)
|
||||
assert actual_count == expected_count, f'Expected {func}={expected_count}, got {actual_count}'
|
||||
else:
|
||||
# Backup verification: ensure .gcda files exist
|
||||
print('Coverage data not available (gcov tool not found)')
|
||||
for gcda_path in expected_gcda_paths:
|
||||
assert os.path.isfile(gcda_path), f'Expected .gcda file not found: {gcda_path}'
|
||||
print('Basic verification passed (all .gcda files exist)')
|
||||
else:
|
||||
assert False, f'esp_gcov_dump {dump_num} timeout'
|
||||
|
||||
|
||||
@pytest.mark.generic
|
||||
@idf_parametrize('config', ['gcov_uart'], indirect=['config'])
|
||||
@idf_parametrize('target', ['supported_targets'], indirect=['target'])
|
||||
def test_gcov_uart(dut: IdfDut) -> None:
|
||||
_test_gcov_uart(dut)
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
CONFIG_APPTRACE_DEST_JTAG=y
|
||||
CONFIG_APPTRACE_LOCK_ENABLE=y
|
||||
CONFIG_APPTRACE_ONPANIC_HOST_FLUSH_TMO=-1
|
||||
CONFIG_APPTRACE_POSTMORTEM_FLUSH_THRESH=0
|
||||
@@ -0,0 +1,5 @@
|
||||
CONFIG_ESP_CONSOLE_NONE=y
|
||||
CONFIG_APPTRACE_DEST_UART=y
|
||||
CONFIG_APPTRACE_DEST_UART_NUM=0
|
||||
CONFIG_APPTRACE_UART_BAUDRATE=1000000
|
||||
CONFIG_APPTRACE_UART_TX_MSG_SIZE=256
|
||||
@@ -1,9 +1,4 @@
|
||||
CONFIG_ESP_TRACE_ENABLE=y
|
||||
CONFIG_ESP_TRACE_LIB_NONE=y
|
||||
CONFIG_ESP_TRACE_TRANSPORT_APPTRACE=y
|
||||
CONFIG_APPTRACE_DEST_JTAG=y
|
||||
CONFIG_APPTRACE_LOCK_ENABLE=y
|
||||
CONFIG_APPTRACE_ONPANIC_HOST_FLUSH_TMO=-1
|
||||
CONFIG_APPTRACE_POSTMORTEM_FLUSH_THRESH=0
|
||||
|
||||
CONFIG_ESP_GCOV_ENABLE=y
|
||||
|
||||
@@ -86,6 +86,7 @@ tools/ci/test_autocomplete/test_autocomplete.py
|
||||
tools/ci/test_configure_ci_environment.sh
|
||||
tools/docker/entrypoint.sh
|
||||
tools/docs/gen_version_specific_includes.py
|
||||
tools/esp_app_trace/gcov_capture.py
|
||||
tools/esp_app_trace/logtrace_proc.py
|
||||
tools/esp_app_trace/sysviewtrace_proc.py
|
||||
tools/esp_app_trace/test/logtrace/test.sh
|
||||
|
||||
Executable
+837
@@ -0,0 +1,837 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# SPDX-FileCopyrightText: 2025-2026 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
# GCOV Data Capture Tool
|
||||
# Captures data from UART serial port, logs raw bytes in hex format, and parses/executes
|
||||
# file I/O commands (gcov host file protocol) on the fly.
|
||||
#
|
||||
# Usage:
|
||||
# CLI mode:
|
||||
# python gcov_capture.py -p /dev/tty.usbserial-101 -b 115200 -o gcov.log -l1
|
||||
#
|
||||
# Python API for pytest (single dump):
|
||||
# from gcov_capture import gcov_capture_from_uart
|
||||
#
|
||||
# def test_my_gcov(dut):
|
||||
# dut.serial.close() # Free up the port if you use the same port with the dut.serial.port
|
||||
# assert gcov_capture_from_uart(
|
||||
# port=dut.serial.port,
|
||||
# baudrate=dut.app.sdkconfig.get('APPTRACE_UART_BAUDRATE'),
|
||||
# log_file='gcov.log'
|
||||
# ), "GCOV dump timeout"
|
||||
# # Create report with idf.py gcovr-report
|
||||
# # idf.py gcovr-report # Uses default build dir
|
||||
# # idf.py -B build_esp32c5_gcov_uart gcovr-report
|
||||
#
|
||||
# Python API for pytest (multiple dumps):
|
||||
# from gcov_capture import UartGcovCapture
|
||||
#
|
||||
# def test_multiple_gcov_dumps(dut):
|
||||
# dut.serial.close() # Free up the port if you use the same port with the dut.serial.port
|
||||
# with UartGcovCapture(
|
||||
# port=dut.serial.port,
|
||||
# baudrate=dut.app.sdkconfig.get('APPTRACE_UART_BAUDRATE'),
|
||||
# log_file='gcov.log',
|
||||
# log_level=1
|
||||
# ) as capture:
|
||||
# capture.run(background=True)
|
||||
# time.sleep(0.5)
|
||||
# # Wait for multiple dumps
|
||||
# for i in range(3):
|
||||
# assert capture.wait_for_fstop(timeout=30.0), f"Dump {i+1} timeout"
|
||||
# print(f"Dump {i+1} complete, total: {capture.get_fstop_count()}")
|
||||
# # Create report with idf.py gcovr-report
|
||||
# # idf.py gcovr-report # Uses default build dir
|
||||
# # idf.py -B build_esp32c5_gcov_uart gcovr-report
|
||||
#
|
||||
# Coverage verification helper (works for both JTAG and UART):
|
||||
# from gcov_capture import get_coverage_data
|
||||
# expected_gcda_paths = [...] # List of .gcda files under build dir
|
||||
# coverage = get_coverage_data(dut.app.binary_path, expected_gcda_paths)
|
||||
# print(coverage)
|
||||
|
||||
import argparse
|
||||
import gzip
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
from typing import BinaryIO
|
||||
from typing import TextIO
|
||||
from typing import cast
|
||||
|
||||
import serial
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Capture GCOV UART data and process file I/O commands in real-time',
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
allow_abbrev=False,
|
||||
)
|
||||
|
||||
parser.add_argument('-p', '--port', required=True, help='Serial port (e.g., /dev/ttyUSB0, COM3)')
|
||||
parser.add_argument('-b', '--baudrate', type=int, required=True, help='Serial baudrate (e.g., 115200)')
|
||||
parser.add_argument('-o', '--output', help='Optional log file path (only used if log-level > 0)')
|
||||
parser.add_argument(
|
||||
'-l',
|
||||
'--log-level',
|
||||
type=int,
|
||||
default=0,
|
||||
choices=[0, 1, 2],
|
||||
help='Log level: 0=no logging, 1=REQ+RES+SUMMARY, 2=1+CONSOLE',
|
||||
)
|
||||
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def _le_u32(data: bytes, offset: int) -> int:
|
||||
return int.from_bytes(data[offset : offset + 4], byteorder='little', signed=False)
|
||||
|
||||
|
||||
def _le_i32(data: bytes, offset: int) -> int:
|
||||
return int.from_bytes(data[offset : offset + 4], byteorder='little', signed=True)
|
||||
|
||||
|
||||
def _normalize_mode(mode: str) -> str | None:
|
||||
s = set(mode)
|
||||
base = s & {'r', 'w', 'a', 'x'}
|
||||
|
||||
if len(base) != 1 or s - {'r', 'w', 'a', 'x', '+', 'b', 't'}:
|
||||
return None
|
||||
|
||||
return next(iter(base)) + 'b' + ('+' if '+' in s else '')
|
||||
|
||||
|
||||
@dataclass
|
||||
class CommandInfo:
|
||||
"""Metadata for a file I/O command."""
|
||||
|
||||
code: int
|
||||
name: str
|
||||
min_payload_len: int
|
||||
needs_fd_validation: bool
|
||||
handler: Callable | None = None
|
||||
arg_extractor: Callable | None = None
|
||||
|
||||
|
||||
class FileOpProcessor:
|
||||
"""Emulates the gcov file protocol operations like esp_gcov_process_data."""
|
||||
|
||||
# Common response codes (little-endian)
|
||||
RESP_ZERO = (0).to_bytes(4, 'little') # Zero (success for most ops, error for fd/size)
|
||||
RESP_ERROR_NEG1 = (-1).to_bytes(4, 'little', signed=True) # -1 (error for signed ops)
|
||||
RESP_FWRITE_OK = (1).to_bytes(4, 'little') # 1 (FWRITE success)
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.files: dict[int, BinaryIO] = {}
|
||||
self.max_files = 1024
|
||||
self.next_fd = 1
|
||||
self.running = True
|
||||
|
||||
# Unified command table with all metadata
|
||||
self._commands: dict[int, CommandInfo] = {
|
||||
0x0: CommandInfo(0x0, 'FOPEN', 0, False, None, None), # Special handling
|
||||
0x1: CommandInfo(0x1, 'FCLOSE', 4, True, self._close, lambda p: (_le_u32(p, 0),)),
|
||||
0x2: CommandInfo(0x2, 'FWRITE', 4, True, self._write, lambda p: (_le_u32(p, 0), p[4:])),
|
||||
0x3: CommandInfo(0x3, 'FREAD', 8, True, self._read, lambda p: (_le_u32(p, 0), _le_u32(p, 4))),
|
||||
0x4: CommandInfo(
|
||||
0x4, 'FSEEK', 12, True, self._seek, lambda p: (_le_u32(p, 0), _le_i32(p, 4), _le_i32(p, 8))
|
||||
),
|
||||
0x5: CommandInfo(0x5, 'FTELL', 4, True, self._tell, lambda p: (_le_u32(p, 0),)),
|
||||
0x6: CommandInfo(0x6, 'FSTOP', 0, False, None, None), # Special handling
|
||||
0x7: CommandInfo(0x7, 'FEOF', 4, True, self._feof, lambda p: (_le_u32(p, 0),)),
|
||||
}
|
||||
|
||||
def _alloc_fd(self) -> int:
|
||||
if self.next_fd > self.max_files:
|
||||
return 0
|
||||
fd = self.next_fd
|
||||
self.next_fd += 1
|
||||
return fd
|
||||
|
||||
def _get_file(self, fd: int) -> BinaryIO | None:
|
||||
return self.files.get(fd) # type: ignore[return-value]
|
||||
|
||||
def _validate_fd(self, fd: int) -> bool:
|
||||
"""Validate that FD is in valid range and exists."""
|
||||
if fd < 1 or fd > self.max_files:
|
||||
return False
|
||||
return fd in self.files
|
||||
|
||||
def _close_all_files(self) -> None:
|
||||
"""Close all open files and reset the files dictionary."""
|
||||
for fd, handle in list(self.files.items()):
|
||||
try:
|
||||
handle.close()
|
||||
except OSError:
|
||||
pass # Ignore errors when closing
|
||||
self.files.clear()
|
||||
self.next_fd = 1 # Reset FD counter
|
||||
|
||||
def _open(self, path: str, mode: str) -> tuple[str, bytes]:
|
||||
if len(self.files) >= self.max_files:
|
||||
return 'FOPEN failed: max files reached', self.RESP_ZERO
|
||||
|
||||
fd = self._alloc_fd()
|
||||
if fd == 0:
|
||||
return 'FOPEN failed: fd alloc error', self.RESP_ZERO
|
||||
|
||||
normalized = _normalize_mode(mode)
|
||||
if not normalized:
|
||||
return f"FOPEN path='{path}' invalid mode='{mode}'", self.RESP_ZERO
|
||||
|
||||
try:
|
||||
# Create parent dir for write/append/create modes
|
||||
if normalized[0] in ('w', 'a', 'x'):
|
||||
dir_path = os.path.dirname(path)
|
||||
if dir_path:
|
||||
try:
|
||||
os.makedirs(dir_path, exist_ok=True)
|
||||
except OSError:
|
||||
pass # let open() raise the real error if it's still a problem
|
||||
|
||||
# Open in binary mode (gcov files are always binary)
|
||||
# Always use binary mode since gcov protocol works with binary data
|
||||
if 'b' in normalized:
|
||||
handle = cast(BinaryIO, open(path, normalized, buffering=0))
|
||||
else:
|
||||
# Force binary mode even if not specified
|
||||
handle = cast(BinaryIO, open(path, normalized[0] + 'b' + normalized[2:], buffering=0))
|
||||
self.files[fd] = handle
|
||||
return f"FOPEN path='{path}' mode='{mode}' normalized='{normalized}' -> fd={fd}", fd.to_bytes(4, 'little')
|
||||
except (OSError, ValueError) as e:
|
||||
return f"FOPEN path='{path}' mode='{mode}' failed: {e}", self.RESP_ZERO
|
||||
|
||||
def _close(self, fd: int) -> tuple[str, bytes]:
|
||||
handle = self._get_file(fd)
|
||||
if not handle:
|
||||
return f'FCLOSE fd={fd} failed: not open', self.RESP_ERROR_NEG1
|
||||
try:
|
||||
handle.close()
|
||||
del self.files[fd]
|
||||
return f'FCLOSE fd={fd} -> 0', self.RESP_ZERO
|
||||
except OSError as e:
|
||||
return f'FCLOSE fd={fd} failed: {e}', self.RESP_ERROR_NEG1
|
||||
|
||||
def _write(self, fd: int, payload: bytes) -> tuple[str, bytes]:
|
||||
handle = self._get_file(fd)
|
||||
if not handle:
|
||||
return f'FWRITE fd={fd} failed: not open', self.RESP_ZERO
|
||||
try:
|
||||
written = handle.write(payload)
|
||||
handle.flush()
|
||||
fret = self.RESP_FWRITE_OK if written else self.RESP_ZERO
|
||||
return (
|
||||
f'FWRITE fd={fd} len={len(payload)} -> {1 if written else 0}',
|
||||
fret,
|
||||
)
|
||||
except OSError as e:
|
||||
return f'FWRITE fd={fd} failed: {e}', self.RESP_ZERO
|
||||
|
||||
def _read(self, fd: int, length: int) -> tuple[str, bytes]:
|
||||
handle = self._get_file(fd)
|
||||
if not handle:
|
||||
return f'FREAD fd={fd} failed: not open', self.RESP_ZERO
|
||||
try:
|
||||
data_bytes = handle.read(length)
|
||||
if not isinstance(data_bytes, bytes):
|
||||
# Should not happen with BinaryIO, but be defensive
|
||||
data_bytes = bytes(data_bytes)
|
||||
fret = len(data_bytes)
|
||||
resp = fret.to_bytes(4, 'little') + data_bytes
|
||||
preview = data_bytes[:16].hex(' ')
|
||||
suffix = '' if len(data_bytes) <= 16 else ' ...'
|
||||
return f'FREAD fd={fd} len={length} -> {fret} bytes "{preview}{suffix}"', resp
|
||||
except OSError as e:
|
||||
return f'FREAD fd={fd} failed: {e}', self.RESP_ZERO
|
||||
|
||||
def _seek(self, fd: int, offset: int, whence: int) -> tuple[str, bytes]:
|
||||
handle = self._get_file(fd)
|
||||
if not handle:
|
||||
return f'FSEEK fd={fd} failed: not open', self.RESP_ERROR_NEG1
|
||||
try:
|
||||
fret = handle.seek(offset, whence)
|
||||
return f'FSEEK fd={fd} off={offset} whence={whence} -> {fret}', fret.to_bytes(4, 'little', signed=True)
|
||||
except OSError as e:
|
||||
return f'FSEEK fd={fd} off={offset} whence={whence} failed: {e}', self.RESP_ERROR_NEG1
|
||||
|
||||
def _tell(self, fd: int) -> tuple[str, bytes]:
|
||||
handle = self._get_file(fd)
|
||||
if not handle:
|
||||
return f'FTELL fd={fd} failed: not open', self.RESP_ERROR_NEG1
|
||||
try:
|
||||
pos = handle.tell()
|
||||
return f'FTELL fd={fd} -> {pos}', pos.to_bytes(4, 'little', signed=True)
|
||||
except OSError as e:
|
||||
return f'FTELL fd={fd} failed: {e}', self.RESP_ERROR_NEG1
|
||||
|
||||
def _feof(self, fd: int) -> tuple[str, bytes]:
|
||||
handle = self._get_file(fd)
|
||||
if not handle:
|
||||
return f'FEOF fd={fd} failed: not open', self.RESP_ERROR_NEG1
|
||||
try:
|
||||
cur = handle.tell()
|
||||
handle.seek(0, 2) # end
|
||||
end = handle.tell()
|
||||
handle.seek(cur, 0)
|
||||
eof = int(cur >= end)
|
||||
return f'FEOF fd={fd} -> {eof}', eof.to_bytes(4, 'little', signed=True)
|
||||
except OSError as e:
|
||||
return f'FEOF fd={fd} failed: {e}', self.RESP_ERROR_NEG1
|
||||
|
||||
def process(self, data: bytes) -> tuple[str, bytes | None]:
|
||||
"""Process a single command buffer. Returns summary and optional response bytes."""
|
||||
if not data:
|
||||
return 'Empty data', None
|
||||
|
||||
cmd = data[0]
|
||||
payload = data[1:]
|
||||
|
||||
# Look up command info
|
||||
cmd_info = self._commands.get(cmd)
|
||||
if cmd_info is None:
|
||||
return f'Unknown command 0x{cmd:02X} (len={len(data)})', None
|
||||
|
||||
# Handle FOPEN specially. Extract path and mode from payload.
|
||||
if cmd_info.name == 'FOPEN':
|
||||
null_pos = payload.find(0)
|
||||
if null_pos <= 0:
|
||||
return 'FOPEN invalid (empty or missing path)', self.RESP_ZERO
|
||||
path = payload[:null_pos].decode('utf-8', errors='replace')
|
||||
mode_start = payload[null_pos + 1 :]
|
||||
null_pos = mode_start.find(0)
|
||||
mode = mode_start[:null_pos].decode('utf-8', errors='replace') if null_pos != -1 else ''
|
||||
return self._open(path, mode)
|
||||
|
||||
# Handle FSTOP specially (close all files, no response)
|
||||
if cmd_info.name == 'FSTOP':
|
||||
self._close_all_files()
|
||||
return 'FSTOP (all files closed, continuing capture)', None
|
||||
|
||||
# Handle other commands via unified command table
|
||||
if len(payload) < cmd_info.min_payload_len:
|
||||
return f'{cmd_info.name} invalid (too short)', None
|
||||
|
||||
# Extract arguments using the extractor function
|
||||
if cmd_info.arg_extractor is None:
|
||||
return f'{cmd_info.name} has no arg extractor', None
|
||||
args = cmd_info.arg_extractor(payload)
|
||||
|
||||
if cmd_info.needs_fd_validation:
|
||||
fd: int = args[0] # Type annotation for mypy
|
||||
if not self._validate_fd(fd):
|
||||
return f'{cmd_info.name} invalid fd={fd} (not open or out of range)', None
|
||||
|
||||
# Call handler function
|
||||
if cmd_info.handler is None:
|
||||
return f'{cmd_info.name} has no handler', None
|
||||
result: tuple[str, bytes | None] = cmd_info.handler(*args)
|
||||
return result
|
||||
|
||||
def _get_cmd_name(self, cmd: int) -> str:
|
||||
"""Get command name for error messages."""
|
||||
cmd_info = self._commands.get(cmd)
|
||||
return cmd_info.name if cmd_info else f'0x{cmd:02X}'
|
||||
|
||||
|
||||
def _parse_command(data: bytes, proc: FileOpProcessor) -> tuple[str, bytes | None]:
|
||||
"""Process a command buffer and return summary plus optional response bytes."""
|
||||
return proc.process(data)
|
||||
|
||||
|
||||
class FrameParser:
|
||||
"""State machine parser for STX|LEN(2)|payload|ETX frames."""
|
||||
|
||||
STX = 0x02
|
||||
ETX = 0x03
|
||||
|
||||
# states
|
||||
WAIT_STX = 0
|
||||
WAIT_LENL = 1
|
||||
WAIT_LENH = 2
|
||||
WAIT_PAYLOAD = 3
|
||||
WAIT_ETX = 4
|
||||
|
||||
# Timeout in seconds for incomplete frames (100ms)
|
||||
FRAME_TIMEOUT = 0.1
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.state = self.WAIT_STX
|
||||
self.length = 0
|
||||
self.payload = bytearray()
|
||||
self.frame = bytearray()
|
||||
self.state_timestamp = time.time() # Track when we entered current state
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset parser to initial state."""
|
||||
self.state = self.WAIT_STX
|
||||
self.length = 0
|
||||
self.payload.clear()
|
||||
self.frame.clear()
|
||||
self.state_timestamp = time.time()
|
||||
|
||||
def check_timeout(self) -> bytes | None:
|
||||
"""Check if parser has been stuck in a non-STX state for too long.
|
||||
|
||||
Returns the incomplete frame bytes if timeout occurred (and resets parser),
|
||||
or None if no timeout.
|
||||
"""
|
||||
# Only check timeout if we're not in WAIT_STX state
|
||||
if self.state != self.WAIT_STX:
|
||||
elapsed = time.time() - self.state_timestamp
|
||||
if elapsed > self.FRAME_TIMEOUT:
|
||||
# Timeout - save incomplete frame for caller to log
|
||||
incomplete = bytes(self.frame) if self.frame else None
|
||||
self.reset()
|
||||
return incomplete
|
||||
return None
|
||||
|
||||
def feed(self, data: bytes) -> list[tuple[bytes, bytes]]:
|
||||
"""Feed bytes to the parser.
|
||||
|
||||
Returns list of (frame, payload) for each complete frame found.
|
||||
"""
|
||||
out: list[tuple[bytes, bytes]] = []
|
||||
for byte in data:
|
||||
result = self._process_byte(byte)
|
||||
if result is not None:
|
||||
out.append(result)
|
||||
return out
|
||||
|
||||
def _process_byte(self, byte: int) -> tuple[bytes, bytes] | None:
|
||||
"""Process a single byte. Returns (frame, payload) when frame is complete."""
|
||||
self.frame.append(byte)
|
||||
|
||||
if self.state == self.WAIT_STX:
|
||||
if byte == self.STX:
|
||||
self.state = self.WAIT_LENL
|
||||
self.payload.clear()
|
||||
self.state_timestamp = time.time() # Update timestamp on state change
|
||||
else:
|
||||
# Not STX, reset and wait for next STX
|
||||
self.reset()
|
||||
|
||||
elif self.state == self.WAIT_LENL:
|
||||
self.length = byte
|
||||
self.state = self.WAIT_LENH
|
||||
self.state_timestamp = time.time()
|
||||
|
||||
elif self.state == self.WAIT_LENH:
|
||||
self.length |= byte << 8
|
||||
self.payload.clear()
|
||||
if self.length == 0:
|
||||
# There is no payload, go directly to ETX
|
||||
self.state = self.WAIT_ETX
|
||||
else:
|
||||
self.state = self.WAIT_PAYLOAD
|
||||
self.state_timestamp = time.time()
|
||||
|
||||
elif self.state == self.WAIT_PAYLOAD:
|
||||
self.payload.append(byte)
|
||||
if len(self.payload) >= self.length:
|
||||
self.state = self.WAIT_ETX
|
||||
self.state_timestamp = time.time()
|
||||
|
||||
elif self.state == self.WAIT_ETX:
|
||||
if byte == self.ETX:
|
||||
# Frame complete! Save the result and reset for next frame
|
||||
result = (bytes(self.frame), bytes(self.payload))
|
||||
self.reset()
|
||||
return result
|
||||
else:
|
||||
# Expected ETX but got something else, reset
|
||||
self.reset()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _log_frame_to_file(file_desc: TextIO | None, log_level: int, frame: bytes, resp: bytes | None, summary: str) -> str:
|
||||
"""Write frame data to log file based on log level and return console response string.
|
||||
|
||||
Args:
|
||||
file_desc: Open file descriptor (or None to skip logging)
|
||||
log_level: Log level (1=REQ+RES+summary, 2=1+CONSOLE)
|
||||
frame: Complete frame bytes including STX/LEN/payload/ETX
|
||||
resp: Response bytes (or None)
|
||||
summary: Parse summary string
|
||||
|
||||
Returns:
|
||||
Console response string to append to print output
|
||||
"""
|
||||
if not file_desc or log_level <= 0:
|
||||
return ''
|
||||
|
||||
hex_chunk = ' '.join(f'{byte:02X}' for byte in frame)
|
||||
resp_hex = resp.hex(' ').upper() if resp else ''
|
||||
console_resp = ''
|
||||
|
||||
if log_level >= 1:
|
||||
file_desc.write(f'REQ: {hex_chunk}\r\n')
|
||||
if resp:
|
||||
file_desc.write(f'RES: {resp_hex}\n')
|
||||
if summary:
|
||||
file_desc.write(f' : {summary}\r\n')
|
||||
|
||||
if log_level >= 2 and resp:
|
||||
console_resp = f' | RES: {resp_hex}'
|
||||
|
||||
file_desc.flush()
|
||||
return console_resp
|
||||
|
||||
|
||||
class UartGcovCapture:
|
||||
"""UART-based GCOV capture handler for both CLI and pytest-embedded-idf integration.
|
||||
|
||||
Can run in either blocking mode (CLI) or background thread mode (pytest).
|
||||
Processes gcov file I/O commands in real-time, sending responses immediately
|
||||
to avoid blocking gcov rtio operations.
|
||||
|
||||
Usage in CLI:
|
||||
capture = UartGcovCapture(port='/dev/ttyUSB0', baudrate=115200, log_file='gcov.log', log_level=2)
|
||||
capture.run(background=False) # Blocks until Ctrl+C
|
||||
|
||||
Usage in pytest:
|
||||
with UartGcovCapture(port='/dev/ttyUSB0', baudrate=115200, log_file='gcov.log', log_level=2) as capture:
|
||||
capture.run(background=True)
|
||||
# ... wait for FSTOP or perform test actions ...
|
||||
if capture.wait_for_fstop(timeout=30):
|
||||
print(f'Dump completed, count: {capture.get_fstop_count()}')
|
||||
"""
|
||||
|
||||
def __init__(self, port: str, baudrate: int = 115200, log_file: str | None = None, log_level: int = 2) -> None:
|
||||
"""Initialize UART GCOV capture.
|
||||
|
||||
Args:
|
||||
port: Serial port path (e.g., '/dev/ttyUSB0')
|
||||
baudrate: Serial baud rate (default: 115200)
|
||||
log_file: Optional path to log file for REQ/RES/SUMMARY output (default: None)
|
||||
log_level: Log level: 0=nothing, 1=REQ+RES+SUMMARY, 2=1+CONSOLE (default: 0)
|
||||
"""
|
||||
self.port = port
|
||||
self.baudrate = baudrate
|
||||
self.log_file = log_file
|
||||
self.log_level = log_level
|
||||
self.ser: serial.Serial | None = None
|
||||
self.capture_thread: threading.Thread | None = None
|
||||
self.stop_event = threading.Event()
|
||||
self.proc = FileOpProcessor()
|
||||
self.parser = FrameParser()
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.file_desc: TextIO | None = None
|
||||
self.fstop_count = 0 # Track number of FSTOP commands received
|
||||
self.fstop_event = threading.Event() # Event to signal FSTOP received
|
||||
self.total_bytes = 0 # Track total bytes captured
|
||||
|
||||
def __enter__(self) -> 'UartGcovCapture':
|
||||
"""Context manager entry."""
|
||||
return self
|
||||
|
||||
def __exit__(self, exception_type: Any, exception_value: Any, exception_traceback: Any) -> None:
|
||||
"""Context manager exit - stop capture and cleanup."""
|
||||
self.stop()
|
||||
|
||||
def run(self, background: bool = True) -> 'UartGcovCapture':
|
||||
"""Start UART capture.
|
||||
|
||||
Args:
|
||||
background: If True, run in background thread (pytest mode).
|
||||
If False, run in foreground blocking mode (CLI mode).
|
||||
|
||||
Returns:
|
||||
self for chaining
|
||||
"""
|
||||
if background:
|
||||
return self._run_background()
|
||||
else:
|
||||
return self._run_foreground()
|
||||
|
||||
def _run_background(self) -> 'UartGcovCapture':
|
||||
if self.capture_thread is not None and self.capture_thread.is_alive():
|
||||
self.logger.warning('Capture thread already running')
|
||||
return self
|
||||
|
||||
self._open_serial_and_log()
|
||||
|
||||
self.stop_event.clear()
|
||||
self.capture_thread = threading.Thread(target=self._capture_loop, daemon=True, name='UartGcovCapture')
|
||||
self.capture_thread.start()
|
||||
self.logger.info('Started UART capture thread')
|
||||
return self
|
||||
|
||||
def _run_foreground(self) -> 'UartGcovCapture':
|
||||
self._open_serial_and_log()
|
||||
|
||||
self.logger.info('Started gcov data capture and processing... Press Ctrl+C to exit')
|
||||
|
||||
try:
|
||||
self._capture_loop()
|
||||
except KeyboardInterrupt:
|
||||
self.logger.info(f'Data capture interrupted. Captured {self.total_bytes} bytes.')
|
||||
if self.log_file:
|
||||
self.logger.info(f'Data saved to: {self.log_file}')
|
||||
finally:
|
||||
self.stop()
|
||||
|
||||
return self
|
||||
|
||||
def _open_serial_and_log(self) -> None:
|
||||
"""Open serial port and log file (common for both modes)."""
|
||||
try:
|
||||
self.ser = serial.Serial(
|
||||
self.port,
|
||||
self.baudrate,
|
||||
timeout=0.1, # Short timeout for non-blocking reads
|
||||
write_timeout=1,
|
||||
inter_byte_timeout=None, # No timeout between bytes
|
||||
)
|
||||
self.logger.info(f'Opened serial port: {self.port} at {self.baudrate} baud')
|
||||
except serial.SerialException as e:
|
||||
self.logger.error(f'Failed to open serial port {self.port}: {e}')
|
||||
raise
|
||||
|
||||
if self.log_file and self.log_level > 0:
|
||||
try:
|
||||
self.file_desc = open(self.log_file, 'w', encoding='ascii', buffering=8192)
|
||||
self.logger.info(f'Opened log file: {self.log_file} (log level: {self.log_level})')
|
||||
except OSError as e:
|
||||
self.logger.warning(f'Failed to open log file {self.log_file}: {e}')
|
||||
|
||||
def wait_for_fstop(self, timeout: float = 10.0) -> bool:
|
||||
"""Wait for an FSTOP command to be received.
|
||||
|
||||
Args:
|
||||
timeout: Maximum time to wait in seconds
|
||||
|
||||
Returns:
|
||||
True if FSTOP was received, False if timeout
|
||||
"""
|
||||
result = self.fstop_event.wait(timeout=timeout)
|
||||
if result:
|
||||
self.fstop_event.clear() # Clear after successful wait
|
||||
return result
|
||||
|
||||
def get_fstop_count(self) -> int:
|
||||
"""Get the number of FSTOP commands received so far."""
|
||||
return self.fstop_count
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop capture thread and close serial port."""
|
||||
if self.stop_event.is_set():
|
||||
return
|
||||
|
||||
self.stop_event.set()
|
||||
|
||||
# Close serial port first to unblock any blocking read in the capture thread
|
||||
if self.ser is not None and self.ser.is_open:
|
||||
try:
|
||||
self.ser.close()
|
||||
self.logger.info('Closed serial port')
|
||||
except OSError:
|
||||
pass
|
||||
self.ser = None
|
||||
|
||||
# Now wait for the thread to finish (should exit quickly after serial close)
|
||||
if self.capture_thread is not None and self.capture_thread.is_alive():
|
||||
self.logger.info('Waiting for capture thread to stop...')
|
||||
self.capture_thread.join(timeout=2.0)
|
||||
if self.capture_thread.is_alive():
|
||||
self.logger.warning('Capture thread did not stop within timeout')
|
||||
|
||||
# Close log file
|
||||
if self.file_desc is not None:
|
||||
try:
|
||||
self.file_desc.flush()
|
||||
self.file_desc.close()
|
||||
self.logger.info('Closed log file')
|
||||
except OSError:
|
||||
pass
|
||||
self.file_desc = None
|
||||
|
||||
def _capture_loop(self) -> None:
|
||||
"""Main capture loop (runs in either foreground or background)."""
|
||||
if self.ser is None:
|
||||
return
|
||||
|
||||
self.logger.info('Capture loop started')
|
||||
try:
|
||||
while not self.stop_event.is_set():
|
||||
# Check if serial port was closed (e.g., by stop())
|
||||
if self.ser is None or not self.ser.is_open:
|
||||
break
|
||||
|
||||
# Check for parser timeout (in case of incomplete frames)
|
||||
incomplete_frame = self.parser.check_timeout()
|
||||
if incomplete_frame is not None:
|
||||
hex_bytes = incomplete_frame.hex(' ').upper() if incomplete_frame else '(empty)'
|
||||
self.logger.warning(f'Parser timeout - incomplete frame discarded: {hex_bytes}')
|
||||
|
||||
if self.ser.in_waiting > 0:
|
||||
chunk = self.ser.read(self.ser.in_waiting)
|
||||
frames = self.parser.feed(chunk) # Returns List[Tuple[bytes, bytes]]
|
||||
|
||||
for frame, payload in frames:
|
||||
summary, resp = _parse_command(payload, self.proc)
|
||||
|
||||
# Check if this is an FSTOP command
|
||||
if payload and payload[0] == 0x6:
|
||||
self.fstop_count += 1
|
||||
self.fstop_event.set() # Signal that FSTOP was received
|
||||
self.fstop_event.clear() # Clear for next FSTOP
|
||||
|
||||
if resp and self.ser is not None and self.ser.is_open:
|
||||
self.ser.write(resp)
|
||||
self.ser.flush()
|
||||
|
||||
self.total_bytes += len(frame)
|
||||
|
||||
console_resp = _log_frame_to_file(self.file_desc, self.log_level, frame, resp, summary)
|
||||
|
||||
if self.log_level > 1:
|
||||
self.logger.info(f'Captured {self.total_bytes} bytes | {summary}{console_resp}')
|
||||
else:
|
||||
time.sleep(0.001)
|
||||
|
||||
except serial.SerialException:
|
||||
# Expected when serial port is closed during stop()
|
||||
pass
|
||||
except Exception as e:
|
||||
self.logger.error(f'Unexpected error in capture loop: {e}')
|
||||
finally:
|
||||
self.logger.info('Capture loop ended')
|
||||
|
||||
|
||||
def main() -> None:
    """CLI entry point: parse arguments and run a blocking UART gcov capture."""
    logging.basicConfig(
        level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S'
    )

    args = parse_args()

    # File logging (log-level >= 1) needs somewhere to write.
    if args.log_level > 0 and not args.output:
        print('Error: -o/--output is required when log-level > 0')
        sys.exit(1)

    print(f'Opening serial port: {args.port} at {args.baudrate} baud')
    if not args.output:
        print('Log level 0: No file logging')
    else:
        print(f'Log file: {args.output} (log level: {args.log_level})')

    try:
        UartGcovCapture(
            port=args.port, baudrate=args.baudrate, log_file=args.output, log_level=args.log_level
        ).run(background=False)
    except Exception as e:
        print(f'Unexpected error: {e}')
        sys.exit(1)
|
||||
|
||||
|
||||
def get_coverage_data(binary_path: str, expected_gcda_paths: list) -> dict:
    """Extract coverage data from .gcda files using gcov tool.

    Returns a dictionary with function execution counts for verification.
    If gcov tool is not available, returns empty dict after verifying files exist.
    """
    # The toolchain prefix for gcov comes from the build's project description.
    with open(os.path.join(binary_path, 'project_description.json'), encoding='utf-8') as f:
        gcov_tool = json.load(f).get('monitor_toolprefix', '') + 'gcov'

    # Probe for the gcov tool; without it we can only confirm files exist.
    try:
        subprocess.run([gcov_tool, '--version'], capture_output=True, check=True, timeout=3)
    except (subprocess.SubprocessError, FileNotFoundError):
        print(f'gcov tool is not available ({gcov_tool}), skipping detailed coverage data processing')
        for gcda_path in expected_gcda_paths:
            if os.path.isfile(gcda_path):
                print(f'✓ .gcda file exists: {os.path.basename(gcda_path)}')
        return {}

    coverage_data: dict = {}

    for gcda_path in (p for p in expected_gcda_paths if os.path.isfile(p)):
        work_dir = os.path.dirname(gcda_path)

        try:
            # Produce a <name>.gcov.json.gz report next to the .gcda file.
            subprocess.run(
                [gcov_tool, '-j', os.path.basename(gcda_path)],
                cwd=work_dir,
                capture_output=True,
                text=True,
                check=True,
                timeout=3,
            )

            stem = os.path.basename(gcda_path).replace('.gcda', '')
            report = os.path.join(work_dir, f'{stem}.gcov.json.gz')

            if os.path.isfile(report):
                with gzip.open(report, 'rt', encoding='utf-8') as fp:
                    gcov_data = json.load(fp)

                # Flatten into {"<source>:<function>": execution_count}.
                for file_data in gcov_data.get('files', []):
                    source_file = os.path.basename(file_data.get('file', ''))
                    for function in file_data.get('functions', []):
                        func_name = function.get('name', '')
                        coverage_data[f'{source_file}:{func_name}'] = function.get('execution_count', 0)

                os.remove(report)

        except (subprocess.SubprocessError, FileNotFoundError, json.JSONDecodeError) as e:
            print(f'Warning: Could not extract coverage from {gcda_path}: {e}')

    return coverage_data
|
||||
|
||||
|
||||
def gcov_capture_from_uart(
    port: str, baudrate: int = 115200, log_file: str | None = None, log_level: int = 1, timeout: float = 10.0
) -> bool:
    """Convenience function to capture a single GCOV dump from UART and wait for FSTOP.

    This is a simple helper for pytest scripts that need to capture GCOV data via UART.
    It starts capture, waits for one FSTOP command, then stops.

    Args:
        port: Serial port path (e.g., '/dev/ttyUSB0')
        baudrate: Serial baud rate (default: 115200)
        log_file: Optional path to log file for REQ/RES/SUMMARY output
        log_level: Log level: 0=nothing, 1=REQ+RES+SUMMARY, 2=1+CONSOLE (default: 1)
        timeout: Maximum time to wait for FSTOP in seconds (default: 10.0)

    Returns:
        True if FSTOP was received, False if timeout

    Example usage in pytest:
        >>> from gcov_capture import gcov_capture_from_uart
        >>> # Close DUT console
        >>> dut.serial.close()
        >>> # Trigger gcov dump on target via esp_gcov_dump()
        >>> # Wait for dump to complete
        >>> assert gcov_capture_from_uart(
        ...     port=dut.serial.port, baudrate=115200, log_file='gcov_dump.log', timeout=30.0
        ... ), 'GCOV dump timeout'
        >>> # Check .gcda files were created
    """
    capture = UartGcovCapture(port=port, baudrate=baudrate, log_file=log_file, log_level=log_level)
    with capture:
        capture.run(background=True)
        time.sleep(0.1)  # Give capture a moment to start
        return capture.wait_for_fstop(timeout=timeout)
|
||||
|
||||
|
||||
# Allow this module to be executed directly as a CLI tool.
if __name__ == '__main__':
    main()
|
||||
Reference in New Issue
Block a user