Merge branch 'feat/upt_tests_new_runners_v5.3' into 'release/v5.3'

feat(ci): updated Ethernet test to align with new runners (v5.3)

See merge request espressif/esp-idf!43767
This commit is contained in:
Euripedes Rocha
2025-12-17 12:54:23 +01:00
3 changed files with 106 additions and 55 deletions
+13 -11
View File
@@ -30,16 +30,18 @@ class EthTestIntf(object):
netifs.sort(reverse=True)
logging.info('detected interfaces: %s', str(netifs))
for netif in netifs:
# if no interface defined, try to find it automatically
if my_if == '':
if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
self.target_if = netif
break
if my_if == '':
if 'dut_p1' in netifs:
self.target_if = 'dut_p1'
else:
if netif.find(my_if) == 0:
self.target_if = my_if
break
for netif in netifs:
# if no interface defined, try to find it automatically
if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
self.target_if = netif
break
elif my_if in netifs:
self.target_if = my_if
if self.target_if == '':
raise RuntimeError('network interface not found')
logging.info('Use %s for testing', self.target_if)
@@ -131,8 +133,8 @@ def ethernet_int_emac_test(dut: IdfDut) -> None:
dut.run_all_single_board_cases(group='esp_emac', timeout=240)
def ethernet_l2_test(dut: IdfDut) -> None:
target_if = EthTestIntf(ETH_TYPE)
def ethernet_l2_test(dut: IdfDut, test_if: str = '') -> None:
target_if = EthTestIntf(ETH_TYPE, test_if)
dut.expect_exact('Press ENTER to see the list of tests')
dut.write('\n')
@@ -11,8 +11,11 @@ import subprocess
import time
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from typing import Generator
from typing import List
from typing import Match
from typing import Optional
from typing import Tuple
from typing import Union
import netifaces
@@ -267,28 +270,38 @@ def send_brcast_msg_endnode_to_host(endnode: EndnodeSsh, host_brcast_ip: str, te
return nc_host_out
@pytest.mark.esp32
@pytest.mark.eth_w5500
@pytest.mark.parametrize('config', [
'w5500',
], indirect=True)
def test_esp_eth_bridge(
dut: Dut,
dev_user: str,
dev_password: str
) -> None:
def get_legacy_host_name_match() -> Optional[Match[str]]:
host_name = socket.gethostname()
regex = r'ethVM-(\d+)-(\d+)'
host_name_match = re.search(regex, host_name, re.DOTALL)
return host_name_match
def get_host_info() -> Tuple[int, int]:
# Get switch configuration info from the hostname (legacy runners)
sw_info = get_legacy_host_name_match()
if sw_info is not None:
sw_num = int(sw_info.group(1))
port_num = int(sw_info.group(2))
return sw_num, port_num
else:
# Get switch configuration info from the IP address of the `switch` interface (new runners)
switch_if_ip = get_host_ip_by_interface('switch', netifaces.AF_INET)
# Parse IP address: x.y.<sw_num>.<port_num>
ip_parts = switch_if_ip.split('.')
if len(ip_parts) == 4:
sw_num = int(ip_parts[2])
port_num = int(ip_parts[3])
return sw_num, port_num
else:
raise RuntimeError('Unexpected switch IP address')
def eth_bridge_test(dut: Dut, dev_user: str, dev_password: str) -> None:
# ------------------------------ #
# Pre-test testbed configuration #
# ------------------------------ #
# Get switch configuration info from the hostname
host_name = socket.gethostname()
regex = r'ethVM-(\d+)-(\d+)'
sw_info = re.search(regex, host_name, re.DOTALL)
if sw_info is None:
raise RuntimeError('Unexpected hostname')
sw_num = int(sw_info.group(1))
port_num = int(sw_info.group(2))
sw_num, port_num = get_host_info()
port_num_endnode = int(port_num) + 1 # endnode address is always + 1 to the host
endnode = EndnodeSsh(f'10.10.{sw_num}.{port_num_endnode}', ETHVM_ENDNODE_USER)
@@ -324,7 +337,10 @@ def test_esp_eth_bridge(
host_ip = get_host_ip_by_interface(host_if, netifaces.AF_INET)
logging.info('Host IP %s', host_ip)
endnode_if = host_if # endnode is a clone of the host
if get_legacy_host_name_match() is not None:
endnode_if = host_if # endnode is a clone of the host (legacy runners)
else:
endnode_if = 'dut_p2' # interface name connected to the second port of the DUT (new runners)
# Endnode MAC
endnode_mac = get_endnode_mac_by_interface(endnode, endnode_if)
logging.info('Endnode MAC %s', endnode_mac)
@@ -346,12 +362,12 @@ def test_esp_eth_bridge(
# TEST Objective 1: Ping the devices on the network
# --------------------------------------------------
# ping bridge
ping_test = subprocess.call(f'ping {br_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', br_ip, '-c', '2'])
if ping_test != 0:
raise RuntimeError('ESP bridge is not reachable')
# ping the end nodes of the network
ping_test = subprocess.call(f'ping {endnode_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', endnode_ip, '-c', '2'])
if ping_test != 0:
raise RuntimeError('End node is not reachable')
@@ -518,13 +534,13 @@ def test_esp_eth_bridge(
logging.info('Drop `Endnode` MAC')
dut.write('add --addr=' + endnode_mac + ' -d')
dut.expect_exact('Bridge Config OK!')
ping_test = subprocess.call(f'ping {endnode_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', endnode_ip, '-c', '2'])
if ping_test == 0:
raise RuntimeError('End node should not be reachable')
logging.info('Remove Drop `Endnode` MAC entry')
dut.write('remove --addr=' + endnode_mac)
dut.expect_exact('Bridge Config OK!')
ping_test = subprocess.call(f'ping {endnode_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', endnode_ip, '-c', '2'])
if ping_test != 0:
raise RuntimeError('End node is not reachable')
@@ -557,9 +573,9 @@ def test_esp_eth_bridge(
# Remove ARP record from Test host computer. ARP is broadcasted, hence Bridge port does not reply to a request since
# it does not receive it (no forward to Bridge port). As a result, Bridge is not pingable.
subprocess.call(f'sudo arp -d {br_ip}', shell=True)
subprocess.call('arp -a', shell=True)
ping_test = subprocess.call(f'ping {br_ip} -c 2', shell=True)
subprocess.call(['sudo', 'arp', '-d', br_ip])
subprocess.call(['arp', '-a'])
ping_test = subprocess.call(['ping', br_ip, '-c', '2'])
if ping_test == 0:
raise RuntimeError('Bridge should not be reachable')
@@ -569,7 +585,7 @@ def test_esp_eth_bridge(
dut.expect_exact('Bridge Config OK!')
dut.write('add --addr=ff:ff:ff:ff:ff:ff -p 1 -c')
dut.expect_exact('Bridge Config OK!')
ping_test = subprocess.call(f'ping {br_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', br_ip, '-c', '2'])
if ping_test != 0:
raise RuntimeError('Bridge is not reachable')
@@ -580,3 +596,28 @@ def test_esp_eth_bridge(
endnode.close()
switch1.close()
@pytest.fixture(scope='session', autouse=True)
def setup_test_environment() -> Generator[None, None, None]:
# Fixture code to run before any tests in the session
# make sure dut_p2 is down (only for new runners)
if get_legacy_host_name_match() is None:
subprocess.call(['sudo', 'ip', 'link', 'set', 'down', 'dev', 'dut_p2'])
yield # Tests run here
# Optional teardown after all tests...
@pytest.mark.esp32
@pytest.mark.eth_w5500
@pytest.mark.parametrize(
'config',
[
'w5500',
],
indirect=True,
)
def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
eth_bridge_test(dut, dev_user, dev_password)
@@ -18,19 +18,27 @@ ETH_TYPE_3 = 0x2223
@contextlib.contextmanager
def configure_eth_if(eth_type: int, target_if: str='') -> Iterator[socket.socket]:
def configure_eth_if(eth_type: int, target_if: str = '') -> Iterator[socket.socket]:
# try to determine which interface to use
netifs = os.listdir('/sys/class/net/')
# order matters - ETH NIC with the highest number is connected to DUT on CI runner
netifs.sort(reverse=True)
logging.info('detected interfaces: %s', str(netifs))
if target_if == '':
# try to determine which interface to use
netifs = os.listdir('/sys/class/net/')
# order matters - ETH NIC with the highest number is connected to DUT on CI runner
netifs.sort(reverse=True)
logging.info('detected interfaces: %s', str(netifs))
for netif in netifs:
if netif.find('eth') == 0 or netif.find('enx') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
target_if = netif
break
if target_if == '':
raise Exception('no network interface found')
if 'dut_p1' in netifs:
target_if = 'dut_p1'
else:
for netif in netifs:
# if no interface defined, try to find it automatically
if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
target_if = netif
break
elif target_if not in netifs:
target_if = ''
if target_if == '':
raise RuntimeError('network interface not found')
logging.info('Use %s for testing', target_if)
so = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(eth_type))
@@ -75,7 +83,7 @@ def recv_eth_frame(eth_type: int, eth_if: str='') -> str:
return str(eth_frame.load.decode().rstrip('\x00'))
def actual_test(dut: Dut) -> None:
def actual_test(dut: Dut, test_if: str = '') -> None:
# Get DUT's MAC address
res = dut.expect(
r'([\s\S]*)'
@@ -84,17 +92,17 @@ def actual_test(dut: Dut) -> None:
dut_mac = res.group(2)
# Receive "ESP32 Hello frame"
recv_eth_frame(ETH_TYPE_3)
recv_eth_frame(ETH_TYPE_3, test_if)
# Sent a message and receive its echo
message = 'ESP32 test message with EthType ' + hex(ETH_TYPE_1)
echoed = send_recv_eth_frame(message, ETH_TYPE_1, dut_mac)
echoed = send_recv_eth_frame(message, ETH_TYPE_1, dut_mac, test_if)
if echoed == message:
logging.info('PASS')
else:
raise Exception('Echoed message does not match!')
message = 'ESP32 test message with EthType ' + hex(ETH_TYPE_2)
echoed = send_recv_eth_frame(message, ETH_TYPE_2, dut_mac)
echoed = send_recv_eth_frame(message, ETH_TYPE_2, dut_mac, test_if)
if echoed == message:
logging.info('PASS')
else: