diff mbox series

[AUH] PATCH 3/6] upgrade-helper.py: add changelog flag

Message ID 20260409090815.1731294-4-daniel.turull@ericsson.com
State New
Headers show
Series [AUH] PATCH 3/6] upgrade-helper.py: add changelog flag | expand

Commit Message

Daniel Turull April 9, 2026, 9:08 a.m. UTC
From: Daniel Turull <daniel.turull@ericsson.com>

When enabled, the new --changelog flag extracts changelog entries between
the old and new versions from the upstream source tree, highlighting any
CVEs.

New module changelog.py provides extract_changelog() with three strategies:
1. Parse standard changelog files (ChangeLog, NEWS, etc.), resolving
   RST .. include:: directives and stripping license/copyright blocks
2. Find per-version files (e.g. doc/changelog/changelog-9.18.46.rst)
   for projects like bind that use RST includes
3. Git log between version tags

The changelog text is appended to the git commit message and included
in upgrade notification emails. Extraction runs after devtool_upgrade
using the workspace source tree.

Disabled by default, use with --changelog

Assisted-by: Claude, Anthropic
Signed-off-by: Daniel Turull <daniel.turull@ericsson.com>
---
 modules/changelog.py     | 256 +++++++++++++++++++++++++++++++++++++++
 modules/steps.py         |  20 +++
 modules/utils/version.py | 160 ++++++++++++++++++++++++
 upgrade-helper.py        |  67 ++++++++++
 4 files changed, 503 insertions(+)
 create mode 100644 modules/changelog.py
 create mode 100644 modules/utils/version.py
diff mbox series

Patch

