mirror of
https://github.com/espressif/esp-idf.git
synced 2026-04-27 19:13:21 +00:00
285 lines
12 KiB
Python
285 lines
12 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# SPDX-FileCopyrightText: 2021-2026 Espressif Systems (Shanghai) CO LTD
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
import http.client
|
|
import logging
|
|
import os
|
|
import ssl
|
|
|
|
import pytest
|
|
from common_test_methods import get_env_config_variable
|
|
from pytest_embedded import Dut
|
|
from pytest_embedded_idf.utils import idf_parametrize
|
|
|
|
CURRENT_FILE_PATH = os.path.dirname(__file__)
|
|
|
|
CACERT_FILE_PATH = os.path.join(CURRENT_FILE_PATH, './main/certs/cacert.pem')
|
|
CLIENT_CERT_PATH = os.path.join(CURRENT_FILE_PATH, './main/certs/client_cert.pem')
|
|
CLIENT_KEY_PATH = os.path.join(CURRENT_FILE_PATH, './main/certs/client_key.pem')
|
|
|
|
success_response = '<h1>Hello Secure World!</h1>'
|
|
|
|
|
|
@pytest.mark.wifi_router
|
|
@idf_parametrize('target', ['esp32', 'esp32c3', 'esp32s3'], indirect=['target'])
|
|
def test_examples_protocol_https_server_simple(dut: Dut) -> None:
|
|
"""
|
|
steps: |
|
|
1. join AP
|
|
2. connect to www.howsmyssl.com:443
|
|
3. send http request
|
|
"""
|
|
# check and log bin size
|
|
binary_file = os.path.join(dut.app.binary_path, 'https_server.bin')
|
|
bin_size = os.path.getsize(binary_file)
|
|
logging.info('https_server_simple_bin_size : {}KB'.format(bin_size // 1024))
|
|
# start test
|
|
logging.info('Waiting to connect with AP')
|
|
if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
|
|
dut.expect('Please input ssid password:')
|
|
env_name = 'wifi_router'
|
|
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
|
ap_password = get_env_config_variable(env_name, 'ap_password')
|
|
dut.write(f'{ap_ssid} {ap_password}')
|
|
# Parse IP address and port of the server
|
|
dut.expect(r'Starting server')
|
|
got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
|
|
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
|
|
|
# Expected logs
|
|
|
|
logging.info('Got IP : {}'.format(got_ip))
|
|
logging.info('Got Port : {}'.format(got_port))
|
|
|
|
# Try requesting without client certificate and it should fail if client cert is required
|
|
logging.info('Performing GET request over an SSL connection with the server without client certificate')
|
|
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
|
|
ssl_context.verify_mode = ssl.CERT_REQUIRED
|
|
ssl_context.check_hostname = False
|
|
# Load the CA certificate from the path
|
|
ssl_context.load_verify_locations(cafile=CACERT_FILE_PATH)
|
|
conn = http.client.HTTPSConnection(got_ip, got_port, context=ssl_context)
|
|
logging.info('Performing SSL handshake with the server without client certificate')
|
|
try:
|
|
conn.request('GET', '/')
|
|
resp = conn.getresponse()
|
|
resp.read().decode('utf-8')
|
|
except Exception as e:
|
|
if dut.app.sdkconfig.get('EXAMPLE_ENABLE_SKIP_CLIENT_CERT') is True:
|
|
logging.info('SSL handshake failed without client certificate but was expected to be successful')
|
|
raise RuntimeError('Failed to test SSL connection') from e
|
|
else:
|
|
logging.info('SSL handshake failed without client certificate as expected')
|
|
finally:
|
|
conn.close()
|
|
|
|
logging.info('Performing GET request over an SSL connection with the server')
|
|
|
|
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
|
|
ssl_context.verify_mode = ssl.CERT_REQUIRED
|
|
ssl_context.check_hostname = False
|
|
ssl_context.load_verify_locations(cafile=CACERT_FILE_PATH)
|
|
|
|
ssl_context.load_cert_chain(certfile=CLIENT_CERT_PATH, keyfile=CLIENT_KEY_PATH)
|
|
|
|
conn = http.client.HTTPSConnection(got_ip, got_port, context=ssl_context)
|
|
logging.info('Performing SSL handshake with the server')
|
|
conn.request('GET', '/')
|
|
resp = conn.getresponse()
|
|
dut.expect('performing session handshake')
|
|
got_resp = resp.read().decode('utf-8')
|
|
if got_resp != success_response:
|
|
logging.info('Response obtained does not match with correct response')
|
|
raise RuntimeError('Failed to test SSL connection')
|
|
|
|
if dut.app.sdkconfig.get('EXAMPLE_ENABLE_HTTPS_USER_CALLBACK') is True:
|
|
current_cipher = dut.expect(r'Current Ciphersuite(.*)', timeout=5)[0]
|
|
logging.info('Current Ciphersuite {}'.format(current_cipher))
|
|
|
|
conn.close()
|
|
|
|
logging.info('Checking user callback: Obtaining client certificate...')
|
|
|
|
serial_number = dut.expect(r'serial number\s*:([^\n]*)', timeout=5)[0]
|
|
issuer_name = dut.expect(r'issuer name\s*:([^\n]*)', timeout=5)[0]
|
|
expiry = dut.expect(r'expires on ((.*)\d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])*)', timeout=5)[
|
|
1
|
|
].decode()
|
|
|
|
logging.info('Serial No. {}'.format(serial_number))
|
|
logging.info('Issuer Name {}'.format(issuer_name))
|
|
logging.info('Expires on {}'.format(expiry))
|
|
|
|
logging.info('Correct response obtained')
|
|
logging.info('SSL connection test successful\nClosing the connection')
|
|
|
|
|
|
@pytest.mark.wifi_router
|
|
@pytest.mark.parametrize(
|
|
'config',
|
|
[
|
|
'dynamic_buffer',
|
|
],
|
|
indirect=True,
|
|
)
|
|
@idf_parametrize('target', ['esp32', 'esp32c3', 'esp32s3'], indirect=['target'])
|
|
def test_examples_protocol_https_server_simple_dynamic_buffers(dut: Dut) -> None:
|
|
# Test with mbedTLS dynamic buffer feature
|
|
|
|
# start test
|
|
logging.info('Waiting to connect with AP')
|
|
if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
|
|
dut.expect('Please input ssid password:')
|
|
env_name = 'wifi_router'
|
|
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
|
ap_password = get_env_config_variable(env_name, 'ap_password')
|
|
dut.write(f'{ap_ssid} {ap_password}')
|
|
# Parse IP address and port of the server
|
|
dut.expect(r'Starting server')
|
|
got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
|
|
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
|
|
|
# Expected logs
|
|
|
|
logging.info('Got IP : {}'.format(got_ip))
|
|
logging.info('Got Port : {}'.format(got_port))
|
|
|
|
logging.info('Performing GET request over an SSL connection with the server')
|
|
|
|
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
|
|
ssl_context.verify_mode = ssl.CERT_REQUIRED
|
|
ssl_context.check_hostname = False
|
|
ssl_context.load_verify_locations(cafile=CACERT_FILE_PATH)
|
|
|
|
ssl_context.load_cert_chain(certfile=CLIENT_CERT_PATH, keyfile=CLIENT_KEY_PATH)
|
|
|
|
conn = http.client.HTTPSConnection(got_ip, got_port, context=ssl_context)
|
|
logging.info('Performing SSL handshake with the server')
|
|
conn.request('GET', '/')
|
|
resp = conn.getresponse()
|
|
dut.expect('performing session handshake')
|
|
got_resp = resp.read().decode('utf-8')
|
|
if got_resp != success_response:
|
|
logging.info('Response obtained does not match with correct response')
|
|
raise RuntimeError('Failed to test SSL connection')
|
|
|
|
if dut.app.sdkconfig.get('EXAMPLE_ENABLE_HTTPS_USER_CALLBACK') is True:
|
|
current_cipher = dut.expect(r'Current Ciphersuite(.*)', timeout=5)[0]
|
|
logging.info('Current Ciphersuite {}'.format(current_cipher))
|
|
|
|
# Close the connection
|
|
conn.close()
|
|
|
|
logging.info('Checking user callback: Obtaining client certificate...')
|
|
|
|
serial_number = dut.expect(r'serial number\s*:([^\n]*)', timeout=5)[0]
|
|
issuer_name = dut.expect(r'issuer name\s*:([^\n]*)', timeout=5)[0]
|
|
expiry = dut.expect(r'expires on\s*:((.*)\d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])*)', timeout=5)[
|
|
1
|
|
].decode()
|
|
|
|
logging.info('Serial No. : {}'.format(serial_number))
|
|
logging.info('Issuer Name : {}'.format(issuer_name))
|
|
logging.info('Expires on : {}'.format(expiry))
|
|
|
|
logging.info('Correct response obtained')
|
|
logging.info('SSL connection test successful\nClosing the connection')
|
|
|
|
|
|
@pytest.mark.wifi_router
|
|
@pytest.mark.parametrize(
|
|
'config',
|
|
[
|
|
'tls1_3',
|
|
],
|
|
indirect=True,
|
|
)
|
|
@idf_parametrize('target', ['esp32', 'esp32c3', 'esp32s3'], indirect=['target'])
|
|
def test_examples_protocol_https_server_tls1_3(dut: Dut) -> None:
|
|
logging.info('Waiting to connect with AP')
|
|
if dut.app.sdkconfig.get('EXAMPLE_WIFI_SSID_PWD_FROM_STDIN') is True:
|
|
dut.expect('Please input ssid password:')
|
|
env_name = 'wifi_router'
|
|
ap_ssid = get_env_config_variable(env_name, 'ap_ssid')
|
|
ap_password = get_env_config_variable(env_name, 'ap_password')
|
|
dut.write(f'{ap_ssid} {ap_password}')
|
|
# Parse IP address and port of the server
|
|
dut.expect(r'Starting server')
|
|
got_port = int(dut.expect(r'Server listening on port (\d+)', timeout=30)[1].decode())
|
|
got_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
|
|
|
|
# Expected logs
|
|
logging.info('Got IP : {}'.format(got_ip))
|
|
logging.info('Got Port : {}'.format(got_port))
|
|
logging.info('Performing GET request over an SSL connection with the server using TLSv1.3')
|
|
|
|
# First try requesting without client certificate and it should also pass if client cert is optional or none
|
|
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_3
|
|
ssl_context.maximum_version = ssl.TLSVersion.TLSv1_3
|
|
ssl_context.check_hostname = False
|
|
ssl_context.verify_mode = ssl.CERT_REQUIRED
|
|
ssl_context.load_verify_locations(cafile=CACERT_FILE_PATH)
|
|
|
|
conn = http.client.HTTPSConnection(got_ip, got_port, context=ssl_context)
|
|
logging.info('Performing SSL handshake with the server without client certificate')
|
|
try:
|
|
conn.request('GET', '/')
|
|
resp = conn.getresponse()
|
|
resp.read().decode('utf-8')
|
|
if dut.app.sdkconfig.get('EXAMPLE_ENABLE_SKIP_CLIENT_CERT') is True:
|
|
dut.expect('performing session handshake')
|
|
logging.info('SSL handshake successful without client certificate as expected')
|
|
else:
|
|
logging.info('SSL handshake successful without client certificate but was expected to fail')
|
|
raise RuntimeError('Failed to test SSL connection')
|
|
except Exception as e:
|
|
if dut.app.sdkconfig.get('EXAMPLE_ENABLE_SKIP_CLIENT_CERT') is True:
|
|
logging.info(f'SSL handshake failed without client certificate but was expected to be successful: {e}')
|
|
raise RuntimeError('Failed to test SSL connection')
|
|
else:
|
|
logging.info('SSL handshake failed without client certificate as expected')
|
|
finally:
|
|
conn.close()
|
|
|
|
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_3
|
|
ssl_context.maximum_version = ssl.TLSVersion.TLSv1_3
|
|
ssl_context.verify_mode = ssl.CERT_REQUIRED
|
|
ssl_context.check_hostname = False
|
|
ssl_context.load_verify_locations(cafile=CACERT_FILE_PATH)
|
|
ssl_context.load_cert_chain(certfile=CLIENT_CERT_PATH, keyfile=CLIENT_KEY_PATH)
|
|
|
|
conn = http.client.HTTPSConnection(got_ip, got_port, context=ssl_context)
|
|
logging.info('Performing SSL handshake with the server')
|
|
conn.request('GET', '/')
|
|
resp = conn.getresponse()
|
|
dut.expect('performing session handshake')
|
|
got_resp = resp.read().decode('utf-8')
|
|
if got_resp != success_response:
|
|
logging.info('Response obtained does not match with correct response')
|
|
raise RuntimeError('Failed to test SSL connection')
|
|
|
|
if dut.app.sdkconfig.get('EXAMPLE_ENABLE_HTTPS_USER_CALLBACK') is True:
|
|
current_cipher = dut.expect(r'Current Ciphersuite(.*)', timeout=5)[0]
|
|
logging.info('Current Ciphersuite {}'.format(current_cipher))
|
|
|
|
conn.close()
|
|
|
|
logging.info('Checking user callback: Obtaining client certificate...')
|
|
|
|
serial_number = dut.expect(r'serial number\s*:([^\n]*)', timeout=5)[0]
|
|
issuer_name = dut.expect(r'issuer name\s*:([^\n]*)', timeout=5)[0]
|
|
expiry = dut.expect(
|
|
r'expires on\s*:((.*)\d{4}\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])*)',
|
|
timeout=5,
|
|
)[1].decode()
|
|
|
|
logging.info('Serial No. : {}'.format(serial_number))
|
|
logging.info('Issuer Name : {}'.format(issuer_name))
|
|
logging.info('Expires on : {}'.format(expiry))
|
|
|
|
logging.info('Correct response obtained')
|
|
logging.info('SSL connection test successful\nClosing the connection')
|