mirror of
https://github.com/espressif/esp-matter.git
synced 2026-04-27 19:13:13 +00:00
42075d5c75
- data_model/legacy/: moved old data model to this folder - data_model/generated/: contain the automatically generated data model - tools/data_model_gen: contains the script to generate the data model
581 lines
20 KiB
Python
581 lines
20 KiB
Python
# Copyright 2026 Espressif Systems (Shanghai) PTE LTD
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from typing import List, Any
|
|
from .attribute_type import AttributeType
|
|
from utils.base_elements import (
|
|
BaseDevice,
|
|
BaseEvent,
|
|
BaseFeature,
|
|
BaseCommand,
|
|
BaseAttribute,
|
|
BaseCluster,
|
|
)
|
|
from utils.overrides import (
|
|
should_skip_delegate_callback,
|
|
should_skip_plugin_callback,
|
|
should_include_delegate_callback,
|
|
)
|
|
from utils.conversion_utils import convert_to_int
|
|
from .serializers import (
|
|
DeviceSerializer,
|
|
EventSerializer,
|
|
FeatureSerializer,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Device(BaseDevice):
|
|
"""Device class that inherits from BaseDevice."""
|
|
|
|
def __init__(self, id, name, revision):
|
|
super().__init__(id=id, name=name, revision=revision)
|
|
self.clusters = set()
|
|
self.features = set()
|
|
self.commands = set()
|
|
self.attributes = set()
|
|
self.classification = {}
|
|
self.conformance = None
|
|
self.filename = self.esp_name + "_device"
|
|
self.revision_history = []
|
|
self.conditions = []
|
|
|
|
def add_feature(self, feature: str):
|
|
self.features.add(feature)
|
|
|
|
def add_command(self, command: str):
|
|
self.commands.add(command)
|
|
|
|
def add_attribute(self, attribute: str):
|
|
self.attributes.add(attribute)
|
|
|
|
def get_clusters(self) -> List[BaseCluster]:
|
|
return sorted(
|
|
self.clusters, key=lambda x: (int(x.get_id(), 16), not x.server_cluster)
|
|
)
|
|
|
|
def get_unique_clusters(self):
|
|
unique_clusters_dict = {}
|
|
for cluster in self.clusters:
|
|
cluster_id = cluster.get_id()
|
|
if cluster_id not in unique_clusters_dict:
|
|
unique_clusters_dict[cluster_id] = cluster
|
|
unique_clusters = list(unique_clusters_dict.values())
|
|
return sorted(
|
|
unique_clusters, key=lambda x: (int(x.get_id(), 16), not x.server_cluster)
|
|
)
|
|
|
|
def to_dict(self):
|
|
return DeviceSerializer.to_dict(self)
|
|
|
|
|
|
class Event(BaseEvent):
|
|
def __init__(self, id, name, is_mandatory):
|
|
super().__init__(name, id, is_mandatory)
|
|
self.conformance = None
|
|
|
|
def to_dict(self, attribute_map=None):
|
|
"""Convert event object to dictionary representation"""
|
|
return EventSerializer.to_dict(self, attribute_map)
|
|
|
|
|
|
class Feature(BaseFeature):
|
|
def __init__(self, name, code, id):
|
|
super().__init__(name, id, is_mandatory=False)
|
|
self.code = code
|
|
self.command_set = set()
|
|
self.attribute_set = set()
|
|
self.event_set = set()
|
|
self.summary = None
|
|
self.conformance = None
|
|
|
|
def add_attribute_list(self, attributes: set):
|
|
if attributes:
|
|
self.attribute_set.update(attributes)
|
|
|
|
def add_event_list(self, events: set):
|
|
if events:
|
|
self.event_set.update(events)
|
|
|
|
def get_attributes(self):
|
|
"""Returns the list of mandatory attributes for this feature"""
|
|
attr_list = list(self.attribute_set)
|
|
if len(attr_list) > 0:
|
|
attr_list.sort(key=lambda x: int(x.get_id(), 16))
|
|
return attr_list
|
|
|
|
def get_events(self) -> List[BaseEvent]:
|
|
"""Returns the list of mandatory events for this feature"""
|
|
event_list = list(self.event_set)
|
|
if len(event_list) > 0:
|
|
event_list.sort(key=lambda x: int(x.get_id(), 16))
|
|
return event_list
|
|
|
|
def add_command_list(self, commands):
|
|
if commands is not None:
|
|
self.command_set.update(commands)
|
|
|
|
def get_commands(self):
|
|
command_list = list(self.command_set)
|
|
if len(command_list) > 0:
|
|
command_list.sort(key=lambda x: int(x.get_id(), 16))
|
|
return command_list
|
|
|
|
def to_dict(self, attribute_map=None):
|
|
"""Convert feature object to dictionary representation"""
|
|
return FeatureSerializer.to_dict(self, attribute_map)
|
|
|
|
|
|
class Command(BaseCommand):
|
|
@dataclass(frozen=True)
|
|
class CommandFlags:
|
|
COMMAND_FLAG_NONE: str = "COMMAND_FLAG_NONE"
|
|
COMMAND_FLAG_CUSTOM: str = "COMMAND_FLAG_CUSTOM"
|
|
COMMAND_FLAG_ACCEPTED: str = "COMMAND_FLAG_ACCEPTED"
|
|
COMMAND_FLAG_GENERATED: str = "COMMAND_FLAG_GENERATED"
|
|
|
|
@dataclass
|
|
class CommandAccess:
|
|
invokePrivilege: str
|
|
timed: bool
|
|
fabric_scoped: bool
|
|
|
|
@dataclass
|
|
class CommandField:
|
|
id: Any
|
|
name: str
|
|
type_: str
|
|
default_value: Any = None
|
|
is_mandatory: bool = False
|
|
constraint: Any = None
|
|
|
|
def __init__(self, id, name, direction, response, is_mandatory):
|
|
super().__init__(
|
|
(
|
|
name.split(" ")[0]
|
|
if len(name.split(" ")) > 1 and name.split(" ")[1] == "Command"
|
|
else name
|
|
),
|
|
id,
|
|
is_mandatory,
|
|
direction,
|
|
response,
|
|
)
|
|
self.feature_list = set()
|
|
self.access = None
|
|
self.conformance = None
|
|
self.fields = [] # List of CommandField objects
|
|
self.feature_map = {}
|
|
|
|
self.skip_command_cb = False
|
|
self.command_handler_available = False
|
|
|
|
def set_access(self, access):
|
|
self.access = access
|
|
|
|
def set_conformance(self, conformance):
|
|
self.conformance = conformance
|
|
|
|
def add_field(self, field):
|
|
self.fields.append(field)
|
|
|
|
def get_flag(self):
|
|
if self.direction and self.direction.lower() == "commandtoserver":
|
|
return self.CommandFlags.COMMAND_FLAG_ACCEPTED
|
|
elif self.direction and self.direction.lower() == "responsefromserver":
|
|
return self.CommandFlags.COMMAND_FLAG_GENERATED
|
|
return self.CommandFlags.COMMAND_FLAG_NONE
|
|
|
|
def callback_required(self):
|
|
"""Determine if a command requires a callback"""
|
|
if self.command_handler_available:
|
|
return False
|
|
|
|
# If command is part of a cluster file with multiple cluster ids e.g. ResourceMonitoring
|
|
if self.skip_command_cb:
|
|
return False
|
|
|
|
# Skip callbacks for client-bound commands
|
|
if self.direction is not None and self.direction.lower() != "commandtoserver":
|
|
return False
|
|
|
|
# Commands with response='Y' or specific response commands need callbacks
|
|
if self.response is None or self.response == "N":
|
|
return False
|
|
|
|
# Check if this is a response command (ends with 'Response')
|
|
if self.name and self.name.endswith("Response"):
|
|
return False
|
|
|
|
return True
|
|
|
|
def to_dict(self, attribute_map=None):
|
|
"""Convert command object to dictionary representation"""
|
|
from .serializers import CommandSerializer
|
|
|
|
return CommandSerializer.to_dict(self, attribute_map)
|
|
|
|
|
|
class Attribute(BaseAttribute):
|
|
@dataclass(frozen=True)
|
|
class AttributeFlags:
|
|
ATTRIBUTE_FLAG_NONE: str = "ATTRIBUTE_FLAG_NONE"
|
|
ATTRIBUTE_FLAG_WRITABLE: str = "ATTRIBUTE_FLAG_WRITABLE"
|
|
ATTRIBUTE_FLAG_NONVOLATILE: str = "ATTRIBUTE_FLAG_NONVOLATILE"
|
|
ATTRIBUTE_FLAG_MIN_MAX: str = "ATTRIBUTE_FLAG_MIN_MAX"
|
|
ATTRIBUTE_FLAG_MUST_USE_TIMED_WRITE: str = "ATTRIBUTE_FLAG_MUST_USE_TIMED_WRITE"
|
|
ATTRIBUTE_FLAG_EXTERNAL_STORAGE: str = "ATTRIBUTE_FLAG_EXTERNAL_STORAGE"
|
|
ATTRIBUTE_FLAG_SINGLETON: str = "ATTRIBUTE_FLAG_SINGLETON"
|
|
ATTRIBUTE_FLAG_NULLABLE: str = "ATTRIBUTE_FLAG_NULLABLE"
|
|
ATTRIBUTE_FLAG_OVERRIDE: str = "ATTRIBUTE_FLAG_OVERRIDE"
|
|
ATTRIBUTE_FLAG_DEFERRED: str = "ATTRIBUTE_FLAG_DEFERRED"
|
|
ATTRIBUTE_FLAG_MANAGED_INTERNALLY: str = "ATTRIBUTE_FLAG_MANAGED_INTERNALLY"
|
|
|
|
@dataclass
|
|
class Access:
|
|
read: str
|
|
readPrivilege: str
|
|
write: str
|
|
writePrivilege: str
|
|
|
|
@dataclass
|
|
class Quality:
|
|
changeOmitted: str
|
|
nullable: str
|
|
scene: str
|
|
persistence: str
|
|
reportable: str
|
|
sourceAttribution: str = None
|
|
quieterReporting: str = None
|
|
|
|
@dataclass
|
|
class Constraint:
|
|
type: str
|
|
from_: str = None
|
|
to_: str = None
|
|
value: str = None
|
|
|
|
def to_dict(self):
|
|
result = {"type": self.type}
|
|
|
|
if not self.type:
|
|
return result
|
|
|
|
if self.type == "min":
|
|
if self.value:
|
|
result["min"] = self.value
|
|
elif self.type == "max":
|
|
if self.value:
|
|
result["max"] = self.value
|
|
elif self.type == "maxLength":
|
|
if self.value:
|
|
result["maxLength"] = self.value
|
|
elif self.type == "between":
|
|
if self.from_:
|
|
result["min"] = self.from_
|
|
if self.to_:
|
|
result["max"] = self.to_
|
|
elif self.type == "desc":
|
|
if self.value:
|
|
result["description"] = self.value
|
|
else:
|
|
# For other constraint types
|
|
if self.value:
|
|
result["value"] = self.value
|
|
|
|
return result
|
|
|
|
def __init__(
|
|
self,
|
|
name,
|
|
id,
|
|
type_,
|
|
default_value,
|
|
is_mandatory,
|
|
access=None,
|
|
quality=None,
|
|
constraint=None,
|
|
):
|
|
super().__init__(name, id, type_, is_mandatory, default_value)
|
|
self.conformance = None
|
|
self.max_value = None
|
|
self.min_value = None
|
|
|
|
# Store access, quality, and constraint information
|
|
self.access = access
|
|
self.quality = quality
|
|
self.constraint = constraint
|
|
self.internally_managed = False
|
|
self.is_nullable = (
|
|
self.quality is not None
|
|
and getattr(self.quality, "nullable", None) is not None
|
|
and self.quality.nullable.lower() == "true"
|
|
)
|
|
|
|
def get_flag(self):
|
|
flags = []
|
|
if self.access is not None:
|
|
write_val = getattr(self.access, "write", None)
|
|
if write_val and write_val.lower() in ("true", "optional"):
|
|
flags.append(self.AttributeFlags.ATTRIBUTE_FLAG_WRITABLE)
|
|
if self.internally_managed:
|
|
flags.append(self.AttributeFlags.ATTRIBUTE_FLAG_MANAGED_INTERNALLY)
|
|
|
|
if self.quality is not None:
|
|
nullable_val = getattr(self.quality, "nullable", None)
|
|
if nullable_val and nullable_val.lower() == "true":
|
|
flags.append(self.AttributeFlags.ATTRIBUTE_FLAG_NULLABLE)
|
|
persistence_val = getattr(self.quality, "persistence", None)
|
|
if persistence_val and persistence_val.lower() == "nonvolatile":
|
|
flags.append(self.AttributeFlags.ATTRIBUTE_FLAG_NONVOLATILE)
|
|
|
|
return " | ".join(flags) if flags else self.AttributeFlags.ATTRIBUTE_FLAG_NONE
|
|
|
|
def get_default_value_type(self):
|
|
"""Get the ESP type for the default value"""
|
|
value = self.get_default_value()
|
|
if isinstance(value, bool):
|
|
return "bool"
|
|
if isinstance(value, int):
|
|
if value <= 255:
|
|
return "uint8_t"
|
|
elif value <= 65535:
|
|
return "uint16_t"
|
|
return "uint32_t"
|
|
return "uint32_t"
|
|
|
|
def get_default_value(self):
|
|
"""Get the default value of the attribute"""
|
|
return self._convert_default_values()
|
|
|
|
def get_type(self):
|
|
"""Get the ESP type for the attribute"""
|
|
return AttributeType(self.type).get_attribute_type()
|
|
|
|
def _convert_default_values(self):
|
|
"""Convert the default value to known values"""
|
|
if self.type == "bool":
|
|
if self.default_value is None:
|
|
return "false"
|
|
return "true" if self.default_value.lower() in ("true", "1") else "false"
|
|
|
|
if self.type in ("string", "octstr"):
|
|
if (
|
|
self.constraint is not None
|
|
and getattr(self.constraint, "type", None) == "maxLength"
|
|
and getattr(self.constraint, "value", None) is not None
|
|
):
|
|
return int(self.constraint.value)
|
|
return 0
|
|
|
|
if self.type == "list":
|
|
if (
|
|
self.constraint is not None
|
|
and getattr(self.constraint, "value", None)
|
|
and self.constraint.value.isdigit()
|
|
):
|
|
return int(self.constraint.value)
|
|
return 0
|
|
|
|
if "enum" in self.type.lower() or "bitmap" in self.type.lower():
|
|
if self.default_value is not None:
|
|
return convert_to_int(self.default_value, default="0")
|
|
return "0"
|
|
|
|
if self.default_value is not None and "°" in self.default_value:
|
|
default_value = self.default_value.split("°")[0]
|
|
if default_value.isdigit():
|
|
return int(default_value) * 100
|
|
return 0
|
|
|
|
if self.default_value is not None and not self.default_value.isdigit():
|
|
first_part = self.default_value.split(" ")[0]
|
|
if first_part.isdigit():
|
|
return int(first_part)
|
|
|
|
return convert_to_int(self.default_value, default="0")
|
|
|
|
def get_max_value(self):
|
|
"""Get the max value of the attribute"""
|
|
return self.max_value
|
|
|
|
def get_min_value(self):
|
|
"""Get the min value of the attribute"""
|
|
return self.min_value
|
|
|
|
def to_dict(self, attribute_map=None):
|
|
"""Convert attribute object to dictionary representation"""
|
|
from .serializers import AttributeSerializer
|
|
|
|
return AttributeSerializer.to_dict(self, attribute_map)
|
|
|
|
|
|
class Cluster(BaseCluster):
|
|
@dataclass(frozen=True)
|
|
class ClusterFlags:
|
|
CLUSTER_FLAG_NONE: str = "CLUSTER_FLAG_NONE"
|
|
CLUSTER_FLAG_INIT_FUNCTION: str = "CLUSTER_FLAG_INIT_FUNCTION"
|
|
CLUSTER_FLAG_ATTRIBUTE_CHANGED_FUNCTION: str = (
|
|
"CLUSTER_FLAG_ATTRIBUTE_CHANGED_FUNCTION"
|
|
)
|
|
CLUSTER_FLAG_SHUTDOWN_FUNCTION: str = "CLUSTER_FLAG_SHUTDOWN_FUNCTION"
|
|
CLUSTER_FLAG_PRE_ATTRIBUTE_CHANGED_FUNCTION: str = (
|
|
"CLUSTER_FLAG_PRE_ATTRIBUTE_CHANGED_FUNCTION"
|
|
)
|
|
CLUSTER_FLAG_SERVER: str = "CLUSTER_FLAG_SERVER"
|
|
CLUSTER_FLAG_CLIENT: str = "CLUSTER_FLAG_CLIENT"
|
|
|
|
def __init__(self, name, id, revision, is_mandatory=False):
|
|
super().__init__(name, id, revision, is_mandatory)
|
|
self.attributes = set()
|
|
self.commands = set()
|
|
self.events = set()
|
|
self.features = set()
|
|
self.conformance = None
|
|
self.revision_history = []
|
|
self.data_types = {}
|
|
# Classification details
|
|
self.role = "application" # Default value
|
|
self.hierarchy = None
|
|
self.pics_code = None
|
|
self.scope = None
|
|
self.base_cluster_name = None
|
|
self.is_migrated_cluster = False
|
|
self.skip_command_cb = False
|
|
|
|
def get_callback_functions(self):
|
|
"""Get the callback functions for the cluster"""
|
|
callback_functions = []
|
|
if self.init_function_available:
|
|
callback_functions.append(
|
|
f"emberAf{self.chip_name}ClusterServerInitCallback"
|
|
)
|
|
if self.attribute_changed_function_available:
|
|
callback_functions.append(
|
|
f"Matter{self.chip_name}ClusterServerAttributeChangedCallback"
|
|
)
|
|
if self.shutdown_function_available:
|
|
callback_functions.append(
|
|
f"Matter{self.chip_name}ClusterServerShutdownCallback"
|
|
)
|
|
if self.pre_attribute_change_function_available:
|
|
callback_functions.append(
|
|
f"Matter{self.chip_name}ClusterServerPreAttributeChangedCallback"
|
|
)
|
|
return callback_functions
|
|
|
|
def get_plugin_server_init_callback(self):
|
|
"""Get the plugin server init callback for the cluster"""
|
|
if not self.plugin_init_cb_available or should_skip_plugin_callback(self.id):
|
|
return None
|
|
if "_cluster" in self.name.lower():
|
|
cluster_name = self.name.split("_Cluster")[0]
|
|
return f"Matter{cluster_name}PluginServerInitCallback"
|
|
else:
|
|
return f"Matter{self.chip_name}PluginServerInitCallback"
|
|
|
|
def get_delegate_init_callback(self):
|
|
"""Get the delegate init callback for the cluster"""
|
|
if self.delegate_init_callback_available and not should_skip_delegate_callback(
|
|
self.id
|
|
):
|
|
return f"{self.chip_name}DelegateInitCB"
|
|
if should_include_delegate_callback(self.id):
|
|
return f"{self.chip_name}DelegateInitCB"
|
|
return None
|
|
|
|
def get_attributes(self):
|
|
"""Get all attributes sorted by attribute id, then by name if ids match"""
|
|
attributes = list(self.attributes)
|
|
attributes.sort(key=lambda x: (int(x.get_id(), 16), x.name))
|
|
return attributes
|
|
|
|
def get_commands(self):
|
|
"""Get all commands sorted by command id, then by name if ids match"""
|
|
commands = list(self.commands)
|
|
commands.sort(key=lambda x: (int(x.get_id(), 16), x.name))
|
|
return commands
|
|
|
|
def get_events(self) -> List[BaseEvent]:
|
|
"""Get all events sorted by event id, then by name if ids match"""
|
|
events = list(self.events)
|
|
events.sort(key=lambda x: (int(x.get_id(), 16), x.name))
|
|
return events
|
|
|
|
def get_features(self):
|
|
"""Get all features sorted by feature id"""
|
|
features = list(self.features)
|
|
features.sort(key=lambda x: int(x.get_id(), 16))
|
|
return features
|
|
|
|
def _get_mandatory_elements(self, elements):
|
|
"""Helper to filter mandatory elements with no conformance condition."""
|
|
result = []
|
|
for elem in elements:
|
|
if not elem.is_mandatory:
|
|
continue
|
|
conformance = getattr(elem, "conformance", None)
|
|
if conformance is None:
|
|
continue
|
|
condition = (
|
|
conformance.get("condition") if isinstance(conformance, dict) else None
|
|
)
|
|
if condition is None:
|
|
result.append(elem)
|
|
return (
|
|
sorted(result, key=lambda x: (int(x.get_id(), 16), x.name))
|
|
if result
|
|
else result
|
|
)
|
|
|
|
def get_mandatory_attributes(self):
|
|
"""Get only mandatory attributes from the attribute list"""
|
|
return self._get_mandatory_elements(self.attributes)
|
|
|
|
def get_mandatory_commands(self):
|
|
"""Get only mandatory commands from the command list"""
|
|
return self._get_mandatory_elements(self.commands)
|
|
|
|
def get_mandatory_events(self):
|
|
"""Get only mandatory events from the event list"""
|
|
return self._get_mandatory_elements(self.events)
|
|
|
|
def get_function_flags(self):
|
|
"""Get the function flags for the cluster"""
|
|
flags = []
|
|
if self.server_cluster:
|
|
flags.append(self.ClusterFlags.CLUSTER_FLAG_SERVER)
|
|
if self.client_cluster:
|
|
flags.append(self.ClusterFlags.CLUSTER_FLAG_CLIENT)
|
|
if self.init_function_available:
|
|
flags.append(self.ClusterFlags.CLUSTER_FLAG_INIT_FUNCTION)
|
|
if self.attribute_changed_function_available:
|
|
flags.append(self.ClusterFlags.CLUSTER_FLAG_ATTRIBUTE_CHANGED_FUNCTION)
|
|
if self.shutdown_function_available:
|
|
flags.append(self.ClusterFlags.CLUSTER_FLAG_SHUTDOWN_FUNCTION)
|
|
if self.pre_attribute_change_function_available:
|
|
flags.append(self.ClusterFlags.CLUSTER_FLAG_PRE_ATTRIBUTE_CHANGED_FUNCTION)
|
|
|
|
if len(flags) > 0:
|
|
return " | ".join(flags)
|
|
return self.ClusterFlags.CLUSTER_FLAG_NONE
|
|
|
|
def to_dict(self):
|
|
"""Convert cluster object to dictionary representation"""
|
|
from .serializers import ClusterSerializer
|
|
|
|
return ClusterSerializer.to_dict(self)
|