From 5d628f7ade75bd8919fec45e7e0de0dd93bab2f5 Mon Sep 17 00:00:00 2001 From: yiwenxiu Date: Wed, 22 Oct 2025 19:04:48 +0800 Subject: [PATCH] feat(openthread): optimize fail cases in CI test --- examples/openthread/ot_ci_function.py | 104 +++++++++++++------- examples/openthread/pytest_otbr.py | 134 ++++++++++++++------------ 2 files changed, 140 insertions(+), 98 deletions(-) diff --git a/examples/openthread/ot_ci_function.py b/examples/openthread/ot_ci_function.py index a661d5ed10..bc32af980c 100644 --- a/examples/openthread/ot_ci_function.py +++ b/examples/openthread/ot_ci_function.py @@ -11,6 +11,7 @@ import subprocess import time from functools import wraps from typing import Callable +from typing import Optional from typing import Tuple import netifaces @@ -20,19 +21,33 @@ from pytest_embedded_idf.dut import IdfDut def extract_address( - command: str, pattern: str, default_return: str = '' + command: str, + pattern: str, + default_return: str = '', + retries: int = 3, + delay: int = 2, ) -> Callable[[Callable[[str], str]], Callable[[IdfDut], str]]: def decorator(func: Callable[[str], str]) -> Callable[[IdfDut], str]: @wraps(func) def wrapper(dut: IdfDut) -> str: - clean_buffer(dut) - execute_command(dut, command) - try: - result = dut.expect(pattern, timeout=5)[1].decode() - except Exception as e: - logging.error(f'Error: {e}') - return default_return - return func(result) + # requires Python3.10 + # last_exception: Exception | None = None + last_exception: Optional[Exception] = None + for attempt in range(1, retries + 1): + try: + clean_buffer(dut) + execute_command(dut, command) + result = dut.expect(pattern, timeout=5)[1].decode() + return func(result) + except Exception as e: + logging.exception(f'[{command}] Attempt {attempt}/{retries} failed: {e}') + last_exception = e + if attempt < retries: + time.sleep(delay) + + if last_exception: + logging.exception(f'[{command}] Giving up after {retries} retries.') + return default_return return wrapper @@ -174,6 +189,12 @@ def init_thread(dut: IdfDut) -> None: reset_thread(dut) +def stop_thread(dut: IdfDut) -> None: + execute_command(dut, 'thread stop') + dut.expect('disabled', timeout=20) + reset_thread(dut) + + def reset_thread(dut: IdfDut) -> None: execute_command(dut, 'factoryreset') dut.expect('OpenThread attached to netif', timeout=20) @@ -181,28 +202,28 @@ def reset_thread(dut: IdfDut) -> None: clean_buffer(dut) +def hardreset_dut(dut: IdfDut) -> None: + dut.serial.hard_reset() + time.sleep(5) + execute_command(dut, 'factoryreset') + + # get the mleid address of the thread -def get_mleid_addr(dut: IdfDut) -> str: - dut_adress = '' - execute_command(dut, 'ipaddr mleid') - dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() - return dut_adress +@extract_address('ipaddr mleid', r'\n((?:\w+:){7}\w+)\r') +def get_mleid_addr(addr: str) -> str: + return addr # get the rloc address of the thread -def get_rloc_addr(dut: IdfDut) -> str: - dut_adress = '' - execute_command(dut, 'ipaddr rloc') - dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() - return dut_adress +@extract_address('ipaddr rloc', r'\n((?:\w+:){7}\w+)\r') +def get_rloc_addr(addr: str) -> str: + return addr # get the linklocal address of the thread -def get_linklocal_addr(dut: IdfDut) -> str: - dut_adress = '' - execute_command(dut, 'ipaddr linklocal') - dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() - return dut_adress +@extract_address('ipaddr linklocal', r'\n((?:\w+:){7}\w+)\r') +def get_linklocal_addr(addr: str) -> str: + return addr # get the global unicast address of the thread: @@ -620,22 +641,19 @@ def decimal_to_hex(decimal_str: str) -> str: return hex_str -def get_omrprefix(br: IdfDut) -> str: - execute_command(br, 'br omrprefix') - omrprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode() - return str(omrprefix) +@extract_address('br omrprefix', r'Local: ((?:\w+:){4}):/\d+\r') +def get_omrprefix(addr: str) -> str: + return addr -def get_onlinkprefix(br: IdfDut) -> str: - execute_command(br, 'br onlinkprefix') - onlinkprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode() - return str(onlinkprefix) +@extract_address('br onlinkprefix', r'Local: ((?:\w+:){4}):/\d+\r') +def get_onlinkprefix(addr: str) -> str: + return addr -def get_nat64prefix(br: IdfDut) -> str: - execute_command(br, 'br nat64prefix') - nat64prefix = br.expect(r'Local: ((?:\w+:){6}):/\d+', timeout=5)[1].decode() - return str(nat64prefix) +@extract_address('br nat64prefix', r'Local: ((?:\w+:){6}):/\d+') +def get_nat64prefix(addr: str) -> str: + return addr def execute_command(dut: IdfDut, command: str, prefix: str = 'ot ') -> None: @@ -648,3 +666,17 @@ def get_ouput_string(dut: IdfDut, command: str, wait_time: int) -> str: tmp = dut.expect(pexpect.TIMEOUT, timeout=wait_time) clean_buffer(dut) return str(tmp) + + +def wait_for_host_network(host: str = '8.8.8.8', retries: int = 6, interval: int = 10) -> None: + for attempt in range(1, retries + 1): + try: + subprocess.run(['ping', '-c', '1', '-W', '2', host], check=True) + logging.info(f'Host network reachable on attempt {attempt}') + return + except subprocess.CalledProcessError: + logging.info(f'Ping attempt {attempt} failed, retrying in {interval} seconds...') + if attempt < retries: + time.sleep(interval) + else: + raise RuntimeError(f'Host network is not reachable after {retries} attempts.') diff --git a/examples/openthread/pytest_otbr.py b/examples/openthread/pytest_otbr.py index 7c3374fb13..670f6bd8ee 100644 --- a/examples/openthread/pytest_otbr.py +++ b/examples/openthread/pytest_otbr.py @@ -10,6 +10,7 @@ import secrets import subprocess import threading import time +from typing import Tuple import ot_ci_function as ocf import pexpect @@ -109,7 +110,7 @@ PORT_MAPPING = { # Case 1: Thread network formation and attaching @pytest.mark.supported_targets @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -136,7 +137,7 @@ PORT_MAPPING = { ], indirect=True, ) -def test_thread_connect(dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_thread_connect(dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli_h2 = dut[1] dut[0].serial.stop_redirect_thread() @@ -164,9 +165,9 @@ def test_thread_connect(dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: rx_nums = ocf.ot_ping(br, cli_mleid_addr, count=5)[1] assert rx_nums == 5 finally: - ocf.execute_command(br, 'factoryreset') for cli in cli_list: - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) @@ -193,7 +194,7 @@ def formBasicWiFiThreadNetwork(br:IdfDut, cli:IdfDut) -> None: # Case 2: Bidirectional IPv6 connectivity @pytest.mark.supported_targets @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -210,7 +211,7 @@ def formBasicWiFiThreadNetwork(br:IdfDut, cli:IdfDut) -> None: ], indirect=True, ) -def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -235,19 +236,20 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut host_global_unicast_addr = re.findall(pattern, out_str) rx_nums = 0 for ip_addr in host_global_unicast_addr: - txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=5) + txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=10) rx_nums = rx_nums + int(txrx_nums[1]) + logging.debug(f'rx_nums: {rx_nums}') assert rx_nums != 0 finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 3: Multicast forwarding from Wi-Fi to Thread network @pytest.mark.supported_targets @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -264,7 +266,7 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut ], indirect=True, ) -def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_multicast_forwarding_A(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -293,15 +295,15 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut, ocf.execute_command(cli, 'udp close') cli.expect('Done', timeout=5) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 4: Multicast forwarding from Thread to Wi-Fi network @pytest.mark.supported_targets @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -318,7 +320,7 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut, ], indirect=True, ) -def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_multicast_forwarding_B(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -347,8 +349,8 @@ def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut, while udp_mission.is_alive(): time.sleep(1) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) assert b'hello' in myudp.udp_bytes @@ -356,7 +358,7 @@ def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut, # Case 5: discover dervice published by Thread device @pytest.mark.supported_targets @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -374,7 +376,7 @@ def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut, indirect=True, ) def test_service_discovery_of_Thread_device( - Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut, IdfDut] + Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut, IdfDut] ) -> None: br = dut[2] cli = dut[1] @@ -410,15 +412,15 @@ def test_service_discovery_of_Thread_device( logging.info(f'avahi-browse:\n {out_str}') assert 'myTest' in str(out_str) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 6: discover dervice published by Wi-Fi device @pytest.mark.supported_targets @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -436,7 +438,7 @@ def test_service_discovery_of_Thread_device( indirect=True, ) def test_service_discovery_of_WiFi_device( - Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut, IdfDut] + Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut, IdfDut] ) -> None: br = dut[2] cli = dut[1] @@ -481,15 +483,15 @@ def test_service_discovery_of_WiFi_device( finally: ocf.host_close_service() sp.terminate() - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 7: ICMP communication via NAT64 @pytest.mark.supported_targets @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -506,7 +508,7 @@ def test_service_discovery_of_WiFi_device( ], indirect=True, ) -def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_ICMP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -520,15 +522,15 @@ def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), count=5)[1] assert rx_nums != 0 finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 8: UDP communication via NAT64 @pytest.mark.supported_targets @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -545,7 +547,7 @@ def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> ], indirect=True, ) -def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_UDP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -576,8 +578,8 @@ def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> while udp_mission.is_alive(): time.sleep(1) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) assert b'hello' in myudp.udp_bytes @@ -585,7 +587,7 @@ def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> # Case 9: TCP communication via NAT64 @pytest.mark.supported_targets @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -602,7 +604,7 @@ def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> ], indirect=True, ) -def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_TCP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -639,8 +641,8 @@ def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> while tcp_mission.is_alive(): time.sleep(1) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) assert b'hello' in mytcp.tcp_bytes @@ -649,6 +651,7 @@ def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> @pytest.mark.esp32h2 @pytest.mark.esp32c6 @pytest.mark.openthread_sleep +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -673,9 +676,10 @@ def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> ], indirect=True, ) -def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None: +def test_ot_sleepy_device(dut: Tuple[IdfDut, IdfDut]) -> None: leader = dut[0] sleepy_device = dut[1] + ocf.hardreset_dut(sleepy_device) fail_info = re.compile(r'Core\W*?\d\W*?register dump') try: ocf.init_thread(leader) @@ -702,13 +706,14 @@ def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None: assert not bool(fail_info.search(str(output))) finally: ocf.execute_command(leader, 'factoryreset') + ocf.hardreset_dut(sleepy_device) time.sleep(3) # Case 11: Basic startup Test of BR @pytest.mark.supported_targets @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -724,7 +729,7 @@ def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_basic_startup(dut: tuple[IdfDut, IdfDut]) -> None: +def test_basic_startup(dut: Tuple[IdfDut, IdfDut]) -> None: br = dut[1] dut[0].serial.stop_redirect_thread() try: @@ -743,14 +748,14 @@ def test_basic_startup(dut: tuple[IdfDut, IdfDut]) -> None: br.expect('Done', timeout=5) assert ocf.wait_for_join(br, 'leader') finally: - ocf.execute_command(br, 'factoryreset') + ocf.stop_thread(br) time.sleep(3) # Case 12: Curl a website via DNS and NAT64 @pytest.mark.supported_targets @pytest.mark.openthread_bbr -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -767,7 +772,7 @@ def test_basic_startup(dut: tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -775,24 +780,25 @@ def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> formBasicWiFiThreadNetwork(br, cli) try: + ocf.wait_for_host_network() ocf.execute_command(br, 'bbr') br.expect('server16', timeout=5) ocf.execute_command(cli, 'dns64server 8.8.8.8') cli.expect('Done', timeout=5) command = 'curl http://www.espressif.com' message = ocf.get_ouput_string(cli, command, 10) - assert '' in str(message) + assert 'html' in str(message) assert '301 Moved Permanently' in str(message) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 13: Meshcop discovery of Border Router @pytest.mark.supported_targets @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -808,7 +814,7 @@ def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> ], indirect=True, ) -def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut]) -> None: +def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut]) -> None: br = dut[1] assert Init_interface assert Init_avahi @@ -847,14 +853,14 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, I assert 'vn=OpenThread' in str(output_str) assert 'rv=1' in str(output_str) finally: - ocf.execute_command(br, 'factoryreset') + ocf.stop_thread(br) time.sleep(3) # Case 14: Curl a website over HTTPS via DNS and NAT64 @pytest.mark.supported_targets @pytest.mark.openthread_bbr -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -871,7 +877,7 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, I ], indirect=True, ) -def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_https_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -879,22 +885,23 @@ def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut formBasicWiFiThreadNetwork(br, cli) try: + ocf.wait_for_host_network() ocf.execute_command(cli, 'dns64server 8.8.8.8') cli.expect('Done', timeout=5) command = 'curl https://www.example.com/' message = ocf.get_ouput_string(cli, command, 20) - assert '' in str(message) - assert 'This domain is for use in illustrative examples in documents' in str(message) + assert 'html' in str(message) + assert 'This domain is for use in' in str(message) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 15: Thread network formation and attaching with TREL @pytest.mark.supported_targets @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -910,7 +917,7 @@ def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut ], indirect=True, ) -def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None: +def test_trel_connect(dut: Tuple[IdfDut, IdfDut]) -> None: trel_s3 = dut[1] trel_c6 = dut[0] trel_list = [trel_c6] @@ -940,16 +947,16 @@ def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None: rx_nums = ocf.ot_ping(trel_s3, trel_mleid_addr, count=10)[1] assert rx_nums > 5 finally: - ocf.execute_command(trel_s3, 'factoryreset') for trel in trel_list: - ocf.execute_command(trel, 'factoryreset') + ocf.stop_thread(trel) + ocf.stop_thread(trel_s3) time.sleep(3) # Case 16: Thread network BR lib check @pytest.mark.supported_targets @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -965,7 +972,7 @@ def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_br_lib_check(dut: tuple[IdfDut, IdfDut]) -> None: +def test_br_lib_check(dut: Tuple[IdfDut, IdfDut]) -> None: br = dut[1] dut[0].serial.stop_redirect_thread() try: @@ -979,6 +986,7 @@ def test_br_lib_check(dut: tuple[IdfDut, IdfDut]) -> None: # Case 17: SSED test @pytest.mark.openthread_sleep +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -1003,10 +1011,11 @@ def test_br_lib_check(dut: tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_ot_ssed_device(dut: tuple[IdfDut, IdfDut]) -> None: +def test_ot_ssed_device(dut: Tuple[IdfDut, IdfDut]) -> None: leader = dut[0] ssed_device = dut[1] try: + ocf.hardreset_dut(ssed_device) # CI device must have external XTAL to run SSED case, we will check this here first ssed_device.expect('32k XTAL in use', timeout=10) ocf.init_thread(leader) @@ -1046,4 +1055,5 @@ def test_ot_ssed_device(dut: tuple[IdfDut, IdfDut]) -> None: ocf.ping_and_check(dut=leader, target=ssed_address, tx_total=10, timeout=6) finally: ocf.execute_command(leader, 'factoryreset') + ocf.hardreset_dut(ssed_device) time.sleep(3)