mirror of
https://github.com/espressif/esp-matter.git
synced 2026-04-27 19:13:13 +00:00
42075d5c75
- data_model/legacy/: moved old data model to this folder - data_model/generated/: contains the automatically generated data model - tools/data_model_gen: contains the script to generate the data model
497 lines
18 KiB
Python
497 lines
18 KiB
Python
# Copyright 2026 Espressif Systems (Shanghai) PTE LTD
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
from __future__ import annotations
|
|
import logging
|
|
from utils.base_elements import (
|
|
BaseCluster,
|
|
BaseAttribute,
|
|
BaseCommand,
|
|
BaseEvent,
|
|
BaseFeature,
|
|
BaseDevice,
|
|
)
|
|
from .conformance_codegen import Conformance, ConformanceDecision, FeatureConformance
|
|
from typing import Dict, List, Tuple
|
|
from utils.conversion_utils import convert_to_int
|
|
from utils.overrides import (
|
|
get_overridden_cluster_init_callback_name,
|
|
get_overridden_cluster_shutdown_callback_name,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_id_name_lambda():
    """Build the sort key shared by the element getters in this module.

    Returns:
        A one-argument callable mapping an element to the tuple
        ``(convert_to_int(element.get_id()), element.name)``, so elements are
        ordered by numeric ID first and by name when IDs tie.
    """

    def _id_then_name(element):
        return (convert_to_int(element.get_id()), element.name)

    return _id_then_name
|
|
|
|
|
|
def get_choice_group(
    parent_feature_type: str,
    conformance_type: ConformanceDecision,
    features: List[Feature],
):
    """
    Collect choice groups: a parent feature together with its dependent features.

    A feature becomes a dependent when its conformance has the requested
    ``conformance_type``, names a parent through the conformance attribute
    called ``parent_feature_type`` and carries a choice. Dependents sharing the
    same (parent name, choice marker) pair form one group. A group is dropped
    when no feature in ``features`` has a ``func_name`` equal to the parent name.

    Returns a list of dicts: {
        "parent_feature": <Feature>,
        "dependent_features": [<Feature>, ...],   (sorted by ID, then name)
        "constraint": "at_least_one" | "exact_one" | None
    }
    The constraint is taken from the first dependent collected for the group.
    """
    members_by_group: Dict[Tuple[str, str], List] = {}
    for candidate in features:
        conformance = candidate.conformance
        if not conformance or conformance.type != conformance_type:
            continue
        parent_name = getattr(conformance, parent_feature_type, None)
        if not (parent_name and conformance.choice):
            continue
        group_key = (parent_name, conformance.choice.marker)
        members_by_group.setdefault(group_key, []).append(candidate)

    groups = []
    for (parent_name, _marker), members in members_by_group.items():
        # The first feature whose func_name matches is the group's parent.
        parent = None
        for candidate in features:
            if candidate.func_name == parent_name:
                parent = candidate
                break
        if not parent:
            continue

        lead_conformance = members[0].conformance
        if lead_conformance.is_at_least_one():
            constraint = "at_least_one"
        elif lead_conformance.is_exact_one():
            constraint = "exact_one"
        else:
            constraint = None

        group = {
            "parent_feature": parent,
            "dependent_features": sorted(members, key=get_id_name_lambda()),
            "constraint": constraint,
        }
        groups.append(group)
    return groups
|
|
|
|
|
|
class Cluster(BaseCluster):
    """Cluster class that inherits from BaseCluster.

    Holds the attributes, commands, events and features of a cluster and
    provides the queries (mandatory elements, choice groups, callbacks) built
    on top of those lists.
    """

    def __init__(self, name, id, revision, is_mandatory):
        """Create a cluster with empty element lists and default flags."""
        super().__init__(name=name, id=id, revision=revision, is_mandatory=is_mandatory)
        self.attributes = []
        self.commands = []
        self.events = []
        self.features = []
        self.function_flags = ""
        self.is_migrated_cluster = False
        self.is_base_cluster = False

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_created_by_default(element):
        """Tell whether an attribute/command/event counts as mandatory for the cluster.

        True when the element is flagged mandatory and either has no mandatory
        conformance condition or its conformance contains a NOT term.
        """
        return element.is_mandatory and (
            element.conformance.get_mandatory_condition() is None
            or element.conformance.is_not_term_present
        )

    @staticmethod
    def _is_destroyed_by_feature(element, feature_name: str) -> bool:
        """Tell whether ``element`` is conditioned on ``feature_name`` being absent.

        True when the element's conformance contains a NOT term and its
        mandatory condition contains ``!(has_feature(<feature_name>)``.
        Elements without a conformance or without a mandatory condition never
        match (and no longer raise when the conformance is missing).
        """
        conformance = element.conformance
        if not conformance:
            return False
        condition = conformance.get_mandatory_condition()
        if condition is None:
            return False
        return bool(
            conformance.is_not_term_present
            and f"!(has_feature({feature_name})" in condition
        )

    def _group_features_by_choice_marker(self, is_in_choice) -> Dict:
        """Group features by choice marker, keeping those whose conformance passes ``is_in_choice``.

        Features without a conformance are ignored. Features keep the order
        they have in ``self.features``.
        """
        grouped = {}
        for feature in self.features:
            conformance = feature.conformance
            if conformance and is_in_choice(conformance):
                grouped.setdefault(conformance.choice.marker, []).append(feature)
        return grouped

    # ------------------------------------------------------------------
    # Sorted element accessors
    # ------------------------------------------------------------------

    def get_attributes(self):
        """Get the list of attributes sorted by ID and name"""
        return sorted(self.attributes, key=get_id_name_lambda())

    def get_commands(self):
        """Get the list of commands sorted by ID and name"""
        return sorted(self.commands, key=get_id_name_lambda())

    def get_events(self) -> List[BaseEvent]:
        """Get the list of events sorted by ID and name"""
        return sorted(self.events, key=get_id_name_lambda())

    def get_features(self):
        """Get the list of features sorted by ID and name"""
        return sorted(self.features, key=get_id_name_lambda())

    # ------------------------------------------------------------------
    # Mandatory elements
    # ------------------------------------------------------------------

    def get_mandatory_attributes(self):
        """Get the list of mandatory attributes, sorted by ID and name

        Attribute is mandatory:
        - if it is marked as mandatory in the cluster JSON file, and
        - it has no mandatory conformance condition, or a NOT TERM is present
          in the conformance condition
        NOTE: NOT TERM indicate we have to create attribute by default while creating the cluster
        """
        return [
            attr for attr in self.get_attributes() if self._is_created_by_default(attr)
        ]

    def get_mandatory_commands(self):
        """Get the list of mandatory commands, sorted by ID and name

        Command is mandatory:
        - if it is marked as mandatory in the cluster JSON file, and
        - it has no mandatory conformance condition, or a NOT TERM is present
          in the conformance condition
        NOTE: NOT TERM indicate we have to create command by default while creating the cluster
        """
        return [
            cmd for cmd in self.get_commands() if self._is_created_by_default(cmd)
        ]

    def get_mandatory_events(self):
        """Get the list of mandatory events, sorted by ID and name

        Event is mandatory:
        - if it is marked as mandatory in the cluster JSON file, and
        - it has no mandatory conformance condition, or a NOT TERM is present
          in the conformance condition
        NOTE: NOT TERM indicate we have to create event by default while creating the cluster
        """
        return [
            event for event in self.get_events() if self._is_created_by_default(event)
        ]

    # ------------------------------------------------------------------
    # Choice features
    # ------------------------------------------------------------------

    def get_all_exact_one_features(self):
        """Get all features that require exactly one feature to be supported

        Returns a dict mapping each choice marker to the list of features
        whose conformance reports ``is_exact_one()``.
        """
        return self._group_features_by_choice_marker(
            lambda conformance: conformance.is_exact_one()
        )

    def get_all_at_least_one_features(self) -> Dict:
        """Get all features that require at least one feature to be supported

        Returns a dict mapping each choice marker to the list of features
        whose conformance reports ``is_at_least_one()``.
        """
        return self._group_features_by_choice_marker(
            lambda conformance: conformance.is_at_least_one()
        )

    def has_choice_features(self):
        """Check if the cluster has any choice features"""
        return bool(
            self.get_all_exact_one_features() or self.get_all_at_least_one_features()
        )

    def get_otherwise_choice_groups(self) -> List[Dict]:
        """Choice groups of OTHERWISE-conformance features, keyed on their mandatory parent"""
        return get_choice_group(
            "mandatory_parent", ConformanceDecision.OTHERWISE, self.features
        )

    def get_optional_choice_groups(self) -> List[Dict]:
        """Choice groups of OPTIONAL-conformance features, keyed on their optional parent"""
        return get_choice_group(
            "optional_parent", ConformanceDecision.OPTIONAL, self.features
        )

    def get_mandatory_choice_groups(self) -> List[Dict]:
        """Choice groups of MANDATORY-conformance features, keyed on their mandatory parent"""
        return get_choice_group(
            "mandatory_parent", ConformanceDecision.MANDATORY, self.features
        )

    def get_choice_group_feature_set(self) -> set:
        """Set of features that take part in a parent-based choice group.

        Contains parents and dependents of the otherwise and mandatory groups,
        and only the dependents of the optional groups.
        """
        feature_set = set()
        for group in self.get_otherwise_choice_groups():
            feature_set.add(group["parent_feature"])
            feature_set.update(group["dependent_features"])
        # Parents of optional groups are deliberately not added here.
        for group in self.get_optional_choice_groups():
            feature_set.update(group["dependent_features"])
        for group in self.get_mandatory_choice_groups():
            feature_set.add(group["parent_feature"])
            feature_set.update(group["dependent_features"])
        return feature_set

    def get_standalone_choice_groups(self) -> List[Tuple[str, List]]:
        """
        Choice groups that are not fully covered by the parent-based choice groups.

        A marker group is reported when at least one of its features is
        missing from ``get_choice_group_feature_set()``.
        Returns list of (constraint_type, features) where constraint_type is
        "exact_one", or "at_least_one"; exact-one groups come first.
        """
        in_parent_groups = self.get_choice_group_feature_set()
        choice_group_list = []
        for features in self.get_all_exact_one_features().values():
            if not all(f in in_parent_groups for f in features):
                choice_group_list.append(("exact_one", features))
        for features in self.get_all_at_least_one_features().values():
            if not all(f in in_parent_groups for f in features):
                choice_group_list.append(("at_least_one", features))
        return choice_group_list

    def get_independent_features(self) -> List:
        """Features that belong to no choice group at all.

        Excludes every feature in ``get_choice_group_feature_set()`` and every
        feature of a standalone choice group. The result is sorted by ID and name.
        """
        excluded = self.get_choice_group_feature_set()
        for _constraint, features in self.get_standalone_choice_groups():
            excluded.update(features)
        return [f for f in self.get_features() if f not in excluded]

    # ------------------------------------------------------------------
    # Callbacks and lookups
    # ------------------------------------------------------------------

    def get_cluster_init_callback(self):
        """Get the cluster init callback name"""
        return get_overridden_cluster_init_callback_name(self.id, self.chip_name)

    def get_cluster_shutdown_callback(self):
        """Get the cluster shutdown callback name"""
        return get_overridden_cluster_shutdown_callback_name(self.id, self.chip_name)

    def get_response_command(self, command_name: str):
        """Get the response command for a given command name

        Returns the first command of this cluster whose name equals
        ``command_name``, or None when there is no such command.
        """
        for command in self.commands:
            if command.name == command_name:
                return command
        return None

    def get_destroyable_elements(self, feature_name: str):
        """Get the list of destroyable elements for a given feature name

        An element is reported when its conformance contains a NOT term and
        its mandatory condition contains ``!(has_feature(<feature_name>)``.

        Returns a dict with the keys "attributes", "commands" and "events",
        each holding the matching elements in their stored order.
        """
        return {
            "attributes": [
                attribute
                for attribute in self.attributes
                if self._is_destroyed_by_feature(attribute, feature_name)
            ],
            "commands": [
                command
                for command in self.commands
                if self._is_destroyed_by_feature(command, feature_name)
            ],
            "events": [
                event
                for event in self.events
                if self._is_destroyed_by_feature(event, feature_name)
            ],
        }
|
|
|
|
|
|
class Attribute(BaseAttribute):
    """Attribute class that inherits from BaseAttribute.

    Adds the converted type, bounds, flags and conformance on top of the
    fields handled by the base class.
    """

    def __init__(self, name, id, type_, is_mandatory, default_value):
        """Create an attribute with no bounds, no flags and an empty conformance."""
        super().__init__(
            name=name,
            id=id,
            type_=type_,
            is_mandatory=is_mandatory,
            default_value=default_value,
        )
        # Type information
        self.converted_type = None
        self.is_nullable = False
        self.is_complex = False
        self._flag = None

        # Bounds
        self.max_length = 0  # For string attributes
        self.min_value = None  # For attribute bounds
        self.max_value = None  # For attribute bounds

        self.conformance = Conformance()
        self.is_internally_managed = False

    def get_flag(self):
        """Return the attribute flags (None until they are set)."""
        return self._flag

    def get_type(self):
        """Return the converted attribute type."""
        return self.converted_type

    def get_min_value(self):
        """Return the lower bound, or None when no bound is set."""
        return self.min_value

    def get_max_value(self):
        """Return the upper bound, falling back to the default value when no bound is set."""
        return self.get_default_value() if self.max_value is None else self.max_value

    def get_default_value(self):
        """Return the default value."""
        return self.default_value

    def get_default_value_type(self):
        """Return the ESP integer type used for the default value.

        The default is passed through ``convert_to_int``; values up to 255 map
        to "uint8_t", values up to 65535 to "uint16_t", and everything else
        (including a ``None`` conversion result) to "uint32_t".
        """
        # NOTE(review): a negative value also satisfies `<= 255` and therefore
        # maps to "uint8_t" - confirm this is intended.
        as_int = convert_to_int(self.get_default_value())
        if as_int is not None:
            if as_int <= 255:
                return "uint8_t"
            if as_int <= 65535:
                return "uint16_t"
        return "uint32_t"

    def get_conformance_condition(self):
        """Return the mandatory condition of the attribute's conformance."""
        return self.conformance.get_mandatory_condition()
|
|
|
|
|
|
class Command(BaseCommand):
    """Command class that inherits from BaseCommand.

    Adds flags, callback and fabric-scope markers and the conformance on top
    of the fields handled by the base class.
    """

    def __init__(self, name, id, is_mandatory, direction, response):
        """Create a command with empty flags and an empty conformance."""
        super().__init__(
            name=name,
            id=id,
            is_mandatory=is_mandatory,
            direction=direction,
            response=response,
        )
        self.conformance = Conformance()
        self._flag = ""
        self.has_callback = False
        self.is_fabric_scoped = False

    def get_flag(self):
        """Return the command flags."""
        return self._flag

    def get_conformance_condition(self):
        """Return the mandatory condition of the command's conformance."""
        return self.conformance.get_mandatory_condition()
|
|
|
|
|
|
class Feature(BaseFeature):
    """Feature class that inherits from BaseFeature.

    Holds the attributes, commands and events attached to the feature, its
    code and its feature-specific conformance.
    """

    def __init__(self, name, id, code, is_mandatory):
        """Create a feature with empty element lists and an empty conformance."""
        super().__init__(name=name, id=id, is_mandatory=is_mandatory)
        self.code = code
        self.conformance = FeatureConformance()
        self.attributes = []
        self.commands = []
        self.events = []

    def get_attributes(self):
        """Return the feature's attributes sorted by ID and name."""
        ordered = list(self.attributes)
        ordered.sort(key=get_id_name_lambda())
        return ordered

    def get_externally_managed_attributes(self):
        """Return the attributes that are not internally managed, in stored order."""
        external = []
        for attr in self.attributes:
            if attr.is_internally_managed:
                continue
            external.append(attr)
        return external

    def get_commands(self):
        """Return the feature's commands sorted by ID and name."""
        ordered = list(self.commands)
        ordered.sort(key=get_id_name_lambda())
        return ordered

    def get_events(self) -> List[BaseEvent]:
        """Return the feature's events sorted by ID and name."""
        ordered = list(self.events)
        ordered.sort(key=get_id_name_lambda())
        return ordered

    def get_conformance_condition(self):
        """Return the optional condition of the feature's conformance."""
        return self.conformance.get_optional_condition()
|
|
|
|
|
|
class Event(BaseEvent):
    """Event class that inherits from BaseEvent.

    Adds a priority and a conformance on top of the fields handled by the
    base class.
    """

    def __init__(self, name, id, is_mandatory):
        """Create an event with the default "Info" priority and an empty conformance."""
        super().__init__(name=name, id=id, is_mandatory=is_mandatory)
        self.conformance = Conformance()
        self.priority = "Info"  # Default priority

    def get_conformance_condition(self):
        """Return the mandatory condition of the event's conformance."""
        condition = self.conformance.get_mandatory_condition()
        return condition
|
|
|
|
|
|
class Device(BaseDevice):
    """Device class that inherits from BaseDevice.

    Holds the clusters of a device type and the queries built on them.
    """

    def __init__(self, id, name, revision):
        """Create a device type with an empty cluster list."""
        super().__init__(id=id, name=name, revision=revision)
        self.clusters = []  # List of Cluster objects

    def get_device_type_id(self):
        """Return the device type ID."""
        return self.id

    def get_device_type_version(self):
        """Return the device type version (the stored revision)."""
        return self.revision

    def get_clusters(self):
        """Return all clusters sorted by ID, server clusters before non-server ones."""

        def _id_then_server_first(cluster):
            # `not server_cluster` is False for server clusters, so they sort first.
            return (convert_to_int(cluster.get_id()), not cluster.server_cluster)

        ordered = list(self.clusters)
        ordered.sort(key=_id_then_server_first)
        return ordered

    def binding_cluster_available(self) -> bool:
        """Return True when any mandatory cluster is flagged as a client cluster."""
        for cluster in self.get_mandatory_clusters():
            if cluster.client_cluster:
                return True
        return False

    def get_mandatory_clusters(self):
        """Return the mandatory clusters in stored order."""
        mandatory = []
        for cluster in self.clusters:
            if cluster.is_mandatory:
                mandatory.append(cluster)
        return mandatory

    def get_unique_mandatory_clusters(self):
        """Return the mandatory clusters among the unique clusters."""
        mandatory = []
        for cluster in self.get_unique_clusters():
            if cluster.is_mandatory:
                mandatory.append(cluster)
        return mandatory

    def get_unique_clusters(self):
        """Return the clusters without duplicate ``esp_name`` values, sorted by ID.

        The first cluster in sorted order wins for each name. Clusters named
        "descriptor" or "binding" are always left out.
        """
        # The descriptor and binding clusters are always present, so they are
        # treated as already seen.
        seen_names = {"descriptor", "binding"}
        unique_clusters = []
        for cluster in self.get_clusters():
            if cluster.esp_name in seen_names:
                continue
            seen_names.add(cluster.esp_name)
            unique_clusters.append(cluster)
        return unique_clusters
|