diff --git a/modules/changelog.py b/modules/changelog.py
new file mode 100644
index 0000000..c1cc14b
--- /dev/null
+++ b/modules/changelog.py
@@ -0,0 +1,256 @@ 
+# SPDX-License-Identifier: GPL-2.0-or-later
+
+import os
+import re
+import glob
+import functools
+import subprocess
+
+from logging import info as I
+from logging import debug as D
+
+import bb.utils
+
+# Changelog-like file names probed anywhere in the source tree, in
+# priority order (the first file yielding a usable section wins).
+CHANGELOG_FILENAMES = [
+    'ChangeLog', 'CHANGELOG', 'CHANGELOG.md', 'CHANGELOG.txt',
+    'Changes', 'CHANGES', 'NEWS', 'NEWS.md', 'NEWS.txt',
+    'RELEASE_NOTES', 'RELEASE_NOTES.md', 'RELEASE-NOTES',
+    'HISTORY', 'HISTORY.md',
+    'debian/changelog',
+]
+
+# Matches CVE identifiers, e.g. CVE-2024-12345 (case-insensitive).
+CVE_PATTERN = re.compile(r'(CVE-\d{4}-\d{4,})', re.IGNORECASE)
+
+
+def _find_changelog_files(srcdir):
+    """Return paths of changelog-like files anywhere under srcdir.
+
+    Results follow CHANGELOG_FILENAMES priority order.  NOTE(review): one
+    recursive glob per candidate name means the tree is walked once per
+    entry; fine for typical trees, slow on very large ones.
+    """
+    found = []
+    for name in CHANGELOG_FILENAMES:
+        for f in glob.glob(os.path.join(srcdir, '**', name), recursive=True):
+            if os.path.isfile(f) and f not in found:
+                found.append(f)
+    return found
+
+
+# An RST '.. include:: <file>' directive (matched against stripped lines).
+RST_INCLUDE_RE = re.compile(r'^\.\.\s+include::\s+(.+)$')
+# Start of an RST comment block.  NOTE(review): this also matches real
+# directives such as '.. note::', so _strip_rst_comments removes their
+# bodies too -- confirm losing directive content is acceptable.
+RST_COMMENT_RE = re.compile(r'^\.\.\s*$|^\.\.\s')
+
+
+def _strip_rst_comments(text):
+    """Remove RST comment blocks (license headers etc.).
+
+    A block starts at a line matching RST_COMMENT_RE and swallows every
+    following line that is blank or indented by three or more spaces,
+    i.e. the comment's body per RST rules.
+    """
+    lines = text.split('\n')
+    result = []
+    in_comment = False
+    for line in lines:
+        if RST_COMMENT_RE.match(line):
+            in_comment = True
+            continue
+        if in_comment:
+            # Blank or indented lines still belong to the comment body.
+            if line.startswith('   ') or line.strip() == '':
+                continue
+            in_comment = False
+        result.append(line)
+    return '\n'.join(result)
+
+
+def _resolve_rst_includes(content, base_dir, srcdir):
+    """Inline RST .. include:: directives.
+
+    base_dir is the directory of the including file, used to resolve
+    relative targets; if the target is not there, the first file with the
+    same basename anywhere under srcdir is used instead.  Directives whose
+    target cannot be found or read are left in the output verbatim.
+    """
+    lines = content.split('\n')
+    result = []
+    for line in lines:
+        m = RST_INCLUDE_RE.match(line.strip())
+        if m:
+            inc_rel = m.group(1).strip()
+            inc_path = os.path.normpath(os.path.join(base_dir, inc_rel))
+            if not os.path.isfile(inc_path):
+                # Search for the filename within the source tree
+                fname = os.path.basename(inc_rel)
+                for f in glob.glob(os.path.join(srcdir, '**', fname),
+                                   recursive=True):
+                    if os.path.isfile(f):
+                        inc_path = f
+                        break
+            if os.path.isfile(inc_path):
+                try:
+                    with open(inc_path, 'r', encoding='utf-8',
+                              errors='replace') as f:
+                        result.append(f.read())
+                    continue
+                except OSError:
+                    # Unreadable include: fall through, keep the directive.
+                    pass
+        result.append(line)
+    return '\n'.join(result)
+
+
+GIT_LOG_RE = re.compile(r'^commit [0-9a-f]{7,}', re.MULTILINE)
+
+
+def _condense_git_log(text):
+    """Condense git-log-style content to subject lines only.
+
+    Text without 'commit <sha>' headers is returned unchanged.  Otherwise
+    each commit is reduced to '<12-char sha> <subject>'.  If no subject
+    could be extracted at all, the original text is returned.
+    """
+    if not GIT_LOG_RE.search(text):
+        return text
+    lines = text.split('\n')
+    subjects = []
+    i = 0
+    while i < len(lines):
+        if GIT_LOG_RE.match(lines[i]):
+            short = lines[i].split()[1][:12]
+            # Skip Author/Date, blank line, then grab subject
+            i += 1
+            while i < len(lines) and (lines[i].startswith('Author:') or
+                    lines[i].startswith('Date:') or not lines[i].strip()):
+                i += 1
+            if i < len(lines):
+                subjects.append('%s %s' % (short, lines[i].strip()))
+            # The subject line is re-checked at the loop top; it cannot
+            # match GIT_LOG_RE, so i advances normally next iteration.
+            continue
+        i += 1
+    return '\n'.join(subjects) if subjects else text
+
+
+def _extract_entries_between_versions(content, old_ver, new_ver):
+    """Return the changelog section between new_ver and old_ver.
+
+    The section starts at the first line mentioning new_ver and ends just
+    before the first later line mentioning old_ver, or runs to the end of
+    content if old_ver never appears.  Returns None when new_ver is not
+    mentioned at all.
+    """
+    lines = content.split('\n')
+
+    # Anchor both versions so that a version which is a prefix of another
+    # (e.g. old 9.18 inside new 9.18.46) cannot produce a false match:
+    # no digit/dot immediately before, no ".<digit>" continuation after.
+    def _ver_pattern(ver):
+        return re.compile(r'(?<![\d.])' + re.escape(ver) + r'(?!\.?\d)')
+
+    new_pattern = _ver_pattern(new_ver)
+    old_pattern = _ver_pattern(old_ver)
+
+    start_idx = None
+    end_idx = None
+
+    for i, line in enumerate(lines):
+        if start_idx is None and new_pattern.search(line):
+            start_idx = i
+        elif start_idx is not None and old_pattern.search(line):
+            end_idx = i
+            break
+
+    if start_idx is not None:
+        return '\n'.join(lines[start_idx:end_idx])
+    return None
+
+
+def _find_per_version_files(srcdir, old_ver, new_ver):
+    """Find individual per-version changelog files (e.g. changelog-1.2.3.rst).
+
+    Returns the file paths for every detected version v with
+    old_ver < v <= new_ver, sorted ascending by version.
+    """
+    ver_file_re = re.compile(
+        r'(?:changelog|changes|news|release|relnotes)[-_./\\]?'
+        r'v?(?P<pver>(\d+[\.\-_])*\d+)\.\w+$',
+        re.IGNORECASE)
+
+    candidates = {}
+    for f in glob.glob(os.path.join(srcdir, '**', '*'), recursive=True):
+        if not os.path.isfile(f):
+            continue
+        # Match against last two path components (e.g. RelNotes/v1.47.4.txt)
+        tail = os.sep.join(f.rsplit(os.sep, 2)[-2:])
+        m = ver_file_re.search(tail)
+        if m:
+            # Normalize separators; a later duplicate of a version wins.
+            ver = m.group('pver').replace('_', '.').replace('-', '.')
+            candidates[ver] = f
+
+    # Select versions > old_ver and <= new_ver
+    selected = []
+    for ver, path in candidates.items():
+        if bb.utils.vercmp_string(ver, old_ver) > 0 and \
+           bb.utils.vercmp_string(ver, new_ver) <= 0:
+            selected.append((ver, path))
+
+    # Sort ascending by version (cmp_to_key wraps each version string
+    # in a comparable object driven by vercmp_string).
+    selected.sort(key=lambda x: functools.cmp_to_key(
+        bb.utils.vercmp_string)(x[0]))
+    return [path for _, path in selected]
+
+
+def extract_changelog(srcdir, pn, old_ver, new_ver, workdir):
+    """Extract changelog entries for pn between old_ver and new_ver.
+
+    Tries three strategies in order: (1) a section delimited by the two
+    version strings in a standard changelog file, (2) concatenating
+    per-version files, (3) 'git log' between version tags.  CVE ids found
+    in the extracted text are collected and highlighted.
+
+    Writes the result to workdir/changelog-<pn>.txt and returns a dict
+    with keys 'text' (full changelog), 'commit_text' (truncated and
+    sanitized for git commit -m), 'cves' (sorted unique CVE ids) and
+    'file' (path of the written file).  Returns None when srcdir is
+    missing or nothing was found.
+    """
+    if not srcdir or not os.path.isdir(srcdir):
+        D(" %s: source directory %s not available" % (pn, srcdir))
+        return None
+
+    entries = []
+    cves = []
+
+    # Strategy 1: extract sections between version markers in changelog files
+    changelog_files = _find_changelog_files(srcdir)
+    for fpath in changelog_files:
+        try:
+            with open(fpath, 'r', encoding='utf-8', errors='replace') as f:
+                content = f.read()
+        except OSError:
+            continue
+        content = _resolve_rst_includes(content, os.path.dirname(fpath), srcdir)
+        section = _extract_entries_between_versions(content, old_ver, new_ver)
+        if section:
+            section = _condense_git_log(section)
+            entries.append(section)
+            cves.extend(CVE_PATTERN.findall(section))
+            # First changelog file that yields a section wins.
+            break
+
+    # Strategy 2: concatenate per-version files (e.g. changelog-9.18.42.rst)
+    if not entries:
+        ver_files = _find_per_version_files(srcdir, old_ver, new_ver)
+        for fpath in ver_files:
+            try:
+                with open(fpath, 'r', encoding='utf-8', errors='replace') as f:
+                    content = f.read()
+            except OSError:
+                continue
+            entries.append(content)
+            cves.extend(CVE_PATTERN.findall(content))
+
+    # Strategy 3: git log between version tags
+    if not entries and os.path.isdir(os.path.join(srcdir, '.git')):
+        # Probe common tag naming schemes: v1.2.3, 1.2.3, <pn>-1.2.3.
+        tag_prefixes = ['v', '', pn + '-']
+        old_tag = new_tag = None
+        for prefix in tag_prefixes:
+            try:
+                subprocess.check_output(
+                    ['git', 'rev-parse', prefix + old_ver],
+                    cwd=srcdir, stderr=subprocess.DEVNULL)
+                subprocess.check_output(
+                    ['git', 'rev-parse', prefix + new_ver],
+                    cwd=srcdir, stderr=subprocess.DEVNULL)
+                old_tag, new_tag = prefix + old_ver, prefix + new_ver
+                break
+            except (subprocess.CalledProcessError, OSError):
+                continue
+        if old_tag and new_tag:
+            try:
+                out = subprocess.check_output(
+                    ['git', 'log', '--oneline', '%s..%s' % (old_tag, new_tag)],
+                    cwd=srcdir, stderr=subprocess.DEVNULL).decode('utf-8', errors='replace')
+                if out.strip():
+                    entries.append(out.strip())
+                    cves.extend(CVE_PATTERN.findall(out))
+            except (subprocess.CalledProcessError, OSError):
+                pass
+
+    if not entries:
+        D(" %s: no changelog entries found" % pn)
+        return None
+
+    I(" %s: found %d changelog entries" % (pn, len(entries)))
+
+    cves = sorted(set(cves))
+    text = "Changelog for %s: %s -> %s\n" % (pn, old_ver, new_ver)
+    text += "=" * 60 + "\n\n"
+    if cves:
+        text += "SECURITY FIXES / CVEs FOUND:\n"
+        for cve in cves:
+            text += "  - %s\n" % cve
+        text += "\n" + "-" * 60 + "\n\n"
+    # Highlight CVE ids in the body and drop RST license/comment blocks.
+    text += CVE_PATTERN.sub(r'*** \1 ***', _strip_rst_comments('\n\n'.join(entries)))
+
+    # Collapse multiple blank lines into one
+    text = re.sub(r'\n{3,}', '\n\n', text)
+
+    changelog_path = os.path.join(workdir, "changelog-%s.txt" % pn)
+    with open(changelog_path, 'w', encoding='utf-8') as f:
+        f.write(text)
+
+    I(" %s: changelog saved to %s" % (pn, os.path.basename(changelog_path)))
+    if cves:
+        I(" %s: CVEs found: %s" % (pn, ', '.join(cves)))
+
+    commit_text = text
+    if len(commit_text) > 3000:
+        commit_text = commit_text[:3000] + "\n\n[... changelog truncated ...]\n"
+    # Sanitize for shell-safe git commit -m "..."
+    # NOTE(review): metacharacters are removed, not escaped, so the commit
+    # message can silently lose characters from the changelog text --
+    # confirm that is acceptable.
+    for ch in '"', '`', '$', '\\':
+        commit_text = commit_text.replace(ch, '')
+
+    return {'text': text, 'commit_text': commit_text, 'cves': cves, 'file': changelog_path}
diff --git a/modules/steps.py b/modules/steps.py
index b3ec61c..78fcbbe 100644
--- a/modules/steps.py
+++ b/modules/steps.py
@@ -29,6 +29,7 @@  from logging import warning as W
 
 from errors import Error, DevtoolError, CompilationError
 from buildhistory import BuildHistory
