From c23218fead1fc4a20e707e96bd104f7df416253a Mon Sep 17 00:00:00 2001 From: Xu Si Yu Date: Tue, 14 Oct 2025 17:34:04 +0800 Subject: [PATCH 1/2] feat(openthread): output logs of host for debugging CI issues --- examples/openthread/ot_ci_function.py | 67 ++++++++++++++------------- examples/openthread/pytest_otbr.py | 67 ++++++++++++++------------- 2 files changed, 69 insertions(+), 65 deletions(-) diff --git a/examples/openthread/ot_ci_function.py b/examples/openthread/ot_ci_function.py index 2b267e62ec..9c594894c3 100644 --- a/examples/openthread/ot_ci_function.py +++ b/examples/openthread/ot_ci_function.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Unlicense OR CC0-1.0 # !/usr/bin/env python3 # this file defines some functions for testing cli and br under pytest framework +import logging import os import re import socket @@ -28,7 +29,7 @@ def extract_address( try: result = dut.expect(pattern, timeout=5)[1].decode() except Exception as e: - print(f'Error: {e}') + logging.error(f'Error: {e}') return default_return return func(result) @@ -151,7 +152,7 @@ def getDeviceRole(dut: IdfDut) -> str: wait(dut, 1) execute_command(dut, 'state') role = dut.expect(r'\W+(\w+)\W+Done', timeout=5)[1].decode() - print(role) + logging.info(role) return str(role) @@ -310,7 +311,7 @@ def get_host_interface_name() -> str: interface_name = config.get('interface_name') if interface_name: if interface_name == 'eth0': - print( + logging.warning( f"Warning: 'eth0' is not recommended as a valid network interface. " f"Please check and update the 'interface_name' in the configuration file: " f'{config_path}' @@ -318,9 +319,9 @@ def get_host_interface_name() -> str: else: return str(interface_name) else: - print("Warning: Configuration file found but 'interface_name' is not defined.") + logging.warning("Warning: Configuration file found but 'interface_name' is not defined.") except Exception as e: - print(f'Error: Failed to read or parse {config_path}. Details: {e}') + logging.error(f'Error: Failed to read or parse {config_path}. Details: {e}') if 'eth1' in netifaces.interfaces(): return 'eth1' @@ -338,8 +339,8 @@ def check_if_host_receive_ra(br: IdfDut) -> bool: omrprefix = get_omrprefix(br) command = 'ip -6 route | grep ' + str(interface_name) out_str = subprocess.getoutput(command) - print('br omrprefix: ', str(omrprefix)) - print('host route table:\n', str(out_str)) + logging.info(f'br omrprefix: {omrprefix}') + logging.info(f'host route table:\n {out_str}') return str(omrprefix) in str(out_str) @@ -404,7 +405,7 @@ def create_host_udp_server(myudp: udp_parameter) -> None: AF_INET = socket.AF_INET6 else: AF_INET = socket.AF_INET - print('The host start to create udp server!') + logging.info('The host start to create udp server!') if_index = socket.if_nametoindex(interface_name) sock = socket.socket(AF_INET, socket.SOCK_DGRAM) sock.bind((myudp.addr, myudp.port)) @@ -417,13 +418,14 @@ def create_host_udp_server(myudp: udp_parameter) -> None: ) sock.settimeout(myudp.timeout) myudp.init_flag = True - print('The host start to receive message!') + logging.info('The host start to receive message!') myudp.udp_bytes = (sock.recvfrom(1024))[0] - print('The host has received message: ', myudp.udp_bytes) + udp_str = str(myudp.udp_bytes) + logging.info(f'The host has received message: {udp_str}') except OSError: - print('The host did not receive message!') + logging.error('The host did not receive message!') finally: - print('Close the socket.') + logging.info('Close the socket.') sock.close() @@ -438,10 +440,10 @@ def host_udp_send_message(udp_target: udp_parameter) -> None: sock.bind(('::', 12350)) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, interface_name.encode()) sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 32) - print('Host is sending message') + logging.info('Host is sending message') sock.sendto(udp_target.udp_bytes, (udp_target.addr, udp_target.port)) except OSError: - print('Host cannot send message') + logging.error('Host cannot send message') finally: sock.close() @@ -481,13 +483,13 @@ def host_close_service() -> None: command = 'ps auxww | grep avahi-publish-s' out_bytes = subprocess.check_output(command, shell=True, timeout=5) out_str = out_bytes.decode('utf-8') - print('host close service avahi status:\n', out_str) + logging.info(f'host close service avahi status:\n {out_str}') service_info = [line for line in out_str.splitlines() if 'testxxx _testxxx._udp' in line] for line in service_info: - print('Process:', line) + logging.info(f'Process:{line}') pid = line.split()[1] command = 'kill -9 ' + pid - print('kill ', pid) + logging.info(f'kill {pid}') subprocess.call(command, shell=True, timeout=5) time.sleep(1) @@ -520,24 +522,24 @@ def open_host_interface() -> None: def get_domain() -> str: hostname = socket.gethostname() - print('hostname is: ', hostname) + logging.info(f'hostname is: {hostname}') command = 'ps -auxww | grep avahi-daemon | grep running' out_str = subprocess.getoutput(command) - print('avahi status:\n', out_str) + logging.info(f'avahi status:\n {out_str}') role = re.findall(r'\[([\w\W]+)\.local\]', str(out_str))[0] - print('active host is: ', role) + logging.info(f'active host is: {role}') return str(role) def flush_ipv6_addr_by_interface() -> None: interface_name = get_host_interface_name() - print(f'flush ipv6 addr : {interface_name}') + logging.info(f'flush ipv6 addr : {interface_name}') command_show_addr = f'ip -6 addr show dev {interface_name}' command_show_route = f'ip -6 route show dev {interface_name}' addr_before = subprocess.getoutput(command_show_addr) route_before = subprocess.getoutput(command_show_route) - print(f'Before flush, IPv6 addresses: \n{addr_before}') - print(f'Before flush, IPv6 routes: \n{route_before}') + logging.info(f'Before flush, IPv6 addresses: \n{addr_before}') + logging.info(f'Before flush, IPv6 routes: \n{route_before}') subprocess.run(['ip', 'link', 'set', interface_name, 'down']) subprocess.run(['ip', '-6', 'addr', 'flush', 'dev', interface_name]) subprocess.run(['ip', '-6', 'route', 'flush', 'dev', interface_name]) @@ -545,8 +547,8 @@ def flush_ipv6_addr_by_interface() -> None: time.sleep(5) addr_after = subprocess.getoutput(command_show_addr) route_after = subprocess.getoutput(command_show_route) - print(f'After flush, IPv6 addresses: \n{addr_after}') - print(f'After flush, IPv6 routes: \n{route_after}') + logging.info(f'After flush, IPv6 addresses: \n{addr_after}') + logging.info(f'After flush, IPv6 routes: \n{route_after}') class tcp_parameter: @@ -575,28 +577,29 @@ def create_host_tcp_server(mytcp: tcp_parameter) -> None: AF_INET = socket.AF_INET6 else: AF_INET = socket.AF_INET - print('The host start to create a tcp server!') + logging.info('The host start to create a tcp server!') sock = socket.socket(AF_INET, socket.SOCK_STREAM) sock.bind((mytcp.addr, mytcp.port)) sock.listen(5) mytcp.listen_flag = True - print('The tcp server is waiting for connection!') + logging.info('The tcp server is waiting for connection!') sock.settimeout(mytcp.timeout) connfd, addr = sock.accept() - print('The tcp server connected with ', addr) + logging.info(f'The tcp server connected with {addr}') mytcp.recv_flag = True mytcp.tcp_bytes = connfd.recv(1024) - print('The tcp server has received message: ', mytcp.tcp_bytes) + tcp_str = str(mytcp.tcp_bytes) + logging.info(f'The tcp server has received message: {tcp_str}') except OSError: if mytcp.recv_flag: - print('The tcp server did not receive message!') + logging.error('The tcp server did not receive message!') else: - print('The tcp server fail to connect!') + logging.error('The tcp server fail to connect!') finally: - print('Close the socket.') + logging.info('Close the socket.') sock.close() diff --git a/examples/openthread/pytest_otbr.py b/examples/openthread/pytest_otbr.py index e3a968eec7..46d081e9f3 100644 --- a/examples/openthread/pytest_otbr.py +++ b/examples/openthread/pytest_otbr.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Unlicense OR CC0-1.0 # !/usr/bin/env python3 import copy +import logging import os.path import random import re @@ -9,7 +10,6 @@ import secrets import subprocess import threading import time -from typing import Tuple import ot_ci_function as ocf import pexpect @@ -76,7 +76,7 @@ from pytest_embedded_idf.dut import IdfDut @pytest.fixture(scope='module', name='Init_avahi') def fixture_Init_avahi() -> bool: - print('Init Avahi') + logging.info('Init Avahi') ocf.start_avahi() time.sleep(10) return True @@ -84,7 +84,7 @@ def fixture_Init_avahi() -> bool: @pytest.fixture(name='Init_interface') def fixture_Init_interface() -> bool: - print('Init interface') + logging.info('Init interface') ocf.flush_ipv6_addr_by_interface() # The sleep time is set based on experience; reducing it might cause the host to be unready. time.sleep(30) @@ -131,7 +131,7 @@ PORT_MAPPING = {'ESPPORT1': 'esp32h2', 'ESPPORT2': 'esp32s3', 'ESPPORT3': 'esp32 ], indirect=True, ) -def test_thread_connect(dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_thread_connect(dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli_h2 = dut[1] dut[0].serial.stop_redirect_thread() @@ -204,7 +204,7 @@ def formBasicWiFiThreadNetwork(br: IdfDut, cli: IdfDut) -> None: ], indirect=True, ) -def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -214,10 +214,10 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: Tuple[IdfDut try: assert ocf.is_joined_wifi_network(br) cli_global_unicast_addr = ocf.get_global_unicast_addr(cli, br) - print('cli_global_unicast_addr', cli_global_unicast_addr) + logging.info(f'cli_global_unicast_addr {cli_global_unicast_addr}') command = 'ping ' + str(cli_global_unicast_addr) + ' -c 10' out_str = subprocess.getoutput(command) - print('ping result:\n', str(out_str)) + logging.info(f'ping result:\n{out_str}') role = re.findall(r' (\d+)%', str(out_str))[0] assert role != '100' interface_name = ocf.get_host_interface_name() @@ -225,7 +225,8 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: Tuple[IdfDut out_bytes = subprocess.check_output(command, shell=True, timeout=5) out_str = out_bytes.decode('utf-8') onlinkprefix = ocf.get_onlinkprefix(br) - host_global_unicast_addr = re.findall(r'\W+(%s(?:\w+:){3}\w+)\W+' % onlinkprefix, str(out_str)) + pattern = rf'\W+({onlinkprefix}(?:\w+:){{3}}\w+)\W+' + host_global_unicast_addr = re.findall(pattern, out_str) rx_nums = 0 for ip_addr in host_global_unicast_addr: txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=5) @@ -256,7 +257,7 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: Tuple[IdfDut ], indirect=True, ) -def test_multicast_forwarding_A(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -271,7 +272,7 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, interface_name = ocf.get_host_interface_name() command = 'ping -I ' + str(interface_name) + ' -t 64 ff04::125 -c 10' out_str = subprocess.getoutput(command) - print('ping result:\n', str(out_str)) + logging.info(f'ping result:\n{out_str}') role = re.findall(r' (\d+)%', str(out_str))[0] assert role != '100' ocf.execute_command(cli, 'udp open') @@ -309,7 +310,7 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, ], indirect=True, ) -def test_multicast_forwarding_B(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -364,7 +365,7 @@ def test_multicast_forwarding_B(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, indirect=True, ) def test_service_discovery_of_Thread_device( - Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut, IdfDut] + Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut, IdfDut] ) -> None: br = dut[2] cli = dut[1] @@ -377,14 +378,14 @@ def test_service_discovery_of_Thread_device( assert ocf.is_joined_wifi_network(br) command = 'avahi-browse -rt _testyyy._udp' out_str = subprocess.getoutput(command) - print('avahi-browse:\n', str(out_str)) + logging.info(f'avahi-browse:\n{out_str}') assert 'myTest' not in str(out_str) hostname = 'myTest' command = 'srp client host name ' + hostname ocf.execute_command(cli, command) cli.expect('Done', timeout=5) cli_global_unicast_addr = ocf.get_global_unicast_addr(cli, br) - print('cli_global_unicast_addr', cli_global_unicast_addr) + logging.info(f'cli_global_unicast_addr {cli_global_unicast_addr}') command = 'srp client host address ' + str(cli_global_unicast_addr) ocf.execute_command(cli, command) cli.expect('Done', timeout=5) @@ -397,7 +398,7 @@ def test_service_discovery_of_Thread_device( ocf.wait(cli, 3) command = 'avahi-browse -rt _testyyy._udp' out_str = subprocess.getoutput(command) - print('avahi-browse:\n', str(out_str)) + logging.info(f'avahi-browse:\n {out_str}') assert 'myTest' in str(out_str) finally: ocf.execute_command(br, 'factoryreset') @@ -425,7 +426,7 @@ def test_service_discovery_of_Thread_device( indirect=True, ) def test_service_discovery_of_WiFi_device( - Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut, IdfDut] + Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut, IdfDut] ) -> None: br = dut[2] cli = dut[1] @@ -442,7 +443,7 @@ def test_service_discovery_of_WiFi_device( cli.expect('Done', timeout=5) ocf.wait(cli, 1) domain_name = ocf.get_domain() - print('domain name is: ', domain_name) + logging.info(f'domain name is: {domain_name}') command = 'dns resolve ' + domain_name + '.default.service.arpa.' ocf.execute_command(cli, command) @@ -494,7 +495,7 @@ def test_service_discovery_of_WiFi_device( ], indirect=True, ) -def test_ICMP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -504,7 +505,7 @@ def test_ICMP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> try: assert ocf.is_joined_wifi_network(br) host_ipv4_address = ocf.get_host_ipv4_address() - print('host_ipv4_address: ', host_ipv4_address) + logging.info(f'host_ipv4_address: {host_ipv4_address}') rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), count=5)[1] assert rx_nums != 0 finally: @@ -532,7 +533,7 @@ def test_ICMP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> ], indirect=True, ) -def test_UDP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -547,7 +548,7 @@ def test_UDP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> cli.expect('Done', timeout=5) ocf.wait(cli, 3) host_ipv4_address = ocf.get_host_ipv4_address() - print('host_ipv4_address: ', host_ipv4_address) + logging.info(f'host_ipv4_address: {host_ipv4_address}') myudp = ocf.udp_parameter('INET4', host_ipv4_address, 5090, '', False, 15.0, b'') udp_mission = threading.Thread(target=ocf.create_host_udp_server, args=(myudp,)) udp_mission.start() @@ -588,7 +589,7 @@ def test_UDP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> ], indirect=True, ) -def test_TCP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -604,7 +605,7 @@ def test_TCP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> ocf.wait(cli, 3) host_ipv4_address = ocf.get_host_ipv4_address() connect_address = ocf.get_ipv6_from_ipv4(host_ipv4_address, br) - print('connect_address is: ', connect_address) + logging.info(f'connect_address is: {connect_address}') mytcp = ocf.tcp_parameter('INET4', host_ipv4_address, 12345, False, False, 15.0, b'') tcp_mission = threading.Thread(target=ocf.create_host_tcp_server, args=(mytcp,)) tcp_mission.start() @@ -657,7 +658,7 @@ def test_TCP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> ], indirect=True, ) -def test_ot_sleepy_device(dut: Tuple[IdfDut, IdfDut]) -> None: +def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None: leader = dut[0] sleepy_device = dut[1] fail_info = re.compile(r'Core\W*?\d\W*?register dump') @@ -706,7 +707,7 @@ def test_ot_sleepy_device(dut: Tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_basic_startup(dut: Tuple[IdfDut, IdfDut]) -> None: +def test_basic_startup(dut: tuple[IdfDut, IdfDut]) -> None: br = dut[1] dut[0].serial.stop_redirect_thread() try: @@ -748,7 +749,7 @@ def test_basic_startup(dut: Tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -787,7 +788,7 @@ def test_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> ], indirect=True, ) -def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut]) -> None: +def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut]) -> None: br = dut[1] assert Init_interface assert Init_avahi @@ -813,9 +814,9 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, I except subprocess.CalledProcessError as e: output_bytes = e.stdout finally: - print('out_bytes: ', output_bytes) + logging.info(f'out_bytes: {output_bytes!r}') output_str = str(output_bytes) - print('out_str: ', output_str) + logging.info(f'out_str: {output_str}') assert 'hostname = [esp-ot-br.local]' in str(output_str) assert ('address = [' + ipv4_address + ']') in str(output_str) @@ -849,7 +850,7 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, I ], indirect=True, ) -def test_https_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -887,7 +888,7 @@ def test_https_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut ], indirect=True, ) -def test_trel_connect(dut: Tuple[IdfDut, IdfDut]) -> None: +def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None: trel_s3 = dut[1] trel_c6 = dut[0] trel_list = [trel_c6] @@ -940,7 +941,7 @@ def test_trel_connect(dut: Tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_br_lib_check(dut: Tuple[IdfDut, IdfDut]) -> None: +def test_br_lib_check(dut: tuple[IdfDut, IdfDut]) -> None: br = dut[1] dut[0].serial.stop_redirect_thread() try: @@ -978,7 +979,7 @@ def test_br_lib_check(dut: Tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_ot_ssed_device(dut: Tuple[IdfDut, IdfDut]) -> None: +def test_ot_ssed_device(dut: tuple[IdfDut, IdfDut]) -> None: leader = dut[0] ssed_device = dut[1] try: From 20710cc14a6c3204106f2ca770e74f6d0a4fe48e Mon Sep 17 00:00:00 2001 From: yiwenxiu Date: Wed, 22 Oct 2025 19:04:48 +0800 Subject: [PATCH 2/2] feat(openthread): optimize fail cases in CI test --- examples/openthread/ot_ci_function.py | 111 +++++++++++++-------- examples/openthread/pytest_otbr.py | 134 ++++++++++++++------------ 2 files changed, 144 insertions(+), 101 deletions(-) diff --git a/examples/openthread/ot_ci_function.py b/examples/openthread/ot_ci_function.py index 9c594894c3..5a706852e6 100644 --- a/examples/openthread/ot_ci_function.py +++ b/examples/openthread/ot_ci_function.py @@ -9,8 +9,10 @@ import socket import struct import subprocess import time -from collections.abc import Callable from functools import wraps +from typing import Callable +from typing import Optional +from typing import Tuple import netifaces import pexpect @@ -19,19 +21,33 @@ from pytest_embedded_idf.dut import IdfDut def extract_address( - command: str, pattern: str, default_return: str = '' + command: str, + pattern: str, + default_return: str = '', + retries: int = 3, + delay: int = 2, ) -> Callable[[Callable[[str], str]], Callable[[IdfDut], str]]: def decorator(func: Callable[[str], str]) -> Callable[[IdfDut], str]: @wraps(func) def wrapper(dut: IdfDut) -> str: - clean_buffer(dut) - execute_command(dut, command) - try: - result = dut.expect(pattern, timeout=5)[1].decode() - except Exception as e: - logging.error(f'Error: {e}') - return default_return - return func(result) + # requires Python3.10 + # last_exception: Exception | None = None + last_exception: Optional[Exception] = None + for attempt in range(1, retries + 1): + try: + clean_buffer(dut) + execute_command(dut, command) + result = dut.expect(pattern, timeout=5)[1].decode() + return func(result) + except Exception as e: + logging.exception(f'[{command}] Attempt {attempt}/{retries} failed: {e}') + last_exception = e + if attempt < retries: + time.sleep(delay) + + if last_exception: + logging.exception(f'[{command}] Giving up after {retries} retries.') + return default_return return wrapper @@ -133,7 +149,7 @@ def wait_for_join(dut: IdfDut, role: str) -> bool: return False -def joinWiFiNetwork(dut: IdfDut, wifi: wifi_parameter) -> tuple[str, int]: +def joinWiFiNetwork(dut: IdfDut, wifi: wifi_parameter) -> Tuple[str, int]: clean_buffer(dut) ip_address = '' for order in range(1, wifi.retry_times): @@ -173,6 +189,12 @@ def init_thread(dut: IdfDut) -> None: reset_thread(dut) +def stop_thread(dut: IdfDut) -> None: + execute_command(dut, 'thread stop') + dut.expect('disabled', timeout=20) + reset_thread(dut) + + def reset_thread(dut: IdfDut) -> None: execute_command(dut, 'factoryreset') dut.expect('OpenThread attached to netif', timeout=20) @@ -180,28 +202,28 @@ def reset_thread(dut: IdfDut) -> None: clean_buffer(dut) +def hardreset_dut(dut: IdfDut) -> None: + dut.serial.hard_reset() + time.sleep(5) + execute_command(dut, 'factoryreset') + + # get the mleid address of the thread -def get_mleid_addr(dut: IdfDut) -> str: - dut_adress = '' - execute_command(dut, 'ipaddr mleid') - dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() - return str(dut_adress) +@extract_address('ipaddr mleid', r'\n((?:\w+:){7}\w+)\r') +def get_mleid_addr(addr: str) -> str: + return addr # get the rloc address of the thread -def get_rloc_addr(dut: IdfDut) -> str: - dut_adress = '' - execute_command(dut, 'ipaddr rloc') - dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() - return str(dut_adress) +@extract_address('ipaddr rloc', r'\n((?:\w+:){7}\w+)\r') +def get_rloc_addr(addr: str) -> str: + return addr # get the linklocal address of the thread -def get_linklocal_addr(dut: IdfDut) -> str: - dut_adress = '' - execute_command(dut, 'ipaddr linklocal') - dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode() - return str(dut_adress) +@extract_address('ipaddr linklocal', r'\n((?:\w+:){7}\w+)\r') +def get_linklocal_addr(addr: str) -> str: + return addr # get the global unicast address of the thread: @@ -222,7 +244,7 @@ def get_rloc16_addr(rloc16: str) -> str: # ping of thread def ot_ping( dut: IdfDut, target: str, timeout: int = 5, count: int = 1, size: int = 56, interval: int = 1, hoplimit: int = 64 -) -> tuple[int, int]: +) -> Tuple[int, int]: command = f'ping {str(target)} {size} {count} {interval} {hoplimit} {str(timeout)}' execute_command(dut, command) transmitted = dut.expect(r'(\d+) packets transmitted', timeout=60)[1].decode() @@ -619,22 +641,19 @@ def decimal_to_hex(decimal_str: str) -> str: return hex_str -def get_omrprefix(br: IdfDut) -> str: - execute_command(br, 'br omrprefix') - omrprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode() - return str(omrprefix) +@extract_address('br omrprefix', r'Local: ((?:\w+:){4}):/\d+\r') +def get_omrprefix(addr: str) -> str: + return addr -def get_onlinkprefix(br: IdfDut) -> str: - execute_command(br, 'br onlinkprefix') - onlinkprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode() - return str(onlinkprefix) +@extract_address('br onlinkprefix', r'Local: ((?:\w+:){4}):/\d+\r') +def get_onlinkprefix(addr: str) -> str: + return addr -def get_nat64prefix(br: IdfDut) -> str: - execute_command(br, 'br nat64prefix') - nat64prefix = br.expect(r'Local: ((?:\w+:){6}):/\d+', timeout=5)[1].decode() - return str(nat64prefix) +@extract_address('br nat64prefix', r'Local: ((?:\w+:){6}):/\d+') +def get_nat64prefix(addr: str) -> str: + return addr def execute_command(dut: IdfDut, command: str, prefix: str = 'ot ') -> None: @@ -647,3 +666,17 @@ def get_ouput_string(dut: IdfDut, command: str, wait_time: int) -> str: tmp = dut.expect(pexpect.TIMEOUT, timeout=wait_time) clean_buffer(dut) return str(tmp) + + +def wait_for_host_network(host: str = '8.8.8.8', retries: int = 6, interval: int = 10) -> None: + for attempt in range(1, retries + 1): + try: + subprocess.run(['ping', '-c', '1', '-W', '2', host], check=True) + logging.info(f'Host network reachable on attempt {attempt}') + return + except subprocess.CalledProcessError: + logging.info(f'Ping attempt {attempt} failed, retrying in {interval} seconds...') + if attempt < retries: + time.sleep(interval) + else: + raise RuntimeError(f'Host network is not reachable after {retries} attempts.') diff --git a/examples/openthread/pytest_otbr.py b/examples/openthread/pytest_otbr.py index 46d081e9f3..6379f83d5e 100644 --- a/examples/openthread/pytest_otbr.py +++ b/examples/openthread/pytest_otbr.py @@ -10,6 +10,7 @@ import secrets import subprocess import threading import time +from typing import Tuple import ot_ci_function as ocf import pexpect @@ -104,7 +105,7 @@ PORT_MAPPING = {'ESPPORT1': 'esp32h2', 'ESPPORT2': 'esp32s3', 'ESPPORT3': 'esp32 # Case 1: Thread network formation and attaching @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -131,7 +132,7 @@ PORT_MAPPING = {'ESPPORT1': 'esp32h2', 'ESPPORT2': 'esp32s3', 'ESPPORT3': 'esp32 ], indirect=True, ) -def test_thread_connect(dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_thread_connect(dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli_h2 = dut[1] dut[0].serial.stop_redirect_thread() @@ -159,9 +160,9 @@ def test_thread_connect(dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: rx_nums = ocf.ot_ping(br, cli_mleid_addr, count=5)[1] assert rx_nums == 5 finally: - ocf.execute_command(br, 'factoryreset') for cli in cli_list: - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) @@ -187,7 +188,7 @@ def formBasicWiFiThreadNetwork(br: IdfDut, cli: IdfDut) -> None: # Case 2: Bidirectional IPv6 connectivity @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -204,7 +205,7 @@ def formBasicWiFiThreadNetwork(br: IdfDut, cli: IdfDut) -> None: ], indirect=True, ) -def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -229,18 +230,19 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut host_global_unicast_addr = re.findall(pattern, out_str) rx_nums = 0 for ip_addr in host_global_unicast_addr: - txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=5) + txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=10) rx_nums = rx_nums + int(txrx_nums[1]) + logging.debug(f'rx_nums: {rx_nums}') assert rx_nums != 0 finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 3: Multicast forwarding from Wi-Fi to Thread network @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -257,7 +259,7 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut ], indirect=True, ) -def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_multicast_forwarding_A(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -286,14 +288,14 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut, ocf.execute_command(cli, 'udp close') cli.expect('Done', timeout=5) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 4: Multicast forwarding from Thread to Wi-Fi network @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -310,7 +312,7 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut, ], indirect=True, ) -def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_multicast_forwarding_B(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -339,15 +341,15 @@ def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut, while udp_mission.is_alive(): time.sleep(1) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) assert b'hello' in myudp.udp_bytes # Case 5: discover dervice published by Thread device @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -365,7 +367,7 @@ def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut, indirect=True, ) def test_service_discovery_of_Thread_device( - Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut, IdfDut] + Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut, IdfDut] ) -> None: br = dut[2] cli = dut[1] @@ -401,14 +403,14 @@ def test_service_discovery_of_Thread_device( logging.info(f'avahi-browse:\n {out_str}') assert 'myTest' in str(out_str) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 6: discover dervice published by Wi-Fi device @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -426,7 +428,7 @@ def test_service_discovery_of_Thread_device( indirect=True, ) def test_service_discovery_of_WiFi_device( - Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut, IdfDut] + Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut, IdfDut] ) -> None: br = dut[2] cli = dut[1] @@ -471,14 +473,14 @@ def test_service_discovery_of_WiFi_device( finally: ocf.host_close_service() sp.terminate() - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 7: ICMP communication via NAT64 @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -495,7 +497,7 @@ def test_service_discovery_of_WiFi_device( ], indirect=True, ) -def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_ICMP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -509,14 +511,14 @@ def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), count=5)[1] assert rx_nums != 0 finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 8: UDP communication via NAT64 @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -533,7 +535,7 @@ def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> ], indirect=True, ) -def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_UDP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -564,15 +566,15 @@ def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> while udp_mission.is_alive(): time.sleep(1) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) assert b'hello' in myudp.udp_bytes # Case 9: TCP communication via NAT64 @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -589,7 +591,7 @@ def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> ], indirect=True, ) -def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_TCP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -626,14 +628,15 @@ def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> while tcp_mission.is_alive(): time.sleep(1) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) assert b'hello' in mytcp.tcp_bytes # Case 10: Sleepy device test @pytest.mark.openthread_sleep +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -658,9 +661,10 @@ def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> ], indirect=True, ) -def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None: +def test_ot_sleepy_device(dut: Tuple[IdfDut, IdfDut]) -> None: leader = dut[0] sleepy_device = dut[1] + ocf.hardreset_dut(sleepy_device) fail_info = re.compile(r'Core\W*?\d\W*?register dump') try: ocf.init_thread(leader) @@ -687,12 +691,13 @@ def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None: assert not bool(fail_info.search(str(output))) finally: ocf.execute_command(leader, 'factoryreset') + ocf.hardreset_dut(sleepy_device) time.sleep(3) # Case 11: Basic startup Test of BR @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -707,7 +712,7 @@ def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_basic_startup(dut: tuple[IdfDut, IdfDut]) -> None: +def test_basic_startup(dut: Tuple[IdfDut, IdfDut]) -> None: br = dut[1] dut[0].serial.stop_redirect_thread() try: @@ -726,13 +731,13 @@ def test_basic_startup(dut: tuple[IdfDut, IdfDut]) -> None: br.expect('Done', timeout=5) assert ocf.wait_for_join(br, 'leader') finally: - ocf.execute_command(br, 'factoryreset') + ocf.stop_thread(br) time.sleep(3) # Case 12: Curl a website via DNS and NAT64 @pytest.mark.openthread_bbr -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -749,7 +754,7 @@ def test_basic_startup(dut: tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -757,23 +762,24 @@ def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> formBasicWiFiThreadNetwork(br, cli) try: + ocf.wait_for_host_network() ocf.execute_command(br, 'bbr') br.expect('server16', timeout=5) ocf.execute_command(cli, 'dns64server 8.8.8.8') cli.expect('Done', timeout=5) command = 'curl http://www.espressif.com' message = ocf.get_ouput_string(cli, command, 10) - assert '' in str(message) + assert 'html' in str(message) assert '301 Moved Permanently' in str(message) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 13: Meshcop discovery of Border Router @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -788,7 +794,7 @@ def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> ], indirect=True, ) -def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut]) -> None: +def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut]) -> None: br = dut[1] assert Init_interface assert Init_avahi @@ -827,13 +833,13 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, I assert 'vn=OpenThread' in str(output_str) assert 'rv=1' in str(output_str) finally: - ocf.execute_command(br, 'factoryreset') + ocf.stop_thread(br) time.sleep(3) # Case 14: Curl a website over HTTPS via DNS and NAT64 @pytest.mark.openthread_bbr -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -850,7 +856,7 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, I ], indirect=True, ) -def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_https_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -858,21 +864,22 @@ def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut formBasicWiFiThreadNetwork(br, cli) try: + ocf.wait_for_host_network() ocf.execute_command(cli, 'dns64server 8.8.8.8') cli.expect('Done', timeout=5) command = 'curl https://www.example.com/' message = ocf.get_ouput_string(cli, command, 20) - assert '' in str(message) - assert 'This domain is for use in illustrative examples in documents' in str(message) + assert 'html' in str(message) + assert 'This domain is for use in' in str(message) finally: - ocf.execute_command(br, 'factoryreset') - ocf.execute_command(cli, 'factoryreset') + ocf.stop_thread(cli) + ocf.stop_thread(br) time.sleep(3) # Case 15: Thread network formation and attaching with TREL @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -888,7 +895,7 @@ def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut ], indirect=True, ) -def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None: +def test_trel_connect(dut: Tuple[IdfDut, IdfDut]) -> None: trel_s3 = dut[1] trel_c6 = dut[0] trel_list = [trel_c6] @@ -918,15 +925,15 @@ def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None: rx_nums = ocf.ot_ping(trel_s3, trel_mleid_addr, count=10)[1] assert rx_nums > 5 finally: - ocf.execute_command(trel_s3, 'factoryreset') for trel in trel_list: - ocf.execute_command(trel, 'factoryreset') + ocf.stop_thread(trel) + ocf.stop_thread(trel_s3) time.sleep(3) # Case 16: Thread network BR lib check @pytest.mark.openthread_br -@pytest.mark.flaky(reruns=1, reruns_delay=1) +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -941,7 +948,7 @@ def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_br_lib_check(dut: tuple[IdfDut, IdfDut]) -> None: +def test_br_lib_check(dut: Tuple[IdfDut, IdfDut]) -> None: br = dut[1] dut[0].serial.stop_redirect_thread() try: @@ -955,6 +962,7 @@ def test_br_lib_check(dut: tuple[IdfDut, IdfDut]) -> None: # Case 17: SSED test @pytest.mark.openthread_sleep +@pytest.mark.flaky(reruns=1, reruns_delay=5) @pytest.mark.parametrize( 'config, count, app_path, target, port', [ @@ -979,10 +987,11 @@ def test_br_lib_check(dut: tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_ot_ssed_device(dut: tuple[IdfDut, IdfDut]) -> None: +def test_ot_ssed_device(dut: Tuple[IdfDut, IdfDut]) -> None: leader = dut[0] ssed_device = dut[1] try: + ocf.hardreset_dut(ssed_device) # CI device must have external XTAL to run SSED case, we will check this here first ssed_device.expect('32k XTAL in use', timeout=10) ocf.init_thread(leader) @@ -1022,4 +1031,5 @@ def test_ot_ssed_device(dut: tuple[IdfDut, IdfDut]) -> None: ocf.ping_and_check(dut=leader, target=ssed_address, tx_total=10, timeout=6) finally: ocf.execute_command(leader, 'factoryreset') + ocf.hardreset_dut(ssed_device) time.sleep(3)