diff mbox series

[04/11] patchtest: correctly abort --directory test

Message ID 20260514194207.1958325-5-tgamblin@baylibre.com
State New
Headers show
Series patchtest: improve testing coverage | expand

Commit Message

Trevor Gamblin May 14, 2026, 7:42 p.m. UTC
Currently, patchtest run with --directory responds to a CTRL+C press by
aborting only the current patch being tested, and moves onto the next.
Instead, this key press should stop the test entirely. Remove usage of
the unittest.installHandler() function (which intercepts the signal) and
handle it ourselves. Also make sure that the branch is properly reset
with git am --abort afterwards, and that the return code is properly set
in the sigint handler function.

AI-Generated: Uses Claude Code

Signed-off-by: Trevor Gamblin <tgamblin@baylibre.com>
---
 meta/lib/patchtest/patchtest_parser.py    |   2 +-
 meta/lib/patchtest/repo.py                |   4 +
 meta/lib/patchtest/tests/base.py          | 113 +++++++++-------------
 meta/lib/patchtest/tests/test_metadata.py |  86 ++++++++--------
 scripts/patchtest                         |  31 +++++-
 5 files changed, 119 insertions(+), 117 deletions(-)
diff mbox series

Patch

diff --git a/meta/lib/patchtest/patchtest_parser.py b/meta/lib/patchtest/patchtest_parser.py
index 2a11cb76c2..69bcb4b8e5 100644
--- a/meta/lib/patchtest/patchtest_parser.py
+++ b/meta/lib/patchtest/patchtest_parser.py
@@ -37,7 +37,7 @@  class PatchtestParser(object):
                             help='The patch to be tested')
 
         target_patch_group.add_argument('--directory', metavar='DIRECTORY', dest='patch_path',
-                            help='The directory containing patches to be tested')
+                            help='The directory containing patches to be tested. CTRL+C aborts the entire directory test.')
 
         parser.add_argument('--repodir', metavar='REPO',
                             default=default_repodir,
diff --git a/meta/lib/patchtest/repo.py b/meta/lib/patchtest/repo.py
index 29ce9062a4..cb68b32cdf 100644
--- a/meta/lib/patchtest/repo.py
+++ b/meta/lib/patchtest/repo.py
@@ -129,6 +129,10 @@  class PatchTestRepo(object):
             self._patchmerged = True
 
     def clean(self):
+        try:
+            self.repo.git.execute(['git', 'am', '--abort'])
+        except git.exc.GitCommandError:
+            pass
         self.repo.git.execute(['git', 'checkout', self.current_branch if self.current_branch else self.current_commit])
         self.repo.git.execute(['git', 'branch', '-D', self._workingbranch])
         self._patchmerged = False
diff --git a/meta/lib/patchtest/tests/base.py b/meta/lib/patchtest/tests/base.py
index 8263932dda..a7f5e79a54 100644
--- a/meta/lib/patchtest/tests/base.py
+++ b/meta/lib/patchtest/tests/base.py
@@ -133,18 +133,17 @@  class Metadata(Base):
     def setUpClassLocal(cls):
         cls.tinfoil = cls.setup_tinfoil()
 
-        # get info about added/modified/remove recipes
+        # get info about added/modified/removed recipes (as absolute paths)
         cls.added, cls.modified, cls.removed = cls.get_metadata_stats(cls.patchset)
 
     @classmethod
     def tearDownClassLocal(cls):
-        cls.tinfoil.shutdown()
+        if cls.tinfoil:
+            cls.tinfoil.shutdown()
 
     @classmethod
     def setup_tinfoil(cls, config_only=False):
-        """Initialize tinfoil api from bitbake"""
-
-        # import relevant libraries
+        """Initialize tinfoil api from bitbake, or return None if unavailable."""
         try:
             scripts_path = os.path.join(PatchtestParser.repodir, "scripts", "lib")
             if scripts_path not in sys.path:
@@ -153,21 +152,20 @@  class Metadata(Base):
             scriptpath.add_bitbake_lib_path()
             import bb.tinfoil
         except ImportError:
-            raise PatchtestOEError('Could not import tinfoil module')
+            logger.warn('patchtest: bitbake not available, metadata checks will use fallback parser')
+            return None
 
         orig_cwd = os.path.abspath(os.curdir)
-
-        # Load tinfoil
         tinfoil = None
         try:
             builddir = os.environ.get('BUILDDIR')
             if not builddir:
                 logger.warn('Bitbake environment not loaded?')
-                return tinfoil
+                return None
             os.chdir(builddir)
             tinfoil = bb.tinfoil.Tinfoil()
             tinfoil.prepare(config_only=config_only)
-        except bb.tinfoil.TinfoilUIException as te:
+        except bb.tinfoil.TinfoilUIException:
             if tinfoil:
                 tinfoil.shutdown()
             raise PatchtestOEError('Could not prepare properly tinfoil (TinfoilUIException)')
@@ -180,69 +178,48 @@  class Metadata(Base):
 
         return tinfoil
 
+    @staticmethod
+    def _find_pn(data, path):
+        """Find the PN from tinfoil recipe cache data for a given file path."""
+        pn = None
+        pn_native = None
+        for _path, _pn in data:
+            if path in _path:
+                if 'native' in _pn:
+                    pn_native = _pn
+                else:
+                    pn = _pn
+                    break
+        else:
+            if pn_native:
+                return pn_native
+            path_basename = path.split('_')[0]
+            for _path, _pn in data:
+                _path_basename = _path.split('_')[0]
+                if path_basename == _path_basename:
+                    pn = _pn
+        return pn
+
     @classmethod
-    def get_metadata_stats(cls, patchset):
-        """Get lists of added, modified and removed metadata files"""
+    def _getvar(cls, path, varname):
+        """Return a recipe variable value via tinfoil if available, else the fallback parser."""
+        if cls.tinfoil:
+            data = list(cls.tinfoil.cooker.recipecaches[''].pkg_fn.items())
+            pn = cls._find_pn(data, path)
+            rd = cls.tinfoil.parse_recipe(pn)
+            return rd.getVar(varname)
 
-        def find_pn(data, path):
-            """Find the PN from data"""
-            pn = None
-            pn_native = None
-            for _path, _pn in data:
-                if path in _path:
-                    if 'native' in _pn:
-                        # store the native PN but look for the non-native one first
-                        pn_native = _pn
-                    else:
-                        pn = _pn
-                        break
-            else:
-                # sent the native PN if found previously
-                if pn_native:
-                    return pn_native
-
-                # on renames (usually upgrades), we need to check (FILE) base names
-                # because the unidiff library does not provided the new filename, just the modified one
-                # and tinfoil datastore, once the patch is merged, will contain the new filename
-                path_basename = path.split('_')[0]
-                for _path, _pn in data:
-                    _path_basename = _path.split('_')[0]
-                    if path_basename == _path_basename:
-                        pn = _pn
-            return pn
-
-        if not cls.tinfoil:
-            cls.tinfoil = cls.setup_tinfoil()
-
-        added_paths, modified_paths, removed_paths = [], [], []
+    @classmethod
+    def get_metadata_stats(cls, patchset):
+        """Return lists of absolute paths for added, modified and removed recipe files."""
         added, modified, removed = [], [], []
-
-        # get metadata filename additions, modification and removals
         for patch in patchset:
             if patch.path.endswith('.bb') or patch.path.endswith('.bbappend') or patch.path.endswith('.inc'):
+                abspath = os.path.join(os.path.abspath(PatchtestParser.repodir), patch.path)
                 if patch.is_added_file:
-                    added_paths.append(
-                        os.path.join(
-                            os.path.abspath(PatchtestParser.repodir), patch.path
-                        )
-                    )
+                    added.append(abspath)
                 elif patch.is_modified_file:
-                    modified_paths.append(
-                        os.path.join(
-                            os.path.abspath(PatchtestParser.repodir), patch.path
-                        )
-                    )
+                    modified.append(abspath)
                 elif patch.is_removed_file:
-                    removed_paths.append(
-                        os.path.join(
-                            os.path.abspath(PatchtestParser.repodir), patch.path
-                        )
-                    )
-
-        data = cls.tinfoil.cooker.recipecaches[''].pkg_fn.items()
-
-        added = [find_pn(data,path) for path in added_paths]
-        modified = [find_pn(data,path) for path in modified_paths]
-        removed = [find_pn(data,path) for path in removed_paths]
-
-        return [a for a in added if a], [m for m in modified if m], [r for r in removed if r]
+                    removed.append(abspath)
+        return added, modified, removed
diff --git a/meta/lib/patchtest/tests/test_metadata.py b/meta/lib/patchtest/tests/test_metadata.py
index 442e80973d..8c77921af5 100644
--- a/meta/lib/patchtest/tests/test_metadata.py
+++ b/meta/lib/patchtest/tests/test_metadata.py
@@ -19,34 +19,32 @@  class TestMetadata(base.Metadata):
         if not self.added:
             self.skip('No added recipes, skipping test')
 
-        # TODO: this is a workaround so we can parse the recipe not
-        # containing the LICENSE var: add some default license instead
-        # of INVALID into auto.conf, then remove this line at the end
-        auto_conf = os.path.join(os.environ.get('BUILDDIR'), 'conf', 'auto.conf')
-        open_flag = 'w'
-        if os.path.exists(auto_conf):
-            open_flag = 'a'
-        with open(auto_conf, open_flag) as fd:
-            for pn in self.added:
-                fd.write('LICENSE ??= "%s"\n' % patchtest_patterns.invalid_license)
+        if self.tinfoil:
+            # workaround: set a default license so tinfoil can parse recipes
+            # that don't have LICENSE set yet
+            auto_conf = os.path.join(os.environ.get('BUILDDIR'), 'conf', 'auto.conf')
+            open_flag = 'w'
+            if os.path.exists(auto_conf):
+                open_flag = 'a'
+            with open(auto_conf, open_flag) as fd:
+                for path in self.added:
+                    fd.write('LICENSE ??= "%s"\n' % patchtest_patterns.invalid_license)
 
         no_license = False
-        for pn in self.added:
-            rd = self.tinfoil.parse_recipe(pn)
-            license = rd.getVar(patchtest_patterns.metadata_lic)
-            if license == patchtest_patterns.invalid_license:
+        for path in self.added:
+            license = self._getvar(path, patchtest_patterns.metadata_lic)
+            if not license or license == patchtest_patterns.invalid_license:
                 no_license = True
                 break
 
-        # remove auto.conf line or the file itself
-        if open_flag == 'w':
-            os.remove(auto_conf)
-        else:
-            fd = open(auto_conf, 'r')
-            lines = fd.readlines()
-            fd.close()
-            with open(auto_conf, 'w') as fd:
-                fd.write(''.join(lines[:-1]))
+        if self.tinfoil:
+            if open_flag == 'w':
+                os.remove(auto_conf)
+            else:
+                with open(auto_conf, 'r') as fd:
+                    lines = fd.readlines()
+                with open(auto_conf, 'w') as fd:
+                    fd.write(''.join(lines[:-1]))
 
         if no_license:
             self.fail('Recipe does not have the LICENSE field set.')
@@ -55,14 +53,13 @@  class TestMetadata(base.Metadata):
         if not self.added:
             self.skip('No added recipes, skipping test')
 
-        for pn in self.added:
-            rd = self.tinfoil.parse_recipe(pn)
-            pathname = rd.getVar('FILE')
+        for path in self.added:
+            pathname = self._getvar(path, 'FILE') or path
             # we are not interested in images
             if '/images/' in pathname:
                 continue
-            lic_files_chksum = rd.getVar(patchtest_patterns.metadata_chksum)
-            if rd.getVar(patchtest_patterns.license_var) == patchtest_patterns.closed:
+            lic_files_chksum = self._getvar(path, patchtest_patterns.metadata_chksum)
+            if self._getvar(path, patchtest_patterns.license_var) == patchtest_patterns.closed:
                 continue
             if not lic_files_chksum:
                 self.fail(
@@ -108,13 +105,13 @@  class TestMetadata(base.Metadata):
                         )
 
     def _collect_src_uri(self, key_prefix):
-        for pn in self.modified:
-            if 'core-image' in pn:
+        for path in self.modified:
+            if 'core-image' in os.path.basename(path):
                 continue
-            rd = self.tinfoil.parse_recipe(pn)
+            src_uri = self._getvar(path, patchtest_patterns.metadata_src_uri)
             PatchTestDataStore[
-                "%s-%s-%s" % (key_prefix, patchtest_patterns.metadata_src_uri, pn)
-            ] = rd.getVar(patchtest_patterns.metadata_src_uri)
+                "%s-%s-%s" % (key_prefix, patchtest_patterns.metadata_src_uri, path)
+            ] = src_uri or ''
 
     def pretest_src_uri_left_files(self):
         if not PatchtestParser.repo.canbemerged():
