diff --git a/tools/idf_py_actions/mcp_ext.py b/tools/idf_py_actions/mcp_ext.py index cd59f74e83..51174e4555 100644 --- a/tools/idf_py_actions/mcp_ext.py +++ b/tools/idf_py_actions/mcp_ext.py @@ -1,20 +1,29 @@ -# SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2024-2026 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 import json import os import subprocess import sys +from pathlib import Path from typing import Any from click.core import Context from idf_py_actions.errors import FatalError from idf_py_actions.tools import PropertyDict -from idf_py_actions.tools import get_build_context from idf_py_actions.tools import get_target from idf_py_actions.tools import idf_version +try: + from idf_component_tools.build_system_tools import CMAKE_PROJECT_LINE +except ImportError: + CMAKE_PROJECT_LINE = [ + r'include($ENV{IDF_PATH}/tools/cmake/project.cmake)', + r'include($ENV{IDF_PATH}/tools/cmakev2/idf.cmake)', + ] + + try: from mcp.server.fastmcp import FastMCP @@ -23,51 +32,98 @@ except ImportError: MCP_AVAILABLE = False +def is_valid_project_dir(directory: str) -> bool: + """ + Determine if the given directory is a valid ESP-IDF project directory. + - Must be a directory. + - Must contain a CMakeLists.txt. + - CMakeLists.txt must include CMAKE_PROJECT_LINE. + """ + root = Path(directory) + + if not root.is_dir(): + return False + cmakelists_path = root / 'CMakeLists.txt' + if not cmakelists_path.is_file(): + return False + + try: + with open(str(cmakelists_path), encoding='utf-8') as f: + for line in f: + if any(proj_line in line for proj_line in CMAKE_PROJECT_LINE): + return True + except Exception: + return False + + return False + + def action_extensions(base_actions: dict, project_path: str) -> dict: """ESP-IDF MCP Server Extension""" def start_mcp_server(action_name: str, ctx: Context, args: PropertyDict, **kwargs: Any) -> None: """Start MCP server for ESP-IDF project integration""" + if not MCP_AVAILABLE: - raise FatalError('MCP dependencies not available. Install with: ./install.sh --enable-mcp') + raise FatalError( + 'MCP dependencies not available. ' + 'Install ESP-IDF using the EIM installer and select the "mcp" feature to be included. ' + 'For more information, refer to the official Espressif EIM Installer documentation ' + 'or use "idf.py docs" and search for EIM configuration instructions.' + ) - current_project = None - # Use current working directory if available, fallback to project_path - for project_dir in [os.getcwd(), project_path]: - if project_dir and os.path.exists(os.path.join(project_dir, 'CMakeLists.txt')): - current_project = project_dir - break - - if not current_project: - raise FatalError('Open the MCP server in a valid ESP-IDF project directory.') - - print(f'Starting ESP-IDF MCP Server for project: {current_project}') - print(f'Target: {get_target(current_project)}') - print(f'ESP-IDF Version: {idf_version()}') - print(f'Working Directory: {os.getcwd()}') + # Verify that mcp-server was executed from a valid ESP-IDF project directory. + # This is necessary to obtain the correct context such as args, project path, etc. + if not is_valid_project_dir(project_path): + current_project = None + for candidate in [os.getcwd(), os.environ.get('IDF_MCP_WORKSPACE_FOLDER', '')]: + if is_valid_project_dir(candidate): + current_project = candidate + break + if not current_project: + raise FatalError('Open the MCP server in a valid ESP-IDF project directory.') + try: + cmd = [ + sys.executable, + os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'), + '-C', + current_project, + 'mcp-server', + ] + print( + f'Starting ESP-IDF MCP Server with command: {" ".join(cmd)} in project path: {current_project}', + file=sys.stderr, + ) + subprocess.run(cmd, cwd=current_project, check=True) + return + except Exception as e: + print(f'ERROR: Failed to start ESP-IDF MCP Server: {str(e)}', file=sys.stderr) + raise FatalError(f'Failed to start ESP-IDF MCP Server: {str(e)}') from e # Initialize MCP server mcp = FastMCP('ESP-IDF') # === TOOLS (Actions) === @mcp.tool() - def build_project(target: str = 'all') -> str: - """Build ESP-IDF project with specified target""" + def build_project() -> str: + """Build ESP-IDF project""" try: - # Use the current project directory cmd = [ sys.executable, os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'), - '-C', - current_project, 'build', ] - result = subprocess.run(cmd, capture_output=True, text=True, cwd=current_project) + # Information logs are shown in some mcp clients using stderr + print(f'INFO: Building project with command: {" ".join(cmd)} in path: {project_path}', file=sys.stderr) + result = subprocess.run(cmd, capture_output=True, text=True, cwd=project_path) if result.returncode == 0: - return f'Successfully built target: {target}' + print('INFO: Build successful', file=sys.stderr) + return 'Successfully built project' else: + print(f'ERROR: Build failed: {result.stderr}', file=sys.stderr) return f'Build failed: {result.stderr}' except Exception as e: + print(f'ERROR: Build failed: {str(e)}', file=sys.stderr) return f'Build failed: {str(e)}' @mcp.tool() @@ -77,17 +133,19 @@ def action_extensions(base_actions: dict, project_path: str) -> dict: cmd = [ sys.executable, os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'), - '-C', - current_project, 'set-target', target, ] - result = subprocess.run(cmd, capture_output=True, text=True, cwd=current_project) + print(f'INFO: Setting target with command: {" ".join(cmd)} in path: {project_path}', file=sys.stderr) + result = subprocess.run(cmd, capture_output=True, text=True, cwd=project_path) if result.returncode == 0: + print(f'INFO: Target set to: {target}', file=sys.stderr) return f'Target set to: {target}' else: + print(f'ERROR: Failed to set target: {result.stderr}', file=sys.stderr) return f'Failed to set target: {result.stderr}' except Exception as e: + print(f'ERROR: Failed to set target: {str(e)}', file=sys.stderr) return f'Error setting target: {str(e)}' @mcp.tool() @@ -102,40 +160,20 @@ def action_extensions(base_actions: dict, project_path: str) -> dict: cmd = [ sys.executable, os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'), - '-C', - current_project, ] + flash_args - result = subprocess.run(cmd, capture_output=True, text=True, cwd=current_project) + print(f'INFO: Flashing project with command: {" ".join(cmd)} in path: {project_path}', file=sys.stderr) + result = subprocess.run(cmd, capture_output=True, text=True, cwd=project_path) if result.returncode == 0: + print('INFO: Flash successful', file=sys.stderr) return f'Successfully flashed project{" to port " + port if port else ""}' else: + print(f'ERROR: Flash failed: {result.stderr}', file=sys.stderr) return f'Flash failed: {result.stderr}' except Exception as e: + print(f'ERROR: Flash failed: {str(e)}', file=sys.stderr) return f'Error flashing: {str(e)}' - @mcp.tool() - def monitor_serial(port: str | None = None) -> str: - """Start serial monitor (returns immediately, monitor runs in background)""" - try: - monitor_args = [] - if port: - monitor_args.extend(['-p', port]) - monitor_args.append('monitor') - - cmd = [ - sys.executable, - os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'), - '-C', - current_project, - ] + monitor_args - - # Start monitor in background - subprocess.Popen(cmd, cwd=current_project) - return f'Serial monitor started{" on port " + port if port else ""}' - except Exception as e: - return f'Error starting monitor: {str(e)}' - @mcp.tool() def clean_project() -> str: """Clean build artifacts""" @@ -143,90 +181,64 @@ def action_extensions(base_actions: dict, project_path: str) -> dict: cmd = [ sys.executable, os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'), - '-C', - current_project, 'clean', ] - result = subprocess.run(cmd, capture_output=True, text=True, cwd=current_project) + print(f'INFO: Cleaning project with command: {" ".join(cmd)} in path: {project_path}', file=sys.stderr) + result = subprocess.run(cmd, capture_output=True, text=True, cwd=project_path) if result.returncode == 0: + print('INFO: Project cleaned successfully', file=sys.stderr) return 'Project cleaned successfully' else: + print(f'ERROR: Clean failed: {result.stderr}', file=sys.stderr) return f'Clean failed: {result.stderr}' except Exception as e: + print(f'ERROR: Error cleaning: {str(e)}', file=sys.stderr) return f'Error cleaning: {str(e)}' - @mcp.tool() - def menuconfig() -> str: - """Open menuconfig interface (terminal-based)""" - try: - cmd = [ - sys.executable, - os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'), - '-C', - current_project, - 'menuconfig', - ] - subprocess.run(cmd, cwd=current_project) - return 'Menuconfig completed' - except Exception as e: - return f'Error opening menuconfig: {str(e)}' - # === RESOURCES (Data Access) === - @mcp.resource('project://config') def get_project_config() -> str: """Get current project configuration""" + build_dir = args.get('build_dir', '') + config: dict[str, Any] = {} + + if not os.path.exists(build_dir): + config['build_dir_exists'] = False + return json.dumps(config, indent=2) + + config['build_dir'] = build_dir + proj_desc_fn = f'{build_dir}/project_description.json' + config['project_description'] = 'Project description does not exist' + try: - config_data = {} + with open(proj_desc_fn, encoding='utf-8') as f: + config['project_description'] = json.load(f) + except (OSError, ValueError): + pass - # Get target - config_data['target'] = get_target(current_project) - - # Get build directory info - build_dir = os.path.join(current_project, 'build') - config_data['build_dir'] = build_dir - config_data['build_exists'] = os.path.exists(build_dir) - - # Get project description if available - try: - ctx = get_build_context() - if 'proj_desc' in ctx: - config_data['project_description'] = ctx['proj_desc'] - except Exception: - pass - - # Get sdkconfig info - sdkconfig_path = os.path.join(current_project, 'sdkconfig') - config_data['sdkconfig_exists'] = os.path.exists(sdkconfig_path) - - return json.dumps(config_data, indent=2) - except Exception as e: - return f'Error getting config: {str(e)}' + return json.dumps(config, indent=2) @mcp.resource('project://status') def get_project_status() -> str: """Get current project build status""" try: status = { - 'project_path': current_project, - 'target': get_target(current_project), + 'project_path': project_path, + 'target': get_target(project_path), 'idf_version': idf_version(), - 'build_dir': os.path.join(current_project, 'build'), } # Check if built - build_dir = os.path.join(current_project, 'build') + build_dir = args.build_dir if os.path.exists(build_dir): - status['built'] = True - - # Check for common build artifacts + status['build_dir'] = build_dir artifacts = ['bootloader', 'partition_table', 'app-flash', 'flash_args'] status['artifacts'] = {} for artifact in artifacts: artifact_path = os.path.join(build_dir, artifact) status['artifacts'][artifact] = os.path.exists(artifact_path) else: - status['built'] = False + status['build_dir_exists'] = False return json.dumps(status, indent=2) except Exception as e: @@ -234,25 +246,19 @@ def action_extensions(base_actions: dict, project_path: str) -> dict: @mcp.resource('project://devices') def get_connected_devices() -> str: - """Get list of connected ESP devices""" + """Get list of connected devices""" try: - # Use esptool to list ports - cmd = [sys.executable, '-m', 'esptool', '--list-ports'] - result = subprocess.run(cmd, capture_output=True, text=True) - - devices = { - 'available_ports': result.stdout.strip().split('\n') if result.stdout else [], - 'esptool_available': result.returncode == 0, - } + import serial.tools.list_ports + devices_on_ports = [p.device.strip() for p in serial.tools.list_ports.comports()] + print(f'Devices: {devices_on_ports}', file=sys.stderr) + devices = {'available_ports': devices_on_ports if devices_on_ports else []} return json.dumps(devices, indent=2) except Exception as e: return f'Error getting devices: {str(e)}' # Start the MCP server print('MCP Server running on stdio...') - print('Available tools: build_project, set_target, flash_project, monitor_serial, clean_project, menuconfig') - print('Available resources: project://config, project://status, project://devices') try: mcp.run() @@ -268,6 +274,6 @@ def action_extensions(base_actions: dict, project_path: str) -> dict: 'callback': start_mcp_server, 'help': 'Start MCP (Model Context Protocol) server for AI integration', 'options': [], - } + }, } }