test(panic): add retry logic to espcoredump subprocess call

This commit is contained in:
Erhan Kurubas
2026-01-16 21:44:53 +03:00
parent ab213a9987
commit b38c003cb1
@@ -1,10 +1,11 @@
# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD
# SPDX-FileCopyrightText: 2022-2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import logging
import os
import re
import subprocess
import sys
import time
from typing import Any
from typing import Dict
from typing import List
@@ -13,8 +14,8 @@ from typing import TextIO
from typing import Union
import pexpect
from panic_utils import attach_logger
from panic_utils import NoGdbProcessError
from panic_utils import attach_logger
from panic_utils import quote_string
from panic_utils import sha256
from panic_utils import verify_valid_gdb_subprocess
@@ -32,7 +33,10 @@ class PanicTestDut(IdfDut):
COREDUMP_UART_END = r'================= CORE DUMP END ================='
COREDUMP_CHECKSUM = r"Coredump checksum='([a-fA-F0-9]+)'"
REBOOT = r'.*Rebooting\.\.\.'
CPU_RESET = r'.*rst:.*(RTC_SW_CPU_RST|SW_CPU_RESET|SW_CPU|RTCWDT_RTC_RESET|LP_WDT_SYS|RTCWDT_RTC_RST|CHIP_LP_WDT_RESET|RTC_WDT_SYS)\b'
CPU_RESET = (
r'.*rst:.*(RTC_SW_CPU_RST|SW_CPU_RESET|SW_CPU|RTCWDT_RTC_RESET|'
r'LP_WDT_SYS|RTCWDT_RTC_RST|CHIP_LP_WDT_RESET|RTC_WDT_SYS)\b'
)
app: IdfApp
serial: IdfSerial
@@ -113,9 +117,7 @@ class PanicTestDut(IdfDut):
def expect_elf_sha256(self, caption: str = 'ELF file SHA256: ') -> None:
"""Expect method for ELF SHA256 line"""
elf_sha256 = sha256(self.app.elf_file)
elf_sha256_len = int(
self.app.sdkconfig.get('CONFIG_APP_RETRIEVE_LEN_ELF_SHA', '9')
)
elf_sha256_len = int(self.app.sdkconfig.get('CONFIG_APP_RETRIEVE_LEN_ELF_SHA', '9'))
self.expect_exact(caption + elf_sha256[0:elf_sha256_len])
def expect_coredump(self, output_file_name: str, patterns: List[Union[str, re.Pattern]]) -> None:
@@ -131,16 +133,12 @@ class PanicTestDut(IdfDut):
else:
raise ValueError(f'Unsupported input type: {type(pattern).__name__}')
def _call_espcoredump(
self, extra_args: List[str], output_file_name: str
) -> None:
def _call_espcoredump(self, extra_args: list[str], output_file_name: str, max_retries: int = 3) -> None:
# no "with" here, since we need the file to be open for later inspection by the test case
if not self.coredump_output:
self.coredump_output = open(output_file_name, 'w')
espcoredump_script = os.path.join(
os.environ['IDF_PATH'], 'components', 'espcoredump', 'espcoredump.py'
)
espcoredump_script = os.path.join(os.environ['IDF_PATH'], 'components', 'espcoredump', 'espcoredump.py')
espcoredump_args = [
sys.executable,
espcoredump_script,
@@ -153,18 +151,31 @@ class PanicTestDut(IdfDut):
logging.info('espcoredump output is written to %s', self.coredump_output.name)
self.serial.close()
try:
subprocess.check_call(espcoredump_args, stdout=self.coredump_output, stderr=self.coredump_output)
except subprocess.CalledProcessError:
self.coredump_output.flush()
with open(output_file_name, 'r') as file:
logging.error('espcoredump failed with output: %s', file.read())
raise
finally:
self.coredump_output.seek(0)
for attempt in range(max_retries):
try:
if attempt > 0:
# Reset output file for retry
time.sleep(1)
self.coredump_output.seek(0)
self.coredump_output.truncate()
logging.info(f'Retrying espcoredump (attempt {attempt + 1}/{max_retries})')
subprocess.check_call(espcoredump_args, stdout=self.coredump_output, stderr=self.coredump_output)
self.coredump_output.seek(0)
return # Success
except subprocess.CalledProcessError:
self.coredump_output.flush()
with open(output_file_name) as file:
content = file.read()
if attempt < max_retries - 1:
logging.warning(f'espcoredump attempt {attempt + 1}/{max_retries} failed with output: {content}')
else:
logging.error(f'espcoredump failed after {max_retries} attempts with output: {content}')
raise
def process_coredump_uart(
self, coredump_base64: Any, expected: Optional[List[Union[str, re.Pattern]]] = None,
self,
coredump_base64: Any,
expected: Optional[List[Union[str, re.Pattern]]] = None,
) -> Any:
with open(os.path.join(self.logdir, 'coredump_data.b64'), 'w') as coredump_file:
logging.info('Writing UART base64 core dump to %s', coredump_file.name)
@@ -183,9 +194,7 @@ class PanicTestDut(IdfDut):
coredump_file_name = os.path.join(self.logdir, 'coredump_data.bin')
logging.info('Writing flash binary core dump to %s', coredump_file_name)
output_file_name = os.path.join(self.logdir, 'coredump_flash_result.txt')
self._call_espcoredump(
['--core-format', 'raw', '--save-core', coredump_file_name], output_file_name
)
self._call_espcoredump(['--core-format', 'raw', '--save-core', coredump_file_name], output_file_name)
if expected:
self.expect_coredump(output_file_name, expected)
return coredump_file_name
@@ -210,12 +219,14 @@ class PanicTestDut(IdfDut):
gdb_path = 'riscv32-esp-elf-gdb'
try:
from pygdbmi.constants import GdbTimeoutError
gdb_command = [gdb_path] + gdb_args
self.gdbmi = GdbController(command=gdb_command)
pygdbmi_logger = attach_logger()
except ImportError:
# fallback for pygdbmi<0.10.0.0.
from pygdbmi.gdbcontroller import GdbTimeoutError
self.gdbmi = GdbController(gdb_path=gdb_path, gdb_args=gdb_args)
pygdbmi_logger = self.gdbmi.logger
@@ -225,9 +236,7 @@ class PanicTestDut(IdfDut):
while pygdbmi_logger.hasHandlers():
pygdbmi_logger.removeHandler(pygdbmi_logger.handlers[0])
log_handler = logging.FileHandler(pygdbmi_log_file_name)
log_handler.setFormatter(
logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
)
log_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s'))
logging.info(f'Saving pygdbmi logs to {pygdbmi_log_file_name}')
pygdbmi_logger.addHandler(log_handler)
try:
@@ -251,16 +260,12 @@ class PanicTestDut(IdfDut):
logging.info('GDB response: %s', resp)
break # success
except GdbTimeoutError:
logging.warning(
'GDB internal error: cannot get response from the subprocess'
)
logging.warning('GDB internal error: cannot get response from the subprocess')
except NoGdbProcessError:
logging.error('GDB internal error: process is not running')
break # failure - TODO: create another GdbController
except ValueError:
logging.error(
'GDB internal error: select() returned an unexpected file number'
)
logging.error('GDB internal error: select() returned an unexpected file number')
# Set up logging for GDB remote protocol
gdb_remotelog_file_name = os.path.join(self.logdir, 'gdb_remote_log.txt')
@@ -271,7 +276,6 @@ class PanicTestDut(IdfDut):
# Prepare gdb for the gdb stub
def start_gdb_for_gdbstub(self) -> None:
self.run_gdb()
# Connect GDB to UART
@@ -280,8 +284,9 @@ class PanicTestDut(IdfDut):
self.gdb_write('-gdb-set serial baud 115200')
if sys.platform == 'darwin':
assert '/dev/tty.' not in self.serial.port, \
'/dev/tty.* ports can\'t be used with GDB on macOS. Use with /dev/cu.* instead.'
assert '/dev/tty.' not in self.serial.port, (
"/dev/tty.* ports can't be used with GDB on macOS. Use with /dev/cu.* instead."
)
# Make sure we get the 'stopped' notification
responses = self.gdb_write('-target-select remote ' + self.serial.port)
@@ -307,7 +312,6 @@ class PanicTestDut(IdfDut):
# Prepare gdb to debug coredump file
def start_gdb_for_coredump(self, elf_file: str) -> None:
self.run_gdb()
self.gdb_write('core {}'.format(elf_file))
@@ -326,9 +330,7 @@ class PanicTestDut(IdfDut):
return self.find_gdb_response('done', 'result', responses)['payload']['value']
@staticmethod
def verify_gdb_backtrace(
gdb_backtrace: List[Any], expected_functions_list: List[Any]
) -> None:
def verify_gdb_backtrace(gdb_backtrace: List[Any], expected_functions_list: List[Any]) -> None:
"""
Raises an assert if the function names listed in expected_functions_list do not match the backtrace
given by gdb_backtrace argument. The latter is in the same format as returned by gdb_backtrace()
@@ -341,9 +343,7 @@ class PanicTestDut(IdfDut):
assert False, 'Got unexpected backtrace'
@staticmethod
def find_gdb_response(
message: str, response_type: str, responses: List[Any]
) -> Any:
def find_gdb_response(message: str, response_type: str, responses: List[Any]) -> Any:
"""
Helper function which extracts one response from an array of GDB responses, filtering
by message and type. Returned message is a dictionary, refer to pygdbmi docs for the format.