mirror of
https://github.com/espressif/esp-matter.git
synced 2026-04-27 19:13:13 +00:00
42075d5c75
- data_model/legacy/: moved the old data model to this folder - data_model/generated/: contains the automatically generated data model - tools/data_model_gen: contains the script to generate the data model
157 lines
5.8 KiB
Python
157 lines
5.8 KiB
Python
# Copyright 2026 Espressif Systems (Shanghai) PTE LTD
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import logging
|
|
import xml.etree.ElementTree as ET
|
|
from xml.etree.ElementTree import Element
|
|
from .elements import Cluster, Device
|
|
from utils.helper import check_valid_id, convert_to_snake_case, safe_get_attr
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def is_mandatory(element: Element) -> bool:
    """Tell whether *element* is unconditionally mandatory.

    :param element: XML element to inspect for a ``mandatoryConform`` child.
    :returns: ``True`` only when a ``mandatoryConform`` child exists and that
        child has no child elements of its own; ``False`` otherwise.
    """
    conformance = element.find("mandatoryConform")
    if conformance is None:
        return False
    # len() of an Element counts its direct children; zero children means the
    # conformance carries no nested expression.
    child_count = len(conformance)
    return child_count == 0
|
|
|
|
|
|
class DeviceParser:
    """Build ``Device`` objects (with their ``Cluster`` children) from device XML files."""

    def can_skip(self, element: Element):
        """Decide whether *element* lacks the identity needed to be parsed.

        :param element: XML element expected to carry ``name`` and ``id`` attributes.
        :returns: A ``(skip, reason)`` tuple. ``skip`` is ``True`` when the name
            is missing/empty, the id is missing/empty, or the id is rejected by
            ``check_valid_id``. When ``skip`` is ``False`` the reason is the
            placeholder ``"Unknown reason"``.
        """
        name = element.get("name")
        identifier = element.get("id")
        # The checks run in a fixed order: name, then id presence, then id validity.
        if not name:
            verdict = (True, "name is missing")
        elif not identifier:
            verdict = (True, "id is missing")
        elif not check_valid_id(identifier):
            verdict = (True, "id is not valid")
        else:
            verdict = (False, "Unknown reason")
        return verdict

    def parse(self, file_path):
        """Parse a device XML file and return the parsed device object.

        :param file_path: Path (or file object) handed to ``ET.parse``.
        :returns: The parsed device object, or ``None`` when the root element
            is rejected by :meth:`can_skip`.
        """
        root = ET.parse(file_path).getroot()

        should_skip, reason = self.can_skip(root)
        if should_skip:
            logger.warning(f"Skipping {file_path} reason : {reason}")
            return None

        device = self.create_device(root)
        for cluster_elem in root.findall("clusters/cluster"):
            should_skip, reason = self.can_skip(cluster_elem)
            if not should_skip:
                device.clusters.add(self.create_cluster(cluster_elem))
            else:
                # An invalid cluster is reported and dropped; the rest still load.
                logger.warning(
                    f"Skipping cluster {cluster_elem.get('name')} reason : {reason}"
                )
        logger.debug(
            f"Processed device {safe_get_attr(device, 'name')} successfully with {len(device.clusters)} clusters"
        )
        return device

    def create_device(self, root: Element):
        """Create a device object from the device XML element.

        Assumes the device has a valid id and name.

        :param root: Root element of the device XML document.
        :returns: A ``Device`` populated with revision history, classification
            and conditions read from *root*.
        """
        device_id = root.get("id")
        device_name = root.get("name")
        # A device without a ``revision`` attribute gets the literal "Unknown".
        device_revision = root.get("revision", "Unknown")
        device = Device(id=device_id, name=device_name, revision=device_revision)

        device.revision_history = self._get_revision_history(root)
        device.classification = self._get_classification(root)
        device.conditions = self._get_conditions(root)
        return device

    def create_cluster(self, cluster_elem: Element):
        """Create a cluster object from the cluster XML element.

        Assumes the cluster has a valid id and name.

        :param cluster_elem: A ``cluster`` element of the device XML.
        :returns: A ``Cluster`` whose name lists contain the snake_case names of
            the mandatory attributes, features, commands and events.
        """
        side = cluster_elem.get("side")
        cluster = Cluster(
            name=cluster_elem.get("name"),
            id=cluster_elem.get("id"),
            revision=None,
        )
        cluster.server_cluster = side == "server"
        cluster.client_cluster = side == "client"

        # The cluster is flagged mandatory only when its mandatoryConform has no
        # direct ``condition`` child; otherwise ``is_mandatory`` is not assigned here.
        conformance = cluster_elem.find("mandatoryConform")
        if conformance is not None and conformance.find("condition") is None:
            cluster.is_mandatory = True

        cluster.attribute_name_list = self._mandatory_names(
            cluster_elem, "attributes/attribute"
        )
        # Features use their ``code`` when ``name`` is absent or empty.
        cluster.feature_name_list = [
            convert_to_snake_case(feature.get("name") or feature.get("code"))
            for feature in cluster_elem.findall("features/feature")
            if is_mandatory(feature)
        ]
        cluster.command_name_list = self._mandatory_names(
            cluster_elem, "commands/command"
        )
        cluster.event_name_list = self._mandatory_names(cluster_elem, "events/event")

        return cluster

    def _mandatory_names(self, cluster_elem, path):
        """Return snake_case ``name`` values of the mandatory elements found at *path*."""
        return [
            convert_to_snake_case(child.get("name"))
            for child in cluster_elem.findall(path)
            if is_mandatory(child)
        ]

    def _get_revision_history(self, root):
        """Return one ``{"revision", "summary"}`` dict per ``revisionHistory/revision`` element."""
        return [
            {"revision": entry.get("revision"), "summary": entry.get("summary")}
            for entry in root.findall("revisionHistory/revision")
        ]

    def _get_classification(self, root):
        """Merge the attributes of every direct ``classification`` child into one dict.

        When several ``classification`` elements define the same attribute, the
        value from the last one wins.
        """
        merged = {}
        for classification_elem in root.findall("classification"):
            merged.update(classification_elem.attrib)
        return merged

    def _get_conditions(self, root):
        """Return one ``{"name", "summary"}`` dict per ``conditions/condition`` element."""
        return [
            {"name": entry.get("name"), "summary": entry.get("summary")}
            for entry in root.findall("conditions/condition")
        ]
|