Files
Mahesh Pimpale 42075d5c75 components/esp_matter: generated data model using automated script
- data_model/legacy/: moved old data model to this folder
- data_model/generated/: contain the automatically generated data model
- tools/data_model_gen: contains the script to generate the data model
2026-04-15 17:05:50 +05:30

157 lines
5.8 KiB
Python

# Copyright 2026 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import Element
from .elements import Cluster, Device
from utils.helper import check_valid_id, convert_to_snake_case, safe_get_attr
logger = logging.getLogger(__name__)
def is_mandatory(element: Element) -> bool:
mandatory_conform = element.find("mandatoryConform")
return mandatory_conform is not None and len(mandatory_conform) == 0
class DeviceParser:
def can_skip(self, element: Element):
if not element.get("name"):
return True, "name is missing"
if not element.get("id"):
return True, "id is missing"
if not check_valid_id(element.get("id")):
return True, "id is not valid"
return False, "Unknown reason"
def parse(self, file_path):
"""Parse a device XML file and return the parsed device object.
:param file_path:
:returns: The parsed device object.
"""
tree = ET.parse(file_path)
root = tree.getroot()
skip, reason = self.can_skip(root)
if skip:
logger.warning(f"Skipping {file_path} reason : {reason}")
return None
device = self.create_device(root)
for cluster_elem in root.findall("clusters/cluster"):
skip, reason = self.can_skip(cluster_elem)
if skip:
logger.warning(
f"Skipping cluster {cluster_elem.get('name')} reason : {reason}"
)
continue
cluster = self.create_cluster(cluster_elem)
device.clusters.add(cluster)
logger.debug(
f"Processed device {safe_get_attr(device, 'name')} successfully with {len(device.clusters)} clusters"
)
return device
def create_device(self, root: Element):
"""Create a device object from the device XML element.
Assumes device has valid id and name.
"""
device = Device(
id=root.get("id"),
name=root.get("name"),
revision=root.get("revision", "Unknown"),
)
device.revision_history = self._get_revision_history(root)
device.classification = self._get_classification(root)
device.conditions = self._get_conditions(root)
return device
def create_cluster(self, cluster_elem: Element):
"""Create a cluster object from the cluster XML element.
Assumes cluster has valid id and name.
"""
cluster = Cluster(
name=cluster_elem.get("name"), id=cluster_elem.get("id"), revision=None
)
cluster.server_cluster = cluster_elem.get("side") == "server"
cluster.client_cluster = cluster_elem.get("side") == "client"
mandatory_conform = cluster_elem.find("mandatoryConform")
if (
mandatory_conform is not None
and mandatory_conform.find("condition") is None
):
cluster.is_mandatory = True
attribute_list = []
for attribute_elem in cluster_elem.findall("attributes/attribute"):
if is_mandatory(attribute_elem):
attribute_list.append(convert_to_snake_case(attribute_elem.get("name")))
cluster.attribute_name_list = attribute_list
feature_list = []
for feature_elem in cluster_elem.findall("features/feature"):
if is_mandatory(feature_elem):
feature_name = (
feature_elem.get("name")
if feature_elem.get("name")
else feature_elem.get("code")
)
feature_list.append(convert_to_snake_case(feature_name))
cluster.feature_name_list = feature_list
command_list = []
for command_elem in cluster_elem.findall("commands/command"):
if is_mandatory(command_elem):
command_list.append(convert_to_snake_case(command_elem.get("name")))
cluster.command_name_list = command_list
event_list = []
for event_elem in cluster_elem.findall("events/event"):
if is_mandatory(event_elem):
event_list.append(convert_to_snake_case(event_elem.get("name")))
cluster.event_name_list = event_list
return cluster
def _get_revision_history(self, root):
revision_history = []
for revision in root.findall("revisionHistory/revision"):
revision_history.append(
{
"revision": revision.get("revision"),
"summary": revision.get("summary"),
}
)
return revision_history
def _get_classification(self, root):
classification = {}
for classification_elem in root.findall("classification"):
for attr_name, attr_value in classification_elem.attrib.items():
classification[attr_name] = attr_value
return classification
def _get_conditions(self, root):
conditions = []
for condition_elem in root.findall("conditions/condition"):
conditions.append(
{
"name": condition_elem.get("name"),
"summary": condition_elem.get("summary"),
}
)
return conditions