mfg_tool: Support device info, device instance info and few more

attributes from basic cluster
This commit is contained in:
Shubham Patil
2022-08-19 17:58:37 +05:30
parent e0c4b5db75
commit 6904fc485b
5 changed files with 661 additions and 350 deletions
+127
View File
@@ -0,0 +1,127 @@
#!/usr/bin/env python3
# Copyright 2022 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This file contains the CHIP specific key along with the data type and encoding format
"""
# This map contains mandatory CHIP specific key along with the data type and encoding format
# Additionally to add more keys to chip-factory, use chip_factory_append() API
CHIP_NVS_MAP = {
'chip-factory': {
# Commissionable Data
'discriminator': {
'type': 'data',
'encoding': 'u32',
'value': None,
},
'iteration-count': {
'type': 'data',
'encoding': 'u32',
'value': None,
},
'salt': {
'type': 'data',
'encoding': 'string',
'value': None,
},
'verifier': {
'type': 'data',
'encoding': 'string',
'value': None,
},
# Device Attestation Credentials
'dac-cert': {
'type': 'file',
'encoding': 'binary',
'value': None,
},
'dac-key': {
'type': 'file',
'encoding': 'binary',
'value': None,
},
'dac-pub-key': {
'type': 'file',
'encoding': 'binary',
'value': None,
},
'pai-cert': {
'type': 'file',
'encoding': 'binary',
'value': None,
},
'cert-dclrn': {
'type': 'file',
'encoding': 'binary',
'value': None,
},
},
'chip-config': {
'unique-id': {
'type': 'data',
'encoding': 'string',
'value': None,
}
}
}
def get_dict(key, type, encoding, value):
return {
key: {
'type': type,
'encoding': encoding,
'value': value,
}
}
def chip_nvs_get_config_csv():
csv_data = ''
for k, v in CHIP_NVS_MAP.items():
csv_data += k + ',' + 'namespace,' + '\n'
for k1, v1 in v.items():
csv_data += k1 + ',' + v1['type'] + ',' + v1['encoding'] + '\n'
return csv_data
def chip_factory_append(key, type, encoding, value):
CHIP_NVS_MAP['chip-factory'].update(get_dict(key, type, encoding, value))
def chip_factory_update(key, value):
CHIP_NVS_MAP['chip-factory'][key]['value'] = value
def chip_config_update(key, value):
CHIP_NVS_MAP['chip-config'][key]['value'] = value
def chip_get_keys_as_csv():
keys = list()
for ns in CHIP_NVS_MAP:
keys.extend(list(CHIP_NVS_MAP[ns]))
return ','.join(keys)
def chip_get_values_as_csv():
values = list()
for ns in CHIP_NVS_MAP:
for k in CHIP_NVS_MAP[ns]:
values.append(str(CHIP_NVS_MAP[ns][k]['value']))
return ','.join(values)
-69
View File
@@ -1,69 +0,0 @@
#!/usr/bin/env python3
# Copyright 2022 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This file contains the CHIP specific key along with the data type and encoding format
"""
CHIP_KEY_MAP = {
'chip-factory': {
'discriminator': {
'type': 'data',
'encoding': 'u32',
},
'pin-code': {
'type': 'data',
'encoding': 'u32',
},
'iteration-count': {
'type': 'data',
'encoding': 'u32',
},
'salt': {
'type': 'data',
'encoding': 'string',
},
'verifier': {
'type': 'data',
'encoding': 'string',
},
'dac-cert': {
'type': 'file',
'encoding': 'binary',
},
'dac-key': {
'type': 'file',
'encoding': 'binary',
},
'dac-pub-key': {
'type': 'file',
'encoding': 'binary',
},
'pai-cert': {
'type': 'file',
'encoding': 'binary',
},
'cert-dclrn': {
'type': 'file',
'encoding': 'binary',
},
'unique-id': {
'type': 'data',
'encoding': 'hex2bin',
},
},
}
+200 -281
View File
@@ -28,11 +28,9 @@ import logging
import binascii
import argparse
import pyqrcode
import subprocess
import cryptography.hazmat.backends
import cryptography.x509
from chip_nvs_keys import CHIP_KEY_MAP
from chip_nvs import *
from utils import *
if not os.getenv('IDF_PATH'):
logging.error("IDF_PATH environment variable is not set")
@@ -41,49 +39,34 @@ if not os.getenv('IDF_PATH'):
# TODO: Handle count > 100 case. spake2p gen-verifier doesn't support count > 100
TOOLS = {
'spake2p' : None,
'chip-cert' : None,
'spake2p': None,
'chip-cert': None,
'chip-tool': None,
'mfg_gen' : None,
'mfg_gen': None,
}
PAI = {
'cert_pem': None,
'cert_der': None,
'key_pem' : None,
'key_der' : None,
'key_pem': None,
'key_der': None,
}
OUT_DIR = {
'top' : None,
'top': None,
'chip': None,
}
OUT_FILE = {
'config_csv' : None,
'chip_mcsv' : None,
'mcsv' : None,
'config_csv': None,
'chip_mcsv': None,
'mcsv': None,
'pin_csv': None,
'pin_disc_csv': None,
}
UNIQUE_ID_LEN = 16 # 16 bytes (128-bits)
INVALID_PASSCODES = [ 00000000, 11111111, 22222222, 33333333, 44444444, 55555555,
66666666, 77777777, 88888888, 99999999, 12345678, 87654321 ]
UUIDs = list()
# Lengths for manual pairing codes and qrcode
SHORT_MANUALCODE_LEN = 11
LONG_MANUALCODE_LEN = 21
QRCODE_LEN = 22
def vid_pid_str(vid, pid):
return '_'.join([hex(vid)[2:], hex(pid)[2:]])
def disc_pin_str(discriminator, passcode):
return '_'.join([hex(discriminator)[2:], hex(passcode)[2:]])
def check_tools_exists():
TOOLS['spake2p'] = shutil.which('spake2p')
@@ -112,17 +95,6 @@ def check_tools_exists():
logging.debug('chip-tool: {}'.format(TOOLS['chip-tool']))
logging.debug('mfg_gen: {}'.format(TOOLS['mfg_gen']))
def execute_cmd(cmd):
logging.debug('Executing Command: {}'.format(cmd))
status = subprocess.run(cmd, capture_output=True)
try:
status.check_returncode()
except subprocess.CalledProcessError as e:
if status.stderr:
logging.error('[stderr]: {}'.format(status.stderr.decode('utf-8').strip()))
logging.error('Command failed with error: {}'.format(e))
sys.exit(1)
def generate_passcodes(args):
iter_count_max = 10000
@@ -142,6 +114,7 @@ def generate_passcodes(args):
execute_cmd(cmd)
def generate_discriminators(args):
discriminators = list()
@@ -154,6 +127,7 @@ def generate_discriminators(args):
return discriminators
# Append discriminator to each line of the passcode file
def append_discriminator(discriminator):
with open(OUT_FILE['pin_csv'], 'r') as fd:
@@ -168,15 +142,11 @@ def append_discriminator(discriminator):
os.remove(OUT_FILE['pin_csv'])
# Generates the csv file containing chip specific keys and keys provided by user in csv file
def generate_config_csv(args):
logging.info("Generating Config CSV...")
csv_data = ''
for k, v in CHIP_KEY_MAP.items():
csv_data += k + ',' + 'namespace,' + '\n'
for k1, v1 in v.items():
csv_data += k1 + ',' + v1['type'] + ',' + v1['encoding'] + '\n'
csv_data = chip_nvs_get_config_csv()
# Read data from the user provided csv file
if args.csv:
@@ -186,60 +156,25 @@ def generate_config_csv(args):
with open(OUT_FILE['config_csv'], 'w') as f:
f.write(csv_data)
def write_chip_mcsv_header(args):
logging.info('Writing chip manifest CSV header...')
keys = list()
for _, v in CHIP_KEY_MAP.items():
for k1, _ in v.items():
keys.append(k1)
mcsv_header = ','.join(keys) + '\n'
mcsv_header = chip_get_keys_as_csv() + '\n'
with open(OUT_FILE['chip_mcsv'], 'w') as f:
f.write(mcsv_header)
def append_chip_mcsv_row(row_data, args):
logging.info('Appending chip master CSV row...')
with open(OUT_FILE['chip_mcsv'], 'a') as f:
f.write(row_data + '\n')
# Convert the certificate in PEM format to DER format
def convert_x509_cert_from_pem_to_der(pem_file, out_der_file):
with open(pem_file, 'rb') as f:
pem_data = f.read()
pem_cert = cryptography.x509.load_pem_x509_certificate(pem_data, cryptography.hazmat.backends.default_backend())
der_cert = pem_cert.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.DER)
with open(out_der_file, 'wb') as f:
f.write(der_cert)
# Generate the Public and Private key pair binaries
def generate_keypair_bin(pem_file, out_privkey_bin, out_pubkey_bin):
with open(pem_file, 'rb') as f:
pem_data = f.read()
key_pem = cryptography.hazmat.primitives.serialization.load_pem_private_key(pem_data, None)
private_number_val = key_pem.private_numbers().private_value
public_number_x = key_pem.public_key().public_numbers().x
public_number_y = key_pem.public_key().public_numbers().y
public_key_first_byte = 0x04
with open(out_privkey_bin, 'wb') as f:
f.write(private_number_val.to_bytes(32, byteorder='big'))
with open(out_pubkey_bin, 'wb') as f:
f.write(public_key_first_byte.to_bytes(1, byteorder='big'))
f.write(public_number_x.to_bytes(32, byteorder='big'))
f.write(public_number_y.to_bytes(32, byteorder='big'))
def generate_pai(args, ca_key, ca_cert, out_key, out_cert):
cmd = [
TOOLS['chip-cert'], 'gen-att-cert',
'--type', 'i',
'--subject-cn', '"{} PAI {}"'.format(args.subject_cn_prefix, '00'),
'--subject-cn', '"{} PAI {}"'.format(args.cn_prefix, '00'),
'--out-key', out_key,
'--out', out_cert,
]
@@ -260,17 +195,18 @@ def generate_pai(args, ca_key, ca_cert, out_key, out_cert):
logging.info('Generated PAI certificate: {}'.format(out_cert))
logging.info('Generated PAI private key: {}'.format(out_key))
def generate_dac(iteration, args, discriminator, passcode, ca_key, ca_cert):
out_key_pem = os.sep.join([OUT_DIR['top'], UUIDs[iteration], 'internal', 'DAC_key.pem'])
out_key_pem = os.sep.join([OUT_DIR['top'], UUIDs[iteration], 'internal', 'DAC_key.pem'])
out_cert_pem = out_key_pem.replace('key.pem', 'cert.pem')
out_cert_der = out_key_pem.replace('key.pem', 'cert.der')
out_private_key_bin = out_key_pem.replace('key.pem', 'private_key.bin')
out_public_key_bin = out_key_pem.replace('key.pem', 'public_key.bin')
out_public_key_bin = out_key_pem.replace('key.pem', 'public_key.bin')
cmd = [
TOOLS['chip-cert'], 'gen-att-cert',
'--type', 'd',
'--subject-cn', '"{} DAC {}"'.format(args.subject_cn_prefix, iteration),
'--subject-cn', '"{} DAC {}"'.format(args.cn_prefix, iteration),
'--out-key', out_key_pem,
'--out', out_cert_pem,
]
@@ -281,10 +217,10 @@ def generate_dac(iteration, args, discriminator, passcode, ca_key, ca_cert):
cmd.extend(['--valid-from', str(args.valid_from)])
cmd.extend(['--subject-vid', hex(args.vendor_id)[2:],
'--subject-pid', hex(args.product_id)[2:],
'--ca-key', ca_key,
'--ca-cert', ca_cert,
])
'--subject-pid', hex(args.product_id)[2:],
'--ca-key', ca_key,
'--ca-cert', ca_cert,
])
execute_cmd(cmd)
logging.info('Generated DAC certificate: {}'.format(out_cert_pem))
@@ -299,6 +235,7 @@ def generate_dac(iteration, args, discriminator, passcode, ca_key, ca_cert):
return out_cert_der, out_private_key_bin, out_public_key_bin
def use_dac_from_args(args):
logging.info('Using DAC from command line arguments...')
logging.info('DAC Certificate: {}'.format(args.dac_cert))
@@ -307,7 +244,7 @@ def use_dac_from_args(args):
# There should be only one UUID in the UUIDs list if DAC is specified
out_cert_der = os.sep.join([OUT_DIR['top'], UUIDs[0], 'internal', 'DAC_cert.der'])
out_private_key_bin = out_cert_der.replace('cert.der', 'private_key.bin')
out_public_key_bin = out_cert_der.replace('cert.der', 'public_key.bin')
out_public_key_bin = out_cert_der.replace('cert.der', 'public_key.bin')
convert_x509_cert_from_pem_to_der(args.dac_cert, out_cert_der)
logging.info('Generated DAC certificate in DER format: {}'.format(out_cert_der))
@@ -318,17 +255,18 @@ def use_dac_from_args(args):
return out_cert_der, out_private_key_bin, out_public_key_bin
def setup_out_dirs(vid, pid, count):
OUT_DIR['top'] = os.sep.join(['out', vid_pid_str(vid, pid)])
OUT_DIR['top'] = os.sep.join(['out', vid_pid_str(vid, pid)])
OUT_DIR['stage'] = os.sep.join(['out', vid_pid_str(vid, pid), 'staging'])
os.makedirs(OUT_DIR['top'], exist_ok=True)
os.makedirs(OUT_DIR['stage'], exist_ok=True)
OUT_FILE['config_csv'] = os.sep.join([OUT_DIR['stage'], 'config.csv'])
OUT_FILE['chip_mcsv'] = os.sep.join([OUT_DIR['stage'], 'chip_master.csv'])
OUT_FILE['mcsv'] = os.sep.join([OUT_DIR['stage'], 'master.csv'])
OUT_FILE['pin_csv'] = os.sep.join([OUT_DIR['stage'], 'pin.csv'])
OUT_FILE['config_csv'] = os.sep.join([OUT_DIR['stage'], 'config.csv'])
OUT_FILE['chip_mcsv'] = os.sep.join([OUT_DIR['stage'], 'chip_master.csv'])
OUT_FILE['mcsv'] = os.sep.join([OUT_DIR['stage'], 'master.csv'])
OUT_FILE['pin_csv'] = os.sep.join([OUT_DIR['stage'], 'pin.csv'])
OUT_FILE['pin_disc_csv'] = os.sep.join([OUT_DIR['stage'], 'pin_disc.csv'])
# Create directories to store the generated files
@@ -337,6 +275,7 @@ def setup_out_dirs(vid, pid, count):
UUIDs.append(uuid_str)
os.makedirs(os.sep.join([OUT_DIR['top'], uuid_str, 'internal']), exist_ok=True)
def generate_passcodes_and_discriminators(args):
# Generate passcodes using spake2p tool
generate_passcodes(args)
@@ -345,17 +284,19 @@ def generate_passcodes_and_discriminators(args):
# Append discriminators to passcodes file
append_discriminator(discriminators)
def setup_csv_files(args):
def write_csv_files(args):
generate_config_csv(args)
write_chip_mcsv_header(args)
def setup_root_certs(args):
# If PAA is passed as input, then generate PAI certificate
if args.paa:
# output file names
PAI['cert_pem'] = os.sep.join([OUT_DIR['stage'], 'pai_cert.pem'])
PAI['cert_der'] = os.sep.join([OUT_DIR['stage'], 'pai_cert.der'])
PAI['key_pem'] = os.sep.join([OUT_DIR['stage'], 'pai_key.pem'])
PAI['key_pem'] = os.sep.join([OUT_DIR['stage'], 'pai_key.pem'])
generate_pai(args, args.key, args.cert, PAI['key_pem'], PAI['cert_pem'])
convert_x509_cert_from_pem_to_der(PAI['cert_pem'], PAI['cert_der'])
@@ -364,51 +305,53 @@ def setup_root_certs(args):
# If PAI is passed as input, generate DACs
elif args.pai:
PAI['cert_pem'] = args.cert
PAI['key_pem'] = args.key
PAI['key_pem'] = args.key
PAI['cert_der'] = os.sep.join([OUT_DIR['stage'], 'pai_cert.der'])
convert_x509_cert_from_pem_to_der(PAI['cert_pem'], PAI['cert_der'])
logging.info('Generated PAI certificate in DER format: {}'.format(PAI['cert_der']))
# No need to verify else block as validate_args() already checks for this
# Generates DACs, add the required items to the master.csv file, and
# generates the onboarding data i.e. manual codes and QR codes and image
def generate_dacs_and_onb_data(args):
# This function generates the DACs, picks the commissionable data from the already present csv file,
# and generates the onboarding payloads, and writes everything to the master csv
def write_per_device_unique_data(args):
with open(OUT_FILE['pin_disc_csv'], 'r') as csvf:
pin_disc_dict = csv.DictReader(csvf)
# If any key is added to chip_nvs_keys.py make sure to maintain that order below
for row in pin_disc_dict:
row_items = list()
row_items.append(row['Discriminator'])
# TODO: remove PIN Code, spec says it should not be included in nvs storage
row_items.append(row['PIN Code'])
row_items.append(row['Iteration Count'])
row_items.append(row['Salt'])
row_items.append(row['Verifier'])
chip_factory_update('discriminator', row['Discriminator'])
chip_factory_update('iteration-count', row['Iteration Count'])
chip_factory_update('salt', row['Salt'])
chip_factory_update('verifier', row['Verifier'])
if args.dac_key is not None and args.dac_cert is not None:
dacs = use_dac_from_args(args)
else:
dacs = generate_dac(int(row['Index']), args, int(row['Discriminator']), int(row['PIN Code']), PAI['key_pem'], PAI['cert_pem'])
dacs = generate_dac(int(row['Index']), args, int(row['Discriminator']),
int(row['PIN Code']), PAI['key_pem'], PAI['cert_pem'])
# This appends DAC cert, private key, and public key
row_items.extend([os.path.abspath(x) for x in dacs])
chip_factory_update('dac-cert', os.path.abspath(dacs[0]))
chip_factory_update('dac-key', os.path.abspath(dacs[1]))
chip_factory_update('dac-pub-key', os.path.abspath(dacs[2]))
row_items.append(os.path.abspath(PAI['cert_der']))
row_items.append(os.path.abspath(args.cert_dclrn))
chip_factory_update('pai-cert', os.path.abspath(PAI['cert_der']))
chip_factory_update('cert-dclrn', os.path.abspath(args.cert_dclrn))
# Unique-id
row_items.append(binascii.b2a_hex(os.urandom(UNIQUE_ID_LEN)).decode('utf-8'))
# If unique id is not passed, then generate one
if (args.unique_id is None):
chip_config_update('unique-id', binascii.b2a_hex(os.urandom(int(MAX_UNIQUE_ID_LEN / 2))).decode('utf-8'))
mcsv_row_data = ','.join(row_items)
# If serial number is not passed, then generate one
if (args.serial_num is None):
chip_factory_append('serial-num', 'data', 'string', binascii.b2a_hex(os.urandom(SERIAL_NUMBER_LEN)).decode('utf-8'))
mcsv_row_data = chip_get_values_as_csv()
append_chip_mcsv_row(mcsv_row_data, args)
# Generate onboarding data
generate_onboarding_data(args, int(row['Index']), int(row['Discriminator']), int(row['PIN Code']))
def merge_chip_mcsv_and_user_mcsv(args):
logging.info('Merging chip master CSV and user master CSV...')
@@ -426,13 +369,15 @@ def merge_chip_mcsv_and_user_mcsv(args):
logging.error('Chip and user mcsv files have different number of rows')
sys.exit(1)
chip_mcsv_data = [','.join((c_line.strip(), u_line.strip())) + '\n' for c_line, u_line in zip(chip_mcsv_data, user_mcsv_data)]
chip_mcsv_data = [','.join((c_line.strip(), u_line.strip())) + '\n' for c_line,
u_line in zip(chip_mcsv_data, user_mcsv_data)]
with open(OUT_FILE['mcsv'], 'w') as f:
f.write(''.join(chip_mcsv_data))
os.remove(OUT_FILE['chip_mcsv'])
def organize_output_files(suffix):
for i in range(len(UUIDs)):
dest_path = os.sep.join([OUT_DIR['top'], UUIDs[i]])
@@ -454,70 +399,27 @@ def organize_output_files(suffix):
os.rmdir(os.sep.join([OUT_DIR['top'], 'bin']))
os.rmdir(os.sep.join([OUT_DIR['top'], 'csv']))
def generate_partitions(suffix, size):
cmd = [
TOOLS['mfg_gen'], 'generate',
'python3', TOOLS['mfg_gen'], 'generate',
OUT_FILE['config_csv'], OUT_FILE['mcsv'],
suffix, hex(size), '--outdir', OUT_DIR['top']
]
execute_cmd(cmd)
def get_manualcode_args(args, discriminator, passcode):
payload_args = list()
payload_args.append('--discriminator')
payload_args.append(str(discriminator))
payload_args.append('--setup-pin-code')
payload_args.append(str(passcode))
payload_args.append('--version')
payload_args.append('0')
payload_args.append('--vendor-id')
payload_args.append(str(args.vendor_id))
payload_args.append('--product-id')
payload_args.append(str(args.product_id))
payload_args.append('--commissioning-mode')
payload_args.append(str(args.commissioning_flow))
return payload_args
def get_qrcode_args(args, discriminator, passcode):
payload_args = get_manualcode_args(args, discriminator, passcode)
payload_args.append('--rendezvous')
payload_args.append(str(1 << args.discovery_mode))
return payload_args
def get_chip_qrcode(payload_args):
cmd_args = [TOOLS['chip-tool'], 'payload', 'generate-qrcode']
cmd_args.extend(payload_args)
data = subprocess.check_output(cmd_args)
# Command output is as below:
# \x1b[0;32m[1655386003372] [23483:7823617] CHIP: [TOO] QR Code: MT:Y.K90-WB010E7648G00\x1b[0m
return data.decode('utf-8').split('QR Code: ')[1][:QRCODE_LEN]
def get_chip_manualcode(payload_args, commissioning_flow):
cmd_args = [TOOLS['chip-tool'], 'payload', 'generate-manualcode']
cmd_args.extend(payload_args)
data = subprocess.check_output(cmd_args)
# Command output is as below:
# \x1b[0;32m[1655386909774] [24424:7837894] CHIP: [TOO] Manual Code: 749721123365521327689\x1b[0m\n
# OR
# \x1b[0;32m[1655386926028] [24458:7838229] CHIP: [TOO] Manual Code: 34972112338\x1b[0m\n
# Length of manual code depends on the commissioning flow:
# For standard commissioning flow it is 11 digits
# For User-intent and custom commissioning flow it is 21 digits
manual_code_len = LONG_MANUALCODE_LEN if commissioning_flow else SHORT_MANUALCODE_LEN
return data.decode('utf-8').split('Manual Code: ')[1][:manual_code_len]
def generate_onboarding_data(args, index, discriminator, passcode):
chip_manualcode = get_chip_manualcode(get_manualcode_args(args, discriminator, passcode), args.commissioning_flow)
chip_qrcode = get_chip_qrcode(get_qrcode_args(args, discriminator, passcode))
chip_manualcode = get_chip_manualcode(TOOLS['chip-tool'], args.vendor_id, args.product_id,
args.commissioning_flow, discriminator, passcode)
chip_qrcode = get_chip_qrcode(TOOLS['chip-tool'], args.vendor_id, args.product_id,
args.commissioning_flow, discriminator, passcode, args.discovery_mode)
logging.info('Generated QR code: ' + chip_qrcode)
logging.info('Generated manual code: ' + chip_manualcode)
# TODO: Append commissioning flow, discovery mode, passcode, discriminator as well?
csv_data = 'qrcode,manualcode\n'
csv_data += chip_qrcode + ',' + chip_manualcode + '\n'
csv_data = 'qrcode,manualcode,discriminator,passcode\n'
csv_data += chip_qrcode + ',' + chip_manualcode + ',' + str(discriminator) + ',' + str(passcode) + '\n'
onboarding_data_file = os.sep.join([OUT_DIR['top'], UUIDs[index], '{}-onb_codes.csv'.format(UUIDs[index])])
with open(onboarding_data_file, 'w') as f:
@@ -530,141 +432,158 @@ def generate_onboarding_data(args, index, discriminator, passcode):
logging.info('Generated onboarding data and QR Code')
def validate_args(args):
# csv and mcsv both should present or none
if (args.csv is not None) != (args.mcsv is not None):
logging.error("csv and mcsv should be both present or none")
sys.exit(1)
else:
# Read the number of lines in mcsv file
if args.mcsv is not None:
with open(args.mcsv, 'r') as f:
lines = sum(1 for line in f)
# Subtract 1 for the header line
args.count = lines - 1
# Validate the passcode
if args.passcode is not None:
if ((args.passcode < 0x0000001 and args.passcode > 0x5F5E0FE) or (args.passcode in INVALID_PASSCODES)):
logging.error('Invalid passcode' + str(args.passcode))
sys.exit(1)
# Validate the discriminator
if (args.discriminator is not None) and (args.discriminator not in range(0x0000, 0x0FFF)):
logging.error('Invalid discriminator ' + str(args.discriminator))
sys.exit(1)
if args.unique_id is not None:
if len(args.unique_id) < (UNIQUE_ID_LEN * 2):
logging.error('Unique ID is too short')
sys.exit(1)
if (len(args.unique_id) % 2) != 0:
logging.error('Unique ID is not even length')
sys.exit(1)
# Check for a valid hex string
try:
int(args.unique_id, 16)
except ValueError:
logging.error('Unique ID is not a valid hex string')
sys.exit(1)
# DAC key and DAC cert both should be present or none
if (args.dac_key is not None) != (args.dac_cert is not None):
logging.error("dac_key and dac_cert should be both present or none")
sys.exit(1)
else:
# Make sure PAI certificate is present if DAC is present
if (args.dac_key is not None) and (args.pai is False):
logging.error('Please provide PAI certificate along with DAC certificate and DAC key')
sys.exit(1)
# If unique_id/discriminator/passcode/DAC is present then we are restricting
# the number of partitions to 1
if (args.discriminator is not None
or args.passcode is not None
or args.unique_id is not None
or args.dac_key is not None):
if args.count > 1:
logging.error('Number of partitions should be 1 when unique_id or discriminator or passcode or DAC is present')
sys.exit(1)
# Validate the input certificate type, if DAC is not present
if args.dac_key is None and args.dac_cert is None:
if args.paa:
logging.info('Input Root certificate type PAA')
elif args.pai:
logging.info('Input Root certificate type PAI')
else:
logging.error('Either PAA or PAI certificate is required')
sys.exit(1)
# Check if Key and certificate are present
if args.key is None or args.cert is None:
logging.error('PAA key and certificate are required')
sys.exit(1)
logging.info('Number of manufacturing NVS images to generate: {}'.format(args.count))
def get_args():
def any_base_int(s): return int(s, 0)
parser = argparse.ArgumentParser(description='Manufacuring partition generator tool')
parser.add_argument('-n', '--count', type=any_base_int, default=1, help='The number of manufacturing partition binaries to generate. Default is 1. If --csv and --mcsv are present, the number of lines in the mcsv file is used.')
parser.add_argument('-s', '--size', type=any_base_int, default=0x6000, help='The size of manufacturing partition binaries to generate. Default is 0x6000.')
parser.add_argument('-cn', '--subject-cn-prefix', type=str, default='ESP32', help='The common name prefix of the subject of the generated certificate.')
parser = argparse.ArgumentParser(description='Manufacuring partition generator tool',
formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=50))
parser.add_argument('-v', '--vendor-id', type=any_base_int, required=True, help='The vendor ID.')
parser.add_argument('-p', '--product-id', type=any_base_int, required=True, help='The product ID.')
g_gen = parser.add_argument_group('General options')
g_gen.add_argument('-n', '--count', type=any_base_int, default=1,
help='The number of manufacturing partition binaries to generate. Default is 1. \
If --csv and --mcsv are present, the number of lines in the mcsv file is used.')
g_gen.add_argument('-s', '--size', type=any_base_int, default=0x6000,
help='The size of manufacturing partition binaries to generate. Default is 0x6000.')
input_cert_group = parser.add_mutually_exclusive_group(required=True)
g_commissioning = parser.add_argument_group('Commisioning options')
g_commissioning.add_argument('--passcode', type=any_base_int,
help='The passcode for pairing. Randomly generated if not specified.')
g_commissioning.add_argument('--discriminator', type=any_base_int,
help='The discriminator for pairing. Randomly generated if not specified.')
g_commissioning.add_argument('-cf', '--commissioning-flow', type=any_base_int, default=0,
help='Device commissioning flow, 0:Standard, 1:User-Intent, 2:Custom. \
Default is 0.', choices=[0, 1, 2])
g_commissioning.add_argument('-dm', '--discovery-mode', type=any_base_int, default=1,
help='Commissionable device discovery netowrking technology. \
0:WiFi-SoftAP, 1:BLE, 2:On-network. Default is BLE.', choices=[0, 1, 2])
g_dac = parser.add_argument_group('Device attestation credential options')
g_dac.add_argument('-cn', '--cn-prefix', type=str, default='ESP32',
help='The common name prefix of the subject of the generated certificate.')
g_dac.add_argument('-lt', '--lifetime', default=4294967295, type=any_base_int,
help='Lifetime of the generated certificate. Default is 4294967295 if not specified, \
this indicate that certificate does not have well defined expiration date.')
g_dac.add_argument('-vf', '--valid-from', type=str,
help='The start date for the certificate validity period in format <YYYY>-<MM>-<DD> [ <HH>:<MM>:<SS> ]. \
Default is current date.')
# If DAC is present then PAI key is not required, so it is marked as not required here
# but, if DAC is not present then PAI key is required and that case is validated in validate_args()
g_dac.add_argument('-c', '--cert', type=str, required=True, help='The input certificate file in PEM format.')
g_dac.add_argument('-k', '--key', type=str, required=False, help='The input key file in PEM format.')
g_dac.add_argument('-cd', '--cert-dclrn', type=str, required=True, help='The certificate declaration file in DER format.')
g_dac.add_argument('--dac-cert', type=str, help='The input DAC certificate file in PEM format.')
g_dac.add_argument('--dac-key', type=str, help='The input DAC private key file in PEM format.')
input_cert_group = g_dac.add_mutually_exclusive_group(required=True)
input_cert_group.add_argument('--paa', action='store_true', help='Use input certificate as PAA certificate.')
input_cert_group.add_argument('--pai', action='store_true', help='Use input certificate as PAI certificate.')
# If DAC is present then PAI key is not required, so it is marked as not required here
# but, if DAC is not present then PAI key is required and that case is validated in validate_args()
parser.add_argument('-c', '--cert', type=str, required=True, help='The input certificate file in PEM format.')
parser.add_argument('-k', '--key', type=str, required=False, help='The input key file in PEM format.')
g_dev_inst_info = parser.add_argument_group('Device instance information options')
g_dev_inst_info.add_argument('-v', '--vendor-id', type=any_base_int, required=False, help='Vendor id')
g_dev_inst_info.add_argument('--vendor-name', type=str, required=False, help='Vendor name')
g_dev_inst_info.add_argument('-p', '--product-id', type=any_base_int, required=False, help='Product id')
g_dev_inst_info.add_argument('--product-name', type=str, required=False, help='Product name')
g_dev_inst_info.add_argument('--hw-ver', type=any_base_int, required=False, help='Hardware version')
g_dev_inst_info.add_argument('--hw-ver-str', type=str, required=False, help='Hardware version string')
g_dev_inst_info.add_argument('--mfg-date', type=str, required=False, help='Manufacturing date in format YYYY-MM-DD')
g_dev_inst_info.add_argument('--serial-num', type=str, required=False, help='Serial number')
g_dev_inst_info.add_argument('--rd-id-uid', type=str, required=False,
help='128-bit unique identifier for generating rotating device identifier, provide 32-byte hex string, e.g. "1234567890abcdef1234567890abcdef"')
parser.add_argument('-cd', '--cert-dclrn', type=str, required=True, help='The certificate declaration file in DER format.')
g_dev_inst = parser.add_argument_group('Device instance options')
g_dev_inst.add_argument('--calendar-types', type=str, nargs='+', required=False,
help='List of supported calendar types. Supported Calendar Types: Buddhist, Chinese, Coptic, \
Ethiopian, Gregorian, Hebrew, Indian, Islamic, Japanese, Korean, Persian, Taiwanese')
g_dev_inst.add_argument('--locales', type=str, nargs='+', required=False,
help='List of supported locales, Language Tag as defined by BCP47, eg. en-US en-GB')
g_dev_inst.add_argument('--fixed-labels', type=str, nargs='+', required=False,
help='List of fixed labels, eg: "0/orientation/up" "1/orientation/down" "2/orientation/down"')
parser.add_argument('--dac-cert', type=str, help='The input DAC certificate file in PEM format.')
parser.add_argument('--dac-key', type=str, help='The input DAC private key file in PEM format.');
g_basic = parser.add_argument_group('Few more Basic clusters options')
g_basic.add_argument('--product-label', type=str, required=False, help='Product label')
g_basic.add_argument('--product-url', type=str, required=False, help='Product URL')
g_basic.add_argument('--unique-id', type=str, required=False, help='Unique identifier in Basic cluster')
parser.add_argument('-lt', '--lifetime', default=4294967295, type=any_base_int, help='Lifetime of the generated certificate. Default is 4294967295 if not specified, this indicate that certificate does not have well defined expiration date.')
parser.add_argument('-vf', '--valid-from', type=str, help='The start date for the certificate validity period in format <YYYY>-<MM>-<DD> [ <HH>:<MM>:<SS> ]. Default is current date.')
parser.add_argument('--passcode', type=any_base_int, help='The passcode for pairing. Randomly generated if not specified.')
parser.add_argument('--discriminator', type=any_base_int, help='The discriminator for pairing. Randomly generated if not specified.')
parser.add_argument('-cf', '--commissioning-flow', type=any_base_int, default=0, help='Device commissioning flow, 0:Standard, 1:User-Intent, 2:Custom. Default is 0.', choices=[0, 1, 2])
parser.add_argument('-dm', '--discovery-mode', type=any_base_int, default=1, help='Commissionable device discovery netowrking technology. 0:WiFi-SoftAP, 1:BLE, 2:On-network. Default is BLE.', choices=[0, 1, 2])
parser.add_argument('--csv', type=str, help='CSV file containing the partition schema for extra options. [REF: https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/storage/mass_mfg.html#csv-configuration-file]')
parser.add_argument('--mcsv', type=str, help='Master CSV file containig optional/extra values specified by the user. [REF: https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/storage/mass_mfg.html#master-value-csv-file]')
parser.add_argument('-u', '--unique-id', type=str, help='Unique Identifier for the device. This should be at least 128-bit long. If not specified, a random unique id is generated.')
g_extra_info = parser.add_argument_group('Extra information options using csv files')
g_extra_info.add_argument('--csv', type=str, help='CSV file containing the partition schema for extra options. \
[REF: https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/storage/mass_mfg.html#csv-configuration-file]')
g_extra_info.add_argument('--mcsv', type=str, help='Master CSV file containig optional/extra values specified by the user. \
[REF: https://docs.espressif.com/projects/esp-idf/en/latest/esp32/api-reference/storage/mass_mfg.html#master-value-csv-file]')
return parser.parse_args()
def add_optional_KVs(args):
# Device instance information
if args.vendor_id is not None:
chip_factory_append('vendor-id', 'data', 'u32', args.vendor_id)
if args.vendor_name is not None:
chip_factory_append('vendor-name', 'data', 'string', args.vendor_name)
if args.product_id is not None:
chip_factory_append('product-id', 'data', 'u32', args.product_id)
if args.product_name is not None:
chip_factory_append('product-name', 'data', 'string', args.product_name)
if args.hw_ver is not None:
chip_factory_append('hardware-ver', 'data', 'u32', args.hw_ver)
if args.hw_ver_str is not None:
chip_factory_append('hw-ver-str', 'data', 'string', args.hw_ver_str)
if args.mfg_date is not None:
chip_factory_append('mfg-date', 'data', 'string', args.mfg_date)
if args.rd_id_uid is not None:
chip_factory_append('rd-id-uid', 'data', 'hex2bin', args.rd_id_uid)
# Add the serial-num
chip_factory_append('serial-num', 'data', 'string', args.serial_num)
# Device information
if args.calendar_types is not None:
chip_factory_append('cal-types', 'data', 'u32', calendar_types_to_uint32(args.calendar_types))
# Supported locale is stored as multiple entries, key format: "locale/<index>, example key: "locale/0"
if (args.locales is not None):
chip_factory_append('locale-sz', 'data', 'u32', len(args.locales))
for i in range(len(args.locales)):
chip_factory_append('locale/{:x}'.format(i), 'data', 'string', args.locales[i])
# Each endpoint can contains the fixed lables
# - fl-sz/<index> : number of fixed labels for the endpoint
# - fl-k/<ep>/<index> : fixed label key for the endpoint and index
# - fl-v/<ep>/<index> : fixed label value for the endpoint and index
if (args.fixed_labels is not None):
dict = get_fixed_label_dict(args.fixed_labels)
for key in dict.keys():
chip_factory_append('fl-sz/{:x}'.format(int(key)), 'data', 'u32', len(dict[key]))
for i in range(len(dict[key])):
entry = dict[key][i]
chip_factory_append('fl-k/{:x}/{:x}'.format(int(key), i), 'data', 'string', list(entry.keys())[0])
chip_factory_append('fl-v/{:x}/{:x}'.format(int(key), i), 'data', 'string', list(entry.values())[0])
# Keys from basic clusters
if args.product_label is not None:
chip_factory_append('product-label', 'data', 'string', args.product_label)
if args.product_url is not None:
chip_factory_append('product-url', 'data', 'string', args.product_url)
if args.unique_id is not None:
chip_config_update('unique-id', args.unique_id)
def main():
logging.basicConfig(format='[%(asctime)s] [%(levelname)7s] - %(message)s', level=logging.INFO)
args = get_args()
validate_args(args)
check_tools_exists()
setup_out_dirs(args.vendor_id, args.product_id, args.count)
add_optional_KVs(args)
generate_passcodes_and_discriminators(args)
setup_csv_files(args)
write_csv_files(args)
setup_root_certs(args)
generate_dacs_and_onb_data(args)
write_per_device_unique_data(args)
merge_chip_mcsv_and_user_mcsv(args)
generate_partitions('matter_partition', args.size)
organize_output_files('matter_partition')
if __name__ == "__main__":
main()
+1
View File
@@ -1,3 +1,4 @@
bitarray==2.6.0
cryptography==36.0.2
cffi==1.15.0
future==0.18.2
+333
View File
@@ -0,0 +1,333 @@
#!/usr/bin/env python3
# Copyright 2022 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Contains utilitiy functions for validating argument, certs/keys conversion, etc.
"""
import sys
import enum
import logging
import subprocess
from bitarray import bitarray
from bitarray.util import ba2int
import cryptography.hazmat.backends
import cryptography.x509
MAX_UNIQUE_ID_LEN = 32
SERIAL_NUMBER_LEN = 16
# Lengths for manual pairing codes and qrcode
SHORT_MANUALCODE_LEN = 11
LONG_MANUALCODE_LEN = 21
QRCODE_LEN = 22
INVALID_PASSCODES = [00000000, 11111111, 22222222, 33333333, 44444444, 55555555,
66666666, 77777777, 88888888, 99999999, 12345678, 87654321]
class CalendarTypes(enum.Enum):
Buddhist = 0
Chinese = 1
Coptic = 2
Ethiopian = 3
Gregorian = 4
Hebrew = 5
Indian = 6
Islamic = 7
Japanese = 8
Korean = 9
Persian = 10
Taiwanese = 11
def vid_pid_str(vid, pid):
return '_'.join([hex(vid)[2:], hex(pid)[2:]])
def disc_pin_str(discriminator, passcode):
return '_'.join([hex(discriminator)[2:], hex(passcode)[2:]])
# Checks if the input string is a valid hex string
def ishex(s):
try:
n = int(s, 16)
return True
except ValueError:
return False
# Validate the input string length against the min and max length
def check_str_range(s, min_len, max_len, name):
if s and ((len(s) < min_len) or (len(s) > max_len)):
logging.error('%s must be between %d and %d characters', name, min_len, max_len)
sys.exit(1)
# Validate the input integer range
def check_int_range(value, min_value, max_value, name):
if value and ((value < min_value) or (value > max_value)):
logging.error('%s is out of range, should be in range [%d, %d]', name, min_value, max_value)
sys.exit(1)
# Validates discriminator and passcode
def validate_commissionable_data(args):
check_int_range(args.discriminator, 0x0000, 0x0FFF, 'Discriminator')
if args.passcode is not None:
if ((args.passcode < 0x0000001 and args.passcode > 0x5F5E0FE) or (args.passcode in INVALID_PASSCODES)):
logging.error('Invalid passcode' + str(args.passcode))
sys.exit(1)
# Validate the device instance information
def validate_device_instance_info(args):
check_int_range(args.product_id, 0x0000, 0xFFFF, 'Product id')
check_int_range(args.vendor_id, 0x0000, 0xFFFF, 'Vendor id')
check_int_range(args.hw_ver, 0x0000, 0xFFFF, 'Hardware version')
check_str_range(args.serial_num, 1, SERIAL_NUMBER_LEN, 'Serial number')
check_str_range(args.vendor_name, 1, 32, 'Vendor name')
check_str_range(args.product_name, 1, 32, 'Product name')
check_str_range(args.hw_ver_str, 1, 64, 'Hardware version string')
check_str_range(args.mfg_date, 8, 16, 'Manufacturing date')
check_str_range(args.rd_id_uid, 32, 32, 'Rotating device Unique id')
# Validate the device information: calendar types and fixed labels
def validate_device_info(args):
# Validate the input calendar types
if args.calendar_types is not None:
if not (set(args.calendar_types) <= set(CalendarTypes.__members__)):
invalid_types = set(args.calendar_types).union(set(CalendarTypes.__members__)) - set(CalendarTypes.__members__)
logging.error('Unknown calendar type/s: %s', invalid_types)
logging.error('Supported calendar types: %s', ', '.join(CalendarTypes.__members__))
sys.exit(1)
if args.fixed_labels is not None:
for fl in args.fixed_labels:
_l = fl.split('/')
if len(_l) != 3:
logging.error('Invalid fixed label: %s', fl)
sys.exit(1)
if not (ishex(_l[0]) and (len(_l[1]) > 0 and len(_l[1]) < 16) and (len(_l[2]) > 0 and len(_l[2]) < 16)):
logging.error('Invalid fixed label: %s', fl)
sys.exit(1)
# Validates the attestation related arguments
def validate_attestation_info(args):
# DAC key and DAC cert both should be present or none
if (args.dac_key is not None) != (args.dac_cert is not None):
logging.error("dac_key and dac_cert should be both present or none")
sys.exit(1)
else:
# Make sure PAI certificate is present if DAC is present
if (args.dac_key is not None) and (args.pai is False):
logging.error('Please provide PAI certificate along with DAC certificate and DAC key')
sys.exit(1)
# Validate the input certificate type, if DAC is not present
if args.dac_key is None and args.dac_cert is None:
if args.paa:
logging.info('Input Root certificate type PAA')
elif args.pai:
logging.info('Input Root certificate type PAI')
else:
logging.error('Either PAA or PAI certificate is required')
sys.exit(1)
# Check if Key and certificate are present
if args.key is None or args.cert is None:
logging.error('PAA key and certificate are required')
sys.exit(1)
# Validates few basic cluster related arguments: product-label and product-url
def validate_basic_cluster_info(args):
check_str_range(args.product_label, 1, 64, 'Product Label')
check_str_range(args.product_url, 1, 256, 'Product URL')
check_str_range(args.unique_id, 1, MAX_UNIQUE_ID_LEN, 'Unique id')
# Validates the input arguments, this calls the above functions
def validate_args(args):
# csv and mcsv both should present or none
if (args.csv is not None) != (args.mcsv is not None):
logging.error("csv and mcsv should be both present or none")
sys.exit(1)
else:
# Read the number of lines in mcsv file
if args.mcsv is not None:
with open(args.mcsv, 'r') as f:
lines = sum(1 for line in f)
# Subtract 1 for the header line
args.count = lines - 1
validate_commissionable_data(args)
validate_device_instance_info(args)
validate_device_info(args)
validate_attestation_info(args)
validate_basic_cluster_info(args)
# If unique_id/discriminator/passcode/DAC/serial_number is present
# then we are restricting the number of partitions to 1
if (args.discriminator is not None
or args.passcode is not None
or args.unique_id is not None
or args.dac_key is not None
or args.serial_num is not None):
if args.count > 1:
logging.error('Number of partitions should be 1 when unique_id or discriminator or passcode or DAC or serial number is present')
sys.exit(1)
logging.info('Number of manufacturing NVS images to generate: {}'.format(args.count))
# Supported Calendar types is stored as a bit array in one uint32_t.
def calendar_types_to_uint32(calendar_types):
# In validate_device_info() we have already verified that the calendar types are valid
result = bitarray(32, endian='little')
result.setall(0)
for calendar_type in calendar_types:
result[CalendarTypes[calendar_type].value] = 1
return ba2int(result)
# get_fixed_label_dict() converts the list of strings to per endpoint dictionaries.
# example input : ['0/orientation/up', '1/orientation/down', '2/orientation/down']
# example outout : {'0': [{'orientation': 'up'}], '1': [{'orientation': 'down'}], '2': [{'orientation': 'down'}]}
def get_fixed_label_dict(fixed_labels):
fl_dict = {}
for fl in fixed_labels:
_l = fl.split('/')
if len(_l) != 3:
logging.error('Invalid fixed label: %s', fl)
sys.exit(1)
if not (ishex(_l[0]) and (len(_l[1]) > 0 and len(_l[1]) < 16) and (len(_l[2]) > 0 and len(_l[2]) < 16)):
logging.error('Invalid fixed label: %s', fl)
sys.exit(1)
if _l[0] not in fl_dict.keys():
fl_dict[_l[0]] = list()
fl_dict[_l[0]].append({_l[1]: _l[2]})
return fl_dict
# Convert the certificate in PEM format to DER format
def convert_x509_cert_from_pem_to_der(pem_file, out_der_file):
with open(pem_file, 'rb') as f:
pem_data = f.read()
pem_cert = cryptography.x509.load_pem_x509_certificate(pem_data, cryptography.hazmat.backends.default_backend())
der_cert = pem_cert.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.DER)
with open(out_der_file, 'wb') as f:
f.write(der_cert)
# Generate the Public and Private key pair binaries
def generate_keypair_bin(pem_file, out_privkey_bin, out_pubkey_bin):
with open(pem_file, 'rb') as f:
pem_data = f.read()
key_pem = cryptography.hazmat.primitives.serialization.load_pem_private_key(pem_data, None)
private_number_val = key_pem.private_numbers().private_value
public_number_x = key_pem.public_key().public_numbers().x
public_number_y = key_pem.public_key().public_numbers().y
public_key_first_byte = 0x04
with open(out_privkey_bin, 'wb') as f:
f.write(private_number_val.to_bytes(32, byteorder='big'))
with open(out_pubkey_bin, 'wb') as f:
f.write(public_key_first_byte.to_bytes(1, byteorder='big'))
f.write(public_number_x.to_bytes(32, byteorder='big'))
f.write(public_number_y.to_bytes(32, byteorder='big'))
def execute_cmd(cmd):
logging.debug('Executing Command: {}'.format(cmd))
status = subprocess.run(cmd, capture_output=True)
try:
status.check_returncode()
except subprocess.CalledProcessError as e:
if status.stderr:
logging.error('[stderr]: {}'.format(status.stderr.decode('utf-8').strip()))
logging.error('Command failed with error: {}'.format(e))
sys.exit(1)
def get_manualcode_args(vid, pid, flow, discriminator, passcode):
payload_args = list()
payload_args.append('--discriminator')
payload_args.append(str(discriminator))
payload_args.append('--setup-pin-code')
payload_args.append(str(passcode))
payload_args.append('--version')
payload_args.append('0')
payload_args.append('--vendor-id')
payload_args.append(str(vid))
payload_args.append('--product-id')
payload_args.append(str(pid))
payload_args.append('--commissioning-mode')
payload_args.append(str(flow))
return payload_args
def get_qrcode_args(vid, pid, flow, discriminator, passcode, disc_mode):
payload_args = get_manualcode_args(vid, pid, flow, discriminator, passcode)
payload_args.append('--rendezvous')
payload_args.append(str(1 << disc_mode))
return payload_args
def get_chip_qrcode(chip_tool, vid, pid, flow, discriminator, passcode, disc_mode):
payload_args = get_qrcode_args(vid, pid, flow, discriminator, passcode, disc_mode)
cmd_args = [chip_tool, 'payload', 'generate-qrcode']
cmd_args.extend(payload_args)
data = subprocess.check_output(cmd_args)
# Command output is as below:
# \x1b[0;32m[1655386003372] [23483:7823617] CHIP: [TOO] QR Code: MT:Y.K90-WB010E7648G00\x1b[0m
return data.decode('utf-8').split('QR Code: ')[1][:QRCODE_LEN]
def get_chip_manualcode(chip_tool, vid, pid, flow, discriminator, passcode):
payload_args = get_manualcode_args(vid, pid, flow, discriminator, passcode)
cmd_args = [chip_tool, 'payload', 'generate-manualcode']
cmd_args.extend(payload_args)
data = subprocess.check_output(cmd_args)
# Command output is as below:
# \x1b[0;32m[1655386909774] [24424:7837894] CHIP: [TOO] Manual Code: 749721123365521327689\x1b[0m\n
# OR
# \x1b[0;32m[1655386926028] [24458:7838229] CHIP: [TOO] Manual Code: 34972112338\x1b[0m\n
# Length of manual code depends on the commissioning flow:
# For standard commissioning flow it is 11 digits
# For User-intent and custom commissioning flow it is 21 digits
manual_code_len = LONG_MANUALCODE_LEN if flow else SHORT_MANUALCODE_LEN
return data.decode('utf-8').split('Manual Code: ')[1][:manual_code_len]