diff --git a/tools/mfg_tool/mfg_tool.py b/tools/mfg_tool/mfg_tool.py index 4e56f8da2..adf27e3f4 100755 --- a/tools/mfg_tool/mfg_tool.py +++ b/tools/mfg_tool/mfg_tool.py @@ -35,6 +35,8 @@ from utils import * from datetime import datetime from types import SimpleNamespace +from esp_secure_cert.tlv_format import * + if not os.getenv('IDF_PATH'): logging.error("IDF_PATH environment variable is not set") sys.exit(1) @@ -73,6 +75,7 @@ OUT_FILE = { 'mcsv': None, 'pin_csv': None, 'pin_disc_csv': None, + 'cn_dac_csv': None } UUIDs = list() @@ -158,7 +161,6 @@ def append_chip_mcsv_row(row_data): with open(OUT_FILE['mcsv'], 'a') as f: f.write(row_data + '\n') - def generate_pai(args, ca_key, ca_cert, out_key, out_cert): cmd = [ TOOLS['chip-cert'], 'gen-att-cert', @@ -187,15 +189,15 @@ def generate_pai(args, ca_key, ca_cert, out_key, out_cert): def generate_dac(iteration, args, discriminator, passcode, ca_key, ca_cert): out_key_pem = os.sep.join([OUT_DIR['top'], UUIDs[iteration], 'internal', 'DAC_key.pem']) + out_private_key_der = out_key_pem.replace('key.pem', 'key.der') out_cert_pem = out_key_pem.replace('key.pem', 'cert.pem') out_cert_der = out_key_pem.replace('key.pem', 'cert.der') out_private_key_bin = out_key_pem.replace('key.pem', 'private_key.bin') out_public_key_bin = out_key_pem.replace('key.pem', 'public_key.bin') - cmd = [ TOOLS['chip-cert'], 'gen-att-cert', '--type', 'd', - '--subject-cn', '"{} DAC {}"'.format(args.cn_prefix, iteration), + '--subject-cn', UUIDs[iteration], '--out-key', out_key_pem, '--out', out_cert_pem, ] @@ -221,8 +223,8 @@ def generate_dac(iteration, args, discriminator, passcode, ca_key, ca_cert): generate_keypair_bin(out_key_pem, out_private_key_bin, out_public_key_bin) logging.info('Generated DAC private key in binary format: {}'.format(out_private_key_bin)) logging.info('Generated DAC public key in binary format: {}'.format(out_public_key_bin)) - - return out_cert_der, out_private_key_bin, out_public_key_bin + convert_private_key_from_pem_to_der(out_key_pem, out_key_der) + return out_cert_der, out_private_key_bin, out_public_key_bin, out_key_der def use_dac_from_args(args): @@ -256,6 +258,7 @@ def setup_out_dirs(vid, pid, count): OUT_FILE['mcsv'] = os.sep.join([OUT_DIR['stage'], 'master.csv']) OUT_FILE['pin_csv'] = os.sep.join([OUT_DIR['stage'], 'pin.csv']) OUT_FILE['pin_disc_csv'] = os.sep.join([OUT_DIR['stage'], 'pin_disc.csv']) + OUT_FILE['cn_dac_csv'] = os.sep.join([OUT_DIR['top'], 'cn_dacs-{}.csv'.format(datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))]) # Create directories to store the generated files for i in range(count): @@ -273,9 +276,15 @@ def generate_passcodes_and_discriminators(args): append_discriminator(discriminators) +def write_cn_dac_csv_header(): + with open(OUT_FILE['cn_dac_csv'], 'a') as csv_file: + csv_file.write("CN,certs\n") + return + def write_csv_files(args): generate_config_csv(args) write_chip_mcsv_header(args) + write_cn_dac_csv_header() def setup_root_certs(args): @@ -313,6 +322,12 @@ def overwrite_values_in_mcsv(args, index): chip_nvs_map_update(current_namespace, csv_data[0], csv_data[1], csv_data[2], mcsv_dict[csv_data[0]]) +def append_cn_dac_to_csv(common_name, cert_path): + with open(OUT_FILE['cn_dac_csv'], 'a') as csv_file: + with open(cert_path, 'r') as device_cert_file: + device_cert_contents = device_cert_file.read() + csv_file.write('{},"{}"\n'.format(common_name, device_cert_contents)) + # This function generates the DACs, picks the commissionable data from the already present csv file, # and generates the onboarding payloads, and writes everything to the master csv def write_per_device_unique_data(args): @@ -324,7 +339,6 @@ def write_per_device_unique_data(args): chip_factory_update('iteration-count', row['Iteration Count']) chip_factory_update('salt', row['Salt']) chip_factory_update('verifier', row['Verifier']) - if args.paa or args.pai: if args.dac_key is not None and args.dac_cert is not None: dacs = use_dac_from_args(args) @@ -336,7 +350,12 @@ def write_per_device_unique_data(args): chip_factory_update('dac-key', os.path.abspath(dacs[1])) chip_factory_update('dac-pub-key', os.path.abspath(dacs[2])) chip_factory_update('pai-cert', os.path.abspath(PAI['cert_der'])) + secure_cert_partition_file_path = os.sep.join([OUT_DIR['top'], UUIDs[int(row['Index'])], UUIDs[int(row['Index'])] + '_esp_secure_cert.bin']) + # esp secure cert partition + priv_key = tlv_priv_key_t(key_type = tlv_priv_key_type_t.ESP_SECURE_CERT_DEFAULT_FORMAT_KEY, key_path = os.path.abspath(dacs[3]), key_pass = None) + generate_partition_no_ds(priv_key = priv_key, device_cert = os.path.abspath(dacs[0]), ca_cert = os.path.abspath(PAI['cert_der']), idf_target = args.node_platform, op_file = secure_cert_partition_file_path) + append_cn_dac_to_csv(UUIDs[int(row['Index'])], os.sep.join([OUT_DIR['top'], UUIDs[int(row['Index'])], "internal", "DAC_cert.pem"])) # If serial number is not passed, then generate one if (args.serial_num is None): chip_factory_update('serial-num', binascii.b2a_hex(os.urandom(SERIAL_NUMBER_LEN)).decode('utf-8')) @@ -352,6 +371,8 @@ def write_per_device_unique_data(args): # Generate onboarding data generate_onboarding_data(args, int(row['Index']), int(chip_factory_get_val('discriminator')), int(row['PIN Code'])) + if args.paa or args.pai: + logging.info("Generated CSV of CN and DAC: {}".format(OUT_FILE['cn_dac_csv'])) def organize_output_files(suffix, args): @@ -457,7 +478,6 @@ def generate_onboarding_data(args, index, discriminator, passcode): def get_args(): def any_base_int(s): return int(s, 0) - parser = argparse.ArgumentParser(description='Manufacuring partition generator tool', formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=50)) @@ -465,6 +485,9 @@ def get_args(): g_gen.add_argument('-n', '--count', type=any_base_int, default=1, help='The number of manufacturing partition binaries to generate. Default is 1. \ If --csv and --mcsv are present, the number of lines in the mcsv file is used.') + + g_gen.add_argument('--target', default='esp32', + help='The platform type of device. eg: one of esp32, esp32c3, etc.') g_gen.add_argument('-s', '--size', type=any_base_int, default=0x6000, help='The size of manufacturing partition binaries to generate. Default is 0x6000.') g_gen.add_argument('-e', '--encrypt', action='store_true', required=False, @@ -479,7 +502,7 @@ def get_args(): help='Device commissioning flow, 0:Standard, 1:User-Intent, 2:Custom. \ Default is 0.', choices=[0, 1, 2]) g_commissioning.add_argument('-dm', '--discovery-mode', type=any_base_int, default=1, - help='Commissionable device discovery netowrking technology. \ + help='Commissionable device discovery networking technology. \ 0:WiFi-SoftAP, 1:BLE, 2:On-network. Default is BLE.', choices=[0, 1, 2]) g_dac = parser.add_argument_group('Device attestation credential options') diff --git a/tools/mfg_tool/utils.py b/tools/mfg_tool/utils.py index 6e32369e4..5fecf58bf 100644 --- a/tools/mfg_tool/utils.py +++ b/tools/mfg_tool/utils.py @@ -26,7 +26,8 @@ from bitarray import bitarray from bitarray.util import ba2int import cryptography.hazmat.backends import cryptography.x509 - +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.backends import default_backend ROTATING_DEVICE_ID_UNIQUE_ID_LEN_BITS = 128 SERIAL_NUMBER_LEN = 16 @@ -266,12 +267,26 @@ def convert_x509_cert_from_pem_to_der(pem_file, out_der_file): with open(pem_file, 'rb') as f: pem_data = f.read() - pem_cert = cryptography.x509.load_pem_x509_certificate(pem_data, cryptography.hazmat.backends.default_backend()) - der_cert = pem_cert.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.DER) + pem_cert = cryptography.x509.load_pem_x509_certificate(pem_data, default_backend()) + der_cert = pem_cert.public_bytes(serialization.Encoding.DER) with open(out_der_file, 'wb') as f: f.write(der_cert) +def convert_private_key_from_pem_to_der(pem_file, out_der_file): + with open(pem_file, 'rb') as f: + pem_data = f.read() + + pem_key = serialization.load_pem_private_key(pem_data, None, default_backend()) + + der_key = pem_key.private_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + + with open(out_der_file, 'wb') as f: + f.write(der_key) # Generate the Public and Private key pair binaries def generate_keypair_bin(pem_file, out_privkey_bin, out_pubkey_bin):