mirror of
https://github.com/espressif/esp-idf.git
synced 2026-04-27 19:13:21 +00:00
feat(tools): mcp_ext.py commands adjustments
This commit is contained in:
+122
-116
@@ -1,20 +1,29 @@
|
||||
# SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-FileCopyrightText: 2024-2026 Espressif Systems (Shanghai) CO LTD
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from click.core import Context
|
||||
|
||||
from idf_py_actions.errors import FatalError
|
||||
from idf_py_actions.tools import PropertyDict
|
||||
from idf_py_actions.tools import get_build_context
|
||||
from idf_py_actions.tools import get_target
|
||||
from idf_py_actions.tools import idf_version
|
||||
|
||||
try:
|
||||
from idf_component_tools.build_system_tools import CMAKE_PROJECT_LINE
|
||||
except ImportError:
|
||||
CMAKE_PROJECT_LINE = [
|
||||
r'include($ENV{IDF_PATH}/tools/cmake/project.cmake)',
|
||||
r'include($ENV{IDF_PATH}/tools/cmakev2/idf.cmake)',
|
||||
]
|
||||
|
||||
|
||||
try:
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
|
||||
@@ -23,51 +32,98 @@ except ImportError:
|
||||
MCP_AVAILABLE = False
|
||||
|
||||
|
||||
def is_valid_project_dir(directory: str) -> bool:
|
||||
"""
|
||||
Determine if the given directory is a valid ESP-IDF project directory.
|
||||
- Must be a directory.
|
||||
- Must contain a CMakeLists.txt.
|
||||
- CMakeLists.txt must include CMAKE_PROJECT_LINE.
|
||||
"""
|
||||
root = Path(directory)
|
||||
|
||||
if not root.is_dir():
|
||||
return False
|
||||
cmakelists_path = root / 'CMakeLists.txt'
|
||||
if not cmakelists_path.is_file():
|
||||
return False
|
||||
|
||||
try:
|
||||
with open(str(cmakelists_path), encoding='utf-8') as f:
|
||||
for line in f:
|
||||
if any(proj_line in line for proj_line in CMAKE_PROJECT_LINE):
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def action_extensions(base_actions: dict, project_path: str) -> dict:
|
||||
"""ESP-IDF MCP Server Extension"""
|
||||
|
||||
def start_mcp_server(action_name: str, ctx: Context, args: PropertyDict, **kwargs: Any) -> None:
|
||||
"""Start MCP server for ESP-IDF project integration"""
|
||||
|
||||
if not MCP_AVAILABLE:
|
||||
raise FatalError('MCP dependencies not available. Install with: ./install.sh --enable-mcp')
|
||||
raise FatalError(
|
||||
'MCP dependencies not available. '
|
||||
'Install ESP-IDF using the EIM installer and select the "mcp" feature to be included. '
|
||||
'For more information, refer to the official Espressif EIM Installer documentation '
|
||||
'or use "idf.py docs" and search for EIM configuration instructions.'
|
||||
)
|
||||
|
||||
current_project = None
|
||||
# Use current working directory if available, fallback to project_path
|
||||
for project_dir in [os.getcwd(), project_path]:
|
||||
if project_dir and os.path.exists(os.path.join(project_dir, 'CMakeLists.txt')):
|
||||
current_project = project_dir
|
||||
break
|
||||
|
||||
if not current_project:
|
||||
raise FatalError('Open the MCP server in a valid ESP-IDF project directory.')
|
||||
|
||||
print(f'Starting ESP-IDF MCP Server for project: {current_project}')
|
||||
print(f'Target: {get_target(current_project)}')
|
||||
print(f'ESP-IDF Version: {idf_version()}')
|
||||
print(f'Working Directory: {os.getcwd()}')
|
||||
# Verify that mcp-server was executed from a valid ESP-IDF project directory.
|
||||
# This is necessary to obtain the correct context such as args, project path, etc.
|
||||
if not is_valid_project_dir(project_path):
|
||||
current_project = None
|
||||
for candidate in [os.getcwd(), os.environ.get('IDF_MCP_WORKSPACE_FOLDER', '')]:
|
||||
if is_valid_project_dir(candidate):
|
||||
current_project = candidate
|
||||
break
|
||||
if not current_project:
|
||||
raise FatalError('Open the MCP server in a valid ESP-IDF project directory.')
|
||||
try:
|
||||
cmd = [
|
||||
sys.executable,
|
||||
os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'),
|
||||
'-C',
|
||||
current_project,
|
||||
'mcp-server',
|
||||
]
|
||||
print(
|
||||
f'Starting ESP-IDF MCP Server with command: {" ".join(cmd)} in project path: {current_project}',
|
||||
file=sys.stderr,
|
||||
)
|
||||
subprocess.run(cmd, cwd=current_project, check=True)
|
||||
return
|
||||
except Exception as e:
|
||||
print(f'ERROR: Failed to start ESP-IDF MCP Server: {str(e)}', file=sys.stderr)
|
||||
raise FatalError(f'Failed to start ESP-IDF MCP Server: {str(e)}') from e
|
||||
|
||||
# Initialize MCP server
|
||||
mcp = FastMCP('ESP-IDF')
|
||||
|
||||
# === TOOLS (Actions) ===
|
||||
@mcp.tool()
|
||||
def build_project(target: str = 'all') -> str:
|
||||
"""Build ESP-IDF project with specified target"""
|
||||
def build_project() -> str:
|
||||
"""Build ESP-IDF project"""
|
||||
try:
|
||||
# Use the current project directory
|
||||
cmd = [
|
||||
sys.executable,
|
||||
os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'),
|
||||
'-C',
|
||||
current_project,
|
||||
'build',
|
||||
]
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, cwd=current_project)
|
||||
# Information logs are shown in some mcp clients using stderr
|
||||
print(f'INFO: Building project with command: {" ".join(cmd)} in path: {project_path}', file=sys.stderr)
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, cwd=project_path)
|
||||
if result.returncode == 0:
|
||||
return f'Successfully built target: {target}'
|
||||
print('INFO: Build successful', file=sys.stderr)
|
||||
return 'Successfully built project'
|
||||
else:
|
||||
print(f'ERROR: Build failed: {result.stderr}', file=sys.stderr)
|
||||
return f'Build failed: {result.stderr}'
|
||||
except Exception as e:
|
||||
print(f'ERROR: Build failed: {str(e)}', file=sys.stderr)
|
||||
return f'Build failed: {str(e)}'
|
||||
|
||||
@mcp.tool()
|
||||
@@ -77,17 +133,19 @@ def action_extensions(base_actions: dict, project_path: str) -> dict:
|
||||
cmd = [
|
||||
sys.executable,
|
||||
os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'),
|
||||
'-C',
|
||||
current_project,
|
||||
'set-target',
|
||||
target,
|
||||
]
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, cwd=current_project)
|
||||
print(f'INFO: Setting target with command: {" ".join(cmd)} in path: {project_path}', file=sys.stderr)
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, cwd=project_path)
|
||||
if result.returncode == 0:
|
||||
print(f'INFO: Target set to: {target}', file=sys.stderr)
|
||||
return f'Target set to: {target}'
|
||||
else:
|
||||
print(f'ERROR: Failed to set target: {result.stderr}', file=sys.stderr)
|
||||
return f'Failed to set target: {result.stderr}'
|
||||
except Exception as e:
|
||||
print(f'ERROR: Failed to set target: {str(e)}', file=sys.stderr)
|
||||
return f'Error setting target: {str(e)}'
|
||||
|
||||
@mcp.tool()
|
||||
@@ -102,40 +160,20 @@ def action_extensions(base_actions: dict, project_path: str) -> dict:
|
||||
cmd = [
|
||||
sys.executable,
|
||||
os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'),
|
||||
'-C',
|
||||
current_project,
|
||||
] + flash_args
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, cwd=current_project)
|
||||
print(f'INFO: Flashing project with command: {" ".join(cmd)} in path: {project_path}', file=sys.stderr)
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, cwd=project_path)
|
||||
|
||||
if result.returncode == 0:
|
||||
print('INFO: Flash successful', file=sys.stderr)
|
||||
return f'Successfully flashed project{" to port " + port if port else ""}'
|
||||
else:
|
||||
print(f'ERROR: Flash failed: {result.stderr}', file=sys.stderr)
|
||||
return f'Flash failed: {result.stderr}'
|
||||
except Exception as e:
|
||||
print(f'ERROR: Flash failed: {str(e)}', file=sys.stderr)
|
||||
return f'Error flashing: {str(e)}'
|
||||
|
||||
@mcp.tool()
|
||||
def monitor_serial(port: str | None = None) -> str:
|
||||
"""Start serial monitor (returns immediately, monitor runs in background)"""
|
||||
try:
|
||||
monitor_args = []
|
||||
if port:
|
||||
monitor_args.extend(['-p', port])
|
||||
monitor_args.append('monitor')
|
||||
|
||||
cmd = [
|
||||
sys.executable,
|
||||
os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'),
|
||||
'-C',
|
||||
current_project,
|
||||
] + monitor_args
|
||||
|
||||
# Start monitor in background
|
||||
subprocess.Popen(cmd, cwd=current_project)
|
||||
return f'Serial monitor started{" on port " + port if port else ""}'
|
||||
except Exception as e:
|
||||
return f'Error starting monitor: {str(e)}'
|
||||
|
||||
@mcp.tool()
|
||||
def clean_project() -> str:
|
||||
"""Clean build artifacts"""
|
||||
@@ -143,90 +181,64 @@ def action_extensions(base_actions: dict, project_path: str) -> dict:
|
||||
cmd = [
|
||||
sys.executable,
|
||||
os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'),
|
||||
'-C',
|
||||
current_project,
|
||||
'clean',
|
||||
]
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, cwd=current_project)
|
||||
print(f'INFO: Cleaning project with command: {" ".join(cmd)} in path: {project_path}', file=sys.stderr)
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, cwd=project_path)
|
||||
if result.returncode == 0:
|
||||
print('INFO: Project cleaned successfully', file=sys.stderr)
|
||||
return 'Project cleaned successfully'
|
||||
else:
|
||||
print(f'ERROR: Clean failed: {result.stderr}', file=sys.stderr)
|
||||
return f'Clean failed: {result.stderr}'
|
||||
except Exception as e:
|
||||
print(f'ERROR: Error cleaning: {str(e)}', file=sys.stderr)
|
||||
return f'Error cleaning: {str(e)}'
|
||||
|
||||
@mcp.tool()
|
||||
def menuconfig() -> str:
|
||||
"""Open menuconfig interface (terminal-based)"""
|
||||
try:
|
||||
cmd = [
|
||||
sys.executable,
|
||||
os.path.join(os.environ['IDF_PATH'], 'tools', 'idf.py'),
|
||||
'-C',
|
||||
current_project,
|
||||
'menuconfig',
|
||||
]
|
||||
subprocess.run(cmd, cwd=current_project)
|
||||
return 'Menuconfig completed'
|
||||
except Exception as e:
|
||||
return f'Error opening menuconfig: {str(e)}'
|
||||
|
||||
# === RESOURCES (Data Access) ===
|
||||
|
||||
@mcp.resource('project://config')
|
||||
def get_project_config() -> str:
|
||||
"""Get current project configuration"""
|
||||
build_dir = args.get('build_dir', '')
|
||||
config: dict[str, Any] = {}
|
||||
|
||||
if not os.path.exists(build_dir):
|
||||
config['build_dir_exists'] = False
|
||||
return json.dumps(config, indent=2)
|
||||
|
||||
config['build_dir'] = build_dir
|
||||
proj_desc_fn = f'{build_dir}/project_description.json'
|
||||
config['project_description'] = 'Project description does not exist'
|
||||
|
||||
try:
|
||||
config_data = {}
|
||||
with open(proj_desc_fn, encoding='utf-8') as f:
|
||||
config['project_description'] = json.load(f)
|
||||
except (OSError, ValueError):
|
||||
pass
|
||||
|
||||
# Get target
|
||||
config_data['target'] = get_target(current_project)
|
||||
|
||||
# Get build directory info
|
||||
build_dir = os.path.join(current_project, 'build')
|
||||
config_data['build_dir'] = build_dir
|
||||
config_data['build_exists'] = os.path.exists(build_dir)
|
||||
|
||||
# Get project description if available
|
||||
try:
|
||||
ctx = get_build_context()
|
||||
if 'proj_desc' in ctx:
|
||||
config_data['project_description'] = ctx['proj_desc']
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Get sdkconfig info
|
||||
sdkconfig_path = os.path.join(current_project, 'sdkconfig')
|
||||
config_data['sdkconfig_exists'] = os.path.exists(sdkconfig_path)
|
||||
|
||||
return json.dumps(config_data, indent=2)
|
||||
except Exception as e:
|
||||
return f'Error getting config: {str(e)}'
|
||||
return json.dumps(config, indent=2)
|
||||
|
||||
@mcp.resource('project://status')
|
||||
def get_project_status() -> str:
|
||||
"""Get current project build status"""
|
||||
try:
|
||||
status = {
|
||||
'project_path': current_project,
|
||||
'target': get_target(current_project),
|
||||
'project_path': project_path,
|
||||
'target': get_target(project_path),
|
||||
'idf_version': idf_version(),
|
||||
'build_dir': os.path.join(current_project, 'build'),
|
||||
}
|
||||
|
||||
# Check if built
|
||||
build_dir = os.path.join(current_project, 'build')
|
||||
build_dir = args.build_dir
|
||||
if os.path.exists(build_dir):
|
||||
status['built'] = True
|
||||
|
||||
# Check for common build artifacts
|
||||
status['build_dir'] = build_dir
|
||||
artifacts = ['bootloader', 'partition_table', 'app-flash', 'flash_args']
|
||||
status['artifacts'] = {}
|
||||
for artifact in artifacts:
|
||||
artifact_path = os.path.join(build_dir, artifact)
|
||||
status['artifacts'][artifact] = os.path.exists(artifact_path)
|
||||
else:
|
||||
status['built'] = False
|
||||
status['build_dir_exists'] = False
|
||||
|
||||
return json.dumps(status, indent=2)
|
||||
except Exception as e:
|
||||
@@ -234,25 +246,19 @@ def action_extensions(base_actions: dict, project_path: str) -> dict:
|
||||
|
||||
@mcp.resource('project://devices')
|
||||
def get_connected_devices() -> str:
|
||||
"""Get list of connected ESP devices"""
|
||||
"""Get list of connected devices"""
|
||||
try:
|
||||
# Use esptool to list ports
|
||||
cmd = [sys.executable, '-m', 'esptool', '--list-ports']
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
|
||||
devices = {
|
||||
'available_ports': result.stdout.strip().split('\n') if result.stdout else [],
|
||||
'esptool_available': result.returncode == 0,
|
||||
}
|
||||
import serial.tools.list_ports
|
||||
|
||||
devices_on_ports = [p.device.strip() for p in serial.tools.list_ports.comports()]
|
||||
print(f'Devices: {devices_on_ports}', file=sys.stderr)
|
||||
devices = {'available_ports': devices_on_ports if devices_on_ports else []}
|
||||
return json.dumps(devices, indent=2)
|
||||
except Exception as e:
|
||||
return f'Error getting devices: {str(e)}'
|
||||
|
||||
# Start the MCP server
|
||||
print('MCP Server running on stdio...')
|
||||
print('Available tools: build_project, set_target, flash_project, monitor_serial, clean_project, menuconfig')
|
||||
print('Available resources: project://config, project://status, project://devices')
|
||||
|
||||
try:
|
||||
mcp.run()
|
||||
@@ -268,6 +274,6 @@ def action_extensions(base_actions: dict, project_path: str) -> dict:
|
||||
'callback': start_mcp_server,
|
||||
'help': 'Start MCP (Model Context Protocol) server for AI integration',
|
||||
'options': [],
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user