From 643b7c93c1109f2f10520bd37ab6365cdabef729 Mon Sep 17 00:00:00 2001 From: Ondrej Kosta Date: Thu, 20 Nov 2025 14:40:12 +0100 Subject: [PATCH] feat(ci): updated Ethernet tests to align with new runners --- .../esp_eth/test_apps/main/esp_eth_test_l2.c | 2 +- .../esp_eth/test_apps/pytest_esp_eth.py | 24 ++--- .../network/bridge/pytest_example_bridge.py | 96 +++++++++++++------ .../l2tap/pytest_example_l2tap_echo.py | 38 +++++--- .../udp_multicast/pytest_udp_multicast.py | 20 ++-- 5 files changed, 116 insertions(+), 64 deletions(-) diff --git a/components/esp_eth/test_apps/main/esp_eth_test_l2.c b/components/esp_eth/test_apps/main/esp_eth_test_l2.c index 19d09b81b9..6af0f19229 100644 --- a/components/esp_eth/test_apps/main/esp_eth_test_l2.c +++ b/components/esp_eth/test_apps/main/esp_eth_test_l2.c @@ -637,7 +637,7 @@ TEST_CASE("heap utilization", "[ethernet_l2]") } #define FORMAT_MAC(mac_addr, a, b, c, d, e, f) do { mac_addr[0] = a; mac_addr[1] = b; mac_addr[2] = c; mac_addr[3] = d; mac_addr[4] = e; mac_addr[5] = f; } while(0) -TEST_CASE("w5500_multicast_filter", "[ethernet_l2]") +TEST_CASE("multicast_filter", "[ethernet_l2]") { esp_eth_mac_t *mac = mac_init(NULL, NULL); TEST_ASSERT_NOT_NULL(mac); diff --git a/components/esp_eth/test_apps/pytest_esp_eth.py b/components/esp_eth/test_apps/pytest_esp_eth.py index bfcb178b69..bca920dd0d 100644 --- a/components/esp_eth/test_apps/pytest_esp_eth.py +++ b/components/esp_eth/test_apps/pytest_esp_eth.py @@ -31,16 +31,18 @@ class EthTestIntf: netifs.sort(reverse=True) logging.info('detected interfaces: %s', str(netifs)) - for netif in netifs: - # if no interface defined, try to find it automatically - if my_if == '': - if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0: - self.target_if = netif - break + if my_if == '': + if 'dut_p1' in netifs: + self.target_if = 'dut_p1' else: - if netif.find(my_if) == 0: - self.target_if = my_if - break + for netif in netifs: + # if no interface defined, try to find it automatically + if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0: + self.target_if = netif + break + elif my_if in netifs: + self.target_if = my_if + if self.target_if == '': raise RuntimeError('network interface not found') logging.info('Use %s for testing', self.target_if) @@ -132,8 +134,8 @@ def ethernet_int_emac_test(dut: IdfDut) -> None: dut.run_all_single_board_cases(group='esp_emac', timeout=240) -def ethernet_l2_test(dut: IdfDut) -> None: - target_if = EthTestIntf(ETH_TYPE) +def ethernet_l2_test(dut: IdfDut, test_if: str = '') -> None: + target_if = EthTestIntf(ETH_TYPE, test_if) dut.expect_exact('Press ENTER to see the list of tests') dut.write('\n') diff --git a/examples/network/bridge/pytest_example_bridge.py b/examples/network/bridge/pytest_example_bridge.py index aaffdb7ebc..24f7dfcc0e 100644 --- a/examples/network/bridge/pytest_example_bridge.py +++ b/examples/network/bridge/pytest_example_bridge.py @@ -9,8 +9,10 @@ import re import socket import subprocess import time +from collections.abc import Generator from concurrent.futures import Future from concurrent.futures import ThreadPoolExecutor +from re import Match import netifaces import paramiko # type: ignore @@ -273,28 +275,38 @@ def send_brcast_msg_endnode_to_host(endnode: EndnodeSsh, host_brcast_ip: str, te return nc_host_out -@pytest.mark.eth_w5500 -@pytest.mark.parametrize( - 'config', - [ - 'w5500', - ], - indirect=True, -) -@idf_parametrize('target', ['esp32'], indirect=['target']) -def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None: +def get_legacy_host_name_match() -> Match[str] | None: + host_name = socket.gethostname() + regex = r'ethVM-(\d+)-(\d+)' + host_name_match = re.search(regex, host_name, re.DOTALL) + return host_name_match + + +def get_host_info() -> tuple[int, int]: + # Get switch configuration info from the hostname (legacy runners) + sw_info = get_legacy_host_name_match() + if sw_info is not None: + sw_num = int(sw_info.group(1)) + port_num = int(sw_info.group(2)) + return sw_num, port_num + else: + # Get switch configuration info from the IP address of the `switch` interface (new runners) + switch_if_ip = get_host_ip_by_interface('switch', netifaces.AF_INET) + # Parse IP address: x.y.. + ip_parts = switch_if_ip.split('.') + if len(ip_parts) == 4: + sw_num = int(ip_parts[2]) + port_num = int(ip_parts[3]) + return sw_num, port_num + else: + raise RuntimeError('Unexpected switch IP address') + + +def eth_bridge_test(dut: Dut, dev_user: str, dev_password: str) -> None: # ------------------------------ # # Pre-test testbed configuration # # ------------------------------ # - # Get switch configuration info from the hostname - host_name = socket.gethostname() - regex = r'ethVM-(\d+)-(\d+)' - sw_info = re.search(regex, host_name, re.DOTALL) - if sw_info is None: - raise RuntimeError('Unexpected hostname') - - sw_num = int(sw_info.group(1)) - port_num = int(sw_info.group(2)) + sw_num, port_num = get_host_info() port_num_endnode = int(port_num) + 1 # endnode address is always + 1 to the host endnode = EndnodeSsh(f'10.10.{sw_num}.{port_num_endnode}', ETHVM_ENDNODE_USER) @@ -330,7 +342,10 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None: host_ip = get_host_ip_by_interface(host_if, netifaces.AF_INET) logging.info('Host IP %s', host_ip) - endnode_if = host_if # endnode is a clone of the host + if get_legacy_host_name_match() is not None: + endnode_if = host_if # endnode is a clone of the host (legacy runners) + else: + endnode_if = 'dut_p2' # interface name connected to the second port of the DUT (new runners) # Endnode MAC endnode_mac = get_endnode_mac_by_interface(endnode, endnode_if) logging.info('Endnode MAC %s', endnode_mac) @@ -352,12 +367,12 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None: # TEST Objective 1: Ping the devices on the network # -------------------------------------------------- # ping bridge - ping_test = subprocess.call(f'ping {br_ip} -c 2', shell=True) + ping_test = subprocess.call(['ping', br_ip, '-c', '2']) if ping_test != 0: raise RuntimeError('ESP bridge is not reachable') # ping the end nodes of the network - ping_test = subprocess.call(f'ping {endnode_ip} -c 2', shell=True) + ping_test = subprocess.call(['ping', endnode_ip, '-c', '2']) if ping_test != 0: raise RuntimeError('End node is not reachable') @@ -526,13 +541,13 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None: logging.info('Drop `Endnode` MAC') dut.write('add --addr=' + endnode_mac + ' -d') dut.expect_exact('Bridge Config OK!') - ping_test = subprocess.call(f'ping {endnode_ip} -c 2', shell=True) + ping_test = subprocess.call(['ping', endnode_ip, '-c', '2']) if ping_test == 0: raise RuntimeError('End node should not be reachable') logging.info('Remove Drop `Endnode` MAC entry') dut.write('remove --addr=' + endnode_mac) dut.expect_exact('Bridge Config OK!') - ping_test = subprocess.call(f'ping {endnode_ip} -c 2', shell=True) + ping_test = subprocess.call(['ping', endnode_ip, '-c', '2']) if ping_test != 0: raise RuntimeError('End node is not reachable') @@ -565,9 +580,9 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None: # Remove ARP record from Test host computer. ARP is broadcasted, hence Bridge port does not reply to a request since # it does not receive it (no forward to Bridge port). As a result, Bridge is not pingable. - subprocess.call(f'sudo arp -d {br_ip}', shell=True) - subprocess.call('arp -a', shell=True) - ping_test = subprocess.call(f'ping {br_ip} -c 2', shell=True) + subprocess.call(['sudo', 'arp', '-d', br_ip]) + subprocess.call(['arp', '-a']) + ping_test = subprocess.call(['ping', br_ip, '-c', '2']) if ping_test == 0: raise RuntimeError('Bridge should not be reachable') @@ -577,7 +592,7 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None: dut.expect_exact('Bridge Config OK!') dut.write('add --addr=ff:ff:ff:ff:ff:ff -p 1 -c') dut.expect_exact('Bridge Config OK!') - ping_test = subprocess.call(f'ping {br_ip} -c 2', shell=True) + ping_test = subprocess.call(['ping', br_ip, '-c', '2']) if ping_test != 0: raise RuntimeError('Bridge is not reachable') @@ -588,3 +603,28 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None: endnode.close() switch1.close() + + +@pytest.fixture(scope='session', autouse=True) +def setup_test_environment() -> Generator[None, None, None]: + # Fixture code to run before any tests in the session + # make sure dut_p2 is down (only for new runners) + if get_legacy_host_name_match() is None: + subprocess.call(['sudo', 'ip', 'link', 'set', 'down', 'dev', 'dut_p2']) + + yield # Tests run here + + # Optional teardown after all tests... + + +@pytest.mark.eth_w5500 +@pytest.mark.parametrize( + 'config', + [ + 'w5500', + ], + indirect=True, +) +@idf_parametrize('target', ['esp32'], indirect=['target']) +def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None: + eth_bridge_test(dut, dev_user, dev_password) diff --git a/examples/protocols/l2tap/pytest_example_l2tap_echo.py b/examples/protocols/l2tap/pytest_example_l2tap_echo.py index eca3057aa3..d0f9feeac4 100644 --- a/examples/protocols/l2tap/pytest_example_l2tap_echo.py +++ b/examples/protocols/l2tap/pytest_example_l2tap_echo.py @@ -20,18 +20,26 @@ ETH_TYPE_3 = 0x2223 @contextlib.contextmanager def configure_eth_if(eth_type: int, target_if: str = '') -> Iterator[socket.socket]: + # try to determine which interface to use + netifs = os.listdir('/sys/class/net/') + # order matters - ETH NIC with the highest number is connected to DUT on CI runner + netifs.sort(reverse=True) + logging.info('detected interfaces: %s', str(netifs)) + if target_if == '': - # try to determine which interface to use - netifs = os.listdir('/sys/class/net/') - # order matters - ETH NIC with the highest number is connected to DUT on CI runner - netifs.sort(reverse=True) - logging.info('detected interfaces: %s', str(netifs)) - for netif in netifs: - if netif.find('eth') == 0 or netif.find('enx') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0: - target_if = netif - break - if target_if == '': - raise Exception('no network interface found') + if 'dut_p1' in netifs: + target_if = 'dut_p1' + else: + for netif in netifs: + # if no interface defined, try to find it automatically + if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0: + target_if = netif + break + elif target_if not in netifs: + target_if = '' + + if target_if == '': + raise RuntimeError('network interface not found') logging.info('Use %s for testing', target_if) so = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(eth_type)) @@ -76,7 +84,7 @@ def recv_eth_frame(eth_type: int, eth_if: str = '') -> str: return str(eth_frame.load.decode().rstrip('\x00')) -def actual_test(dut: Dut) -> None: +def actual_test(dut: Dut, test_if: str = '') -> None: # Get DUT's MAC address res = dut.expect( r'([\s\S]*)' @@ -85,17 +93,17 @@ def actual_test(dut: Dut) -> None: dut_mac = res.group(2) # Receive "ESP32 Hello frame" - recv_eth_frame(ETH_TYPE_3) + recv_eth_frame(ETH_TYPE_3, test_if) # Sent a message and receive its echo message = 'ESP32 test message with EthType ' + hex(ETH_TYPE_1) - echoed = send_recv_eth_frame(message, ETH_TYPE_1, dut_mac) + echoed = send_recv_eth_frame(message, ETH_TYPE_1, dut_mac, test_if) if echoed == message: logging.info('PASS') else: raise Exception('Echoed message does not match!') message = 'ESP32 test message with EthType ' + hex(ETH_TYPE_2) - echoed = send_recv_eth_frame(message, ETH_TYPE_2, dut_mac) + echoed = send_recv_eth_frame(message, ETH_TYPE_2, dut_mac, test_if) if echoed == message: logging.info('PASS') else: diff --git a/examples/protocols/sockets/udp_multicast/pytest_udp_multicast.py b/examples/protocols/sockets/udp_multicast/pytest_udp_multicast.py index bfe7e70725..2f7a14bdf1 100644 --- a/examples/protocols/sockets/udp_multicast/pytest_udp_multicast.py +++ b/examples/protocols/sockets/udp_multicast/pytest_udp_multicast.py @@ -36,14 +36,16 @@ def find_target_if(my_if: str = '') -> str: netifs.sort(reverse=True) logging.info('detected interfaces: %s', str(netifs)) - for netif in netifs: - # if no interface defined, try to find it automatically - if my_if == '': - if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0: - return netif + if my_if == '': + if 'dut_p1' in netifs: + return 'dut_p1' else: - if netif.find(my_if) == 0: - return my_if + for netif in netifs: + # if no interface defined, try to find it automatically + if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0: + return netif + elif my_if in netifs: + return my_if raise RuntimeError('network interface not found') @@ -135,7 +137,7 @@ def test_examples_udp_multicast_proto(dut: Dut, ip_version: str = 'ipv4', nic: s try: data, recv_addr = sock.recvfrom(1024) logging.info(f'Received {len(data)} bytes from {recv_addr}') - except socket.timeout: + except TimeoutError: raise RuntimeError(f'Timeout waiting for {ip_version} multicast message from ESP32') # Check if received from expected source @@ -143,7 +145,7 @@ def test_examples_udp_multicast_proto(dut: Dut, ip_version: str = 'ipv4', nic: s raise RuntimeError(f'Received {ip_version} multicast message from unexpected source') # Send multicast message - message = '!!! Multicast test message from host !!!'.encode() + message = b'!!! Multicast test message from host !!!' logging.info(f'Sending {ip_version} multicast message to {multicast_addr}:{PORT}') sock.sendto(message, (multicast_addr, PORT)) if ip_version == 'ipv4_mapped':