From 7a99020765c2e951e0244a28af8d74e8564aeab9 Mon Sep 17 00:00:00 2001 From: Xu Si Yu Date: Tue, 14 Oct 2025 17:34:04 +0800 Subject: [PATCH] feat(openthread): output logs of host for debugging CI issues --- examples/openthread/ot_ci_function.py | 71 ++++++++++++++------------- examples/openthread/pytest_otbr.py | 71 ++++++++++++++------------- 2 files changed, 75 insertions(+), 67 deletions(-) diff --git a/examples/openthread/ot_ci_function.py b/examples/openthread/ot_ci_function.py index 0c438e9f60..a661d5ed10 100644 --- a/examples/openthread/ot_ci_function.py +++ b/examples/openthread/ot_ci_function.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Unlicense OR CC0-1.0 # !/usr/bin/env python3 # this file defines some functions for testing cli and br under pytest framework +import logging import os import re import socket @@ -29,7 +30,7 @@ def extract_address( try: result = dut.expect(pattern, timeout=5)[1].decode() except Exception as e: - print(f'Error: {e}') + logging.error(f'Error: {e}') return default_return return func(result) @@ -152,7 +153,7 @@ def getDeviceRole(dut: IdfDut) -> str: wait(dut, 1) execute_command(dut, 'state') role = dut.expect(r'\W+(\w+)\W+Done', timeout=5)[1].decode() - print(role) + logging.info(role) return str(role) @@ -311,7 +312,7 @@ def get_host_interface_name() -> str: interface_name = config.get('interface_name') if interface_name: if interface_name == 'eth0': - print( + logging.warning( f"Warning: 'eth0' is not recommended as a valid network interface. " f"Please check and update the 'interface_name' in the configuration file: " f'{config_path}' @@ -319,9 +320,9 @@ def get_host_interface_name() -> str: else: return str(interface_name) else: - print("Warning: Configuration file found but 'interface_name' is not defined.") + logging.warning("Warning: Configuration file found but 'interface_name' is not defined.") except Exception as e: - print(f'Error: Failed to read or parse {config_path}. Details: {e}') + logging.error(f'Error: Failed to read or parse {config_path}. Details: {e}') if 'eth1' in netifaces.interfaces(): return 'eth1' @@ -339,8 +340,8 @@ def check_if_host_receive_ra(br: IdfDut) -> bool: omrprefix = get_omrprefix(br) command = 'ip -6 route | grep ' + str(interface_name) out_str = subprocess.getoutput(command) - print('br omrprefix: ', str(omrprefix)) - print('host route table:\n', str(out_str)) + logging.info(f'br omrprefix: {omrprefix}') + logging.info(f'host route table:\n {out_str}') return str(omrprefix) in str(out_str) @@ -405,7 +406,7 @@ def create_host_udp_server(myudp: udp_parameter) -> None: AF_INET = socket.AF_INET6 else: AF_INET = socket.AF_INET - print('The host start to create udp server!') + logging.info('The host start to create udp server!') if_index = socket.if_nametoindex(interface_name) sock = socket.socket(AF_INET, socket.SOCK_DGRAM) sock.bind((myudp.addr, myudp.port)) @@ -418,13 +419,14 @@ def create_host_udp_server(myudp: udp_parameter) -> None: ) sock.settimeout(myudp.timeout) myudp.init_flag = True - print('The host start to receive message!') + logging.info('The host start to receive message!') myudp.udp_bytes = (sock.recvfrom(1024))[0] - print('The host has received message: ', myudp.udp_bytes) - except socket.error: - print('The host did not receive message!') + udp_str = str(myudp.udp_bytes) + logging.info(f'The host has received message: {udp_str}') + except OSError: + logging.error('The host did not receive message!') finally: - print('Close the socket.') + logging.info('Close the socket.') sock.close() @@ -439,10 +441,10 @@ def host_udp_send_message(udp_target: udp_parameter) -> None: sock.bind(('::', 12350)) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, interface_name.encode()) sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 32) - print('Host is sending message') + logging.info('Host is sending message') sock.sendto(udp_target.udp_bytes, (udp_target.addr, udp_target.port)) - except socket.error: - print('Host cannot send message') + except OSError: + logging.error('Host cannot send message') finally: sock.close() @@ -482,13 +484,13 @@ def host_close_service() -> None: command = 'ps auxww | grep avahi-publish-s' out_bytes = subprocess.check_output(command, shell=True, timeout=5) out_str = out_bytes.decode('utf-8') - print('host close service avahi status:\n', out_str) + logging.info(f'host close service avahi status:\n {out_str}') service_info = [line for line in out_str.splitlines() if 'testxxx _testxxx._udp' in line] for line in service_info: - print('Process:', line) + logging.info(f'Process:{line}') pid = line.split()[1] command = 'kill -9 ' + pid - print('kill ', pid) + logging.info(f'kill {pid}') subprocess.call(command, shell=True, timeout=5) time.sleep(1) @@ -521,24 +523,24 @@ def open_host_interface() -> None: def get_domain() -> str: hostname = socket.gethostname() - print('hostname is: ', hostname) + logging.info(f'hostname is: {hostname}') command = 'ps -auxww | grep avahi-daemon | grep running' out_str = subprocess.getoutput(command) - print('avahi status:\n', out_str) + logging.info(f'avahi status:\n {out_str}') role = re.findall(r'\[([\w\W]+)\.local\]', str(out_str))[0] - print('active host is: ', role) + logging.info(f'active host is: {role}') return str(role) def flush_ipv6_addr_by_interface() -> None: interface_name = get_host_interface_name() - print(f'flush ipv6 addr : {interface_name}') + logging.info(f'flush ipv6 addr : {interface_name}') command_show_addr = f'ip -6 addr show dev {interface_name}' command_show_route = f'ip -6 route show dev {interface_name}' addr_before = subprocess.getoutput(command_show_addr) route_before = subprocess.getoutput(command_show_route) - print(f'Before flush, IPv6 addresses: \n{addr_before}') - print(f'Before flush, IPv6 routes: \n{route_before}') + logging.info(f'Before flush, IPv6 addresses: \n{addr_before}') + logging.info(f'Before flush, IPv6 routes: \n{route_before}') subprocess.run(['ip', 'link', 'set', interface_name, 'down']) subprocess.run(['ip', '-6', 'addr', 'flush', 'dev', interface_name]) subprocess.run(['ip', '-6', 'route', 'flush', 'dev', interface_name]) @@ -546,8 +548,8 @@ def flush_ipv6_addr_by_interface() -> None: time.sleep(5) addr_after = subprocess.getoutput(command_show_addr) route_after = subprocess.getoutput(command_show_route) - print(f'After flush, IPv6 addresses: \n{addr_after}') - print(f'After flush, IPv6 routes: \n{route_after}') + logging.info(f'After flush, IPv6 addresses: \n{addr_after}') + logging.info(f'After flush, IPv6 routes: \n{route_after}') class tcp_parameter: @@ -576,28 +578,29 @@ def create_host_tcp_server(mytcp: tcp_parameter) -> None: AF_INET = socket.AF_INET6 else: AF_INET = socket.AF_INET - print('The host start to create a tcp server!') + logging.info('The host start to create a tcp server!') sock = socket.socket(AF_INET, socket.SOCK_STREAM) sock.bind((mytcp.addr, mytcp.port)) sock.listen(5) mytcp.listen_flag = True - print('The tcp server is waiting for connection!') + logging.info('The tcp server is waiting for connection!') sock.settimeout(mytcp.timeout) connfd, addr = sock.accept() - print('The tcp server connected with ', addr) + logging.info(f'The tcp server connected with {addr}') mytcp.recv_flag = True mytcp.tcp_bytes = connfd.recv(1024) - print('The tcp server has received message: ', mytcp.tcp_bytes) + tcp_str = str(mytcp.tcp_bytes) + logging.info(f'The tcp server has received message: {tcp_str}') except socket.error: if mytcp.recv_flag: - print('The tcp server did not receive message!') + logging.error('The tcp server did not receive message!') else: - print('The tcp server fail to connect!') + logging.error('The tcp server fail to connect!') finally: - print('Close the socket.') + logging.info('Close the socket.') sock.close() diff --git a/examples/openthread/pytest_otbr.py b/examples/openthread/pytest_otbr.py index 36b8801793..7c3374fb13 100644 --- a/examples/openthread/pytest_otbr.py +++ b/examples/openthread/pytest_otbr.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Unlicense OR CC0-1.0 # !/usr/bin/env python3 import copy +import logging import os.path import random import re @@ -9,7 +10,6 @@ import secrets import subprocess import threading import time -from typing import Tuple import ot_ci_function as ocf import pexpect @@ -76,7 +76,7 @@ from pytest_embedded_idf.dut import IdfDut @pytest.fixture(scope='module', name='Init_avahi') def fixture_Init_avahi() -> bool: - print('Init Avahi') + logging.info('Init Avahi') ocf.start_avahi() time.sleep(10) return True @@ -84,7 +84,7 @@ def fixture_Init_avahi() -> bool: @pytest.fixture(name='Init_interface') def fixture_Init_interface() -> bool: - print('Init interface') + logging.info('Init interface') ocf.flush_ipv6_addr_by_interface() # The sleep time is set based on experience; reducing it might cause the host to be unready. time.sleep(30) @@ -136,7 +136,7 @@ PORT_MAPPING = { ], indirect=True, ) -def test_thread_connect(dut:Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_thread_connect(dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli_h2 = dut[1] dut[0].serial.stop_redirect_thread() @@ -210,7 +210,7 @@ def formBasicWiFiThreadNetwork(br:IdfDut, cli:IdfDut) -> None: ], indirect=True, ) -def test_Bidirectional_IPv6_connectivity(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_Bidirectional_IPv6_connectivity(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -220,10 +220,10 @@ def test_Bidirectional_IPv6_connectivity(Init_interface:bool, dut: Tuple[IdfDut, try: assert ocf.is_joined_wifi_network(br) cli_global_unicast_addr = ocf.get_global_unicast_addr(cli, br) - print('cli_global_unicast_addr', cli_global_unicast_addr) + logging.info(f'cli_global_unicast_addr {cli_global_unicast_addr}') command = 'ping ' + str(cli_global_unicast_addr) + ' -c 10' out_str = subprocess.getoutput(command) - print('ping result:\n', str(out_str)) + logging.info(f'ping result:\n{out_str}') role = re.findall(r' (\d+)%', str(out_str))[0] assert role != '100' interface_name = ocf.get_host_interface_name() @@ -231,7 +231,8 @@ def test_Bidirectional_IPv6_connectivity(Init_interface:bool, dut: Tuple[IdfDut, out_bytes = subprocess.check_output(command, shell=True, timeout=5) out_str = out_bytes.decode('utf-8') onlinkprefix = ocf.get_onlinkprefix(br) - host_global_unicast_addr = re.findall(r'\W+(%s(?:\w+:){3}\w+)\W+' % onlinkprefix, str(out_str)) + pattern = rf'\W+({onlinkprefix}(?:\w+:){{3}}\w+)\W+' + host_global_unicast_addr = re.findall(pattern, out_str) rx_nums = 0 for ip_addr in host_global_unicast_addr: txrx_nums = ocf.ot_ping(cli, str(ip_addr), count=5) @@ -263,7 +264,7 @@ def test_Bidirectional_IPv6_connectivity(Init_interface:bool, dut: Tuple[IdfDut, ], indirect=True, ) -def test_multicast_forwarding_A(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_multicast_forwarding_A(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -278,7 +279,7 @@ def test_multicast_forwarding_A(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, interface_name = ocf.get_host_interface_name() command = 'ping -I ' + str(interface_name) + ' -t 64 ff04::125 -c 10' out_str = subprocess.getoutput(command) - print('ping result:\n', str(out_str)) + logging.info(f'ping result:\n{out_str}') role = re.findall(r' (\d+)%', str(out_str))[0] assert role != '100' ocf.execute_command(cli, 'udp open') @@ -317,7 +318,7 @@ def test_multicast_forwarding_A(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, ], indirect=True, ) -def test_multicast_forwarding_B(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_multicast_forwarding_B(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -372,7 +373,9 @@ def test_multicast_forwarding_B(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, ], indirect=True, ) -def test_service_discovery_of_Thread_device(Init_interface:bool, Init_avahi:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_service_discovery_of_Thread_device( + Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut, IdfDut] +) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -384,14 +387,14 @@ def test_service_discovery_of_Thread_device(Init_interface:bool, Init_avahi:bool assert ocf.is_joined_wifi_network(br) command = 'avahi-browse -rt _testyyy._udp' out_str = subprocess.getoutput(command) - print('avahi-browse:\n', str(out_str)) + logging.info(f'avahi-browse:\n{out_str}') assert 'myTest' not in str(out_str) hostname = 'myTest' command = 'srp client host name ' + hostname ocf.execute_command(cli, command) cli.expect('Done', timeout=5) cli_global_unicast_addr = ocf.get_global_unicast_addr(cli, br) - print('cli_global_unicast_addr', cli_global_unicast_addr) + logging.info(f'cli_global_unicast_addr {cli_global_unicast_addr}') command = 'srp client host address ' + str(cli_global_unicast_addr) ocf.execute_command(cli, command) cli.expect('Done', timeout=5) @@ -404,7 +407,7 @@ def test_service_discovery_of_Thread_device(Init_interface:bool, Init_avahi:bool ocf.wait(cli, 3) command = 'avahi-browse -rt _testyyy._udp' out_str = subprocess.getoutput(command) - print('avahi-browse:\n', str(out_str)) + logging.info(f'avahi-browse:\n {out_str}') assert 'myTest' in str(out_str) finally: ocf.execute_command(br, 'factoryreset') @@ -432,7 +435,9 @@ def test_service_discovery_of_Thread_device(Init_interface:bool, Init_avahi:bool ], indirect=True, ) -def test_service_discovery_of_WiFi_device(Init_interface:bool, Init_avahi:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_service_discovery_of_WiFi_device( + Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut, IdfDut] +) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -448,7 +453,7 @@ def test_service_discovery_of_WiFi_device(Init_interface:bool, Init_avahi:bool, cli.expect('Done', timeout=5) ocf.wait(cli, 1) domain_name = ocf.get_domain() - print('domain name is: ', domain_name) + logging.info(f'domain name is: {domain_name}') command = 'dns resolve ' + domain_name + '.default.service.arpa.' ocf.execute_command(cli, command) @@ -501,7 +506,7 @@ def test_service_discovery_of_WiFi_device(Init_interface:bool, Init_avahi:bool, ], indirect=True, ) -def test_ICMP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_ICMP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -511,7 +516,7 @@ def test_ICMP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> try: assert ocf.is_joined_wifi_network(br) host_ipv4_address = ocf.get_host_ipv4_address() - print('host_ipv4_address: ', host_ipv4_address) + logging.info(f'host_ipv4_address: {host_ipv4_address}') rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), count=5)[1] assert rx_nums != 0 finally: @@ -540,7 +545,7 @@ def test_ICMP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> ], indirect=True, ) -def test_UDP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_UDP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -555,7 +560,7 @@ def test_UDP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> N cli.expect('Done', timeout=5) ocf.wait(cli, 3) host_ipv4_address = ocf.get_host_ipv4_address() - print('host_ipv4_address: ', host_ipv4_address) + logging.info(f'host_ipv4_address: {host_ipv4_address}') myudp = ocf.udp_parameter('INET4', host_ipv4_address, 5090, '', False, 15.0, b'') udp_mission = threading.Thread(target=ocf.create_host_udp_server, args=(myudp, )) udp_mission.start() @@ -597,7 +602,7 @@ def test_UDP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> N ], indirect=True, ) -def test_TCP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_TCP_NAT64(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -613,7 +618,7 @@ def test_TCP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> N ocf.wait(cli, 3) host_ipv4_address = ocf.get_host_ipv4_address() connect_address = ocf.get_ipv6_from_ipv4(host_ipv4_address, br) - print('connect_address is: ', connect_address) + logging.info(f'connect_address is: {connect_address}') mytcp = ocf.tcp_parameter('INET4', host_ipv4_address, 12345, False, False, 15.0, b'') tcp_mission = threading.Thread(target=ocf.create_host_tcp_server, args=(mytcp, )) tcp_mission.start() @@ -668,7 +673,7 @@ def test_TCP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> N ], indirect=True, ) -def test_ot_sleepy_device(dut: Tuple[IdfDut, IdfDut]) -> None: +def test_ot_sleepy_device(dut: tuple[IdfDut, IdfDut]) -> None: leader = dut[0] sleepy_device = dut[1] fail_info = re.compile(r'Core\W*?\d\W*?register dump') @@ -719,7 +724,7 @@ def test_ot_sleepy_device(dut: Tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_basic_startup(dut: Tuple[IdfDut, IdfDut]) -> None: +def test_basic_startup(dut: tuple[IdfDut, IdfDut]) -> None: br = dut[1] dut[0].serial.stop_redirect_thread() try: @@ -762,7 +767,7 @@ def test_basic_startup(dut: Tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_NAT64_DNS(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -803,7 +808,7 @@ def test_NAT64_DNS(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> N ], indirect=True, ) -def test_br_meshcop(Init_interface:bool, Init_avahi:bool, dut: Tuple[IdfDut, IdfDut]) -> None: +def test_br_meshcop(Init_interface: bool, Init_avahi: bool, dut: tuple[IdfDut, IdfDut]) -> None: br = dut[1] assert Init_interface assert Init_avahi @@ -829,9 +834,9 @@ def test_br_meshcop(Init_interface:bool, Init_avahi:bool, dut: Tuple[IdfDut, Idf except subprocess.CalledProcessError as e: output_bytes = e.stdout finally: - print('out_bytes: ', output_bytes) + logging.info(f'out_bytes: {output_bytes!r}') output_str = str(output_bytes) - print('out_str: ', output_str) + logging.info(f'out_str: {output_str}') assert 'hostname = [esp-ot-br.local]' in str(output_str) assert ('address = [' + ipv4_address + ']') in str(output_str) @@ -866,7 +871,7 @@ def test_br_meshcop(Init_interface:bool, Init_avahi:bool, dut: Tuple[IdfDut, Idf ], indirect=True, ) -def test_https_NAT64_DNS(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None: +def test_https_NAT64_DNS(Init_interface: bool, dut: tuple[IdfDut, IdfDut, IdfDut]) -> None: br = dut[2] cli = dut[1] assert Init_interface @@ -905,7 +910,7 @@ def test_https_NAT64_DNS(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut] ], indirect=True, ) -def test_trel_connect(dut: Tuple[IdfDut, IdfDut]) -> None: +def test_trel_connect(dut: tuple[IdfDut, IdfDut]) -> None: trel_s3 = dut[1] trel_c6 = dut[0] trel_list = [trel_c6] @@ -960,7 +965,7 @@ def test_trel_connect(dut: Tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_br_lib_check(dut: Tuple[IdfDut, IdfDut]) -> None: +def test_br_lib_check(dut: tuple[IdfDut, IdfDut]) -> None: br = dut[1] dut[0].serial.stop_redirect_thread() try: @@ -998,7 +1003,7 @@ def test_br_lib_check(dut: Tuple[IdfDut, IdfDut]) -> None: ], indirect=True, ) -def test_ot_ssed_device(dut: Tuple[IdfDut, IdfDut]) -> None: +def test_ot_ssed_device(dut: tuple[IdfDut, IdfDut]) -> None: leader = dut[0] ssed_device = dut[1] try: