mirror of
https://github.com/espressif/esp-idf.git
synced 2026-04-27 19:13:21 +00:00
tests: change wifi tests expect timeout to 60
- wifi_router - wifi_ap - wifi_high_traffic
This commit is contained in:
@@ -18,12 +18,12 @@ def get_sdk_path() -> str:
|
||||
return idf_path
|
||||
|
||||
|
||||
class CustomProcess(object):
|
||||
def __init__(self, cmd: str, logfile: str, verbose:bool =True) -> None:
|
||||
class CustomProcess:
|
||||
def __init__(self, cmd: str, logfile: str, verbose: bool = True) -> None:
|
||||
self.verbose = verbose
|
||||
self.f = open(logfile, 'w', encoding='utf-8')
|
||||
if self.verbose:
|
||||
logging.info('Starting {} > {}'.format(cmd, self.f.name))
|
||||
logging.info(f'Starting {cmd} > {self.f.name}')
|
||||
self.pexpect_proc = pexpect.spawn(cmd, timeout=60, logfile=self.f, encoding='utf-8', codec_errors='ignore')
|
||||
|
||||
def __enter__(self): # type: ignore
|
||||
@@ -32,7 +32,7 @@ class CustomProcess(object):
|
||||
def close(self) -> None:
|
||||
self.pexpect_proc.terminate(force=True)
|
||||
|
||||
def __exit__(self, type, value, traceback): # type: ignore
|
||||
def __exit__(self, type, value, traceback): # type: ignore # noqa
|
||||
self.close()
|
||||
self.f.close()
|
||||
|
||||
@@ -60,7 +60,7 @@ def test_examples_esp_local_ctrl(config: str, dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
dut_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]')[1].decode()
|
||||
dut_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
if config == 'default':
|
||||
dut.expect('esp_https_server: Starting server')
|
||||
dut.expect('esp_https_server: Server listening on port 443')
|
||||
@@ -75,20 +75,32 @@ def test_examples_esp_local_ctrl(config: str, dut: Dut) -> None:
|
||||
# Running mDNS services in docker is not a trivial task. Therefore, the script won't connect to the host name but
|
||||
# to IP address. However, the certificates were generated for the host name and will be rejected.
|
||||
if config == 'default':
|
||||
cmd = ' '.join([sys.executable, os.path.join(idf_path, rel_project_path, 'scripts/esp_local_ctrl.py'),
|
||||
'--sec_ver 2',
|
||||
'--sec2_username wifiprov',
|
||||
'--sec2_pwd abcd1234',
|
||||
'--name', dut_ip,
|
||||
'--dont-check-hostname']) # don't reject the certificate because of the hostname
|
||||
cmd = ' '.join(
|
||||
[
|
||||
sys.executable,
|
||||
os.path.join(idf_path, rel_project_path, 'scripts/esp_local_ctrl.py'),
|
||||
'--sec_ver 2',
|
||||
'--sec2_username wifiprov',
|
||||
'--sec2_pwd abcd1234',
|
||||
'--name',
|
||||
dut_ip,
|
||||
'--dont-check-hostname',
|
||||
]
|
||||
) # don't reject the certificate because of the hostname
|
||||
elif config == 'http':
|
||||
cmd = ' '.join([sys.executable, os.path.join(idf_path, rel_project_path, 'scripts/esp_local_ctrl.py'),
|
||||
'--sec_ver 2',
|
||||
'--transport http',
|
||||
'--sec2_username wifiprov',
|
||||
'--sec2_pwd abcd1234',
|
||||
'--name', dut_ip,
|
||||
'--dont-check-hostname'])
|
||||
cmd = ' '.join(
|
||||
[
|
||||
sys.executable,
|
||||
os.path.join(idf_path, rel_project_path, 'scripts/esp_local_ctrl.py'),
|
||||
'--sec_ver 2',
|
||||
'--transport http',
|
||||
'--sec2_username wifiprov',
|
||||
'--sec2_pwd abcd1234',
|
||||
'--name',
|
||||
dut_ip,
|
||||
'--dont-check-hostname',
|
||||
]
|
||||
)
|
||||
esp_local_ctrl_log = os.path.join(idf_path, rel_project_path, 'esp_local_ctrl.log')
|
||||
with CustomProcess(cmd, esp_local_ctrl_log) as ctrl_py:
|
||||
|
||||
@@ -97,15 +109,15 @@ def test_examples_esp_local_ctrl(config: str, dut: Dut) -> None:
|
||||
ctrl_py.pexpect_proc.expect_exact('==== Available Properties ====')
|
||||
ctrl_py.pexpect_proc.expect(re.compile(r'S.N. Name\s+Type\s+Flags\s+Value'))
|
||||
ctrl_py.pexpect_proc.expect(re.compile(r'\[ 1\] timestamp \(us\)\s+TIME\(us\)\s+Read-Only\s+\d+'))
|
||||
ctrl_py.pexpect_proc.expect(re.compile(r'\[ 2\] property1\s+INT32\s+{}'.format(prop1)))
|
||||
ctrl_py.pexpect_proc.expect(re.compile(rf'\[ 2\] property1\s+INT32\s+{prop1}'))
|
||||
ctrl_py.pexpect_proc.expect(re.compile(r'\[ 3\] property2\s+BOOLEAN\s+Read-Only\s+(True)|(False)'))
|
||||
ctrl_py.pexpect_proc.expect(re.compile(r'\[ 4\] property3\s+STRING\s+{}'.format(prop3)))
|
||||
ctrl_py.pexpect_proc.expect_exact('Select properties to set (0 to re-read, \'q\' to quit) :')
|
||||
ctrl_py.pexpect_proc.expect(re.compile(rf'\[ 4\] property3\s+STRING\s+{prop3}'))
|
||||
ctrl_py.pexpect_proc.expect_exact("Select properties to set (0 to re-read, 'q' to quit) :")
|
||||
|
||||
property1 = 123456789
|
||||
property3 = ''
|
||||
|
||||
ctrl_py.pexpect_proc.expect_exact('Connecting to {}'.format(dut_ip))
|
||||
ctrl_py.pexpect_proc.expect_exact(f'Connecting to {dut_ip}')
|
||||
if config == 'default':
|
||||
dut.expect('esp_https_server: performing session handshake', timeout=60)
|
||||
expect_properties(property1, property3)
|
||||
@@ -121,14 +133,14 @@ def test_examples_esp_local_ctrl(config: str, dut: Dut) -> None:
|
||||
ctrl_py.pexpect_proc.sendline('2')
|
||||
ctrl_py.pexpect_proc.expect_exact('Enter value to set for property (property1) :')
|
||||
ctrl_py.pexpect_proc.sendline(str(property1))
|
||||
dut.expect_exact('control: Setting property1 value to {}'.format(property1))
|
||||
dut.expect_exact(f'control: Setting property1 value to {property1}')
|
||||
expect_properties(property1, property3)
|
||||
|
||||
property3 = 'test'
|
||||
ctrl_py.pexpect_proc.sendline('4')
|
||||
ctrl_py.pexpect_proc.expect_exact('Enter value to set for property (property3) :')
|
||||
ctrl_py.pexpect_proc.sendline(property3)
|
||||
dut.expect_exact('control: Setting property3 value to {}'.format(property3))
|
||||
dut.expect_exact(f'control: Setting property3 value to {property3}')
|
||||
expect_properties(property1, property3)
|
||||
|
||||
ctrl_py.pexpect_proc.sendline('q')
|
||||
|
||||
@@ -2,9 +2,6 @@
|
||||
#
|
||||
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
from __future__ import division, print_function, unicode_literals
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
@@ -41,7 +38,7 @@ def test_examples_protocol_http_server_advanced(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'tests.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'http_server_bin_size : {bin_size // 1024}KB')
|
||||
|
||||
logging.info('Starting http_server advanced test app')
|
||||
|
||||
@@ -53,7 +50,7 @@ def test_examples_protocol_http_server_advanced(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
|
||||
got_port = dut.expect(r"(?:[\s\S]*)Started HTTP server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
@@ -66,8 +63,8 @@ def test_examples_protocol_http_server_advanced(dut: Dut) -> None:
|
||||
max_uri_len = int(result[4])
|
||||
max_stack_size = int(result[5])
|
||||
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
|
||||
# Run test script
|
||||
# If failed raise appropriate exception
|
||||
|
||||
@@ -28,7 +28,7 @@ def test_examples_protocol_http_server_file_serving(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'file_server.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('file_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'file_server_bin_size : {bin_size // 1024}KB')
|
||||
logging.info('Erasing the storage partition on the chip')
|
||||
dut.serial.erase_partition('storage')
|
||||
# Upload binary and start testing
|
||||
@@ -37,11 +37,11 @@ def test_examples_protocol_http_server_file_serving(dut: Dut) -> None:
|
||||
dut.expect('Initializing SPIFFS', timeout=60)
|
||||
# Parse IP address of STA
|
||||
logging.info('Waiting to connect with AP')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
# Expected logs
|
||||
got_port = dut.expect(r"Starting HTTP Server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
|
||||
# Run test script
|
||||
conn = client.start_session(got_ip, got_port)
|
||||
|
||||
@@ -32,10 +32,10 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'persistent_sockets.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'http_server_bin_size : {bin_size // 1024}KB')
|
||||
|
||||
# Upload binary and start testing
|
||||
logging.info('Starting http_server persistance test app')
|
||||
logging.info('Starting http_server persistence test app')
|
||||
|
||||
# Parse IP address of STA
|
||||
logging.info('Waiting to connect with AP')
|
||||
@@ -45,11 +45,11 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
got_port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
|
||||
# Expected Logs
|
||||
dut.expect('Registering URI handlers', timeout=30)
|
||||
@@ -78,8 +78,8 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None:
|
||||
dut.expect('PUT allocating new session', timeout=30)
|
||||
|
||||
# Retest PUT request and change session context value
|
||||
num = random.randint(0,100)
|
||||
logging.info('Adding: {}'.format(num))
|
||||
num = random.randint(0, 100)
|
||||
logging.info(f'Adding: {num}')
|
||||
client.putreq(conn, '/adder', str(num))
|
||||
visitor += 1
|
||||
adder += num
|
||||
@@ -97,7 +97,7 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None:
|
||||
# Test POST request and session persistence
|
||||
random_nums = [random.randint(0,100) for _ in range(100)]
|
||||
for num in random_nums:
|
||||
logging.info('Adding: {}'.format(num))
|
||||
logging.info(f'Adding: {num}')
|
||||
client.postreq(conn, '/adder', str(num))
|
||||
visitor += 1
|
||||
adder += num
|
||||
@@ -105,7 +105,7 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None:
|
||||
dut.expect('/adder handler read ' + str(num), timeout=30)
|
||||
|
||||
# Test GET request and session persistence
|
||||
logging.info('Matching final sum: {}'.format(adder))
|
||||
logging.info(f'Matching final sum: {adder}')
|
||||
if client.getreq(conn, '/adder').decode() != str(adder):
|
||||
raise RuntimeError
|
||||
visitor += 1
|
||||
|
||||
@@ -41,13 +41,13 @@ class http_client_thread(threading.Thread):
|
||||
def run(self) -> None:
|
||||
try:
|
||||
self.open_connection(self.ip, self.port, self.delay)
|
||||
except socket.timeout:
|
||||
except TimeoutError:
|
||||
self.exc = 1
|
||||
|
||||
def join(self, timeout=None): # type: ignore
|
||||
threading.Thread.join(self)
|
||||
if self.exc:
|
||||
raise socket.timeout
|
||||
raise TimeoutError
|
||||
|
||||
|
||||
# When running on local machine execute the following before running this script
|
||||
@@ -64,7 +64,7 @@ def test_examples_protocol_http_server_simple(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'simple.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'http_server_bin_size : {bin_size // 1024}KB')
|
||||
|
||||
# Upload binary and start testing
|
||||
logging.info('Starting http_server simple test app')
|
||||
@@ -77,11 +77,11 @@ def test_examples_protocol_http_server_simple(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(' '.join([ap_ssid, ap_password]))
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
got_port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
|
||||
# Expected Logs
|
||||
dut.expect('Registering URI handlers', timeout=30)
|
||||
@@ -138,7 +138,7 @@ def test_examples_protocol_http_server_lru_purge_enable(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'simple.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'http_server_bin_size : {bin_size // 1024}KB')
|
||||
|
||||
# Upload binary and start testing
|
||||
logging.info('Starting http_server simple test app')
|
||||
@@ -151,11 +151,11 @@ def test_examples_protocol_http_server_lru_purge_enable(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
got_port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
|
||||
# Expected Logs
|
||||
dut.expect('Registering URI handlers', timeout=30)
|
||||
@@ -167,7 +167,7 @@ def test_examples_protocol_http_server_lru_purge_enable(dut: Dut) -> None:
|
||||
thread.start()
|
||||
threads.append(thread)
|
||||
except OSError as err:
|
||||
logging.info('Error: unable to start thread, {}'.format(err))
|
||||
logging.info(f'Error: unable to start thread, {err}')
|
||||
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
@@ -2,9 +2,6 @@
|
||||
#
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
from __future__ import division, print_function, unicode_literals
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
@@ -30,8 +27,8 @@ class WsClient:
|
||||
self.ip = ip
|
||||
self.ws = websocket.WebSocket()
|
||||
|
||||
def __enter__(self): # type: ignore
|
||||
self.ws.connect('ws://{}:{}/ws'.format(self.ip, self.port))
|
||||
def __enter__(self): # type: ignore
|
||||
self.ws.connect(f'ws://{self.ip}:{self.port}/{self.uri}')
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback): # type: ignore
|
||||
@@ -54,7 +51,7 @@ def test_examples_protocol_http_ws_echo_server(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'ws_echo_server.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('http_ws_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'http_ws_server_bin_size : {bin_size // 1024}KB')
|
||||
|
||||
logging.info('Starting ws-echo-server test app based on http_server')
|
||||
|
||||
@@ -66,11 +63,11 @@ def test_examples_protocol_http_ws_echo_server(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
got_port = dut.expect(r"Starting server on port: '(\d+)'", timeout=30)[1].decode()
|
||||
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
|
||||
# Start ws server test
|
||||
with WsClient(got_ip, int(got_port)) as ws:
|
||||
@@ -78,21 +75,21 @@ def test_examples_protocol_http_ws_echo_server(dut: Dut) -> None:
|
||||
for expected_opcode in [OPCODE_TEXT, OPCODE_BIN, OPCODE_PING]:
|
||||
ws.write(data=DATA, opcode=expected_opcode)
|
||||
opcode, data = ws.read()
|
||||
logging.info('Testing opcode {}: Received opcode:{}, data:{}'.format(expected_opcode, opcode, data))
|
||||
logging.info(f'Testing opcode {expected_opcode}: Received opcode:{opcode}, data:{data}')
|
||||
data = data.decode()
|
||||
if expected_opcode == OPCODE_PING:
|
||||
dut.expect('Got a WS PING frame, Replying PONG')
|
||||
if opcode != OPCODE_PONG or data != DATA:
|
||||
raise RuntimeError('Failed to receive correct opcode:{} or data:{}'.format(opcode, data))
|
||||
raise RuntimeError(f'Failed to receive correct opcode:{opcode} or data:{data}')
|
||||
continue
|
||||
dut_data = dut.expect(r'Got packet with message: ([A-Za-z0-9_]*)')[1]
|
||||
dut_opcode = dut.expect(r'Packet type: ([0-9]*)')[1].decode()
|
||||
|
||||
if opcode != expected_opcode or data != DATA or opcode != int(dut_opcode) or (data not in str(dut_data)):
|
||||
raise RuntimeError('Failed to receive correct opcode:{} or data:{}'.format(opcode, data))
|
||||
raise RuntimeError(f'Failed to receive correct opcode:{opcode} or data:{data}')
|
||||
ws.write(data='Trigger async', opcode=OPCODE_TEXT)
|
||||
opcode, data = ws.read()
|
||||
logging.info('Testing async send: Received opcode:{}, data:{}'.format(opcode, data))
|
||||
logging.info(f'Testing async send: Received opcode:{opcode}, data:{data}')
|
||||
data = data.decode()
|
||||
if opcode != OPCODE_TEXT or data != 'Async data':
|
||||
raise RuntimeError('Failed to receive correct opcode:{} or data:{}'.format(opcode, data))
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
#
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import http.client
|
||||
import logging
|
||||
import os
|
||||
@@ -119,7 +118,7 @@ def test_examples_protocol_https_server_simple(dut: Dut) -> None:
|
||||
# Parse IP address and port of the server
|
||||
dut.expect(r'Starting server')
|
||||
got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
|
||||
# Expected logs
|
||||
|
||||
@@ -191,7 +190,7 @@ def test_examples_protocol_https_server_simple_dynamic_buffers(dut: Dut) -> None
|
||||
# Parse IP address and port of the server
|
||||
dut.expect(r'Starting server')
|
||||
got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
|
||||
# Expected logs
|
||||
|
||||
|
||||
@@ -1,16 +1,13 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-FileCopyrightText: 2021-2026 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
from __future__ import division, print_function, unicode_literals
|
||||
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from types import TracebackType
|
||||
from typing import Any, Optional
|
||||
from typing import Any
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
import websocket
|
||||
@@ -30,20 +27,20 @@ class WsClient:
|
||||
self.ip = ip
|
||||
sslopt = {'ca_certs':ca_file, 'check_hostname': False}
|
||||
self.ws = websocket.WebSocket(sslopt=sslopt)
|
||||
# Set timeout to 10 seconds to avoid conection failure at the time of handshake
|
||||
# Set timeout to 10 seconds to avoid connection failure at the time of handshake
|
||||
self.ws.settimeout(10)
|
||||
|
||||
def __enter__(self): # type: ignore
|
||||
self.ws.connect('wss://{}:{}/ws'.format(self.ip, self.port))
|
||||
self.ws.connect(f'wss://{self.ip}:{self.port}/ws')
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback): # type: (type, RuntimeError, TracebackType) -> None
|
||||
def __exit__(self, exc_type, exc_value, traceback): # type: ignore
|
||||
self.ws.close()
|
||||
|
||||
def read(self): # type: () -> Any
|
||||
def read(self) -> Any:
|
||||
return self.ws.recv_data(control_frame=True)
|
||||
|
||||
def write(self, data, opcode=OPCODE_TEXT): # type: (str, int) -> Any
|
||||
def write(self, data: str, opcode: int = OPCODE_TEXT) -> Any:
|
||||
if opcode == OPCODE_PING:
|
||||
return self.ws.ping(data)
|
||||
if opcode == OPCODE_PONG:
|
||||
@@ -62,7 +59,7 @@ class wss_client_thread(threading.Thread):
|
||||
self.data = 'Espressif'
|
||||
self.async_response = False
|
||||
|
||||
def run(self): # type: () -> None
|
||||
def run(self) -> None:
|
||||
with WsClient(self.ip, self.port, self.ca_file) as ws:
|
||||
while True:
|
||||
try:
|
||||
@@ -74,7 +71,7 @@ class wss_client_thread(threading.Thread):
|
||||
if opcode == OPCODE_TEXT:
|
||||
if data == CORRECT_ASYNC_DATA:
|
||||
self.async_response = True
|
||||
logging.info('Thread {} obtained correct async message'.format(self.name))
|
||||
logging.info(f'Thread {self.name} obtained correct async message')
|
||||
# Keep sending pong to update the keepalive in the server
|
||||
if (time.time() - self.start_time) > 20:
|
||||
break
|
||||
@@ -84,7 +81,7 @@ class wss_client_thread(threading.Thread):
|
||||
if self.async_response is not True:
|
||||
self.exc = RuntimeError('Failed to obtain correct async data') # type: ignore
|
||||
|
||||
def join(self, timeout=0): # type:(Optional[float]) -> None
|
||||
def join(self, timeout: Optional[float] = 0) -> None:
|
||||
threading.Thread.join(self)
|
||||
if self.exc:
|
||||
raise self.exc
|
||||
@@ -113,7 +110,7 @@ def test_examples_protocol_https_wss_server(dut: Dut) -> None:
|
||||
# Get binary file
|
||||
binary_file = os.path.join(dut.app.binary_path, 'wss_server.bin')
|
||||
bin_size = os.path.getsize(binary_file)
|
||||
logging.info('https_wss_server_bin_size : {}KB'.format(bin_size // 1024))
|
||||
logging.info(f'https_wss_server_bin_size : {bin_size // 1024}KB')
|
||||
|
||||
logging.info('Starting wss_server test app')
|
||||
|
||||
@@ -126,10 +123,10 @@ def test_examples_protocol_https_wss_server(dut: Dut) -> None:
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
# Parse IP address of STA
|
||||
got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
|
||||
logging.info('Got IP : {}'.format(got_ip))
|
||||
logging.info('Got Port : {}'.format(got_port))
|
||||
logging.info(f'Got IP : {got_ip}')
|
||||
logging.info(f'Got Port : {got_port}')
|
||||
|
||||
ca_file = os.path.join(os.path.dirname(__file__), 'main', 'certs', 'servercert.pem')
|
||||
# Start ws server test
|
||||
@@ -139,11 +136,11 @@ def test_examples_protocol_https_wss_server(dut: Dut) -> None:
|
||||
dut.expect('performing session handshake')
|
||||
client_fd = int(dut.expect(r'New client connected (\d+)', timeout=30)[1].decode())
|
||||
ws.write(data=DATA, opcode=OPCODE_TEXT)
|
||||
dut.expect(r'Received packet with message: {}'.format(DATA))
|
||||
dut.expect(rf'Received packet with message: {DATA}')
|
||||
opcode, data = ws.read()
|
||||
data = data.decode('UTF-8')
|
||||
if data != DATA:
|
||||
raise RuntimeError(f'Failed to receive the correct echo response.')
|
||||
raise RuntimeError('Failed to receive the correct echo response.')
|
||||
logging.info('Correct echo response obtained from the wss server')
|
||||
|
||||
# Test for PING
|
||||
@@ -172,11 +169,14 @@ def test_examples_protocol_https_wss_server(dut: Dut) -> None:
|
||||
logging.info('Failed the test for keep alive,\nthe client got abruptly disconnected')
|
||||
raise
|
||||
|
||||
# keepalive timeout is 10 seconds so do not respond for (10 + 1) senconds
|
||||
logging.info('Testing if client is disconnected if it does not respond for 10s i.e. keep_alive timeout (approx time = 11s)')
|
||||
# keepalive timeout is 10 seconds so do not respond for (10 + 1) seconds
|
||||
logging.info(
|
||||
'Testing if client is disconnected if it does not respond for 10s '
|
||||
'i.e. keep_alive timeout (approx time = 11s)'
|
||||
)
|
||||
try:
|
||||
dut.expect('Client not alive, closing fd {}'.format(client_fd), timeout=20)
|
||||
dut.expect('Client disconnected {}'.format(client_fd))
|
||||
dut.expect(f'Client not alive, closing fd {client_fd}', timeout=20)
|
||||
dut.expect(f'Client disconnected {client_fd}')
|
||||
except Exception:
|
||||
logging.info('ENV_ERROR:Failed the test for keep alive,\nthe connection was not closed after timeout')
|
||||
|
||||
|
||||
@@ -71,7 +71,7 @@ def test_protocols_icmp_echo_ipv6_only(dut: Dut) -> None:
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
|
||||
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
|
||||
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
|
||||
ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode()
|
||||
logging.info(f'Connected AP with IPv6={ipv6}')
|
||||
interface_nr = dut.expect(r'Connected on interface: [a-z]{2}\d \((\d+)\)', timeout=30)[1].decode()
|
||||
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
from typing import Any, Tuple
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from common_test_methods import get_env_config_variable
|
||||
@@ -20,7 +19,7 @@ def test_get_time_from_sntp_server(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
dut.expect('IPv4 address:')
|
||||
dut.expect('IPv4 address:', timeout=60)
|
||||
|
||||
dut.expect('Initializing and starting SNTP')
|
||||
dut.expect('Notification of a time synchronization event')
|
||||
@@ -31,11 +30,11 @@ def test_get_time_from_sntp_server(dut: Dut) -> None:
|
||||
NY_time = None
|
||||
SH_time = None
|
||||
|
||||
def check_time(prev_NY_time: Any, prev_SH_time: Any) -> Tuple[Any, Any]:
|
||||
NY_str = dut.expect(r'The current date/time in New York is: ({})'.format(TIME_FORMAT_REGEX))[1].decode()
|
||||
SH_str = dut.expect(r'The current date/time in Shanghai is: ({})'.format(TIME_FORMAT_REGEX))[1].decode()
|
||||
logging.info('New York: "{}"'.format(NY_str))
|
||||
logging.info('Shanghai: "{}"'.format(SH_str))
|
||||
def check_time(prev_NY_time: Any, prev_SH_time: Any) -> tuple[Any, Any]:
|
||||
NY_str = dut.expect(rf'The current date/time in New York is: ({TIME_FORMAT_REGEX})')[1].decode()
|
||||
SH_str = dut.expect(rf'The current date/time in Shanghai is: ({TIME_FORMAT_REGEX})')[1].decode()
|
||||
logging.info(f'New York: "{NY_str}"')
|
||||
logging.info(f'Shanghai: "{SH_str}"')
|
||||
dut.expect('Entering deep sleep for 10 seconds')
|
||||
logging.info('Sleeping...')
|
||||
new_NY_time = datetime.datetime.strptime(NY_str, TIME_FORMAT)
|
||||
@@ -49,5 +48,5 @@ def test_get_time_from_sntp_server(dut: Dut) -> None:
|
||||
|
||||
NY_time, SH_time = check_time(NY_time, SH_time)
|
||||
for i in range(2, 4):
|
||||
dut.expect('example: Boot count: {}'.format(i), timeout=30)
|
||||
dut.expect(f'example: Boot count: {i}', timeout=30)
|
||||
NY_time, SH_time = check_time(NY_time, SH_time)
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-FileCopyrightText: 2021-2026 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import logging
|
||||
import socket
|
||||
|
||||
import pytest
|
||||
from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip,
|
||||
get_my_interface_by_dest_ip)
|
||||
from common_test_methods import get_env_config_variable
|
||||
from common_test_methods import get_host_ip4_by_dest_ip
|
||||
from common_test_methods import get_host_ip6_by_dest_ip
|
||||
from common_test_methods import get_my_interface_by_dest_ip
|
||||
from pytest_embedded import Dut
|
||||
|
||||
try:
|
||||
@@ -37,13 +38,13 @@ def test_examples_tcp_client_ipv4(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected with IPv4={ipv4}')
|
||||
|
||||
# test IPv4
|
||||
with TcpServer(PORT, socket.AF_INET):
|
||||
server_ip = get_host_ip4_by_dest_ip(ipv4)
|
||||
print('Connect tcp client to server IP={}'.format(server_ip))
|
||||
print(f'Connect tcp client to server IP={server_ip}')
|
||||
dut.write(server_ip)
|
||||
dut.expect('OK: Message from ESP32')
|
||||
|
||||
@@ -64,16 +65,16 @@ def test_examples_tcp_client_ipv6(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
|
||||
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
|
||||
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
|
||||
print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6))
|
||||
ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode()
|
||||
print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
|
||||
|
||||
# test IPv6
|
||||
my_interface = get_my_interface_by_dest_ip(ipv4)
|
||||
with TcpServer(PORT, socket.AF_INET6):
|
||||
server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface)
|
||||
print('Connect tcp client to server IP={}'.format(server_ip))
|
||||
print(f'Connect tcp client to server IP={server_ip}')
|
||||
dut.write(server_ip)
|
||||
dut.expect('OK: Message from ESP32')
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import logging
|
||||
import time
|
||||
|
||||
import netifaces
|
||||
import pytest
|
||||
from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip
|
||||
from common_test_methods import get_env_config_variable
|
||||
from common_test_methods import get_my_interface_by_dest_ip
|
||||
from pytest_embedded import Dut
|
||||
|
||||
try:
|
||||
@@ -38,7 +38,7 @@ def test_examples_tcp_server_ipv4(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
logging.info(f'Connected with IPv4={ipv4}')
|
||||
time.sleep(1)
|
||||
|
||||
@@ -66,7 +66,7 @@ def test_examples_tcp_server_ipv4_esp32c2_26mhz(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
logging.info(f'Connected with IPv4={ipv4}')
|
||||
time.sleep(1)
|
||||
|
||||
@@ -93,16 +93,16 @@ def test_examples_tcp_server_ipv6(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
|
||||
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
|
||||
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
|
||||
ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode()
|
||||
logging.info(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
|
||||
time.sleep(1)
|
||||
|
||||
interface = get_my_interface_by_dest_ip(ipv4)
|
||||
# test IPv6
|
||||
received = tcp_client('{}%{}'.format(ipv6, interface), PORT, MESSAGE)
|
||||
received = tcp_client(f'{ipv6}%{interface}', PORT, MESSAGE)
|
||||
if not received == MESSAGE:
|
||||
raise
|
||||
dut.expect(MESSAGE)
|
||||
@@ -126,7 +126,7 @@ def test_examples_tcp_server_ipv6_only(dut: Dut) -> None:
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
|
||||
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
|
||||
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
|
||||
ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode()
|
||||
logging.info(f'Connected AP with IPv6={ipv6}')
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-FileCopyrightText: 2021-2026 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import logging
|
||||
import socket
|
||||
|
||||
import pytest
|
||||
from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip,
|
||||
get_my_interface_by_dest_ip)
|
||||
from common_test_methods import get_env_config_variable
|
||||
from common_test_methods import get_host_ip4_by_dest_ip
|
||||
from common_test_methods import get_host_ip6_by_dest_ip
|
||||
from common_test_methods import get_my_interface_by_dest_ip
|
||||
from pexpect.exceptions import TIMEOUT
|
||||
from pytest_embedded import Dut
|
||||
|
||||
@@ -39,13 +40,13 @@ def test_examples_udp_client_ipv4(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected with IPv4={ipv4}')
|
||||
|
||||
# test IPv4
|
||||
with UdpServer(PORT, socket.AF_INET):
|
||||
server_ip = get_host_ip4_by_dest_ip(ipv4)
|
||||
print('Connect udp client to server IP={}'.format(server_ip))
|
||||
print(f'Connect udp client to server IP={server_ip}')
|
||||
for _ in range(MAX_RETRIES):
|
||||
try:
|
||||
dut.write(server_ip)
|
||||
@@ -73,17 +74,17 @@ def test_examples_udp_client_ipv6(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
|
||||
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
|
||||
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
|
||||
ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode()
|
||||
print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
|
||||
|
||||
interface = get_my_interface_by_dest_ip(ipv4)
|
||||
# test IPv6
|
||||
with UdpServer(PORT, socket.AF_INET6):
|
||||
server_ip = get_host_ip6_by_dest_ip(ipv6, interface)
|
||||
print('Connect udp client to server IP={}'.format(server_ip))
|
||||
print(f'Connect udp client to server IP={server_ip}')
|
||||
for _ in range(MAX_RETRIES):
|
||||
try:
|
||||
dut.write(server_ip)
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip
|
||||
from common_test_methods import get_env_config_variable
|
||||
from common_test_methods import get_my_interface_by_dest_ip
|
||||
from pytest_embedded import Dut
|
||||
|
||||
try:
|
||||
@@ -37,7 +37,7 @@ def test_examples_udp_server_ipv4(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected with IPv4={ipv4}')
|
||||
|
||||
# test IPv4
|
||||
@@ -48,7 +48,7 @@ def test_examples_udp_server_ipv4(dut: Dut) -> None:
|
||||
print('OK')
|
||||
break
|
||||
else:
|
||||
raise ValueError('IPv4: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
|
||||
raise ValueError(f'IPv4: Did not receive UDP message after {MAX_RETRIES} retries')
|
||||
dut.expect(MESSAGE)
|
||||
|
||||
|
||||
@@ -68,20 +68,20 @@ def test_examples_udp_server_ipv6(dut: Dut) -> None:
|
||||
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
# expect all 8 octets from IPv6 (assumes it's printed in the long form)
|
||||
ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)
|
||||
ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
|
||||
ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode()
|
||||
print(f'Connected with IPv4={ipv4} and IPv6={ipv6}')
|
||||
|
||||
interface = get_my_interface_by_dest_ip(ipv4)
|
||||
# test IPv6
|
||||
for _ in range(MAX_RETRIES):
|
||||
print('Testing UDP on IPv6...')
|
||||
received = udp_client('{}%{}'.format(ipv6, interface), PORT, MESSAGE)
|
||||
received = udp_client(f'{ipv6}%{interface}', PORT, MESSAGE)
|
||||
if received == MESSAGE:
|
||||
print('OK')
|
||||
break
|
||||
else:
|
||||
raise ValueError('IPv6: Did not receive UDP message after {} retries'.format(MAX_RETRIES))
|
||||
raise ValueError(f'IPv6: Did not receive UDP message after {MAX_RETRIES} retries')
|
||||
dut.expect(MESSAGE)
|
||||
|
||||
@@ -13,7 +13,8 @@ from typing import Callable
|
||||
|
||||
import pexpect
|
||||
import pytest
|
||||
from common_test_methods import get_env_config_variable, get_host_ip4_by_dest_ip
|
||||
from common_test_methods import get_env_config_variable
|
||||
from common_test_methods import get_host_ip4_by_dest_ip
|
||||
from pytest_embedded import Dut
|
||||
from RangeHTTPServer import RangeRequestHandler
|
||||
|
||||
@@ -120,8 +121,8 @@ def test_examples_protocol_advanced_https_ota_example(dut: Dut) -> None:
|
||||
for _ in range(iterations):
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
dut.expect('Starting Advanced OTA example', timeout=30)
|
||||
@@ -170,8 +171,8 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(dut: Dut) ->
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
dut.expect('Starting Advanced OTA example', timeout=30)
|
||||
@@ -224,8 +225,8 @@ def test_examples_protocol_advanced_https_ota_example_truncated_header(dut: Dut)
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
host_ip = get_host_ip4_by_dest_ip(ip_address)
|
||||
@@ -278,8 +279,8 @@ def test_examples_protocol_advanced_https_ota_example_random(dut: Dut) -> None:
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
host_ip = get_host_ip4_by_dest_ip(ip_address)
|
||||
@@ -334,8 +335,8 @@ def test_examples_protocol_advanced_https_ota_example_invalid_chip_id(dut: Dut)
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
host_ip = get_host_ip4_by_dest_ip(ip_address)
|
||||
@@ -374,8 +375,8 @@ def test_examples_protocol_advanced_https_ota_example_chunked(dut: Dut) -> None:
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
host_ip = get_host_ip4_by_dest_ip(ip_address)
|
||||
@@ -415,8 +416,8 @@ def test_examples_protocol_advanced_https_ota_example_redirect_url(dut: Dut) ->
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
dut.expect('Starting Advanced OTA example', timeout=30)
|
||||
@@ -490,8 +491,8 @@ def test_examples_protocol_advanced_https_ota_example_anti_rollback(dut: Dut) ->
|
||||
# Positive Case
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
host_ip = get_host_ip4_by_dest_ip(ip_address)
|
||||
@@ -501,7 +502,7 @@ def test_examples_protocol_advanced_https_ota_example_anti_rollback(dut: Dut) ->
|
||||
print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
|
||||
dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
|
||||
dut.expect('Loaded app from partition at offset', timeout=60)
|
||||
dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
dut.expect(r'App is valid, rollback cancelled successfully', timeout=30)
|
||||
|
||||
# Negative Case
|
||||
@@ -549,8 +550,8 @@ def test_examples_protocol_advanced_https_ota_example_partial_request(dut: Dut)
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
|
||||
host_ip = get_host_ip4_by_dest_ip(ip_address)
|
||||
@@ -601,8 +602,8 @@ def test_examples_protocol_advanced_https_ota_example_nimble_gatts(dut: Dut) ->
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
print('Connected to AP with IP: {}'.format(ip_address))
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
|
||||
host_ip = get_host_ip4_by_dest_ip(ip_address)
|
||||
@@ -653,8 +654,8 @@ def test_examples_protocol_advanced_https_ota_example_bluedroid_gatts(dut: Dut)
|
||||
ap_password = get_env_config_variable(env_name, 'ap_password')
|
||||
dut.write(f'{ap_ssid} {ap_password}')
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
print('Connected to AP with IP: {}'.format(ip_address))
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
|
||||
host_ip = get_host_ip4_by_dest_ip(ip_address)
|
||||
@@ -707,8 +708,8 @@ def test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin(dut: D
|
||||
# start test
|
||||
dut.expect('Loaded app from partition at offset', timeout=30)
|
||||
try:
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
||||
print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
|
||||
ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode()
|
||||
print(f'Connected to AP/Ethernet with IP: {ip_address}')
|
||||
except pexpect.exceptions.TIMEOUT:
|
||||
raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
|
||||
host_ip = get_host_ip4_by_dest_ip(ip_address)
|
||||
|
||||
@@ -1,16 +1,19 @@
|
||||
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Unlicense OR CC0-1.0
|
||||
import http.server
|
||||
import logging
|
||||
import multiprocessing
|
||||
import os
|
||||
import ssl
|
||||
import subprocess
|
||||
import sys
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
|
||||
import pexpect
|
||||
import pytest
|
||||
from common_test_methods import get_env_config_variable, get_host_ip4_by_dest_ip
|
||||
from common_test_methods import get_env_config_variable
|
||||
from common_test_methods import get_host_ip4_by_dest_ip
|
||||
from pytest_embedded import Dut
|
||||
|
||||
server_cert = '-----BEGIN CERTIFICATE-----\n' \
|
||||
@@ -64,7 +67,13 @@ server_key = '-----BEGIN PRIVATE KEY-----\n'\
|
||||
'-----END PRIVATE KEY-----\n'
|
||||
|
||||
|
||||
def start_https_server(ota_image_dir: str, server_ip: str, server_port: int, server_file: str = None, key_file: str = None) -> None:
|
||||
def start_https_server(
|
||||
ota_image_dir: str,
|
||||
server_ip: str,
|
||||
server_port: int,
|
||||
server_file: Optional[str] = None,
|
||||
key_file: Optional[str] = None,
|
||||
) -> None:
|
||||
os.chdir(ota_image_dir)
|
||||
|
||||
if server_file is None:
|
||||
@@ -104,12 +113,12 @@ def start_tls1_3_server(ota_image_dir: str, server_port: int) -> subprocess.Pope
|
||||
|
||||
|
||||
def check_sha256(sha256_expected: str, sha256_reported: str) -> None:
|
||||
print('sha256_expected: %s' % (sha256_expected))
|
||||
print('sha256_reported: %s' % (sha256_reported))
|
||||
logging.info('sha256_expected: %s', sha256_expected)
|
||||
logging.info('sha256_reported: %s', sha256_reported)
|
||||
if sha256_expected not in sha256_reported:
|
||||
raise ValueError('SHA256 mismatch')
|
||||
else:
|
||||
print('SHA256 expected and reported are the same')
|
||||
logging.info('SHA256 expected and reported are the same')
|
||||
|
||||
|
||||
def calc_all_sha256(dut: Dut) -> Tuple[str, str]:
|
||||
|
||||
@@ -30,7 +30,7 @@ def _run_test(dut: Dut) -> None:
|
||||
pass
|
||||
|
||||
try:
|
||||
dut.expect(r'got ip: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)
|
||||
dut.expect(r'got ip: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)
|
||||
log_after_got_ip = dut.expect(pexpect.TIMEOUT, timeout=10).decode()
|
||||
if any(s in log_after_got_ip for s in bad_event_str):
|
||||
logging.info('Abnormal connection log:')
|
||||
|
||||
Reference in New Issue
Block a user