Merge branch 'fix/optimize_the_process_of_ot_ci' into 'master'

fix(openthread): optimize the process of openthread pytest cases

See merge request espressif/esp-idf!47672
This commit is contained in:
Zhang Wen Xu
2026-04-21 09:04:50 +00:00
2 changed files with 67 additions and 48 deletions
+29 -16
View File
@@ -87,7 +87,7 @@ class wifi_parameter:
self.retry_times = retry_times
def joinThreadNetwork(dut: IdfDut, thread: thread_parameter) -> None:
def SetThreadNetworkPara(dut: IdfDut, thread: thread_parameter) -> None:
if thread.dataset:
command = 'dataset set active ' + thread.dataset
execute_command(dut, command)
@@ -125,6 +125,9 @@ def joinThreadNetwork(dut: IdfDut, thread: thread_parameter) -> None:
dut.expect('Done', timeout=5)
execute_command(dut, 'dataset commit active')
dut.expect('Done', timeout=5)
def StartThreadNetwork(dut: IdfDut, thread: thread_parameter) -> None:
if thread.bbr:
execute_command(dut, 'bbr enable')
dut.expect('Done', timeout=5)
@@ -155,7 +158,7 @@ def joinWiFiNetwork(dut: IdfDut, wifi: wifi_parameter) -> tuple[str, int]:
ip_address = ''
for order in range(1, wifi.retry_times):
command = 'wifi connect -s ' + str(wifi.ssid) + ' -p ' + str(wifi.psk)
tmp = get_ouput_string(dut, command, 10)
tmp = get_output_string(dut, command, 10)
if 'sta ip' in str(tmp):
ip_address = re.findall(r'sta ip: (\w+.\w+.\w+.\w+),', str(tmp))[0]
wait(dut, 2)
@@ -361,11 +364,19 @@ def check_if_host_receive_ra(br: IdfDut) -> bool:
interface_name = get_host_interface_name()
clean_buffer(br)
omrprefix = get_omrprefix(br)
command = 'ip -6 route | grep ' + str(interface_name)
onlinkprefix = get_onlinkprefix(br)
command = f'ip -6 route show dev {interface_name}'
out_str = subprocess.getoutput(command)
logging.info(f'br omrprefix: {omrprefix}')
logging.info(f'host route table:\n {out_str}')
return str(omrprefix) in str(out_str)
has_omr_route = str(omrprefix) in str(out_str)
has_onlink_gua = host_global_address_has_onlink_prefix(interface_name, onlinkprefix)
logging.info(
'RA check: omrprefix=%s onlinkprefix=%s has_omr_route=%s has_onlink_gua=%s',
omrprefix,
onlinkprefix,
has_omr_route,
has_onlink_gua,
)
return has_omr_route and has_onlink_gua
def host_connect_wifi() -> None:
@@ -390,14 +401,16 @@ def wait_for_host_ra_route(
for attempt in range(1, retries + 1):
if is_joined_wifi_network(br):
log_ipv6_addr_route_by_interface(interface_name, title='RA Ready!')
logging.info(f'Host RA is ready on attempt {attempt}/{retries}.')
return
logging.info(f'Host route not ready yet, retry {attempt}/{retries}...')
logging.info(f'Host RA not ready yet, retry {attempt}/{retries}...')
log_ipv6_addr_route_by_interface(interface_name, title=f'Wait RA ({attempt}/{retries})')
if attempt < retries:
time.sleep(interval_s)
time.sleep(interval_s)
raise AssertionError('Host did not receive RA / OMR route in time')
log_ipv6_addr_route_by_interface(interface_name, title='RA check failed')
raise AssertionError('Host did not receive valid RA in time (OMR route and onlink GUA both required)')
def host_global_address_has_onlink_prefix(interface_name: str, onlinkprefix: str) -> bool:
@@ -436,17 +449,17 @@ def wait_for_host_onlink_global_address(
interface_name = get_host_interface_name()
onlinkprefix = get_onlinkprefix(br)
logging.info(f'Wait for host GUA in BR onlink prefix {onlinkprefix!r} on {interface_name}')
log_ipv6_addr_route_by_interface(interface_name, title='Wait onlink GUA (initial)')
for attempt in range(1, retries + 1):
if host_global_address_has_onlink_prefix(interface_name, onlinkprefix):
log_ipv6_addr_route_by_interface(interface_name, title='Onlink GUA ready!')
logging.info(f'Host onlink GUA is ready on attempt {attempt}/{retries}.')
return onlinkprefix
logging.info(f'Host onlink GUA not ready yet, retry {attempt}/{retries}...')
log_ipv6_addr_route_by_interface(interface_name, title=f'Wait onlink GUA ({attempt}/{retries})')
time.sleep(interval_s)
if attempt < retries:
time.sleep(interval_s)
log_ipv6_addr_route_by_interface(interface_name, title='Onlink GUA check failed')
raise AssertionError(f'Host did not get a global IPv6 address in onlink prefix {onlinkprefix!r} in time')
@@ -454,7 +467,7 @@ thread_ipv6_group = 'ff04:0:0:0:0:0:0:125'
def check_ipmaddr(dut: IdfDut) -> bool:
info = get_ouput_string(dut, 'ipmaddr', 2)
info = get_output_string(dut, 'ipmaddr', 2)
if thread_ipv6_group in str(info):
return True
return False
@@ -772,7 +785,7 @@ def execute_command(dut: IdfDut, command: str, prefix: str = 'ot ') -> None:
dut.write(prefix + command)
def get_ouput_string(dut: IdfDut, command: str, wait_time: int) -> str:
def get_output_string(dut: IdfDut, command: str, wait_time: int) -> str:
execute_command(dut, command)
tmp = dut.expect(pexpect.TIMEOUT, timeout=wait_time)
clean_buffer(dut)
+38 -32
View File
@@ -145,7 +145,8 @@ def test_thread_connect(dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
for cli in cli_list:
ocf.init_thread(cli)
br_ot_para = copy.copy(default_br_ot_para)
ocf.joinThreadNetwork(br, br_ot_para)
ocf.SetThreadNetworkPara(br, br_ot_para)
ocf.StartThreadNetwork(br, br_ot_para)
cli_ot_para = copy.copy(default_cli_ot_para)
cli_ot_para.dataset = ocf.getDataset(br)
try:
@@ -153,7 +154,8 @@ def test_thread_connect(dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
for cli in cli_list:
cli_ot_para.exaddr = router_extaddr_list[order]
order = order + 1
ocf.joinThreadNetwork(cli, cli_ot_para)
ocf.SetThreadNetworkPara(cli, cli_ot_para)
ocf.StartThreadNetwork(cli, cli_ot_para)
for cli in cli_list:
cli_mleid_addr = ocf.get_mleid_addr(cli)
br_mleid_addr = ocf.get_mleid_addr(br)
@@ -177,15 +179,20 @@ def test_thread_connect(dut: tuple[IdfDut, IdfDut, IdfDut]) -> None:
def formBasicWiFiThreadNetwork(br: IdfDut, cli: IdfDut) -> None:
ocf.init_thread(br)
ocf.init_thread(cli)
otbr_wifi_para = copy.copy(default_br_wifi_para)
ocf.joinWiFiNetwork(br, otbr_wifi_para)
otbr_thread_para = copy.copy(default_br_ot_para)
ocf.joinThreadNetwork(br, otbr_thread_para)
otbr_wifi_para = copy.copy(default_br_wifi_para)
otbr_thread_para.extpanid = secrets.token_hex(8)
ocf.SetThreadNetworkPara(br, otbr_thread_para)
ocf.joinWiFiNetwork(br, otbr_wifi_para)
ocf.StartThreadNetwork(br, otbr_thread_para)
otcli_thread_para = copy.copy(default_cli_ot_para)
otcli_thread_para.dataset = ocf.getDataset(br)
otcli_thread_para.exaddr = '7766554433221101'
ocf.joinThreadNetwork(cli, otcli_thread_para)
ocf.SetThreadNetworkPara(cli, otcli_thread_para)
ocf.StartThreadNetwork(cli, otcli_thread_para)
ocf.wait(cli, 10)
# Polling wait for host RA readiness: both OMR route and onlink GUA must exist.
ocf.wait_for_host_ra_route(br)
# Case 2: Bidirectional IPv6 connectivity
@@ -215,16 +222,12 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut
formBasicWiFiThreadNetwork(br, cli)
try:
ocf.wait_for_host_ra_route(br)
onlinkprefix = ocf.wait_for_host_onlink_global_address(br)
logging.info(f'br onlinkprefix: {onlinkprefix}')
onlinkprefix = ocf.get_onlinkprefix(br)
cli_global_unicast_addr = ocf.get_global_unicast_addr(cli, br)
logging.info(f'cli_global_unicast_addr {cli_global_unicast_addr}')
interface_name = ocf.get_host_interface_name()
ocf.log_ipv6_addr_route_by_interface(interface_name, title='Before ping test')
command = 'ping ' + str(cli_global_unicast_addr) + ' -c 10'
out_str = subprocess.getoutput(command)
ocf.log_ipv6_addr_route_by_interface(interface_name, title='After ping test')
logging.info(f'ping result:\n{out_str}')
role = re.findall(r' (\d+)%', str(out_str))[0]
assert role != '100'
@@ -234,7 +237,7 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut
pattern = rf'\W+({onlinkprefix}(?:\w+:){{3}}\w+)\W+'
host_global_unicast_addr = re.findall(pattern, out_str)
logging.info(f'host_global_unicast_addr: {host_global_unicast_addr}')
if host_global_unicast_addr is None:
if not host_global_unicast_addr:
raise Exception(f'onlinkprefix: {onlinkprefix}, host_global_unicast_addr: {host_global_unicast_addr}')
rx_nums = 0
for ip_addr in host_global_unicast_addr:
@@ -275,7 +278,6 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut,
formBasicWiFiThreadNetwork(br, cli)
try:
ocf.wait_for_host_ra_route(br)
ocf.execute_command(br, 'bbr')
br.expect('server16', timeout=5)
assert ocf.thread_is_joined_group(cli)
@@ -328,7 +330,6 @@ def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut,
formBasicWiFiThreadNetwork(br, cli)
try:
ocf.wait_for_host_ra_route(br)
ocf.execute_command(br, 'bbr')
br.expect('server16', timeout=5)
ocf.execute_command(cli, 'udp open')
@@ -385,7 +386,6 @@ def test_service_discovery_of_Thread_device(
formBasicWiFiThreadNetwork(br, cli)
try:
ocf.wait_for_host_ra_route(br)
command = 'avahi-browse -rt _testyyy._udp'
out_str = subprocess.getoutput(command)
logging.info(f'avahi-browse:\n{out_str}')
@@ -447,7 +447,6 @@ def test_service_discovery_of_WiFi_device(
formBasicWiFiThreadNetwork(br, cli)
sp: subprocess.Popen | None = None
try:
ocf.wait_for_host_ra_route(br)
br_global_unicast_addr = ocf.get_global_unicast_addr(br, br)
command = 'dns config ' + br_global_unicast_addr
ocf.execute_command(cli, command)
@@ -458,12 +457,17 @@ def test_service_discovery_of_WiFi_device(
logging.info(f'domain name is: {domain_name}')
command = 'dns resolve ' + domain_name + '.default.service.arpa.'
ocf.execute_command(cli, command)
cli.expect('TTL', timeout=10)
cli.expect('Done', timeout=10)
for _ in range(3):
tmp = ocf.get_output_string(cli, command, 5)
if 'TTL' in tmp and 'Done' in str(tmp):
break
time.sleep(1)
else:
logging.info('DNS resolution failed after 3 retries, with no response received.')
assert False
command = 'dns browse _testxxx._udp.default.service.arpa'
tmp = ocf.get_ouput_string(cli, command, 10)
tmp = ocf.get_output_string(cli, command, 10)
assert 'Port:12347' not in str(tmp)
ocf.restart_avahi()
command = 'avahi-publish-service testxxx _testxxx._udp 12347 test=1235 dn="for_ci_br_test"'
@@ -472,12 +476,12 @@ def test_service_discovery_of_WiFi_device(
ocf.wait(cli, 5)
command = 'dns browse _testxxx._udp.default.service.arpa'
tmp = ocf.get_ouput_string(cli, command, 10)
tmp = ocf.get_output_string(cli, command, 10)
assert 'response for _testxxx' in str(tmp)
assert 'Port:12347' in str(tmp)
command = 'dns service testxxx _testxxx._udp.default.service.arpa.'
tmp = ocf.get_ouput_string(cli, command, 10)
tmp = ocf.get_output_string(cli, command, 10)
assert 'response for testxxx' in str(tmp)
assert 'Port:12347' in str(tmp)
finally:
@@ -516,7 +520,6 @@ def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
formBasicWiFiThreadNetwork(br, cli)
try:
ocf.wait_for_host_ra_route(br)
host_ipv4_address = ocf.get_host_ipv4_address()
logging.info(f'host_ipv4_address: {host_ipv4_address}')
rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), count=5)[1]
@@ -554,7 +557,6 @@ def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
formBasicWiFiThreadNetwork(br, cli)
try:
ocf.wait_for_host_ra_route(br)
ocf.execute_command(br, 'bbr')
br.expect('server16', timeout=5)
ocf.execute_command(cli, 'udp open')
@@ -610,7 +612,6 @@ def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
formBasicWiFiThreadNetwork(br, cli)
try:
ocf.wait_for_host_ra_route(br)
ocf.execute_command(br, 'bbr')
br.expect('server16', timeout=5)
ocf.execute_command(cli, 'tcpsockclient open')
@@ -690,7 +691,8 @@ def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None:
ocf.init_thread(leader)
time.sleep(3)
leader_para = ocf.thread_parameter('leader', '', '12', '7766554433221100', False)
ocf.joinThreadNetwork(leader, leader_para)
ocf.SetThreadNetworkPara(leader, leader_para)
ocf.StartThreadNetwork(leader, leader_para)
ocf.wait(leader, 5)
dataset = ocf.getDataset(leader)
ocf.execute_command(sleepy_device, 'mode -')
@@ -795,7 +797,7 @@ def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) ->
ocf.execute_command(cli, 'dns64server 8.8.8.8')
cli.expect('Done', timeout=5)
command = 'curl http://www.espressif.com'
message = ocf.get_ouput_string(cli, command, 10)
message = ocf.get_output_string(cli, command, 10)
assert 'html' in str(message)
assert '301 Moved Permanently' in str(message)
finally:
@@ -836,7 +838,8 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, I
br_thread_para = copy.copy(default_br_ot_para)
networkname = 'OTCI-' + str(secrets.token_hex(1))
br_thread_para.setnetworkname(networkname)
ocf.joinThreadNetwork(br, br_thread_para)
ocf.SetThreadNetworkPara(br, br_thread_para)
ocf.StartThreadNetwork(br, br_thread_para)
ocf.wait(br, 10)
ocf.wait_for_host_ra_route(br)
command = 'timeout 3 avahi-browse -r _meshcop._udp'
@@ -895,7 +898,7 @@ def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut
ocf.execute_command(cli, 'dns64server 8.8.8.8')
cli.expect('Done', timeout=5)
command = 'curl https://www.example.com/'
message = ocf.get_ouput_string(cli, command, 20)
message = ocf.get_output_string(cli, command, 20)
assert 'html' in str(message)
assert 'This domain is for use in' in str(message)
finally:
@@ -935,7 +938,8 @@ def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None:
ocf.init_thread(trel)
trel_leader_para = copy.copy(default_br_ot_para)
trel_leader_para.bbr = False
ocf.joinThreadNetwork(trel_s3, trel_leader_para)
ocf.SetThreadNetworkPara(trel_s3, trel_leader_para)
ocf.StartThreadNetwork(trel_s3, trel_leader_para)
trel_para = copy.copy(default_cli_ot_para)
trel_para.dataset = ocf.getDataset(trel_s3)
try:
@@ -943,7 +947,8 @@ def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None:
for trel in trel_list:
trel_para.exaddr = router_extaddr_list[order]
order = order + 1
ocf.joinThreadNetwork(trel, trel_para)
ocf.SetThreadNetworkPara(trel, trel_para)
ocf.StartThreadNetwork(trel, trel_para)
for trel in trel_list:
trel_mleid_addr = ocf.get_mleid_addr(trel)
trel_s3_mleid_addr = ocf.get_mleid_addr(trel_s3)
@@ -1025,7 +1030,8 @@ def test_ot_ssed_device(dut: tuple[IdfDut, IdfDut]) -> None:
ocf.init_thread(leader)
time.sleep(3)
leader_para = ocf.thread_parameter('leader', '', '12', '7766554433221100', False)
ocf.joinThreadNetwork(leader, leader_para)
ocf.SetThreadNetworkPara(leader, leader_para)
ocf.StartThreadNetwork(leader, leader_para)
ocf.wait(leader, 5)
ocf.execute_command(leader, 'networkkey')
dataset = ocf.getDataset(leader)