Files
esp-idf/tools/idf_py_actions/mcp_ext.py
T
2025-10-20 16:22:54 +02:00

274 lines
10 KiB
Python

# SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import json
import os
import subprocess
import sys
from typing import Any
from click.core import Context
from idf_py_actions.errors import FatalError
from idf_py_actions.tools import PropertyDict
from idf_py_actions.tools import get_build_context
from idf_py_actions.tools import get_target
from idf_py_actions.tools import idf_version
# The MCP SDK is an optional dependency: record whether it could be imported
# so the action can report a clear error instead of failing at import time.
try:
    from mcp.server.fastmcp import FastMCP
except ImportError:
    MCP_AVAILABLE = False
else:
    MCP_AVAILABLE = True
def action_extensions(base_actions: dict, project_path: str) -> dict:
    """ESP-IDF MCP Server Extension

    Registers the ``mcp-server`` action, which exposes the project to MCP
    clients through a set of tools (build, set-target, flash, monitor, clean,
    menuconfig) and read-only resources (config, status, devices).

    Args:
        base_actions: Actions registered so far; not used by this extension.
        project_path: Fallback project directory, used when the current
            working directory does not contain a ``CMakeLists.txt``.

    Returns:
        The extension dictionary containing the ``mcp-server`` action.
    """

    def start_mcp_server(action_name: str, ctx: Context, args: PropertyDict, **kwargs: Any) -> None:
        """Start MCP server for ESP-IDF project integration

        Raises:
            FatalError: If the MCP package is not installed, if no project
                directory with a ``CMakeLists.txt`` is found, or if the server
                stops because of an unexpected error.
        """
        if not MCP_AVAILABLE:
            raise FatalError('MCP dependencies not available. Install with: ./install.sh --enable-mcp')

        def log(message: str) -> None:
            """Print a diagnostic message without touching stdout."""
            # The server talks to its client over stdio, so stdout must carry
            # protocol messages only; human-readable output goes to stderr.
            print(message, file=sys.stderr)

        # Use current working directory if available, fallback to project_path
        current_project = next(
            (
                candidate
                for candidate in (os.getcwd(), project_path)
                if candidate and os.path.exists(os.path.join(candidate, 'CMakeLists.txt'))
            ),
            None,
        )
        if not current_project:
            raise FatalError('Open the MCP server in a valid ESP-IDF project directory.')

        log(f'Starting ESP-IDF MCP Server for project: {current_project}')
        log(f'Target: {get_target(current_project)}')
        log(f'ESP-IDF Version: {idf_version()}')
        log(f'Working Directory: {os.getcwd()}')

        # Initialize MCP server
        mcp = FastMCP('ESP-IDF')

        def idf_py_cmd(*idf_args: str) -> list[str]:
            """Return the argv that runs idf.py on the current project.

            Raises KeyError when IDF_PATH is not set; callers invoke this
            inside their own try block so the error is reported to the client.
            """
            return [
                sys.executable,
                os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'),
                '-C',
                current_project,
                *idf_args,
            ]

        def run_idf_py(*idf_args: str) -> subprocess.CompletedProcess:
            """Run idf.py to completion and capture its output as text."""
            return subprocess.run(
                idf_py_cmd(*idf_args),
                capture_output=True,
                text=True,
                cwd=current_project,
                # Keep the child away from the server's stdin, which carries
                # the client's requests.
                stdin=subprocess.DEVNULL,
            )

        # === TOOLS (Actions) ===
        @mcp.tool()
        def build_project(target: str = 'all') -> str:
            """Build ESP-IDF project with specified target"""
            try:
                if not target or target.startswith('-'):
                    # Refuse empty or option-like values so they cannot be
                    # interpreted as idf.py command line flags.
                    return f'Build failed: invalid target: {target!r}'
                # 'all' keeps the original full build. Any other value is
                # forwarded to idf.py.
                # NOTE(review): assumes target names an idf.py build action
                # such as 'app' or 'bootloader' - confirm the accepted set.
                build_action = 'build' if target == 'all' else target
                result = run_idf_py(build_action)
                if result.returncode == 0:
                    return f'Successfully built target: {target}'
                return f'Build failed: {result.stderr}'
            except Exception as e:
                return f'Build failed: {str(e)}'

        @mcp.tool()
        def set_target(target: str) -> str:
            """Set the ESP-IDF target (esp32, esp32s3, esp32c6, etc.)"""
            try:
                result = run_idf_py('set-target', target)
                if result.returncode == 0:
                    return f'Target set to: {target}'
                return f'Failed to set target: {result.stderr}'
            except Exception as e:
                return f'Error setting target: {str(e)}'

        @mcp.tool()
        def flash_project(port: str | None = None) -> str:
            """Flash the built project to connected device"""
            try:
                port_args = ['-p', port] if port else []
                result = run_idf_py(*port_args, 'flash')
                if result.returncode == 0:
                    return f'Successfully flashed project{" to port " + port if port else ""}'
                return f'Flash failed: {result.stderr}'
            except Exception as e:
                return f'Error flashing: {str(e)}'

        @mcp.tool()
        def monitor_serial(port: str | None = None) -> str:
            """Start serial monitor (returns immediately, monitor runs in background)"""
            try:
                port_args = ['-p', port] if port else []
                # Start monitor in background
                # NOTE(review): the child inherits the server's stdin/stdout,
                # which are the MCP transport when serving on stdio - confirm
                # how the monitor is expected to get a terminal.
                subprocess.Popen(idf_py_cmd(*port_args, 'monitor'), cwd=current_project)
                return f'Serial monitor started{" on port " + port if port else ""}'
            except Exception as e:
                return f'Error starting monitor: {str(e)}'

        @mcp.tool()
        def clean_project() -> str:
            """Clean build artifacts"""
            try:
                result = run_idf_py('clean')
                if result.returncode == 0:
                    return 'Project cleaned successfully'
                return f'Clean failed: {result.stderr}'
            except Exception as e:
                return f'Error cleaning: {str(e)}'

        @mcp.tool()
        def menuconfig() -> str:
            """Open menuconfig interface (terminal-based)"""
            try:
                # Output is deliberately not captured: menuconfig is interactive.
                # NOTE(review): it therefore shares the server's stdin/stdout,
                # which are the MCP transport when serving on stdio - confirm
                # this tool is usable in that mode.
                result = subprocess.run(idf_py_cmd('menuconfig'), cwd=current_project)
                if result.returncode != 0:
                    return f'Menuconfig failed with exit code {result.returncode}'
                return 'Menuconfig completed'
            except Exception as e:
                return f'Error opening menuconfig: {str(e)}'

        # === RESOURCES (Data Access) ===
        @mcp.resource('project://config')
        def get_project_config() -> str:
            """Get current project configuration"""
            try:
                build_dir = os.path.join(current_project, 'build')
                config_data: dict[str, Any] = {
                    'target': get_target(current_project),
                    'build_dir': build_dir,
                    'build_exists': os.path.exists(build_dir),
                }
                # The project description is optional: report the rest of the
                # configuration even when it cannot be obtained.
                try:
                    build_ctx = get_build_context()
                    if 'proj_desc' in build_ctx:
                        config_data['project_description'] = build_ctx['proj_desc']
                except Exception as e:
                    log(f'Project description not available: {e}')
                sdkconfig_path = os.path.join(current_project, 'sdkconfig')
                config_data['sdkconfig_exists'] = os.path.exists(sdkconfig_path)
                return json.dumps(config_data, indent=2)
            except Exception as e:
                return f'Error getting config: {str(e)}'

        @mcp.resource('project://status')
        def get_project_status() -> str:
            """Get current project build status"""
            try:
                build_dir = os.path.join(current_project, 'build')
                status: dict[str, Any] = {
                    'project_path': current_project,
                    'target': get_target(current_project),
                    'idf_version': idf_version(),
                    'build_dir': build_dir,
                }
                # The project counts as built when the build directory exists.
                status['built'] = os.path.exists(build_dir)
                if status['built']:
                    # Check for common build artifacts
                    artifacts = ['bootloader', 'partition_table', 'app-flash', 'flash_args']
                    status['artifacts'] = {
                        artifact: os.path.exists(os.path.join(build_dir, artifact)) for artifact in artifacts
                    }
                return json.dumps(status, indent=2)
            except Exception as e:
                return f'Error getting status: {str(e)}'

        @mcp.resource('project://devices')
        def get_connected_devices() -> str:
            """Get list of connected ESP devices"""
            try:
                # Use esptool to list ports
                cmd = [sys.executable, '-m', 'esptool', '--list-ports']
                result = subprocess.run(cmd, capture_output=True, text=True, stdin=subprocess.DEVNULL)
                devices = {
                    # One entry per non-blank output line.
                    'available_ports': [line for line in result.stdout.splitlines() if line.strip()],
                    'esptool_available': result.returncode == 0,
                }
                return json.dumps(devices, indent=2)
            except Exception as e:
                return f'Error getting devices: {str(e)}'

        # Start the MCP server
        log('MCP Server running on stdio...')
        log('Available tools: build_project, set_target, flash_project, monitor_serial, clean_project, menuconfig')
        log('Available resources: project://config, project://status, project://devices')
        try:
            mcp.run()
        except KeyboardInterrupt:
            log('\nMCP Server stopped.')
        except Exception as e:
            # Report server failures the same way as the other fatal
            # conditions in this action instead of returning as if it succeeded.
            raise FatalError(f'MCP Server error: {e}') from e

    # Return the action extension
    return {
        'actions': {
            'mcp-server': {
                'callback': start_mcp_server,
                'help': 'Start MCP (Model Context Protocol) server for AI integration',
                'options': [],
            }
        }
    }