@@ -130,12 +127,12 @@  class TestMetadata(base.Metadata):
             self.skip('No modified recipes, skipping test')
         self._collect_src_uri(self.shortid())
 
-        for pn in self.modified:
+        for path in self.modified:
             pretest_src_uri = PatchTestDataStore[
-                "pre%s-%s-%s" % (self.shortid(), patchtest_patterns.metadata_src_uri, pn)
+                "pre%s-%s-%s" % (self.shortid(), patchtest_patterns.metadata_src_uri, path)
             ].split()
             test_src_uri = PatchTestDataStore[
-                "%s-%s-%s" % (self.shortid(), patchtest_patterns.metadata_src_uri, pn)
+                "%s-%s-%s" % (self.shortid(), patchtest_patterns.metadata_src_uri, path)
             ].split()
 
             pretest_files = set([os.path.basename(patch.split(';')[0]) for patch in pretest_src_uri if patch.startswith('file://')])
@@ -154,7 +151,7 @@  class TestMetadata(base.Metadata):
                 filesremoved_from_usr_uri = pretest_files - test_files
 
                 # finally, get those patches removed at SRC_URI and not removed from the patchset
-                # TODO: we are not taking into account  renames, so test may raise false positives
+                # TODO: we are not taking into account renames, so test may raise false positives
                 not_removed = filesremoved_from_usr_uri - filesremoved_from_patchset
                 if not_removed:
                     self.fail('Patches not removed from tree. Remove them and amend the submitted mbox',
@@ -164,15 +161,15 @@  class TestMetadata(base.Metadata):
         if not self.added:
             self.skip('No added recipes, skipping test')
 
-        for pn in self.added:
+        for path in self.added:
+            pn = os.path.basename(path).split('_')[0]
             # we are not interested in images
             if 'core-image' in pn:
                 continue
-            rd = self.tinfoil.parse_recipe(pn)
-            summary = rd.getVar(patchtest_patterns.metadata_summary)
+            summary = self._getvar(path, patchtest_patterns.metadata_summary)
 
             # "${PN} version ${PN}-${PR}" is the default, so fail if default
-            if summary.startswith("%s version" % pn):
+            if not summary or summary.startswith("%s version" % pn):
                 self.fail(
                     "%s is missing in newly added recipe" % patchtest_patterns.metadata_summary
                 )
@@ -186,12 +183,11 @@  class TestMetadata(base.Metadata):
             or PatchtestParser.repo.patch.branch == "dunfell"
         ):
             self.skip("No modified recipes or older target branch, skipping test")
