From b38c003cb14b3c5253757b76e687955eb2e07ba0 Mon Sep 17 00:00:00 2001 From: Erhan Kurubas Date: Fri, 16 Jan 2026 21:44:53 +0300 Subject: [PATCH] test(panic): add retry logic to espcoredump subprocess call --- .../system/panic/test_panic_util/panic_dut.py | 88 +++++++++---------- 1 file changed, 44 insertions(+), 44 deletions(-) diff --git a/tools/test_apps/system/panic/test_panic_util/panic_dut.py b/tools/test_apps/system/panic/test_panic_util/panic_dut.py index 0e03d201ae..3f6c4d7c04 100644 --- a/tools/test_apps/system/panic/test_panic_util/panic_dut.py +++ b/tools/test_apps/system/panic/test_panic_util/panic_dut.py @@ -1,10 +1,11 @@ -# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2022-2026 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Unlicense OR CC0-1.0 import logging import os import re import subprocess import sys +import time from typing import Any from typing import Dict from typing import List @@ -13,8 +14,8 @@ from typing import TextIO from typing import Union import pexpect -from panic_utils import attach_logger from panic_utils import NoGdbProcessError +from panic_utils import attach_logger from panic_utils import quote_string from panic_utils import sha256 from panic_utils import verify_valid_gdb_subprocess @@ -32,7 +33,10 @@ class PanicTestDut(IdfDut): COREDUMP_UART_END = r'================= CORE DUMP END =================' COREDUMP_CHECKSUM = r"Coredump checksum='([a-fA-F0-9]+)'" REBOOT = r'.*Rebooting\.\.\.' - CPU_RESET = r'.*rst:.*(RTC_SW_CPU_RST|SW_CPU_RESET|SW_CPU|RTCWDT_RTC_RESET|LP_WDT_SYS|RTCWDT_RTC_RST|CHIP_LP_WDT_RESET|RTC_WDT_SYS)\b' + CPU_RESET = ( + r'.*rst:.*(RTC_SW_CPU_RST|SW_CPU_RESET|SW_CPU|RTCWDT_RTC_RESET|' + r'LP_WDT_SYS|RTCWDT_RTC_RST|CHIP_LP_WDT_RESET|RTC_WDT_SYS)\b' + ) app: IdfApp serial: IdfSerial @@ -113,9 +117,7 @@ class PanicTestDut(IdfDut): def expect_elf_sha256(self, caption: str = 'ELF file SHA256: ') -> None: """Expect method for ELF SHA256 line""" elf_sha256 = sha256(self.app.elf_file) - elf_sha256_len = int( - self.app.sdkconfig.get('CONFIG_APP_RETRIEVE_LEN_ELF_SHA', '9') - ) + elf_sha256_len = int(self.app.sdkconfig.get('CONFIG_APP_RETRIEVE_LEN_ELF_SHA', '9')) self.expect_exact(caption + elf_sha256[0:elf_sha256_len]) def expect_coredump(self, output_file_name: str, patterns: List[Union[str, re.Pattern]]) -> None: @@ -131,16 +133,12 @@ class PanicTestDut(IdfDut): else: raise ValueError(f'Unsupported input type: {type(pattern).__name__}') - def _call_espcoredump( - self, extra_args: List[str], output_file_name: str - ) -> None: + def _call_espcoredump(self, extra_args: list[str], output_file_name: str, max_retries: int = 3) -> None: # no "with" here, since we need the file to be open for later inspection by the test case if not self.coredump_output: self.coredump_output = open(output_file_name, 'w') - espcoredump_script = os.path.join( - os.environ['IDF_PATH'], 'components', 'espcoredump', 'espcoredump.py' - ) + espcoredump_script = os.path.join(os.environ['IDF_PATH'], 'components', 'espcoredump', 'espcoredump.py') espcoredump_args = [ sys.executable, espcoredump_script, @@ -153,18 +151,31 @@ class PanicTestDut(IdfDut): logging.info('espcoredump output is written to %s', self.coredump_output.name) self.serial.close() - try: - subprocess.check_call(espcoredump_args, stdout=self.coredump_output, stderr=self.coredump_output) - except subprocess.CalledProcessError: - self.coredump_output.flush() - with open(output_file_name, 'r') as file: - logging.error('espcoredump failed with output: %s', file.read()) - raise - finally: - self.coredump_output.seek(0) + for attempt in range(max_retries): + try: + if attempt > 0: + # Reset output file for retry + time.sleep(1) + self.coredump_output.seek(0) + self.coredump_output.truncate() + logging.info(f'Retrying espcoredump (attempt {attempt + 1}/{max_retries})') + subprocess.check_call(espcoredump_args, stdout=self.coredump_output, stderr=self.coredump_output) + self.coredump_output.seek(0) + return # Success + except subprocess.CalledProcessError: + self.coredump_output.flush() + with open(output_file_name) as file: + content = file.read() + if attempt < max_retries - 1: + logging.warning(f'espcoredump attempt {attempt + 1}/{max_retries} failed with output: {content}') + else: + logging.error(f'espcoredump failed after {max_retries} attempts with output: {content}') + raise def process_coredump_uart( - self, coredump_base64: Any, expected: Optional[List[Union[str, re.Pattern]]] = None, + self, + coredump_base64: Any, + expected: Optional[List[Union[str, re.Pattern]]] = None, ) -> Any: with open(os.path.join(self.logdir, 'coredump_data.b64'), 'w') as coredump_file: logging.info('Writing UART base64 core dump to %s', coredump_file.name) @@ -183,9 +194,7 @@ class PanicTestDut(IdfDut): coredump_file_name = os.path.join(self.logdir, 'coredump_data.bin') logging.info('Writing flash binary core dump to %s', coredump_file_name) output_file_name = os.path.join(self.logdir, 'coredump_flash_result.txt') - self._call_espcoredump( - ['--core-format', 'raw', '--save-core', coredump_file_name], output_file_name - ) + self._call_espcoredump(['--core-format', 'raw', '--save-core', coredump_file_name], output_file_name) if expected: self.expect_coredump(output_file_name, expected) return coredump_file_name @@ -210,12 +219,14 @@ class PanicTestDut(IdfDut): gdb_path = 'riscv32-esp-elf-gdb' try: from pygdbmi.constants import GdbTimeoutError + gdb_command = [gdb_path] + gdb_args self.gdbmi = GdbController(command=gdb_command) pygdbmi_logger = attach_logger() except ImportError: # fallback for pygdbmi<0.10.0.0. from pygdbmi.gdbcontroller import GdbTimeoutError + self.gdbmi = GdbController(gdb_path=gdb_path, gdb_args=gdb_args) pygdbmi_logger = self.gdbmi.logger @@ -225,9 +236,7 @@ class PanicTestDut(IdfDut): while pygdbmi_logger.hasHandlers(): pygdbmi_logger.removeHandler(pygdbmi_logger.handlers[0]) log_handler = logging.FileHandler(pygdbmi_log_file_name) - log_handler.setFormatter( - logging.Formatter('%(asctime)s %(levelname)s: %(message)s') - ) + log_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s')) logging.info(f'Saving pygdbmi logs to {pygdbmi_log_file_name}') pygdbmi_logger.addHandler(log_handler) try: @@ -251,16 +260,12 @@ class PanicTestDut(IdfDut): logging.info('GDB response: %s', resp) break # success except GdbTimeoutError: - logging.warning( - 'GDB internal error: cannot get response from the subprocess' - ) + logging.warning('GDB internal error: cannot get response from the subprocess') except NoGdbProcessError: logging.error('GDB internal error: process is not running') break # failure - TODO: create another GdbController except ValueError: - logging.error( - 'GDB internal error: select() returned an unexpected file number' - ) + logging.error('GDB internal error: select() returned an unexpected file number') # Set up logging for GDB remote protocol gdb_remotelog_file_name = os.path.join(self.logdir, 'gdb_remote_log.txt') @@ -271,7 +276,6 @@ class PanicTestDut(IdfDut): # Prepare gdb for the gdb stub def start_gdb_for_gdbstub(self) -> None: - self.run_gdb() # Connect GDB to UART @@ -280,8 +284,9 @@ class PanicTestDut(IdfDut): self.gdb_write('-gdb-set serial baud 115200') if sys.platform == 'darwin': - assert '/dev/tty.' not in self.serial.port, \ - '/dev/tty.* ports can\'t be used with GDB on macOS. Use with /dev/cu.* instead.' + assert '/dev/tty.' not in self.serial.port, ( + "/dev/tty.* ports can't be used with GDB on macOS. Use with /dev/cu.* instead." + ) # Make sure we get the 'stopped' notification responses = self.gdb_write('-target-select remote ' + self.serial.port) @@ -307,7 +312,6 @@ class PanicTestDut(IdfDut): # Prepare gdb to debug coredump file def start_gdb_for_coredump(self, elf_file: str) -> None: - self.run_gdb() self.gdb_write('core {}'.format(elf_file)) @@ -326,9 +330,7 @@ class PanicTestDut(IdfDut): return self.find_gdb_response('done', 'result', responses)['payload']['value'] @staticmethod - def verify_gdb_backtrace( - gdb_backtrace: List[Any], expected_functions_list: List[Any] - ) -> None: + def verify_gdb_backtrace(gdb_backtrace: List[Any], expected_functions_list: List[Any]) -> None: """ Raises an assert if the function names listed in expected_functions_list do not match the backtrace given by gdb_backtrace argument. The latter is in the same format as returned by gdb_backtrace() @@ -341,9 +343,7 @@ class PanicTestDut(IdfDut): assert False, 'Got unexpected backtrace' @staticmethod - def find_gdb_response( - message: str, response_type: str, responses: List[Any] - ) -> Any: + def find_gdb_response(message: str, response_type: str, responses: List[Any]) -> Any: """ Helper function which extracts one response from an array of GDB responses, filtering by message and type. Returned message is a dictionary, refer to pygdbmi docs for the format.