Files
esp-matter/tools/data_model_gen/xml_processing/xml_parser.py
T
Mahesh Pimpale 42075d5c75 components/esp_matter: generated data model using automated script
- data_model/legacy/: moved old data model to this folder
- data_model/generated/: contain the automatically generated data model
- tools/data_model_gen: contains the script to generate the data model
2026-04-15 17:05:50 +05:30

187 lines
6.8 KiB
Python

# Copyright 2026 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import logging
import xml.etree.ElementTree as ET
from .cluster_parser import ClusterParser
from .device_parser import DeviceParser
from .parse_context import load_cluster_parse_context
from utils.helper import write_to_file
from utils.exceptions import XmlParseError
from .yaml_parser import YamlParser
from utils.config import FileNames
logger = logging.getLogger(__name__)
def get_base_and_derived_cluster_files(input_dir):
"""Return (base_list, derived_list), each a list of (file_path, root).
Parses each XML once; roots are reused by the cluster parser to avoid double parse.
"""
base_cluster_files = []
derived_cluster_files = []
for file_name in os.listdir(input_dir):
if os.path.isfile(
os.path.join(input_dir, file_name)
) and not file_name.endswith(".xml"):
logger.debug(f"Skipping {file_name} as it is not a valid file")
continue
file_path = os.path.join(input_dir, file_name)
tree = ET.parse(file_path)
root = tree.getroot()
classification = root.find("classification")
if (
classification is not None
and classification.get("hierarchy", "") == "derived"
):
derived_cluster_files.append((file_path, root))
else:
base_cluster_files.append((file_path, root))
return base_cluster_files, derived_cluster_files
def write_to_json_file(file_path, data):
data_list = [item.to_dict() for item in data]
data_list.sort(key=lambda x: int(x.get("id", "0"), 16))
if not write_to_file(file_path, data_list, "json"):
raise XmlParseError(
"Failed to write JSON output",
file_path=file_path,
suggestion="Check write permissions and disk space.",
)
logger.info("GENERATED json at %s", file_path)
return file_path
def process_cluster_files(
input_dir,
output_dir,
yaml_file_path,
):
"""Process all cluster XML files from input directory and generate intermediate cluster json file.
First it generates base cluster objects as during parsing of the derived cluster files, the base cluster objects are needed.
Then it generates derived cluster objects.
:param input_dir: Path to the directory containing the cluster XML files.
:param output_dir: Path to the output directory.
:returns: None
"""
base_cluster_xml_files, derived_cluster_xml_files = (
get_base_and_derived_cluster_files(input_dir)
)
if len(base_cluster_xml_files) == 0 and len(derived_cluster_xml_files) == 0:
logger.error(f"No cluster XML files found in {input_dir}")
return
yaml_parser = YamlParser(yaml_file_path)
cluster_parser = ClusterParser()
context = load_cluster_parse_context(output_dir)
base_clusters = []
derived_clusters = []
for file_path, root in base_cluster_xml_files:
cluster_list = cluster_parser.parse(
file_path=file_path,
output_dir=output_dir,
yaml_parser=yaml_parser,
context=context,
root=root,
)
if cluster_list is None or len(cluster_list) == 0:
logger.error(
"Processing of cluster file failed | File: %s",
file_path,
)
continue
base_clusters.extend(cluster_list)
for file_path, root in derived_cluster_xml_files:
cluster_list = cluster_parser.parse(
file_path=file_path,
output_dir=output_dir,
base_clusters=base_clusters,
yaml_parser=yaml_parser,
context=context,
root=root,
)
if cluster_list is None or len(cluster_list) == 0:
logger.error(
"Processing of derived cluster file failed | File: %s",
file_path,
)
continue
derived_clusters.extend(cluster_list)
clusters = base_clusters + derived_clusters
write_to_json_file(os.path.join(output_dir, FileNames.CLUSTER_JSON.value), clusters)
def process_device_files(input_dir, output_dir):
"""Process all device XML files from input directory and generate intermediate device json file.
:param input_dir: Path to the directory containing the device XML files.
:param output_dir: Path to the output directory.
:returns: None
"""
devices = []
device_parser = DeviceParser()
for file_name in os.listdir(input_dir):
if os.path.isfile(os.path.join(input_dir, file_name)) and file_name.endswith(
".xml"
):
file_path = os.path.join(input_dir, file_name)
device = device_parser.parse(file_path=file_path)
if device is not None:
devices.append(device)
write_to_json_file(os.path.join(output_dir, FileNames.DEVICE_JSON.value), devices)
def process_single_files(cluster_file, device_file, output_dir, yaml_file_path=None):
"""Process single cluster and device files and generate intermediate cluster and device json files.
:param cluster_file: Path to the input cluster xml file to process.
:param device_file: Path to the input device xml file to process.
:param output_dir: Path to the output directory.
:param yaml_file_path: Path to the yaml configuration file.
:returns: None
"""
clusters = []
devices = []
if cluster_file is not None:
cluster_parser = ClusterParser()
context = load_cluster_parse_context(output_dir)
yaml_parser = YamlParser(yaml_file_path)
# Base cluster objects not available for single file processing
cluster_list = cluster_parser.parse(
file_path=cluster_file,
output_dir=output_dir,
yaml_parser=yaml_parser,
context=context,
)
if cluster_list is not None:
clusters.extend(cluster_list)
if device_file is not None:
device_parser = DeviceParser()
device = device_parser.parse(file_path=device_file)
if device is not None:
devices.append(device)
write_to_json_file(os.path.join(output_dir, FileNames.CLUSTER_JSON.value), clusters)
write_to_json_file(os.path.join(output_dir, FileNames.DEVICE_JSON.value), devices)