-        for pn in self.modified:
+        for path in self.modified:
             # we are not interested in images
-            if 'core-image' in pn:
+            if 'core-image' in os.path.basename(path):
                 continue
-            rd = self.tinfoil.parse_recipe(pn)
-            cve_check_ignore = rd.getVar(patchtest_patterns.cve_check_ignore_var)
+            cve_check_ignore = self._getvar(path, patchtest_patterns.cve_check_ignore_var)
 
             if cve_check_ignore is not None:
                 self.fail(
diff --git a/scripts/patchtest b/scripts/patchtest
index e8ace03905..435610b54f 100755
--- a/scripts/patchtest
+++ b/scripts/patchtest
@@ -12,6 +12,7 @@ 
 import json
 import logging
 import os
+import signal
 import subprocess
 import sys
 import traceback
@@ -133,8 +134,6 @@  def _runner(resultklass, prefix=None):
 
 def run(patch, logfile=None):
     """ Load, setup and run pre and post-merge tests """
-    unittest.installHandler()
-
     premerge_result = _runner(make_result_class(patch, False, logfile), 'pretest')
     postmerge_result = _runner(make_result_class(patch, True, logfile), 'test')
 
@@ -183,10 +182,26 @@  def main():
     else:
         patch_list = [patch_path]
 
+    interrupted = False
+    previous_sigint = signal.getsignal(signal.SIGINT)
+
+    def _sigint_handler(signum, frame):
+        nonlocal interrupted
+        interrupted = True
+        # Restore previous handler so a second CTRL+C exits immediately
+        signal.signal(signal.SIGINT, previous_sigint)
+        signal.default_int_handler(signum, frame)
+
+    signal.signal(signal.SIGINT, _sigint_handler)
+
     ret = 0
     for patch in patch_list:
+        if interrupted:
+            break
+
         if os.path.getsize(patch) == 0:
             logger.error('patchtest: patch is empty')
+            signal.signal(signal.SIGINT, previous_sigint)
             return 1
 
         logger.info('Testing patch %s' % patch)
@@ -197,8 +212,18 @@  def main():
             with open(log_path, "a") as f:
                 f.write("Patchtest results for patch '%s':\n\n" % patch)
 
-        ret = run(patch, log_path)
+        try:
+            result = run(patch, log_path)
+            ret = ret or result
+        except KeyboardInterrupt:
+            interrupted = True
+
+        if interrupted:
+            logger.error('\npatchtest: interrupted')
+            signal.signal(signal.SIGINT, previous_sigint)
+            return 1
 
+    signal.signal(signal.SIGINT, previous_sigint)
     return ret
 
 if __name__ == '__main__':