+from changelog import extract_changelog
 
 def load_env(devtool, bb, git, opts, group):
     group['workdir'] = os.path.join(group['base_dir'], group['name'])
@@ -150,10 +151,29 @@  def devtool_finish(devtool, bb, git, opts, group):
             pass
         raise e1
 
+def changelog_extract(devtool, bb, git, opts, group):
+    """Upgrade step: attach upstream changelog text to each package.
+
+    No-op unless the --changelog option is enabled.  On success stores the
+    extract_changelog() result dict in p['changelog'] and appends the
+    commit-safe text to the group commit message.
+    NOTE(review): assumes group['commit_msg'] was populated by an earlier
+    step -- confirm, otherwise this raises KeyError.
+    """
+    if not opts.get('changelog'):
+        return
+    for p in group['pkgs']:
+        # After devtool_upgrade, source is in workspace/sources/<pn>/
+        srcdir = os.path.join(os.environ.get('BUILDDIR', ''),
+                              'workspace', 'sources', p['PN'])
+        if not os.path.isdir(srcdir):
+            # Fallback: derive from env S, replacing old version
+            srcdir = p['env'].get('S', '')
+            if p['PV'] in srcdir:
+                srcdir = srcdir.replace(p['PV'], p['NPV'])
+        result = extract_changelog(srcdir, p['PN'], p['PV'],
+                                   p['NPV'], group['workdir'])
+        if result:
+            p['changelog'] = result
+            group['commit_msg'] += "\n\n" + result['commit_text']
+
 upgrade_steps = [
     (load_env, "Loading environment ..."),
     (buildhistory_init, None),
     (devtool_upgrade, "Running 'devtool upgrade' ..."),
+    (changelog_extract, "Extracting changelog ..."),
     (devtool_finish, "Running 'devtool finish' ..."),
     (compile, None),
 ]
