Merge branch 'bugfix/fix_thread_ci_testcases_not_stable_v5.5' into 'release/v5.5'

Bugfix/fix thread ci testcases not stable v5.5

See merge request espressif/esp-idf!43150
This commit is contained in:
Jiang Jiang Jian
2025-11-11 19:44:00 +08:00
2 changed files with 177 additions and 130 deletions
+106 -70
View File
@@ -2,14 +2,17 @@
# SPDX-License-Identifier: Unlicense OR CC0-1.0
# !/usr/bin/env python3
# this file defines some functions for testing cli and br under pytest framework
import logging
import os
import re
import socket
import struct
import subprocess
import time
from collections.abc import Callable
from functools import wraps
from typing import Callable
from typing import Optional
from typing import Tuple
import netifaces
import pexpect
@@ -18,19 +21,33 @@ from pytest_embedded_idf.dut import IdfDut
def extract_address(
command: str, pattern: str, default_return: str = ''
command: str,
pattern: str,
default_return: str = '',
retries: int = 3,
delay: int = 2,
) -> Callable[[Callable[[str], str]], Callable[[IdfDut], str]]:
def decorator(func: Callable[[str], str]) -> Callable[[IdfDut], str]:
@wraps(func)
def wrapper(dut: IdfDut) -> str:
clean_buffer(dut)
execute_command(dut, command)
try:
result = dut.expect(pattern, timeout=5)[1].decode()
except Exception as e:
print(f'Error: {e}')
return default_return
return func(result)
# requires Python3.10
# last_exception: Exception | None = None
last_exception: Optional[Exception] = None
for attempt in range(1, retries + 1):
try:
clean_buffer(dut)
execute_command(dut, command)
result = dut.expect(pattern, timeout=5)[1].decode()
return func(result)
except Exception as e:
logging.exception(f'[{command}] Attempt {attempt}/{retries} failed: {e}')
last_exception = e
if attempt < retries:
time.sleep(delay)
if last_exception:
logging.exception(f'[{command}] Giving up after {retries} retries.')
return default_return
return wrapper
@@ -132,7 +149,7 @@ def wait_for_join(dut: IdfDut, role: str) -> bool:
return False
def joinWiFiNetwork(dut: IdfDut, wifi: wifi_parameter) -> tuple[str, int]:
def joinWiFiNetwork(dut: IdfDut, wifi: wifi_parameter) -> Tuple[str, int]:
clean_buffer(dut)
ip_address = ''
for order in range(1, wifi.retry_times):
@@ -151,7 +168,7 @@ def getDeviceRole(dut: IdfDut) -> str:
wait(dut, 1)
execute_command(dut, 'state')
role = dut.expect(r'\W+(\w+)\W+Done', timeout=5)[1].decode()
print(role)
logging.info(role)
return str(role)
@@ -172,6 +189,12 @@ def init_thread(dut: IdfDut) -> None:
reset_thread(dut)
def stop_thread(dut: IdfDut) -> None:
execute_command(dut, 'thread stop')
dut.expect('disabled', timeout=20)
reset_thread(dut)
def reset_thread(dut: IdfDut) -> None:
execute_command(dut, 'factoryreset')
dut.expect('OpenThread attached to netif', timeout=20)
@@ -179,28 +202,28 @@ def reset_thread(dut: IdfDut) -> None:
clean_buffer(dut)
def hardreset_dut(dut: IdfDut) -> None:
dut.serial.hard_reset()
time.sleep(5)
execute_command(dut, 'factoryreset')
# get the mleid address of the thread
def get_mleid_addr(dut: IdfDut) -> str:
dut_adress = ''
execute_command(dut, 'ipaddr mleid')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
return str(dut_adress)
@extract_address('ipaddr mleid', r'\n((?:\w+:){7}\w+)\r')
def get_mleid_addr(addr: str) -> str:
return addr
# get the rloc address of the thread
def get_rloc_addr(dut: IdfDut) -> str:
dut_adress = ''
execute_command(dut, 'ipaddr rloc')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
return str(dut_adress)
@extract_address('ipaddr rloc', r'\n((?:\w+:){7}\w+)\r')
def get_rloc_addr(addr: str) -> str:
return addr
# get the linklocal address of the thread
def get_linklocal_addr(dut: IdfDut) -> str:
dut_adress = ''
execute_command(dut, 'ipaddr linklocal')
dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
return str(dut_adress)
@extract_address('ipaddr linklocal', r'\n((?:\w+:){7}\w+)\r')
def get_linklocal_addr(addr: str) -> str:
return addr
# get the global unicast address of the thread:
@@ -221,7 +244,7 @@ def get_rloc16_addr(rloc16: str) -> str:
# ping of thread
def ot_ping(
dut: IdfDut, target: str, timeout: int = 5, count: int = 1, size: int = 56, interval: int = 1, hoplimit: int = 64
) -> tuple[int, int]:
) -> Tuple[int, int]:
command = f'ping {str(target)} {size} {count} {interval} {hoplimit} {str(timeout)}'
execute_command(dut, command)
transmitted = dut.expect(r'(\d+) packets transmitted', timeout=60)[1].decode()
@@ -310,7 +333,7 @@ def get_host_interface_name() -> str:
interface_name = config.get('interface_name')
if interface_name:
if interface_name == 'eth0':
print(
logging.warning(
f"Warning: 'eth0' is not recommended as a valid network interface. "
f"Please check and update the 'interface_name' in the configuration file: "
f'{config_path}'
@@ -318,9 +341,9 @@ def get_host_interface_name() -> str:
else:
return str(interface_name)
else:
print("Warning: Configuration file found but 'interface_name' is not defined.")
logging.warning("Warning: Configuration file found but 'interface_name' is not defined.")
except Exception as e:
print(f'Error: Failed to read or parse {config_path}. Details: {e}')
logging.error(f'Error: Failed to read or parse {config_path}. Details: {e}')
if 'eth1' in netifaces.interfaces():
return 'eth1'
@@ -338,8 +361,8 @@ def check_if_host_receive_ra(br: IdfDut) -> bool:
omrprefix = get_omrprefix(br)
command = 'ip -6 route | grep ' + str(interface_name)
out_str = subprocess.getoutput(command)
print('br omrprefix: ', str(omrprefix))
print('host route table:\n', str(out_str))
logging.info(f'br omrprefix: {omrprefix}')
logging.info(f'host route table:\n {out_str}')
return str(omrprefix) in str(out_str)
@@ -404,7 +427,7 @@ def create_host_udp_server(myudp: udp_parameter) -> None:
AF_INET = socket.AF_INET6
else:
AF_INET = socket.AF_INET
print('The host start to create udp server!')
logging.info('The host start to create udp server!')
if_index = socket.if_nametoindex(interface_name)
sock = socket.socket(AF_INET, socket.SOCK_DGRAM)
sock.bind((myudp.addr, myudp.port))
@@ -417,13 +440,14 @@ def create_host_udp_server(myudp: udp_parameter) -> None:
)
sock.settimeout(myudp.timeout)
myudp.init_flag = True
print('The host start to receive message!')
logging.info('The host start to receive message!')
myudp.udp_bytes = (sock.recvfrom(1024))[0]
print('The host has received message: ', myudp.udp_bytes)
udp_str = str(myudp.udp_bytes)
logging.info(f'The host has received message: {udp_str}')
except OSError:
print('The host did not receive message!')
logging.error('The host did not receive message!')
finally:
print('Close the socket.')
logging.info('Close the socket.')
sock.close()
@@ -438,10 +462,10 @@ def host_udp_send_message(udp_target: udp_parameter) -> None:
sock.bind(('::', 12350))
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, interface_name.encode())
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 32)
print('Host is sending message')
logging.info('Host is sending message')
sock.sendto(udp_target.udp_bytes, (udp_target.addr, udp_target.port))
except OSError:
print('Host cannot send message')
logging.error('Host cannot send message')
finally:
sock.close()
@@ -481,13 +505,13 @@ def host_close_service() -> None:
command = 'ps auxww | grep avahi-publish-s'
out_bytes = subprocess.check_output(command, shell=True, timeout=5)
out_str = out_bytes.decode('utf-8')
print('host close service avahi status:\n', out_str)
logging.info(f'host close service avahi status:\n {out_str}')
service_info = [line for line in out_str.splitlines() if 'testxxx _testxxx._udp' in line]
for line in service_info:
print('Process:', line)
logging.info(f'Process:{line}')
pid = line.split()[1]
command = 'kill -9 ' + pid
print('kill ', pid)
logging.info(f'kill {pid}')
subprocess.call(command, shell=True, timeout=5)
time.sleep(1)
@@ -520,24 +544,24 @@ def open_host_interface() -> None:
def get_domain() -> str:
hostname = socket.gethostname()
print('hostname is: ', hostname)
logging.info(f'hostname is: {hostname}')
command = 'ps -auxww | grep avahi-daemon | grep running'
out_str = subprocess.getoutput(command)
print('avahi status:\n', out_str)
logging.info(f'avahi status:\n {out_str}')
role = re.findall(r'\[([\w\W]+)\.local\]', str(out_str))[0]
print('active host is: ', role)
logging.info(f'active host is: {role}')
return str(role)
def flush_ipv6_addr_by_interface() -> None:
interface_name = get_host_interface_name()
print(f'flush ipv6 addr : {interface_name}')
logging.info(f'flush ipv6 addr : {interface_name}')
command_show_addr = f'ip -6 addr show dev {interface_name}'
command_show_route = f'ip -6 route show dev {interface_name}'
addr_before = subprocess.getoutput(command_show_addr)
route_before = subprocess.getoutput(command_show_route)
print(f'Before flush, IPv6 addresses: \n{addr_before}')
print(f'Before flush, IPv6 routes: \n{route_before}')
logging.info(f'Before flush, IPv6 addresses: \n{addr_before}')
logging.info(f'Before flush, IPv6 routes: \n{route_before}')
subprocess.run(['ip', 'link', 'set', interface_name, 'down'])
subprocess.run(['ip', '-6', 'addr', 'flush', 'dev', interface_name])
subprocess.run(['ip', '-6', 'route', 'flush', 'dev', interface_name])
@@ -545,8 +569,8 @@ def flush_ipv6_addr_by_interface() -> None:
time.sleep(5)
addr_after = subprocess.getoutput(command_show_addr)
route_after = subprocess.getoutput(command_show_route)
print(f'After flush, IPv6 addresses: \n{addr_after}')
print(f'After flush, IPv6 routes: \n{route_after}')
logging.info(f'After flush, IPv6 addresses: \n{addr_after}')
logging.info(f'After flush, IPv6 routes: \n{route_after}')
class tcp_parameter:
@@ -575,28 +599,29 @@ def create_host_tcp_server(mytcp: tcp_parameter) -> None:
AF_INET = socket.AF_INET6
else:
AF_INET = socket.AF_INET
print('The host start to create a tcp server!')
logging.info('The host start to create a tcp server!')
sock = socket.socket(AF_INET, socket.SOCK_STREAM)
sock.bind((mytcp.addr, mytcp.port))
sock.listen(5)
mytcp.listen_flag = True
print('The tcp server is waiting for connection!')
logging.info('The tcp server is waiting for connection!')
sock.settimeout(mytcp.timeout)
connfd, addr = sock.accept()
print('The tcp server connected with ', addr)
logging.info(f'The tcp server connected with {addr}')
mytcp.recv_flag = True
mytcp.tcp_bytes = connfd.recv(1024)
print('The tcp server has received message: ', mytcp.tcp_bytes)
tcp_str = str(mytcp.tcp_bytes)
logging.info(f'The tcp server has received message: {tcp_str}')
except OSError:
if mytcp.recv_flag:
print('The tcp server did not receive message!')
logging.error('The tcp server did not receive message!')
else:
print('The tcp server fail to connect!')
logging.error('The tcp server fail to connect!')
finally:
print('Close the socket.')
logging.info('Close the socket.')
sock.close()
@@ -616,22 +641,19 @@ def decimal_to_hex(decimal_str: str) -> str:
return hex_str
def get_omrprefix(br: IdfDut) -> str:
execute_command(br, 'br omrprefix')
omrprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode()
return str(omrprefix)
@extract_address('br omrprefix', r'Local: ((?:\w+:){4}):/\d+\r')
def get_omrprefix(addr: str) -> str:
return addr
def get_onlinkprefix(br: IdfDut) -> str:
execute_command(br, 'br onlinkprefix')
onlinkprefix = br.expect(r'Local: ((?:\w+:){4}):/\d+\r', timeout=5)[1].decode()
return str(onlinkprefix)
@extract_address('br onlinkprefix', r'Local: ((?:\w+:){4}):/\d+\r')
def get_onlinkprefix(addr: str) -> str:
return addr
def get_nat64prefix(br: IdfDut) -> str:
execute_command(br, 'br nat64prefix')
nat64prefix = br.expect(r'Local: ((?:\w+:){6}):/\d+', timeout=5)[1].decode()
return str(nat64prefix)
@extract_address('br nat64prefix', r'Local: ((?:\w+:){6}):/\d+')
def get_nat64prefix(addr: str) -> str:
return addr
def execute_command(dut: IdfDut, command: str, prefix: str = 'ot ') -> None:
@@ -644,3 +666,17 @@ def get_ouput_string(dut: IdfDut, command: str, wait_time: int) -> str:
tmp = dut.expect(pexpect.TIMEOUT, timeout=wait_time)
clean_buffer(dut)
return str(tmp)
def wait_for_host_network(host: str = '8.8.8.8', retries: int = 6, interval: int = 10) -> None:
for attempt in range(1, retries + 1):
try:
subprocess.run(['ping', '-c', '1', '-W', '2', host], check=True)
logging.info(f'Host network reachable on attempt {attempt}')
return
except subprocess.CalledProcessError:
logging.info(f'Ping attempt {attempt} failed, retrying in {interval} seconds...')
if attempt < retries:
time.sleep(interval)
else:
raise RuntimeError(f'Host network is not reachable after {retries} attempts.')
+71 -60
View File
@@ -2,6 +2,7 @@
# SPDX-License-Identifier: Unlicense OR CC0-1.0
# !/usr/bin/env python3
import copy
import logging
import os.path
import random
import re
@@ -76,7 +77,7 @@ from pytest_embedded_idf.dut import IdfDut
@pytest.fixture(scope='module', name='Init_avahi')
def fixture_Init_avahi() -> bool:
print('Init Avahi')
logging.info('Init Avahi')
ocf.start_avahi()
time.sleep(10)
return True
@@ -84,7 +85,7 @@ def fixture_Init_avahi() -> bool:
@pytest.fixture(name='Init_interface')
def fixture_Init_interface() -> bool:
print('Init interface')
logging.info('Init interface')
ocf.flush_ipv6_addr_by_interface()
# The sleep time is set based on experience; reducing it might cause the host to be unready.
time.sleep(30)
@@ -104,7 +105,7 @@ PORT_MAPPING = {'ESPPORT1': 'esp32h2', 'ESPPORT2': 'esp32s3', 'ESPPORT3': 'esp32
# Case 1: Thread network formation and attaching
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -159,9 +160,9 @@ def test_thread_connect(dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
rx_nums = ocf.ot_ping(br, cli_mleid_addr, count=5)[1]
assert rx_nums == 5
finally:
ocf.execute_command(br, 'factoryreset')
for cli in cli_list:
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
@@ -187,7 +188,7 @@ def formBasicWiFiThreadNetwork(br: IdfDut, cli: IdfDut) -> None:
# Case 2: Bidirectional IPv6 connectivity
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -214,10 +215,10 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: Tuple[IdfDut
try:
assert ocf.is_joined_wifi_network(br)
cli_global_unicast_addr = ocf.get_global_unicast_addr(cli, br)
print('cli_global_unicast_addr', cli_global_unicast_addr)
logging.info(f'cli_global_unicast_addr {cli_global_unicast_addr}')
command = 'ping ' + str(cli_global_unicast_addr) + ' -c 10'
out_str = subprocess.getoutput(command)
print('ping result:\n', str(out_str))
logging.info(f'ping result:\n{out_str}')
role = re.findall(r' (\d+)%', str(out_str))[0]
assert role != '100'
interface_name = ocf.get_host_interface_name()
@@ -225,21 +226,23 @@ def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: Tuple[IdfDut
out_bytes = subprocess.check_output(command, shell=True, timeout=5)
out_str = out_bytes.decode('utf-8')
onlinkprefix = ocf.get_onlinkprefix(br)
host_global_unicast_addr = re.findall(r'\W+(%s(?:\w+:){3}\w+)\W+' % onlinkprefix, str(out_str))
pattern = rf'\W+({onlinkprefix}(?:\w+:){{3}}\w+)\W+'
host_global_unicast_addr = re.findall(pattern, out_str)
rx_nums = 0
for ip_addr in host_global_unicast_addr:
txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=5)
txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=10)
rx_nums = rx_nums + int(txrx_nums[1])
logging.debug(f'rx_nums: {rx_nums}')
assert rx_nums != 0
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 3: Multicast forwarding from Wi-Fi to Thread network
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -271,7 +274,7 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: Tuple[IdfDut, IdfDut,
interface_name = ocf.get_host_interface_name()
command = 'ping -I ' + str(interface_name) + ' -t 64 ff04::125 -c 10'
out_str = subprocess.getoutput(command)
print('ping result:\n', str(out_str))
logging.info(f'ping result:\n{out_str}')
role = re.findall(r' (\d+)%', str(out_str))[0]
assert role != '100'
ocf.execute_command(cli, 'udp open')
@@ -285,14 +288,14 @@ def test_multicast_forwarding_A(Init_interface: bool, dut: Tuple[IdfDut, IdfDut,
ocf.execute_command(cli, 'udp close')
cli.expect('Done', timeout=5)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 4: Multicast forwarding from Thread to Wi-Fi network
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -338,15 +341,15 @@ def test_multicast_forwarding_B(Init_interface: bool, dut: Tuple[IdfDut, IdfDut,
while udp_mission.is_alive():
time.sleep(1)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
assert b'hello' in myudp.udp_bytes
# Case 5: discover dervice published by Thread device
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -377,14 +380,14 @@ def test_service_discovery_of_Thread_device(
assert ocf.is_joined_wifi_network(br)
command = 'avahi-browse -rt _testyyy._udp'
out_str = subprocess.getoutput(command)
print('avahi-browse:\n', str(out_str))
logging.info(f'avahi-browse:\n{out_str}')
assert 'myTest' not in str(out_str)
hostname = 'myTest'
command = 'srp client host name ' + hostname
ocf.execute_command(cli, command)
cli.expect('Done', timeout=5)
cli_global_unicast_addr = ocf.get_global_unicast_addr(cli, br)
print('cli_global_unicast_addr', cli_global_unicast_addr)
logging.info(f'cli_global_unicast_addr {cli_global_unicast_addr}')
command = 'srp client host address ' + str(cli_global_unicast_addr)
ocf.execute_command(cli, command)
cli.expect('Done', timeout=5)
@@ -397,17 +400,17 @@ def test_service_discovery_of_Thread_device(
ocf.wait(cli, 3)
command = 'avahi-browse -rt _testyyy._udp'
out_str = subprocess.getoutput(command)
print('avahi-browse:\n', str(out_str))
logging.info(f'avahi-browse:\n {out_str}')
assert 'myTest' in str(out_str)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 6: discover dervice published by Wi-Fi device
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -442,7 +445,7 @@ def test_service_discovery_of_WiFi_device(
cli.expect('Done', timeout=5)
ocf.wait(cli, 1)
domain_name = ocf.get_domain()
print('domain name is: ', domain_name)
logging.info(f'domain name is: {domain_name}')
command = 'dns resolve ' + domain_name + '.default.service.arpa.'
ocf.execute_command(cli, command)
@@ -470,14 +473,14 @@ def test_service_discovery_of_WiFi_device(
finally:
ocf.host_close_service()
sp.terminate()
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 7: ICMP communication via NAT64
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -504,18 +507,18 @@ def test_ICMP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
try:
assert ocf.is_joined_wifi_network(br)
host_ipv4_address = ocf.get_host_ipv4_address()
print('host_ipv4_address: ', host_ipv4_address)
logging.info(f'host_ipv4_address: {host_ipv4_address}')
rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), count=5)[1]
assert rx_nums != 0
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 8: UDP communication via NAT64
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -547,7 +550,7 @@ def test_UDP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
cli.expect('Done', timeout=5)
ocf.wait(cli, 3)
host_ipv4_address = ocf.get_host_ipv4_address()
print('host_ipv4_address: ', host_ipv4_address)
logging.info(f'host_ipv4_address: {host_ipv4_address}')
myudp = ocf.udp_parameter('INET4', host_ipv4_address, 5090, '', False, 15.0, b'')
udp_mission = threading.Thread(target=ocf.create_host_udp_server, args=(myudp,))
udp_mission.start()
@@ -563,15 +566,15 @@ def test_UDP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
while udp_mission.is_alive():
time.sleep(1)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
assert b'hello' in myudp.udp_bytes
# Case 9: TCP communication via NAT64
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -604,7 +607,7 @@ def test_TCP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
ocf.wait(cli, 3)
host_ipv4_address = ocf.get_host_ipv4_address()
connect_address = ocf.get_ipv6_from_ipv4(host_ipv4_address, br)
print('connect_address is: ', connect_address)
logging.info(f'connect_address is: {connect_address}')
mytcp = ocf.tcp_parameter('INET4', host_ipv4_address, 12345, False, False, 15.0, b'')
tcp_mission = threading.Thread(target=ocf.create_host_tcp_server, args=(mytcp,))
tcp_mission.start()
@@ -625,14 +628,15 @@ def test_TCP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
while tcp_mission.is_alive():
time.sleep(1)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
assert b'hello' in mytcp.tcp_bytes
# Case 10: Sleepy device test
@pytest.mark.openthread_sleep
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -660,6 +664,7 @@ def test_TCP_NAT64(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
def test_ot_sleepy_device(dut: Tuple[IdfDut, IdfDut]) -> None:
leader = dut[0]
sleepy_device = dut[1]
ocf.hardreset_dut(sleepy_device)
fail_info = re.compile(r'Core\W*?\d\W*?register dump')
try:
ocf.init_thread(leader)
@@ -686,12 +691,13 @@ def test_ot_sleepy_device(dut: Tuple[IdfDut, IdfDut]) -> None:
assert not bool(fail_info.search(str(output)))
finally:
ocf.execute_command(leader, 'factoryreset')
ocf.hardreset_dut(sleepy_device)
time.sleep(3)
# Case 11: Basic startup Test of BR
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -725,13 +731,13 @@ def test_basic_startup(dut: Tuple[IdfDut, IdfDut]) -> None:
br.expect('Done', timeout=5)
assert ocf.wait_for_join(br, 'leader')
finally:
ocf.execute_command(br, 'factoryreset')
ocf.stop_thread(br)
time.sleep(3)
# Case 12: Curl a website via DNS and NAT64
@pytest.mark.openthread_bbr
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -756,23 +762,24 @@ def test_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
formBasicWiFiThreadNetwork(br, cli)
try:
ocf.wait_for_host_network()
ocf.execute_command(br, 'bbr')
br.expect('server16', timeout=5)
ocf.execute_command(cli, 'dns64server 8.8.8.8')
cli.expect('Done', timeout=5)
command = 'curl http://www.espressif.com'
message = ocf.get_ouput_string(cli, command, 10)
assert '<html>' in str(message)
assert 'html' in str(message)
assert '301 Moved Permanently' in str(message)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 13: Meshcop discovery of Border Router
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -813,9 +820,9 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, I
except subprocess.CalledProcessError as e:
output_bytes = e.stdout
finally:
print('out_bytes: ', output_bytes)
logging.info(f'out_bytes: {output_bytes!r}')
output_str = str(output_bytes)
print('out_str: ', output_str)
logging.info(f'out_str: {output_str}')
assert 'hostname = [esp-ot-br.local]' in str(output_str)
assert ('address = [' + ipv4_address + ']') in str(output_str)
@@ -826,13 +833,13 @@ def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: Tuple[IdfDut, I
assert 'vn=OpenThread' in str(output_str)
assert 'rv=1' in str(output_str)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.stop_thread(br)
time.sleep(3)
# Case 14: Curl a website over HTTPS via DNS and NAT64
@pytest.mark.openthread_bbr
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -857,21 +864,22 @@ def test_https_NAT64_DNS(Init_interface: bool, dut: Tuple[IdfDut, IdfDut, IdfDut
formBasicWiFiThreadNetwork(br, cli)
try:
ocf.wait_for_host_network()
ocf.execute_command(cli, 'dns64server 8.8.8.8')
cli.expect('Done', timeout=5)
command = 'curl https://www.example.com/'
message = ocf.get_ouput_string(cli, command, 20)
assert '<html>' in str(message)
assert 'This domain is for use in illustrative examples in documents' in str(message)
assert 'html' in str(message)
assert 'This domain is for use in' in str(message)
finally:
ocf.execute_command(br, 'factoryreset')
ocf.execute_command(cli, 'factoryreset')
ocf.stop_thread(cli)
ocf.stop_thread(br)
time.sleep(3)
# Case 15: Thread network formation and attaching with TREL
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -917,15 +925,15 @@ def test_trel_connect(dut: Tuple[IdfDut, IdfDut]) -> None:
rx_nums = ocf.ot_ping(trel_s3, trel_mleid_addr, count=10)[1]
assert rx_nums > 5
finally:
ocf.execute_command(trel_s3, 'factoryreset')
for trel in trel_list:
ocf.execute_command(trel, 'factoryreset')
ocf.stop_thread(trel)
ocf.stop_thread(trel_s3)
time.sleep(3)
# Case 16: Thread network BR lib check
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -954,6 +962,7 @@ def test_br_lib_check(dut: Tuple[IdfDut, IdfDut]) -> None:
# Case 17: SSED test
@pytest.mark.openthread_sleep
@pytest.mark.flaky(reruns=1, reruns_delay=5)
@pytest.mark.parametrize(
'config, count, app_path, target, port',
[
@@ -982,6 +991,7 @@ def test_ot_ssed_device(dut: Tuple[IdfDut, IdfDut]) -> None:
leader = dut[0]
ssed_device = dut[1]
try:
ocf.hardreset_dut(ssed_device)
# CI device must have external XTAL to run SSED case, we will check this here first
ssed_device.expect('32k XTAL in use', timeout=10)
ocf.init_thread(leader)
@@ -1021,4 +1031,5 @@ def test_ot_ssed_device(dut: Tuple[IdfDut, IdfDut]) -> None:
ocf.ping_and_check(dut=leader, target=ssed_address, tx_total=10, timeout=6)
finally:
ocf.execute_command(leader, 'factoryreset')
ocf.hardreset_dut(ssed_device)
time.sleep(3)