Files
Mahesh Pimpale 42075d5c75 components/esp_matter: generated data model using automated script
- data_model/legacy/: moved old data model to this folder
- data_model/generated/: contains the automatically generated data model
- tools/data_model_gen: contains the script to generate the data model
2026-04-15 17:05:50 +05:30

497 lines
18 KiB
Python

# Copyright 2026 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
from utils.base_elements import (
BaseCluster,
BaseAttribute,
BaseCommand,
BaseEvent,
BaseFeature,
BaseDevice,
)
from .conformance_codegen import Conformance, ConformanceDecision, FeatureConformance
from typing import Dict, List, Tuple
from utils.conversion_utils import convert_to_int
from utils.overrides import (
get_overridden_cluster_init_callback_name,
get_overridden_cluster_shutdown_callback_name,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def get_id_name_lambda():
    """Build the sort key shared by the element lists in this module.

    The returned callable maps an element to ``(integer ID, name)``, so
    elements are ordered by numeric ID first and by name on ties.
    """

    def _id_then_name(element):
        return (convert_to_int(element.get_id()), element.name)

    return _id_then_name
def get_choice_group(
    parent_feature_type: str,
    conformance_type: ConformanceDecision,
    features: List[Feature],
):
    """
    Collect choice groups: a parent feature together with its dependent features.

    A feature counts as a dependent when its conformance has the requested
    ``conformance_type``, names a parent through the conformance attribute
    called ``parent_feature_type`` and carries a choice. Dependents that share
    the same parent name and choice marker end up in the same group. A group
    is dropped when no feature in ``features`` has a ``func_name`` equal to
    the parent name.

    Returns a list of dicts: {
        "parent_feature": <Feature>,
        "dependent_features": [<Feature>, ...],  (sorted by ID and name)
        "constraint": "at_least_one" | "exact_one" | None
    }
    """
    # Pass 1: bucket the dependents by (parent name, choice marker).
    buckets: Dict[Tuple[str, str], List] = {}
    for candidate in features:
        conformance = candidate.conformance
        if not conformance or conformance.type != conformance_type:
            continue
        parent_name = getattr(conformance, parent_feature_type, None)
        if parent_name and conformance.choice:
            bucket_key = (parent_name, conformance.choice.marker)
            buckets.setdefault(bucket_key, []).append(candidate)

    # Pass 2: resolve each parent and describe the group.
    groups = []
    for (parent_name, _marker), members in buckets.items():
        parent = None
        for candidate in features:
            if candidate.func_name == parent_name:
                parent = candidate
                break
        if not parent:
            continue

        # The constraint is taken from the first dependent of the group.
        lead = members[0].conformance
        if lead.is_at_least_one():
            constraint = "at_least_one"
        else:
            constraint = "exact_one" if lead.is_exact_one() else None

        groups.append(
            {
                "parent_feature": parent,
                "dependent_features": sorted(members, key=get_id_name_lambda()),
                "constraint": constraint,
            }
        )
    return groups
class Cluster(BaseCluster):
    """Cluster class that inherits from BaseCluster.

    Holds the attributes, commands, events and features of a cluster and
    offers helpers to select mandatory elements and to group features by
    their choice conformance.
    """

    def __init__(self, name, id, revision, is_mandatory):
        super().__init__(name=name, id=id, revision=revision, is_mandatory=is_mandatory)
        self.attributes = []
        self.commands = []
        self.events = []
        self.features = []
        self.function_flags = ""
        self.is_migrated_cluster = False
        self.is_base_cluster = False

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_created_by_default(element) -> bool:
        """Return True when a mandatory element has no condition or has a NOT term."""
        if not element.is_mandatory:
            return False
        conformance = element.conformance
        return bool(
            conformance.get_mandatory_condition() is None
            or conformance.is_not_term_present
        )

    @staticmethod
    def _is_destroyed_by_feature(element, feature_name: str) -> bool:
        """Return True when the element's NOT-term condition negates ``feature_name``."""
        conformance = element.conformance
        # An element without conformance can never match; skip it instead of
        # dereferencing a missing object.
        if not conformance:
            return False
        condition = conformance.get_mandatory_condition()
        if condition is None:
            return False
        return bool(
            conformance.is_not_term_present
            and f"!(has_feature({feature_name})" in condition
        )

    def _group_features_by_choice_marker(self, matches) -> Dict:
        """Group features whose conformance satisfies ``matches`` by choice marker."""
        groups: Dict = {}
        for feature in self.features:
            conformance = feature.conformance
            if conformance and matches(conformance):
                groups.setdefault(conformance.choice.marker, []).append(feature)
        return groups

    # ------------------------------------------------------------------
    # Sorted element accessors
    # ------------------------------------------------------------------
    def get_attributes(self):
        """Get the list of attributes sorted by ID and name"""
        return sorted(self.attributes, key=get_id_name_lambda())

    def get_commands(self):
        """Get the list of commands sorted by ID and name"""
        return sorted(self.commands, key=get_id_name_lambda())

    def get_events(self) -> List[BaseEvent]:
        """Get the list of events sorted by ID and name"""
        return sorted(self.events, key=get_id_name_lambda())

    def get_features(self):
        """Get the list of features sorted by ID and name"""
        return sorted(self.features, key=get_id_name_lambda())

    # ------------------------------------------------------------------
    # Mandatory elements
    # ------------------------------------------------------------------
    def get_mandatory_attributes(self):
        """Get the list of mandatory attributes, sorted by ID and name

        Attribute is mandatory:
        - if it is marked as mandatory in the cluster JSON file
        - and it has no conformance condition, or a NOT TERM is present in
          the conformance condition
        NOTE: NOT TERM indicate we have to create attribute by default while creating the cluster
        """
        return [
            attr for attr in self.get_attributes() if self._is_created_by_default(attr)
        ]

    def get_mandatory_commands(self):
        """Get the list of mandatory commands, sorted by ID and name

        Command is mandatory:
        - if it is marked as mandatory in the cluster JSON file
        - and it has no conformance condition, or a NOT TERM is present in
          the conformance condition
        NOTE: NOT TERM indicate we have to create command by default while creating the cluster
        """
        return [
            cmd for cmd in self.get_commands() if self._is_created_by_default(cmd)
        ]

    def get_mandatory_events(self):
        """Get the list of mandatory events, sorted by ID and name

        Event is mandatory:
        - if it is marked as mandatory in the cluster JSON file
        - and it has no conformance condition, or a NOT TERM is present in
          the conformance condition
        NOTE: NOT TERM indicate we have to create event by default while creating the cluster
        """
        return [
            event for event in self.get_events() if self._is_created_by_default(event)
        ]

    # ------------------------------------------------------------------
    # Choice features
    # ------------------------------------------------------------------
    def get_all_exact_one_features(self):
        """Get all features that require exactly one feature to be supported

        Returns a dict mapping each choice marker to the list of its features.
        """
        return self._group_features_by_choice_marker(
            lambda conformance: conformance.is_exact_one()
        )

    def get_all_at_least_one_features(self) -> Dict:
        """Get all features that require at least one feature to be supported

        Returns a dict mapping each choice marker to the list of its features.
        """
        return self._group_features_by_choice_marker(
            lambda conformance: conformance.is_at_least_one()
        )

    def has_choice_features(self):
        """Check if the cluster has any choice features"""
        return bool(self.get_all_exact_one_features()) or bool(
            self.get_all_at_least_one_features()
        )

    def get_otherwise_choice_groups(self) -> List[Dict]:
        """Choice groups of OTHERWISE-conformance features, keyed by their mandatory parent."""
        return get_choice_group(
            "mandatory_parent", ConformanceDecision.OTHERWISE, self.features
        )

    def get_optional_choice_groups(self) -> List[Dict]:
        """Choice groups of OPTIONAL-conformance features, keyed by their optional parent."""
        return get_choice_group(
            "optional_parent", ConformanceDecision.OPTIONAL, self.features
        )

    def get_mandatory_choice_groups(self) -> List[Dict]:
        """Choice groups of MANDATORY-conformance features, keyed by their mandatory parent."""
        return get_choice_group(
            "mandatory_parent", ConformanceDecision.MANDATORY, self.features
        )

    def get_choice_group_feature_set(self) -> set:
        """Set of features that take part in a parent-based choice group.

        Contains the parent and the dependents of every otherwise and
        mandatory choice group, and only the dependents of every optional
        choice group (the optional parent itself is not added).
        """
        feature_set = set()
        for group in self.get_otherwise_choice_groups():
            feature_set.add(group["parent_feature"])
            feature_set.update(group["dependent_features"])
        for group in self.get_optional_choice_groups():
            feature_set.update(group["dependent_features"])
        for group in self.get_mandatory_choice_groups():
            feature_set.add(group["parent_feature"])
            feature_set.update(group["dependent_features"])
        return feature_set

    def get_standalone_choice_groups(self) -> List[Tuple[str, List]]:
        """
        Choice groups that are not fully covered by a parent-based choice group.

        A marker group is reported when at least one of its features is absent
        from get_choice_group_feature_set().
        Returns list of (constraint_type, features) where constraint_type is
        "exact_one", or "at_least_one".
        """
        all_choice_set = self.get_choice_group_feature_set()
        choice_group_list = []
        for features in self.get_all_exact_one_features().values():
            if not all(f in all_choice_set for f in features):
                choice_group_list.append(("exact_one", features))
        for features in self.get_all_at_least_one_features().values():
            if not all(f in all_choice_set for f in features):
                choice_group_list.append(("at_least_one", features))
        return choice_group_list

    def get_independent_features(self) -> List:
        """Features that are part of no choice group, parent-based or standalone.

        The result is sorted by ID and name.
        """
        excluded = self.get_choice_group_feature_set()
        for _constraint, features in self.get_standalone_choice_groups():
            excluded.update(features)
        return [f for f in self.get_features() if f not in excluded]

    # ------------------------------------------------------------------
    # Callbacks and lookups
    # ------------------------------------------------------------------
    def get_cluster_init_callback(self):
        """Get the cluster init callback name"""
        return get_overridden_cluster_init_callback_name(self.id, self.chip_name)

    def get_cluster_shutdown_callback(self):
        """Get the cluster shutdown callback name"""
        return get_overridden_cluster_shutdown_callback_name(self.id, self.chip_name)

    def get_response_command(self, command_name: str):
        """Get the response command for a given command name

        Returns the first command of this cluster whose name equals
        ``command_name``, or None when there is no such command.
        """
        for command in self.commands:
            if command.name == command_name:
                return command
        return None

    def get_destroyable_elements(self, feature_name: str):
        """Get the list of destroyable elements for a given feature name

        An element is destroyable when its conformance has a NOT term and its
        mandatory condition contains the negated check for ``feature_name``.
        Elements without conformance or without a mandatory condition are
        skipped.

        Returns a dict with the keys "attributes", "commands" and "events",
        each holding the matching elements in their original order.
        """
        return {
            "attributes": [
                attribute
                for attribute in self.attributes
                if self._is_destroyed_by_feature(attribute, feature_name)
            ],
            "commands": [
                command
                for command in self.commands
                if self._is_destroyed_by_feature(command, feature_name)
            ],
            "events": [
                event
                for event in self.events
                if self._is_destroyed_by_feature(event, feature_name)
            ],
        }
class Attribute(BaseAttribute):
    """Attribute model built on top of BaseAttribute."""

    def __init__(self, name, id, type_, is_mandatory, default_value):
        super().__init__(
            name=name,
            id=id,
            type_=type_,
            is_mandatory=is_mandatory,
            default_value=default_value,
        )
        # Type and flag information, filled in after construction.
        self.converted_type = None
        self.is_nullable = False
        self._flag = None
        # Bounds: max_length applies to string attributes, min/max to values.
        self.max_length = 0
        self.min_value = None
        self.max_value = None
        self.conformance = Conformance()
        self.is_internally_managed = False
        self.is_complex = False

    def get_flag(self):
        """Return the attribute flags."""
        return self._flag

    def get_type(self):
        """Return the converted attribute type."""
        return self.converted_type

    def get_min_value(self):
        """Return the lower bound (None when unset)."""
        return self.min_value

    def get_max_value(self):
        """Return the upper bound, falling back to the default value when unset."""
        return self.get_default_value() if self.max_value is None else self.max_value

    def get_default_value(self):
        """Return the default value."""
        return self.default_value

    def get_default_value_type(self):
        """Return the ESP integer type used for the default value.

        Picks "uint8_t" or "uint16_t" when the default converts to an integer
        that fits the respective upper bound, and "uint32_t" otherwise
        (including when the default cannot be converted).
        """
        # NOTE(review): a negative default also satisfies "<= 255" and so maps
        # to "uint8_t" - confirm negative defaults never reach this method.
        numeric_default = convert_to_int(self.get_default_value())
        if numeric_default is not None:
            for upper_bound, type_name in ((255, "uint8_t"), (65535, "uint16_t")):
                if numeric_default <= upper_bound:
                    return type_name
        return "uint32_t"

    def get_conformance_condition(self):
        """Return the mandatory condition of the attribute's conformance."""
        return self.conformance.get_mandatory_condition()
class Command(BaseCommand):
    """Command model built on top of BaseCommand."""

    def __init__(self, name, id, is_mandatory, direction, response):
        base_fields = {
            "name": name,
            "id": id,
            "is_mandatory": is_mandatory,
            "direction": direction,
            "response": response,
        }
        super().__init__(**base_fields)
        self._flag = ""
        self.has_callback = False
        self.conformance = Conformance()
        self.is_fabric_scoped = False

    def get_flag(self):
        """Return the command flags."""
        return self._flag

    def get_conformance_condition(self):
        """Return the mandatory condition of the command's conformance."""
        return self.conformance.get_mandatory_condition()
class Feature(BaseFeature):
    """Feature model built on top of BaseFeature."""

    def __init__(self, name, id, code, is_mandatory):
        super().__init__(name=name, id=id, is_mandatory=is_mandatory)
        self.code = code
        self.conformance = FeatureConformance()
        # Elements attached to this feature.
        self.attributes = []
        self.commands = []
        self.events = []

    def get_attributes(self):
        """Return a new list of the feature's attributes ordered by ID and name."""
        ordered = list(self.attributes)
        ordered.sort(key=get_id_name_lambda())
        return ordered

    def get_externally_managed_attributes(self):
        """Return the attributes not flagged as internally managed, in stored order."""
        externally_managed = []
        for attribute in self.attributes:
            if attribute.is_internally_managed:
                continue
            externally_managed.append(attribute)
        return externally_managed

    def get_commands(self):
        """Return a new list of the feature's commands ordered by ID and name."""
        ordered = list(self.commands)
        ordered.sort(key=get_id_name_lambda())
        return ordered

    def get_events(self) -> List[BaseEvent]:
        """Return a new list of the feature's events ordered by ID and name."""
        ordered = list(self.events)
        ordered.sort(key=get_id_name_lambda())
        return ordered

    def get_conformance_condition(self):
        """Return the optional condition of the feature's conformance."""
        return self.conformance.get_optional_condition()
class Event(BaseEvent):
    """Event model built on top of BaseEvent."""

    def __init__(self, name, id, is_mandatory):
        super().__init__(name=name, id=id, is_mandatory=is_mandatory)
        self.conformance = Conformance()
        # Events start out with "Info" priority.
        self.priority = "Info"

    def get_conformance_condition(self):
        """Return the mandatory condition of the event's conformance."""
        return self.conformance.get_mandatory_condition()
class Device(BaseDevice):
    """Device model built on top of BaseDevice."""

    def __init__(self, id, name, revision):
        super().__init__(id=id, name=name, revision=revision)
        # Cluster objects that belong to this device type.
        self.clusters = []

    def get_device_type_id(self):
        """Return the device type ID."""
        return self.id

    def get_device_type_version(self):
        """Return the device type version (the revision)."""
        return self.revision

    def get_clusters(self):
        """Return all clusters ordered by ID, server clusters before the others."""

        def _by_id_then_role(cluster):
            # "not server_cluster" is False for servers, so they sort first.
            return (convert_to_int(cluster.get_id()), not cluster.server_cluster)

        return sorted(self.clusters, key=_by_id_then_role)

    def binding_cluster_available(self) -> bool:
        """Return True when any mandatory cluster is flagged as a client cluster."""
        for cluster in self.get_mandatory_clusters():
            if cluster.client_cluster:
                return True
        return False

    def get_mandatory_clusters(self):
        """Return the mandatory clusters in their stored order."""
        return [entry for entry in self.clusters if entry.is_mandatory]

    def get_unique_mandatory_clusters(self):
        """Return the mandatory clusters among the unique clusters."""
        return [entry for entry in self.get_unique_clusters() if entry.is_mandatory]

    def get_unique_clusters(self):
        """Return the clusters without duplicate ``esp_name`` values, in sorted order.

        The "descriptor" and "binding" clusters are always left out.
        """
        # Pre-seeding the set excludes the descriptor and binding clusters,
        # which are always present.
        seen = {"descriptor", "binding"}
        unique = []
        for cluster in self.get_clusters():
            esp_name = cluster.esp_name
            if esp_name in seen:
                continue
            seen.add(esp_name)
            unique.append(cluster)
        return unique