mirror of
https://github.com/espressif/esp-idf.git
synced 2026-04-27 19:13:21 +00:00
fix(provisioning): fix incorrect AES-GCM IV usage in security2 scheme
Using same IV in AES-GCM across multiple invocation of encryption/decryption operations can pose a security risk. It can help to reveal co-relation between different plaintexts. This commit introduces a change to use part of IV as a monotonic counter, which must be incremented after every AES-GCM invocation on both the client and the device side. Concept of patch version for a security scheme has been introduced here which can help to differentiate a protocol behavior for the provisioning entity. The security patch version will be available in the JSON response for `proto-ver` endpoint request with the field `sec_patch_ver`. Please refer to documentation for more details on the changes required on the provisioning entity side (e.g., PhoneApps).
This commit is contained in:
@@ -1,9 +1,8 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-FileCopyrightText: 2018-2025 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
@@ -38,9 +37,9 @@ def on_except(err):
|
||||
print(err)
|
||||
|
||||
|
||||
def get_security(secver, username, password, pop='', verbose=False):
|
||||
def get_security(secver, sec_patch_ver, username, password, pop='', verbose=False):
|
||||
if secver == 2:
|
||||
return security.Security2(username, password, verbose)
|
||||
return security.Security2(sec_patch_ver, username, password, verbose)
|
||||
elif secver == 1:
|
||||
return security.Security1(pop, verbose)
|
||||
elif secver == 0:
|
||||
@@ -148,6 +147,27 @@ async def get_version(tp):
|
||||
return response
|
||||
|
||||
|
||||
async def get_sec_patch_ver(tp, verbose=False):
|
||||
response = await get_version(tp)
|
||||
|
||||
if verbose:
|
||||
print('proto-ver response : ', response)
|
||||
|
||||
try:
|
||||
# Interpret this as JSON structure containing
|
||||
# information with security version information
|
||||
info = json.loads(response)
|
||||
try:
|
||||
sec_patch_ver = info['prov']['sec_patch_ver']
|
||||
except KeyError:
|
||||
sec_patch_ver = 0
|
||||
return sec_patch_ver
|
||||
|
||||
except ValueError:
|
||||
# If decoding as JSON fails, we assume default patch level
|
||||
return 0
|
||||
|
||||
|
||||
async def establish_session(tp, sec):
|
||||
try:
|
||||
response = None
|
||||
@@ -415,6 +435,7 @@ async def main():
|
||||
raise RuntimeError('Failed to establish connection')
|
||||
|
||||
try:
|
||||
sec_patch_ver = 0
|
||||
# If security version not specified check in capabilities
|
||||
if args.secver is None:
|
||||
# First check if capabilities are supported or not
|
||||
@@ -436,13 +457,14 @@ async def main():
|
||||
args.sec1_pop = ''
|
||||
|
||||
if (args.secver == 2):
|
||||
sec_patch_ver = await get_sec_patch_ver(obj_transport)
|
||||
if len(args.sec2_usr) == 0:
|
||||
args.sec2_usr = input('Security Scheme 2 - SRP6a Username required: ')
|
||||
if len(args.sec2_pwd) == 0:
|
||||
prompt_str = 'Security Scheme 2 - SRP6a Password required: '
|
||||
args.sec2_pwd = getpass(prompt_str)
|
||||
|
||||
obj_security = get_security(args.secver, args.sec2_usr, args.sec2_pwd, args.sec1_pop, args.verbose)
|
||||
obj_security = get_security(args.secver, sec_patch_ver, args.sec2_usr, args.sec2_pwd, args.sec1_pop, args.verbose)
|
||||
if obj_security is None:
|
||||
raise ValueError('Invalid Security Version')
|
||||
|
||||
@@ -459,7 +481,7 @@ async def main():
|
||||
print('==== Session Established ====')
|
||||
|
||||
if args.reset:
|
||||
print('==== Reseting WiFi====')
|
||||
print('==== Resetting WiFi====')
|
||||
await reset_wifi(obj_transport, obj_security)
|
||||
sys.exit()
|
||||
|
||||
|
||||
@@ -1,18 +1,19 @@
|
||||
# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-FileCopyrightText: 2018-2025 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
# APIs for interpreting and creating protobuf packets for
|
||||
# protocomm endpoint with security type protocomm_security2
|
||||
|
||||
|
||||
from typing import Any, Type
|
||||
import struct
|
||||
from typing import Any
|
||||
from typing import Type
|
||||
|
||||
import proto
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
from utils import long_to_bytes, str_to_bytes
|
||||
from utils import long_to_bytes
|
||||
from utils import str_to_bytes
|
||||
|
||||
from .security import Security
|
||||
from .srp6a import Srp6a, generate_salt_and_verifier
|
||||
from .srp6a import generate_salt_and_verifier
|
||||
from .srp6a import Srp6a
|
||||
|
||||
AES_KEY_LEN = 256 // 8
|
||||
|
||||
@@ -30,17 +31,18 @@ def sec2_gen_salt_verifier(username: str, password: str, salt_len: int) -> Any:
|
||||
|
||||
salt_str = ', '.join([format(b, '#04x') for b in salt])
|
||||
salt_c_arr = '\n '.join(salt_str[i: i + 96] for i in range(0, len(salt_str), 96))
|
||||
print(f'static const char sec2_salt[] = {{\n {salt_c_arr}\n}};\n')
|
||||
print(f'static const char sec2_salt[] = {{\n {salt_c_arr}\n}};\n') # noqa E702
|
||||
|
||||
verifier_str = ', '.join([format(b, '#04x') for b in verifier])
|
||||
verifier_c_arr = '\n '.join(verifier_str[i: i + 96] for i in range(0, len(verifier_str), 96))
|
||||
print(f'static const char sec2_verifier[] = {{\n {verifier_c_arr}\n}};\n')
|
||||
print(f'static const char sec2_verifier[] = {{\n {verifier_c_arr}\n}};\n') # noqa E702
|
||||
|
||||
|
||||
class Security2(Security):
|
||||
def __init__(self, username: str, password: str, verbose: bool) -> None:
|
||||
def __init__(self, sec_patch_ver:int, username: str, password: str, verbose: bool) -> None:
|
||||
# Initialize state of the security2 FSM
|
||||
self.session_state = security_state.REQUEST1
|
||||
self.sec_patch_ver = sec_patch_ver
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.verbose = verbose
|
||||
@@ -49,7 +51,7 @@ class Security2(Security):
|
||||
self.cipher: Type[AESGCM]
|
||||
|
||||
self.client_pop_key = None
|
||||
self.nonce = None
|
||||
self.nonce = bytearray()
|
||||
|
||||
Security.__init__(self, self.security2_session)
|
||||
|
||||
@@ -75,7 +77,7 @@ class Security2(Security):
|
||||
|
||||
def _print_verbose(self, data: str) -> None:
|
||||
if (self.verbose):
|
||||
print(f'\x1b[32;20m++++ {data} ++++\x1b[0m')
|
||||
print(f'\x1b[32;20m++++ {data} ++++\x1b[0m') # noqa E702
|
||||
|
||||
def setup0_request(self) -> Any:
|
||||
# Form SessionCmd0 request packet using client public key
|
||||
@@ -148,7 +150,7 @@ class Security2(Security):
|
||||
self._print_verbose(f'Session Key:\t0x{session_key.hex()}')
|
||||
|
||||
# 96-bit nonce
|
||||
self.nonce = setup_resp.sec2.sr1.device_nonce
|
||||
self.nonce = bytearray(setup_resp.sec2.sr1.device_nonce)
|
||||
if self.nonce is None:
|
||||
raise RuntimeError('Received invalid nonce from device!')
|
||||
self._print_verbose(f'Nonce:\t0x{self.nonce.hex()}')
|
||||
@@ -158,8 +160,23 @@ class Security2(Security):
|
||||
if self.cipher is None:
|
||||
raise RuntimeError('Failed to initialize AES-GCM cryptographic engine!')
|
||||
|
||||
def _increment_nonce(self) -> None:
|
||||
"""Increment the last 4 bytes of nonce (big-endian counter)."""
|
||||
if self.sec_patch_ver == 1:
|
||||
counter = struct.unpack('>I', self.nonce[8:])[0] # Read last 4 bytes as big-endian integer
|
||||
counter += 1 # Increment counter
|
||||
if counter > 0xFFFFFFFF: # Check for overflow
|
||||
raise RuntimeError('Nonce counter overflow')
|
||||
self.nonce[8:] = struct.pack('>I', counter) # Store back as big-endian
|
||||
|
||||
def encrypt_data(self, data: bytes) -> Any:
|
||||
return self.cipher.encrypt(self.nonce, data, None)
|
||||
self._print_verbose(f'Nonce:\t0x{self.nonce.hex()}')
|
||||
ciphertext = self.cipher.encrypt(self.nonce, data, None)
|
||||
self._increment_nonce()
|
||||
return ciphertext
|
||||
|
||||
def decrypt_data(self, data: bytes) -> Any:
|
||||
return self.cipher.decrypt(self.nonce, data, None)
|
||||
self._print_verbose(f'Nonce:\t0x{self.nonce.hex()}')
|
||||
plaintext = self.cipher.decrypt(self.nonce, data, None)
|
||||
self._increment_nonce()
|
||||
return plaintext
|
||||
|
||||
Reference in New Issue
Block a user