fix(driver_twai): enhance ci test and fix example

This commit is contained in:
wanckl
2025-11-14 15:25:41 +08:00
committed by morris
parent 007b376057
commit a485d7f8a6
8 changed files with 631 additions and 650 deletions
@@ -2,7 +2,7 @@
# SPDX-License-Identifier: CC0-1.0
import logging
import subprocess
from time import sleep
import time
import pytest
from can import Bus
@@ -10,6 +10,10 @@ from can import Message
from pytest_embedded import Dut
from pytest_embedded_idf.utils import idf_parametrize
# ---------------------------------------------------------------------------
# Loop Back Tests
# ---------------------------------------------------------------------------
@pytest.mark.generic
@pytest.mark.parametrize(
@@ -26,16 +30,57 @@ def test_legacy_twai_self(dut: Dut) -> None:
dut.run_all_single_board_cases(group='twai-loop-back')
# ---------------------------------------------------------------------------
# Helper Functions
# ---------------------------------------------------------------------------
def esp_enter_flash_mode(dut: Dut) -> None:
ser = dut.serial.proc
ser.setRTS(True) # EN Low
time.sleep(0.5)
ser.setDTR(True) # GPIO0 Low
ser.setRTS(False) # EN High
dut.expect('waiting for download', timeout=2)
ser.setDTR(False) # Back RTS/DTR to 1/1 to avoid affect to esptool
def esp_reset_and_wait_ready(dut: Dut) -> None:
dut.serial.hard_reset()
time.sleep(0.5)
dut.expect_exact('Press ENTER to see the list of tests')
@pytest.fixture(name='socket_can')
def fixture_create_socket_can() -> Bus:
# Set up the socket CAN with the bitrate
start_command = 'sudo ip link set can0 up type can bitrate 250000 restart-ms 100'
stop_command = 'sudo ip link set can0 down'
subprocess.run(start_command, shell=True, capture_output=True, text=True)
bus = Bus(interface='socketcan', channel='can0', bitrate=250000)
yield bus # test invoked here
bus.shutdown()
subprocess.run(stop_command, shell=True, capture_output=True, text=True)
start_command = 'sudo -n ip link set can0 up type can bitrate 250000 restart-ms 100'
stop_command = 'sudo -n ip link set can0 down'
status_command = 'sudo -n ip -details link show can0'
try:
result = subprocess.run(status_command, shell=True, capture_output=True, text=True)
if result.returncode != 0:
raise Exception('CAN interface "can0" not found')
if 'UP' in result.stdout: # Close the bus anyway if it is already up
subprocess.run(stop_command, shell=True, capture_output=True, text=True)
subprocess.run(start_command, shell=True, capture_output=True, text=True)
time.sleep(0.5)
bus = Bus(interface='socketcan', channel='can0', bitrate=250000)
yield bus # test invoked here
bus.shutdown()
except Exception as e:
pytest.skip(f'Open usb-can bus Error: {str(e)}')
finally:
subprocess.run(stop_command, shell=True, capture_output=True, text=True)
# ---------------------------------------------------------------------------
# Interactive Tests
# ---------------------------------------------------------------------------
@pytest.mark.twai_std
@@ -50,14 +95,13 @@ def fixture_create_socket_can() -> Bus:
'target', ['esp32', 'esp32c3', 'esp32c6', 'esp32h2', 'esp32s2', 'esp32s3', 'esp32p4'], indirect=['target']
)
def test_legacy_twai_listen_only(dut: Dut, socket_can: Bus) -> None:
dut.serial.hard_reset()
dut.expect_exact('Press ENTER to see the list of tests')
esp_reset_and_wait_ready(dut)
# TEST_CASE("twai_listen_only", "[twai]")
dut.write('"twai_listen_only"')
# wait the DUT to block at the receive API
sleep(0.03)
# wait the DUT to start listening
time.sleep(0.1)
message = Message(
arbitration_id=0x123,
@@ -66,6 +110,7 @@ def test_legacy_twai_listen_only(dut: Dut, socket_can: Bus) -> None:
)
socket_can.send(message, timeout=0.2)
dut.expect_unity_test_output()
esp_enter_flash_mode(dut)
@pytest.mark.twai_std
@@ -80,8 +125,7 @@ def test_legacy_twai_listen_only(dut: Dut, socket_can: Bus) -> None:
'target', ['esp32', 'esp32c3', 'esp32c6', 'esp32h2', 'esp32s2', 'esp32s3', 'esp32p4'], indirect=['target']
)
def test_legacy_twai_remote_request(dut: Dut, socket_can: Bus) -> None:
dut.serial.hard_reset()
dut.expect_exact('Press ENTER to see the list of tests')
esp_reset_and_wait_ready(dut)
# TEST_CASE("twai_remote_request", "[twai]")
dut.write('"twai_remote_request"')
@@ -103,3 +147,4 @@ def test_legacy_twai_remote_request(dut: Dut, socket_can: Bus) -> None:
print('send', reply)
dut.expect_unity_test_output()
esp_enter_flash_mode(dut)
@@ -2,7 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
import subprocess
from time import sleep
import time
import pytest
from can import Bus
@@ -12,6 +12,9 @@ from pytest_embedded_idf.utils import idf_parametrize
from pytest_embedded_idf.utils import soc_filtered_targets
# ---------------------------------------------------------------------------
# Loop Back Tests
# ---------------------------------------------------------------------------
@pytest.mark.generic
@pytest.mark.parametrize('config', ['release', 'cache_safe'], indirect=True)
@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target'])
@@ -19,43 +22,77 @@ def test_driver_twai_loopbk(dut: Dut) -> None:
dut.run_all_single_board_cases(group='twai', reset=True)
# -------------------------------- test twai interactive ------------------------------
# ---------------------------------------------------------------------------
# Helper Functions
# ---------------------------------------------------------------------------
def esp_enter_flash_mode(dut: Dut) -> None:
ser = dut.serial.proc
ser.setRTS(True) # EN Low
time.sleep(0.5)
ser.setDTR(True) # GPIO0 Low
ser.setRTS(False) # EN High
dut.expect('waiting for download', timeout=2)
ser.setDTR(False) # Back RTS/DTR to 1/1 to avoid affect to esptool
def esp_reset_and_wait_ready(dut: Dut) -> None:
dut.serial.hard_reset()
time.sleep(0.5)
dut.expect_exact('Press ENTER to see the list of tests')
@pytest.fixture(name='socket_can')
def fixture_create_socket_can() -> Bus:
# Set up the socket CAN with the bitrate
start_command = 'sudo ip link set can0 up type can bitrate 250000'
stop_command = 'sudo ip link set can0 down'
start_command = 'sudo -n ip link set can0 up type can bitrate 250000 restart-ms 100'
stop_command = 'sudo -n ip link set can0 down'
status_command = 'sudo -n ip -details link show can0'
try:
result = subprocess.run(status_command, shell=True, capture_output=True, text=True)
if result.returncode != 0:
raise Exception('CAN interface "can0" not found')
if 'UP' in result.stdout: # Close the bus anyway if it is already up
subprocess.run(stop_command, shell=True, capture_output=True, text=True)
subprocess.run(start_command, shell=True, capture_output=True, text=True)
time.sleep(0.5)
bus = Bus(interface='socketcan', channel='can0', bitrate=250000)
yield bus # test invoked here
bus.shutdown()
except Exception as e:
print(f'Open bus Error: {e}')
bus = Bus(interface='socketcan', channel='can0', bitrate=250000)
yield bus # test invoked here
bus.shutdown()
subprocess.run(stop_command, shell=True, capture_output=True, text=True)
pytest.skip(f'Open usb-can bus Error: {str(e)}')
finally:
subprocess.run(stop_command, shell=True, capture_output=True, text=True)
# ---------------------------------------------------------------------------
# Interactive Tests
# ---------------------------------------------------------------------------
@pytest.mark.twai_std
@pytest.mark.temp_skip_ci(targets=['esp32h4'], reason='no runner')
@pytest.mark.parametrize('config', ['release'], indirect=True)
@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target'])
def test_driver_twai_listen_only(dut: Dut, socket_can: Bus) -> None:
dut.serial.hard_reset()
dut.expect_exact('Press ENTER to see the list of tests')
esp_reset_and_wait_ready(dut)
dut.write('"twai_listen_only"')
# wait the DUT to finish initialize
sleep(0.1)
time.sleep(0.1)
message = Message(
arbitration_id=0x6688,
is_extended_id=True,
data=[0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88],
)
print('USB Socket CAN Send:', message)
socket_can.send(message, timeout=0.2)
print('USB Socket CAN Send:', message, 'Return:', socket_can.send(message))
dut.expect_unity_test_output(timeout=10)
esp_enter_flash_mode(dut)
@pytest.mark.twai_std
@@ -63,8 +100,7 @@ def test_driver_twai_listen_only(dut: Dut, socket_can: Bus) -> None:
@pytest.mark.parametrize('config', ['release'], indirect=True)
@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target'])
def test_driver_twai_remote_request(dut: Dut, socket_can: Bus) -> None:
dut.serial.hard_reset()
dut.expect_exact('Press ENTER to see the list of tests')
esp_reset_and_wait_ready(dut)
dut.write('"twai_remote_request"')
@@ -82,4 +118,6 @@ def test_driver_twai_remote_request(dut: Dut, socket_can: Bus) -> None:
)
socket_can.send(reply, timeout=0.2)
print('USB Socket CAN Replied:', reply)
dut.expect_unity_test_output(timeout=10)
esp_enter_flash_mode(dut)
@@ -18,10 +18,10 @@ extern "C" {
#define GPIO_PIN_COUNT (SOC_GPIO_PIN_COUNT)
/// Check whether it is a valid GPIO number
#define GPIO_IS_VALID_GPIO(gpio_num) ((gpio_num >= 0) && \
#define GPIO_IS_VALID_GPIO(gpio_num) ((gpio_num >= 0) && (gpio_num <= (SOC_GPIO_PIN_COUNT - 1)) && \
(((1ULL << (gpio_num)) & SOC_GPIO_VALID_GPIO_MASK) != 0))
/// Check whether it can be a valid GPIO number of output mode
#define GPIO_IS_VALID_OUTPUT_GPIO(gpio_num) ((gpio_num >= 0) && \
#define GPIO_IS_VALID_OUTPUT_GPIO(gpio_num) ((gpio_num >= 0) && (gpio_num <= SOC_GPIO_OUT_RANGE_MAX) && \
(((1ULL << (gpio_num)) & SOC_GPIO_VALID_OUTPUT_GPIO_MASK) != 0))
/// Check whether it can be a valid digital I/O pad
#define GPIO_IS_VALID_DIGITAL_IO_PAD(gpio_num) ((gpio_num >= 0) && \
@@ -246,12 +246,10 @@ static int twai_init_handler(int argc, char **argv)
/* Configure optional clock output GPIO */
if (twai_init_args.clk_out_gpio->count > 0) {
clk_gpio = twai_init_args.clk_out_gpio->ival[0];
if (clk_gpio >= 0) {
ret = GPIO_IS_VALID_OUTPUT_GPIO(clk_gpio) ? ESP_OK : ESP_ERR_INVALID_ARG;
ESP_GOTO_ON_ERROR(ret, err, TAG, "Invalid CLK out GPIO: %d", clk_gpio);
ctx->driver_config.io_cfg.quanta_clk_out = clk_gpio;
ESP_LOGI(TAG, "Clock output GPIO set to %d", clk_gpio);
}
ret = GPIO_IS_VALID_OUTPUT_GPIO(clk_gpio) ? ESP_OK : ESP_ERR_INVALID_ARG;
ESP_GOTO_ON_ERROR(ret, err, TAG, "Invalid CLK out GPIO: %d", clk_gpio);
ctx->driver_config.io_cfg.quanta_clk_out = clk_gpio;
ESP_LOGI(TAG, "Clock output GPIO set to %d", clk_gpio);
} else {
ctx->driver_config.io_cfg.quanta_clk_out = -1;
ESP_LOGI(TAG, "Clock output disabled");
@@ -605,12 +603,6 @@ void register_twai_core_commands(void)
.data_timing = {
#if CONFIG_EXAMPLE_ENABLE_TWAI_FD
.bitrate = CONFIG_EXAMPLE_DEFAULT_FD_BITRATE,
.sp_permill = 0,
.ssp_permill = 700,
#else
.bitrate = 0,
.sp_permill = 0,
.ssp_permill = 0,
#endif
},
.fail_retry_cnt = -1,
@@ -206,6 +206,7 @@ static void dump_task(void *parameter)
while (atomic_load(&dump_ctx->is_running)) {
rx_queue_item_t item;
if (xQueueReceive(dump_ctx->rx_queue, &item, pdMS_TO_TICKS(CONFIG_EXAMPLE_DUMP_TASK_TIMEOUT_MS)) == pdPASS) {
item.frame.buffer = item.buffer; // point to the new buffer
format_twaidump_frame(dump_ctx->timestamp_mode, &item.frame, item.timestamp_us,
dump_ctx->start_time_us, &dump_ctx->last_frame_time_us,
@@ -188,24 +188,23 @@ int parse_classic_frame(const char *body, twai_frame_t *f)
/* Handle data frame */
f->header.rtr = false; // Ensure RTR flag is cleared.
if (((strlen(body) + 1) / 2) > TWAI_FRAME_MAX_LEN) {
return PARSE_OUT_OF_RANGE;
}
int dl = parse_payload(body, f->buffer, TWAI_FRAME_MAX_LEN);
if (dl < 0) {
return dl;
}
f->header.dlc = (uint8_t)dl;
f->buffer_len = dl;
/* Check for optional _dlc suffix */
const char *underscore = strchr(body, '_');
if (underscore && underscore[1] != '\0') {
uint8_t dlc = (uint8_t)strtoul(underscore + 1, NULL, 16);
if (dlc <= TWAI_FRAME_MAX_LEN) {
f->header.dlc = dlc;
} else {
f->header.dlc = TWAI_FRAME_MAX_LEN;
}
} else {
f->header.dlc = (uint8_t)dl;
uint8_t arg_dlc = (uint8_t)strtoul(underscore + 1, NULL, 16);
f->header.dlc = (TWAI_FRAME_MAX_LEN < arg_dlc) ? TWAI_FRAME_MAX_LEN : arg_dlc;
}
f->buffer_len = dl;
return PARSE_OK;
}
@@ -320,11 +319,10 @@ void format_twaidump_frame(timestamp_mode_t timestamp_mode, const twai_frame_t *
pos += snprintf(output_line + pos, max_len - pos, "[R%d]", frame->header.dlc);
} else {
/* Data frame: add DLC and data bytes with spaces */
printf("frame->header.dlc: %d\n", frame->header.dlc);
int actual_len = twaifd_dlc2len(frame->header.dlc);
pos += snprintf(output_line + pos, max_len - pos, "[%d]", actual_len);
for (int i = 0; i < actual_len && i < frame->buffer_len && pos < max_len - 4; i++) {
pos += snprintf(output_line + pos, max_len - pos, " %02X", frame->buffer[i]);
pos += snprintf(output_line + pos, max_len - pos, " %02X", frame->buffer[i]);
}
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1 @@
# DON'T remove, hold this file to let sdkconfig.defaults.{name} take effect