Merge branch 'mfg_tool/override_csv' into 'main'

mfg_tool: override the configs with --csv and --mcsv options

See merge request app-frameworks/esp-matter!224
This commit is contained in:
Shu Chen
2022-11-24 18:54:55 +08:00
4 changed files with 99 additions and 88 deletions
+7
View File
@@ -140,6 +140,13 @@ Below commands uses the test PAI signing certificate and key, test certificate d
Above command will generate `n` number of partitions. Where `n` is the rows in the mcsv file.
Output binary contains all the chip specific key/value and key/values specified using `--csv` and `--mcsv` option.
### Generate factory partitions without device attestation certificates and keys
```
./mfg_tool.py -v 0xFFF2 -p 0x8001 \
-cd path/to/esp-matter/connectedhomeip/connectedhomeip/credentials/test/certification-declaration/Chip-Test-CD-FFF2-8001.der
```
* NOTE: These factory partitions are only for firmwares with other ways to get the certificates and sign message with the private key.
## Flashing the manufacturing binary
Please note that `mfg_tool.py` only generates manufacturing binary images which need to be flashed onto device using `esptool.py`.
+29 -22
View File
@@ -20,6 +20,9 @@ This file contains the CHIP specific key along with the data type and encoding f
# This map contains mandatory CHIP specific key along with the data type and encoding format
# Additionally to add more keys to chip-factory, use chip_factory_append() API
import csv
CHIP_NVS_MAP = {
'chip-factory': {
# Commissionable Data
@@ -45,26 +48,6 @@ CHIP_NVS_MAP = {
},
# Device Attestation Credentials
'dac-cert': {
'type': 'file',
'encoding': 'binary',
'value': None,
},
'dac-key': {
'type': 'file',
'encoding': 'binary',
'value': None,
},
'dac-pub-key': {
'type': 'file',
'encoding': 'binary',
'value': None,
},
'pai-cert': {
'type': 'file',
'encoding': 'binary',
'value': None,
},
'cert-dclrn': {
'type': 'file',
'encoding': 'binary',
@@ -84,6 +67,13 @@ def get_dict(key, type, encoding, value):
}
def get_namespace_dict(namespace):
return {
namespace: {
}
}
def chip_nvs_get_config_csv():
csv_data = ''
for k, v in CHIP_NVS_MAP.items():
@@ -101,8 +91,25 @@ def chip_factory_update(key, value):
CHIP_NVS_MAP['chip-factory'][key]['value'] = value
def chip_config_update(key, value):
CHIP_NVS_MAP['chip-config'][key]['value'] = value
def chip_nvs_map_update(namespace, key, type, encoding, value):
CHIP_NVS_MAP[namespace].update(get_dict(key, type, encoding, value))
def chip_nvs_map_append_config_csv(csv_path):
with open(csv_path, 'r') as csv_file:
csv_reader = csv.reader(csv_file, delimiter=',')
# Set current namespace to 'chip-factory' when the first line of the csv file is not 'XX,namespace,'
current_namespace = 'chip-factory'
for csv_data in csv_reader:
if 'namespace' in csv_data:
current_namespace = csv_data[0]
if (current_namespace not in list(CHIP_NVS_MAP.keys())):
CHIP_NVS_MAP.update(get_namespace_dict(current_namespace))
else:
chip_nvs_map_update(current_namespace, csv_data[0], csv_data[1], csv_data[2], None)
def chip_factory_get_val(key):
return CHIP_NVS_MAP['chip-factory'][key]['value']
def chip_get_keys_as_csv():
+60 -62
View File
@@ -59,7 +59,6 @@ OUT_DIR = {
OUT_FILE = {
'config_csv': None,
'chip_mcsv': None,
'mcsv': None,
'pin_csv': None,
'pin_disc_csv': None,
@@ -68,16 +67,18 @@ OUT_FILE = {
UUIDs = list()
def check_tools_exists():
def check_tools_exists(args):
TOOLS['spake2p'] = shutil.which('spake2p')
if TOOLS['spake2p'] is None:
logging.error('spake2p not found, please add spake2p path to PATH environment variable')
sys.exit(1)
TOOLS['chip-cert'] = shutil.which('chip-cert')
if TOOLS['chip-cert'] is None:
logging.error('chip-cert not found, please add chip-cert path to PATH environment variable')
sys.exit(1)
# if the certs and keys are not in the generated partitions or the specific dac cert and key are used,
# the chip-cert is not needed.
if args.paa or (args.pai and (args.dac_cert is None and args.dac_key is None)):
TOOLS['chip-cert'] = shutil.which('chip-cert')
if TOOLS['chip-cert'] is None:
logging.error('chip-cert not found, please add chip-cert path to PATH environment variable')
sys.exit(1)
TOOLS['chip-tool'] = shutil.which('chip-tool')
if TOOLS['chip-tool'] is None:
@@ -148,11 +149,6 @@ def generate_config_csv(args):
logging.info("Generating Config CSV...")
csv_data = chip_nvs_get_config_csv()
# Read data from the user provided csv file
if args.csv:
with open(args.csv, 'r') as f:
csv_data += f.read()
with open(OUT_FILE['config_csv'], 'w') as f:
f.write(csv_data)
@@ -160,13 +156,13 @@ def generate_config_csv(args):
def write_chip_mcsv_header(args):
logging.info('Writing chip manifest CSV header...')
mcsv_header = chip_get_keys_as_csv() + '\n'
with open(OUT_FILE['chip_mcsv'], 'w') as f:
with open(OUT_FILE['mcsv'], 'w') as f:
f.write(mcsv_header)
def append_chip_mcsv_row(row_data, args):
def append_chip_mcsv_row(row_data):
logging.info('Appending chip master CSV row...')
with open(OUT_FILE['chip_mcsv'], 'a') as f:
with open(OUT_FILE['mcsv'], 'a') as f:
f.write(row_data + '\n')
@@ -264,7 +260,6 @@ def setup_out_dirs(vid, pid, count):
os.makedirs(OUT_DIR['stage'], exist_ok=True)
OUT_FILE['config_csv'] = os.sep.join([OUT_DIR['stage'], 'config.csv'])
OUT_FILE['chip_mcsv'] = os.sep.join([OUT_DIR['stage'], 'chip_master.csv'])
OUT_FILE['mcsv'] = os.sep.join([OUT_DIR['stage'], 'master.csv'])
OUT_FILE['pin_csv'] = os.sep.join([OUT_DIR['stage'], 'pin.csv'])
OUT_FILE['pin_disc_csv'] = os.sep.join([OUT_DIR['stage'], 'pin_disc.csv'])
@@ -312,6 +307,19 @@ def setup_root_certs(args):
logging.info('Generated PAI certificate in DER format: {}'.format(PAI['cert_der']))
def overwrite_values_in_mcsv(args, index):
with open(args.mcsv, 'r') as mcsvf:
mcsv_dict = list(csv.DictReader(mcsvf))[index]
with open(args.csv, 'r') as csvf:
csv_reader = csv.reader(csvf, delimiter=',')
current_namespace = 'chip-factory'
for csv_data in csv_reader:
if 'namespace' in csv_data:
current_namespace = csv_data[0]
else:
chip_nvs_map_update(current_namespace, csv_data[0], csv_data[1], csv_data[2], mcsv_dict[csv_data[0]])
# This function generates the DACs, picks the commissionable data from the already present csv file,
# and generates the onboarding payloads, and writes everything to the master csv
def write_per_device_unique_data(args):
@@ -324,60 +332,38 @@ def write_per_device_unique_data(args):
chip_factory_update('salt', row['Salt'])
chip_factory_update('verifier', row['Verifier'])
if args.dac_key is not None and args.dac_cert is not None:
dacs = use_dac_from_args(args)
else:
dacs = generate_dac(int(row['Index']), args, int(row['Discriminator']),
int(row['PIN Code']), PAI['key_pem'], PAI['cert_pem'])
if args.paa or args.pai:
if args.dac_key is not None and args.dac_cert is not None:
dacs = use_dac_from_args(args)
else:
dacs = generate_dac(int(row['Index']), args, int(row['Discriminator']),
int(row['PIN Code']), PAI['key_pem'], PAI['cert_pem'])
chip_factory_update('dac-cert', os.path.abspath(dacs[0]))
chip_factory_update('dac-key', os.path.abspath(dacs[1]))
chip_factory_update('dac-pub-key', os.path.abspath(dacs[2]))
chip_factory_update('dac-cert', os.path.abspath(dacs[0]))
chip_factory_update('dac-key', os.path.abspath(dacs[1]))
chip_factory_update('dac-pub-key', os.path.abspath(dacs[2]))
chip_factory_update('pai-cert', os.path.abspath(PAI['cert_der']))
chip_factory_update('pai-cert', os.path.abspath(PAI['cert_der']))
chip_factory_update('cert-dclrn', os.path.abspath(args.cert_dclrn))
# If serial number is not passed, then generate one
if (args.serial_num is None):
chip_factory_append('serial-num', 'data', 'string', binascii.b2a_hex(os.urandom(SERIAL_NUMBER_LEN)).decode('utf-8'))
chip_factory_update('serial-num', binascii.b2a_hex(os.urandom(SERIAL_NUMBER_LEN)).decode('utf-8'))
if (args.enable_rotating_device_id is True) and (args.rd_id_uid is None):
chip_factory_update('rd-id-uid', binascii.b2a_hex(os.urandom(int(ROTATING_DEVICE_ID_UNIQUE_ID_LEN_BITS / 8))).decode('utf-8'))
if (args.csv is not None and args.mcsv is not None):
overwrite_values_in_mcsv(args, int(row['Index']))
mcsv_row_data = chip_get_values_as_csv()
append_chip_mcsv_row(mcsv_row_data, args)
append_chip_mcsv_row(mcsv_row_data)
# Generate onboarding data
generate_onboarding_data(args, int(row['Index']), int(row['Discriminator']), int(row['PIN Code']))
generate_onboarding_data(args, int(row['Index']), int(chip_factory_get_val('discriminator')), int(row['PIN Code']))
def merge_chip_mcsv_and_user_mcsv(args):
logging.info('Merging chip master CSV and user master CSV...')
with open(OUT_FILE['chip_mcsv'], 'r') as f:
chip_mcsv_data = f.readlines()
# If user mcsv is present, merge it with chip mcsv
if args.mcsv:
logging.info('User manifest CSV is present. Merging...')
with open(args.mcsv, 'r') as f:
user_mcsv_data = f.readlines()
if (len(chip_mcsv_data) != len(user_mcsv_data)):
logging.error('Chip and user mcsv files have different number of rows')
sys.exit(1)
chip_mcsv_data = [','.join((c_line.strip(), u_line.strip())) + '\n' for c_line,
u_line in zip(chip_mcsv_data, user_mcsv_data)]
with open(OUT_FILE['mcsv'], 'w') as f:
f.write(''.join(chip_mcsv_data))
os.remove(OUT_FILE['chip_mcsv'])
def organize_output_files(suffix):
def organize_output_files(suffix, args):
for i in range(len(UUIDs)):
dest_path = os.sep.join([OUT_DIR['top'], UUIDs[i]])
internal_path = os.sep.join([dest_path, 'internal'])
@@ -391,7 +377,8 @@ def organize_output_files(suffix):
os.rename(replace, replace_with)
# Also copy the PAI certificate to the output directory
shutil.copy2(PAI['cert_der'], os.sep.join([internal_path, 'PAI_cert.der']))
if args.paa or args.pai:
shutil.copy2(PAI['cert_der'], os.sep.join([internal_path, 'PAI_cert.der']))
logging.info('Generated output files at: {}'.format(os.sep.join([OUT_DIR['top'], UUIDs[i]])))
@@ -468,12 +455,12 @@ def get_args():
Default is current date.')
# If DAC is present then PAI key is not required, so it is marked as not required here
# but, if DAC is not present then PAI key is required and that case is validated in validate_args()
g_dac.add_argument('-c', '--cert', type=str, required=True, help='The input certificate file in PEM format.')
g_dac.add_argument('-c', '--cert', type=str, required=False, help='The input certificate file in PEM format.')
g_dac.add_argument('-k', '--key', type=str, required=False, help='The input key file in PEM format.')
g_dac.add_argument('-cd', '--cert-dclrn', type=str, required=True, help='The certificate declaration file in DER format.')
g_dac.add_argument('--dac-cert', type=str, help='The input DAC certificate file in PEM format.')
g_dac.add_argument('--dac-key', type=str, help='The input DAC private key file in PEM format.')
input_cert_group = g_dac.add_mutually_exclusive_group(required=True)
input_cert_group = g_dac.add_mutually_exclusive_group(required=False)
input_cert_group.add_argument('--paa', action='store_true', help='Use input certificate as PAA certificate.')
input_cert_group.add_argument('--pai', action='store_true', help='Use input certificate as PAI certificate.')
@@ -534,6 +521,17 @@ def add_optional_KVs(args):
# Add the serial-num
chip_factory_append('serial-num', 'data', 'string', args.serial_num)
# Add certificates and keys
if args.paa or args.pai:
chip_factory_append('dac-cert', 'file', 'binary', None)
chip_factory_append('dac-key', 'file', 'binary', None)
chip_factory_append('dac-pub-key', 'file', 'binary', None)
chip_factory_append('pai-cert', 'file', 'binary', None)
# Add the Keys in csv files
if args.csv is not None:
chip_nvs_map_append_config_csv(args.csv)
# Device information
if args.calendar_types is not None:
chip_factory_append('cal-types', 'data', 'u32', calendar_types_to_uint32(args.calendar_types))
@@ -570,16 +568,16 @@ def main():
args = get_args()
validate_args(args)
check_tools_exists()
check_tools_exists(args)
setup_out_dirs(args.vendor_id, args.product_id, args.count)
add_optional_KVs(args)
generate_passcodes_and_discriminators(args)
write_csv_files(args)
setup_root_certs(args)
if args.paa or args.pai:
setup_root_certs(args)
write_per_device_unique_data(args)
merge_chip_mcsv_and_user_mcsv(args)
generate_partitions('matter_partition', args.size)
organize_output_files('matter_partition')
organize_output_files('matter_partition', args)
if __name__ == "__main__":
+3 -4
View File
@@ -151,12 +151,11 @@ def validate_attestation_info(args):
elif args.pai:
logging.info('Input Root certificate type PAI')
else:
logging.error('Either PAA or PAI certificate is required')
sys.exit(1)
logging.info('Do not include the device attestation certificates and keys in partition binaries')
# Check if Key and certificate are present
if args.key is None or args.cert is None:
logging.error('PAA key and certificate are required')
if (args.paa or args.pai) and (args.key is None or args.cert is None):
logging.error('CA key and certificate are required to generate DAC key and certificate')
sys.exit(1)