# Copyright 2026 Espressif Systems (Shanghai) PTE LTD # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import annotations import logging from utils.base_elements import ( BaseCluster, BaseAttribute, BaseCommand, BaseEvent, BaseFeature, BaseDevice, ) from .conformance_codegen import Conformance, ConformanceDecision, FeatureConformance from typing import Dict, List, Tuple from utils.conversion_utils import convert_to_int from utils.overrides import ( get_overridden_cluster_init_callback_name, get_overridden_cluster_shutdown_callback_name, ) logger = logging.getLogger(__name__) def get_id_name_lambda(): """Returns a lambda function for sorting by ID and name""" return lambda x: (convert_to_int(x.get_id()), x.name) def get_choice_group( parent_feature_type: str, conformance_type: ConformanceDecision, features: List[Feature], ): """ Get groups of parent feature with their dependent features. Returns a list of dicts: { "parent_feature": , "dependent_features": [, ...], "constraint": "at_least_one" | "exact_one" | None } """ groups_by_key: Dict[Tuple[str, str], List] = {} for feature in features: if not feature.conformance: continue if feature.conformance.type != conformance_type: continue parent_name = getattr(feature.conformance, parent_feature_type, None) if not parent_name or not feature.conformance.choice: continue key = (parent_name, feature.conformance.choice.marker) if key not in groups_by_key: groups_by_key[key] = [] groups_by_key[key].append(feature) result = [] for (parent_name, _marker), dependents in groups_by_key.items(): parent = next( (f for f in features if f.func_name == parent_name), None, ) if not parent: continue first = dependents[0] if first.conformance.is_at_least_one(): constraint = "at_least_one" elif first.conformance.is_exact_one(): constraint = "exact_one" else: constraint = None result.append( { "parent_feature": parent, "dependent_features": sorted(dependents, key=get_id_name_lambda()), "constraint": constraint, } ) return result class Cluster(BaseCluster): """Cluster class that inherits from BaseCluster""" def __init__(self, name, id, revision, is_mandatory): super().__init__(name=name, id=id, revision=revision, is_mandatory=is_mandatory) self.attributes = [] self.commands = [] self.events = [] self.features = [] self.function_flags = "" self.is_migrated_cluster = False self.is_base_cluster = False def get_attributes(self): """Get the list of attributes sorted by ID and name""" return sorted(self.attributes, key=get_id_name_lambda()) def get_commands(self): """Get the list of commands sorted by ID and name""" return sorted(self.commands, key=get_id_name_lambda()) def get_events(self) -> List[BaseEvent]: """Get the list of events sorted by ID and name""" return sorted(self.events, key=get_id_name_lambda()) def get_features(self): """Get the list of features sorted by ID and name""" return sorted(self.features, key=get_id_name_lambda()) def get_mandatory_attributes(self): """Get the list of mandatory attributes Attribute is mandatory: - if it is marked as mandatory in the cluster JSON file - has no conformance condition or there is no NOT TERM present in the conformance condition NOTE: NOT TERM indicate we have to create attribute by default while creating the cluster """ mandatory_attributes = [] for attr in self.get_attributes(): if attr.is_mandatory and ( attr.conformance.get_mandatory_condition() is None or attr.conformance.is_not_term_present ): mandatory_attributes.append(attr) return mandatory_attributes def get_mandatory_commands(self): """Get the list of mandatory commands Command is mandatory: - if it is marked as mandatory in the cluster JSON file - has no conformance condition or there is no NOT TERM present in the conformance condition NOTE: NOT TERM indicate we have to create command by default while creating the cluster """ mandatory_commands = [] for cmd in self.get_commands(): if cmd.is_mandatory and ( cmd.conformance.get_mandatory_condition() is None or cmd.conformance.is_not_term_present ): mandatory_commands.append(cmd) return mandatory_commands def get_mandatory_events(self): """Get the list of mandatory events Event is mandatory: - if it is marked as mandatory in the cluster JSON file - has no conformance condition or there is no NOT TERM present in the conformance condition NOTE: NOT TERM indicate we have to create event by default while creating the cluster """ mandatory_events = [] for event in self.get_events(): if event.is_mandatory and ( event.conformance.get_mandatory_condition() is None or event.conformance.is_not_term_present ): mandatory_events.append(event) return mandatory_events def get_all_exact_one_features(self): """Get all features that require exactly one feature to be supported""" feature_list = {} for feature in self.features: if not feature.conformance: continue if feature.conformance.is_exact_one(): key = feature.conformance.choice.marker if key not in feature_list: feature_list[key] = [] feature_list[key].append(feature) return feature_list def get_all_at_least_one_features(self) -> Dict: """Get all features that require at least one feature to be supported""" feature_list = {} for feature in self.features: if not feature.conformance: continue if feature.conformance.is_at_least_one(): key = feature.conformance.choice.marker if key not in feature_list: feature_list[key] = [] feature_list[key].append(feature) return feature_list def has_choice_features(self): """Check if the cluster has any choice features""" return ( len(self.get_all_exact_one_features()) > 0 or len(self.get_all_at_least_one_features()) > 0 ) def get_otherwise_choice_groups(self) -> List[Dict]: return get_choice_group( "mandatory_parent", ConformanceDecision.OTHERWISE, self.features ) def get_optional_choice_groups(self) -> List[Dict]: return get_choice_group( "optional_parent", ConformanceDecision.OPTIONAL, self.features ) def get_mandatory_choice_groups(self) -> List[Dict]: return get_choice_group( "mandatory_parent", ConformanceDecision.MANDATORY, self.features ) def get_choice_group_feature_set(self) -> set: """Set of features that are parent or dependent in any otherwise choice group.""" feature_set = set() for group in self.get_otherwise_choice_groups(): feature_set.add(group["parent_feature"]) feature_set.update(group["dependent_features"]) for group in self.get_optional_choice_groups(): feature_set.update(group["dependent_features"]) for group in self.get_mandatory_choice_groups(): feature_set.add(group["parent_feature"]) feature_set.update(group["dependent_features"]) return feature_set def get_standalone_choice_groups(self) -> List[Tuple[str, List]]: """ Choice groups that do not have a mandatory parent (not otherwise groups). Returns list of (constraint_type, features) where constraint_type is "exact_one", or "at_least_one". """ all_choice_set = self.get_choice_group_feature_set() choice_group_list = [] for marker, features in self.get_all_exact_one_features().items(): if not all(f in all_choice_set for f in features): choice_group_list.append(("exact_one", features)) for marker, features in self.get_all_at_least_one_features().items(): if not all(f in all_choice_set for f in features): choice_group_list.append(("at_least_one", features)) return choice_group_list def get_independent_features(self) -> List: """Features that are not part of any otherwise choice group (parent or dependent).""" choice_set = self.get_choice_group_feature_set() choice_groups_list = self.get_standalone_choice_groups() choice_groups_set = set() for type, features in choice_groups_list: choice_groups_set.update(features) final_choice_set = choice_set | choice_groups_set return [f for f in self.get_features() if f not in final_choice_set] def get_cluster_init_callback(self): """Get the cluster init callback name""" return get_overridden_cluster_init_callback_name(self.id, self.chip_name) def get_cluster_shutdown_callback(self): """Get the cluster shutdown callback name""" return get_overridden_cluster_shutdown_callback_name(self.id, self.chip_name) def get_response_command(self, command_name: str): """Get the response command for a given command name""" for command in self.commands: if command.name == command_name: return command return None def get_destroyable_elements(self, feature_name: str): """Get the list of destroyable elements for a given feature name""" elements = { "attributes": [], "commands": [], "events": [], } for attribute in self.attributes: conformance_condition = ( attribute.conformance.get_mandatory_condition() if attribute.conformance and attribute.conformance.get_mandatory_condition() is not None else "" ) if ( attribute.conformance.is_not_term_present and f"!(has_feature({feature_name})" in conformance_condition ): elements["attributes"].append(attribute) for command in self.commands: conformance_condition = ( command.conformance.get_mandatory_condition() if command.conformance and command.conformance.get_mandatory_condition() is not None else "" ) if ( command.conformance.is_not_term_present and f"!(has_feature({feature_name})" in conformance_condition ): elements["commands"].append(command) for event in self.events: conformance_condition = ( event.conformance.get_mandatory_condition() if event.conformance and event.conformance.get_mandatory_condition() is not None else "" ) if ( event.conformance.is_not_term_present and f"!(has_feature({feature_name})" in conformance_condition ): elements["events"].append(event) return elements class Attribute(BaseAttribute): """Attribute class that inherits from BaseAttribute""" def __init__(self, name, id, type_, is_mandatory, default_value): super().__init__( name=name, id=id, type_=type_, is_mandatory=is_mandatory, default_value=default_value, ) self.converted_type = None self.is_nullable = False self._flag = None self.max_length = 0 # For string attributes self.min_value = None # For attribute bounds self.max_value = None # For attribute bounds self.conformance = Conformance() self.is_internally_managed = False self.is_complex = False def get_flag(self): """Get the attribute flags""" return self._flag def get_type(self): """Get the attribute type""" return self.converted_type def get_min_value(self): """Get the min value""" return self.min_value def get_max_value(self): """Get the max value""" if self.max_value is None: return self.get_default_value() return self.max_value def get_default_value(self): """Get the default value""" return self.default_value def get_default_value_type(self): """Get the ESP type for the default value""" value = self.get_default_value() int_value = convert_to_int(value) if int_value is None: return "uint32_t" elif int_value <= 255: return "uint8_t" elif int_value <= 65535: return "uint16_t" return "uint32_t" def get_conformance_condition(self): """Get the conformance condition""" return self.conformance.get_mandatory_condition() class Command(BaseCommand): """Command class that inherits from BaseCommand""" def __init__(self, name, id, is_mandatory, direction, response): super().__init__( name=name, id=id, is_mandatory=is_mandatory, direction=direction, response=response, ) self._flag = "" self.has_callback = False self.conformance = Conformance() self.is_fabric_scoped = False def get_flag(self): """Get the command flags""" return self._flag def get_conformance_condition(self): """Get the conformance condition""" return self.conformance.get_mandatory_condition() class Feature(BaseFeature): """Feature class that inherits from BaseFeature""" def __init__(self, name, id, code, is_mandatory): super().__init__(name=name, id=id, is_mandatory=is_mandatory) self.attributes = [] self.commands = [] self.events = [] self.conformance = FeatureConformance() self.code = code def get_attributes(self): """Get the list of attributes sorted by ID and name""" return sorted(self.attributes, key=get_id_name_lambda()) def get_externally_managed_attributes(self): """Get the list of attributes that are externally managed""" return [attr for attr in self.attributes if not attr.is_internally_managed] def get_commands(self): """Get the list of commands sorted by ID and name""" return sorted(self.commands, key=get_id_name_lambda()) def get_events(self) -> List[BaseEvent]: """Get the list of events sorted by ID and name""" return sorted(self.events, key=get_id_name_lambda()) def get_conformance_condition(self): """Get the conformance condition""" return self.conformance.get_optional_condition() class Event(BaseEvent): """Event class that inherits from BaseEvent""" def __init__(self, name, id, is_mandatory): super().__init__(name=name, id=id, is_mandatory=is_mandatory) self.priority = "Info" # Default priority self.conformance = Conformance() def get_conformance_condition(self): """Get the conformance condition""" return self.conformance.get_mandatory_condition() class Device(BaseDevice): """Device class that inherits from BaseDevice""" def __init__(self, id, name, revision): super().__init__(id=id, name=name, revision=revision) self.clusters = [] # List of Cluster objects def get_device_type_id(self): """Return the device type ID""" return self.id def get_device_type_version(self): """Get the device type version""" return self.revision def get_clusters(self): """Get all clusters sorted by ID and server/client type""" return sorted( self.clusters, key=lambda x: (convert_to_int(x.get_id()), not x.server_cluster), ) def binding_cluster_available(self) -> bool: """Check if a binding cluster is available""" return any(cluster.client_cluster for cluster in self.get_mandatory_clusters()) def get_mandatory_clusters(self): """Get all mandatory clusters""" return [cluster for cluster in self.clusters if cluster.is_mandatory] def get_unique_mandatory_clusters(self): """Get all unique mandatory clusters""" return [ cluster for cluster in self.get_unique_clusters() if cluster.is_mandatory ] def get_unique_clusters(self): """Get unique clusters (no duplicates) sorted by ID""" unique_clusters = [] seen_names = set() # Add the descriptor and binding clusters to the set of seen names as they are always present seen_names.add("descriptor") seen_names.add("binding") sorted_clusters = self.get_clusters() for cluster in sorted_clusters: if cluster.esp_name not in seen_names: unique_clusters.append(cluster) seen_names.add(cluster.esp_name) return unique_clusters