From f58b23ed7438a76d7525f90f778043226b073376 Mon Sep 17 00:00:00 2001 From: yiwenxiu Date: Wed, 22 Oct 2025 19:04:48 +0800 Subject: [PATCH] feat(openthread): optimize fail cases in CI test --- examples/openthread/ot_ci_function.py | 101 +++++++++++++++++--------- examples/openthread/pytest_otbr.py | 99 +++++++++++++------------ 2 files changed, 119 insertions(+), 81 deletions(-) diff --git a/examples/openthread/ot_ci_function.py b/examples/openthread/ot_ci_function.py index 9c594894c3..3a5339d341 100644 --- a/examples/openthread/ot_ci_function.py +++ b/examples/openthread/ot_ci_function.py @@ -19,19 +19,31 @@ from pytest_embedded_idf.dut import IdfDut def extract_address( - command: str, pattern: str, default_return: str = '' + command: str, + pattern: str, + default_return: str = '', + retries: int = 3, + delay: int = 2, ) -> Callable[[Callable[[str], str]], Callable[[IdfDut], str]]: def decorator(func: Callable[[str], str]) -> Callable[[IdfDut], str]: @wraps(func) def wrapper(dut: IdfDut) -> str: - clean_buffer(dut) - execute_command(dut, command) - try: - result = dut.expect(pattern, timeout=5)[1].decode() - except Exception as e: - logging.error(f'Error: {e}') - return default_return - return func(result) + last_exception: Exception | None = None + for attempt in range(1, retries + 1): + try: + clean_buffer(dut) + execute_command(dut, command) + result = dut.expect(pattern, timeout=5)[1].decode() + return func(result) + except Exception as e: + logging.exception(f'[{command}] Attempt {attempt}/{retries} failed: {e}') + last_exception = e + if attempt < retries: + time.sleep(delay) + + if last_exception: + logging.exception(f'[{command}] Giving up after {retries} retries.') + return default_return return wrapper @@ -173,6 +185,12 @@ def init_thread(dut: IdfDut) -> None: reset_thread(dut) +def stop_thread(dut: IdfDut) -> None: + execute_command(dut, 'thread stop') + dut.expect('disabled', timeout=20) + reset_thread(dut) + + def reset_thread(dut: IdfDut) -> None: execute_command(dut, 'factoryreset') dut.expect('OpenThread attached to netif', timeout=20) @@ -180,28 +198,28 @@ def reset_thread(dut: IdfDut) -> None: clean_buffer(dut) +def hardreset_dut(dut: IdfDut) -> None: + dut.serial.hard_reset() + time.sleep(5) + execute_command(dut, 'factoryreset') + + # get the mleid address of the thread -def get_mleid_addr(dut: IdfDut) -> str: - dut_adress = '' - execute_command(dut, 'ipaddr mleid') - dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() - return str(dut_adress) +@extract_address('ipaddr mleid', r'\n((?:\w+:){7}\w+)\r') +def get_mleid_addr(addr: str) -> str: + return addr # get the rloc address of the thread -def get_rloc_addr(dut: IdfDut) -> str: - dut_adress = '' - execute_command(dut, 'ipaddr rloc') - dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() - return str(dut_adress) +@extract_address('ipaddr rloc', r'\n((?:\w+:){7}\w+)\r') +def get_rloc_addr(addr: str) -> str: + return addr # get the linklocal address of the thread -def get_linklocal_addr(dut: IdfDut) -> str: - dut_adress = '' - execute_command(dut, 'ipaddr linklocal') - dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() - return str(dut_adress) +@extract_address('ipaddr linklocal', r'\n((?:\w+:){7}\w+)\r') +def get_linklocal_addr(addr: str) -> str: + return addr # get the global unicast address of the thread: @@ -619,22 +637,19 @@ def decimal_to_hex(decimal_str: str) -> str: return hex_str -def get_omrprefix(br: IdfDut) -> str: - execute_command(br, 'br omrprefix') - omrprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode() - return str(omrprefix) +@extract_address('br omrprefix', r'Local: ((?:\w+:){4}):/\d+\r') +def get_omrprefix(addr: str) -> str: + return addr -def get_onlinkprefix(br: IdfDut) -> str: - execute_command(br, 'br onlinkprefix') - onlinkprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode() - return str(onlinkprefix) +@extract_address('br onlinkprefix', r'Local: ((?:\w+:){4}):/\d+\r') +def get_onlinkprefix(addr: str) -> str: + return addr -def get_nat64prefix(br: IdfDut) -> str: - execute_command(br, 'br nat64prefix') - nat64prefix = br.expect(r'Local: ((?:\w+:){6}):/\d+', timeout=5)[1].decode() - return str(nat64prefix) +@extract_address('br nat64prefix', r'Local: ((?:\w+:){6}):/\d+') +def get_nat64prefix(addr: str) -> str: + return addr def execute_command(dut: IdfDut, command: str, prefix: str = 'ot ') -> None: @@ -647,3 +662,17 @@ def get_ouput_string(dut: IdfDut, command: str, wait_time: int) -> str: tmp = dut.expect(pexpect.TIMEOUT, timeout=wait_time) clean_buffer(dut) return str(tmp) + + +def wait_for_host_network(host: str = '8.8.8.8', retries: int = 6, interval: int = 10) -> None: + for attempt in range(1, retries + 1): + try: + subprocess.run(['ping', '-c', '1', '-W', '2', host], check=True) + logging.info(f'Host network reachable on attempt {attempt}') + return + except subprocess.CalledProcessError: + logging.info(f'Ping attempt {attempt} failed, retrying in {interval} seconds...') + if attempt < retries: + time.sleep(interval) + else: + raise RuntimeError(f'Host network is not reachable after {retries} attempts.') diff --git a/examples/openthread/pytest_otbr.py b/examples/openthread/pytest_otbr.py index 1e06b4b315..8642f47ea6 100644 --- a/examples/openthread/pytest_otbr.py +++ b/examples/openthread/pytest_otbr.py @@ -106,7 +106,7 @@ PORT_MAPPING = {'ESPPORT1': 'esp32h2', 'ESPPORT2': 'esp32s3', 'ESPPORT3': 'esp32 # Case 1: Thread network formation and attaching @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -161,9 +161,9 @@ def test_thread_connect(dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: rx_nums = ocf.ot_ping(br, cli_mleid_addr, count=5)[1] assert rx_nums == 5 finally: - ocf.execute_command(br, 'factoryreset') for cli in cli_list: - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) @@ -189,7 +189,7 @@ def formBasicWiFiThreadNetwork(br: IdfDut, cli: IdfDut) -> None: # Case 2: Bidirectional IPv6 connectivity @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -231,18 +231,19 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut host_global_unicast_addr = re.findall(pattern, out_str) rx_nums = 0 for ip_addr in host_global_unicast_addr: - txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=5) + txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=10) rx_nums = rx_nums + int(txrx_nums[1]) + logging.debug(f'rx_nums: {rx_nums}') assert rx_nums != 0 finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 3: Multicast forwarding from Wi-Fi to Thread network @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -288,14 +289,14 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut, ocf.execute_command(cli, 'udp close') cli.expect('Done', timeout=5) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 4: Multicast forwarding from Thread to Wi-Fi network @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -341,15 +342,15 @@ def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut, while udp_mission.is_alive(): time.sleep(1) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) assert b'hello' in myudp.udp_bytes # Case 5: discover dervice published by Thread device @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -403,14 +404,14 @@ def test_service_discovery_of_Thread_device( logging.info(f'avahi-browse:\n {out_str}') assert 'myTest' in str(out_str) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 6: discover dervice published by Wi-Fi device @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -473,14 +474,14 @@ def test_service_discovery_of_WiFi_device( finally: ocf.host_close_service() sp.terminate() - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 7: ICMP communication via NAT64 @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -511,14 +512,14 @@ def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), count=5)[1] assert rx_nums != 0 finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 8: UDP communication via NAT64 @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -566,15 +567,15 @@ def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> while udp_mission.is_alive(): time.sleep(1) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) assert b'hello' in myudp.udp_bytes # Case 9: TCP communication via NAT64 @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -628,14 +629,15 @@ def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> while tcp_mission.is_alive(): time.sleep(1) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) assert b'hello' in mytcp.tcp_bytes # Case 10: Sleepy device test @pytest.mark.openthread_sleep +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -663,6 +665,7 @@ def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None: leader = dut[0] sleepy_device = dut[1] + ocf.hardreset_dut(sleepy_device) fail_info = re.compile(r'Core\W*?\d\W*?register dump') try: ocf.init_thread(leader) @@ -689,12 +692,13 @@ def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None: assert not bool(fail_info.search(str(output))) finally: ocf.execute_command(leader, 'factoryreset') + ocf.hardreset_dut(sleepy_device) time.sleep(3) # Case 11: Basic startup Test of BR @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -728,13 +732,13 @@ def test_basic_startup(dut: tuple[IdfDut, IdfDut]) -> None: br.expect('Done', timeout=5) assert ocf.wait_for_join(br, 'leader') finally: - ocf.execute_command(br, 'factoryreset') + ocf.stop_thread(br) time.sleep(3) # Case 12: Curl a website via DNS and NAT64 @pytest.mark.openthread_bbr -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -759,23 +763,24 @@ def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> formBasicWiFiThreadNetwork(br, cli) try: + ocf.wait_for_host_network() ocf.execute_command(br, 'bbr') br.expect('server16', timeout=5) ocf.execute_command(cli, 'dns64server 8.8.8.8') cli.expect('Done', timeout=5) command = 'curl http://www.espressif.com' message = ocf.get_ouput_string(cli, command, 10) - assert '' in str(message) + assert 'html' in str(message) assert '301 Moved Permanently' in str(message) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 13: Meshcop discovery of Border Router @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -829,13 +834,13 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, I assert 'vn=OpenThread' in str(output_str) assert 'rv=1' in str(output_str) finally: - ocf.execute_command(br, 'factoryreset') + ocf.stop_thread(br) time.sleep(3) # Case 14: Curl a website over HTTPS via DNS and NAT64 @pytest.mark.openthread_bbr -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -860,21 +865,22 @@ def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut formBasicWiFiThreadNetwork(br, cli) try: + ocf.wait_for_host_network() ocf.execute_command(cli, 'dns64server 8.8.8.8') cli.expect('Done', timeout=5) command = 'curl https://www.example.com/' message = ocf.get_ouput_string(cli, command, 20) - assert '' in str(message) - assert 'This domain is for use in illustrative examples in documents' in str(message) + assert 'html' in str(message) + assert 'This domain is for use in' in str(message) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 15: Thread network formation and attaching with TREL @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -920,15 +926,15 @@ def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None: rx_nums = ocf.ot_ping(trel_s3, trel_mleid_addr, count=10)[1] assert rx_nums > 5 finally: - ocf.execute_command(trel_s3, 'factoryreset') for trel in trel_list: - ocf.execute_command(trel, 'factoryreset') + ocf.stop_thread(trel) + ocf.stop_thread(trel_s3) time.sleep(3) # Case 16: Thread network BR lib check @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -957,6 +963,7 @@ def test_br_lib_check(dut: tuple[IdfDut, IdfDut]) -> None: # Case 17: SSED test @pytest.mark.openthread_sleep +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -985,6 +992,7 @@ def test_ot_ssed_device(dut: tuple[IdfDut, IdfDut]) -> None: leader = dut[0] ssed_device = dut[1] try: + ocf.hardreset_dut(ssed_device) # CI device must have external XTAL to run SSED case, we will check this here first ssed_device.expect('32k XTAL in use', timeout=10) ocf.init_thread(leader) @@ -1024,4 +1032,5 @@ def test_ot_ssed_device(dut: tuple[IdfDut, IdfDut]) -> None: ocf.ping_and_check(dut=leader, target=ssed_address, tx_total=10, timeout=6) finally: ocf.execute_command(leader, 'factoryreset') + ocf.hardreset_dut(ssed_device) time.sleep(3)