From a485d7f8a6ae9cf56af639050dd35783a395e93e Mon Sep 17 00:00:00 2001 From: wanckl Date: Fri, 14 Nov 2025 15:25:41 +0800 Subject: [PATCH] fix(driver_twai): enhance ci test and fix example --- .../test_apps/legacy_twai/pytest_twai.py | 73 +- .../test_apps/test_twai/pytest_driver_twai.py | 70 +- .../esp_hal_gpio/include/hal/gpio_types.h | 4 +- .../twai/twai_utils/main/cmd_twai_core.c | 16 +- .../twai/twai_utils/main/cmd_twai_dump.c | 1 + .../twai/twai_utils/main/twai_utils_parser.c | 20 +- .../twai/twai_utils/pytest_twai_utils.py | 1096 ++++++++--------- .../twai/twai_utils/sdkconfig.defaults | 1 + 8 files changed, 631 insertions(+), 650 deletions(-) create mode 100644 examples/peripherals/twai/twai_utils/sdkconfig.defaults diff --git a/components/driver/test_apps/legacy_twai/pytest_twai.py b/components/driver/test_apps/legacy_twai/pytest_twai.py index ae59da9786..f8d4b70a7a 100644 --- a/components/driver/test_apps/legacy_twai/pytest_twai.py +++ b/components/driver/test_apps/legacy_twai/pytest_twai.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: CC0-1.0 import logging import subprocess -from time import sleep +import time import pytest from can import Bus @@ -10,6 +10,10 @@ from can import Message from pytest_embedded import Dut from pytest_embedded_idf.utils import idf_parametrize +# --------------------------------------------------------------------------- +# Loop Back Tests +# --------------------------------------------------------------------------- + @pytest.mark.generic @pytest.mark.parametrize( @@ -26,16 +30,57 @@ def test_legacy_twai_self(dut: Dut) -> None: dut.run_all_single_board_cases(group='twai-loop-back') +# --------------------------------------------------------------------------- +# Helper Functions +# --------------------------------------------------------------------------- + + +def esp_enter_flash_mode(dut: Dut) -> None: + ser = dut.serial.proc + ser.setRTS(True) # EN Low + time.sleep(0.5) + ser.setDTR(True) # GPIO0 Low + ser.setRTS(False) # EN High + dut.expect('waiting for download', timeout=2) + ser.setDTR(False) # Back RTS/DTR to 1/1 to avoid affect to esptool + + +def esp_reset_and_wait_ready(dut: Dut) -> None: + dut.serial.hard_reset() + time.sleep(0.5) + dut.expect_exact('Press ENTER to see the list of tests') + + @pytest.fixture(name='socket_can') def fixture_create_socket_can() -> Bus: # Set up the socket CAN with the bitrate - start_command = 'sudo ip link set can0 up type can bitrate 250000 restart-ms 100' - stop_command = 'sudo ip link set can0 down' - subprocess.run(start_command, shell=True, capture_output=True, text=True) - bus = Bus(interface='socketcan', channel='can0', bitrate=250000) - yield bus # test invoked here - bus.shutdown() - subprocess.run(stop_command, shell=True, capture_output=True, text=True) + start_command = 'sudo -n ip link set can0 up type can bitrate 250000 restart-ms 100' + stop_command = 'sudo -n ip link set can0 down' + status_command = 'sudo -n ip -details link show can0' + + try: + result = subprocess.run(status_command, shell=True, capture_output=True, text=True) + if result.returncode != 0: + raise Exception('CAN interface "can0" not found') + + if 'UP' in result.stdout: # Close the bus anyway if it is already up + subprocess.run(stop_command, shell=True, capture_output=True, text=True) + subprocess.run(start_command, shell=True, capture_output=True, text=True) + + time.sleep(0.5) + bus = Bus(interface='socketcan', channel='can0', bitrate=250000) + yield bus # test invoked here + + bus.shutdown() + except Exception as e: + pytest.skip(f'Open usb-can bus Error: {str(e)}') + finally: + subprocess.run(stop_command, shell=True, capture_output=True, text=True) + + +# --------------------------------------------------------------------------- +# Interactive Tests +# --------------------------------------------------------------------------- @pytest.mark.twai_std @@ -50,14 +95,13 @@ def fixture_create_socket_can() -> Bus: 'target', ['esp32', 'esp32c3', 'esp32c6', 'esp32h2', 'esp32s2', 'esp32s3', 'esp32p4'], indirect=['target'] ) def test_legacy_twai_listen_only(dut: Dut, socket_can: Bus) -> None: - dut.serial.hard_reset() - dut.expect_exact('Press ENTER to see the list of tests') + esp_reset_and_wait_ready(dut) # TEST_CASE("twai_listen_only", "[twai]") dut.write('"twai_listen_only"') - # wait the DUT to block at the receive API - sleep(0.03) + # wait the DUT to start listening + time.sleep(0.1) message = Message( arbitration_id=0x123, @@ -66,6 +110,7 @@ def test_legacy_twai_listen_only(dut: Dut, socket_can: Bus) -> None: ) socket_can.send(message, timeout=0.2) dut.expect_unity_test_output() + esp_enter_flash_mode(dut) @pytest.mark.twai_std @@ -80,8 +125,7 @@ def test_legacy_twai_listen_only(dut: Dut, socket_can: Bus) -> None: 'target', ['esp32', 'esp32c3', 'esp32c6', 'esp32h2', 'esp32s2', 'esp32s3', 'esp32p4'], indirect=['target'] ) def test_legacy_twai_remote_request(dut: Dut, socket_can: Bus) -> None: - dut.serial.hard_reset() - dut.expect_exact('Press ENTER to see the list of tests') + esp_reset_and_wait_ready(dut) # TEST_CASE("twai_remote_request", "[twai]") dut.write('"twai_remote_request"') @@ -103,3 +147,4 @@ def test_legacy_twai_remote_request(dut: Dut, socket_can: Bus) -> None: print('send', reply) dut.expect_unity_test_output() + esp_enter_flash_mode(dut) diff --git a/components/esp_driver_twai/test_apps/test_twai/pytest_driver_twai.py b/components/esp_driver_twai/test_apps/test_twai/pytest_driver_twai.py index 81dc7af80a..da8f6a814e 100644 --- a/components/esp_driver_twai/test_apps/test_twai/pytest_driver_twai.py +++ b/components/esp_driver_twai/test_apps/test_twai/pytest_driver_twai.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 import subprocess -from time import sleep +import time import pytest from can import Bus @@ -12,6 +12,9 @@ from pytest_embedded_idf.utils import idf_parametrize from pytest_embedded_idf.utils import soc_filtered_targets +# --------------------------------------------------------------------------- +# Loop Back Tests +# --------------------------------------------------------------------------- @pytest.mark.generic @pytest.mark.parametrize('config', ['release', 'cache_safe'], indirect=True) @idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) @@ -19,43 +22,77 @@ def test_driver_twai_loopbk(dut: Dut) -> None: dut.run_all_single_board_cases(group='twai', reset=True) -# -------------------------------- test twai interactive ------------------------------ +# --------------------------------------------------------------------------- +# Helper Functions +# --------------------------------------------------------------------------- + + +def esp_enter_flash_mode(dut: Dut) -> None: + ser = dut.serial.proc + ser.setRTS(True) # EN Low + time.sleep(0.5) + ser.setDTR(True) # GPIO0 Low + ser.setRTS(False) # EN High + dut.expect('waiting for download', timeout=2) + ser.setDTR(False) # Back RTS/DTR to 1/1 to avoid affect to esptool + + +def esp_reset_and_wait_ready(dut: Dut) -> None: + dut.serial.hard_reset() + time.sleep(0.5) + dut.expect_exact('Press ENTER to see the list of tests') + + @pytest.fixture(name='socket_can') def fixture_create_socket_can() -> Bus: # Set up the socket CAN with the bitrate - start_command = 'sudo ip link set can0 up type can bitrate 250000' - stop_command = 'sudo ip link set can0 down' + start_command = 'sudo -n ip link set can0 up type can bitrate 250000 restart-ms 100' + stop_command = 'sudo -n ip link set can0 down' + status_command = 'sudo -n ip -details link show can0' + try: + result = subprocess.run(status_command, shell=True, capture_output=True, text=True) + if result.returncode != 0: + raise Exception('CAN interface "can0" not found') + + if 'UP' in result.stdout: # Close the bus anyway if it is already up + subprocess.run(stop_command, shell=True, capture_output=True, text=True) subprocess.run(start_command, shell=True, capture_output=True, text=True) + + time.sleep(0.5) + bus = Bus(interface='socketcan', channel='can0', bitrate=250000) + yield bus # test invoked here + + bus.shutdown() except Exception as e: - print(f'Open bus Error: {e}') - bus = Bus(interface='socketcan', channel='can0', bitrate=250000) - yield bus # test invoked here - bus.shutdown() - subprocess.run(stop_command, shell=True, capture_output=True, text=True) + pytest.skip(f'Open usb-can bus Error: {str(e)}') + finally: + subprocess.run(stop_command, shell=True, capture_output=True, text=True) +# --------------------------------------------------------------------------- +# Interactive Tests +# --------------------------------------------------------------------------- @pytest.mark.twai_std @pytest.mark.temp_skip_ci(targets=['esp32h4'], reason='no runner') @pytest.mark.parametrize('config', ['release'], indirect=True) @idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) def test_driver_twai_listen_only(dut: Dut, socket_can: Bus) -> None: - dut.serial.hard_reset() - dut.expect_exact('Press ENTER to see the list of tests') + esp_reset_and_wait_ready(dut) dut.write('"twai_listen_only"') # wait the DUT to finish initialize - sleep(0.1) + time.sleep(0.1) message = Message( arbitration_id=0x6688, is_extended_id=True, data=[0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88], ) - print('USB Socket CAN Send:', message) - socket_can.send(message, timeout=0.2) + print('USB Socket CAN Send:', message, 'Return:', socket_can.send(message)) dut.expect_unity_test_output(timeout=10) + esp_enter_flash_mode(dut) @pytest.mark.twai_std @@ -63,8 +100,7 @@ def test_driver_twai_listen_only(dut: Dut, socket_can: Bus) -> None: @pytest.mark.parametrize('config', ['release'], indirect=True) @idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) def test_driver_twai_remote_request(dut: Dut, socket_can: Bus) -> None: - dut.serial.hard_reset() - dut.expect_exact('Press ENTER to see the list of tests') + esp_reset_and_wait_ready(dut) dut.write('"twai_remote_request"') @@ -82,4 +118,6 @@ def test_driver_twai_remote_request(dut: Dut, socket_can: Bus) -> None: ) socket_can.send(reply, timeout=0.2) print('USB Socket CAN Replied:', reply) + dut.expect_unity_test_output(timeout=10) + esp_enter_flash_mode(dut) diff --git a/components/esp_hal_gpio/include/hal/gpio_types.h b/components/esp_hal_gpio/include/hal/gpio_types.h index d60bb6e4e3..164904b1cc 100644 --- a/components/esp_hal_gpio/include/hal/gpio_types.h +++ b/components/esp_hal_gpio/include/hal/gpio_types.h @@ -18,10 +18,10 @@ extern "C" { #define GPIO_PIN_COUNT (SOC_GPIO_PIN_COUNT) /// Check whether it is a valid GPIO number -#define GPIO_IS_VALID_GPIO(gpio_num) ((gpio_num >= 0) && \ +#define GPIO_IS_VALID_GPIO(gpio_num) ((gpio_num >= 0) && (gpio_num <= (SOC_GPIO_PIN_COUNT - 1)) && \ (((1ULL << (gpio_num)) & SOC_GPIO_VALID_GPIO_MASK) != 0)) /// Check whether it can be a valid GPIO number of output mode -#define GPIO_IS_VALID_OUTPUT_GPIO(gpio_num) ((gpio_num >= 0) && \ +#define GPIO_IS_VALID_OUTPUT_GPIO(gpio_num) ((gpio_num >= 0) && (gpio_num <= SOC_GPIO_OUT_RANGE_MAX) && \ (((1ULL << (gpio_num)) & SOC_GPIO_VALID_OUTPUT_GPIO_MASK) != 0)) /// Check whether it can be a valid digital I/O pad #define GPIO_IS_VALID_DIGITAL_IO_PAD(gpio_num) ((gpio_num >= 0) && \ diff --git a/examples/peripherals/twai/twai_utils/main/cmd_twai_core.c b/examples/peripherals/twai/twai_utils/main/cmd_twai_core.c index dbfdddfea2..dcd1c9f309 100644 --- a/examples/peripherals/twai/twai_utils/main/cmd_twai_core.c +++ b/examples/peripherals/twai/twai_utils/main/cmd_twai_core.c @@ -246,12 +246,10 @@ static int twai_init_handler(int argc, char **argv) /* Configure optional clock output GPIO */ if (twai_init_args.clk_out_gpio->count > 0) { clk_gpio = twai_init_args.clk_out_gpio->ival[0]; - if (clk_gpio >= 0) { - ret = GPIO_IS_VALID_OUTPUT_GPIO(clk_gpio) ? ESP_OK : ESP_ERR_INVALID_ARG; - ESP_GOTO_ON_ERROR(ret, err, TAG, "Invalid CLK out GPIO: %d", clk_gpio); - ctx->driver_config.io_cfg.quanta_clk_out = clk_gpio; - ESP_LOGI(TAG, "Clock output GPIO set to %d", clk_gpio); - } + ret = GPIO_IS_VALID_OUTPUT_GPIO(clk_gpio) ? ESP_OK : ESP_ERR_INVALID_ARG; + ESP_GOTO_ON_ERROR(ret, err, TAG, "Invalid CLK out GPIO: %d", clk_gpio); + ctx->driver_config.io_cfg.quanta_clk_out = clk_gpio; + ESP_LOGI(TAG, "Clock output GPIO set to %d", clk_gpio); } else { ctx->driver_config.io_cfg.quanta_clk_out = -1; ESP_LOGI(TAG, "Clock output disabled"); @@ -605,12 +603,6 @@ void register_twai_core_commands(void) .data_timing = { #if CONFIG_EXAMPLE_ENABLE_TWAI_FD .bitrate = CONFIG_EXAMPLE_DEFAULT_FD_BITRATE, - .sp_permill = 0, - .ssp_permill = 700, -#else - .bitrate = 0, - .sp_permill = 0, - .ssp_permill = 0, #endif }, .fail_retry_cnt = -1, diff --git a/examples/peripherals/twai/twai_utils/main/cmd_twai_dump.c b/examples/peripherals/twai/twai_utils/main/cmd_twai_dump.c index 0b14f883cd..ea7dc915fd 100644 --- a/examples/peripherals/twai/twai_utils/main/cmd_twai_dump.c +++ b/examples/peripherals/twai/twai_utils/main/cmd_twai_dump.c @@ -206,6 +206,7 @@ static void dump_task(void *parameter) while (atomic_load(&dump_ctx->is_running)) { rx_queue_item_t item; if (xQueueReceive(dump_ctx->rx_queue, &item, pdMS_TO_TICKS(CONFIG_EXAMPLE_DUMP_TASK_TIMEOUT_MS)) == pdPASS) { + item.frame.buffer = item.buffer; // point to the new buffer format_twaidump_frame(dump_ctx->timestamp_mode, &item.frame, item.timestamp_us, dump_ctx->start_time_us, &dump_ctx->last_frame_time_us, diff --git a/examples/peripherals/twai/twai_utils/main/twai_utils_parser.c b/examples/peripherals/twai/twai_utils/main/twai_utils_parser.c index 6747255de6..139b53d522 100644 --- a/examples/peripherals/twai/twai_utils/main/twai_utils_parser.c +++ b/examples/peripherals/twai/twai_utils/main/twai_utils_parser.c @@ -188,24 +188,23 @@ int parse_classic_frame(const char *body, twai_frame_t *f) /* Handle data frame */ f->header.rtr = false; // Ensure RTR flag is cleared. + if (((strlen(body) + 1) / 2) > TWAI_FRAME_MAX_LEN) { + return PARSE_OUT_OF_RANGE; + } + int dl = parse_payload(body, f->buffer, TWAI_FRAME_MAX_LEN); if (dl < 0) { return dl; } + f->header.dlc = (uint8_t)dl; + f->buffer_len = dl; /* Check for optional _dlc suffix */ const char *underscore = strchr(body, '_'); if (underscore && underscore[1] != '\0') { - uint8_t dlc = (uint8_t)strtoul(underscore + 1, NULL, 16); - if (dlc <= TWAI_FRAME_MAX_LEN) { - f->header.dlc = dlc; - } else { - f->header.dlc = TWAI_FRAME_MAX_LEN; - } - } else { - f->header.dlc = (uint8_t)dl; + uint8_t arg_dlc = (uint8_t)strtoul(underscore + 1, NULL, 16); + f->header.dlc = (TWAI_FRAME_MAX_LEN < arg_dlc) ? TWAI_FRAME_MAX_LEN : arg_dlc; } - f->buffer_len = dl; return PARSE_OK; } @@ -320,11 +319,10 @@ void format_twaidump_frame(timestamp_mode_t timestamp_mode, const twai_frame_t * pos += snprintf(output_line + pos, max_len - pos, "[R%d]", frame->header.dlc); } else { /* Data frame: add DLC and data bytes with spaces */ - printf("frame->header.dlc: %d\n", frame->header.dlc); int actual_len = twaifd_dlc2len(frame->header.dlc); pos += snprintf(output_line + pos, max_len - pos, "[%d]", actual_len); for (int i = 0; i < actual_len && i < frame->buffer_len && pos < max_len - 4; i++) { - pos += snprintf(output_line + pos, max_len - pos, " %02X", frame->buffer[i]); + pos += snprintf(output_line + pos, max_len - pos, " %02X", frame->buffer[i]); } } diff --git a/examples/peripherals/twai/twai_utils/pytest_twai_utils.py b/examples/peripherals/twai/twai_utils/pytest_twai_utils.py index 8168a55cfa..e4b9dd48f6 100644 --- a/examples/peripherals/twai/twai_utils/pytest_twai_utils.py +++ b/examples/peripherals/twai/twai_utils/pytest_twai_utils.py @@ -1,8 +1,6 @@ # SPDX-FileCopyrightText: 2025 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 -import logging -import re import subprocess import time from collections.abc import Generator @@ -20,10 +18,16 @@ from pytest_embedded_idf.utils import soc_filtered_targets # Constants / Helpers # --------------------------------------------------------------------------- -PROMPTS = ['esp>', 'twai>', '>'] +PROMPTS = ['twai>'] + +# Hardware configuration +DEFAULT_BITRATE = 500000 +DEFAULT_TX_GPIO = 4 +DEFAULT_RX_GPIO = 5 +NO_TRANSCEIVER_GPIO = 4 -def _ctrl(controller_id: int) -> str: +def _ctrler_name(controller_id: int) -> str: return f'twai{controller_id}' @@ -33,60 +37,464 @@ def _id_pattern(controller_str: str, can_id: int) -> str: return rf'{controller_str}\s+{hex_part}\s+\[' -class TestConfig: - """Test configuration""" +def esp_enter_flash_mode(dut: Dut) -> None: + ser = dut.serial.proc + ser.setRTS(True) # EN Low + time.sleep(0.5) + ser.setDTR(True) # GPIO0 Low + ser.setRTS(False) # EN High + dut.expect('waiting for download', timeout=2) + ser.setDTR(False) # Back RTS/DTR to 1/1 to avoid affect to esptool - # Hardware configuration - DEFAULT_BITRATE = 500000 - BITRATES = [125000, 250000, 500000, 1000000] - DEFAULT_TX_GPIO = 4 - DEFAULT_RX_GPIO = 5 - NO_TRANSCEIVER_GPIO = 4 - # Test frame data - BASIC_FRAMES = [ - ('123#', 'Empty data'), - ('124#AA', '1 byte'), - ('125#DEADBEEF', '4 bytes'), - ('126#DEADBEEFCAFEBABE', '8 bytes'), - ] +# --------------------------------------------------------------------------- +# TWAI helper +# --------------------------------------------------------------------------- - EXTENDED_FRAMES = [ - ('12345678#ABCD', 'Extended frame'), - ('1FFFFFFF#AA55BB66', 'Max extended ID'), - ] - RTR_FRAMES = [ - ('123#R', 'RTR default'), - ('124#R8', 'RTR 8 bytes'), - ] +class TwaiTestHelper: + """TWAI test helper built on small, reusable atomic operations.""" - # FD frames (if FD is supported) - FD_FRAMES = [ - ('123##0AABBCC', 'FD frame without BRS'), - ('456##1DEADBEEF', 'FD frame with BRS'), - ('789##2CAFEBABE', 'FD frame with ESI'), - ('ABC##3112233', 'FD frame with BRS+ESI'), - ] + def __init__(self, dut: Dut) -> None: + self.dut = dut + self.timeout = 5 + self._wait_ready() - # Boundary ID tests - BOUNDARY_ID_FRAMES = [ - ('7FF#AA', 'Max standard ID'), - ('800#BB', 'Min extended ID (in extended format: 00000800)'), - ('000#CC', 'Min ID'), - ] + # ------------------------- atomic I/O ops ------------------------- + def _wait_ready(self) -> None: + try: + self.dut.expect_exact(PROMPTS, timeout=10) + except (pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF): + self.sendline('help') + self.expect(['Commands:'], timeout=5) - INVALID_FRAMES = [ - ('G123#DEAD', 'Invalid ID character'), - ('123#GG', 'Invalid data character'), - ('123', 'Missing separator'), - ('123#DEADBEEFCAFEBABEAA', 'Too much data'), - ('123###DEAD', 'Too many separators'), - ('123##', 'FD frame without data or flags'), - ] + def sendline(self, cmd: str) -> None: + self.dut.write(f'\n{cmd}\n') - # Filter tests (includes both basic and extended frame filtering) - FILTER_TESTS = [ + def expect(self, patterns: list[str] | str, timeout: float | None = None) -> bool: + timeout = timeout or self.timeout + try: + self.dut.expect(patterns, timeout=timeout) + return True + except (pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF): + return False + + def run(self, cmd: str, expect: list[str] | str | None = None, timeout: float | None = None) -> bool: + self.sendline(cmd) + if self.expect([r'Command returned non-zero error code:', r'ERROR', r'Failed', r'Invalid'], timeout=0.5): + return False + return self.expect(expect or PROMPTS, timeout) + + # ------------------------- command builders ------------------------- + def build_init_cmd_str( + self, + *, + controller_id: int = 0, + tx_gpio: int | None = None, + rx_gpio: int | None = None, + bitrate: int | None = None, + clk_out_gpio: int | None = None, + bus_off_gpio: int | None = None, + fd_bitrate: int | None = None, + loopback: bool = False, + self_test: bool = False, + listen: bool = False, + ) -> str: + parts = [f'twai_init {_ctrler_name(controller_id)}'] + # GPIO and bitrate options (order preserved: tx, rx, bitrate, fd_bitrate, clk_out, bus_off) + for value, flag in [ + (tx_gpio, '-t'), + (rx_gpio, '-r'), + (bitrate, '-b'), + (fd_bitrate, '-B'), + (clk_out_gpio, '-c'), + (bus_off_gpio, '-o'), + ]: + if value is not None: + parts.append(f'{flag} {value}') + # Boolean flags (order preserved: loopback, self_test, listen) + for value, flag in [(loopback, '--loopback'), (self_test, '--self-test'), (listen, '--listen')]: + if value: + parts.append(flag) + return ' '.join(parts) + + # ------------------------- high-level ops ------------------------- + def init(self, controller_id: int = 0, **kwargs: Any) -> bool: + return self.run(self.build_init_cmd_str(controller_id=controller_id, **kwargs)) + + def deinit(self, controller_id: int = 0) -> bool: + return self.run(f'twai_deinit {_ctrler_name(controller_id)}') + + def dump_start(self, controller_id: int = 0, dump_filter: str | None = None) -> bool: + cmd = f'twai_dump {_ctrler_name(controller_id)}' + if dump_filter: + cmd += f',{dump_filter}' + return self.run(cmd) + + def dump_stop(self, controller_id: int = 0) -> tuple[bool, bool]: + """Stop dump and return (stopped_ok, timeout_warning_seen).""" + self.sendline(f'twai_dump {_ctrler_name(controller_id)} --stop') + # If the dump task does not exit naturally, the implementation prints this warning. + warning_seen = self.expect(r'Dump task did not exit naturally, timeout', timeout=2) + # Whether or not warning appears, we should be back to a prompt. + prompt_ok = self.expect(PROMPTS, timeout=2) or True # relax + return prompt_ok, warning_seen + + def send(self, frame_str: str, controller_id: int = 0) -> bool: + return self.run(f'twai_send {_ctrler_name(controller_id)} {frame_str}') + + def info(self, controller_id: int = 0) -> bool: + return self.run( + f'twai_info {_ctrler_name(controller_id)}', + [rf'TWAI{controller_id} Status:', r'Node State:', r'Bitrate:'], + ) + + def recover(self, controller_id: int = 0, timeout_ms: int | None = None) -> bool: + cmd = f'twai_recover {_ctrler_name(controller_id)}' + if timeout_ms is not None: + cmd += f' -t {timeout_ms}' + self.sendline(cmd) + return self.expect(['Recovery not needed', 'node is Error Active', 'ESP_ERR_INVALID_STATE']) # any + + def expect_info_format(self, controller_id: int = 0) -> bool: + self.sendline(f'twai_info {_ctrler_name(controller_id)}') + checks = [ + rf'TWAI{controller_id} Status: \w+', + r'Node State: \w+', + r'Error Counters: TX=\d+, RX=\d+', + r'Bitrate: \d+ bps', + ] + return all(self.expect(p, timeout=2) for p in checks) + + def invalid_should_fail(self, cmd: str, timeout: float = 2.0) -> bool: + # Wait for prompt to clear any previous error logs from buffer + self.expect(PROMPTS, timeout=1.0) + self.sendline(cmd) + return self.expect([r'Command returned non-zero error code:', r'ERROR', r'Failed', r'Invalid'], timeout=timeout) + + def test_with_patterns(self, cmd: str, patterns: list[str], timeout: float = 3.0) -> bool: + self.sendline(cmd) + return all(self.expect(p, timeout=timeout) for p in patterns) + + def send_and_expect_in_dump( + self, + frame_str: str, + frame_id: int, + controller_id: int = 0, + timeout: float = 3.0, + ) -> bool: + ctrl = _ctrler_name(controller_id) + self.sendline(f'twai_send {ctrl} {frame_str}') + return self.expect(_id_pattern(ctrl, frame_id), timeout=timeout) + + # ------------------------- context manager ------------------------- + @contextmanager + def session( + self, + *, + controller_id: int = 0, + mode: str = 'no_transceiver', + start_dump: bool = True, + dump_filter: str | None = None, + **kwargs: Any, + ) -> Generator['TwaiTestHelper', None, None]: + """Manage init/dump lifecycle consistently. + + - mode="no_transceiver": loopback + self_test on a single GPIO. + - mode="standard": caller must provide tx_gpio/rx_gpio (or we use defaults). + """ + # Build effective init args + init_args = dict(kwargs) + init_args['controller_id'] = controller_id + + if mode == 'no_transceiver': + init_args |= dict( + tx_gpio=NO_TRANSCEIVER_GPIO, + rx_gpio=NO_TRANSCEIVER_GPIO, + bitrate=kwargs.get('bitrate', DEFAULT_BITRATE), + loopback=True, + self_test=True, + ) + elif mode == 'standard': + init_args.setdefault('tx_gpio', DEFAULT_TX_GPIO) + init_args.setdefault('rx_gpio', DEFAULT_RX_GPIO) + init_args.setdefault('bitrate', kwargs.get('bitrate', DEFAULT_BITRATE)) + else: + raise ValueError(f'Unknown mode: {mode}') + + self.dut._hard_reset() # Reset the chip to start running the test + self._wait_ready() + if not self.init(**init_args): + raise RuntimeError(f'Failed to initialize TWAI in {mode} mode') + + dump_started = False + dump_timeout_flag = False + try: + if start_dump: + dump_started = self.dump_start(controller_id=controller_id, dump_filter=dump_filter) + yield self + finally: + if dump_started: + _, warning = self.dump_stop(controller_id=controller_id) + dump_timeout_flag = warning + + self.deinit(controller_id=controller_id) + esp_enter_flash_mode(self.dut) # Stop the esp chip to avoid affect test bus + + if dump_timeout_flag: + pytest.fail(f'Dump stop timed out for {_ctrler_name(controller_id)}') + + +# --------------------------------------------------------------------------- +# USB-CAN bus manager (external hardware) +# --------------------------------------------------------------------------- + + +class CanBusManager: + """CAN bus manager for external hardware tests""" + + def __init__(self, interface: str = 'can0'): + self.interface = interface + self.bus: can.Bus | None = None + + @contextmanager + def managed_bus(self, bitrate: int = 500000) -> Generator[can.Bus, None, None]: + try: + result = subprocess.run(['ip', '-details', 'link', 'show', self.interface], capture_output=True, text=True) + if result.returncode != 0: + raise Exception(f'CAN interface {self.interface} not found') + + try: + if 'UP' in result.stdout: + subprocess.run( # Close the bus anyway if it is already up + ['sudo', '-n', 'ip', 'link', 'set', self.interface, 'down'], check=True, capture_output=True + ) + subprocess.run( + ['sudo', '-n', 'ip', 'link', 'set', self.interface, 'up', 'type', 'can', 'bitrate', f'{bitrate}'], + check=True, + capture_output=True, + ) + time.sleep(0.5) + except subprocess.CalledProcessError: + raise Exception(f'Failed to configure CAN interface {self.interface} on bitrate {bitrate}.') + + self.bus = can.Bus(interface='socketcan', channel=self.interface) + yield self.bus + except Exception as e: + pytest.skip(f'CAN interface not available: {str(e)}') + finally: + if self.bus: + self.bus.shutdown() + subprocess.run(['sudo', '-n', 'ip', 'link', 'set', self.interface, 'down'], check=True) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def twai(dut: Dut) -> TwaiTestHelper: + return TwaiTestHelper(dut) + + +@pytest.fixture +def usb_can() -> CanBusManager: + return CanBusManager() + + +# --------------------------------------------------------------------------- +# CORE TESTS +# --------------------------------------------------------------------------- + + +@pytest.mark.generic +@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) +def test_twai_utils_basic_operations(twai: TwaiTestHelper) -> None: + with twai.session(start_dump=False): + # Test basic send operation + assert twai.send('123#DEADBEEF'), 'Basic send operation failed' + + # Test dump filter operations - first start should succeed + assert twai.dump_start(dump_filter='123:7FF'), 'First dump start failed' + + # Second start should be handled gracefully (already running) + twai.dump_start(dump_filter='456:7FF') # Should handle "already running" case + + # Stop should work normally + stopped_ok, warning = twai.dump_stop() + assert stopped_ok, 'Dump stop failed' + + +@pytest.mark.generic +@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) +def test_twai_utils_bitrate_configuration(twai: TwaiTestHelper) -> None: + for bitrate in [125000, 250000, 500000, 1000000]: + with twai.session(mode='standard', bitrate=bitrate, tx_gpio=DEFAULT_TX_GPIO, rx_gpio=DEFAULT_RX_GPIO): + assert twai.info(), f'Info failed for bitrate {bitrate}' + + +@pytest.mark.generic +@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) +def test_twai_utils_frame_formats(twai: TwaiTestHelper) -> None: + with twai.session(): + BASIC_FRAMES = [ + ('123#', 'Empty data'), + ('124#AA', '1 byte'), + ('125#DEADBEEF', '4 bytes'), + ('126#DEADBEEFCAFEBABE', '8 bytes'), + ] + for frame_str, desc in BASIC_FRAMES: + can_id = int(frame_str.split('#')[0], 16) + assert twai.send_and_expect_in_dump(frame_str, can_id), f'Basic frame failed: {frame_str} ({desc})' + + EXTENDED_FRAMES = [ + ('12345678#ABCD', 'Extended frame'), + ('1FFFFFFF#AA55BB66', 'Max extended ID'), + ] + for frame_str, desc in EXTENDED_FRAMES: + can_id = int(frame_str.split('#')[0], 16) + assert twai.send_and_expect_in_dump(frame_str, can_id), f'Extended frame failed: {frame_str} ({desc})' + + RTR_FRAMES = [ + ('123#R', 'RTR default'), + ('124#R8', 'RTR 8 bytes'), + ] + for frame_str, desc in RTR_FRAMES: + assert twai.send(frame_str), f'RTR frame failed: {frame_str} ({desc})' + + +@pytest.mark.generic +@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) +def test_twai_utils_info_and_recovery(twai: TwaiTestHelper) -> None: + with twai.session(): + assert twai.info(), 'Info command failed' + assert twai.expect_info_format(), 'Info format check failed' + + assert twai.test_with_patterns( + f'twai_info {_ctrler_name(0)}', + [ + r'TWAI0 Status: Running', + r'Node State: Error Active', + r'Error Counters: TX=0, RX=0', + ], + ), 'Expected status patterns not found' + + assert twai.recover(), 'Recover status check failed' + assert twai.recover(timeout_ms=1000), 'Recover command with timeout failed' + + +@pytest.mark.generic +@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) +def test_twai_utils_input_validation(twai: TwaiTestHelper) -> None: + with twai.session(start_dump=False): + INVALID_FRAMES = [ + ('G123#DEAD', 'Invalid ID character'), + ('123#GG', 'Invalid data character'), + ('123', 'Missing separator'), + ('123#DEADBEEFCAFEBABEAA', 'Too much data'), + ('123###DEAD', 'Too many separators'), + ('123##', 'FD frame without data or flags'), + ] + for frame_str, desc in INVALID_FRAMES: + assert twai.invalid_should_fail(f'twai_send {_ctrler_name(0)} {frame_str}'), ( + f'Invalid frame should be rejected: {frame_str} ({desc})' + ) + + assert twai.invalid_should_fail(f'twai_init {_ctrler_name(0)} -t 4 -r 5'), ( + 'Duplicate initialization should be prevented' + ) + + # deinit esp_twai for following tests + twai.deinit() + + invalid_commands = [ + 'twai_init', # Missing controller ID + f'twai_init {_ctrler_name(0)}', # Missing required GPIO + 'twai_init twai99 -t 4 -r 5', # Invalid controller ID + f'twai_recover {_ctrler_name(0)} -t -5', # Invalid timeout value + f'twai_init {_ctrler_name(0)} -t -1 -r 5', # Negative TX GPIO + f'twai_init {_ctrler_name(0)} -t 99 -r 5', # High GPIO number + f'twai_init {_ctrler_name(0)} -t 4 -r 5 -c -1', # Negative clk_out GPIO + f'twai_init {_ctrler_name(0)} -t 4 -r 5 -b 0', # Zero bitrate + ] + for cmd in invalid_commands: + assert twai.invalid_should_fail(cmd), f'Invalid command should fail: {cmd}' + + uninitialized_ops = [ + f'twai_send {_ctrler_name(0)} 123#DEAD', + f'twai_recover {_ctrler_name(0)}', + f'twai_dump {_ctrler_name(0)}', + ] + for cmd in uninitialized_ops: + assert twai.invalid_should_fail(cmd), f'Non-initialized operation should fail: {cmd}' + + +@pytest.mark.generic +@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) +def test_twai_utils_gpio_and_basic_send(twai: TwaiTestHelper) -> None: + with twai.session(): + assert twai.info(), 'GPIO info failed' + assert twai.test_with_patterns( + f'twai_info {_ctrler_name(0)}', + [rf'GPIOs: TX=GPIO{NO_TRANSCEIVER_GPIO}, RX=GPIO{NO_TRANSCEIVER_GPIO}'], + ) + # Basic send test frames + BASIC_SEND_FRAMES = ['123#DEADBEEF', '7FF#AA55', '12345678#CAFEBABE'] + for frame_str in BASIC_SEND_FRAMES: + assert twai.send(frame_str), f'Standard mode send failed: {frame_str}' + + # skip esp32 due to gpio 6/7 not available, esp32c5 due to clk_io not supported + if twai.dut.app.target in ['esp32', 'esp32c5']: + return + + # deinit esp_twai for following tests + twai.deinit() + if twai.init(tx_gpio=4, rx_gpio=5, clk_out_gpio=6, bus_off_gpio=7): + assert twai.info(), 'Optional GPIO info failed' + assert twai.test_with_patterns(f'twai_info {_ctrler_name(0)}', [r'TWAI0 Status:', r'GPIOs: TX=GPIO4']), ( + 'GPIO info format failed' + ) + + +@pytest.mark.generic +@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) +def test_twai_utils_send_various_frames(twai: TwaiTestHelper) -> None: + with twai.session(): + # Boundary ID tests + BOUNDARY_ID_FRAMES = [ + ('7FF#AA', 'Max standard ID'), + ('800#BB', 'Min extended ID (in extended format: 00000800)'), + ('000#CC', 'Min ID'), + ] + for frame_str, desc in BOUNDARY_ID_FRAMES: + assert twai.send(frame_str), f'Boundary ID failed: {frame_str} ({desc})' + + # Rapid succession test frames + RAPID_FRAMES = ['123#AA', '124#BB', '125#CC', '126#DD', '127#EE'] + for frame_str in RAPID_FRAMES: + assert twai.send(frame_str), f'Rapid send failed: {frame_str}' + + +@pytest.mark.generic +@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORT_FD == 1'), indirect=['target']) +def test_twai_utils_fd_frames(twai: TwaiTestHelper) -> None: + with twai.session(): + FD_FRAMES = [ + ('123##0AABBCC', 'FD frame without BRS'), + ('456##1DEADBEEF', 'FD frame with BRS'), + ('789##2CAFEBABE', 'FD frame with ESI'), + ('ABC##3112233', 'FD frame with BRS+ESI'), + ] + for frame_str, desc in FD_FRAMES: + assert twai.send(frame_str), f'FD frame failed: {frame_str} ({desc})' + + +@pytest.mark.generic +@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) +def test_twai_utils_mask_filters(twai: TwaiTestHelper) -> None: + """Test TWAI filtering including automatic extended frame detection.""" + MASK_FILTER_GROUPS = [ # No filter - basic functionality ( '', @@ -121,9 +529,30 @@ class TestConfig: ], ), ] + for filter_str, test_frames in MASK_FILTER_GROUPS: + with twai.session(dump_filter=filter_str): + failed_cases: list[str] = [] + for frame_str, frame_id, should_receive in test_frames: + received = twai.send_and_expect_in_dump(frame_str, frame_id, timeout=1.0) + if received != should_receive: + expected_action = 'receive' if should_receive else 'filter out' + actual_action = 'received' if received else 'filtered out' + failed_cases.append(f'{frame_str}: expected {expected_action}, got {actual_action}') - # Range filter tests - RANGE_FILTER_TESTS = [ + if failed_cases: + pytest.fail( + f'Filter test failed for filter "{filter_str or "no filter"}":\n' + + '\n'.join(failed_cases) + + '\n\nNote: Filters auto-detect extended frames by:' + '\n- String length > 3 chars or ID value > 0x7FF' + ) + + +@pytest.mark.generic +@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_RANGE_FILTER_NUM > 0'), indirect=['target']) +def test_twai_utils_range_filters(twai: TwaiTestHelper) -> None: + """Test TWAI range filters (available on chips with range filter support).""" + RANGE_FILTER_GROUPS = [ # Standard frame range filter ( 'a-15', # Test hex range parsing @@ -146,490 +575,11 @@ class TestConfig: ], ), ] - - # Rapid succession test frames - RAPID_FRAMES = ['123#AA', '124#BB', '125#CC', '126#DD', '127#EE'] - - # Basic send test frames - BASIC_SEND_FRAMES = ['123#DEADBEEF', '7FF#AA55', '12345678#CAFEBABE'] - - -# --------------------------------------------------------------------------- -# TWAI helper (refactored) -# --------------------------------------------------------------------------- - - -class TwaiTestHelper: - """TWAI test helper built on small, reusable atomic operations.""" - - def __init__(self, dut: Dut) -> None: - self.dut = dut - self.timeout = 5 - self._wait_ready() - - # ------------------------- atomic I/O ops ------------------------- - def _wait_ready(self) -> None: - try: - self.dut.expect(PROMPTS, timeout=10) - except (pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF): - self.sendline('help') - self.expect(['Commands:'], timeout=5) - - def sendline(self, cmd: str) -> None: - self.dut.write(f'\n{cmd}\n') - - def expect(self, patterns: list[str] | str, timeout: float | None = None) -> bool: - timeout = timeout or self.timeout - try: - self.dut.expect(patterns, timeout=timeout) - return True - except (pexpect.exceptions.TIMEOUT, pexpect.exceptions.EOF): - return False - - def run(self, cmd: str, expect: list[str] | str | None = None, timeout: float | None = None) -> bool: - self.sendline(cmd) - return self.expect(expect or PROMPTS, timeout) - - # ------------------------- command builders ------------------------- - def build_init( - self, - *, - controller_id: int = 0, - tx_gpio: int | None = None, - rx_gpio: int | None = None, - bitrate: int | None = None, - clk_out_gpio: int | None = None, - bus_off_gpio: int | None = None, - fd_bitrate: int | None = None, - loopback: bool = False, - self_test: bool = False, - listen: bool = False, - ) -> str: - ctrl = _ctrl(controller_id) - parts = [f'twai_init {ctrl}'] - if tx_gpio is not None: - parts += [f'-t {tx_gpio}'] - if rx_gpio is not None: - parts += [f'-r {rx_gpio}'] - if bitrate is not None: - parts += [f'-b {bitrate}'] - if fd_bitrate is not None: - parts += [f'-B {fd_bitrate}'] - if clk_out_gpio is not None: - parts += [f'-c {clk_out_gpio}'] - if bus_off_gpio is not None: - parts += [f'-o {bus_off_gpio}'] - if loopback: - parts += ['--loopback'] - if self_test: - parts += ['--self-test'] - if listen: - parts += ['--listen'] - return ' '.join(parts) - - def build_dump_start(self, *, controller_id: int = 0, dump_filter: str | None = None) -> str: - cmd = f'twai_dump {_ctrl(controller_id)}' - if dump_filter: - cmd += f',{dump_filter}' - return cmd - - def build_dump_stop(self, *, controller_id: int = 0) -> str: - return f'twai_dump {_ctrl(controller_id)} --stop' - - # ------------------------- high-level ops ------------------------- - def init(self, controller_id: int = 0, **kwargs: Any) -> bool: - return self.run(self.build_init(controller_id=controller_id, **kwargs)) - - def deinit(self, controller_id: int = 0) -> bool: - return self.run(f'twai_deinit {_ctrl(controller_id)}') - - def dump_start(self, controller_id: int = 0, dump_filter: str | None = None) -> bool: - return self.run(self.build_dump_start(controller_id=controller_id, dump_filter=dump_filter)) - - def dump_stop(self, controller_id: int = 0) -> tuple[bool, bool]: - """Stop dump and return (stopped_ok, timeout_warning_seen).""" - self.sendline(self.build_dump_stop(controller_id=controller_id)) - # If the dump task does not exit naturally, the implementation prints this warning. - warning_seen = self.expect(r'Dump task did not exit naturally, timeout', timeout=5) - # Whether or not warning appears, we should be back to a prompt. - prompt_ok = self.expect(PROMPTS, timeout=2) or True # relax - return prompt_ok, warning_seen - - def send(self, frame_str: str, controller_id: int = 0) -> bool: - return self.run(f'twai_send {_ctrl(controller_id)} {frame_str}') - - def info(self, controller_id: int = 0) -> bool: - return self.run( - f'twai_info {_ctrl(controller_id)}', - [rf'TWAI{controller_id} Status:', r'Node State:', r'Bitrate:'], - ) - - def recover(self, controller_id: int = 0, timeout_ms: int | None = None) -> bool: - cmd = f'twai_recover {_ctrl(controller_id)}' - if timeout_ms is not None: - cmd += f' -t {timeout_ms}' - return self.run(cmd, ['Recovery not needed', 'node is Error Active', 'ESP_ERR_INVALID_STATE']) # any - - def expect_info_format(self, controller_id: int = 0) -> bool: - self.sendline(f'twai_info {_ctrl(controller_id)}') - checks = [ - rf'TWAI{controller_id} Status: \w+', - r'Node State: \w+', - r'Error Counters: TX=\d+, RX=\d+', - r'Bitrate: \d+ bps', - ] - return all(self.expect(p, timeout=2) for p in checks) - - def invalid_should_fail(self, cmd: str, timeout: float = 2.0) -> bool: - self.sendline(cmd) - return self.expect([r'Command returned non-zero error code:', r'ERROR', r'Failed', r'Invalid'], timeout=timeout) - - def test_with_patterns(self, cmd: str, patterns: list[str], timeout: float = 3.0) -> bool: - self.sendline(cmd) - return all(self.expect(p, timeout=timeout) for p in patterns) - - def send_and_expect_in_dump( - self, - frame_str: str, - expected_id: int, - controller_id: int = 0, - timeout: float = 3.0, - ) -> bool: - ctrl = _ctrl(controller_id) - self.sendline(f'twai_send {ctrl} {frame_str}') - return self.expect(_id_pattern(ctrl, expected_id), timeout=timeout) - - # ------------------------- context manager ------------------------- - @contextmanager - def session( - self, - *, - controller_id: int = 0, - mode: str = 'no_transceiver', - start_dump: bool = True, - dump_filter: str | None = None, - **kwargs: Any, - ) -> Generator['TwaiTestHelper', None, None]: - """Manage init/dump lifecycle consistently. - - - mode="no_transceiver": loopback + self_test on a single GPIO. - - mode="standard": caller must provide tx_gpio/rx_gpio (or we use defaults). - """ - # Build effective init args - init_args = dict(kwargs) - init_args['controller_id'] = controller_id - - if mode == 'no_transceiver': - init_args |= dict( - tx_gpio=TestConfig.NO_TRANSCEIVER_GPIO, - rx_gpio=TestConfig.NO_TRANSCEIVER_GPIO, - bitrate=kwargs.get('bitrate', TestConfig.DEFAULT_BITRATE), - loopback=True, - self_test=True, - ) - elif mode == 'standard': - init_args.setdefault('tx_gpio', TestConfig.DEFAULT_TX_GPIO) - init_args.setdefault('rx_gpio', TestConfig.DEFAULT_RX_GPIO) - init_args.setdefault('bitrate', kwargs.get('bitrate', TestConfig.DEFAULT_BITRATE)) - else: - raise ValueError(f'Unknown mode: {mode}') - - if not self.init(**init_args): - raise RuntimeError(f'Failed to initialize TWAI in {mode} mode') - - dump_started = False - dump_timeout_flag = False - try: - if start_dump: - dump_started = self.dump_start(controller_id=controller_id, dump_filter=dump_filter) - yield self - finally: - if dump_started: - _, warning = self.dump_stop(controller_id=controller_id) - dump_timeout_flag = warning - - self.deinit(controller_id=controller_id) - - if dump_timeout_flag: - pytest.fail(f'Dump stop timed out for {_ctrl(controller_id)}') - - -# --------------------------------------------------------------------------- -# CAN bus manager (external hardware) -# --------------------------------------------------------------------------- - - -class CanBusManager: - """CAN bus manager for external hardware tests""" - - def __init__(self, interface: str = 'can0'): - self.interface = interface - self.bus: can.Bus | None = None - - @contextmanager - def managed_bus(self, bitrate: int = 500000) -> Generator[can.Bus, None, None]: - try: - result = subprocess.run(['ip', '-details', 'link', 'show', self.interface], capture_output=True, text=True) - if result.returncode != 0: - raise Exception(f'CAN interface {self.interface} not found') - - interface_up = 'UP' in result.stdout - current_bitrate = None - m = re.search(r'bitrate (\d+)', result.stdout) - if m: - current_bitrate = int(m.group(1)) - - if current_bitrate != bitrate: - logging.info( - f'Configuring CAN interface: current_bitrate={current_bitrate}, required_bitrate={bitrate}' - ) - try: - if interface_up: - subprocess.run( - ['sudo', '-n', 'ip', 'link', 'set', self.interface, 'down'], check=True, capture_output=True - ) - subprocess.run( - [ - 'sudo', - '-n', - 'ip', - 'link', - 'set', - self.interface, - 'up', - 'type', - 'can', - 'bitrate', - str(bitrate), - ], - check=True, - capture_output=True, - ) - time.sleep(0.5) - except subprocess.CalledProcessError: - raise Exception( - f'Failed to configure CAN interface {self.interface}. ' - f'Try: sudo ip link set {self.interface} down && ' - f'sudo ip link set {self.interface} up type can bitrate {bitrate}' - ) - - self.bus = can.Bus(interface='socketcan', channel=self.interface) - yield self.bus - except Exception as e: - pytest.skip(f'CAN interface not available: {str(e)}') - finally: - if self.bus: - try: - self.bus.shutdown() - subprocess.run(['sudo', '-n', 'ip', 'link', 'set', self.interface, 'down'], check=True) - except Exception: - pass - - -# --------------------------------------------------------------------------- -# Fixtures -# --------------------------------------------------------------------------- - - -@pytest.fixture -def twai(dut: Dut) -> TwaiTestHelper: - return TwaiTestHelper(dut) - - -@pytest.fixture -def can_manager() -> CanBusManager: - return CanBusManager() - - -# --------------------------------------------------------------------------- -# CORE TESTS -# --------------------------------------------------------------------------- - - -@pytest.mark.generic -@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) -def test_twai_basic_operations(twai: TwaiTestHelper) -> None: - with twai.session( - mode='standard', tx_gpio=TestConfig.DEFAULT_TX_GPIO, rx_gpio=TestConfig.DEFAULT_RX_GPIO, start_dump=False - ): - # Test basic send operation - assert twai.send('123#DEADBEEF'), 'Basic send operation failed' - - # Test dump filter operations - first start should succeed - assert twai.dump_start(dump_filter='123:7FF'), 'First dump start failed' - - # Second start should be handled gracefully (already running) - twai.dump_start(dump_filter='456:7FF') # Should handle "already running" case - - # Stop should work normally - stopped_ok, warning = twai.dump_stop() - assert stopped_ok, 'Dump stop failed' - - -@pytest.mark.generic -@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) -def test_twai_bitrate_configuration(twai: TwaiTestHelper) -> None: - for bitrate in TestConfig.BITRATES: - with twai.session( - mode='standard', bitrate=bitrate, tx_gpio=TestConfig.DEFAULT_TX_GPIO, rx_gpio=TestConfig.DEFAULT_RX_GPIO - ): - assert twai.info(), f'Info failed for bitrate {bitrate}' - - # TWAI-FD bitrate validation (intentionally invalid: data bitrate < arbitration) - if twai.init( - tx_gpio=TestConfig.DEFAULT_TX_GPIO, rx_gpio=TestConfig.DEFAULT_RX_GPIO, bitrate=1_000_000, fd_bitrate=500_000 - ): - try: - ok = twai.test_with_patterns( - f'twai_info {_ctrl(0)}', - [r'TWAI0 Status:', r'Bitrate: 1000000'], - ) - assert ok, 'FD bitrate validation info failed' - finally: - twai.deinit() - - -@pytest.mark.generic -@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) -def test_twai_frame_formats(twai: TwaiTestHelper) -> None: - with twai.session(): - for frame_str, desc in TestConfig.BASIC_FRAMES: - can_id = int(frame_str.split('#')[0], 16) - assert twai.send_and_expect_in_dump(frame_str, can_id), f'Basic frame failed: {frame_str} ({desc})' - for frame_str, desc in TestConfig.EXTENDED_FRAMES: - can_id = int(frame_str.split('#')[0], 16) - assert twai.send_and_expect_in_dump(frame_str, can_id), f'Extended frame failed: {frame_str} ({desc})' - for frame_str, desc in TestConfig.RTR_FRAMES: - assert twai.send(frame_str), f'RTR frame failed: {frame_str} ({desc})' - - -@pytest.mark.generic -@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) -def test_twai_info_and_recovery(twai: TwaiTestHelper) -> None: - with twai.session(): - assert twai.info(), 'Info command failed' - assert twai.expect_info_format(), 'Info format check failed' - - assert twai.test_with_patterns( - f'twai_info {_ctrl(0)}', - [ - r'TWAI0 Status: Running', - r'Node State: Error Active', - r'Error Counters: TX=0, RX=0', - ], - ), 'Expected status patterns not found' - - assert twai.recover(), 'Recover status check failed' - assert twai.recover(timeout_ms=1000), 'Recover command with timeout failed' - - -@pytest.mark.generic -@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) -def test_twai_input_validation(twai: TwaiTestHelper) -> None: - with twai.session(start_dump=False): - for frame_str, desc in TestConfig.INVALID_FRAMES: - assert twai.invalid_should_fail(f'twai_send {_ctrl(0)} {frame_str}'), ( - f'Invalid frame should be rejected: {frame_str} ({desc})' - ) - - invalid_commands = [ - 'twai_init', # Missing controller ID - f'twai_init {_ctrl(0)}', # Missing required GPIO - 'twai_init twai99 -t 4 -r 5', # Invalid controller ID - f'twai_recover {_ctrl(0)} -t -5', # Invalid timeout value - f'twai_init {_ctrl(0)} -t -1 -r 5', # Negative TX GPIO - f'twai_init {_ctrl(0)} -t 99 -r 5', # High GPIO number - f'twai_init {_ctrl(0)} -t 4 -r 5 -c -1', # Negative clk_out GPIO - f'twai_init {_ctrl(0)} -t 4 -r 5 -b 0', # Zero bitrate - ] - for cmd in invalid_commands: - assert twai.invalid_should_fail(cmd), f'Invalid command should fail: {cmd}' - - uninitialized_ops = [f'twai_send {_ctrl(0)} 123#DEAD', f'twai_recover {_ctrl(0)}', f'twai_dump {_ctrl(0)}'] - for cmd in uninitialized_ops: - assert twai.invalid_should_fail(cmd), f'Non-initialized operation should fail: {cmd}' - - with twai.session(start_dump=False): - assert twai.invalid_should_fail(f'twai_init {_ctrl(0)} -t 4 -r 5'), ( - 'Duplicate initialization should be prevented' - ) - - -@pytest.mark.generic -@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) -def test_twai_gpio_and_basic_send(twai: TwaiTestHelper) -> None: - with twai.session(): - assert twai.send('123#DEADBEEF'), 'No-transceiver send failed' - - with twai.session(mode='standard', tx_gpio=TestConfig.DEFAULT_TX_GPIO, rx_gpio=TestConfig.DEFAULT_RX_GPIO): - assert twai.info(), 'GPIO info failed' - assert twai.test_with_patterns( - f'twai_info {_ctrl(0)}', - [rf'GPIOs: TX=GPIO{TestConfig.DEFAULT_TX_GPIO}, RX=GPIO{TestConfig.DEFAULT_RX_GPIO}'], - ) - for frame_str in TestConfig.BASIC_SEND_FRAMES: - assert twai.send(frame_str), f'Standard mode send failed: {frame_str}' - - if twai.init(tx_gpio=4, rx_gpio=5, clk_out_gpio=6, bus_off_gpio=7): - try: - assert twai.info(), 'Optional GPIO info failed' - assert twai.test_with_patterns(f'twai_info {_ctrl(0)}', [r'TWAI0 Status:', r'GPIOs: TX=GPIO4']), ( - 'GPIO info format failed' - ) - finally: - twai.deinit() - - -@pytest.mark.generic -@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) -def test_twai_send_various_frames(twai: TwaiTestHelper) -> None: - with twai.session(): - for frame_str, desc in TestConfig.BOUNDARY_ID_FRAMES: - assert twai.send(frame_str), f'Boundary ID failed: {frame_str} ({desc})' - for frame_str in TestConfig.RAPID_FRAMES: - assert twai.send(frame_str), f'Rapid send failed: {frame_str}' - - -@pytest.mark.generic -@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORT_FD == 1'), indirect=['target']) -def test_twai_fd_frames(twai: TwaiTestHelper) -> None: - with twai.session(): - for frame_str, desc in TestConfig.FD_FRAMES: - assert twai.send(frame_str), f'FD frame failed: {frame_str} ({desc})' - - -@pytest.mark.generic -@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) -def test_twai_filtering(twai: TwaiTestHelper) -> None: - """Test TWAI filtering including automatic extended frame detection.""" - for filter_str, test_frames in TestConfig.FILTER_TESTS: + for filter_str, test_frames in RANGE_FILTER_GROUPS: with twai.session(dump_filter=filter_str): failed_cases: list[str] = [] - for frame_str, expected_id, should_receive in test_frames: - received = twai.send_and_expect_in_dump(frame_str, expected_id, timeout=1.0) - if received != should_receive: - expected_action = 'receive' if should_receive else 'filter out' - actual_action = 'received' if received else 'filtered out' - failed_cases.append(f'{frame_str}: expected {expected_action}, got {actual_action}') - - if failed_cases: - pytest.fail( - f'Filter test failed for filter "{filter_str or "no filter"}":\n' - + '\n'.join(failed_cases) - + '\n\nNote: Filters auto-detect extended frames by:' - '\n- String length > 3 chars or ID value > 0x7FF' - ) - - -@pytest.mark.generic -@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_RANGE_FILTER_NUM > 0'), indirect=['target']) -def test_twai_range_filters(twai: TwaiTestHelper) -> None: - """Test TWAI range filters (available on chips with range filter support).""" - for filter_str, test_frames in TestConfig.RANGE_FILTER_TESTS: - with twai.session(dump_filter=filter_str): - failed_cases: list[str] = [] - for frame_str, expected_id, should_receive in test_frames: - received = twai.send_and_expect_in_dump(frame_str, expected_id, timeout=1.0) + for frame_str, frame_id, should_receive in test_frames: + received = twai.send_and_expect_in_dump(frame_str, frame_id, timeout=1.0) if received != should_receive: expected_action = 'receive' if should_receive else 'filter out' actual_action = 'received' if received else 'filtered out' @@ -647,79 +597,41 @@ def test_twai_range_filters(twai: TwaiTestHelper) -> None: @pytest.mark.temp_skip_ci(targets=['esp32h4'], reason='no runner') @idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) @pytest.mark.temp_skip_ci(targets=['esp32p4'], reason='p4 rev3 migration # TODO: IDF-14393') -def test_twai_external_communication(twai: TwaiTestHelper, can_manager: CanBusManager) -> None: - """ - Test bidirectional communication with external CAN interface (hardware level). - - Requirements: - - ESP node connected to physical CAN transceiver, properly wired to PC's socketcan - interface (default can0) via CANH/CANL. - - PC has `python-can` and can0 is available. - - Bitrate matches TestConfig.DEFAULT_BITRATE (default 500 kbps). - """ +def test_twai_utils_external_communication(twai: TwaiTestHelper, usb_can: CanBusManager) -> None: test_frames = [ ('123#DEADBEEF', 0x123, bytes.fromhex('DEADBEEF'), False), ('7FF#AA55', 0x7FF, bytes.fromhex('AA55'), False), ('12345678#CAFEBABE', 0x12345678, bytes.fromhex('CAFEBABE'), True), ] - with can_manager.managed_bus(bitrate=TestConfig.DEFAULT_BITRATE) as can_bus: + with usb_can.managed_bus(bitrate=DEFAULT_BITRATE) as can_bus: with twai.session( mode='standard', - tx_gpio=TestConfig.DEFAULT_TX_GPIO, - rx_gpio=TestConfig.DEFAULT_RX_GPIO, - bitrate=TestConfig.DEFAULT_BITRATE, + tx_gpio=DEFAULT_TX_GPIO, + rx_gpio=DEFAULT_RX_GPIO, + bitrate=DEFAULT_BITRATE, start_dump=False, ): - # --- ESP -> PC Connectivity Test --- - first_frame, test_id, test_data, test_extended = test_frames[0] - if not twai.send(first_frame): - pytest.skip( - f'ESP CAN send failed - check ESP GPIO ' - f'{TestConfig.DEFAULT_TX_GPIO}/{TestConfig.DEFAULT_RX_GPIO} -> ' - f'CAN transceiver connection' - ) - - deadline = time.time() + 3.0 - got: can.Message | None = None - while time.time() < deadline: - try: - msg = can_bus.recv(timeout=0.2) - if msg and msg.arbitration_id == test_id: - got = msg - break - except Exception as e: - logging.debug(f'PC CAN receive exception: {e}') - if got is None: - pytest.skip( - 'ESP->PC communication failed - check CAN transceiver -> PC can0 connection. ' - "Verify wiring and 'sudo ip link set can0 up type can bitrate 500000'" - ) - if got is not None and bytes(got.data) != test_data: - pytest.fail( - f'ESP->PC data corruption detected: expected {test_data.hex()}, got {bytes(got.data).hex()}' - ) - - # --- Full ESP -> PC Test --- - for frame_str, expected_id, expected_data, is_extended in test_frames: + # --- ESP -> PC Test --- + for frame_str, frame_id, expected_data, is_extended in test_frames: assert twai.send(frame_str), f'ESP->PC send failed: {frame_str}' - deadline = time.time() + 1.0 + deadline = time.time() + 2.0 got = None while time.time() < deadline: try: msg = can_bus.recv(timeout=0.1) - if msg and msg.arbitration_id == expected_id: + if msg and msg.arbitration_id == frame_id: got = msg break except Exception: continue - assert got is not None, f'ESP->PC receive timeout for ID=0x{expected_id:X}' + assert got is not None, f'ESP->PC receive timeout for ID=0x{frame_id:X}' assert bool(got.is_extended_id) == is_extended, ( - f'ESP->PC extended flag mismatch for 0x{expected_id:X}: ' + f'ESP->PC extended flag mismatch for 0x{frame_id:X}: ' f'expected {is_extended}, got {got.is_extended_id}' ) assert bytes(got.data) == expected_data, ( - f'ESP->PC data mismatch for 0x{expected_id:X}: ' + f'ESP->PC data mismatch for 0x{frame_id:X}: ' f'expected {expected_data.hex()}, got {bytes(got.data).hex()}' ) @@ -727,19 +639,13 @@ def test_twai_external_communication(twai: TwaiTestHelper, can_manager: CanBusMa assert twai.dump_start(), 'Failed to start twai_dump' assert twai.info(), 'Failed to get twai_info' - test_msg = can.Message(arbitration_id=test_id, data=test_data, is_extended_id=test_extended) try: - can_bus.send(test_msg) - time.sleep(0.2) - assert twai.expect(_id_pattern('twai0', test_id), timeout=2.0), ( - f'PC->ESP frame not received: ID=0x{test_id:X}, data={test_data.hex()}' - ) - for frame_str, expected_id, expected_data, is_extended in test_frames[1:]: - msg = can.Message(arbitration_id=expected_id, data=expected_data, is_extended_id=is_extended) - can_bus.send(msg) + for frame_str, frame_id, expected_data, is_extended in test_frames: + msg = can.Message(arbitration_id=frame_id, data=expected_data, is_extended_id=is_extended) + print(f'\nPC->ESP sending frame: {msg}, Return: {can_bus.send(msg)}') time.sleep(0.1) - assert twai.expect(_id_pattern('twai0', expected_id), timeout=1.0), ( - f'PC->ESP frame not received: ID=0x{expected_id:X}, data={expected_data.hex()}' + assert twai.expect(_id_pattern('twai0', frame_id), timeout=1.0), ( + f'PC->ESP frame not received: ID=0x{frame_id:X}, data={expected_data.hex()}' ) finally: twai.dump_stop() diff --git a/examples/peripherals/twai/twai_utils/sdkconfig.defaults b/examples/peripherals/twai/twai_utils/sdkconfig.defaults new file mode 100644 index 0000000000..5b28a974f7 --- /dev/null +++ b/examples/peripherals/twai/twai_utils/sdkconfig.defaults @@ -0,0 +1 @@ +# DON'T remove, hold this file to let sdkconfig.defaults.{name} take effect