diff --git a/modules/utils/version.py b/modules/utils/version.py
new file mode 100644
index 0000000..b40eb1c
--- /dev/null
+++ b/modules/utils/version.py
@@ -0,0 +1,160 @@ 
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Version utilities for --stable patch-only upgrades.
+#
+# get_all_upstream_versions() collects *all* available upstream versions so
+# that we can pick the highest patch-level release within the current stable
+# branch.  The existing bitbake/oe-core APIs (latest_versionstring,
+# get_recipe_upstream_version) only return the single highest version, so we
+# must re-implement the inner loops here.
+#
+# The HTTP path mirrors bb.fetch2.wget.Wget._check_latest_version() and the
+# git path mirrors bb.fetch2.git.Git.latest_versionstring(), both from
+# bitbake scarthgap.  We cannot modify oe-core on a stable release, hence
+# the duplication.
+
+import functools
+import re
+from logging import warning as W
+
+import bb.utils
+import bb.fetch2
+from bs4 import BeautifulSoup, SoupStrainer
+
+
+def _split_version(ver):
+    """Split a version string into numeric/alpha components."""
+    # Split on '.', '-', '_' and letter-digit boundaries (e.g. 10.0p2 -> [10, 0, p, 2])
+    parts = re.split(r'[\.\-_]', ver)
+    result = []
+    for p in parts:
+        result.extend(re.split(r'(?<=[a-zA-Z])(?=\d)|(?<=\d)(?=[a-zA-Z])', p))
+    return result
+
+
+def is_patch_update(current_ver, candidate_ver):
+    """Return True if candidate_ver is a patch-level update of current_ver.
+
+    Both versions must split into the same number of components (at least
+    3) and agree on all but the last one, which must be greater in the
+    candidate.  Non-numeric final components fall back to vercmp_string.
+    """
+    cur = _split_version(current_ver)
+    cand = _split_version(candidate_ver)
+    if len(cur) != len(cand) or len(cur) < 3:
+        return False
+    if cur[:-1] != cand[:-1]:
+        return False
+    try:
+        return int(cand[-1]) > int(cur[-1])
+    except ValueError:
+        return bb.utils.vercmp_string(candidate_ver, current_ver) > 0
+
+
+def _find_best_version(current_ver, all_versions, filter_fn):
+    """Return the highest version accepted by filter_fn(current_ver, v),
+    or None when no candidate qualifies."""
+    candidates = [v for v in all_versions if filter_fn(current_ver, v)]
+    if not candidates:
+        return None
+    candidates.sort(
+        key=functools.cmp_to_key(bb.utils.vercmp_string), reverse=True
+    )
+    return candidates[0]
+
+
+def find_patch_version(current_ver, all_versions):
+    """Return the highest patch-level update of current_ver, or None."""
+    return _find_best_version(current_ver, all_versions, is_patch_update)
+
+
+def get_all_upstream_versions(rd):
+    """Get all upstream versions using the fetcher infrastructure.
+
+    Unlike ud.method.latest_versionstring() which returns only the highest
+    version, this collects every version so the caller can filter for
+    patch-only updates.
+
+    NOTE(review): only the first SRC_URI entry is considered -- confirm
+    this is always the upstream release URI for the recipes targeted.
+    """
+    src_uris = rd.getVar('SRC_URI')
+    if not src_uris:
+        return []
+
+    src_uri = src_uris.split()[0]
+    ud = bb.fetch2.FetchData(src_uri, rd)
+
+    if ud.type == 'git':
+        return _get_git_versions(ud, rd)
+    return _get_http_versions(ud, rd)
+
+
+def _get_http_versions(ud, rd):
+    """Collect all upstream versions from an HTTP index page.
+
+    Adapted from bb.fetch2.wget.Wget._check_latest_version() and
+    ._init_regexes().  The upstream code only keeps the highest version;
+    we collect them all.  Uses BeautifulSoup to parse <a> tags, matching
+    the upstream behaviour.  Returns a list of version strings; any
+    failure is logged as a warning and yields [].
+    """
+    try:
+        package = ud.path.split("/")[-1]
+
+        regex_uri = rd.getVar('UPSTREAM_CHECK_URI')
+        if not regex_uri:
+            path = ud.path.split(package)[0]
+            # Use bb.fetch2 explicitly: that is the module this file
+            # imports; the legacy 'bb.fetch' name is not guaranteed to
+            # exist, and an AttributeError here would be swallowed by the
+            # broad except below.
+            regex_uri = bb.fetch2.encodeurl([ud.type, ud.host, path,
+                                             ud.user, ud.pswd, {}])
+
+        page = ud.method._fetch_index(regex_uri, ud, rd)
+        if not page:
+            return []
+
+        regex = rd.getVar('UPSTREAM_CHECK_REGEX')
+        if regex:
+            regex = re.compile(regex)
+        else:
+            regex = ud.method._init_regexes(package, ud, rd)
+        if not regex:
+            return []
+
+        # Parse HTML links, same as Wget._check_latest_version()
+        soup = BeautifulSoup(page, "html.parser",
+                             parse_only=SoupStrainer("a"))
+        if not soup:
+            return []
+
+        versions = set()
+        for link in soup.find_all('a', href=True):
+            # Try the href first, then the whole tag text, like upstream.
+            for text in (link['href'], str(link)):
+                m = regex.search(text)
+                if m and 'pver' in m.groupdict() and m.group('pver'):
+                    versions.add(re.sub('_', '.', m.group('pver')))
+                    break
+        return list(versions)
+    except Exception as e:
+        # Boundary handler: version probing is best-effort; log and skip.
+        W(" Failed to get HTTP versions: %s" % str(e))
+        return []
+
+
+def _get_git_versions(ud, rd):
+    """Collect all tagged versions from a git remote.
+
+    Adapted from bb.fetch2.git.Git.latest_versionstring().  The upstream
+    code only keeps the highest version; we collect them all.  Tags
+    containing alpha/beta/rc/final are skipped; '_' is normalized to '.'.
+    """
+    try:
+        output = ud.method._lsremote(ud, rd, "refs/tags/*")
+    except (bb.fetch2.FetchError, bb.fetch2.NetworkAccess, OSError) as e:
+        W(" Failed to list remote tags: %s" % str(e))
+        return []
+
+    rev_tag_re = re.compile(r"([0-9a-f]{40})\s+refs/tags/(.*)")
+    pver_re = re.compile(
+        rd.getVar('UPSTREAM_CHECK_GITTAGREGEX')
+        or r"(?P<pver>([0-9][\.|_]?)+)"
+    )
+    nonrel_re = re.compile(r"(alpha|beta|rc|final)+")
+
+    versions = set()
+    for line in output.split("\n"):
+        if not line:
+            # Stop at the first empty line (mirrors the upstream loop).
+            break
+        m = rev_tag_re.match(line)
+        if not m:
+            continue
+        tag = m.group(2)
+        if nonrel_re.search(tag):
+            continue
+        m = pver_re.search(tag)
+        if m:
+            versions.add(m.group('pver').replace("_", "."))
+    return list(versions)
diff --git a/upgrade-helper.py b/upgrade-helper.py
index 67ca54b..d023559 100755
--- a/upgrade-helper.py
+++ b/upgrade-helper.py
@@ -59,6 +59,7 @@  from utils.emailhandler import Email
 from statistics import Statistics
 from steps import upgrade_steps
 from testimage import TestImage
