Files
Mahesh Pimpale 42075d5c75 components/esp_matter: generated data model using automated script
- data_model/legacy/: moved old data model to this folder
- data_model/generated/: contain the automatically generated data model
- tools/data_model_gen: contains the script to generate the data model
2026-04-15 17:05:50 +05:30

581 lines
20 KiB
Python

# Copyright 2026 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from dataclasses import dataclass
from typing import List, Any
from .attribute_type import AttributeType
from utils.base_elements import (
BaseDevice,
BaseEvent,
BaseFeature,
BaseCommand,
BaseAttribute,
BaseCluster,
)
from utils.overrides import (
should_skip_delegate_callback,
should_skip_plugin_callback,
should_include_delegate_callback,
)
from utils.conversion_utils import convert_to_int
from .serializers import (
DeviceSerializer,
EventSerializer,
FeatureSerializer,
)
logger = logging.getLogger(__name__)
class Device(BaseDevice):
"""Device class that inherits from BaseDevice."""
def __init__(self, id, name, revision):
super().__init__(id=id, name=name, revision=revision)
self.clusters = set()
self.features = set()
self.commands = set()
self.attributes = set()
self.classification = {}
self.conformance = None
self.filename = self.esp_name + "_device"
self.revision_history = []
self.conditions = []
def add_feature(self, feature: str):
self.features.add(feature)
def add_command(self, command: str):
self.commands.add(command)
def add_attribute(self, attribute: str):
self.attributes.add(attribute)
def get_clusters(self) -> List[BaseCluster]:
return sorted(
self.clusters, key=lambda x: (int(x.get_id(), 16), not x.server_cluster)
)
def get_unique_clusters(self):
unique_clusters_dict = {}
for cluster in self.clusters:
cluster_id = cluster.get_id()
if cluster_id not in unique_clusters_dict:
unique_clusters_dict[cluster_id] = cluster
unique_clusters = list(unique_clusters_dict.values())
return sorted(
unique_clusters, key=lambda x: (int(x.get_id(), 16), not x.server_cluster)
)
def to_dict(self):
return DeviceSerializer.to_dict(self)
class Event(BaseEvent):
def __init__(self, id, name, is_mandatory):
super().__init__(name, id, is_mandatory)
self.conformance = None
def to_dict(self, attribute_map=None):
"""Convert event object to dictionary representation"""
return EventSerializer.to_dict(self, attribute_map)
class Feature(BaseFeature):
def __init__(self, name, code, id):
super().__init__(name, id, is_mandatory=False)
self.code = code
self.command_set = set()
self.attribute_set = set()
self.event_set = set()
self.summary = None
self.conformance = None
def add_attribute_list(self, attributes: set):
if attributes:
self.attribute_set.update(attributes)
def add_event_list(self, events: set):
if events:
self.event_set.update(events)
def get_attributes(self):
"""Returns the list of mandatory attributes for this feature"""
attr_list = list(self.attribute_set)
if len(attr_list) > 0:
attr_list.sort(key=lambda x: int(x.get_id(), 16))
return attr_list
def get_events(self) -> List[BaseEvent]:
"""Returns the list of mandatory events for this feature"""
event_list = list(self.event_set)
if len(event_list) > 0:
event_list.sort(key=lambda x: int(x.get_id(), 16))
return event_list
def add_command_list(self, commands):
if commands is not None:
self.command_set.update(commands)
def get_commands(self):
command_list = list(self.command_set)
if len(command_list) > 0:
command_list.sort(key=lambda x: int(x.get_id(), 16))
return command_list
def to_dict(self, attribute_map=None):
"""Convert feature object to dictionary representation"""
return FeatureSerializer.to_dict(self, attribute_map)
class Command(BaseCommand):
@dataclass(frozen=True)
class CommandFlags:
COMMAND_FLAG_NONE: str = "COMMAND_FLAG_NONE"
COMMAND_FLAG_CUSTOM: str = "COMMAND_FLAG_CUSTOM"
COMMAND_FLAG_ACCEPTED: str = "COMMAND_FLAG_ACCEPTED"
COMMAND_FLAG_GENERATED: str = "COMMAND_FLAG_GENERATED"
@dataclass
class CommandAccess:
invokePrivilege: str
timed: bool
fabric_scoped: bool
@dataclass
class CommandField:
id: Any
name: str
type_: str
default_value: Any = None
is_mandatory: bool = False
constraint: Any = None
def __init__(self, id, name, direction, response, is_mandatory):
super().__init__(
(
name.split(" ")[0]
if len(name.split(" ")) > 1 and name.split(" ")[1] == "Command"
else name
),
id,
is_mandatory,
direction,
response,
)
self.feature_list = set()
self.access = None
self.conformance = None
self.fields = [] # List of CommandField objects
self.feature_map = {}
self.skip_command_cb = False
self.command_handler_available = False
def set_access(self, access):
self.access = access
def set_conformance(self, conformance):
self.conformance = conformance
def add_field(self, field):
self.fields.append(field)
def get_flag(self):
if self.direction and self.direction.lower() == "commandtoserver":
return self.CommandFlags.COMMAND_FLAG_ACCEPTED
elif self.direction and self.direction.lower() == "responsefromserver":
return self.CommandFlags.COMMAND_FLAG_GENERATED
return self.CommandFlags.COMMAND_FLAG_NONE
def callback_required(self):
"""Determine if a command requires a callback"""
if self.command_handler_available:
return False
# If command is part of a cluster file with multiple cluster ids e.g. ResourceMonitoring
if self.skip_command_cb:
return False
# Skip callbacks for client-bound commands
if self.direction is not None and self.direction.lower() != "commandtoserver":
return False
# Commands with response='Y' or specific response commands need callbacks
if self.response is None or self.response == "N":
return False
# Check if this is a response command (ends with 'Response')
if self.name and self.name.endswith("Response"):
return False
return True
def to_dict(self, attribute_map=None):
"""Convert command object to dictionary representation"""
from .serializers import CommandSerializer
return CommandSerializer.to_dict(self, attribute_map)
class Attribute(BaseAttribute):
@dataclass(frozen=True)
class AttributeFlags:
ATTRIBUTE_FLAG_NONE: str = "ATTRIBUTE_FLAG_NONE"
ATTRIBUTE_FLAG_WRITABLE: str = "ATTRIBUTE_FLAG_WRITABLE"
ATTRIBUTE_FLAG_NONVOLATILE: str = "ATTRIBUTE_FLAG_NONVOLATILE"
ATTRIBUTE_FLAG_MIN_MAX: str = "ATTRIBUTE_FLAG_MIN_MAX"
ATTRIBUTE_FLAG_MUST_USE_TIMED_WRITE: str = "ATTRIBUTE_FLAG_MUST_USE_TIMED_WRITE"
ATTRIBUTE_FLAG_EXTERNAL_STORAGE: str = "ATTRIBUTE_FLAG_EXTERNAL_STORAGE"
ATTRIBUTE_FLAG_SINGLETON: str = "ATTRIBUTE_FLAG_SINGLETON"
ATTRIBUTE_FLAG_NULLABLE: str = "ATTRIBUTE_FLAG_NULLABLE"
ATTRIBUTE_FLAG_OVERRIDE: str = "ATTRIBUTE_FLAG_OVERRIDE"
ATTRIBUTE_FLAG_DEFERRED: str = "ATTRIBUTE_FLAG_DEFERRED"
ATTRIBUTE_FLAG_MANAGED_INTERNALLY: str = "ATTRIBUTE_FLAG_MANAGED_INTERNALLY"
@dataclass
class Access:
read: str
readPrivilege: str
write: str
writePrivilege: str
@dataclass
class Quality:
changeOmitted: str
nullable: str
scene: str
persistence: str
reportable: str
sourceAttribution: str = None
quieterReporting: str = None
@dataclass
class Constraint:
type: str
from_: str = None
to_: str = None
value: str = None
def to_dict(self):
result = {"type": self.type}
if not self.type:
return result
if self.type == "min":
if self.value:
result["min"] = self.value
elif self.type == "max":
if self.value:
result["max"] = self.value
elif self.type == "maxLength":
if self.value:
result["maxLength"] = self.value
elif self.type == "between":
if self.from_:
result["min"] = self.from_
if self.to_:
result["max"] = self.to_
elif self.type == "desc":
if self.value:
result["description"] = self.value
else:
# For other constraint types
if self.value:
result["value"] = self.value
return result
def __init__(
self,
name,
id,
type_,
default_value,
is_mandatory,
access=None,
quality=None,
constraint=None,
):
super().__init__(name, id, type_, is_mandatory, default_value)
self.conformance = None
self.max_value = None
self.min_value = None
# Store access, quality, and constraint information
self.access = access
self.quality = quality
self.constraint = constraint
self.internally_managed = False
self.is_nullable = (
self.quality is not None
and getattr(self.quality, "nullable", None) is not None
and self.quality.nullable.lower() == "true"
)
def get_flag(self):
flags = []
if self.access is not None:
write_val = getattr(self.access, "write", None)
if write_val and write_val.lower() in ("true", "optional"):
flags.append(self.AttributeFlags.ATTRIBUTE_FLAG_WRITABLE)
if self.internally_managed:
flags.append(self.AttributeFlags.ATTRIBUTE_FLAG_MANAGED_INTERNALLY)
if self.quality is not None:
nullable_val = getattr(self.quality, "nullable", None)
if nullable_val and nullable_val.lower() == "true":
flags.append(self.AttributeFlags.ATTRIBUTE_FLAG_NULLABLE)
persistence_val = getattr(self.quality, "persistence", None)
if persistence_val and persistence_val.lower() == "nonvolatile":
flags.append(self.AttributeFlags.ATTRIBUTE_FLAG_NONVOLATILE)
return " | ".join(flags) if flags else self.AttributeFlags.ATTRIBUTE_FLAG_NONE
def get_default_value_type(self):
"""Get the ESP type for the default value"""
value = self.get_default_value()
if isinstance(value, bool):
return "bool"
if isinstance(value, int):
if value <= 255:
return "uint8_t"
elif value <= 65535:
return "uint16_t"
return "uint32_t"
return "uint32_t"
def get_default_value(self):
"""Get the default value of the attribute"""
return self._convert_default_values()
def get_type(self):
"""Get the ESP type for the attribute"""
return AttributeType(self.type).get_attribute_type()
def _convert_default_values(self):
"""Convert the default value to known values"""
if self.type == "bool":
if self.default_value is None:
return "false"
return "true" if self.default_value.lower() in ("true", "1") else "false"
if self.type in ("string", "octstr"):
if (
self.constraint is not None
and getattr(self.constraint, "type", None) == "maxLength"
and getattr(self.constraint, "value", None) is not None
):
return int(self.constraint.value)
return 0
if self.type == "list":
if (
self.constraint is not None
and getattr(self.constraint, "value", None)
and self.constraint.value.isdigit()
):
return int(self.constraint.value)
return 0
if "enum" in self.type.lower() or "bitmap" in self.type.lower():
if self.default_value is not None:
return convert_to_int(self.default_value, default="0")
return "0"
if self.default_value is not None and "°" in self.default_value:
default_value = self.default_value.split("°")[0]
if default_value.isdigit():
return int(default_value) * 100
return 0
if self.default_value is not None and not self.default_value.isdigit():
first_part = self.default_value.split(" ")[0]
if first_part.isdigit():
return int(first_part)
return convert_to_int(self.default_value, default="0")
def get_max_value(self):
"""Get the max value of the attribute"""
return self.max_value
def get_min_value(self):
"""Get the min value of the attribute"""
return self.min_value
def to_dict(self, attribute_map=None):
"""Convert attribute object to dictionary representation"""
from .serializers import AttributeSerializer
return AttributeSerializer.to_dict(self, attribute_map)
class Cluster(BaseCluster):
@dataclass(frozen=True)
class ClusterFlags:
CLUSTER_FLAG_NONE: str = "CLUSTER_FLAG_NONE"
CLUSTER_FLAG_INIT_FUNCTION: str = "CLUSTER_FLAG_INIT_FUNCTION"
CLUSTER_FLAG_ATTRIBUTE_CHANGED_FUNCTION: str = (
"CLUSTER_FLAG_ATTRIBUTE_CHANGED_FUNCTION"
)
CLUSTER_FLAG_SHUTDOWN_FUNCTION: str = "CLUSTER_FLAG_SHUTDOWN_FUNCTION"
CLUSTER_FLAG_PRE_ATTRIBUTE_CHANGED_FUNCTION: str = (
"CLUSTER_FLAG_PRE_ATTRIBUTE_CHANGED_FUNCTION"
)
CLUSTER_FLAG_SERVER: str = "CLUSTER_FLAG_SERVER"
CLUSTER_FLAG_CLIENT: str = "CLUSTER_FLAG_CLIENT"
def __init__(self, name, id, revision, is_mandatory=False):
super().__init__(name, id, revision, is_mandatory)
self.attributes = set()
self.commands = set()
self.events = set()
self.features = set()
self.conformance = None
self.revision_history = []
self.data_types = {}
# Classification details
self.role = "application" # Default value
self.hierarchy = None
self.pics_code = None
self.scope = None
self.base_cluster_name = None
self.is_migrated_cluster = False
self.skip_command_cb = False
def get_callback_functions(self):
"""Get the callback functions for the cluster"""
callback_functions = []
if self.init_function_available:
callback_functions.append(
f"emberAf{self.chip_name}ClusterServerInitCallback"
)
if self.attribute_changed_function_available:
callback_functions.append(
f"Matter{self.chip_name}ClusterServerAttributeChangedCallback"
)
if self.shutdown_function_available:
callback_functions.append(
f"Matter{self.chip_name}ClusterServerShutdownCallback"
)
if self.pre_attribute_change_function_available:
callback_functions.append(
f"Matter{self.chip_name}ClusterServerPreAttributeChangedCallback"
)
return callback_functions
def get_plugin_server_init_callback(self):
"""Get the plugin server init callback for the cluster"""
if not self.plugin_init_cb_available or should_skip_plugin_callback(self.id):
return None
if "_cluster" in self.name.lower():
cluster_name = self.name.split("_Cluster")[0]
return f"Matter{cluster_name}PluginServerInitCallback"
else:
return f"Matter{self.chip_name}PluginServerInitCallback"
def get_delegate_init_callback(self):
"""Get the delegate init callback for the cluster"""
if self.delegate_init_callback_available and not should_skip_delegate_callback(
self.id
):
return f"{self.chip_name}DelegateInitCB"
if should_include_delegate_callback(self.id):
return f"{self.chip_name}DelegateInitCB"
return None
def get_attributes(self):
"""Get all attributes sorted by attribute id, then by name if ids match"""
attributes = list(self.attributes)
attributes.sort(key=lambda x: (int(x.get_id(), 16), x.name))
return attributes
def get_commands(self):
"""Get all commands sorted by command id, then by name if ids match"""
commands = list(self.commands)
commands.sort(key=lambda x: (int(x.get_id(), 16), x.name))
return commands
def get_events(self) -> List[BaseEvent]:
"""Get all events sorted by event id, then by name if ids match"""
events = list(self.events)
events.sort(key=lambda x: (int(x.get_id(), 16), x.name))
return events
def get_features(self):
"""Get all features sorted by feature id"""
features = list(self.features)
features.sort(key=lambda x: int(x.get_id(), 16))
return features
def _get_mandatory_elements(self, elements):
"""Helper to filter mandatory elements with no conformance condition."""
result = []
for elem in elements:
if not elem.is_mandatory:
continue
conformance = getattr(elem, "conformance", None)
if conformance is None:
continue
condition = (
conformance.get("condition") if isinstance(conformance, dict) else None
)
if condition is None:
result.append(elem)
return (
sorted(result, key=lambda x: (int(x.get_id(), 16), x.name))
if result
else result
)
def get_mandatory_attributes(self):
"""Get only mandatory attributes from the attribute list"""
return self._get_mandatory_elements(self.attributes)
def get_mandatory_commands(self):
"""Get only mandatory commands from the command list"""
return self._get_mandatory_elements(self.commands)
def get_mandatory_events(self):
"""Get only mandatory events from the event list"""
return self._get_mandatory_elements(self.events)
def get_function_flags(self):
"""Get the function flags for the cluster"""
flags = []
if self.server_cluster:
flags.append(self.ClusterFlags.CLUSTER_FLAG_SERVER)
if self.client_cluster:
flags.append(self.ClusterFlags.CLUSTER_FLAG_CLIENT)
if self.init_function_available:
flags.append(self.ClusterFlags.CLUSTER_FLAG_INIT_FUNCTION)
if self.attribute_changed_function_available:
flags.append(self.ClusterFlags.CLUSTER_FLAG_ATTRIBUTE_CHANGED_FUNCTION)
if self.shutdown_function_available:
flags.append(self.ClusterFlags.CLUSTER_FLAG_SHUTDOWN_FUNCTION)
if self.pre_attribute_change_function_available:
flags.append(self.ClusterFlags.CLUSTER_FLAG_PRE_ATTRIBUTE_CHANGED_FUNCTION)
if len(flags) > 0:
return " | ".join(flags)
return self.ClusterFlags.CLUSTER_FLAG_NONE
def to_dict(self):
"""Convert cluster object to dictionary representation"""
from .serializers import ClusterSerializer
return ClusterSerializer.to_dict(self)