mfg_tool: generate secure_cert partition and csv for registering device certificates to rainmaker cloud.

This commit is contained in:
sanket.wadekar
2023-06-20 16:14:27 +05:30
parent 949044f784
commit 5cedec54a3
2 changed files with 49 additions and 11 deletions
+31 -8
View File
@@ -35,6 +35,8 @@ from utils import *
from datetime import datetime
from types import SimpleNamespace
from esp_secure_cert.tlv_format import *
if not os.getenv('IDF_PATH'):
logging.error("IDF_PATH environment variable is not set")
sys.exit(1)
@@ -73,6 +75,7 @@ OUT_FILE = {
'mcsv': None,
'pin_csv': None,
'pin_disc_csv': None,
'cn_dac_csv': None
}
UUIDs = list()
@@ -158,7 +161,6 @@ def append_chip_mcsv_row(row_data):
with open(OUT_FILE['mcsv'], 'a') as f:
f.write(row_data + '\n')
def generate_pai(args, ca_key, ca_cert, out_key, out_cert):
cmd = [
TOOLS['chip-cert'], 'gen-att-cert',
@@ -187,15 +189,15 @@ def generate_pai(args, ca_key, ca_cert, out_key, out_cert):
def generate_dac(iteration, args, discriminator, passcode, ca_key, ca_cert):
out_key_pem = os.sep.join([OUT_DIR['top'], UUIDs[iteration], 'internal', 'DAC_key.pem'])
out_private_key_der = out_key_pem.replace('key.pem', 'key.der')
out_cert_pem = out_key_pem.replace('key.pem', 'cert.pem')
out_cert_der = out_key_pem.replace('key.pem', 'cert.der')
out_private_key_bin = out_key_pem.replace('key.pem', 'private_key.bin')
out_public_key_bin = out_key_pem.replace('key.pem', 'public_key.bin')
cmd = [
TOOLS['chip-cert'], 'gen-att-cert',
'--type', 'd',
'--subject-cn', '"{} DAC {}"'.format(args.cn_prefix, iteration),
'--subject-cn', UUIDs[iteration],
'--out-key', out_key_pem,
'--out', out_cert_pem,
]
@@ -221,8 +223,8 @@ def generate_dac(iteration, args, discriminator, passcode, ca_key, ca_cert):
generate_keypair_bin(out_key_pem, out_private_key_bin, out_public_key_bin)
logging.info('Generated DAC private key in binary format: {}'.format(out_private_key_bin))
logging.info('Generated DAC public key in binary format: {}'.format(out_public_key_bin))
return out_cert_der, out_private_key_bin, out_public_key_bin
convert_private_key_from_pem_to_der(out_key_pem, out_key_der)
return out_cert_der, out_private_key_bin, out_public_key_bin, out_key_der
def use_dac_from_args(args):
@@ -256,6 +258,7 @@ def setup_out_dirs(vid, pid, count):
OUT_FILE['mcsv'] = os.sep.join([OUT_DIR['stage'], 'master.csv'])
OUT_FILE['pin_csv'] = os.sep.join([OUT_DIR['stage'], 'pin.csv'])
OUT_FILE['pin_disc_csv'] = os.sep.join([OUT_DIR['stage'], 'pin_disc.csv'])
OUT_FILE['cn_dac_csv'] = os.sep.join([OUT_DIR['top'], 'cn_dacs-{}.csv'.format(datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))])
# Create directories to store the generated files
for i in range(count):
@@ -273,9 +276,15 @@ def generate_passcodes_and_discriminators(args):
append_discriminator(discriminators)
def write_cn_dac_csv_header():
with open(OUT_FILE['cn_dac_csv'], 'a') as csv_file:
csv_file.write("CN,certs\n")
return
def write_csv_files(args):
generate_config_csv(args)
write_chip_mcsv_header(args)
write_cn_dac_csv_header()
def setup_root_certs(args):
@@ -313,6 +322,12 @@ def overwrite_values_in_mcsv(args, index):
chip_nvs_map_update(current_namespace, csv_data[0], csv_data[1], csv_data[2], mcsv_dict[csv_data[0]])
def append_cn_dac_to_csv(common_name, cert_path):
with open(OUT_FILE['cn_dac_csv'], 'a') as csv_file:
with open(cert_path, 'r') as device_cert_file:
device_cert_contents = device_cert_file.read()
csv_file.write('{},"{}"\n'.format(common_name, device_cert_contents))
# This function generates the DACs, picks the commissionable data from the already present csv file,
# and generates the onboarding payloads, and writes everything to the master csv
def write_per_device_unique_data(args):
@@ -324,7 +339,6 @@ def write_per_device_unique_data(args):
chip_factory_update('iteration-count', row['Iteration Count'])
chip_factory_update('salt', row['Salt'])
chip_factory_update('verifier', row['Verifier'])
if args.paa or args.pai:
if args.dac_key is not None and args.dac_cert is not None:
dacs = use_dac_from_args(args)
@@ -336,7 +350,12 @@ def write_per_device_unique_data(args):
chip_factory_update('dac-key', os.path.abspath(dacs[1]))
chip_factory_update('dac-pub-key', os.path.abspath(dacs[2]))
chip_factory_update('pai-cert', os.path.abspath(PAI['cert_der']))
secure_cert_partition_file_path = os.sep.join([OUT_DIR['top'], UUIDs[int(row['Index'])], UUIDs[int(row['Index'])] + '_esp_secure_cert.bin'])
# esp secure cert partition
priv_key = tlv_priv_key_t(key_type = tlv_priv_key_type_t.ESP_SECURE_CERT_DEFAULT_FORMAT_KEY, key_path = os.path.abspath(dacs[3]), key_pass = None)
generate_partition_no_ds(priv_key = priv_key, device_cert = os.path.abspath(dacs[0]), ca_cert = os.path.abspath(PAI['cert_der']), idf_target = args.node_platform, op_file = secure_cert_partition_file_path)
append_cn_dac_to_csv(UUIDs[int(row['Index'])], os.sep.join([OUT_DIR['top'], UUIDs[int(row['Index'])], "internal", "DAC_cert.pem"]))
# If serial number is not passed, then generate one
if (args.serial_num is None):
chip_factory_update('serial-num', binascii.b2a_hex(os.urandom(SERIAL_NUMBER_LEN)).decode('utf-8'))
@@ -352,6 +371,8 @@ def write_per_device_unique_data(args):
# Generate onboarding data
generate_onboarding_data(args, int(row['Index']), int(chip_factory_get_val('discriminator')), int(row['PIN Code']))
if args.paa or args.pai:
logging.info("Generated CSV of CN and DAC: {}".format(OUT_FILE['cn_dac_csv']))
def organize_output_files(suffix, args):
@@ -457,7 +478,6 @@ def generate_onboarding_data(args, index, discriminator, passcode):
def get_args():
def any_base_int(s): return int(s, 0)
parser = argparse.ArgumentParser(description='Manufacuring partition generator tool',
formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=50))
@@ -465,6 +485,9 @@ def get_args():
g_gen.add_argument('-n', '--count', type=any_base_int, default=1,
help='The number of manufacturing partition binaries to generate. Default is 1. \
If --csv and --mcsv are present, the number of lines in the mcsv file is used.')
g_gen.add_argument('--target', default='esp32',
help='The platform type of device. eg: one of esp32, esp32c3, etc.')
g_gen.add_argument('-s', '--size', type=any_base_int, default=0x6000,
help='The size of manufacturing partition binaries to generate. Default is 0x6000.')
g_gen.add_argument('-e', '--encrypt', action='store_true', required=False,
@@ -479,7 +502,7 @@ def get_args():
help='Device commissioning flow, 0:Standard, 1:User-Intent, 2:Custom. \
Default is 0.', choices=[0, 1, 2])
g_commissioning.add_argument('-dm', '--discovery-mode', type=any_base_int, default=1,
help='Commissionable device discovery netowrking technology. \
help='Commissionable device discovery networking technology. \
0:WiFi-SoftAP, 1:BLE, 2:On-network. Default is BLE.', choices=[0, 1, 2])
g_dac = parser.add_argument_group('Device attestation credential options')
+18 -3
View File
@@ -26,7 +26,8 @@ from bitarray import bitarray
from bitarray.util import ba2int
import cryptography.hazmat.backends
import cryptography.x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
ROTATING_DEVICE_ID_UNIQUE_ID_LEN_BITS = 128
SERIAL_NUMBER_LEN = 16
@@ -266,12 +267,26 @@ def convert_x509_cert_from_pem_to_der(pem_file, out_der_file):
with open(pem_file, 'rb') as f:
pem_data = f.read()
pem_cert = cryptography.x509.load_pem_x509_certificate(pem_data, cryptography.hazmat.backends.default_backend())
der_cert = pem_cert.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.DER)
pem_cert = cryptography.x509.load_pem_x509_certificate(pem_data, default_backend())
der_cert = pem_cert.public_bytes(serialization.Encoding.DER)
with open(out_der_file, 'wb') as f:
f.write(der_cert)
def convert_private_key_from_pem_to_der(pem_file, out_der_file):
with open(pem_file, 'rb') as f:
pem_data = f.read()
pem_key = serialization.load_pem_private_key(pem_data, None, default_backend())
der_key = pem_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
with open(out_der_file, 'wb') as f:
f.write(der_key)
# Generate the Public and Private key pair binaries
def generate_keypair_bin(pem_file, out_privkey_bin, out_pubkey_bin):