+from utils.version import is_patch_update, find_patch_version, get_all_upstream_versions
 
 if not os.getenv('BUILDDIR', False):
     E(" You must source oe-init-build-env before running this script!\n")
@@ -74,6 +75,7 @@  scriptpath.add_bitbake_lib_path()
 scriptpath.add_oe_lib_path()
 
 import oe.recipeutils
+import bb.tinfoil
 
 help_text = """Usage examples:
 * To upgrade xmodmap recipe to the latest available version:
@@ -95,6 +97,10 @@  def parse_cmdline():
 
     parser.add_argument("-t", "--to_version",
                         help="version to upgrade the recipe to")
+    parser.add_argument("--changelog", action="store_true", default=False,
+                        help="extract changelog between old and new versions, highlighting CVEs")
+    parser.add_argument("--stable", action="store_true", default=False,
+                        help="only upgrade to the next patch version within the stable branch (e.g. 1.2.3 -> 1.2.4)")
 
     parser.add_argument("-d", "--debug-level", type=int, default=4, choices=range(1, 6),
                         help="set the debug level: CRITICAL=1, ERROR=2, WARNING=3, INFO=4, DEBUG=5")
@@ -198,6 +204,7 @@  class Updater(object):
         self.opts['skip_compilation'] = self.args.skip_compilation
         self.opts['buildhistory'] = self._buildhistory_is_enabled()
         self.opts['testimage'] = self._testimage_is_enabled()
+        self.opts['changelog'] = self.args.changelog
 
     def _make_dirs(self, build_dir):
         self.uh_dir = os.path.join(build_dir, "upgrade-helper")
@@ -358,6 +365,19 @@  class Updater(object):
         if 'patch_file' in g and g['patch_file'] is not None:
             msg_body += next_steps_info % (os.path.basename(g['patch_file']))
 
+        # Add changelog summary if available
+        for pkg_ctx in g['pkgs']:
+            if 'changelog' in pkg_ctx:
+                cl = pkg_ctx['changelog']
+                msg_body += ("\n--- Changelog Summary for %s ---\n"
+                             % pkg_ctx['PN'])
+                if cl['cves']:
+                    msg_body += "\nSECURITY FIXES / CVEs:\n"
+                    for cve in cl['cves']:
+                        msg_body += "  - %s\n" % cve
+                    msg_body += "\n"
+                msg_body += cl['text'] + "\n"
+
         msg_body += mail_footer
 
         # Add possible attachments to email
@@ -682,6 +702,30 @@  class UniverseUpdater(Updater):
 
     def _get_packagegroups_to_upgrade(self, packages=None):
 
+        def _resolve_stable_version(pn, cur_ver, next_ver, tinfoil):
+            """Find the latest patch version within the current stable branch.
+
+            Returns a (version, revision) pair: (next_ver, None) when
+            next_ver is already a patch update; (found_ver, "N/A") when an
+            alternative patch version exists upstream; (None, None) when
+            the package should be skipped.
+            """
+            if is_patch_update(cur_ver, next_ver):
+                return next_ver, None
+            I(" %s: latest version %s is not a patch update from %s,"
+              " searching for patch versions..." %
+              (pn, next_ver, cur_ver))
+            try:
+                rd = tinfoil.parse_recipe(pn)
+                if not rd:
+                    I(" %s: could not parse recipe, skipping" % pn)
+                    return None, None
+                all_versions = get_all_upstream_versions(rd)
+                ver = find_patch_version(cur_ver, all_versions)
+                if ver:
+                    I(" %s: found patch version %s" % (pn, ver))
+                    return ver, "N/A"
+                else:
+                    I(" %s: no suitable version available, skipping" % pn)
+                    return None, None
+            except Exception as e:
+                # Best-effort: any failure just skips this package.
+                I(" %s: failed to search for versions: %s" % (pn, e))
+                return None, None
+
         # Prepare a single pkg dict data (or None is not upgradable) from recipeutils.get_recipe_upgrade_status data.
         def _get_pkg_to_upgrade(self, layer_name, pn, status, cur_ver, next_ver, maintainer, revision, no_upgrade_reason):
             pkg_to_upgrade = None
@@ -746,6 +790,29 @@  class UniverseUpdater(Updater):
                         upgrade_group.append(pkg_to_upgrade)
                 if upgrade_group:
                     upgrade_pkggroups.append(upgrade_group)
+
+        if self.args.stable and upgrade_pkggroups:
+            stable_tinfoil = bb.tinfoil.Tinfoil()
+            stable_tinfoil.prepare(config_only=False)
+            try:
+                filtered = []
+                for group in upgrade_pkggroups:
+                    filtered_group = []
+                    for pkg in group:
+                        stable_ver, stable_rev = _resolve_stable_version(
+                            pkg['pn'], pkg['cur_ver'], pkg['next_ver'],
+                            stable_tinfoil)
+                        if stable_ver is not None:
+                            pkg['next_ver'] = stable_ver
+                            if stable_rev is not None:
+                                pkg['revision'] = stable_rev
+                            filtered_group.append(pkg)
+                    if filtered_group:
+                        filtered.append(filtered_group)
+                upgrade_pkggroups = filtered
+            finally:
+                stable_tinfoil.shutdown()
+
         return upgrade_pkggroups
 
     def pkg_upgrade_handler(self, pkg_ctx):