From 6904fc485bf9427edcf846379ff0cb3630113fb7 Mon Sep 17 00:00:00 2001 From: Shubham Patil Date: Fri, 19 Aug 2022 17:58:37 +0530 Subject: [PATCH] mfg_tool: Support device info, device instance info and few more attributes from basic cluster --- tools/mfg_tool/chip_nvs.py | 127 +++++++++ tools/mfg_tool/chip_nvs_keys.py | 69 ----- tools/mfg_tool/mfg_tool.py | 481 +++++++++++++------------------- tools/mfg_tool/requirements.txt | 1 + tools/mfg_tool/utils.py | 333 ++++++++++++++++++++++ 5 files changed, 661 insertions(+), 350 deletions(-) create mode 100644 tools/mfg_tool/chip_nvs.py delete mode 100644 tools/mfg_tool/chip_nvs_keys.py create mode 100644 tools/mfg_tool/utils.py diff --git a/tools/mfg_tool/chip_nvs.py b/tools/mfg_tool/chip_nvs.py new file mode 100644 index 000000000..8b08bda9a --- /dev/null +++ b/tools/mfg_tool/chip_nvs.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 + +# Copyright 2022 Espressif Systems (Shanghai) PTE LTD +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This file contains the CHIP specific key along with the data type and encoding format +""" + +# This map contains mandatory CHIP specific key along with the data type and encoding format +# Additionally to add more keys to chip-factory, use chip_factory_append() API +CHIP_NVS_MAP = { + 'chip-factory': { + # Commissionable Data + 'discriminator': { + 'type': 'data', + 'encoding': 'u32', + 'value': None, + }, + 'iteration-count': { + 'type': 'data', + 'encoding': 'u32', + 'value': None, + }, + 'salt': { + 'type': 'data', + 'encoding': 'string', + 'value': None, + }, + 'verifier': { + 'type': 'data', + 'encoding': 'string', + 'value': None, + }, + + # Device Attestation Credentials + 'dac-cert': { + 'type': 'file', + 'encoding': 'binary', + 'value': None, + }, + 'dac-key': { + 'type': 'file', + 'encoding': 'binary', + 'value': None, + }, + 'dac-pub-key': { + 'type': 'file', + 'encoding': 'binary', + 'value': None, + }, + 'pai-cert': { + 'type': 'file', + 'encoding': 'binary', + 'value': None, + }, + 'cert-dclrn': { + 'type': 'file', + 'encoding': 'binary', + 'value': None, + }, + }, + 'chip-config': { + 'unique-id': { + 'type': 'data', + 'encoding': 'string', + 'value': None, + } + } +} + + +def get_dict(key, type, encoding, value): + return { + key: { + 'type': type, + 'encoding': encoding, + 'value': value, + } + } + + +def chip_nvs_get_config_csv(): + csv_data = '' + for k, v in CHIP_NVS_MAP.items(): + csv_data += k + ',' + 'namespace,' + '\n' + for k1, v1 in v.items(): + csv_data += k1 + ',' + v1['type'] + ',' + v1['encoding'] + '\n' + return csv_data + + +def chip_factory_append(key, type, encoding, value): + CHIP_NVS_MAP['chip-factory'].update(get_dict(key, type, encoding, value)) + + +def chip_factory_update(key, value): + CHIP_NVS_MAP['chip-factory'][key]['value'] = value + + +def chip_config_update(key, value): + CHIP_NVS_MAP['chip-config'][key]['value'] = value + + +def chip_get_keys_as_csv(): + keys = list() + for ns in CHIP_NVS_MAP: + keys.extend(list(CHIP_NVS_MAP[ns])) + return ','.join(keys) + + +def chip_get_values_as_csv(): + values = list() + for ns in CHIP_NVS_MAP: + for k in CHIP_NVS_MAP[ns]: + values.append(str(CHIP_NVS_MAP[ns][k]['value'])) + return ','.join(values) diff --git a/tools/mfg_tool/chip_nvs_keys.py b/tools/mfg_tool/chip_nvs_keys.py deleted file mode 100644 index efeadb0ff..000000000 --- a/tools/mfg_tool/chip_nvs_keys.py +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright 2022 Espressif Systems (Shanghai) PTE LTD -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -This file contains the CHIP specific key along with the data type and encoding format -""" - -CHIP_KEY_MAP = { - 'chip-factory': { - 'discriminator': { - 'type': 'data', - 'encoding': 'u32', - }, - 'pin-code': { - 'type': 'data', - 'encoding': 'u32', - }, - 'iteration-count': { - 'type': 'data', - 'encoding': 'u32', - }, - 'salt': { - 'type': 'data', - 'encoding': 'string', - }, - 'verifier': { - 'type': 'data', - 'encoding': 'string', - }, - 'dac-cert': { - 'type': 'file', - 'encoding': 'binary', - }, - 'dac-key': { - 'type': 'file', - 'encoding': 'binary', - }, - 'dac-pub-key': { - 'type': 'file', - 'encoding': 'binary', - }, - 'pai-cert': { - 'type': 'file', - 'encoding': 'binary', - }, - 'cert-dclrn': { - 'type': 'file', - 'encoding': 'binary', - }, - 'unique-id': { - 'type': 'data', - 'encoding': 'hex2bin', - }, - }, -} - diff --git a/tools/mfg_tool/mfg_tool.py b/tools/mfg_tool/mfg_tool.py index 1efbd1af6..d5fddf492 100755 --- a/tools/mfg_tool/mfg_tool.py +++ b/tools/mfg_tool/mfg_tool.py @@ -28,11 +28,9 @@ import logging import binascii import argparse import pyqrcode -import subprocess -import cryptography.hazmat.backends -import cryptography.x509 -from chip_nvs_keys import CHIP_KEY_MAP +from chip_nvs import * +from utils import * if not os.getenv('IDF_PATH'): logging.error("IDF_PATH environment variable is not set") @@ -41,49 +39,34 @@ if not os.getenv('IDF_PATH'): # TODO: Handle count > 100 case. spake2p gen-verifier doesn't support count > 100 TOOLS = { - 'spake2p' : None, - 'chip-cert' : None, + 'spake2p': None, + 'chip-cert': None, 'chip-tool': None, - 'mfg_gen' : None, + 'mfg_gen': None, } PAI = { 'cert_pem': None, 'cert_der': None, - 'key_pem' : None, - 'key_der' : None, + 'key_pem': None, + 'key_der': None, } OUT_DIR = { - 'top' : None, + 'top': None, 'chip': None, } OUT_FILE = { - 'config_csv' : None, - 'chip_mcsv' : None, - 'mcsv' : None, + 'config_csv': None, + 'chip_mcsv': None, + 'mcsv': None, 'pin_csv': None, 'pin_disc_csv': None, } -UNIQUE_ID_LEN = 16 # 16 bytes (128-bits) - -INVALID_PASSCODES = [ 00000000, 11111111, 22222222, 33333333, 44444444, 55555555, - 66666666, 77777777, 88888888, 99999999, 12345678, 87654321 ] - UUIDs = list() -# Lengths for manual pairing codes and qrcode -SHORT_MANUALCODE_LEN = 11 -LONG_MANUALCODE_LEN = 21 -QRCODE_LEN = 22 - -def vid_pid_str(vid, pid): - return '_'.join([hex(vid)[2:], hex(pid)[2:]]) - -def disc_pin_str(discriminator, passcode): - return '_'.join([hex(discriminator)[2:], hex(passcode)[2:]]) def check_tools_exists(): TOOLS['spake2p'] = shutil.which('spake2p') @@ -112,17 +95,6 @@ def check_tools_exists(): logging.debug('chip-tool: {}'.format(TOOLS['chip-tool'])) logging.debug('mfg_gen: {}'.format(TOOLS['mfg_gen'])) -def execute_cmd(cmd): - logging.debug('Executing Command: {}'.format(cmd)) - status = subprocess.run(cmd, capture_output=True) - - try: - status.check_returncode() - except subprocess.CalledProcessError as e: - if status.stderr: - logging.error('[stderr]: {}'.format(status.stderr.decode('utf-8').strip())) - logging.error('Command failed with error: {}'.format(e)) - sys.exit(1) def generate_passcodes(args): iter_count_max = 10000 @@ -142,6 +114,7 @@ def generate_passcodes(args): execute_cmd(cmd) + def generate_discriminators(args): discriminators = list() @@ -154,6 +127,7 @@ def generate_discriminators(args): return discriminators + # Append discriminator to each line of the passcode file def append_discriminator(discriminator): with open(OUT_FILE['pin_csv'], 'r') as fd: @@ -168,15 +142,11 @@ def append_discriminator(discriminator): os.remove(OUT_FILE['pin_csv']) + # Generates the csv file containing chip specific keys and keys provided by user in csv file def generate_config_csv(args): logging.info("Generating Config CSV...") - - csv_data = '' - for k, v in CHIP_KEY_MAP.items(): - csv_data += k + ',' + 'namespace,' + '\n' - for k1, v1 in v.items(): - csv_data += k1 + ',' + v1['type'] + ',' + v1['encoding'] + '\n' + csv_data = chip_nvs_get_config_csv() # Read data from the user provided csv file if args.csv: @@ -186,60 +156,25 @@ def generate_config_csv(args): with open(OUT_FILE['config_csv'], 'w') as f: f.write(csv_data) + def write_chip_mcsv_header(args): logging.info('Writing chip manifest CSV header...') - - keys = list() - for _, v in CHIP_KEY_MAP.items(): - for k1, _ in v.items(): - keys.append(k1) - - mcsv_header = ','.join(keys) + '\n' - + mcsv_header = chip_get_keys_as_csv() + '\n' with open(OUT_FILE['chip_mcsv'], 'w') as f: f.write(mcsv_header) + def append_chip_mcsv_row(row_data, args): logging.info('Appending chip master CSV row...') - with open(OUT_FILE['chip_mcsv'], 'a') as f: f.write(row_data + '\n') -# Convert the certificate in PEM format to DER format -def convert_x509_cert_from_pem_to_der(pem_file, out_der_file): - with open(pem_file, 'rb') as f: - pem_data = f.read() - - pem_cert = cryptography.x509.load_pem_x509_certificate(pem_data, cryptography.hazmat.backends.default_backend()) - der_cert = pem_cert.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.DER) - - with open(out_der_file, 'wb') as f: - f.write(der_cert) - -# Generate the Public and Private key pair binaries -def generate_keypair_bin(pem_file, out_privkey_bin, out_pubkey_bin): - with open(pem_file, 'rb') as f: - pem_data = f.read() - - key_pem = cryptography.hazmat.primitives.serialization.load_pem_private_key(pem_data, None) - private_number_val = key_pem.private_numbers().private_value - public_number_x = key_pem.public_key().public_numbers().x - public_number_y = key_pem.public_key().public_numbers().y - public_key_first_byte = 0x04 - - with open(out_privkey_bin, 'wb') as f: - f.write(private_number_val.to_bytes(32, byteorder='big')) - - with open(out_pubkey_bin, 'wb') as f: - f.write(public_key_first_byte.to_bytes(1, byteorder='big')) - f.write(public_number_x.to_bytes(32, byteorder='big')) - f.write(public_number_y.to_bytes(32, byteorder='big')) def generate_pai(args, ca_key, ca_cert, out_key, out_cert): cmd = [ TOOLS['chip-cert'], 'gen-att-cert', '--type', 'i', - '--subject-cn', '"{} PAI {}"'.format(args.subject_cn_prefix, '00'), + '--subject-cn', '"{} PAI {}"'.format(args.cn_prefix, '00'), '--out-key', out_key, '--out', out_cert, ] @@ -260,17 +195,18 @@ def generate_pai(args, ca_key, ca_cert, out_key, out_cert): logging.info('Generated PAI certificate: {}'.format(out_cert)) logging.info('Generated PAI private key: {}'.format(out_key)) + def generate_dac(iteration, args, discriminator, passcode, ca_key, ca_cert): - out_key_pem = os.sep.join([OUT_DIR['top'], UUIDs[iteration], 'internal', 'DAC_key.pem']) + out_key_pem = os.sep.join([OUT_DIR['top'], UUIDs[iteration], 'internal', 'DAC_key.pem']) out_cert_pem = out_key_pem.replace('key.pem', 'cert.pem') out_cert_der = out_key_pem.replace('key.pem', 'cert.der') out_private_key_bin = out_key_pem.replace('key.pem', 'private_key.bin') - out_public_key_bin = out_key_pem.replace('key.pem', 'public_key.bin') + out_public_key_bin = out_key_pem.replace('key.pem', 'public_key.bin') cmd = [ TOOLS['chip-cert'], 'gen-att-cert', '--type', 'd', - '--subject-cn', '"{} DAC {}"'.format(args.subject_cn_prefix, iteration), + '--subject-cn', '"{} DAC {}"'.format(args.cn_prefix, iteration), '--out-key', out_key_pem, '--out', out_cert_pem, ] @@ -281,10 +217,10 @@ def generate_dac(iteration, args, discriminator, passcode, ca_key, ca_cert): cmd.extend(['--valid-from', str(args.valid_from)]) cmd.extend(['--subject-vid', hex(args.vendor_id)[2:], - '--subject-pid', hex(args.product_id)[2:], - '--ca-key', ca_key, - '--ca-cert', ca_cert, - ]) + '--subject-pid', hex(args.product_id)[2:], + '--ca-key', ca_key, + '--ca-cert', ca_cert, + ]) execute_cmd(cmd) logging.info('Generated DAC certificate: {}'.format(out_cert_pem)) @@ -299,6 +235,7 @@ def generate_dac(iteration, args, discriminator, passcode, ca_key, ca_cert): return out_cert_der, out_private_key_bin, out_public_key_bin + def use_dac_from_args(args): logging.info('Using DAC from command line arguments...') logging.info('DAC Certificate: {}'.format(args.dac_cert)) @@ -307,7 +244,7 @@ def use_dac_from_args(args): # There should be only one UUID in the UUIDs list if DAC is specified out_cert_der = os.sep.join([OUT_DIR['top'], UUIDs[0], 'internal', 'DAC_cert.der']) out_private_key_bin = out_cert_der.replace('cert.der', 'private_key.bin') - out_public_key_bin = out_cert_der.replace('cert.der', 'public_key.bin') + out_public_key_bin = out_cert_der.replace('cert.der', 'public_key.bin') convert_x509_cert_from_pem_to_der(args.dac_cert, out_cert_der) logging.info('Generated DAC certificate in DER format: {}'.format(out_cert_der)) @@ -318,17 +255,18 @@ def use_dac_from_args(args): return out_cert_der, out_private_key_bin, out_public_key_bin + def setup_out_dirs(vid, pid, count): - OUT_DIR['top'] = os.sep.join(['out', vid_pid_str(vid, pid)]) + OUT_DIR['top'] = os.sep.join(['out', vid_pid_str(vid, pid)]) OUT_DIR['stage'] = os.sep.join(['out', vid_pid_str(vid, pid), 'staging']) os.makedirs(OUT_DIR['top'], exist_ok=True) os.makedirs(OUT_DIR['stage'], exist_ok=True) - OUT_FILE['config_csv'] = os.sep.join([OUT_DIR['stage'], 'config.csv']) - OUT_FILE['chip_mcsv'] = os.sep.join([OUT_DIR['stage'], 'chip_master.csv']) - OUT_FILE['mcsv'] = os.sep.join([OUT_DIR['stage'], 'master.csv']) - OUT_FILE['pin_csv'] = os.sep.join([OUT_DIR['stage'], 'pin.csv']) + OUT_FILE['config_csv'] = os.sep.join([OUT_DIR['stage'], 'config.csv']) + OUT_FILE['chip_mcsv'] = os.sep.join([OUT_DIR['stage'], 'chip_master.csv']) + OUT_FILE['mcsv'] = os.sep.join([OUT_DIR['stage'], 'master.csv']) + OUT_FILE['pin_csv'] = os.sep.join([OUT_DIR['stage'], 'pin.csv']) OUT_FILE['pin_disc_csv'] = os.sep.join([OUT_DIR['stage'], 'pin_disc.csv']) # Create directories to store the generated files @@ -337,6 +275,7 @@ def setup_out_dirs(vid, pid, count): UUIDs.append(uuid_str) os.makedirs(os.sep.join([OUT_DIR['top'], uuid_str, 'internal']), exist_ok=True) + def generate_passcodes_and_discriminators(args): # Generate passcodes using spake2p tool generate_passcodes(args) @@ -345,17 +284,19 @@ def generate_passcodes_and_discriminators(args): # Append discriminators to passcodes file append_discriminator(discriminators) -def setup_csv_files(args): + +def write_csv_files(args): generate_config_csv(args) write_chip_mcsv_header(args) + def setup_root_certs(args): # If PAA is passed as input, then generate PAI certificate if args.paa: # output file names PAI['cert_pem'] = os.sep.join([OUT_DIR['stage'], 'pai_cert.pem']) PAI['cert_der'] = os.sep.join([OUT_DIR['stage'], 'pai_cert.der']) - PAI['key_pem'] = os.sep.join([OUT_DIR['stage'], 'pai_key.pem']) + PAI['key_pem'] = os.sep.join([OUT_DIR['stage'], 'pai_key.pem']) generate_pai(args, args.key, args.cert, PAI['key_pem'], PAI['cert_pem']) convert_x509_cert_from_pem_to_der(PAI['cert_pem'], PAI['cert_der']) @@ -364,51 +305,53 @@ def setup_root_certs(args): # If PAI is passed as input, generate DACs elif args.pai: PAI['cert_pem'] = args.cert - PAI['key_pem'] = args.key + PAI['key_pem'] = args.key PAI['cert_der'] = os.sep.join([OUT_DIR['stage'], 'pai_cert.der']) convert_x509_cert_from_pem_to_der(PAI['cert_pem'], PAI['cert_der']) logging.info('Generated PAI certificate in DER format: {}'.format(PAI['cert_der'])) - # No need to verify else block as validate_args() already checks for this -# Generates DACs, add the required items to the master.csv file, and -# generates the onboarding data i.e. manual codes and QR codes and image -def generate_dacs_and_onb_data(args): +# This function generates the DACs, picks the commissionable data from the already present csv file, +# and generates the onboarding payloads, and writes everything to the master csv +def write_per_device_unique_data(args): with open(OUT_FILE['pin_disc_csv'], 'r') as csvf: pin_disc_dict = csv.DictReader(csvf) - # If any key is added to chip_nvs_keys.py make sure to maintain that order below for row in pin_disc_dict: - row_items = list() - - row_items.append(row['Discriminator']) - # TODO: remove PIN Code, spec says it should not be included in nvs storage - row_items.append(row['PIN Code']) - row_items.append(row['Iteration Count']) - row_items.append(row['Salt']) - row_items.append(row['Verifier']) + chip_factory_update('discriminator', row['Discriminator']) + chip_factory_update('iteration-count', row['Iteration Count']) + chip_factory_update('salt', row['Salt']) + chip_factory_update('verifier', row['Verifier']) if args.dac_key is not None and args.dac_cert is not None: dacs = use_dac_from_args(args) else: - dacs = generate_dac(int(row['Index']), args, int(row['Discriminator']), int(row['PIN Code']), PAI['key_pem'], PAI['cert_pem']) + dacs = generate_dac(int(row['Index']), args, int(row['Discriminator']), + int(row['PIN Code']), PAI['key_pem'], PAI['cert_pem']) - # This appends DAC cert, private key, and public key - row_items.extend([os.path.abspath(x) for x in dacs]) + chip_factory_update('dac-cert', os.path.abspath(dacs[0])) + chip_factory_update('dac-key', os.path.abspath(dacs[1])) + chip_factory_update('dac-pub-key', os.path.abspath(dacs[2])) - row_items.append(os.path.abspath(PAI['cert_der'])) - row_items.append(os.path.abspath(args.cert_dclrn)) + chip_factory_update('pai-cert', os.path.abspath(PAI['cert_der'])) + chip_factory_update('cert-dclrn', os.path.abspath(args.cert_dclrn)) - # Unique-id - row_items.append(binascii.b2a_hex(os.urandom(UNIQUE_ID_LEN)).decode('utf-8')) + # If unique id is not passed, then generate one + if (args.unique_id is None): + chip_config_update('unique-id', binascii.b2a_hex(os.urandom(int(MAX_UNIQUE_ID_LEN / 2))).decode('utf-8')) - mcsv_row_data = ','.join(row_items) + # If serial number is not passed, then generate one + if (args.serial_num is None): + chip_factory_append('serial-num', 'data', 'string', binascii.b2a_hex(os.urandom(SERIAL_NUMBER_LEN)).decode('utf-8')) + + mcsv_row_data = chip_get_values_as_csv() append_chip_mcsv_row(mcsv_row_data, args) # Generate onboarding data generate_onboarding_data(args, int(row['Index']), int(row['Discriminator']), int(row['PIN Code'])) + def merge_chip_mcsv_and_user_mcsv(args): logging.info('Merging chip master CSV and user master CSV...') @@ -426,13 +369,15 @@ def merge_chip_mcsv_and_user_mcsv(args): logging.error('Chip and user mcsv files have different number of rows') sys.exit(1) - chip_mcsv_data = [','.join((c_line.strip(), u_line.strip())) + '\n' for c_line, u_line in zip(chip_mcsv_data, user_mcsv_data)] + chip_mcsv_data = [','.join((c_line.strip(), u_line.strip())) + '\n' for c_line, + u_line in zip(chip_mcsv_data, user_mcsv_data)] with open(OUT_FILE['mcsv'], 'w') as f: f.write(''.join(chip_mcsv_data)) os.remove(OUT_FILE['chip_mcsv']) + def organize_output_files(suffix): for i in range(len(UUIDs)): dest_path = os.sep.join([OUT_DIR['top'], UUIDs[i]]) @@ -454,70 +399,27 @@ def organize_output_files(suffix): os.rmdir(os.sep.join([OUT_DIR['top'], 'bin'])) os.rmdir(os.sep.join([OUT_DIR['top'], 'csv'])) + def generate_partitions(suffix, size): cmd = [ - TOOLS['mfg_gen'], 'generate', + 'python3', TOOLS['mfg_gen'], 'generate', OUT_FILE['config_csv'], OUT_FILE['mcsv'], suffix, hex(size), '--outdir', OUT_DIR['top'] ] execute_cmd(cmd) -def get_manualcode_args(args, discriminator, passcode): - payload_args = list() - payload_args.append('--discriminator') - payload_args.append(str(discriminator)) - payload_args.append('--setup-pin-code') - payload_args.append(str(passcode)) - payload_args.append('--version') - payload_args.append('0') - payload_args.append('--vendor-id') - payload_args.append(str(args.vendor_id)) - payload_args.append('--product-id') - payload_args.append(str(args.product_id)) - payload_args.append('--commissioning-mode') - payload_args.append(str(args.commissioning_flow)) - return payload_args - -def get_qrcode_args(args, discriminator, passcode): - payload_args = get_manualcode_args(args, discriminator, passcode) - payload_args.append('--rendezvous') - payload_args.append(str(1 << args.discovery_mode)) - return payload_args - -def get_chip_qrcode(payload_args): - cmd_args = [TOOLS['chip-tool'], 'payload', 'generate-qrcode'] - cmd_args.extend(payload_args) - data = subprocess.check_output(cmd_args) - - # Command output is as below: - # \x1b[0;32m[1655386003372] [23483:7823617] CHIP: [TOO] QR Code: MT:Y.K90-WB010E7648G00\x1b[0m - return data.decode('utf-8').split('QR Code: ')[1][:QRCODE_LEN] - -def get_chip_manualcode(payload_args, commissioning_flow): - cmd_args = [TOOLS['chip-tool'], 'payload', 'generate-manualcode'] - cmd_args.extend(payload_args) - data = subprocess.check_output(cmd_args) - - # Command output is as below: - # \x1b[0;32m[1655386909774] [24424:7837894] CHIP: [TOO] Manual Code: 749721123365521327689\x1b[0m\n - # OR - # \x1b[0;32m[1655386926028] [24458:7838229] CHIP: [TOO] Manual Code: 34972112338\x1b[0m\n - # Length of manual code depends on the commissioning flow: - # For standard commissioning flow it is 11 digits - # For User-intent and custom commissioning flow it is 21 digits - manual_code_len = LONG_MANUALCODE_LEN if commissioning_flow else SHORT_MANUALCODE_LEN - return data.decode('utf-8').split('Manual Code: ')[1][:manual_code_len] def generate_onboarding_data(args, index, discriminator, passcode): - chip_manualcode = get_chip_manualcode(get_manualcode_args(args, discriminator, passcode), args.commissioning_flow) - chip_qrcode = get_chip_qrcode(get_qrcode_args(args, discriminator, passcode)) + chip_manualcode = get_chip_manualcode(TOOLS['chip-tool'], args.vendor_id, args.product_id, + args.commissioning_flow, discriminator, passcode) + chip_qrcode = get_chip_qrcode(TOOLS['chip-tool'], args.vendor_id, args.product_id, + args.commissioning_flow, discriminator, passcode, args.discovery_mode) logging.info('Generated QR code: ' + chip_qrcode) logging.info('Generated manual code: ' + chip_manualcode) - # TODO: Append commissioning flow, discovery mode, passcode, discriminator as well? - csv_data = 'qrcode,manualcode\n' - csv_data += chip_qrcode + ',' + chip_manualcode + '\n' + csv_data = 'qrcode,manualcode,discriminator,passcode\n' + csv_data += chip_qrcode + ',' + chip_manualcode + ',' + str(discriminator) + ',' + str(passcode) + '\n' onboarding_data_file = os.sep.join([OUT_DIR['top'], UUIDs[index], '{}-onb_codes.csv'.format(UUIDs[index])]) with open(onboarding_data_file, 'w') as f: @@ -530,141 +432,158 @@ def generate_onboarding_data(args, index, discriminator, passcode): logging.info('Generated onboarding data and QR Code') -def validate_args(args): - # csv and mcsv both should present or none - if (args.csv is not None) != (args.mcsv is not None): - logging.error("csv and mcsv should be both present or none") - sys.exit(1) - else: - # Read the number of lines in mcsv file - if args.mcsv is not None: - with open(args.mcsv, 'r') as f: - lines = sum(1 for line in f) - - # Subtract 1 for the header line - args.count = lines - 1 - - # Validate the passcode - if args.passcode is not None: - if ((args.passcode < 0x0000001 and args.passcode > 0x5F5E0FE) or (args.passcode in INVALID_PASSCODES)): - logging.error('Invalid passcode' + str(args.passcode)) - sys.exit(1) - - # Validate the discriminator - if (args.discriminator is not None) and (args.discriminator not in range(0x0000, 0x0FFF)): - logging.error('Invalid discriminator ' + str(args.discriminator)) - sys.exit(1) - - if args.unique_id is not None: - if len(args.unique_id) < (UNIQUE_ID_LEN * 2): - logging.error('Unique ID is too short') - sys.exit(1) - - if (len(args.unique_id) % 2) != 0: - logging.error('Unique ID is not even length') - sys.exit(1) - - # Check for a valid hex string - try: - int(args.unique_id, 16) - except ValueError: - logging.error('Unique ID is not a valid hex string') - sys.exit(1) - - # DAC key and DAC cert both should be present or none - if (args.dac_key is not None) != (args.dac_cert is not None): - logging.error("dac_key and dac_cert should be both present or none") - sys.exit(1) - else: - # Make sure PAI certificate is present if DAC is present - if (args.dac_key is not None) and (args.pai is False): - logging.error('Please provide PAI certificate along with DAC certificate and DAC key') - sys.exit(1) - - # If unique_id/discriminator/passcode/DAC is present then we are restricting - # the number of partitions to 1 - if (args.discriminator is not None - or args.passcode is not None - or args.unique_id is not None - or args.dac_key is not None): - if args.count > 1: - logging.error('Number of partitions should be 1 when unique_id or discriminator or passcode or DAC is present') - sys.exit(1) - - # Validate the input certificate type, if DAC is not present - if args.dac_key is None and args.dac_cert is None: - if args.paa: - logging.info('Input Root certificate type PAA') - elif args.pai: - logging.info('Input Root certificate type PAI') - else: - logging.error('Either PAA or PAI certificate is required') - sys.exit(1) - - # Check if Key and certificate are present - if args.key is None or args.cert is None: - logging.error('PAA key and certificate are required') - sys.exit(1) - - logging.info('Number of manufacturing NVS images to generate: {}'.format(args.count)) def get_args(): def any_base_int(s): return int(s, 0) - parser = argparse.ArgumentParser(description='Manufacuring partition generator tool') - parser.add_argument('-n', '--count', type=any_base_int, default=1, help='The number of manufacturing partition binaries to generate. Default is 1. If --csv and --mcsv are present, the number of lines in the mcsv file is used.') - parser.add_argument('-s', '--size', type=any_base_int, default=0x6000, help='The size of manufacturing partition binaries to generate. Default is 0x6000.') - parser.add_argument('-cn', '--subject-cn-prefix', type=str, default='ESP32', help='The common name prefix of the subject of the generated certificate.') + parser = argparse.ArgumentParser(description='Manufacuring partition generator tool', + formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=50)) - parser.add_argument('-v', '--vendor-id', type=any_base_int, required=True, help='The vendor ID.') - parser.add_argument('-p', '--product-id', type=any_base_int, required=True, help='The product ID.') + g_gen = parser.add_argument_group('General options') + g_gen.add_argument('-n', '--count', type=any_base_int, default=1, + help='The number of manufacturing partition binaries to generate. Default is 1. \ + If --csv and --mcsv are present, the number of lines in the mcsv file is used.') + g_gen.add_argument('-s', '--size', type=any_base_int, default=0x6000, + help='The size of manufacturing partition binaries to generate. Default is 0x6000.') - input_cert_group = parser.add_mutually_exclusive_group(required=True) + g_commissioning = parser.add_argument_group('Commisioning options') + g_commissioning.add_argument('--passcode', type=any_base_int, + help='The passcode for pairing. Randomly generated if not specified.') + g_commissioning.add_argument('--discriminator', type=any_base_int, + help='The discriminator for pairing. Randomly generated if not specified.') + g_commissioning.add_argument('-cf', '--commissioning-flow', type=any_base_int, default=0, + help='Device commissioning flow, 0:Standard, 1:User-Intent, 2:Custom. \ + Default is 0.', choices=[0, 1, 2]) + g_commissioning.add_argument('-dm', '--discovery-mode', type=any_base_int, default=1, + help='Commissionable device discovery netowrking technology. \ + 0:WiFi-SoftAP, 1:BLE, 2:On-network. Default is BLE.', choices=[0, 1, 2]) + + g_dac = parser.add_argument_group('Device attestation credential options') + g_dac.add_argument('-cn', '--cn-prefix', type=str, default='ESP32', + help='The common name prefix of the subject of the generated certificate.') + g_dac.add_argument('-lt', '--lifetime', default=4294967295, type=any_base_int, + help='Lifetime of the generated certificate. Default is 4294967295 if not specified, \ + this indicate that certificate does not have well defined expiration date.') + g_dac.add_argument('-vf', '--valid-from', type=str, + help='The start date for the certificate validity period in format --
[ :: ]. \ + Default is current date.') + # If DAC is present then PAI key is not required, so it is marked as not required here + # but, if DAC is not present then PAI key is required and that case is validated in validate_args() + g_dac.add_argument('-c', '--cert', type=str, required=True, help='The input certificate file in PEM format.') + g_dac.add_argument('-k', '--key', type=str, required=False, help='The input key file in PEM format.') + g_dac.add_argument('-cd', '--cert-dclrn', type=str, required=True, help='The certificate declaration file in DER format.') + g_dac.add_argument('--dac-cert', type=str, help='The input DAC certificate file in PEM format.') + g_dac.add_argument('--dac-key', type=str, help='The input DAC private key file in PEM format.') + input_cert_group = g_dac.add_mutually_exclusive_group(required=True) input_cert_group.add_argument('--paa', action='store_true', help='Use input certificate as PAA certificate.') input_cert_group.add_argument('--pai', action='store_true', help='Use input certificate as PAI certificate.') - # If DAC is present then PAI key is not required, so it is marked as not required here - # but, if DAC is not present then PAI key is required and that case is validated in validate_args() - parser.add_argument('-c', '--cert', type=str, required=True, help='The input certificate file in PEM format.') - parser.add_argument('-k', '--key', type=str, required=False, help='The input key file in PEM format.') + g_dev_inst_info = parser.add_argument_group('Device instance information options') + g_dev_inst_info.add_argument('-v', '--vendor-id', type=any_base_int, required=False, help='Vendor id') + g_dev_inst_info.add_argument('--vendor-name', type=str, required=False, help='Vendor name') + g_dev_inst_info.add_argument('-p', '--product-id', type=any_base_int, required=False, help='Product id') + g_dev_inst_info.add_argument('--product-name', type=str, required=False, help='Product name') + g_dev_inst_info.add_argument('--hw-ver', type=any_base_int, required=False, help='Hardware version') + g_dev_inst_info.add_argument('--hw-ver-str', type=str, required=False, help='Hardware version string') + g_dev_inst_info.add_argument('--mfg-date', type=str, required=False, help='Manufacturing date in format YYYY-MM-DD') + g_dev_inst_info.add_argument('--serial-num', type=str, required=False, help='Serial number') + g_dev_inst_info.add_argument('--rd-id-uid', type=str, required=False, + help='128-bit unique identifier for generating rotating device identifier, provide 32-byte hex string, e.g. "1234567890abcdef1234567890abcdef"') - parser.add_argument('-cd', '--cert-dclrn', type=str, required=True, help='The certificate declaration file in DER format.') + g_dev_inst = parser.add_argument_group('Device instance options') + g_dev_inst.add_argument('--calendar-types', type=str, nargs='+', required=False, + help='List of supported calendar types. Supported Calendar Types: Buddhist, Chinese, Coptic, \ + Ethiopian, Gregorian, Hebrew, Indian, Islamic, Japanese, Korean, Persian, Taiwanese') + g_dev_inst.add_argument('--locales', type=str, nargs='+', required=False, + help='List of supported locales, Language Tag as defined by BCP47, eg. en-US en-GB') + g_dev_inst.add_argument('--fixed-labels', type=str, nargs='+', required=False, + help='List of fixed labels, eg: "0/orientation/up" "1/orientation/down" "2/orientation/down"') - parser.add_argument('--dac-cert', type=str, help='The input DAC certificate file in PEM format.') - parser.add_argument('--dac-key', type=str, help='The input DAC private key file in PEM format.'); + g_basic = parser.add_argument_group('Few more Basic clusters options') + g_basic.add_argument('--product-label', type=str, required=False, help='Product label') + g_basic.add_argument('--product-url', type=str, required=False, help='Product URL') + g_basic.add_argument('--unique-id', type=str, required=False, help='Unique identifier in Basic cluster') - parser.add_argument('-lt', '--lifetime', default=4294967295, type=any_base_int, help='Lifetime of the generated certificate. Default is 4294967295 if not specified, this indicate that certificate does not have well defined expiration date.') - parser.add_argument('-vf', '--valid-from', type=str, help='The start date for the certificate validity period in format --
[ :: ]. Default is current date.') - - parser.add_argument('--passcode', type=any_base_int, help='The passcode for pairing. Randomly generated if not specified.') - parser.add_argument('--discriminator', type=any_base_int, help='The discriminator for pairing. Randomly generated if not specified.') - - parser.add_argument('-cf', '--commissioning-flow', type=any_base_int, default=0, help='Device commissioning flow, 0:Standard, 1:User-Intent, 2:Custom. Default is 0.', choices=[0, 1, 2]) - parser.add_argument('-dm', '--discovery-mode', type=any_base_int, default=1, help='Commissionable device discovery netowrking technology. 0:WiFi-SoftAP, 1:BLE, 2:On-network. Default is BLE.', choices=[0, 1, 2]) - - parser.add_argument('--csv', type=str, help='CSV file containing the partition schema for extra options. [REF: https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/storage/mass_mfg.html#csv-configuration-file]') - parser.add_argument('--mcsv', type=str, help='Master CSV file containig optional/extra values specified by the user. [REF: https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/storage/mass_mfg.html#master-value-csv-file]') - - parser.add_argument('-u', '--unique-id', type=str, help='Unique Identifier for the device. This should be at least 128-bit long. If not specified, a random unique id is generated.') + g_extra_info = parser.add_argument_group('Extra information options using csv files') + g_extra_info.add_argument('--csv', type=str, help='CSV file containing the partition schema for extra options. \ + [REF: https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/storage/mass_mfg.html#csv-configuration-file]') + g_extra_info.add_argument('--mcsv', type=str, help='Master CSV file containig optional/extra values specified by the user. \ + [REF: https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/storage/mass_mfg.html#master-value-csv-file]') return parser.parse_args() + +def add_optional_KVs(args): + # Device instance information + if args.vendor_id is not None: + chip_factory_append('vendor-id', 'data', 'u32', args.vendor_id) + if args.vendor_name is not None: + chip_factory_append('vendor-name', 'data', 'string', args.vendor_name) + if args.product_id is not None: + chip_factory_append('product-id', 'data', 'u32', args.product_id) + if args.product_name is not None: + chip_factory_append('product-name', 'data', 'string', args.product_name) + if args.hw_ver is not None: + chip_factory_append('hardware-ver', 'data', 'u32', args.hw_ver) + if args.hw_ver_str is not None: + chip_factory_append('hw-ver-str', 'data', 'string', args.hw_ver_str) + if args.mfg_date is not None: + chip_factory_append('mfg-date', 'data', 'string', args.mfg_date) + if args.rd_id_uid is not None: + chip_factory_append('rd-id-uid', 'data', 'hex2bin', args.rd_id_uid) + + # Add the serial-num + chip_factory_append('serial-num', 'data', 'string', args.serial_num) + + # Device information + if args.calendar_types is not None: + chip_factory_append('cal-types', 'data', 'u32', calendar_types_to_uint32(args.calendar_types)) + + # Supported locale is stored as multiple entries, key format: "locale/, example key: "locale/0" + if (args.locales is not None): + chip_factory_append('locale-sz', 'data', 'u32', len(args.locales)) + for i in range(len(args.locales)): + chip_factory_append('locale/{:x}'.format(i), 'data', 'string', args.locales[i]) + + # Each endpoint can contains the fixed lables + # - fl-sz/ : number of fixed labels for the endpoint + # - fl-k// : fixed label key for the endpoint and index + # - fl-v// : fixed label value for the endpoint and index + if (args.fixed_labels is not None): + dict = get_fixed_label_dict(args.fixed_labels) + for key in dict.keys(): + chip_factory_append('fl-sz/{:x}'.format(int(key)), 'data', 'u32', len(dict[key])) + + for i in range(len(dict[key])): + entry = dict[key][i] + chip_factory_append('fl-k/{:x}/{:x}'.format(int(key), i), 'data', 'string', list(entry.keys())[0]) + chip_factory_append('fl-v/{:x}/{:x}'.format(int(key), i), 'data', 'string', list(entry.values())[0]) + + # Keys from basic clusters + if args.product_label is not None: + chip_factory_append('product-label', 'data', 'string', args.product_label) + if args.product_url is not None: + chip_factory_append('product-url', 'data', 'string', args.product_url) + if args.unique_id is not None: + chip_config_update('unique-id', args.unique_id) + + def main(): logging.basicConfig(format='[%(asctime)s] [%(levelname)7s] - %(message)s', level=logging.INFO) args = get_args() validate_args(args) - check_tools_exists() - setup_out_dirs(args.vendor_id, args.product_id, args.count) + add_optional_KVs(args) generate_passcodes_and_discriminators(args) - setup_csv_files(args) + write_csv_files(args) setup_root_certs(args) - generate_dacs_and_onb_data(args) + write_per_device_unique_data(args) merge_chip_mcsv_and_user_mcsv(args) generate_partitions('matter_partition', args.size) organize_output_files('matter_partition') + if __name__ == "__main__": main() diff --git a/tools/mfg_tool/requirements.txt b/tools/mfg_tool/requirements.txt index ef6ce14e3..52a0dbdfc 100644 --- a/tools/mfg_tool/requirements.txt +++ b/tools/mfg_tool/requirements.txt @@ -1,3 +1,4 @@ +bitarray==2.6.0 cryptography==36.0.2 cffi==1.15.0 future==0.18.2 diff --git a/tools/mfg_tool/utils.py b/tools/mfg_tool/utils.py new file mode 100644 index 000000000..e1d1abd1b --- /dev/null +++ b/tools/mfg_tool/utils.py @@ -0,0 +1,333 @@ +#!/usr/bin/env python3 + +# Copyright 2022 Espressif Systems (Shanghai) PTE LTD +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Contains utilitiy functions for validating argument, certs/keys conversion, etc. +""" + +import sys +import enum +import logging +import subprocess +from bitarray import bitarray +from bitarray.util import ba2int +import cryptography.hazmat.backends +import cryptography.x509 + + +MAX_UNIQUE_ID_LEN = 32 +SERIAL_NUMBER_LEN = 16 + + +# Lengths for manual pairing codes and qrcode +SHORT_MANUALCODE_LEN = 11 +LONG_MANUALCODE_LEN = 21 +QRCODE_LEN = 22 + + +INVALID_PASSCODES = [00000000, 11111111, 22222222, 33333333, 44444444, 55555555, + 66666666, 77777777, 88888888, 99999999, 12345678, 87654321] + + +class CalendarTypes(enum.Enum): + Buddhist = 0 + Chinese = 1 + Coptic = 2 + Ethiopian = 3 + Gregorian = 4 + Hebrew = 5 + Indian = 6 + Islamic = 7 + Japanese = 8 + Korean = 9 + Persian = 10 + Taiwanese = 11 + + +def vid_pid_str(vid, pid): + return '_'.join([hex(vid)[2:], hex(pid)[2:]]) + + +def disc_pin_str(discriminator, passcode): + return '_'.join([hex(discriminator)[2:], hex(passcode)[2:]]) + + +# Checks if the input string is a valid hex string +def ishex(s): + try: + n = int(s, 16) + return True + except ValueError: + return False + + +# Validate the input string length against the min and max length +def check_str_range(s, min_len, max_len, name): + if s and ((len(s) < min_len) or (len(s) > max_len)): + logging.error('%s must be between %d and %d characters', name, min_len, max_len) + sys.exit(1) + + +# Validate the input integer range +def check_int_range(value, min_value, max_value, name): + if value and ((value < min_value) or (value > max_value)): + logging.error('%s is out of range, should be in range [%d, %d]', name, min_value, max_value) + sys.exit(1) + + +# Validates discriminator and passcode +def validate_commissionable_data(args): + check_int_range(args.discriminator, 0x0000, 0x0FFF, 'Discriminator') + if args.passcode is not None: + if ((args.passcode < 0x0000001 and args.passcode > 0x5F5E0FE) or (args.passcode in INVALID_PASSCODES)): + logging.error('Invalid passcode' + str(args.passcode)) + sys.exit(1) + + +# Validate the device instance information +def validate_device_instance_info(args): + check_int_range(args.product_id, 0x0000, 0xFFFF, 'Product id') + check_int_range(args.vendor_id, 0x0000, 0xFFFF, 'Vendor id') + check_int_range(args.hw_ver, 0x0000, 0xFFFF, 'Hardware version') + check_str_range(args.serial_num, 1, SERIAL_NUMBER_LEN, 'Serial number') + check_str_range(args.vendor_name, 1, 32, 'Vendor name') + check_str_range(args.product_name, 1, 32, 'Product name') + check_str_range(args.hw_ver_str, 1, 64, 'Hardware version string') + check_str_range(args.mfg_date, 8, 16, 'Manufacturing date') + check_str_range(args.rd_id_uid, 32, 32, 'Rotating device Unique id') + + +# Validate the device information: calendar types and fixed labels +def validate_device_info(args): + # Validate the input calendar types + if args.calendar_types is not None: + if not (set(args.calendar_types) <= set(CalendarTypes.__members__)): + invalid_types = set(args.calendar_types).union(set(CalendarTypes.__members__)) - set(CalendarTypes.__members__) + logging.error('Unknown calendar type/s: %s', invalid_types) + logging.error('Supported calendar types: %s', ', '.join(CalendarTypes.__members__)) + sys.exit(1) + + if args.fixed_labels is not None: + for fl in args.fixed_labels: + _l = fl.split('/') + if len(_l) != 3: + logging.error('Invalid fixed label: %s', fl) + sys.exit(1) + + if not (ishex(_l[0]) and (len(_l[1]) > 0 and len(_l[1]) < 16) and (len(_l[2]) > 0 and len(_l[2]) < 16)): + logging.error('Invalid fixed label: %s', fl) + sys.exit(1) + + +# Validates the attestation related arguments +def validate_attestation_info(args): + # DAC key and DAC cert both should be present or none + if (args.dac_key is not None) != (args.dac_cert is not None): + logging.error("dac_key and dac_cert should be both present or none") + sys.exit(1) + else: + # Make sure PAI certificate is present if DAC is present + if (args.dac_key is not None) and (args.pai is False): + logging.error('Please provide PAI certificate along with DAC certificate and DAC key') + sys.exit(1) + + # Validate the input certificate type, if DAC is not present + if args.dac_key is None and args.dac_cert is None: + if args.paa: + logging.info('Input Root certificate type PAA') + elif args.pai: + logging.info('Input Root certificate type PAI') + else: + logging.error('Either PAA or PAI certificate is required') + sys.exit(1) + + # Check if Key and certificate are present + if args.key is None or args.cert is None: + logging.error('PAA key and certificate are required') + sys.exit(1) + + +# Validates few basic cluster related arguments: product-label and product-url +def validate_basic_cluster_info(args): + check_str_range(args.product_label, 1, 64, 'Product Label') + check_str_range(args.product_url, 1, 256, 'Product URL') + check_str_range(args.unique_id, 1, MAX_UNIQUE_ID_LEN, 'Unique id') + + +# Validates the input arguments, this calls the above functions +def validate_args(args): + # csv and mcsv both should present or none + if (args.csv is not None) != (args.mcsv is not None): + logging.error("csv and mcsv should be both present or none") + sys.exit(1) + else: + # Read the number of lines in mcsv file + if args.mcsv is not None: + with open(args.mcsv, 'r') as f: + lines = sum(1 for line in f) + + # Subtract 1 for the header line + args.count = lines - 1 + + validate_commissionable_data(args) + validate_device_instance_info(args) + validate_device_info(args) + validate_attestation_info(args) + validate_basic_cluster_info(args) + + # If unique_id/discriminator/passcode/DAC/serial_number is present + # then we are restricting the number of partitions to 1 + if (args.discriminator is not None + or args.passcode is not None + or args.unique_id is not None + or args.dac_key is not None + or args.serial_num is not None): + if args.count > 1: + logging.error('Number of partitions should be 1 when unique_id or discriminator or passcode or DAC or serial number is present') + sys.exit(1) + + logging.info('Number of manufacturing NVS images to generate: {}'.format(args.count)) + + +# Supported Calendar types is stored as a bit array in one uint32_t. +def calendar_types_to_uint32(calendar_types): + # In validate_device_info() we have already verified that the calendar types are valid + result = bitarray(32, endian='little') + result.setall(0) + for calendar_type in calendar_types: + result[CalendarTypes[calendar_type].value] = 1 + return ba2int(result) + + +# get_fixed_label_dict() converts the list of strings to per endpoint dictionaries. +# example input : ['0/orientation/up', '1/orientation/down', '2/orientation/down'] +# example outout : {'0': [{'orientation': 'up'}], '1': [{'orientation': 'down'}], '2': [{'orientation': 'down'}]} +def get_fixed_label_dict(fixed_labels): + fl_dict = {} + for fl in fixed_labels: + _l = fl.split('/') + + if len(_l) != 3: + logging.error('Invalid fixed label: %s', fl) + sys.exit(1) + + if not (ishex(_l[0]) and (len(_l[1]) > 0 and len(_l[1]) < 16) and (len(_l[2]) > 0 and len(_l[2]) < 16)): + logging.error('Invalid fixed label: %s', fl) + sys.exit(1) + + if _l[0] not in fl_dict.keys(): + fl_dict[_l[0]] = list() + + fl_dict[_l[0]].append({_l[1]: _l[2]}) + + return fl_dict + + +# Convert the certificate in PEM format to DER format +def convert_x509_cert_from_pem_to_der(pem_file, out_der_file): + with open(pem_file, 'rb') as f: + pem_data = f.read() + + pem_cert = cryptography.x509.load_pem_x509_certificate(pem_data, cryptography.hazmat.backends.default_backend()) + der_cert = pem_cert.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.DER) + + with open(out_der_file, 'wb') as f: + f.write(der_cert) + + +# Generate the Public and Private key pair binaries +def generate_keypair_bin(pem_file, out_privkey_bin, out_pubkey_bin): + with open(pem_file, 'rb') as f: + pem_data = f.read() + + key_pem = cryptography.hazmat.primitives.serialization.load_pem_private_key(pem_data, None) + private_number_val = key_pem.private_numbers().private_value + public_number_x = key_pem.public_key().public_numbers().x + public_number_y = key_pem.public_key().public_numbers().y + public_key_first_byte = 0x04 + + with open(out_privkey_bin, 'wb') as f: + f.write(private_number_val.to_bytes(32, byteorder='big')) + + with open(out_pubkey_bin, 'wb') as f: + f.write(public_key_first_byte.to_bytes(1, byteorder='big')) + f.write(public_number_x.to_bytes(32, byteorder='big')) + f.write(public_number_y.to_bytes(32, byteorder='big')) + + +def execute_cmd(cmd): + logging.debug('Executing Command: {}'.format(cmd)) + status = subprocess.run(cmd, capture_output=True) + + try: + status.check_returncode() + except subprocess.CalledProcessError as e: + if status.stderr: + logging.error('[stderr]: {}'.format(status.stderr.decode('utf-8').strip())) + logging.error('Command failed with error: {}'.format(e)) + sys.exit(1) + + +def get_manualcode_args(vid, pid, flow, discriminator, passcode): + payload_args = list() + payload_args.append('--discriminator') + payload_args.append(str(discriminator)) + payload_args.append('--setup-pin-code') + payload_args.append(str(passcode)) + payload_args.append('--version') + payload_args.append('0') + payload_args.append('--vendor-id') + payload_args.append(str(vid)) + payload_args.append('--product-id') + payload_args.append(str(pid)) + payload_args.append('--commissioning-mode') + payload_args.append(str(flow)) + return payload_args + + +def get_qrcode_args(vid, pid, flow, discriminator, passcode, disc_mode): + payload_args = get_manualcode_args(vid, pid, flow, discriminator, passcode) + payload_args.append('--rendezvous') + payload_args.append(str(1 << disc_mode)) + return payload_args + + +def get_chip_qrcode(chip_tool, vid, pid, flow, discriminator, passcode, disc_mode): + payload_args = get_qrcode_args(vid, pid, flow, discriminator, passcode, disc_mode) + cmd_args = [chip_tool, 'payload', 'generate-qrcode'] + cmd_args.extend(payload_args) + data = subprocess.check_output(cmd_args) + + # Command output is as below: + # \x1b[0;32m[1655386003372] [23483:7823617] CHIP: [TOO] QR Code: MT:Y.K90-WB010E7648G00\x1b[0m + return data.decode('utf-8').split('QR Code: ')[1][:QRCODE_LEN] + + +def get_chip_manualcode(chip_tool, vid, pid, flow, discriminator, passcode): + payload_args = get_manualcode_args(vid, pid, flow, discriminator, passcode) + cmd_args = [chip_tool, 'payload', 'generate-manualcode'] + cmd_args.extend(payload_args) + data = subprocess.check_output(cmd_args) + + # Command output is as below: + # \x1b[0;32m[1655386909774] [24424:7837894] CHIP: [TOO] Manual Code: 749721123365521327689\x1b[0m\n + # OR + # \x1b[0;32m[1655386926028] [24458:7838229] CHIP: [TOO] Manual Code: 34972112338\x1b[0m\n + # Length of manual code depends on the commissioning flow: + # For standard commissioning flow it is 11 digits + # For User-intent and custom commissioning flow it is 21 digits + manual_code_len = LONG_MANUALCODE_LEN if flow else SHORT_MANUALCODE_LEN + return data.decode('utf-8').split('Manual Code: ')[1][:manual_code_len]