diff --git a/examples/protocols/esp_local_ctrl/pytest_esp_local_ctrl.py b/examples/protocols/esp_local_ctrl/pytest_esp_local_ctrl.py index bfd59b66cb..da8cf1d557 100644 --- a/examples/protocols/esp_local_ctrl/pytest_esp_local_ctrl.py +++ b/examples/protocols/esp_local_ctrl/pytest_esp_local_ctrl.py @@ -18,12 +18,12 @@ def get_sdk_path() -> str: return idf_path -class CustomProcess(object): - def __init__(self, cmd: str, logfile: str, verbose:bool =True) -> None: +class CustomProcess: + def __init__(self, cmd: str, logfile: str, verbose: bool = True) -> None: self.verbose = verbose self.f = open(logfile, 'w', encoding='utf-8') if self.verbose: - logging.info('Starting {} > {}'.format(cmd, self.f.name)) + logging.info(f'Starting {cmd} > {self.f.name}') self.pexpect_proc = pexpect.spawn(cmd, timeout=60, logfile=self.f, encoding='utf-8', codec_errors='ignore') def __enter__(self): # type: ignore @@ -32,7 +32,7 @@ class CustomProcess(object): def close(self) -> None: self.pexpect_proc.terminate(force=True) - def __exit__(self, type, value, traceback): # type: ignore + def __exit__(self, type, value, traceback): # type: ignore # noqa self.close() self.f.close() @@ -60,7 +60,7 @@ def test_examples_esp_local_ctrl(config: str, dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - dut_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]')[1].decode() + dut_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() if config == 'default': dut.expect('esp_https_server: Starting server') dut.expect('esp_https_server: Server listening on port 443') @@ -75,20 +75,32 @@ def test_examples_esp_local_ctrl(config: str, dut: Dut) -> None: # Running mDNS services in docker is not a trivial task. Therefore, the script won't connect to the host name but # to IP address. However, the certificates were generated for the host name and will be rejected. if config == 'default': - cmd = ' '.join([sys.executable, os.path.join(idf_path, rel_project_path, 'scripts/esp_local_ctrl.py'), - '--sec_ver 2', - '--sec2_username wifiprov', - '--sec2_pwd abcd1234', - '--name', dut_ip, - '--dont-check-hostname']) # don't reject the certificate because of the hostname + cmd = ' '.join( + [ + sys.executable, + os.path.join(idf_path, rel_project_path, 'scripts/esp_local_ctrl.py'), + '--sec_ver 2', + '--sec2_username wifiprov', + '--sec2_pwd abcd1234', + '--name', + dut_ip, + '--dont-check-hostname', + ] + ) # don't reject the certificate because of the hostname elif config == 'http': - cmd = ' '.join([sys.executable, os.path.join(idf_path, rel_project_path, 'scripts/esp_local_ctrl.py'), - '--sec_ver 2', - '--transport http', - '--sec2_username wifiprov', - '--sec2_pwd abcd1234', - '--name', dut_ip, - '--dont-check-hostname']) + cmd = ' '.join( + [ + sys.executable, + os.path.join(idf_path, rel_project_path, 'scripts/esp_local_ctrl.py'), + '--sec_ver 2', + '--transport http', + '--sec2_username wifiprov', + '--sec2_pwd abcd1234', + '--name', + dut_ip, + '--dont-check-hostname', + ] + ) esp_local_ctrl_log = os.path.join(idf_path, rel_project_path, 'esp_local_ctrl.log') with CustomProcess(cmd, esp_local_ctrl_log) as ctrl_py: @@ -97,15 +109,15 @@ def test_examples_esp_local_ctrl(config: str, dut: Dut) -> None: ctrl_py.pexpect_proc.expect_exact('==== Available Properties ====') ctrl_py.pexpect_proc.expect(re.compile(r'S.N. Name\s+Type\s+Flags\s+Value')) ctrl_py.pexpect_proc.expect(re.compile(r'\[ 1\] timestamp \(us\)\s+TIME\(us\)\s+Read-Only\s+\d+')) - ctrl_py.pexpect_proc.expect(re.compile(r'\[ 2\] property1\s+INT32\s+{}'.format(prop1))) + ctrl_py.pexpect_proc.expect(re.compile(rf'\[ 2\] property1\s+INT32\s+{prop1}')) ctrl_py.pexpect_proc.expect(re.compile(r'\[ 3\] property2\s+BOOLEAN\s+Read-Only\s+(True)|(False)')) - ctrl_py.pexpect_proc.expect(re.compile(r'\[ 4\] property3\s+STRING\s+{}'.format(prop3))) - ctrl_py.pexpect_proc.expect_exact('Select properties to set (0 to re-read, \'q\' to quit) :') + ctrl_py.pexpect_proc.expect(re.compile(rf'\[ 4\] property3\s+STRING\s+{prop3}')) + ctrl_py.pexpect_proc.expect_exact("Select properties to set (0 to re-read, 'q' to quit) :") property1 = 123456789 property3 = '' - ctrl_py.pexpect_proc.expect_exact('Connecting to {}'.format(dut_ip)) + ctrl_py.pexpect_proc.expect_exact(f'Connecting to {dut_ip}') if config == 'default': dut.expect('esp_https_server: performing session handshake', timeout=60) expect_properties(property1, property3) @@ -121,14 +133,14 @@ def test_examples_esp_local_ctrl(config: str, dut: Dut) -> None: ctrl_py.pexpect_proc.sendline('2') ctrl_py.pexpect_proc.expect_exact('Enter value to set for property (property1) :') ctrl_py.pexpect_proc.sendline(str(property1)) - dut.expect_exact('control: Setting property1 value to {}'.format(property1)) + dut.expect_exact(f'control: Setting property1 value to {property1}') expect_properties(property1, property3) property3 = 'test' ctrl_py.pexpect_proc.sendline('4') ctrl_py.pexpect_proc.expect_exact('Enter value to set for property (property3) :') ctrl_py.pexpect_proc.sendline(property3) - dut.expect_exact('control: Setting property3 value to {}'.format(property3)) + dut.expect_exact(f'control: Setting property3 value to {property3}') expect_properties(property1, property3) ctrl_py.pexpect_proc.sendline('q') diff --git a/examples/protocols/http_server/advanced_tests/pytest_http_server_advanced.py b/examples/protocols/http_server/advanced_tests/pytest_http_server_advanced.py index 79f031fe4f..02dd32b052 100644 --- a/examples/protocols/http_server/advanced_tests/pytest_http_server_advanced.py +++ b/examples/protocols/http_server/advanced_tests/pytest_http_server_advanced.py @@ -2,9 +2,6 @@ # # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 - -from __future__ import division, print_function, unicode_literals - import logging import os import sys @@ -41,7 +38,7 @@ def test_examples_protocol_http_server_advanced(dut: Dut) -> None: # Get binary file binary_file = os.path.join(dut.app.binary_path, 'tests.bin') bin_size = os.path.getsize(binary_file) - logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024)) + logging.info(f'http_server_bin_size : {bin_size // 1024}KB') logging.info('Starting http_server advanced test app') @@ -53,7 +50,7 @@ def test_examples_protocol_http_server_advanced(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() got_port = dut.expect(r"(?:[\s\S]*)Started HTTP server on port: '(\d+)'", timeout=30)[1].decode() @@ -66,8 +63,8 @@ def test_examples_protocol_http_server_advanced(dut: Dut) -> None: max_uri_len = int(result[4]) max_stack_size = int(result[5]) - logging.info('Got Port : {}'.format(got_port)) - logging.info('Got IP : {}'.format(got_ip)) + logging.info(f'Got Port : {got_port}') + logging.info(f'Got IP : {got_ip}') # Run test script # If failed raise appropriate exception diff --git a/examples/protocols/http_server/file_serving/pytest_http_server_file_serving.py b/examples/protocols/http_server/file_serving/pytest_http_server_file_serving.py index d8d71396fe..22cd87c446 100644 --- a/examples/protocols/http_server/file_serving/pytest_http_server_file_serving.py +++ b/examples/protocols/http_server/file_serving/pytest_http_server_file_serving.py @@ -28,7 +28,7 @@ def test_examples_protocol_http_server_file_serving(dut: Dut) -> None: # Get binary file binary_file = os.path.join(dut.app.binary_path, 'file_server.bin') bin_size = os.path.getsize(binary_file) - logging.info('file_server_bin_size : {}KB'.format(bin_size // 1024)) + logging.info(f'file_server_bin_size : {bin_size // 1024}KB') logging.info('Erasing the storage partition on the chip') dut.serial.erase_partition('storage') # Upload binary and start testing @@ -37,11 +37,11 @@ def test_examples_protocol_http_server_file_serving(dut: Dut) -> None: dut.expect('Initializing SPIFFS', timeout=60) # Parse IP address of STA logging.info('Waiting to connect with AP') - got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() # Expected logs got_port = dut.expect(r"Starting HTTP Server on port: '(\d+)'", timeout=30)[1].decode() - logging.info('Got IP : {}'.format(got_ip)) - logging.info('Got Port : {}'.format(got_port)) + logging.info(f'Got IP : {got_ip}') + logging.info(f'Got Port : {got_port}') # Run test script conn = client.start_session(got_ip, got_port) diff --git a/examples/protocols/http_server/persistent_sockets/pytest_http_server_persistence.py b/examples/protocols/http_server/persistent_sockets/pytest_http_server_persistence.py index 9d19d77f20..f29cb78f65 100644 --- a/examples/protocols/http_server/persistent_sockets/pytest_http_server_persistence.py +++ b/examples/protocols/http_server/persistent_sockets/pytest_http_server_persistence.py @@ -32,10 +32,10 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None: # Get binary file binary_file = os.path.join(dut.app.binary_path, 'persistent_sockets.bin') bin_size = os.path.getsize(binary_file) - logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024)) + logging.info(f'http_server_bin_size : {bin_size // 1024}KB') # Upload binary and start testing - logging.info('Starting http_server persistance test app') + logging.info('Starting http_server persistence test app') # Parse IP address of STA logging.info('Waiting to connect with AP') @@ -45,11 +45,11 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() got_port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode() - logging.info('Got IP : {}'.format(got_ip)) - logging.info('Got Port : {}'.format(got_port)) + logging.info(f'Got IP : {got_ip}') + logging.info(f'Got Port : {got_port}') # Expected Logs dut.expect('Registering URI handlers', timeout=30) @@ -78,8 +78,8 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None: dut.expect('PUT allocating new session', timeout=30) # Retest PUT request and change session context value - num = random.randint(0,100) - logging.info('Adding: {}'.format(num)) + num = random.randint(0, 100) + logging.info(f'Adding: {num}') client.putreq(conn, '/adder', str(num)) visitor += 1 adder += num @@ -97,7 +97,7 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None: # Test POST request and session persistence random_nums = [random.randint(0,100) for _ in range(100)] for num in random_nums: - logging.info('Adding: {}'.format(num)) + logging.info(f'Adding: {num}') client.postreq(conn, '/adder', str(num)) visitor += 1 adder += num @@ -105,7 +105,7 @@ def test_examples_protocol_http_server_persistence(dut: Dut) -> None: dut.expect('/adder handler read ' + str(num), timeout=30) # Test GET request and session persistence - logging.info('Matching final sum: {}'.format(adder)) + logging.info(f'Matching final sum: {adder}') if client.getreq(conn, '/adder').decode() != str(adder): raise RuntimeError visitor += 1 diff --git a/examples/protocols/http_server/simple/pytest_http_server_simple.py b/examples/protocols/http_server/simple/pytest_http_server_simple.py index 0827bdb60e..4762e0ae82 100644 --- a/examples/protocols/http_server/simple/pytest_http_server_simple.py +++ b/examples/protocols/http_server/simple/pytest_http_server_simple.py @@ -41,13 +41,13 @@ class http_client_thread(threading.Thread): def run(self) -> None: try: self.open_connection(self.ip, self.port, self.delay) - except socket.timeout: + except TimeoutError: self.exc = 1 def join(self, timeout=None): # type: ignore threading.Thread.join(self) if self.exc: - raise socket.timeout + raise TimeoutError # When running on local machine execute the following before running this script @@ -64,7 +64,7 @@ def test_examples_protocol_http_server_simple(dut: Dut) -> None: # Get binary file binary_file = os.path.join(dut.app.binary_path, 'simple.bin') bin_size = os.path.getsize(binary_file) - logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024)) + logging.info(f'http_server_bin_size : {bin_size // 1024}KB') # Upload binary and start testing logging.info('Starting http_server simple test app') @@ -77,11 +77,11 @@ def test_examples_protocol_http_server_simple(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(' '.join([ap_ssid, ap_password])) - got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() got_port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode() - logging.info('Got IP : {}'.format(got_ip)) - logging.info('Got Port : {}'.format(got_port)) + logging.info(f'Got IP : {got_ip}') + logging.info(f'Got Port : {got_port}') # Expected Logs dut.expect('Registering URI handlers', timeout=30) @@ -138,7 +138,7 @@ def test_examples_protocol_http_server_lru_purge_enable(dut: Dut) -> None: # Get binary file binary_file = os.path.join(dut.app.binary_path, 'simple.bin') bin_size = os.path.getsize(binary_file) - logging.info('http_server_bin_size : {}KB'.format(bin_size // 1024)) + logging.info(f'http_server_bin_size : {bin_size // 1024}KB') # Upload binary and start testing logging.info('Starting http_server simple test app') @@ -151,11 +151,11 @@ def test_examples_protocol_http_server_lru_purge_enable(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() got_port = dut.expect(r"(?:[\s\S]*)Starting server on port: '(\d+)'", timeout=30)[1].decode() - logging.info('Got IP : {}'.format(got_ip)) - logging.info('Got Port : {}'.format(got_port)) + logging.info(f'Got IP : {got_ip}') + logging.info(f'Got Port : {got_port}') # Expected Logs dut.expect('Registering URI handlers', timeout=30) @@ -167,7 +167,7 @@ def test_examples_protocol_http_server_lru_purge_enable(dut: Dut) -> None: thread.start() threads.append(thread) except OSError as err: - logging.info('Error: unable to start thread, {}'.format(err)) + logging.info(f'Error: unable to start thread, {err}') for t in threads: t.join() diff --git a/examples/protocols/http_server/ws_echo_server/pytest_ws_server_example.py b/examples/protocols/http_server/ws_echo_server/pytest_ws_server_example.py index 9dd1bf4c49..466eb93b0a 100644 --- a/examples/protocols/http_server/ws_echo_server/pytest_ws_server_example.py +++ b/examples/protocols/http_server/ws_echo_server/pytest_ws_server_example.py @@ -2,9 +2,6 @@ # # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 - -from __future__ import division, print_function, unicode_literals - import logging import os @@ -30,8 +27,8 @@ class WsClient: self.ip = ip self.ws = websocket.WebSocket() - def __enter__(self): # type: ignore - self.ws.connect('ws://{}:{}/ws'.format(self.ip, self.port)) + def __enter__(self): # type: ignore + self.ws.connect(f'ws://{self.ip}:{self.port}/{self.uri}') return self def __exit__(self, exc_type, exc_value, traceback): # type: ignore @@ -54,7 +51,7 @@ def test_examples_protocol_http_ws_echo_server(dut: Dut) -> None: # Get binary file binary_file = os.path.join(dut.app.binary_path, 'ws_echo_server.bin') bin_size = os.path.getsize(binary_file) - logging.info('http_ws_server_bin_size : {}KB'.format(bin_size // 1024)) + logging.info(f'http_ws_server_bin_size : {bin_size // 1024}KB') logging.info('Starting ws-echo-server test app based on http_server') @@ -66,11 +63,11 @@ def test_examples_protocol_http_ws_echo_server(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() got_port = dut.expect(r"Starting server on port: '(\d+)'", timeout=30)[1].decode() - logging.info('Got IP : {}'.format(got_ip)) - logging.info('Got Port : {}'.format(got_port)) + logging.info(f'Got IP : {got_ip}') + logging.info(f'Got Port : {got_port}') # Start ws server test with WsClient(got_ip, int(got_port)) as ws: @@ -78,21 +75,21 @@ def test_examples_protocol_http_ws_echo_server(dut: Dut) -> None: for expected_opcode in [OPCODE_TEXT, OPCODE_BIN, OPCODE_PING]: ws.write(data=DATA, opcode=expected_opcode) opcode, data = ws.read() - logging.info('Testing opcode {}: Received opcode:{}, data:{}'.format(expected_opcode, opcode, data)) + logging.info(f'Testing opcode {expected_opcode}: Received opcode:{opcode}, data:{data}') data = data.decode() if expected_opcode == OPCODE_PING: dut.expect('Got a WS PING frame, Replying PONG') if opcode != OPCODE_PONG or data != DATA: - raise RuntimeError('Failed to receive correct opcode:{} or data:{}'.format(opcode, data)) + raise RuntimeError(f'Failed to receive correct opcode:{opcode} or data:{data}') continue dut_data = dut.expect(r'Got packet with message: ([A-Za-z0-9_]*)')[1] dut_opcode = dut.expect(r'Packet type: ([0-9]*)')[1].decode() if opcode != expected_opcode or data != DATA or opcode != int(dut_opcode) or (data not in str(dut_data)): - raise RuntimeError('Failed to receive correct opcode:{} or data:{}'.format(opcode, data)) + raise RuntimeError(f'Failed to receive correct opcode:{opcode} or data:{data}') ws.write(data='Trigger async', opcode=OPCODE_TEXT) opcode, data = ws.read() - logging.info('Testing async send: Received opcode:{}, data:{}'.format(opcode, data)) + logging.info(f'Testing async send: Received opcode:{opcode}, data:{data}') data = data.decode() if opcode != OPCODE_TEXT or data != 'Async data': raise RuntimeError('Failed to receive correct opcode:{} or data:{}'.format(opcode, data)) diff --git a/examples/protocols/https_server/simple/pytest_https_server_simple.py b/examples/protocols/https_server/simple/pytest_https_server_simple.py index 681fcf465e..0a5a135795 100644 --- a/examples/protocols/https_server/simple/pytest_https_server_simple.py +++ b/examples/protocols/https_server/simple/pytest_https_server_simple.py @@ -2,7 +2,6 @@ # # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 - import http.client import logging import os @@ -119,7 +118,7 @@ def test_examples_protocol_https_server_simple(dut: Dut) -> None: # Parse IP address and port of the server dut.expect(r'Starting server') got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode()) - got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() # Expected logs @@ -191,7 +190,7 @@ def test_examples_protocol_https_server_simple_dynamic_buffers(dut: Dut) -> None # Parse IP address and port of the server dut.expect(r'Starting server') got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode()) - got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() # Expected logs diff --git a/examples/protocols/https_server/wss_server/pytest_https_wss_server.py b/examples/protocols/https_server/wss_server/pytest_https_wss_server.py index 9f6dd4de75..3bcc4fb2df 100644 --- a/examples/protocols/https_server/wss_server/pytest_https_wss_server.py +++ b/examples/protocols/https_server/wss_server/pytest_https_wss_server.py @@ -1,16 +1,13 @@ #!/usr/bin/env python # -# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2021-2026 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 - -from __future__ import division, print_function, unicode_literals - import logging import os import threading import time -from types import TracebackType -from typing import Any, Optional +from typing import Any +from typing import Optional import pytest import websocket @@ -30,20 +27,20 @@ class WsClient: self.ip = ip sslopt = {'ca_certs':ca_file, 'check_hostname': False} self.ws = websocket.WebSocket(sslopt=sslopt) - # Set timeout to 10 seconds to avoid conection failure at the time of handshake + # Set timeout to 10 seconds to avoid connection failure at the time of handshake self.ws.settimeout(10) def __enter__(self): # type: ignore - self.ws.connect('wss://{}:{}/ws'.format(self.ip, self.port)) + self.ws.connect(f'wss://{self.ip}:{self.port}/ws') return self - def __exit__(self, exc_type, exc_value, traceback): # type: (type, RuntimeError, TracebackType) -> None + def __exit__(self, exc_type, exc_value, traceback): # type: ignore self.ws.close() - def read(self): # type: () -> Any + def read(self) -> Any: return self.ws.recv_data(control_frame=True) - def write(self, data, opcode=OPCODE_TEXT): # type: (str, int) -> Any + def write(self, data: str, opcode: int = OPCODE_TEXT) -> Any: if opcode == OPCODE_PING: return self.ws.ping(data) if opcode == OPCODE_PONG: @@ -62,7 +59,7 @@ class wss_client_thread(threading.Thread): self.data = 'Espressif' self.async_response = False - def run(self): # type: () -> None + def run(self) -> None: with WsClient(self.ip, self.port, self.ca_file) as ws: while True: try: @@ -74,7 +71,7 @@ class wss_client_thread(threading.Thread): if opcode == OPCODE_TEXT: if data == CORRECT_ASYNC_DATA: self.async_response = True - logging.info('Thread {} obtained correct async message'.format(self.name)) + logging.info(f'Thread {self.name} obtained correct async message') # Keep sending pong to update the keepalive in the server if (time.time() - self.start_time) > 20: break @@ -84,7 +81,7 @@ class wss_client_thread(threading.Thread): if self.async_response is not True: self.exc = RuntimeError('Failed to obtain correct async data') # type: ignore - def join(self, timeout=0): # type:(Optional[float]) -> None + def join(self, timeout: Optional[float] = 0) -> None: threading.Thread.join(self) if self.exc: raise self.exc @@ -113,7 +110,7 @@ def test_examples_protocol_https_wss_server(dut: Dut) -> None: # Get binary file binary_file = os.path.join(dut.app.binary_path, 'wss_server.bin') bin_size = os.path.getsize(binary_file) - logging.info('https_wss_server_bin_size : {}KB'.format(bin_size // 1024)) + logging.info(f'https_wss_server_bin_size : {bin_size // 1024}KB') logging.info('Starting wss_server test app') @@ -126,10 +123,10 @@ def test_examples_protocol_https_wss_server(dut: Dut) -> None: dut.write(f'{ap_ssid} {ap_password}') # Parse IP address of STA got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode()) - got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() - logging.info('Got IP : {}'.format(got_ip)) - logging.info('Got Port : {}'.format(got_port)) + logging.info(f'Got IP : {got_ip}') + logging.info(f'Got Port : {got_port}') ca_file = os.path.join(os.path.dirname(__file__), 'main', 'certs', 'servercert.pem') # Start ws server test @@ -139,11 +136,11 @@ def test_examples_protocol_https_wss_server(dut: Dut) -> None: dut.expect('performing session handshake') client_fd = int(dut.expect(r'New client connected (\d+)', timeout=30)[1].decode()) ws.write(data=DATA, opcode=OPCODE_TEXT) - dut.expect(r'Received packet with message: {}'.format(DATA)) + dut.expect(rf'Received packet with message: {DATA}') opcode, data = ws.read() data = data.decode('UTF-8') if data != DATA: - raise RuntimeError(f'Failed to receive the correct echo response.') + raise RuntimeError('Failed to receive the correct echo response.') logging.info('Correct echo response obtained from the wss server') # Test for PING @@ -172,11 +169,14 @@ def test_examples_protocol_https_wss_server(dut: Dut) -> None: logging.info('Failed the test for keep alive,\nthe client got abruptly disconnected') raise - # keepalive timeout is 10 seconds so do not respond for (10 + 1) senconds - logging.info('Testing if client is disconnected if it does not respond for 10s i.e. keep_alive timeout (approx time = 11s)') + # keepalive timeout is 10 seconds so do not respond for (10 + 1) seconds + logging.info( + 'Testing if client is disconnected if it does not respond for 10s ' + 'i.e. keep_alive timeout (approx time = 11s)' + ) try: - dut.expect('Client not alive, closing fd {}'.format(client_fd), timeout=20) - dut.expect('Client disconnected {}'.format(client_fd)) + dut.expect(f'Client not alive, closing fd {client_fd}', timeout=20) + dut.expect(f'Client disconnected {client_fd}') except Exception: logging.info('ENV_ERROR:Failed the test for keep alive,\nthe connection was not closed after timeout') diff --git a/examples/protocols/icmp_echo/pytest_icmp_echo.py b/examples/protocols/icmp_echo/pytest_icmp_echo.py index 6496613f45..b5ddbc5694 100644 --- a/examples/protocols/icmp_echo/pytest_icmp_echo.py +++ b/examples/protocols/icmp_echo/pytest_icmp_echo.py @@ -71,7 +71,7 @@ def test_protocols_icmp_echo_ipv6_only(dut: Dut) -> None: dut.write(f'{ap_ssid} {ap_password}') # expect all 8 octets from IPv6 (assumes it's printed in the long form) ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) - ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode() + ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode() logging.info(f'Connected AP with IPv6={ipv6}') interface_nr = dut.expect(r'Connected on interface: [a-z]{2}\d \((\d+)\)', timeout=30)[1].decode() diff --git a/examples/protocols/sntp/pytest_sntp.py b/examples/protocols/sntp/pytest_sntp.py index 6191fd7ae7..3684c44285 100644 --- a/examples/protocols/sntp/pytest_sntp.py +++ b/examples/protocols/sntp/pytest_sntp.py @@ -1,9 +1,8 @@ # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 - import datetime import logging -from typing import Any, Tuple +from typing import Any import pytest from common_test_methods import get_env_config_variable @@ -20,7 +19,7 @@ def test_get_time_from_sntp_server(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - dut.expect('IPv4 address:') + dut.expect('IPv4 address:', timeout=60) dut.expect('Initializing and starting SNTP') dut.expect('Notification of a time synchronization event') @@ -31,11 +30,11 @@ def test_get_time_from_sntp_server(dut: Dut) -> None: NY_time = None SH_time = None - def check_time(prev_NY_time: Any, prev_SH_time: Any) -> Tuple[Any, Any]: - NY_str = dut.expect(r'The current date/time in New York is: ({})'.format(TIME_FORMAT_REGEX))[1].decode() - SH_str = dut.expect(r'The current date/time in Shanghai is: ({})'.format(TIME_FORMAT_REGEX))[1].decode() - logging.info('New York: "{}"'.format(NY_str)) - logging.info('Shanghai: "{}"'.format(SH_str)) + def check_time(prev_NY_time: Any, prev_SH_time: Any) -> tuple[Any, Any]: + NY_str = dut.expect(rf'The current date/time in New York is: ({TIME_FORMAT_REGEX})')[1].decode() + SH_str = dut.expect(rf'The current date/time in Shanghai is: ({TIME_FORMAT_REGEX})')[1].decode() + logging.info(f'New York: "{NY_str}"') + logging.info(f'Shanghai: "{SH_str}"') dut.expect('Entering deep sleep for 10 seconds') logging.info('Sleeping...') new_NY_time = datetime.datetime.strptime(NY_str, TIME_FORMAT) @@ -49,5 +48,5 @@ def test_get_time_from_sntp_server(dut: Dut) -> None: NY_time, SH_time = check_time(NY_time, SH_time) for i in range(2, 4): - dut.expect('example: Boot count: {}'.format(i), timeout=30) + dut.expect(f'example: Boot count: {i}', timeout=30) NY_time, SH_time = check_time(NY_time, SH_time) diff --git a/examples/protocols/sockets/tcp_client/pytest_tcp_client.py b/examples/protocols/sockets/tcp_client/pytest_tcp_client.py index fa297269a5..d20079d27e 100644 --- a/examples/protocols/sockets/tcp_client/pytest_tcp_client.py +++ b/examples/protocols/sockets/tcp_client/pytest_tcp_client.py @@ -1,12 +1,13 @@ -# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2021-2026 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 - import logging import socket import pytest -from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip, - get_my_interface_by_dest_ip) +from common_test_methods import get_env_config_variable +from common_test_methods import get_host_ip4_by_dest_ip +from common_test_methods import get_host_ip6_by_dest_ip +from common_test_methods import get_my_interface_by_dest_ip from pytest_embedded import Dut try: @@ -37,13 +38,13 @@ def test_examples_tcp_client_ipv4(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() print(f'Connected with IPv4={ipv4}') # test IPv4 with TcpServer(PORT, socket.AF_INET): server_ip = get_host_ip4_by_dest_ip(ipv4) - print('Connect tcp client to server IP={}'.format(server_ip)) + print(f'Connect tcp client to server IP={server_ip}') dut.write(server_ip) dut.expect('OK: Message from ESP32') @@ -64,16 +65,16 @@ def test_examples_tcp_client_ipv6(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() # expect all 8 octets from IPv6 (assumes it's printed in the long form) ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) - ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode() - print('Connected with IPv4={} and IPv6={}'.format(ipv4, ipv6)) + ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode() + print(f'Connected with IPv4={ipv4} and IPv6={ipv6}') # test IPv6 my_interface = get_my_interface_by_dest_ip(ipv4) with TcpServer(PORT, socket.AF_INET6): server_ip = get_host_ip6_by_dest_ip(ipv6, my_interface) - print('Connect tcp client to server IP={}'.format(server_ip)) + print(f'Connect tcp client to server IP={server_ip}') dut.write(server_ip) dut.expect('OK: Message from ESP32') diff --git a/examples/protocols/sockets/tcp_server/pytest_tcp_server.py b/examples/protocols/sockets/tcp_server/pytest_tcp_server.py index 546dfc3dd7..0844f145d2 100644 --- a/examples/protocols/sockets/tcp_server/pytest_tcp_server.py +++ b/examples/protocols/sockets/tcp_server/pytest_tcp_server.py @@ -1,12 +1,12 @@ # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 - import logging import time import netifaces import pytest -from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip +from common_test_methods import get_env_config_variable +from common_test_methods import get_my_interface_by_dest_ip from pytest_embedded import Dut try: @@ -38,7 +38,7 @@ def test_examples_tcp_server_ipv4(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() logging.info(f'Connected with IPv4={ipv4}') time.sleep(1) @@ -66,7 +66,7 @@ def test_examples_tcp_server_ipv4_esp32c2_26mhz(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() logging.info(f'Connected with IPv4={ipv4}') time.sleep(1) @@ -93,16 +93,16 @@ def test_examples_tcp_server_ipv6(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() # expect all 8 octets from IPv6 (assumes it's printed in the long form) ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) - ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode() + ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode() logging.info(f'Connected with IPv4={ipv4} and IPv6={ipv6}') time.sleep(1) interface = get_my_interface_by_dest_ip(ipv4) # test IPv6 - received = tcp_client('{}%{}'.format(ipv6, interface), PORT, MESSAGE) + received = tcp_client(f'{ipv6}%{interface}', PORT, MESSAGE) if not received == MESSAGE: raise dut.expect(MESSAGE) @@ -126,7 +126,7 @@ def test_examples_tcp_server_ipv6_only(dut: Dut) -> None: dut.write(f'{ap_ssid} {ap_password}') # expect all 8 octets from IPv6 (assumes it's printed in the long form) ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) - ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode() + ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode() logging.info(f'Connected AP with IPv6={ipv6}') time.sleep(5) diff --git a/examples/protocols/sockets/udp_client/pytest_udp_client.py b/examples/protocols/sockets/udp_client/pytest_udp_client.py index 6c14c88d7c..f786806aea 100644 --- a/examples/protocols/sockets/udp_client/pytest_udp_client.py +++ b/examples/protocols/sockets/udp_client/pytest_udp_client.py @@ -1,12 +1,13 @@ -# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2021-2026 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 - import logging import socket import pytest -from common_test_methods import (get_env_config_variable, get_host_ip4_by_dest_ip, get_host_ip6_by_dest_ip, - get_my_interface_by_dest_ip) +from common_test_methods import get_env_config_variable +from common_test_methods import get_host_ip4_by_dest_ip +from common_test_methods import get_host_ip6_by_dest_ip +from common_test_methods import get_my_interface_by_dest_ip from pexpect.exceptions import TIMEOUT from pytest_embedded import Dut @@ -39,13 +40,13 @@ def test_examples_udp_client_ipv4(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() print(f'Connected with IPv4={ipv4}') # test IPv4 with UdpServer(PORT, socket.AF_INET): server_ip = get_host_ip4_by_dest_ip(ipv4) - print('Connect udp client to server IP={}'.format(server_ip)) + print(f'Connect udp client to server IP={server_ip}') for _ in range(MAX_RETRIES): try: dut.write(server_ip) @@ -73,17 +74,17 @@ def test_examples_udp_client_ipv6(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() # expect all 8 octets from IPv6 (assumes it's printed in the long form) ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) - ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode() + ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode() print(f'Connected with IPv4={ipv4} and IPv6={ipv6}') interface = get_my_interface_by_dest_ip(ipv4) # test IPv6 with UdpServer(PORT, socket.AF_INET6): server_ip = get_host_ip6_by_dest_ip(ipv6, interface) - print('Connect udp client to server IP={}'.format(server_ip)) + print(f'Connect udp client to server IP={server_ip}') for _ in range(MAX_RETRIES): try: dut.write(server_ip) diff --git a/examples/protocols/sockets/udp_server/pytest_udp_server.py b/examples/protocols/sockets/udp_server/pytest_udp_server.py index 52ddcc775c..eb13807fd6 100644 --- a/examples/protocols/sockets/udp_server/pytest_udp_server.py +++ b/examples/protocols/sockets/udp_server/pytest_udp_server.py @@ -1,10 +1,10 @@ # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 - import logging import pytest -from common_test_methods import get_env_config_variable, get_my_interface_by_dest_ip +from common_test_methods import get_env_config_variable +from common_test_methods import get_my_interface_by_dest_ip from pytest_embedded import Dut try: @@ -37,7 +37,7 @@ def test_examples_udp_server_ipv4(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() print(f'Connected with IPv4={ipv4}') # test IPv4 @@ -48,7 +48,7 @@ def test_examples_udp_server_ipv4(dut: Dut) -> None: print('OK') break else: - raise ValueError('IPv4: Did not receive UDP message after {} retries'.format(MAX_RETRIES)) + raise ValueError(f'IPv4: Did not receive UDP message after {MAX_RETRIES} retries') dut.expect(MESSAGE) @@ -68,20 +68,20 @@ def test_examples_udp_server_ipv6(dut: Dut) -> None: ap_ssid = get_env_config_variable(env_name, 'ap_ssid') ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') - ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() # expect all 8 octets from IPv6 (assumes it's printed in the long form) ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8) - ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode() + ipv6 = dut.expect(ipv6_r, timeout=60)[0].decode() print(f'Connected with IPv4={ipv4} and IPv6={ipv6}') interface = get_my_interface_by_dest_ip(ipv4) # test IPv6 for _ in range(MAX_RETRIES): print('Testing UDP on IPv6...') - received = udp_client('{}%{}'.format(ipv6, interface), PORT, MESSAGE) + received = udp_client(f'{ipv6}%{interface}', PORT, MESSAGE) if received == MESSAGE: print('OK') break else: - raise ValueError('IPv6: Did not receive UDP message after {} retries'.format(MAX_RETRIES)) + raise ValueError(f'IPv6: Did not receive UDP message after {MAX_RETRIES} retries') dut.expect(MESSAGE) diff --git a/examples/system/ota/advanced_https_ota/pytest_advanced_ota.py b/examples/system/ota/advanced_https_ota/pytest_advanced_ota.py index 19d6198b9a..2654acb9ab 100644 --- a/examples/system/ota/advanced_https_ota/pytest_advanced_ota.py +++ b/examples/system/ota/advanced_https_ota/pytest_advanced_ota.py @@ -13,7 +13,8 @@ from typing import Callable import pexpect import pytest -from common_test_methods import get_env_config_variable, get_host_ip4_by_dest_ip +from common_test_methods import get_env_config_variable +from common_test_methods import get_host_ip4_by_dest_ip from pytest_embedded import Dut from RangeHTTPServer import RangeRequestHandler @@ -120,8 +121,8 @@ def test_examples_protocol_advanced_https_ota_example(dut: Dut) -> None: for _ in range(iterations): dut.expect('Loaded app from partition at offset', timeout=30) try: - ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() - print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() + print(f'Connected to AP/Ethernet with IP: {ip_address}') except pexpect.exceptions.TIMEOUT: raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') dut.expect('Starting Advanced OTA example', timeout=30) @@ -170,8 +171,8 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(dut: Dut) -> # start test dut.expect('Loaded app from partition at offset', timeout=30) try: - ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() - print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() + print(f'Connected to AP/Ethernet with IP: {ip_address}') except pexpect.exceptions.TIMEOUT: raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') dut.expect('Starting Advanced OTA example', timeout=30) @@ -224,8 +225,8 @@ def test_examples_protocol_advanced_https_ota_example_truncated_header(dut: Dut) # start test dut.expect('Loaded app from partition at offset', timeout=30) try: - ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() - print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() + print(f'Connected to AP/Ethernet with IP: {ip_address}') except pexpect.exceptions.TIMEOUT: raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') host_ip = get_host_ip4_by_dest_ip(ip_address) @@ -278,8 +279,8 @@ def test_examples_protocol_advanced_https_ota_example_random(dut: Dut) -> None: # start test dut.expect('Loaded app from partition at offset', timeout=30) try: - ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() - print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() + print(f'Connected to AP/Ethernet with IP: {ip_address}') except pexpect.exceptions.TIMEOUT: raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') host_ip = get_host_ip4_by_dest_ip(ip_address) @@ -334,8 +335,8 @@ def test_examples_protocol_advanced_https_ota_example_invalid_chip_id(dut: Dut) # start test dut.expect('Loaded app from partition at offset', timeout=30) try: - ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() - print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() + print(f'Connected to AP/Ethernet with IP: {ip_address}') except pexpect.exceptions.TIMEOUT: raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') host_ip = get_host_ip4_by_dest_ip(ip_address) @@ -374,8 +375,8 @@ def test_examples_protocol_advanced_https_ota_example_chunked(dut: Dut) -> None: # start test dut.expect('Loaded app from partition at offset', timeout=30) try: - ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() - print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() + print(f'Connected to AP/Ethernet with IP: {ip_address}') except pexpect.exceptions.TIMEOUT: raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') host_ip = get_host_ip4_by_dest_ip(ip_address) @@ -415,8 +416,8 @@ def test_examples_protocol_advanced_https_ota_example_redirect_url(dut: Dut) -> # start test dut.expect('Loaded app from partition at offset', timeout=30) try: - ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() - print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() + print(f'Connected to AP/Ethernet with IP: {ip_address}') except pexpect.exceptions.TIMEOUT: raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') dut.expect('Starting Advanced OTA example', timeout=30) @@ -490,8 +491,8 @@ def test_examples_protocol_advanced_https_ota_example_anti_rollback(dut: Dut) -> # Positive Case dut.expect('Loaded app from partition at offset', timeout=30) try: - ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() - print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() + print(f'Connected to AP/Ethernet with IP: {ip_address}') except pexpect.exceptions.TIMEOUT: raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') host_ip = get_host_ip4_by_dest_ip(ip_address) @@ -501,7 +502,7 @@ def test_examples_protocol_advanced_https_ota_example_anti_rollback(dut: Dut) -> print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)) dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name) dut.expect('Loaded app from partition at offset', timeout=60) - dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() + dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() dut.expect(r'App is valid, rollback cancelled successfully', timeout=30) # Negative Case @@ -549,8 +550,8 @@ def test_examples_protocol_advanced_https_ota_example_partial_request(dut: Dut) # start test dut.expect('Loaded app from partition at offset', timeout=30) try: - ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() - print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() + print(f'Connected to AP/Ethernet with IP: {ip_address}') except pexpect.exceptions.TIMEOUT: raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP') host_ip = get_host_ip4_by_dest_ip(ip_address) @@ -601,8 +602,8 @@ def test_examples_protocol_advanced_https_ota_example_nimble_gatts(dut: Dut) -> ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') try: - ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() - print('Connected to AP with IP: {}'.format(ip_address)) + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() + print(f'Connected to AP with IP: {ip_address}') except pexpect.exceptions.TIMEOUT: raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP') host_ip = get_host_ip4_by_dest_ip(ip_address) @@ -653,8 +654,8 @@ def test_examples_protocol_advanced_https_ota_example_bluedroid_gatts(dut: Dut) ap_password = get_env_config_variable(env_name, 'ap_password') dut.write(f'{ap_ssid} {ap_password}') try: - ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() - print('Connected to AP with IP: {}'.format(ip_address)) + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() + print(f'Connected to AP with IP: {ip_address}') except pexpect.exceptions.TIMEOUT: raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP') host_ip = get_host_ip4_by_dest_ip(ip_address) @@ -707,8 +708,8 @@ def test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin(dut: D # start test dut.expect('Loaded app from partition at offset', timeout=30) try: - ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode() - print('Connected to AP/Ethernet with IP: {}'.format(ip_address)) + ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60)[1].decode() + print(f'Connected to AP/Ethernet with IP: {ip_address}') except pexpect.exceptions.TIMEOUT: raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet') host_ip = get_host_ip4_by_dest_ip(ip_address) diff --git a/examples/system/ota/simple_ota_example/pytest_simple_ota.py b/examples/system/ota/simple_ota_example/pytest_simple_ota.py index ca6227c910..5c4dc94f25 100644 --- a/examples/system/ota/simple_ota_example/pytest_simple_ota.py +++ b/examples/system/ota/simple_ota_example/pytest_simple_ota.py @@ -1,16 +1,19 @@ # SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Unlicense OR CC0-1.0 import http.server +import logging import multiprocessing import os import ssl import subprocess import sys +from typing import Optional from typing import Tuple import pexpect import pytest -from common_test_methods import get_env_config_variable, get_host_ip4_by_dest_ip +from common_test_methods import get_env_config_variable +from common_test_methods import get_host_ip4_by_dest_ip from pytest_embedded import Dut server_cert = '-----BEGIN CERTIFICATE-----\n' \ @@ -64,7 +67,13 @@ server_key = '-----BEGIN PRIVATE KEY-----\n'\ '-----END PRIVATE KEY-----\n' -def start_https_server(ota_image_dir: str, server_ip: str, server_port: int, server_file: str = None, key_file: str = None) -> None: +def start_https_server( + ota_image_dir: str, + server_ip: str, + server_port: int, + server_file: Optional[str] = None, + key_file: Optional[str] = None, +) -> None: os.chdir(ota_image_dir) if server_file is None: @@ -104,12 +113,12 @@ def start_tls1_3_server(ota_image_dir: str, server_port: int) -> subprocess.Pope def check_sha256(sha256_expected: str, sha256_reported: str) -> None: - print('sha256_expected: %s' % (sha256_expected)) - print('sha256_reported: %s' % (sha256_reported)) + logging.info('sha256_expected: %s', sha256_expected) + logging.info('sha256_reported: %s', sha256_reported) if sha256_expected not in sha256_reported: raise ValueError('SHA256 mismatch') else: - print('SHA256 expected and reported are the same') + logging.info('SHA256 expected and reported are the same') def calc_all_sha256(dut: Dut) -> Tuple[str, str]: diff --git a/examples/wifi/power_save/pytest_wifi_power_save.py b/examples/wifi/power_save/pytest_wifi_power_save.py index a1078f9fee..bbfbeaa149 100644 --- a/examples/wifi/power_save/pytest_wifi_power_save.py +++ b/examples/wifi/power_save/pytest_wifi_power_save.py @@ -30,7 +30,7 @@ def _run_test(dut: Dut) -> None: pass try: - dut.expect(r'got ip: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30) + dut.expect(r'got ip: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=60) log_after_got_ip = dut.expect(pexpect.TIMEOUT, timeout=10).decode() if any(s in log_after_got_ip for s in bad_event_str): logging.info('Abnormal connection log:')