feat(ci): updated Ethernet tests to align with new runners

This commit is contained in:
Ondrej Kosta
2025-11-20 14:40:12 +01:00
parent 1e6f49d178
commit 4e6b35ca9e
5 changed files with 116 additions and 64 deletions
@@ -637,7 +637,7 @@ TEST_CASE("heap utilization", "[ethernet_l2]")
}
#define FORMAT_MAC(mac_addr, a, b, c, d, e, f) do { mac_addr[0] = a; mac_addr[1] = b; mac_addr[2] = c; mac_addr[3] = d; mac_addr[4] = e; mac_addr[5] = f; } while(0)
TEST_CASE("w5500_multicast_filter", "[ethernet_l2]")
TEST_CASE("multicast_filter", "[ethernet_l2]")
{
esp_eth_mac_t *mac = mac_init(NULL, NULL);
TEST_ASSERT_NOT_NULL(mac);
+13 -11
View File
@@ -31,16 +31,18 @@ class EthTestIntf:
netifs.sort(reverse=True)
logging.info('detected interfaces: %s', str(netifs))
for netif in netifs:
# if no interface defined, try to find it automatically
if my_if == '':
if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
self.target_if = netif
break
if my_if == '':
if 'dut_p1' in netifs:
self.target_if = 'dut_p1'
else:
if netif.find(my_if) == 0:
self.target_if = my_if
break
for netif in netifs:
# if no interface defined, try to find it automatically
if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
self.target_if = netif
break
elif my_if in netifs:
self.target_if = my_if
if self.target_if == '':
raise RuntimeError('network interface not found')
logging.info('Use %s for testing', self.target_if)
@@ -132,8 +134,8 @@ def ethernet_int_emac_test(dut: IdfDut) -> None:
dut.run_all_single_board_cases(group='esp_emac', timeout=240)
def ethernet_l2_test(dut: IdfDut) -> None:
target_if = EthTestIntf(ETH_TYPE)
def ethernet_l2_test(dut: IdfDut, test_if: str = '') -> None:
target_if = EthTestIntf(ETH_TYPE, test_if)
dut.expect_exact('Press ENTER to see the list of tests')
dut.write('\n')
@@ -9,8 +9,10 @@ import re
import socket
import subprocess
import time
from collections.abc import Generator
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from re import Match
import netifaces
import paramiko # type: ignore
@@ -273,28 +275,38 @@ def send_brcast_msg_endnode_to_host(endnode: EndnodeSsh, host_brcast_ip: str, te
return nc_host_out
@pytest.mark.eth_w5500
@pytest.mark.parametrize(
'config',
[
'w5500',
],
indirect=True,
)
@idf_parametrize('target', ['esp32'], indirect=['target'])
def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
def get_legacy_host_name_match() -> Match[str] | None:
host_name = socket.gethostname()
regex = r'ethVM-(\d+)-(\d+)'
host_name_match = re.search(regex, host_name, re.DOTALL)
return host_name_match
def get_host_info() -> tuple[int, int]:
# Get switch configuration info from the hostname (legacy runners)
sw_info = get_legacy_host_name_match()
if sw_info is not None:
sw_num = int(sw_info.group(1))
port_num = int(sw_info.group(2))
return sw_num, port_num
else:
# Get switch configuration info from the IP address of the `switch` interface (new runners)
switch_if_ip = get_host_ip_by_interface('switch', netifaces.AF_INET)
# Parse IP address: x.y.<sw_num>.<port_num>
ip_parts = switch_if_ip.split('.')
if len(ip_parts) == 4:
sw_num = int(ip_parts[2])
port_num = int(ip_parts[3])
return sw_num, port_num
else:
raise RuntimeError('Unexpected switch IP address')
def eth_bridge_test(dut: Dut, dev_user: str, dev_password: str) -> None:
# ------------------------------ #
# Pre-test testbed configuration #
# ------------------------------ #
# Get switch configuration info from the hostname
host_name = socket.gethostname()
regex = r'ethVM-(\d+)-(\d+)'
sw_info = re.search(regex, host_name, re.DOTALL)
if sw_info is None:
raise RuntimeError('Unexpected hostname')
sw_num = int(sw_info.group(1))
port_num = int(sw_info.group(2))
sw_num, port_num = get_host_info()
port_num_endnode = int(port_num) + 1 # endnode address is always + 1 to the host
endnode = EndnodeSsh(f'10.10.{sw_num}.{port_num_endnode}', ETHVM_ENDNODE_USER)
@@ -330,7 +342,10 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
host_ip = get_host_ip_by_interface(host_if, netifaces.AF_INET)
logging.info('Host IP %s', host_ip)
endnode_if = host_if # endnode is a clone of the host
if get_legacy_host_name_match() is not None:
endnode_if = host_if # endnode is a clone of the host (legacy runners)
else:
endnode_if = 'dut_p2' # interface name connected to the second port of the DUT (new runners)
# Endnode MAC
endnode_mac = get_endnode_mac_by_interface(endnode, endnode_if)
logging.info('Endnode MAC %s', endnode_mac)
@@ -352,12 +367,12 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
# TEST Objective 1: Ping the devices on the network
# --------------------------------------------------
# ping bridge
ping_test = subprocess.call(f'ping {br_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', br_ip, '-c', '2'])
if ping_test != 0:
raise RuntimeError('ESP bridge is not reachable')
# ping the end nodes of the network
ping_test = subprocess.call(f'ping {endnode_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', endnode_ip, '-c', '2'])
if ping_test != 0:
raise RuntimeError('End node is not reachable')
@@ -526,13 +541,13 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
logging.info('Drop `Endnode` MAC')
dut.write('add --addr=' + endnode_mac + ' -d')
dut.expect_exact('Bridge Config OK!')
ping_test = subprocess.call(f'ping {endnode_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', endnode_ip, '-c', '2'])
if ping_test == 0:
raise RuntimeError('End node should not be reachable')
logging.info('Remove Drop `Endnode` MAC entry')
dut.write('remove --addr=' + endnode_mac)
dut.expect_exact('Bridge Config OK!')
ping_test = subprocess.call(f'ping {endnode_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', endnode_ip, '-c', '2'])
if ping_test != 0:
raise RuntimeError('End node is not reachable')
@@ -565,9 +580,9 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
# Remove ARP record from Test host computer. ARP is broadcasted, hence Bridge port does not reply to a request since
# it does not receive it (no forward to Bridge port). As a result, Bridge is not pingable.
subprocess.call(f'sudo arp -d {br_ip}', shell=True)
subprocess.call('arp -a', shell=True)
ping_test = subprocess.call(f'ping {br_ip} -c 2', shell=True)
subprocess.call(['sudo', 'arp', '-d', br_ip])
subprocess.call(['arp', '-a'])
ping_test = subprocess.call(['ping', br_ip, '-c', '2'])
if ping_test == 0:
raise RuntimeError('Bridge should not be reachable')
@@ -577,7 +592,7 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
dut.expect_exact('Bridge Config OK!')
dut.write('add --addr=ff:ff:ff:ff:ff:ff -p 1 -c')
dut.expect_exact('Bridge Config OK!')
ping_test = subprocess.call(f'ping {br_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', br_ip, '-c', '2'])
if ping_test != 0:
raise RuntimeError('Bridge is not reachable')
@@ -588,3 +603,28 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
endnode.close()
switch1.close()
@pytest.fixture(scope='session', autouse=True)
def setup_test_environment() -> Generator[None, None, None]:
# Fixture code to run before any tests in the session
# make sure dut_p2 is down (only for new runners)
if get_legacy_host_name_match() is None:
subprocess.call(['sudo', 'ip', 'link', 'set', 'down', 'dev', 'dut_p2'])
yield # Tests run here
# Optional teardown after all tests...
@pytest.mark.eth_w5500
@pytest.mark.parametrize(
'config',
[
'w5500',
],
indirect=True,
)
@idf_parametrize('target', ['esp32'], indirect=['target'])
def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
eth_bridge_test(dut, dev_user, dev_password)
@@ -20,18 +20,26 @@ ETH_TYPE_3 = 0x2223
@contextlib.contextmanager
def configure_eth_if(eth_type: int, target_if: str = '') -> Iterator[socket.socket]:
# try to determine which interface to use
netifs = os.listdir('/sys/class/net/')
# order matters - ETH NIC with the highest number is connected to DUT on CI runner
netifs.sort(reverse=True)
logging.info('detected interfaces: %s', str(netifs))
if target_if == '':
# try to determine which interface to use
netifs = os.listdir('/sys/class/net/')
# order matters - ETH NIC with the highest number is connected to DUT on CI runner
netifs.sort(reverse=True)
logging.info('detected interfaces: %s', str(netifs))
for netif in netifs:
if netif.find('eth') == 0 or netif.find('enx') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
target_if = netif
break
if target_if == '':
raise Exception('no network interface found')
if 'dut_p1' in netifs:
target_if = 'dut_p1'
else:
for netif in netifs:
# if no interface defined, try to find it automatically
if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
target_if = netif
break
elif target_if not in netifs:
target_if = ''
if target_if == '':
raise RuntimeError('network interface not found')
logging.info('Use %s for testing', target_if)
so = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(eth_type))
@@ -76,7 +84,7 @@ def recv_eth_frame(eth_type: int, eth_if: str = '') -> str:
return str(eth_frame.load.decode().rstrip('\x00'))
def actual_test(dut: Dut) -> None:
def actual_test(dut: Dut, test_if: str = '') -> None:
# Get DUT's MAC address
res = dut.expect(
r'([\s\S]*)'
@@ -85,17 +93,17 @@ def actual_test(dut: Dut) -> None:
dut_mac = res.group(2)
# Receive "ESP32 Hello frame"
recv_eth_frame(ETH_TYPE_3)
recv_eth_frame(ETH_TYPE_3, test_if)
# Sent a message and receive its echo
message = 'ESP32 test message with EthType ' + hex(ETH_TYPE_1)
echoed = send_recv_eth_frame(message, ETH_TYPE_1, dut_mac)
echoed = send_recv_eth_frame(message, ETH_TYPE_1, dut_mac, test_if)
if echoed == message:
logging.info('PASS')
else:
raise Exception('Echoed message does not match!')
message = 'ESP32 test message with EthType ' + hex(ETH_TYPE_2)
echoed = send_recv_eth_frame(message, ETH_TYPE_2, dut_mac)
echoed = send_recv_eth_frame(message, ETH_TYPE_2, dut_mac, test_if)
if echoed == message:
logging.info('PASS')
else:
@@ -36,14 +36,16 @@ def find_target_if(my_if: str = '') -> str:
netifs.sort(reverse=True)
logging.info('detected interfaces: %s', str(netifs))
for netif in netifs:
# if no interface defined, try to find it automatically
if my_if == '':
if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
return netif
if my_if == '':
if 'dut_p1' in netifs:
return 'dut_p1'
else:
if netif.find(my_if) == 0:
return my_if
for netif in netifs:
# if no interface defined, try to find it automatically
if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
return netif
elif my_if in netifs:
return my_if
raise RuntimeError('network interface not found')
@@ -135,7 +137,7 @@ def test_examples_udp_multicast_proto(dut: Dut, ip_version: str = 'ipv4', nic: s
try:
data, recv_addr = sock.recvfrom(1024)
logging.info(f'Received {len(data)} bytes from {recv_addr}')
except socket.timeout:
except TimeoutError:
raise RuntimeError(f'Timeout waiting for {ip_version} multicast message from ESP32')
# Check if received from expected source
@@ -143,7 +145,7 @@ def test_examples_udp_multicast_proto(dut: Dut, ip_version: str = 'ipv4', nic: s
raise RuntimeError(f'Received {ip_version} multicast message from unexpected source')
# Send multicast message
message = '!!! Multicast test message from host !!!'.encode()
message = b'!!! Multicast test message from host !!!'
logging.info(f'Sending {ip_version} multicast message to {multicast_addr}:{PORT}')
sock.sendto(message, (multicast_addr, PORT))
if ip_version == 'ipv4_mapped':