feat(openthread): optimize fail cases in CI test

This commit is contained in:
yiwenxiu
2025-10-22 19:04:48 +08:00
parent 61d56f042c
commit 5d628f7ade
2 changed files with 140 additions and 98 deletions
+68 -36
View File
@@ -11,6 +11,7 @@ import subprocess
import time
from functools import wraps
from typing import Callable
from typing import Optional
from typing import Tuple
import netifaces
@@ -20,19 +21,33 @@ from pytest_embedded_idf.dut import IdfDut
def extract_address(
command: str, pattern: str, default_return: str = ''
command: str,
pattern: str,
default_return: str = '',
retries: int = 3,
delay: int = 2,
) -> Callable[[Callable[[str], str]], Callable[[IdfDut], str]]:
def decorator(func: Callable[[str], str]) -> Callable[[IdfDut], str]:
@wraps(func)
def wrapper(dut: IdfDut) -> str:
clean_buffer(dut)
execute_command(dut, command)
try:
result = dut.expect(pattern, timeout=5)[1].decode()
except Exception as e:
logging.error(f'Error: {e}')
return default_return
return func(result)
# requires Python3.10
# last_exception: Exception | None = None
last_exception: Optional[Exception] = None
for attempt in range(1, retries + 1):
try:
clean_buffer(dut)
execute_command(dut, command)
result = dut.expect(pattern, timeout=5)[1].decode()
return func(result)
except Exception as e:
logging.exception(f'[{command}] Attempt {attempt}/{retries} failed: {e}')
last_exception = e
if attempt < retries:
time.sleep(delay)
if last_exception:
logging.exception(f'[{command}] Giving up after {retries} retries.')
return default_return
return wrapper
@@ -174,6 +189,12 @@ def init_thread(dut: IdfDut) -> None:
reset_thread(dut)
def stop_thread(dut: IdfDut) -> None:
execute_command(dut, 'thread stop')
dut.expect('disabled', timeout=20)
reset_thread(dut)
def reset_thread(dut: IdfDut) -> None:
execute_command(dut, 'factoryreset')
dut.expect('OpenThread attached to netif', timeout=20)
@@ -181,28 +202,28 @@ def reset_thread(dut: IdfDut) -> None:
clean_buffer(dut)
def hardreset_dut(dut: IdfDut) -> None:
dut.serial.hard_reset()
time.sleep(5)
execute_command(dut, 'factoryreset')
# get the mleid address of the thread
def get_mleid_addr(dut: IdfDut) -> str:
dut_adress = ''
execute_command(dut, 'ipaddr mleid')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
return dut_adress
@extract_address('ipaddr mleid', r'\n((?:\w+:){7}\w+)\r')
def get_mleid_addr(addr: str) -> str:
return addr
# get the rloc address of the thread
def get_rloc_addr(dut: IdfDut) -> str:
dut_adress = ''
execute_command(dut, 'ipaddr rloc')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
return dut_adress
@extract_address('ipaddr rloc', r'\n((?:\w+:){7}\w+)\r')
def get_rloc_addr(addr: str) -> str:
return addr
# get the linklocal address of the thread
def get_linklocal_addr(dut: IdfDut) -> str:
dut_adress = ''
execute_command(dut, 'ipaddr linklocal')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
return dut_adress
@extract_address('ipaddr linklocal', r'\n((?:\w+:){7}\w+)\r')
def get_linklocal_addr(addr: str) -> str:
return addr
# get the global unicast address of the thread:
@@ -620,22 +641,19 @@ def decimal_to_hex(decimal_str: str) -> str:
return hex_str
def get_omrprefix(br: IdfDut) -> str:
execute_command(br, 'br omrprefix')
omrprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode()
return str(omrprefix)
@extract_address('br omrprefix', r'Local: ((?:\w+:){4}):/\d+\r')
def get_omrprefix(addr: str) -> str:
return addr
def get_onlinkprefix(br: IdfDut) -> str:
execute_command(br, 'br onlinkprefix')
onlinkprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode()
return str(onlinkprefix)
@extract_address('br onlinkprefix', r'Local: ((?:\w+:){4}):/\d+\r')
def get_onlinkprefix(addr: str) -> str:
return addr
def get_nat64prefix(br: IdfDut) -> str:
execute_command(br, 'br nat64prefix')
nat64prefix = br.expect(r'Local: ((?:\w+:){6}):/\d+', timeout=5)[1].decode()
return str(nat64prefix)
@extract_address('br nat64prefix', r'Local: ((?:\w+:){6}):/\d+')
def get_nat64prefix(addr: str) -> str:
return addr
def execute_command(dut: IdfDut, command: str, prefix: str = 'ot ') -> None:
@@ -648,3 +666,17 @@ def get_ouput_string(dut: IdfDut, command: str, wait_time: int) -> str:
tmp = dut.expect(pexpect.TIMEOUT, timeout=wait_time)
clean_buffer(dut)
return str(tmp)
def wait_for_host_network(host: str = '8.8.8.8', retries: int = 6, interval: int = 10) -> None:
for attempt in range(1, retries + 1):
try:
subprocess.run(['ping', '-c', '1', '-W', '2', host], check=True)
logging.info(f'Host network reachable on attempt {attempt}')
return
except subprocess.CalledProcessError:
logging.info(f'Ping attempt {attempt} failed, retrying in {interval} seconds...')
if attempt < retries:
time.sleep(interval)
else:
raise RuntimeError(f'Host network is not reachable after {retries} attempts.')
+72 -62
View File
@@ -10,6 +10,7 @@ import secrets
import subprocess
import threading
import time
from typing import Tuple
import ot_ci_function as ocf
import pexpect
@@ -109,7 +110,7 @@ PORT_MAPPING = {
# Case 1: Thread network formation and attaching
@pytest.mark.supported_targets
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -136,7 +137,7 @@ PORT_MAPPING = {
],
indirect=True,
)
def test_thread_connect(dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
def test_thread_connect(dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
br = dut[2]
cli_h2 = dut[1]
dut[0].serial.stop_redirect_thread()
@@ -164,9 +165,9 @@ def test_thread_connect(dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
rx_nums = ocf.ot_ping(br, cli_mleid_addr, count=5)[1]
assert rx_nums == 5
finally:
ocf.execute_command(br, 'factoryreset')
for cli in cli_list:
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
@@ -193,7 +194,7 @@ def formBasicWiFiThreadNetwork(br:IdfDut, cli:IdfDut) -> None:
# Case 2: Bidirectional IPv6 connectivity
@pytest.mark.supported_targets
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -210,7 +211,7 @@ def formBasicWiFiThreadNetwork(br:IdfDut, cli:IdfDut) -> None:
],
indirect=True,
)
def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
br = dut[2]
cli = dut[1]
assert Init_interface
@@ -235,19 +236,20 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut
host_global_unicast_addr = re.findall(pattern, out_str)
rx_nums = 0
for ip_addr in host_global_unicast_addr:
txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=5)
txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=10)
rx_nums = rx_nums + int(txrx_nums[1])
logging.debug(f'rx_nums: {rx_nums}')
assert rx_nums != 0
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 3: Multicast forwarding from Wi-Fi to Thread network
@pytest.mark.supported_targets
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -264,7 +266,7 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut
],
indirect=True,
)
def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
def test_multicast_forwarding_A(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
br = dut[2]
cli = dut[1]
assert Init_interface
@@ -293,15 +295,15 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut,
ocf.execute_command(cli, 'udp close')
cli.expect('Done', timeout=5)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 4: Multicast forwarding from Thread to Wi-Fi network
@pytest.mark.supported_targets
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -318,7 +320,7 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut,
],
indirect=True,
)
def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
def test_multicast_forwarding_B(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
br = dut[2]
cli = dut[1]
assert Init_interface
@@ -347,8 +349,8 @@ def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut,
while udp_mission.is_alive():
time.sleep(1)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
assert b'hello' in myudp.udp_bytes
@@ -356,7 +358,7 @@ def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut,
# Case 5: discover dervice published by Thread device
@pytest.mark.supported_targets
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -374,7 +376,7 @@ def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut,
indirect=True,
)
def test_service_discovery_of_Thread_device(
Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut, IdfDut]
Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]
) -> None:
br = dut[2]
cli = dut[1]
@@ -410,15 +412,15 @@ def test_service_discovery_of_Thread_device(
logging.info(f'avahi-browse:\n {out_str}')
assert 'myTest' in str(out_str)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 6: discover dervice published by Wi-Fi device
@pytest.mark.supported_targets
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -436,7 +438,7 @@ def test_service_discovery_of_Thread_device(
indirect=True,
)
def test_service_discovery_of_WiFi_device(
Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut, IdfDut]
Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]
) -> None:
br = dut[2]
cli = dut[1]
@@ -481,15 +483,15 @@ def test_service_discovery_of_WiFi_device(
finally:
ocf.host_close_service()
sp.terminate()
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 7: ICMP communication via NAT64
@pytest.mark.supported_targets
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -506,7 +508,7 @@ def test_service_discovery_of_WiFi_device(
],
indirect=True,
)
def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
def test_ICMP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
br = dut[2]
cli = dut[1]
assert Init_interface
@@ -520,15 +522,15 @@ def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), count=5)[1]
assert rx_nums != 0
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 8: UDP communication via NAT64
@pytest.mark.supported_targets
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -545,7 +547,7 @@ def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
],
indirect=True,
)
def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
def test_UDP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
br = dut[2]
cli = dut[1]
assert Init_interface
@@ -576,8 +578,8 @@ def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
while udp_mission.is_alive():
time.sleep(1)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
assert b'hello' in myudp.udp_bytes
@@ -585,7 +587,7 @@ def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
# Case 9: TCP communication via NAT64
@pytest.mark.supported_targets
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -602,7 +604,7 @@ def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
],
indirect=True,
)
def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
def test_TCP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
br = dut[2]
cli = dut[1]
assert Init_interface
@@ -639,8 +641,8 @@ def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
while tcp_mission.is_alive():
time.sleep(1)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
assert b'hello' in mytcp.tcp_bytes
@@ -649,6 +651,7 @@ def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
@pytest.mark.esp32h2
@pytest.mark.esp32c6
@pytest.mark.openthread_sleep
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -673,9 +676,10 @@ def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
],
indirect=True,
)
def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None:
def test_ot_sleepy_device(dut: Tuple[IdfDut, IdfDut]) -> None:
leader = dut[0]
sleepy_device = dut[1]
ocf.hardreset_dut(sleepy_device)
fail_info = re.compile(r'Core\W*?\d\W*?register dump')
try:
ocf.init_thread(leader)
@@ -702,13 +706,14 @@ def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None:
assert not bool(fail_info.search(str(output)))
finally:
ocf.execute_command(leader, 'factoryreset')
ocf.hardreset_dut(sleepy_device)
time.sleep(3)
# Case 11: Basic startup Test of BR
@pytest.mark.supported_targets
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -724,7 +729,7 @@ def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None:
],
indirect=True,
)
def test_basic_startup(dut: tuple[IdfDut, IdfDut]) -> None:
def test_basic_startup(dut: Tuple[IdfDut, IdfDut]) -> None:
br = dut[1]
dut[0].serial.stop_redirect_thread()
try:
@@ -743,14 +748,14 @@ def test_basic_startup(dut: tuple[IdfDut, IdfDut]) -> None:
br.expect('Done', timeout=5)
assert ocf.wait_for_join(br, 'leader')
finally:
ocf.execute_command(br, 'factoryreset')
ocf.stop_thread(br)
time.sleep(3)
# Case 12: Curl a website via DNS and NAT64
@pytest.mark.supported_targets
@pytest.mark.openthread_bbr
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -767,7 +772,7 @@ def test_basic_startup(dut: tuple[IdfDut, IdfDut]) -> None:
],
indirect=True,
)
def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
def test_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
br = dut[2]
cli = dut[1]
assert Init_interface
@@ -775,24 +780,25 @@ def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
formBasicWiFiThreadNetwork(br, cli)
try:
ocf.wait_for_host_network()
ocf.execute_command(br, 'bbr')
br.expect('server16', timeout=5)
ocf.execute_command(cli, 'dns64server 8.8.8.8')
cli.expect('Done', timeout=5)
command = 'curl http://www.espressif.com'
message = ocf.get_ouput_string(cli, command, 10)
assert '<html>' in str(message)
assert 'html' in str(message)
assert '301 Moved Permanently' in str(message)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 13: Meshcop discovery of Border Router
@pytest.mark.supported_targets
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -808,7 +814,7 @@ def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
],
indirect=True,
)
def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut]) -> None:
def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, IdfDut]) -> None:
br = dut[1]
assert Init_interface
assert Init_avahi
@@ -847,14 +853,14 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, I
assert 'vn=OpenThread' in str(output_str)
assert 'rv=1' in str(output_str)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.stop_thread(br)
time.sleep(3)
# Case 14: Curl a website over HTTPS via DNS and NAT64
@pytest.mark.supported_targets
@pytest.mark.openthread_bbr
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -871,7 +877,7 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, I
],
indirect=True,
)
def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
def test_https_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
br = dut[2]
cli = dut[1]
assert Init_interface
@@ -879,22 +885,23 @@ def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut
formBasicWiFiThreadNetwork(br, cli)
try:
ocf.wait_for_host_network()
ocf.execute_command(cli, 'dns64server 8.8.8.8')
cli.expect('Done', timeout=5)
command = 'curl https://www.example.com/'
message = ocf.get_ouput_string(cli, command, 20)
assert '<html>' in str(message)
assert 'This domain is for use in illustrative examples in documents' in str(message)
assert 'html' in str(message)
assert 'This domain is for use in' in str(message)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 15: Thread network formation and attaching with TREL
@pytest.mark.supported_targets
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -910,7 +917,7 @@ def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut
],
indirect=True,
)
def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None:
def test_trel_connect(dut: Tuple[IdfDut, IdfDut]) -> None:
trel_s3 = dut[1]
trel_c6 = dut[0]
trel_list = [trel_c6]
@@ -940,16 +947,16 @@ def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None:
rx_nums = ocf.ot_ping(trel_s3, trel_mleid_addr, count=10)[1]
assert rx_nums > 5
finally:
ocf.execute_command(trel_s3, 'factoryreset')
for trel in trel_list:
ocf.execute_command(trel, 'factoryreset')
ocf.stop_thread(trel)
ocf.stop_thread(trel_s3)
time.sleep(3)
# Case 16: Thread network BR lib check
@pytest.mark.supported_targets
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -965,7 +972,7 @@ def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None:
],
indirect=True,
)
def test_br_lib_check(dut: tuple[IdfDut, IdfDut]) -> None:
def test_br_lib_check(dut: Tuple[IdfDut, IdfDut]) -> None:
br = dut[1]
dut[0].serial.stop_redirect_thread()
try:
@@ -979,6 +986,7 @@ def test_br_lib_check(dut: tuple[IdfDut, IdfDut]) -> None:
# Case 17: SSED test
@pytest.mark.openthread_sleep
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -1003,10 +1011,11 @@ def test_br_lib_check(dut: tuple[IdfDut, IdfDut]) -> None:
],
indirect=True,
)
def test_ot_ssed_device(dut: tuple[IdfDut, IdfDut]) -> None:
def test_ot_ssed_device(dut: Tuple[IdfDut, IdfDut]) -> None:
leader = dut[0]
ssed_device = dut[1]
try:
ocf.hardreset_dut(ssed_device)
# CI device must have external XTAL to run SSED case, we will check this here first
ssed_device.expect('32k XTAL in use', timeout=10)
ocf.init_thread(leader)
@@ -1046,4 +1055,5 @@ def test_ot_ssed_device(dut: tuple[IdfDut, IdfDut]) -> None:
ocf.ping_and_check(dut=leader, target=ssed_address, tx_total=10, timeout=6)
finally:
ocf.execute_command(leader, 'factoryreset')
ocf.hardreset_dut(ssed_device)
time.sleep(3)