feat(openthread): optimize fail cases in CI test

This commit is contained in:
yiwenxiu
2025-10-22 19:04:48 +08:00
parent 9748cae749
commit f58b23ed74
2 changed files with 119 additions and 81 deletions
+65 -36
View File
@@ -19,19 +19,31 @@ from pytest_embedded_idf.dut import IdfDut
def extract_address(
command: str, pattern: str, default_return: str = ''
command: str,
pattern: str,
default_return: str = '',
retries: int = 3,
delay: int = 2,
) -> Callable[[Callable[[str], str]], Callable[[IdfDut], str]]:
def decorator(func: Callable[[str], str]) -> Callable[[IdfDut], str]:
@wraps(func)
def wrapper(dut: IdfDut) -> str:
clean_buffer(dut)
execute_command(dut, command)
try:
result = dut.expect(pattern, timeout=5)[1].decode()
except Exception as e:
logging.error(f'Error: {e}')
return default_return
return func(result)
last_exception: Exception | None = None
for attempt in range(1, retries + 1):
try:
clean_buffer(dut)
execute_command(dut, command)
result = dut.expect(pattern, timeout=5)[1].decode()
return func(result)
except Exception as e:
logging.exception(f'[{command}] Attempt {attempt}/{retries} failed: {e}')
last_exception = e
if attempt < retries:
time.sleep(delay)
if last_exception:
logging.exception(f'[{command}] Giving up after {retries} retries.')
return default_return
return wrapper
@@ -173,6 +185,12 @@ def init_thread(dut: IdfDut) -> None:
reset_thread(dut)
def stop_thread(dut: IdfDut) -> None:
execute_command(dut, 'thread stop')
dut.expect('disabled', timeout=20)
reset_thread(dut)
def reset_thread(dut: IdfDut) -> None:
execute_command(dut, 'factoryreset')
dut.expect('OpenThread attached to netif', timeout=20)
@@ -180,28 +198,28 @@ def reset_thread(dut: IdfDut) -> None:
clean_buffer(dut)
def hardreset_dut(dut: IdfDut) -> None:
dut.serial.hard_reset()
time.sleep(5)
execute_command(dut, 'factoryreset')
# get the mleid address of the thread
def get_mleid_addr(dut: IdfDut) -> str:
dut_adress = ''
execute_command(dut, 'ipaddr mleid')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
return str(dut_adress)
@extract_address('ipaddr mleid', r'\n((?:\w+:){7}\w+)\r')
def get_mleid_addr(addr: str) -> str:
return addr
# get the rloc address of the thread
def get_rloc_addr(dut: IdfDut) -> str:
dut_adress = ''
execute_command(dut, 'ipaddr rloc')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
return str(dut_adress)
@extract_address('ipaddr rloc', r'\n((?:\w+:){7}\w+)\r')
def get_rloc_addr(addr: str) -> str:
return addr
# get the linklocal address of the thread
def get_linklocal_addr(dut: IdfDut) -> str:
dut_adress = ''
execute_command(dut, 'ipaddr linklocal')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
return str(dut_adress)
@extract_address('ipaddr linklocal', r'\n((?:\w+:){7}\w+)\r')
def get_linklocal_addr(addr: str) -> str:
return addr
# get the global unicast address of the thread:
@@ -619,22 +637,19 @@ def decimal_to_hex(decimal_str: str) -> str:
return hex_str
def get_omrprefix(br: IdfDut) -> str:
execute_command(br, 'br omrprefix')
omrprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode()
return str(omrprefix)
@extract_address('br omrprefix', r'Local: ((?:\w+:){4}):/\d+\r')
def get_omrprefix(addr: str) -> str:
return addr
def get_onlinkprefix(br: IdfDut) -> str:
execute_command(br, 'br onlinkprefix')
onlinkprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode()
return str(onlinkprefix)
@extract_address('br onlinkprefix', r'Local: ((?:\w+:){4}):/\d+\r')
def get_onlinkprefix(addr: str) -> str:
return addr
def get_nat64prefix(br: IdfDut) -> str:
execute_command(br, 'br nat64prefix')
nat64prefix = br.expect(r'Local: ((?:\w+:){6}):/\d+', timeout=5)[1].decode()
return str(nat64prefix)
@extract_address('br nat64prefix', r'Local: ((?:\w+:){6}):/\d+')
def get_nat64prefix(addr: str) -> str:
return addr
def execute_command(dut: IdfDut, command: str, prefix: str = 'ot ') -> None:
@@ -647,3 +662,17 @@ def get_ouput_string(dut: IdfDut, command: str, wait_time: int) -> str:
tmp = dut.expect(pexpect.TIMEOUT, timeout=wait_time)
clean_buffer(dut)
return str(tmp)
def wait_for_host_network(host: str = '8.8.8.8', retries: int = 6, interval: int = 10) -> None:
for attempt in range(1, retries + 1):
try:
subprocess.run(['ping', '-c', '1', '-W', '2', host], check=True)
logging.info(f'Host network reachable on attempt {attempt}')
return
except subprocess.CalledProcessError:
logging.info(f'Ping attempt {attempt} failed, retrying in {interval} seconds...')
if attempt < retries:
time.sleep(interval)
else:
raise RuntimeError(f'Host network is not reachable after {retries} attempts.')
+54 -45
View File
@@ -106,7 +106,7 @@ PORT_MAPPING = {'ESPPORT1': 'esp32h2', 'ESPPORT2': 'esp32s3', 'ESPPORT3': 'esp32
# Case 1: Thread network formation and attaching
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -161,9 +161,9 @@ def test_thread_connect(dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
rx_nums = ocf.ot_ping(br, cli_mleid_addr, count=5)[1]
assert rx_nums == 5
finally:
ocf.execute_command(br, 'factoryreset')
for cli in cli_list:
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
@@ -189,7 +189,7 @@ def formBasicWiFiThreadNetwork(br: IdfDut, cli: IdfDut) -> None:
# Case 2: Bidirectional IPv6 connectivity
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -231,18 +231,19 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut
host_global_unicast_addr = re.findall(pattern, out_str)
rx_nums = 0
for ip_addr in host_global_unicast_addr:
txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=5)
txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=10)
rx_nums = rx_nums + int(txrx_nums[1])
logging.debug(f'rx_nums: {rx_nums}')
assert rx_nums != 0
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 3: Multicast forwarding from Wi-Fi to Thread network
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -288,14 +289,14 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut,
ocf.execute_command(cli, 'udp close')
cli.expect('Done', timeout=5)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 4: Multicast forwarding from Thread to Wi-Fi network
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -341,15 +342,15 @@ def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut,
while udp_mission.is_alive():
time.sleep(1)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
assert b'hello' in myudp.udp_bytes
# Case 5: discover dervice published by Thread device
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -403,14 +404,14 @@ def test_service_discovery_of_Thread_device(
logging.info(f'avahi-browse:\n {out_str}')
assert 'myTest' in str(out_str)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 6: discover dervice published by Wi-Fi device
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -473,14 +474,14 @@ def test_service_discovery_of_WiFi_device(
finally:
ocf.host_close_service()
sp.terminate()
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 7: ICMP communication via NAT64
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -511,14 +512,14 @@ def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), count=5)[1]
assert rx_nums != 0
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 8: UDP communication via NAT64
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -566,15 +567,15 @@ def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
while udp_mission.is_alive():
time.sleep(1)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
assert b'hello' in myudp.udp_bytes
# Case 9: TCP communication via NAT64
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -628,14 +629,15 @@ def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
while tcp_mission.is_alive():
time.sleep(1)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
assert b'hello' in mytcp.tcp_bytes
# Case 10: Sleepy device test
@pytest.mark.openthread_sleep
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -663,6 +665,7 @@ def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None:
leader = dut[0]
sleepy_device = dut[1]
ocf.hardreset_dut(sleepy_device)
fail_info = re.compile(r'Core\W*?\d\W*?register dump')
try:
ocf.init_thread(leader)
@@ -689,12 +692,13 @@ def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None:
assert not bool(fail_info.search(str(output)))
finally:
ocf.execute_command(leader, 'factoryreset')
ocf.hardreset_dut(sleepy_device)
time.sleep(3)
# Case 11: Basic startup Test of BR
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -728,13 +732,13 @@ def test_basic_startup(dut: tuple[IdfDut, IdfDut]) -> None:
br.expect('Done', timeout=5)
assert ocf.wait_for_join(br, 'leader')
finally:
ocf.execute_command(br, 'factoryreset')
ocf.stop_thread(br)
time.sleep(3)
# Case 12: Curl a website via DNS and NAT64
@pytest.mark.openthread_bbr
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -759,23 +763,24 @@ def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
formBasicWiFiThreadNetwork(br, cli)
try:
ocf.wait_for_host_network()
ocf.execute_command(br, 'bbr')
br.expect('server16', timeout=5)
ocf.execute_command(cli, 'dns64server 8.8.8.8')
cli.expect('Done', timeout=5)
command = 'curl http://www.espressif.com'
message = ocf.get_ouput_string(cli, command, 10)
assert '<html>' in str(message)
assert 'html' in str(message)
assert '301 Moved Permanently' in str(message)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 13: Meshcop discovery of Border Router
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -829,13 +834,13 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, I
assert 'vn=OpenThread' in str(output_str)
assert 'rv=1' in str(output_str)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.stop_thread(br)
time.sleep(3)
# Case 14: Curl a website over HTTPS via DNS and NAT64
@pytest.mark.openthread_bbr
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -860,21 +865,22 @@ def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut
formBasicWiFiThreadNetwork(br, cli)
try:
ocf.wait_for_host_network()
ocf.execute_command(cli, 'dns64server 8.8.8.8')
cli.expect('Done', timeout=5)
command = 'curl https://www.example.com/'
message = ocf.get_ouput_string(cli, command, 20)
assert '<html>' in str(message)
assert 'This domain is for use in illustrative examples in documents' in str(message)
assert 'html' in str(message)
assert 'This domain is for use in' in str(message)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 15: Thread network formation and attaching with TREL
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -920,15 +926,15 @@ def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None:
rx_nums = ocf.ot_ping(trel_s3, trel_mleid_addr, count=10)[1]
assert rx_nums > 5
finally:
ocf.execute_command(trel_s3, 'factoryreset')
for trel in trel_list:
ocf.execute_command(trel, 'factoryreset')
ocf.stop_thread(trel)
ocf.stop_thread(trel_s3)
time.sleep(3)
# Case 16: Thread network BR lib check
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -957,6 +963,7 @@ def test_br_lib_check(dut: tuple[IdfDut, IdfDut]) -> None:
# Case 17: SSED test
@pytest.mark.openthread_sleep
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -985,6 +992,7 @@ def test_ot_ssed_device(dut: tuple[IdfDut, IdfDut]) -> None:
leader = dut[0]
ssed_device = dut[1]
try:
ocf.hardreset_dut(ssed_device)
# CI device must have external XTAL to run SSED case, we will check this here first
ssed_device.expect('32k XTAL in use', timeout=10)
ocf.init_thread(leader)
@@ -1024,4 +1032,5 @@ def test_ot_ssed_device(dut: tuple[IdfDut, IdfDut]) -> None:
ocf.ping_and_check(dut=leader, target=ssed_address, tx_total=10, timeout=6)
finally:
ocf.execute_command(leader, 'factoryreset')
ocf.hardreset_dut(ssed_device)
time.sleep(3)