[v6,10/12] classes/create-spdx-3.0: Move tasks to library

Message ID 20240712160304.3514496-11-JPEWhacker@gmail.com
State Accepted, archived
Commit aed6f8c1c2e291bde4d7172742790fa535b2fc7d
Series Add SPDX 3.0 support

Commit Message

Joshua Watt July 12, 2024, 3:58 p.m. UTC
Move the bulk of the Python code in the SPDX 3.0 classes into a library
file.
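
The class code is reduced to thin wrappers that import the library and
call into it; as an illustration, the pattern used throughout the hunks
below is:

    # create-spdx-3.0.bbclass: the task body becomes a single library call
    python do_create_spdx() {
        import oe.spdx30_tasks
        oe.spdx30_tasks.create_spdx(d)
    }

BitBake variable flags such as [vardepsexclude], which previously annotated
the in-class functions, are preserved by setting them on the library
functions by name:

    oe.spdx30_tasks.set_timestamp_now[vardepsexclude] = "SPDX_INCLUDE_TIMESTAMPS"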

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
---
 meta/classes/create-spdx-3.0.bbclass       |  874 +-------------
 meta/classes/create-spdx-image-3.0.bbclass |  307 +----
 meta/lib/oe/spdx30_tasks.py                | 1229 ++++++++++++++++++++
 3 files changed, 1256 insertions(+), 1154 deletions(-)
 create mode 100644 meta/lib/oe/spdx30_tasks.py

Patch

diff --git a/meta/classes/create-spdx-3.0.bbclass b/meta/classes/create-spdx-3.0.bbclass
index a930ea81152..41840d9d1a3 100644
--- a/meta/classes/create-spdx-3.0.bbclass
+++ b/meta/classes/create-spdx-3.0.bbclass
@@ -116,698 +116,15 @@  SPDX_PACKAGE_SUPPLIER[doc] = "The base variable name to describe the Agent who \
 
 IMAGE_CLASSES:append = " create-spdx-image-3.0"
 
-def set_timestamp_now(d, o, prop):
-    from datetime import datetime, timezone
+oe.spdx30_tasks.set_timestamp_now[vardepsexclude] = "SPDX_INCLUDE_TIMESTAMPS"
+oe.spdx30_tasks.get_package_sources_from_debug[vardepsexclude] += "STAGING_KERNEL_DIR"
+oe.spdx30_tasks.collect_dep_objsets[vardepsexclude] = "SSTATE_ARCHS"
 
-    if d.getVar("SPDX_INCLUDE_TIMESTAMPS") == "1":
-        setattr(o, prop, datetime.now(timezone.utc))
-    else:
-        # Doing this helps to validated that the property actually exists, and
-        # also that it is not mandatory
-        delattr(o, prop)
-
-set_timestamp_now[vardepsexclude] = "SPDX_INCLUDE_TIMESTAMPS"
-
-def add_license_expression(d, objset, license_expression):
-    from pathlib import Path
-    import oe.spdx30
-    import oe.sbom30
-
-    license_data = d.getVar("SPDX_LICENSE_DATA")
-    simple_license_text = {}
-    license_text_map = {}
-    license_ref_idx = 0
-
-    def add_license_text(name):
-        nonlocal objset
-        nonlocal simple_license_text
-
-        if name in simple_license_text:
-            return simple_license_text[name]
-
-        lic = objset.find_filter(
-            oe.spdx30.simplelicensing_SimpleLicensingText,
-            name=name,
-        )
-
-        if lic is not None:
-            simple_license_text[name] = lic
-            return lic
-
-        lic = objset.add(oe.spdx30.simplelicensing_SimpleLicensingText(
-            _id=objset.new_spdxid("license-text", name),
-            creationInfo=objset.doc.creationInfo,
-            name=name,
-        ))
-        simple_license_text[name] = lic
-
-        if name == "PD":
-            lic.simplelicensing_licenseText = "Software released to the public domain"
-            return lic
-
-        # Seach for the license in COMMON_LICENSE_DIR and LICENSE_PATH
-        for directory in [d.getVar('COMMON_LICENSE_DIR')] + (d.getVar('LICENSE_PATH') or '').split():
-            try:
-                with (Path(directory) / name).open(errors="replace") as f:
-                    lic.simplelicensing_licenseText = f.read()
-                    return lic
-
-            except FileNotFoundError:
-                pass
-
-        # If it's not SPDX or PD, then NO_GENERIC_LICENSE must be set
-        filename = d.getVarFlag('NO_GENERIC_LICENSE', name)
-        if filename:
-            filename = d.expand("${S}/" + filename)
-            with open(filename, errors="replace") as f:
-                lic.simplelicensing_licenseText = f.read()
-                return lic
-        else:
-            bb.fatal("Cannot find any text for license %s" % name)
-
-    def convert(l):
-        nonlocal license_text_map
-        nonlocal license_ref_idx
-
-        if l == "(" or l == ")":
-            return l
-
-        if l == "&":
-            return "AND"
-
-        if l == "|":
-            return "OR"
-
-        if l == "CLOSED":
-            return "NONE"
-
-        spdx_license = d.getVarFlag("SPDXLICENSEMAP", l) or l
-        if spdx_license in license_data["licenses"]:
-            return spdx_license
-
-        spdx_license = "LicenseRef-" + l
-        if spdx_license not in license_text_map:
-            license_text_map[spdx_license] = add_license_text(l)._id
-
-        return spdx_license
-
-    lic_split = license_expression.replace("(", " ( ").replace(")", " ) ").replace("|", " | ").replace("&", " & ").split()
-    spdx_license_expression = ' '.join(convert(l) for l in lic_split)
-
-    return objset.new_license_expression(spdx_license_expression, license_text_map)
-
-
-def add_package_files(d, objset, topdir, get_spdxid, get_purposes, *, archive=None, ignore_dirs=[], ignore_top_level_dirs=[]):
-    from pathlib import Path
-    import oe.spdx30
-    import oe.sbom30
-
-    source_date_epoch = d.getVar("SOURCE_DATE_EPOCH")
-    if source_date_epoch:
-        source_date_epoch = int(source_date_epoch)
-
-    spdx_files = set()
-
-    file_counter = 1
-    for subdir, dirs, files in os.walk(topdir):
-        dirs[:] = [d for d in dirs if d not in ignore_dirs]
-        if subdir == str(topdir):
-            dirs[:] = [d for d in dirs if d not in ignore_top_level_dirs]
-
-        for file in files:
-            filepath = Path(subdir) / file
-            if filepath.is_symlink() or not filepath.is_file():
-                continue
-
-            bb.debug(1, "Adding file %s to %s" % (filepath, objset.doc._id))
-
-            filename = str(filepath.relative_to(topdir))
-            file_purposes = get_purposes(filepath)
-
-            spdx_file = objset.new_file(
-                get_spdxid(file_counter),
-                filename,
-                filepath,
-                purposes=file_purposes,
-            )
-            spdx_files.add(spdx_file)
-
-            if oe.spdx30.software_SoftwarePurpose.source in file_purposes:
-                objset.scan_declared_licenses(spdx_file, filepath)
-
-            if archive is not None:
-                with filepath.open("rb") as f:
-                    info = archive.gettarinfo(fileobj=f)
-                    info.name = filename
-                    info.uid = 0
-                    info.gid = 0
-                    info.uname = "root"
-                    info.gname = "root"
-
-                    if source_date_epoch is not None and info.mtime > source_date_epoch:
-                        info.mtime = source_date_epoch
-
-                    archive.addfile(info, f)
-
-            file_counter += 1
-
-    return spdx_files
-
-
-def get_package_sources_from_debug(d, package, package_files, sources, source_hash_cache):
-    from pathlib import Path
-    import oe.packagedata
-
-    def file_path_match(file_path, pkg_file):
-        if file_path.lstrip("/") == pkg_file.name.lstrip("/"):
-            return True
-
-        for e in pkg_file.extension:
-            if isinstance(e, oe.sbom30.OEFileNameAliasExtension):
-                for a in e.aliases:
-                    if file_path.lstrip("/") == a.lstrip("/"):
-                        return True
-
-        return False
-
-    debug_search_paths = [
-        Path(d.getVar('PKGD')),
-        Path(d.getVar('STAGING_DIR_TARGET')),
-        Path(d.getVar('STAGING_DIR_NATIVE')),
-        Path(d.getVar('STAGING_KERNEL_DIR')),
-    ]
-
-    pkg_data = oe.packagedata.read_subpkgdata_extended(package, d)
-
-    if pkg_data is None:
-        return
-
-    dep_source_files = set()
-
-    for file_path, file_data in pkg_data["files_info"].items():
-        if not "debugsrc" in file_data:
-            continue
-
-        if not any(file_path_match(file_path, pkg_file) for pkg_file in package_files):
-            bb.fatal("No package file found for %s in %s; SPDX found: %s" % (str(file_path), package,
-                " ".join(p.name for p in package_files)))
-            continue
-
-        for debugsrc in file_data["debugsrc"]:
-            for search in debug_search_paths:
-                if debugsrc.startswith("/usr/src/kernel"):
-                    debugsrc_path = search / debugsrc.replace('/usr/src/kernel/', '')
-                else:
-                    debugsrc_path = search / debugsrc.lstrip("/")
-
-                if debugsrc_path in source_hash_cache:
-                    file_sha256 = source_hash_cache[debugsrc_path]
-                    if file_sha256 is None:
-                        continue
-                else:
-                    if not debugsrc_path.exists():
-                        source_hash_cache[debugsrc_path] = None
-                        continue
-
-                    file_sha256 = bb.utils.sha256_file(debugsrc_path)
-                    source_hash_cache[debugsrc_path] = file_sha256
-
-                if file_sha256 in sources:
-                    source_file = sources[file_sha256]
-                    dep_source_files.add(source_file)
-                else:
-                    bb.debug(1, "Debug source %s with SHA256 %s not found in any dependency" % (str(debugsrc_path), file_sha256))
-                break
-            else:
-                bb.debug(1, "Debug source %s not found" % debugsrc)
-
-    return dep_source_files
-
-get_package_sources_from_debug[vardepsexclude] += "STAGING_KERNEL_DIR"
-
-def collect_dep_objsets(d, build):
-    import json
-    from pathlib import Path
-    import oe.sbom30
-    import oe.spdx30
-    import oe.spdx_common
-
-    deps = oe.spdx_common.get_spdx_deps(d)
-
-    dep_objsets = []
-    dep_builds = set()
-
-    dep_build_spdxids = set()
-    for dep in deps:
-        bb.debug(1, "Fetching SPDX for dependency %s" % (dep.pn))
-        dep_build, dep_objset = oe.sbom30.find_root_obj_in_jsonld(d, "recipes", dep.pn, oe.spdx30.build_Build)
-        # If the dependency is part of the taskhash, return it to be linked
-        # against. Otherwise, it cannot be linked against because this recipe
-        # will not rebuilt if dependency changes
-        if dep.in_taskhash:
-            dep_objsets.append(dep_objset)
-
-        # The build _can_ be linked against (by alias)
-        dep_builds.add(dep_build)
-
-    return dep_objsets, dep_builds
-
-collect_dep_objsets[vardepsexclude] = "SSTATE_ARCHS"
-
-def collect_dep_sources(dep_objsets):
-    import oe.spdx30
-    import oe.sbom30
-
-    sources = {}
-    for objset in dep_objsets:
-        # Don't collect sources from native recipes as they
-        # match non-native sources also.
-        if objset.is_native():
-            continue
-
-        bb.debug(1, "Fetching Sources for dependency %s" % (objset.doc.name))
-
-        dep_build = objset.find_root(oe.spdx30.build_Build)
-        if not dep_build:
-            bb.fatal("Unable to find a build")
-
-        for e in objset.foreach_type(oe.spdx30.Relationship):
-            if dep_build is not e.from_:
-                continue
-
-            if e.relationshipType != oe.spdx30.RelationshipType.hasInputs:
-                continue
-
-            for to in e.to:
-                if not isinstance(to, oe.spdx30.software_File):
-                    continue
-
-                if to.software_primaryPurpose != oe.spdx30.software_SoftwarePurpose.source:
-                    continue
-
-                for v in to.verifiedUsing:
-                    if v.algorithm == oe.spdx30.HashAlgorithm.sha256:
-                        sources[v.hashValue] = to
-                        break
-                else:
-                    bb.fatal("No SHA256 found for %s in %s" % (to.name, objset.doc.name))
-
-    return sources
-
-def add_download_files(d, objset):
-    import oe.patch
-    import oe.spdx30
-    import os
-
-    inputs = set()
-
-    urls = d.getVar("SRC_URI").split()
-    fetch = bb.fetch2.Fetch(urls, d)
-
-    for download_idx, src_uri in enumerate(urls):
-        fd = fetch.ud[src_uri]
-
-        for name in fd.names:
-            file_name = os.path.basename(fetch.localpath(src_uri))
-            if oe.patch.patch_path(src_uri, fetch, '', expand=False):
-                primary_purpose = oe.spdx30.software_SoftwarePurpose.patch
-            else:
-                primary_purpose = oe.spdx30.software_SoftwarePurpose.source
-
-            if fd.type == "file":
-                if os.path.isdir(fd.localpath):
-                    walk_idx = 1
-                    for root, dirs, files in os.walk(fd.localpath):
-                        for f in files:
-                            f_path = os.path.join(root, f)
-                            if os.path.islink(f_path):
-                                # TODO: SPDX doesn't support symlinks yet
-                                continue
-
-                            file = objset.new_file(
-                                objset.new_spdxid("source", str(download_idx + 1), str(walk_idx)),
-                                os.path.join(file_name, os.path.relpath(f_path, fd.localpath)),
-                                f_path,
-                                purposes=[primary_purpose],
-                            )
-
-                            inputs.add(file)
-                            walk_idx += 1
-
-                else:
-                    file = objset.new_file(
-                        objset.new_spdxid("source", str(download_idx + 1)),
-                        file_name,
-                        fd.localpath,
-                        purposes=[primary_purpose],
-                    )
-                    inputs.add(file)
-
-            else:
-                uri = fd.type
-                proto = getattr(fd, "proto", None)
-                if proto is not None:
-                    uri = uri + "+" + proto
-                uri = uri + "://" + fd.host + fd.path
-
-                if fd.method.supports_srcrev():
-                    uri = uri + "@" + fd.revisions[name]
-
-                dl = objset.add(oe.spdx30.software_Package(
-                    _id=objset.new_spdxid("source", str(download_idx + 1)),
-                    creationInfo=objset.doc.creationInfo,
-                    name=file_name,
-                    software_primaryPurpose=primary_purpose,
-                    software_downloadLocation=uri,
-                ))
-
-                if fd.method.supports_checksum(fd):
-                    # TODO Need something better than hard coding this
-                    for checksum_id in ["sha256", "sha1"]:
-                        expected_checksum = getattr(fd, "%s_expected" % checksum_id, None)
-                        if expected_checksum is None:
-                            continue
-
-                        dl.verifiedUsing.append(
-                            oe.spdx30.Hash(
-                                algorithm=getattr(oe.spdx30.HashAlgorithm, checksum_id),
-                                hashValue=expected_checksum,
-                            )
-                        )
-
-                inputs.add(dl)
-
-    return inputs
-
-
-def set_purposes(d, element, *var_names, force_purposes=[]):
-    purposes = force_purposes[:]
-
-    for var_name in var_names:
-        val = d.getVar(var_name)
-        if val:
-            purposes.extend(val.split())
-            break
-
-    if not purposes:
-        bb.warn("No SPDX purposes found in %s" % " ".join(var_names))
-        return
-
-    element.software_primaryPurpose = getattr(oe.spdx30.software_SoftwarePurpose, purposes[0])
-    element.software_additionalPurpose = [getattr(oe.spdx30.software_SoftwarePurpose, p) for p in purposes[1:]]
 
 
 python do_create_spdx() {
-    import oe.sbom30
-    import oe.spdx30
-    import oe.spdx_common
-    from pathlib import Path
-    from contextlib import contextmanager
-    import oe.cve_check
-    from datetime import datetime
-
-    def set_var_field(var, obj, name, package=None):
-        val = None
-        if package:
-            val = d.getVar("%s:%s" % (var, package))
-
-        if not val:
-            val = d.getVar(var)
-
-        if val:
-            setattr(obj, name, val)
-
-    deploydir = Path(d.getVar("SPDXDEPLOY"))
-    deploy_dir_spdx = Path(d.getVar("DEPLOY_DIR_SPDX"))
-    spdx_workdir = Path(d.getVar("SPDXWORK"))
-    include_sources = d.getVar("SPDX_INCLUDE_SOURCES") == "1"
-    pkg_arch = d.getVar("SSTATE_PKGARCH")
-    is_native = bb.data.inherits_class("native", d) or bb.data.inherits_class("cross", d)
-    include_vex = d.getVar("SPDX_INCLUDE_VEX")
-    if not include_vex in ("none", "current", "all"):
-        bb.fatal("SPDX_INCLUDE_VEX must be one of 'none', 'current', 'all'")
-
-    build_objset = oe.sbom30.ObjectSet.new_objset(d, d.getVar("PN"))
-
-    build = build_objset.new_task_build("recipe", "recipe")
-    build_objset.doc.rootElement.append(build)
-
-    build_objset.set_is_native(is_native)
-
-    for var in (d.getVar('SPDX_CUSTOM_ANNOTATION_VARS') or "").split():
-        new_annotation(
-            d,
-            build_objset,
-            build,
-            "%s=%s" % (var, d.getVar(var)),
-            oe.spdx30.AnnotationType.other
-        )
-
-    build_inputs = set()
-
-    # Add CVEs
-    cve_by_status = {}
-    if include_vex != "none":
-        for cve in (d.getVarFlags("CVE_STATUS") or {}):
-            status, detail, description = oe.cve_check.decode_cve_status(d, cve)
-
-            # If this CVE is fixed upstream, skip it unless all CVEs are
-            # specified.
-            if include_vex != "all" and detail in ("fixed-version", "cpe-stable-backport"):
-                bb.debug(1, "Skipping %s since it is already fixed upstream" % cve)
-                continue
-
-            cve_by_status.setdefault(status, {})[cve] = (
-                build_objset.new_cve_vuln(cve),
-                detail,
-                description,
-            )
-
-    cpe_ids = oe.cve_check.get_cpe_ids(d.getVar("CVE_PRODUCT"), d.getVar("CVE_VERSION"))
-
-    source_files = add_download_files(d, build_objset)
-    build_inputs |= source_files
-
-    recipe_spdx_license = add_license_expression(d, build_objset, d.getVar("LICENSE"))
-    build_objset.new_relationship(
-        source_files,
-        oe.spdx30.RelationshipType.hasConcludedLicense,
-        [recipe_spdx_license],
-    )
-
-    if oe.spdx_common.process_sources(d) and include_sources:
-        bb.debug(1, "Adding source files to SPDX")
-        oe.spdx_common.get_patched_src(d)
-
-        build_inputs |= add_package_files(
-            d,
-            build_objset,
-            spdx_workdir,
-            lambda file_counter: build_objset.new_spdxid("sourcefile", str(file_counter)),
-            lambda filepath: [oe.spdx30.software_SoftwarePurpose.source],
-            ignore_dirs=[".git"],
-            ignore_top_level_dirs=["temp"],
-            archive=None,
-        )
-
-
-    dep_objsets, dep_builds = collect_dep_objsets(d, build)
-    if dep_builds:
-        build_objset.new_scoped_relationship(
-            [build],
-            oe.spdx30.RelationshipType.dependsOn,
-            oe.spdx30.LifecycleScopeType.build,
-            sorted(oe.sbom30.get_element_link_id(b) for b in dep_builds),
-        )
-
-    debug_source_ids = set()
-    source_hash_cache = {}
-
-    # Write out the package SPDX data now. It is not complete as we cannot
-    # write the runtime data, so write it to a staging area and a later task
-    # will write out the final collection
-
-    # TODO: Handle native recipe output
-    if not is_native:
-        bb.debug(1, "Collecting Dependency sources files")
-        sources = collect_dep_sources(dep_objsets)
-
-        bb.build.exec_func("read_subpackage_metadata", d)
-
-        pkgdest = Path(d.getVar("PKGDEST"))
-        for package in d.getVar("PACKAGES").split():
-            if not oe.packagedata.packaged(package, d):
-                continue
-
-            pkg_name = d.getVar("PKG:%s" % package) or package
-
-            bb.debug(1, "Creating SPDX for package %s" % pkg_name)
-
-            pkg_objset = oe.sbom30.ObjectSet.new_objset(d, pkg_name)
-
-            spdx_package = pkg_objset.add_root(oe.spdx30.software_Package(
-                _id=pkg_objset.new_spdxid("package", pkg_name),
-                creationInfo=pkg_objset.doc.creationInfo,
-                name=pkg_name,
-                software_packageVersion=d.getVar("PV"),
-            ))
-            set_timestamp_now(d, spdx_package, "builtTime")
-
-            set_purposes(
-                d,
-                spdx_package,
-                "SPDX_PACKAGE_ADDITIONAL_PURPOSE:%s" % package,
-                "SPDX_PACKAGE_ADDITIONAL_PURPOSE",
-                force_purposes=["install"],
-            )
-
-
-            supplier = build_objset.new_agent("SPDX_PACKAGE_SUPPLIER")
-            if supplier is not None:
-                spdx_package.supplier = supplier if isinstance(supplier, str) else supplier._id
-
-            set_var_field("HOMEPAGE", spdx_package, "software_homePage", package=package)
-            set_var_field("SUMMARY", spdx_package, "summary", package=package)
-            set_var_field("DESCRIPTION", spdx_package, "description", package=package)
-
-            pkg_objset.new_scoped_relationship(
-                [build._id],
-                oe.spdx30.RelationshipType.hasOutputs,
-                oe.spdx30.LifecycleScopeType.build,
-                [spdx_package],
-            )
-
-            for cpe_id in cpe_ids:
-                spdx_package.externalIdentifier.append(
-                    oe.spdx30.ExternalIdentifier(
-                        externalIdentifierType=oe.spdx30.ExternalIdentifierType.cpe23,
-                        identifier=cpe_id,
-                ))
-
-            # TODO: Generate a file for each actual IPK/DEB/RPM/TGZ file
-            # generated and link it to the package
-            #spdx_package_file = pkg_objset.add(oe.spdx30.software_File(
-            #    _id=pkg_objset.new_spdxid("distribution", pkg_name),
-            #    creationInfo=pkg_objset.doc.creationInfo,
-            #    name=pkg_name,
-            #    software_primaryPurpose=spdx_package.software_primaryPurpose,
-            #    software_additionalPurpose=spdx_package.software_additionalPurpose,
-            #))
-            #set_timestamp_now(d, spdx_package_file, "builtTime")
-
-            ## TODO add hashes
-            #pkg_objset.new_relationship(
-            #    [spdx_package],
-            #    oe.spdx30.RelationshipType.hasDistributionArtifact,
-            #    [spdx_package_file],
-            #)
-
-            # NOTE: licenses live in the recipe collection and are referenced
-            # by ID in the package collection(s). This helps reduce duplication
-            # (since a lot of packages will have the same license), and also
-            # prevents duplicate license SPDX IDs in the packages
-            package_license = d.getVar("LICENSE:%s" % package)
-            if package_license and package_license != d.getVar("LICENSE"):
-                package_spdx_license = add_license_expression(d, build_objset, package_license)
-            else:
-                package_spdx_license = recipe_spdx_license
-
-            pkg_objset.new_relationship(
-                [spdx_package],
-                oe.spdx30.RelationshipType.hasConcludedLicense,
-                [package_spdx_license._id],
-            )
-
-            # NOTE: CVE Elements live in the recipe collection
-            all_cves = set()
-            for status, cves in cve_by_status.items():
-                for cve, items in cves.items():
-                    spdx_cve, detail, description = items
-
-                    all_cves.add(spdx_cve._id)
-
-                    if status == "Patched":
-                        pkg_objset.new_vex_patched_relationship([spdx_cve._id], [spdx_package])
-                    elif status == "Unpatched":
-                        pkg_objset.new_vex_unpatched_relationship([spdx_cve._id], [spdx_package])
-                    elif status == "Ignored":
-                        spdx_vex = pkg_objset.new_vex_ignored_relationship(
-                            [spdx_cve._id],
-                            [spdx_package],
-                            impact_statement=description,
-                        )
-
-                        if detail in ("ignored", "cpe-incorrect", "disputed", "upstream-wontfix"):
-                            # VEX doesn't have justifications for this
-                            pass
-                        elif detail in ("not-applicable-config", "not-applicable-platform"):
-                            for v in spdx_vex:
-                                v.security_justificationType = oe.spdx30.security_VexJustificationType.vulnerableCodeNotPresent
-                        else:
-                            bb.fatal(f"Unknown detail '{detail}' for ignored {cve}")
-                    else:
-                        bb.fatal(f"Unknown CVE status {status}")
-
-            if all_cves:
-                pkg_objset.new_relationship(
-                    [spdx_package],
-                    oe.spdx30.RelationshipType.hasAssociatedVulnerability,
-                    sorted(list(all_cves)),
-                )
-
-            bb.debug(1, "Adding package files to SPDX for package %s" % pkg_name)
-            package_files = add_package_files(
-                d,
-                pkg_objset,
-                pkgdest / package,
-                lambda file_counter: pkg_objset.new_spdxid("package", pkg_name, "file", str(file_counter)),
-                # TODO: Can we know the purpose here?
-                lambda filepath: [],
-                ignore_top_level_dirs=['CONTROL', 'DEBIAN'],
-                archive=None,
-            )
-
-            if package_files:
-                pkg_objset.new_relationship(
-                    [spdx_package],
-                    oe.spdx30.RelationshipType.contains,
-                    sorted(list(package_files)),
-                )
-
-            if include_sources:
-                debug_sources = get_package_sources_from_debug(d, package, package_files, sources, source_hash_cache)
-                debug_source_ids |= set(oe.sbom30.get_element_link_id(d) for d in debug_sources)
-
-            oe.sbom30.write_recipe_jsonld_doc(d, pkg_objset, "packages-staging", deploydir, create_spdx_id_links=False)
-
-    if include_sources:
-        bb.debug(1, "Adding sysroot files to SPDX")
-        sysroot_files = add_package_files(
-            d,
-            build_objset,
-            d.expand("${COMPONENTS_DIR}/${PACKAGE_ARCH}/${PN}"),
-            lambda file_counter: build_objset.new_spdxid("sysroot", str(file_counter)),
-            lambda filepath: [],
-            archive=None,
-        )
-
-        if sysroot_files:
-            build_objset.new_scoped_relationship(
-                [build],
-                oe.spdx30.RelationshipType.hasOutputs,
-                oe.spdx30.LifecycleScopeType.build,
-                sorted(list(sysroot_files)),
-            )
-
-    if build_inputs or debug_source_ids:
-        build_objset.new_scoped_relationship(
-            [build],
-            oe.spdx30.RelationshipType.hasInputs,
-            oe.spdx30.LifecycleScopeType.build,
-            sorted(list(build_inputs)) + sorted(list(debug_source_ids)),
-        )
-
-    oe.sbom30.write_recipe_jsonld_doc(d, build_objset, "recipes", deploydir)
+    import oe.spdx30_tasks
+    oe.spdx30_tasks.create_spdx(d)
 }
 do_create_spdx[vardepsexclude] += "BB_NUMBER_THREADS"
 addtask do_create_spdx after \
@@ -844,101 +161,9 @@  do_create_spdx[cleandirs] = "${SPDXDEPLOY} ${SPDXWORK}"
 do_create_spdx[depends] += "${PATCHDEPENDENCY}"
 
 python do_create_package_spdx() {
-    import oe.sbom30
-    import oe.spdx30
-    import oe.spdx_common
-    import oe.packagedata
-    from pathlib import Path
-
-    deploy_dir_spdx = Path(d.getVar("DEPLOY_DIR_SPDX"))
-    deploydir = Path(d.getVar("SPDXRUNTIMEDEPLOY"))
-    is_native = bb.data.inherits_class("native", d) or bb.data.inherits_class("cross", d)
-
-    providers = oe.spdx_common.collect_package_providers(d)
-    pkg_arch = d.getVar("SSTATE_PKGARCH")
-
-    if not is_native:
-        bb.build.exec_func("read_subpackage_metadata", d)
-
-        dep_package_cache = {}
-
-        # Any element common to all packages that need to be referenced by ID
-        # should be written into this objset set
-        common_objset = oe.sbom30.ObjectSet.new_objset(d, "%s-package-common" % d.getVar("PN"))
-
-        pkgdest = Path(d.getVar("PKGDEST"))
-        for package in d.getVar("PACKAGES").split():
-            localdata = bb.data.createCopy(d)
-            pkg_name = d.getVar("PKG:%s" % package) or package
-            localdata.setVar("PKG", pkg_name)
-            localdata.setVar('OVERRIDES', d.getVar("OVERRIDES", False) + ":" + package)
-
-            if not oe.packagedata.packaged(package, localdata):
-                continue
-
-            spdx_package, pkg_objset = oe.sbom30.load_obj_in_jsonld(
-                d,
-                pkg_arch,
-                "packages-staging",
-                pkg_name,
-                oe.spdx30.software_Package,
-                software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.install,
-            )
-
-            # We will write out a new collection, so link it to the new
-            # creation info in the common package data. The old creation info
-            # should still exist and be referenced by all the existing elements
-            # in the package
-            pkg_objset.creationInfo = pkg_objset.copy_creation_info(common_objset.doc.creationInfo)
-
-            runtime_spdx_deps = set()
-
-            deps = bb.utils.explode_dep_versions2(localdata.getVar("RDEPENDS") or "")
-            seen_deps = set()
-            for dep, _ in deps.items():
-                if dep in seen_deps:
-                    continue
-
-                if dep not in providers:
-                    continue
-
-                (dep, _) = providers[dep]
-
-                if not oe.packagedata.packaged(dep, localdata):
-                    continue
-
-                dep_pkg_data = oe.packagedata.read_subpkgdata_dict(dep, d)
-                dep_pkg = dep_pkg_data["PKG"]
-
-                if dep in dep_package_cache:
-                    dep_spdx_package = dep_package_cache[dep]
-                else:
-                    bb.debug(1, "Searching for %s" % dep_pkg)
-                    dep_spdx_package, _ = oe.sbom30.find_root_obj_in_jsonld(
-                        d,
-                        "packages-staging",
-                        dep_pkg,
-                        oe.spdx30.software_Package,
-                        software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.install,
-                    )
-                    dep_package_cache[dep] = dep_spdx_package
-
-                runtime_spdx_deps.add(dep_spdx_package)
-                seen_deps.add(dep)
-
-            if runtime_spdx_deps:
-                pkg_objset.new_scoped_relationship(
-                    [spdx_package],
-                    oe.spdx30.RelationshipType.dependsOn,
-                    oe.spdx30.LifecycleScopeType.runtime,
-                    [oe.sbom30.get_element_link_id(dep) for dep in runtime_spdx_deps],
-                )
-
-            oe.sbom30.write_recipe_jsonld_doc(d, pkg_objset, "packages", deploydir)
-
-        oe.sbom30.write_recipe_jsonld_doc(d, common_objset, "common-package", deploydir)
+    import oe.spdx30_tasks
+    oe.spdx30_tasks.create_package_spdx(d)
 }
-
 do_create_package_spdx[vardepsexclude] += "OVERRIDES SSTATE_ARCHS"
 
 addtask do_create_package_spdx after do_create_spdx before do_build do_rm_work
@@ -955,91 +180,10 @@  do_create_package_spdx[dirs] = "${SPDXRUNTIMEDEPLOY}"
 do_create_package_spdx[cleandirs] = "${SPDXRUNTIMEDEPLOY}"
 do_create_package_spdx[rdeptask] = "do_create_spdx"
 
-
-
 python spdx30_build_started_handler () {
-    import oe.spdx30
-    import oe.sbom30
-    import oe.spdx_common
-    import os
-    from pathlib import Path
-    from datetime import datetime, timezone
-
-    # Create a copy of the datastore. Set PN to "bitbake" so that SPDX IDs can
-    # be generated
+    import oe.spdx30_tasks
     d = e.data.createCopy()
-    d.setVar("PN", "bitbake")
-    d.setVar("BB_TASKHASH", "bitbake")
-    oe.spdx_common.load_spdx_license_data(d)
-
-    deploy_dir_spdx = Path(e.data.getVar("DEPLOY_DIR_SPDX"))
-
-    objset = oe.sbom30.ObjectSet.new_objset(d, "bitbake", False)
-
-    host_import_key = d.getVar("SPDX_BUILD_HOST")
-    invoked_by = objset.new_agent("SPDX_INVOKED_BY", add=False)
-    on_behalf_of = objset.new_agent("SPDX_ON_BEHALF_OF", add=False)
-
-    if d.getVar("SPDX_INCLUDE_BITBAKE_PARENT_BUILD") == "1":
-        # Since the Build objects are unique, we may as well set the creation
-        # time to the current time instead of the fallback SDE
-        objset.doc.creationInfo.created = datetime.now(timezone.utc)
-
-        # Each invocation of bitbake should have a unique ID since it is a
-        # unique build
-        nonce = os.urandom(16).hex()
-
-        build = objset.add_root(oe.spdx30.build_Build(
-            _id=objset.new_spdxid(nonce, include_unihash=False),
-            creationInfo=objset.doc.creationInfo,
-            build_buildType=oe.sbom30.SPDX_BUILD_TYPE,
-        ))
-        set_timestamp_now(d, build, "build_buildStartTime")
-
-        if host_import_key:
-            objset.new_scoped_relationship(
-                [build],
-                oe.spdx30.RelationshipType.hasHost,
-                oe.spdx30.LifecycleScopeType.build,
-                [objset.new_import("SPDX_BUILD_HOST")],
-            )
-
-        if invoked_by:
-            objset.add(invoked_by)
-            invoked_by_spdx = objset.new_scoped_relationship(
-                [build],
-                oe.spdx30.RelationshipType.invokedBy,
-                oe.spdx30.LifecycleScopeType.build,
-                [invoked_by],
-            )
-
-            if on_behalf_of:
-                objset.add(on_behalf_of)
-                objset.new_scoped_relationship(
-                    [on_behalf_of],
-                    oe.spdx30.RelationshipType.delegatedTo,
-                    oe.spdx30.LifecycleScopeType.build,
-                    invoked_by_spdx,
-                )
-
-        elif on_behalf_of:
-            bb.warn("SPDX_ON_BEHALF_OF has no effect if SPDX_INVOKED_BY is not set")
-
-    else:
-        if host_import_key:
-            bb.warn("SPDX_BUILD_HOST has no effect if SPDX_INCLUDE_BITBAKE_PARENT_BUILD is not set")
-
-        if invoked_by:
-            bb.warn("SPDX_INVOKED_BY has no effect if SPDX_INCLUDE_BITBAKE_PARENT_BUILD is not set")
-
-        if on_behalf_of:
-            bb.warn("SPDX_ON_BEHALF_OF has no effect if SPDX_INCLUDE_BITBAKE_PARENT_BUILD is not set")
-
-    for obj in objset.foreach_type(oe.spdx30.Element):
-        obj.extension.append(oe.sbom30.OELinkExtension(link_spdx_id=False))
-        obj.extension.append(oe.sbom30.OEIdAliasExtension())
-
-    oe.sbom30.write_jsonld_doc(d, objset, deploy_dir_spdx / "bitbake.spdx.json")
+    oe.spdx30_tasks.write_bitbake_spdx(d)
 }
 
 addhandler spdx30_build_started_handler
diff --git a/meta/classes/create-spdx-image-3.0.bbclass b/meta/classes/create-spdx-image-3.0.bbclass
index 467719555d6..1cad8537d14 100644
--- a/meta/classes/create-spdx-image-3.0.bbclass
+++ b/meta/classes/create-spdx-image-3.0.bbclass
@@ -9,37 +9,6 @@  SPDX_ROOTFS_PACKAGES = "${SPDXDIR}/rootfs-packages.json"
 SPDXIMAGEDEPLOYDIR = "${SPDXDIR}/image-deploy"
 SPDXROOTFSDEPLOY = "${SPDXDIR}/rootfs-deploy"
 
-def collect_build_package_inputs(d, objset, build, packages):
-    import oe.spdx_common
-    providers = oe.spdx_common.collect_package_providers(d)
-
-    build_deps = set()
-
-    for name in sorted(packages.keys()):
-        if name not in providers:
-            bb.fatal("Unable to find SPDX provider for '%s'" % name)
-
-        pkg_name, pkg_hashfn = providers[name]
-
-        # Copy all of the package SPDX files into the Sbom elements
-        pkg_spdx, _ = oe.sbom30.find_root_obj_in_jsonld(
-            d,
-            "packages",
-            pkg_name,
-            oe.spdx30.software_Package,
-            software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.install,
-        )
-        build_deps.add(pkg_spdx._id)
-
-    if build_deps:
-        objset.new_scoped_relationship(
-            [build],
-            oe.spdx30.RelationshipType.hasInputs,
-            oe.spdx30.LifecycleScopeType.build,
-            sorted(list(build_deps)),
-        )
-
-
 python spdx_collect_rootfs_packages() {
     import json
     from pathlib import Path
@@ -58,44 +27,8 @@  python spdx_collect_rootfs_packages() {
 ROOTFS_POSTUNINSTALL_COMMAND =+ "spdx_collect_rootfs_packages"
 
 python do_create_rootfs_spdx() {
-    import json
-    from pathlib import Path
-    import oe.spdx30
-    import oe.sbom30
-    from datetime import datetime
-
-    deploy_dir_spdx = Path(d.getVar("DEPLOY_DIR_SPDX"))
-    deploydir = Path(d.getVar("SPDXROOTFSDEPLOY"))
-    root_packages_file = Path(d.getVar("SPDX_ROOTFS_PACKAGES"))
-    image_basename = d.getVar("IMAGE_BASENAME")
-    machine = d.getVar("MACHINE")
-
-    with root_packages_file.open("r") as f:
-        packages = json.load(f)
-
-    objset = oe.sbom30.ObjectSet.new_objset(d, "%s-%s" % (image_basename, machine))
-
-    rootfs = objset.add_root(oe.spdx30.software_Package(
-        _id=objset.new_spdxid("rootfs", image_basename),
-        creationInfo=objset.doc.creationInfo,
-        name=image_basename,
-        software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.archive,
-    ))
-    set_timestamp_now(d, rootfs, "builtTime")
-
-    rootfs_build = objset.add_root(objset.new_task_build("rootfs", "rootfs"))
-    set_timestamp_now(d, rootfs_build, "build_buildEndTime")
-
-    objset.new_scoped_relationship(
-        [rootfs_build],
-        oe.spdx30.RelationshipType.hasOutputs,
-        oe.spdx30.LifecycleScopeType.build,
-        [rootfs],
-    )
-
-    collect_build_package_inputs(d, objset, rootfs_build, packages)
-
-    oe.sbom30.write_recipe_jsonld_doc(d, objset, "rootfs", deploydir)
+    import oe.spdx30_tasks
+    oe.spdx30_tasks.create_rootfs_spdx(d)
 }
 addtask do_create_rootfs_spdx after do_rootfs before do_image
 SSTATETASKS += "do_create_rootfs_spdx"
@@ -110,79 +43,8 @@  python do_create_rootfs_spdx_setscene() {
 addtask do_create_rootfs_spdx_setscene
 
 python do_create_image_spdx() {
-    import oe.spdx30
-    import oe.sbom30
-    import json
-    from pathlib import Path
-
-    image_deploy_dir = Path(d.getVar('IMGDEPLOYDIR'))
-    manifest_path = Path(d.getVar("IMAGE_OUTPUT_MANIFEST"))
-    spdx_work_dir = Path(d.getVar('SPDXIMAGEWORK'))
-
-    image_basename = d.getVar('IMAGE_BASENAME')
-    machine = d.getVar("MACHINE")
-
-    objset = oe.sbom30.ObjectSet.new_objset(d, "%s-%s" % (image_basename, machine))
-
-    with manifest_path.open("r") as f:
-        manifest = json.load(f)
-
-    builds = []
-    for task in manifest:
-        imagetype = task["imagetype"]
-        taskname = task["taskname"]
-
-        image_build = objset.add_root(objset.new_task_build(taskname, "image/%s" % imagetype))
-        set_timestamp_now(d, image_build, "build_buildEndTime")
-        builds.append(image_build)
-
-        artifacts = []
-
-        for image in task["images"]:
-            image_filename = image["filename"]
-            image_path = image_deploy_dir / image_filename
-            a = objset.add_root(oe.spdx30.software_File(
-                _id=objset.new_spdxid("image", image_filename),
-                creationInfo=objset.doc.creationInfo,
-                name=image_filename,
-                verifiedUsing=[
-                    oe.spdx30.Hash(
-                        algorithm=oe.spdx30.HashAlgorithm.sha256,
-                        hashValue=bb.utils.sha256_file(image_path),
-                    )
-                ]
-            ))
-            set_purposes(d, a, "SPDX_IMAGE_PURPOSE:%s" % imagetype, "SPDX_IMAGE_PURPOSE")
-            set_timestamp_now(d, a, "builtTime")
-
-            artifacts.append(a)
-
-        if artifacts:
-            objset.new_scoped_relationship(
-                [image_build],
-                oe.spdx30.RelationshipType.hasOutputs,
-                oe.spdx30.LifecycleScopeType.build,
-                artifacts,
-            )
-
-    if builds:
-        rootfs_image, _ = oe.sbom30.find_root_obj_in_jsonld(
-            d,
-            "rootfs",
-            "%s-%s" % (image_basename, machine),
-            oe.spdx30.software_Package,
-            # TODO: Should use a purpose to filter here?
-        )
-        objset.new_scoped_relationship(
-            builds,
-            oe.spdx30.RelationshipType.hasInputs,
-            oe.spdx30.LifecycleScopeType.build,
-            [rootfs_image._id],
-        )
-
-    objset.add_aliases()
-    objset.link()
-    oe.sbom30.write_recipe_jsonld_doc(d, objset, "image", spdx_work_dir)
+    import oe.spdx30_tasks
+    oe.spdx30_tasks.create_image_spdx(d)
 }
 addtask do_create_image_spdx after do_image_complete do_create_rootfs_spdx before do_build
 SSTATETASKS += "do_create_image_spdx"
@@ -199,46 +61,8 @@  addtask do_create_image_spdx_setscene
 
 
 python do_create_image_sbom_spdx() {
-    import os
-    from pathlib import Path
-    import oe.spdx30
-    import oe.sbom30
-
-    image_name = d.getVar("IMAGE_NAME")
-    image_basename = d.getVar("IMAGE_BASENAME")
-    image_link_name = d.getVar("IMAGE_LINK_NAME")
-    imgdeploydir = Path(d.getVar("SPDXIMAGEDEPLOYDIR"))
-    machine = d.getVar("MACHINE")
-
-    spdx_path = imgdeploydir / (image_name + ".spdx.json")
-
-    root_elements = []
-
-    # TODO: Do we need to add the rootfs or are the image files sufficient?
-    rootfs_image, _ = oe.sbom30.find_root_obj_in_jsonld(
-        d,
-        "rootfs",
-        "%s-%s" % (image_basename, machine),
-        oe.spdx30.software_Package,
-        # TODO: Should use a purpose here?
-    )
-    root_elements.append(rootfs_image._id)
-
-    image_objset, _ = oe.sbom30.find_jsonld(d, "image", "%s-%s" % (image_basename, machine), required=True)
-    for o in image_objset.foreach_root(oe.spdx30.software_File):
-        root_elements.append(o._id)
-
-    objset, sbom = oe.sbom30.create_sbom(d, image_name, root_elements)
-
-    oe.sbom30.write_jsonld_doc(d, objset, spdx_path)
-
-    def make_image_link(target_path, suffix):
-        if image_link_name:
-            link = imgdeploydir / (image_link_name + suffix)
-            if link != target_path:
-                link.symlink_to(os.path.relpath(target_path, link.parent))
-
-    make_image_link(spdx_path, ".spdx.json")
+    import oe.spdx30_tasks
+    oe.spdx30_tasks.create_image_sbom_spdx(d)
 }
 addtask do_create_image_sbom_spdx after do_create_rootfs_spdx do_create_image_spdx before do_build
 SSTATETASKS += "do_create_image_sbom_spdx"
@@ -268,149 +92,54 @@  POPULATE_SDK_POST_TARGET_COMMAND:append:task-populate-sdk-ext = " sdk_ext_target
 
 python sdk_host_create_spdx() {
     from pathlib import Path
+    import oe.spdx30_tasks
     spdx_work_dir = Path(d.getVar('SPDXSDKWORK'))
 
-    sdk_create_spdx(d, "host", spdx_work_dir, d.getVar("TOOLCHAIN_OUTPUTNAME"))
+    oe.spdx30_tasks.sdk_create_spdx(d, "host", spdx_work_dir, d.getVar("TOOLCHAIN_OUTPUTNAME"))
 }
 
 python sdk_target_create_spdx() {
     from pathlib import Path
+    import oe.spdx30_tasks
     spdx_work_dir = Path(d.getVar('SPDXSDKWORK'))
 
-    sdk_create_spdx(d, "target", spdx_work_dir, d.getVar("TOOLCHAIN_OUTPUTNAME"))
+    oe.spdx30_tasks.sdk_create_spdx(d, "target", spdx_work_dir, d.getVar("TOOLCHAIN_OUTPUTNAME"))
 }
 
 python sdk_ext_host_create_spdx() {
     from pathlib import Path
+    import oe.spdx30_tasks
     spdx_work_dir = Path(d.getVar('SPDXSDKEXTWORK'))
 
     # TODO: This doesn't seem to work
-    sdk_create_spdx(d, "host", spdx_work_dir, d.getVar("TOOLCHAINEXT_OUTPUTNAME"))
+    oe.spdx30_tasks.sdk_create_spdx(d, "host", spdx_work_dir, d.getVar("TOOLCHAINEXT_OUTPUTNAME"))
 }
 
 python sdk_ext_target_create_spdx() {
     from pathlib import Path
+    import oe.spdx30_tasks
     spdx_work_dir = Path(d.getVar('SPDXSDKEXTWORK'))
 
     # TODO: This doesn't seem to work
-    sdk_create_spdx(d, "target", spdx_work_dir, d.getVar("TOOLCHAINEXT_OUTPUTNAME"))
+    oe.spdx30_tasks.sdk_create_spdx(d, "target", spdx_work_dir, d.getVar("TOOLCHAINEXT_OUTPUTNAME"))
 }
 
-def sdk_create_spdx(d, sdk_type, spdx_work_dir, toolchain_outputname):
-    from pathlib import Path
-    from oe.sdk import sdk_list_installed_packages
-    import oe.spdx30
-    import oe.sbom30
-    from datetime import datetime
-
-    sdk_name = toolchain_outputname + "-" + sdk_type
-    sdk_packages = sdk_list_installed_packages(d, sdk_type == "target")
-
-    objset = oe.sbom30.ObjectSet.new_objset(d, sdk_name)
-
-    sdk_rootfs = objset.add_root(oe.spdx30.software_Package(
-        _id=objset.new_spdxid("sdk-rootfs", sdk_name),
-        creationInfo=objset.doc.creationInfo,
-        name=sdk_name,
-        software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.archive,
-    ))
-    set_timestamp_now(d, sdk_rootfs, "builtTime")
-
-    sdk_build = objset.add_root(objset.new_task_build("sdk-rootfs", "sdk-rootfs"))
-    set_timestamp_now(d, sdk_build, "build_buildEndTime")
-
-    objset.new_scoped_relationship(
-        [sdk_build],
-        oe.spdx30.RelationshipType.hasOutputs,
-        oe.spdx30.LifecycleScopeType.build,
-        [sdk_rootfs],
-    )
-
-    collect_build_package_inputs(d, objset, sdk_build, sdk_packages)
-
-    objset.add_aliases()
-    oe.sbom30.write_jsonld_doc(d, objset, spdx_work_dir / "sdk-rootfs.spdx.json")
 
 python sdk_create_sbom() {
     from pathlib import Path
+    import oe.spdx30_tasks
     sdk_deploydir = Path(d.getVar("SDKDEPLOYDIR"))
     spdx_work_dir = Path(d.getVar('SPDXSDKWORK'))
 
-    create_sdk_sbom(d, sdk_deploydir, spdx_work_dir, d.getVar("TOOLCHAIN_OUTPUTNAME"))
+    oe.spdx30_tasks.create_sdk_sbom(d, sdk_deploydir, spdx_work_dir, d.getVar("TOOLCHAIN_OUTPUTNAME"))
 }
 
 python sdk_ext_create_sbom() {
     from pathlib import Path
+    import oe.spdx30_tasks
     sdk_deploydir = Path(d.getVar("SDKEXTDEPLOYDIR"))
     spdx_work_dir = Path(d.getVar('SPDXSDKEXTWORK'))
 
-    create_sdk_sbom(d, sdk_deploydir, spdx_work_dir, d.getVar("TOOLCHAINEXT_OUTPUTNAME"))
+    oe.spdx30_tasks.create_sdk_sbom(d, sdk_deploydir, spdx_work_dir, d.getVar("TOOLCHAINEXT_OUTPUTNAME"))
 }
 
-def create_sdk_sbom(d, sdk_deploydir, spdx_work_dir, toolchain_outputname):
-    import oe.spdx30
-    import oe.sbom30
-    from pathlib import Path
-    from datetime import datetime
-
-    # Load the document written earlier
-    rootfs_objset = oe.sbom30.load_jsonld(d, spdx_work_dir / "sdk-rootfs.spdx.json", required=True)
-
-    # Create a new build for the SDK installer
-    sdk_build = rootfs_objset.new_task_build("sdk-populate", "sdk-populate")
-    set_timestamp_now(d, sdk_build, "build_buildEndTime")
-
-    rootfs = rootfs_objset.find_root(oe.spdx30.software_Package)
-    if rootfs is None:
-        bb.fatal("Unable to find rootfs artifact")
-
-    rootfs_objset.new_scoped_relationship(
-        [sdk_build],
-        oe.spdx30.RelationshipType.hasInputs,
-        oe.spdx30.LifecycleScopeType.build,
-        [rootfs]
-    )
-
-    files = set()
-    root_files = []
-
-    # NOTE: os.walk() doesn't return symlinks
-    for dirpath, dirnames, filenames in os.walk(sdk_deploydir):
-        for fn in filenames:
-            fpath = Path(dirpath) / fn
-            if not fpath.is_file() or fpath.is_symlink():
-                continue
-
-            relpath = str(fpath.relative_to(sdk_deploydir))
-
-            f = rootfs_objset.new_file(
-                rootfs_objset.new_spdxid("sdk-installer", relpath),
-                relpath,
-                fpath,
-            )
-            set_timestamp_now(d, f, "builtTime")
-
-            if fn.endswith(".manifest"):
-                f.software_primaryPurpose = oe.spdx30.software_SoftwarePurpose.manifest
-            elif fn.endswith(".testdata.json"):
-                f.software_primaryPurpose = oe.spdx30.software_SoftwarePurpose.configuration
-            else:
-                set_purposes(d, f, "SPDX_SDK_PURPOSE")
-                root_files.append(f)
-
-            files.add(f)
-
-    if files:
-        rootfs_objset.new_scoped_relationship(
-            [sdk_build],
-            oe.spdx30.RelationshipType.hasOutputs,
-            oe.spdx30.LifecycleScopeType.build,
-            files,
-        )
-    else:
-        bb.warn(f"No SDK output files found in {sdk_deploydir}")
-
-    objset, sbom = oe.sbom30.create_sbom(d, toolchain_outputname, sorted(list(files)), [rootfs_objset])
-
-    oe.sbom30.write_jsonld_doc(d, objset, sdk_deploydir / (toolchain_outputname + ".spdx.json"))
-
diff --git a/meta/lib/oe/spdx30_tasks.py b/meta/lib/oe/spdx30_tasks.py
new file mode 100644
index 00000000000..59fd8750744
--- /dev/null
+++ b/meta/lib/oe/spdx30_tasks.py
@@ -0,0 +1,1229 @@ 
+#
+# Copyright OpenEmbedded Contributors
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+
+import json
+import oe.cve_check
+import oe.packagedata
+import oe.patch
+import oe.sbom30
+import oe.spdx30
+import oe.spdx_common
+import oe.sdk
+import os
+
+from contextlib import contextmanager
+from datetime import datetime, timezone
+from pathlib import Path
+
+
+def set_timestamp_now(d, o, prop):
+    if d.getVar("SPDX_INCLUDE_TIMESTAMPS") == "1":
+        setattr(o, prop, datetime.now(timezone.utc))
+    else:
+        # Doing this helps to validated that the property actually exists, and
+        # also that it is not mandatory
+        delattr(o, prop)
+
+
+def add_license_expression(d, objset, license_expression):
+    license_data = d.getVar("SPDX_LICENSE_DATA")
+    simple_license_text = {}
+    license_text_map = {}
+    license_ref_idx = 0
+
+    def add_license_text(name):
+        nonlocal objset
+        nonlocal simple_license_text
+
+        if name in simple_license_text:
+            return simple_license_text[name]
+
+        lic = objset.find_filter(
+            oe.spdx30.simplelicensing_SimpleLicensingText,
+            name=name,
+        )
+
+        if lic is not None:
+            simple_license_text[name] = lic
+            return lic
+
+        lic = objset.add(
+            oe.spdx30.simplelicensing_SimpleLicensingText(
+                _id=objset.new_spdxid("license-text", name),
+                creationInfo=objset.doc.creationInfo,
+                name=name,
+            )
+        )
+        simple_license_text[name] = lic
+
+        if name == "PD":
+            lic.simplelicensing_licenseText = "Software released to the public domain"
+            return lic
+
+        # Seach for the license in COMMON_LICENSE_DIR and LICENSE_PATH
+        for directory in [d.getVar("COMMON_LICENSE_DIR")] + (
+            d.getVar("LICENSE_PATH") or ""
+        ).split():
+            try:
+                with (Path(directory) / name).open(errors="replace") as f:
+                    lic.simplelicensing_licenseText = f.read()
+                    return lic
+
+            except FileNotFoundError:
+                pass
+
+        # If it's not SPDX or PD, then NO_GENERIC_LICENSE must be set
+        filename = d.getVarFlag("NO_GENERIC_LICENSE", name)
+        if filename:
+            filename = d.expand("${S}/" + filename)
+            with open(filename, errors="replace") as f:
+                lic.simplelicensing_licenseText = f.read()
+                return lic
+        else:
+            bb.fatal("Cannot find any text for license %s" % name)
+
+    def convert(l):
+        nonlocal license_text_map
+        nonlocal license_ref_idx
+
+        if l == "(" or l == ")":
+            return l
+
+        if l == "&":
+            return "AND"
+
+        if l == "|":
+            return "OR"
+
+        if l == "CLOSED":
+            return "NONE"
+
+        spdx_license = d.getVarFlag("SPDXLICENSEMAP", l) or l
+        if spdx_license in license_data["licenses"]:
+            return spdx_license
+
+        spdx_license = "LicenseRef-" + l
+        if spdx_license not in license_text_map:
+            license_text_map[spdx_license] = add_license_text(l)._id
+
+        return spdx_license
+
+    lic_split = (
+        license_expression.replace("(", " ( ")
+        .replace(")", " ) ")
+        .replace("|", " | ")
+        .replace("&", " & ")
+        .split()
+    )
+    spdx_license_expression = " ".join(convert(l) for l in lic_split)
+
+    return objset.new_license_expression(spdx_license_expression, license_text_map)
+
+
+def add_package_files(
+    d,
+    objset,
+    topdir,
+    get_spdxid,
+    get_purposes,
+    *,
+    archive=None,
+    ignore_dirs=[],
+    ignore_top_level_dirs=[],
+):
+    source_date_epoch = d.getVar("SOURCE_DATE_EPOCH")
+    if source_date_epoch:
+        source_date_epoch = int(source_date_epoch)
+
+    spdx_files = set()
+
+    file_counter = 1
+    for subdir, dirs, files in os.walk(topdir):
+        dirs[:] = [d for d in dirs if d not in ignore_dirs]
+        if subdir == str(topdir):
+            dirs[:] = [d for d in dirs if d not in ignore_top_level_dirs]
+
+        for file in files:
+            filepath = Path(subdir) / file
+            if filepath.is_symlink() or not filepath.is_file():
+                continue
+
+            bb.debug(1, "Adding file %s to %s" % (filepath, objset.doc._id))
+
+            filename = str(filepath.relative_to(topdir))
+            file_purposes = get_purposes(filepath)
+
+            spdx_file = objset.new_file(
+                get_spdxid(file_counter),
+                filename,
+                filepath,
+                purposes=file_purposes,
+            )
+            spdx_files.add(spdx_file)
+
+            if oe.spdx30.software_SoftwarePurpose.source in file_purposes:
+                objset.scan_declared_licenses(spdx_file, filepath)
+
+            if archive is not None:
+                with filepath.open("rb") as f:
+                    info = archive.gettarinfo(fileobj=f)
+                    info.name = filename
+                    info.uid = 0
+                    info.gid = 0
+                    info.uname = "root"
+                    info.gname = "root"
+
+                    if source_date_epoch is not None and info.mtime > source_date_epoch:
+                        info.mtime = source_date_epoch
+
+                    archive.addfile(info, f)
+
+            file_counter += 1
+
+    return spdx_files
+
+
+def get_package_sources_from_debug(
+    d, package, package_files, sources, source_hash_cache
+):
+    def file_path_match(file_path, pkg_file):
+        if file_path.lstrip("/") == pkg_file.name.lstrip("/"):
+            return True
+
+        for e in pkg_file.extension:
+            if isinstance(e, oe.sbom30.OEFileNameAliasExtension):
+                for a in e.aliases:
+                    if file_path.lstrip("/") == a.lstrip("/"):
+                        return True
+
+        return False
+
+    debug_search_paths = [
+        Path(d.getVar("PKGD")),
+        Path(d.getVar("STAGING_DIR_TARGET")),
+        Path(d.getVar("STAGING_DIR_NATIVE")),
+        Path(d.getVar("STAGING_KERNEL_DIR")),
+    ]
+
+    pkg_data = oe.packagedata.read_subpkgdata_extended(package, d)
+
+    if pkg_data is None:
+        return
+
+    dep_source_files = set()
+
+    for file_path, file_data in pkg_data["files_info"].items():
+        if not "debugsrc" in file_data:
+            continue
+
+        if not any(file_path_match(file_path, pkg_file) for pkg_file in package_files):
+            bb.fatal(
+                "No package file found for %s in %s; SPDX found: %s"
+                % (str(file_path), package, " ".join(p.name for p in package_files))
+            )
+            continue
+
+        for debugsrc in file_data["debugsrc"]:
+            for search in debug_search_paths:
+                if debugsrc.startswith("/usr/src/kernel"):
+                    debugsrc_path = search / debugsrc.replace("/usr/src/kernel/", "")
+                else:
+                    debugsrc_path = search / debugsrc.lstrip("/")
+
+                if debugsrc_path in source_hash_cache:
+                    file_sha256 = source_hash_cache[debugsrc_path]
+                    if file_sha256 is None:
+                        continue
+                else:
+                    if not debugsrc_path.exists():
+                        source_hash_cache[debugsrc_path] = None
+                        continue
+
+                    file_sha256 = bb.utils.sha256_file(debugsrc_path)
+                    source_hash_cache[debugsrc_path] = file_sha256
+
+                if file_sha256 in sources:
+                    source_file = sources[file_sha256]
+                    dep_source_files.add(source_file)
+                else:
+                    bb.debug(
+                        1,
+                        "Debug source %s with SHA256 %s not found in any dependency"
+                        % (str(debugsrc_path), file_sha256),
+                    )
+                break
+            else:
+                bb.debug(1, "Debug source %s not found" % debugsrc)
+
+    return dep_source_files
+
+
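+# Load the SPDX object sets and build elements of the recipe's dependencies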
+def collect_dep_objsets(d, build):
+    deps = oe.spdx_common.get_spdx_deps(d)
+
+    dep_objsets = []
+    dep_builds = set()
+
+    dep_build_spdxids = set()
+    for dep in deps:
+        bb.debug(1, "Fetching SPDX for dependency %s" % (dep.pn))
+        dep_build, dep_objset = oe.sbom30.find_root_obj_in_jsonld(
+            d, "recipes", dep.pn, oe.spdx30.build_Build
+        )
+        # If the dependency is part of the taskhash, return it to be linked
+        # against. Otherwise, it cannot be linked against because this recipe
+        # will not be rebuilt if the dependency changes
+        if dep.in_taskhash:
+            dep_objsets.append(dep_objset)
+
+        # The build _can_ be linked against (by alias)
+        dep_builds.add(dep_build)
+
+    return dep_objsets, dep_builds
+
+
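+# Collect the source files that each dependency declares as build inputs,
+# keyed by their SHA-256 hash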
+def collect_dep_sources(dep_objsets):
+    sources = {}
+    for objset in dep_objsets:
+        # Don't collect sources from native recipes, as they would also
+        # match non-native sources.
+        if objset.is_native():
+            continue
+
+        bb.debug(1, "Fetching Sources for dependency %s" % (objset.doc.name))
+
+        dep_build = objset.find_root(oe.spdx30.build_Build)
+        if not dep_build:
+            bb.fatal("Unable to find a build")
+
+        for e in objset.foreach_type(oe.spdx30.Relationship):
+            if dep_build is not e.from_:
+                continue
+
+            if e.relationshipType != oe.spdx30.RelationshipType.hasInputs:
+                continue
+
+            for to in e.to:
+                if not isinstance(to, oe.spdx30.software_File):
+                    continue
+
+                if (
+                    to.software_primaryPurpose
+                    != oe.spdx30.software_SoftwarePurpose.source
+                ):
+                    continue
+
+                for v in to.verifiedUsing:
+                    if v.algorithm == oe.spdx30.HashAlgorithm.sha256:
+                        sources[v.hashValue] = to
+                        break
+                else:
+                    bb.fatal(
+                        "No SHA256 found for %s in %s" % (to.name, objset.doc.name)
+                    )
+
+    return sources
+
+
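+# Describe each SRC_URI entry in the object set: local files and directory
+# trees become File elements, remote downloads become Package elements with
+# their download location and expected checksums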
+def add_download_files(d, objset):
+    inputs = set()
+
+    urls = d.getVar("SRC_URI").split()
+    fetch = bb.fetch2.Fetch(urls, d)
+
+    for download_idx, src_uri in enumerate(urls):
+        fd = fetch.ud[src_uri]
+
+        for name in fd.names:
+            file_name = os.path.basename(fetch.localpath(src_uri))
+            if oe.patch.patch_path(src_uri, fetch, "", expand=False):
+                primary_purpose = oe.spdx30.software_SoftwarePurpose.patch
+            else:
+                primary_purpose = oe.spdx30.software_SoftwarePurpose.source
+
+            if fd.type == "file":
+                if os.path.isdir(fd.localpath):
+                    walk_idx = 1
+                    for root, dirs, files in os.walk(fd.localpath):
+                        for f in files:
+                            f_path = os.path.join(root, f)
+                            if os.path.islink(f_path):
+                                # TODO: SPDX doesn't support symlinks yet
+                                continue
+
+                            file = objset.new_file(
+                                objset.new_spdxid(
+                                    "source", str(download_idx + 1), str(walk_idx)
+                                ),
+                                os.path.join(
+                                    file_name, os.path.relpath(f_path, fd.localpath)
+                                ),
+                                f_path,
+                                purposes=[primary_purpose],
+                            )
+
+                            inputs.add(file)
+                            walk_idx += 1
+
+                else:
+                    file = objset.new_file(
+                        objset.new_spdxid("source", str(download_idx + 1)),
+                        file_name,
+                        fd.localpath,
+                        purposes=[primary_purpose],
+                    )
+                    inputs.add(file)
+
+            else:
+                uri = fd.type
+                proto = getattr(fd, "proto", None)
+                if proto is not None:
+                    uri = uri + "+" + proto
+                uri = uri + "://" + fd.host + fd.path
+
+                if fd.method.supports_srcrev():
+                    uri = uri + "@" + fd.revisions[name]
+
+                dl = objset.add(
+                    oe.spdx30.software_Package(
+                        _id=objset.new_spdxid("source", str(download_idx + 1)),
+                        creationInfo=objset.doc.creationInfo,
+                        name=file_name,
+                        software_primaryPurpose=primary_purpose,
+                        software_downloadLocation=uri,
+                    )
+                )
+
+                if fd.method.supports_checksum(fd):
+                    # TODO: Need something better than hard-coding this
+                    for checksum_id in ["sha256", "sha1"]:
+                        expected_checksum = getattr(
+                            fd, "%s_expected" % checksum_id, None
+                        )
+                        if expected_checksum is None:
+                            continue
+
+                        dl.verifiedUsing.append(
+                            oe.spdx30.Hash(
+                                algorithm=getattr(oe.spdx30.HashAlgorithm, checksum_id),
+                                hashValue=expected_checksum,
+                            )
+                        )
+
+                inputs.add(dl)
+
+    return inputs
+
+
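+# Set the primary and additional software purposes on an element from the
+# first variable in var_names that is set; any force_purposes are prepended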
+def set_purposes(d, element, *var_names, force_purposes=[]):
+    purposes = force_purposes[:]
+
+    for var_name in var_names:
+        val = d.getVar(var_name)
+        if val:
+            purposes.extend(val.split())
+            break
+
+    if not purposes:
+        bb.warn("No SPDX purposes found in %s" % " ".join(var_names))
+        return
+
+    element.software_primaryPurpose = getattr(
+        oe.spdx30.software_SoftwarePurpose, purposes[0]
+    )
+    element.software_additionalPurpose = [
+        getattr(oe.spdx30.software_SoftwarePurpose, p) for p in purposes[1:]
+    ]
+
+
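+# Create the SPDX document for a recipe, covering its downloads, licenses,
+# patched sources, CVE/VEX statements and a staged document for each package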
+def create_spdx(d):
+    def set_var_field(var, obj, name, package=None):
+        val = None
+        if package:
+            val = d.getVar("%s:%s" % (var, package))
+
+        if not val:
+            val = d.getVar(var)
+
+        if val:
+            setattr(obj, name, val)
+
+    deploydir = Path(d.getVar("SPDXDEPLOY"))
+    deploy_dir_spdx = Path(d.getVar("DEPLOY_DIR_SPDX"))
+    spdx_workdir = Path(d.getVar("SPDXWORK"))
+    include_sources = d.getVar("SPDX_INCLUDE_SOURCES") == "1"
+    pkg_arch = d.getVar("SSTATE_PKGARCH")
+    is_native = bb.data.inherits_class("native", d) or bb.data.inherits_class(
+        "cross", d
+    )
+    include_vex = d.getVar("SPDX_INCLUDE_VEX")
+    if include_vex not in ("none", "current", "all"):
+        bb.fatal("SPDX_INCLUDE_VEX must be one of 'none', 'current', 'all'")
+
+    build_objset = oe.sbom30.ObjectSet.new_objset(d, d.getVar("PN"))
+
+    build = build_objset.new_task_build("recipe", "recipe")
+    build_objset.doc.rootElement.append(build)
+
+    build_objset.set_is_native(is_native)
+
+    for var in (d.getVar("SPDX_CUSTOM_ANNOTATION_VARS") or "").split():
+        new_annotation(
+            d,
+            build_objset,
+            build,
+            "%s=%s" % (var, d.getVar(var)),
+            oe.spdx30.AnnotationType.other,
+        )
+
+    build_inputs = set()
+
+    # Add CVEs
+    cve_by_status = {}
+    if include_vex != "none":
+        for cve in d.getVarFlags("CVE_STATUS") or {}:
+            status, detail, description = oe.cve_check.decode_cve_status(d, cve)
+
+            # If this CVE is fixed upstream, skip it unless all CVEs are
+            # specified.
+            if include_vex != "all" and detail in (
+                "fixed-version",
+                "cpe-stable-backport",
+            ):
+                bb.debug(1, "Skipping %s since it is already fixed upstream" % cve)
+                continue
+
+            cve_by_status.setdefault(status, {})[cve] = (
+                build_objset.new_cve_vuln(cve),
+                detail,
+                description,
+            )
+
+    cpe_ids = oe.cve_check.get_cpe_ids(d.getVar("CVE_PRODUCT"), d.getVar("CVE_VERSION"))
+
+    source_files = add_download_files(d, build_objset)
+    build_inputs |= source_files
+
+    recipe_spdx_license = add_license_expression(d, build_objset, d.getVar("LICENSE"))
+    build_objset.new_relationship(
+        source_files,
+        oe.spdx30.RelationshipType.hasConcludedLicense,
+        [recipe_spdx_license],
+    )
+
+    if oe.spdx_common.process_sources(d) and include_sources:
+        bb.debug(1, "Adding source files to SPDX")
+        oe.spdx_common.get_patched_src(d)
+
+        build_inputs |= add_package_files(
+            d,
+            build_objset,
+            spdx_workdir,
+            lambda file_counter: build_objset.new_spdxid(
+                "sourcefile", str(file_counter)
+            ),
+            lambda filepath: [oe.spdx30.software_SoftwarePurpose.source],
+            ignore_dirs=[".git"],
+            ignore_top_level_dirs=["temp"],
+            archive=None,
+        )
+
+    dep_objsets, dep_builds = collect_dep_objsets(d, build)
+    if dep_builds:
+        build_objset.new_scoped_relationship(
+            [build],
+            oe.spdx30.RelationshipType.dependsOn,
+            oe.spdx30.LifecycleScopeType.build,
+            sorted(oe.sbom30.get_element_link_id(b) for b in dep_builds),
+        )
+
+    debug_source_ids = set()
+    source_hash_cache = {}
+
+    # Write out the package SPDX data now. It is not complete because the
+    # runtime data cannot be written yet, so write it to a staging area; a
+    # later task will write out the final collection
+
+    # TODO: Handle native recipe output
+    if not is_native:
+        bb.debug(1, "Collecting Dependency sources files")
+        sources = collect_dep_sources(dep_objsets)
+
+        bb.build.exec_func("read_subpackage_metadata", d)
+
+        pkgdest = Path(d.getVar("PKGDEST"))
+        for package in d.getVar("PACKAGES").split():
+            if not oe.packagedata.packaged(package, d):
+                continue
+
+            pkg_name = d.getVar("PKG:%s" % package) or package
+
+            bb.debug(1, "Creating SPDX for package %s" % pkg_name)
+
+            pkg_objset = oe.sbom30.ObjectSet.new_objset(d, pkg_name)
+
+            spdx_package = pkg_objset.add_root(
+                oe.spdx30.software_Package(
+                    _id=pkg_objset.new_spdxid("package", pkg_name),
+                    creationInfo=pkg_objset.doc.creationInfo,
+                    name=pkg_name,
+                    software_packageVersion=d.getVar("PV"),
+                )
+            )
+            set_timestamp_now(d, spdx_package, "builtTime")
+
+            set_purposes(
+                d,
+                spdx_package,
+                "SPDX_PACKAGE_ADDITIONAL_PURPOSE:%s" % package,
+                "SPDX_PACKAGE_ADDITIONAL_PURPOSE",
+                force_purposes=["install"],
+            )
+
+            supplier = build_objset.new_agent("SPDX_PACKAGE_SUPPLIER")
+            if supplier is not None:
+                spdx_package.supplier = (
+                    supplier if isinstance(supplier, str) else supplier._id
+                )
+
+            set_var_field(
+                "HOMEPAGE", spdx_package, "software_homePage", package=package
+            )
+            set_var_field("SUMMARY", spdx_package, "summary", package=package)
+            set_var_field("DESCRIPTION", spdx_package, "description", package=package)
+
+            pkg_objset.new_scoped_relationship(
+                [build._id],
+                oe.spdx30.RelationshipType.hasOutputs,
+                oe.spdx30.LifecycleScopeType.build,
+                [spdx_package],
+            )
+
+            for cpe_id in cpe_ids:
+                spdx_package.externalIdentifier.append(
+                    oe.spdx30.ExternalIdentifier(
+                        externalIdentifierType=oe.spdx30.ExternalIdentifierType.cpe23,
+                        identifier=cpe_id,
+                    )
+                )
+
+            # TODO: Generate a file for each actual IPK/DEB/RPM/TGZ file
+            # generated and link it to the package
+            # spdx_package_file = pkg_objset.add(oe.spdx30.software_File(
+            #    _id=pkg_objset.new_spdxid("distribution", pkg_name),
+            #    creationInfo=pkg_objset.doc.creationInfo,
+            #    name=pkg_name,
+            #    software_primaryPurpose=spdx_package.software_primaryPurpose,
+            #    software_additionalPurpose=spdx_package.software_additionalPurpose,
+            # ))
+            # set_timestamp_now(d, spdx_package_file, "builtTime")
+
+            ## TODO add hashes
+            # pkg_objset.new_relationship(
+            #    [spdx_package],
+            #    oe.spdx30.RelationshipType.hasDistributionArtifact,
+            #    [spdx_package_file],
+            # )
+
+            # NOTE: licenses live in the recipe collection and are referenced
+            # by ID in the package collection(s). This helps reduce duplication
+            # (since a lot of packages will have the same license), and also
+            # prevents duplicate license SPDX IDs in the packages
+            package_license = d.getVar("LICENSE:%s" % package)
+            if package_license and package_license != d.getVar("LICENSE"):
+                package_spdx_license = add_license_expression(
+                    d, build_objset, package_license
+                )
+            else:
+                package_spdx_license = recipe_spdx_license
+
+            pkg_objset.new_relationship(
+                [spdx_package],
+                oe.spdx30.RelationshipType.hasConcludedLicense,
+                [package_spdx_license._id],
+            )
+
+            # NOTE: CVE Elements live in the recipe collection
+            all_cves = set()
+            for status, cves in cve_by_status.items():
+                for cve, items in cves.items():
+                    spdx_cve, detail, description = items
+
+                    all_cves.add(spdx_cve._id)
+
+                    if status == "Patched":
+                        pkg_objset.new_vex_patched_relationship(
+                            [spdx_cve._id], [spdx_package]
+                        )
+                    elif status == "Unpatched":
+                        pkg_objset.new_vex_unpatched_relationship(
+                            [spdx_cve._id], [spdx_package]
+                        )
+                    elif status == "Ignored":
+                        spdx_vex = pkg_objset.new_vex_ignored_relationship(
+                            [spdx_cve._id],
+                            [spdx_package],
+                            impact_statement=description,
+                        )
+
+                        if detail in (
+                            "ignored",
+                            "cpe-incorrect",
+                            "disputed",
+                            "upstream-wontfix",
+                        ):
+                            # VEX doesn't have justifications for this
+                            pass
+                        elif detail in (
+                            "not-applicable-config",
+                            "not-applicable-platform",
+                        ):
+                            for v in spdx_vex:
+                                v.security_justificationType = (
+                                    oe.spdx30.security_VexJustificationType.vulnerableCodeNotPresent
+                                )
+                        else:
+                            bb.fatal(f"Unknown detail '{detail}' for ignored {cve}")
+                    else:
+                        bb.fatal(f"Unknown CVE status {status}")
+
+            if all_cves:
+                pkg_objset.new_relationship(
+                    [spdx_package],
+                    oe.spdx30.RelationshipType.hasAssociatedVulnerability,
+                    sorted(list(all_cves)),
+                )
+
+            bb.debug(1, "Adding package files to SPDX for package %s" % pkg_name)
+            package_files = add_package_files(
+                d,
+                pkg_objset,
+                pkgdest / package,
+                lambda file_counter: pkg_objset.new_spdxid(
+                    "package", pkg_name, "file", str(file_counter)
+                ),
+                # TODO: Can we know the purpose here?
+                lambda filepath: [],
+                ignore_top_level_dirs=["CONTROL", "DEBIAN"],
+                archive=None,
+            )
+
+            if package_files:
+                pkg_objset.new_relationship(
+                    [spdx_package],
+                    oe.spdx30.RelationshipType.contains,
+                    sorted(list(package_files)),
+                )
+
+            if include_sources:
+                debug_sources = get_package_sources_from_debug(
+                    d, package, package_files, sources, source_hash_cache
+                )
+                debug_source_ids |= set(
+                    oe.sbom30.get_element_link_id(src) for src in debug_sources
+                )
+
+            oe.sbom30.write_recipe_jsonld_doc(
+                d, pkg_objset, "packages-staging", deploydir, create_spdx_id_links=False
+            )
+
+    if include_sources:
+        bb.debug(1, "Adding sysroot files to SPDX")
+        sysroot_files = add_package_files(
+            d,
+            build_objset,
+            d.expand("${COMPONENTS_DIR}/${PACKAGE_ARCH}/${PN}"),
+            lambda file_counter: build_objset.new_spdxid("sysroot", str(file_counter)),
+            lambda filepath: [],
+            archive=None,
+        )
+
+        if sysroot_files:
+            build_objset.new_scoped_relationship(
+                [build],
+                oe.spdx30.RelationshipType.hasOutputs,
+                oe.spdx30.LifecycleScopeType.build,
+                sorted(list(sysroot_files)),
+            )
+
+    if build_inputs or debug_source_ids:
+        build_objset.new_scoped_relationship(
+            [build],
+            oe.spdx30.RelationshipType.hasInputs,
+            oe.spdx30.LifecycleScopeType.build,
+            sorted(list(build_inputs)) + sorted(list(debug_source_ids)),
+        )
+
+    oe.sbom30.write_recipe_jsonld_doc(d, build_objset, "recipes", deploydir)
+
+
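+# Amend the staged package documents with runtime dependency information and
+# write out the final per-package SPDX documents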
+def create_package_spdx(d):
+    deploy_dir_spdx = Path(d.getVar("DEPLOY_DIR_SPDX"))
+    deploydir = Path(d.getVar("SPDXRUNTIMEDEPLOY"))
+    is_native = bb.data.inherits_class("native", d) or bb.data.inherits_class(
+        "cross", d
+    )
+
+    providers = oe.spdx_common.collect_package_providers(d)
+    pkg_arch = d.getVar("SSTATE_PKGARCH")
+
+    if is_native:
+        return
+
+    bb.build.exec_func("read_subpackage_metadata", d)
+
+    dep_package_cache = {}
+
+    # Any element that is common to all packages and needs to be referenced
+    # by ID should be written into this object set
+    common_objset = oe.sbom30.ObjectSet.new_objset(
+        d, "%s-package-common" % d.getVar("PN")
+    )
+
+    pkgdest = Path(d.getVar("PKGDEST"))
+    for package in d.getVar("PACKAGES").split():
+        localdata = bb.data.createCopy(d)
+        pkg_name = d.getVar("PKG:%s" % package) or package
+        localdata.setVar("PKG", pkg_name)
+        localdata.setVar("OVERRIDES", d.getVar("OVERRIDES", False) + ":" + package)
+
+        if not oe.packagedata.packaged(package, localdata):
+            continue
+
+        spdx_package, pkg_objset = oe.sbom30.load_obj_in_jsonld(
+            d,
+            pkg_arch,
+            "packages-staging",
+            pkg_name,
+            oe.spdx30.software_Package,
+            software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.install,
+        )
+
+        # We will write out a new collection, so link it to the new
+        # creation info in the common package data. The old creation info
+        # should still exist and be referenced by all the existing elements
+        # in the package
+        pkg_objset.creationInfo = pkg_objset.copy_creation_info(
+            common_objset.doc.creationInfo
+        )
+
+        runtime_spdx_deps = set()
+
+        deps = bb.utils.explode_dep_versions2(localdata.getVar("RDEPENDS") or "")
+        seen_deps = set()
+        for dep, _ in deps.items():
+            if dep in seen_deps:
+                continue
+
+            if dep not in providers:
+                continue
+
+            (dep, _) = providers[dep]
+
+            if not oe.packagedata.packaged(dep, localdata):
+                continue
+
+            dep_pkg_data = oe.packagedata.read_subpkgdata_dict(dep, d)
+            dep_pkg = dep_pkg_data["PKG"]
+
+            if dep in dep_package_cache:
+                dep_spdx_package = dep_package_cache[dep]
+            else:
+                bb.debug(1, "Searching for %s" % dep_pkg)
+                dep_spdx_package, _ = oe.sbom30.find_root_obj_in_jsonld(
+                    d,
+                    "packages-staging",
+                    dep_pkg,
+                    oe.spdx30.software_Package,
+                    software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.install,
+                )
+                dep_package_cache[dep] = dep_spdx_package
+
+            runtime_spdx_deps.add(dep_spdx_package)
+            seen_deps.add(dep)
+
+        if runtime_spdx_deps:
+            pkg_objset.new_scoped_relationship(
+                [spdx_package],
+                oe.spdx30.RelationshipType.dependsOn,
+                oe.spdx30.LifecycleScopeType.runtime,
+                [oe.sbom30.get_element_link_id(dep) for dep in runtime_spdx_deps],
+            )
+
+        oe.sbom30.write_recipe_jsonld_doc(d, pkg_objset, "packages", deploydir)
+
+    oe.sbom30.write_recipe_jsonld_doc(d, common_objset, "common-package", deploydir)
+
+
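+# Write the SPDX document describing the bitbake invocation itself,
+# optionally including a parent Build with host and agent information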
+def write_bitbake_spdx(d):
+    # Set PN to "bitbake" so that SPDX IDs can be generated
+    d.setVar("PN", "bitbake")
+    d.setVar("BB_TASKHASH", "bitbake")
+    oe.spdx_common.load_spdx_license_data(d)
+
+    deploy_dir_spdx = Path(d.getVar("DEPLOY_DIR_SPDX"))
+
+    objset = oe.sbom30.ObjectSet.new_objset(d, "bitbake", False)
+
+    host_import_key = d.getVar("SPDX_BUILD_HOST")
+    invoked_by = objset.new_agent("SPDX_INVOKED_BY", add=False)
+    on_behalf_of = objset.new_agent("SPDX_ON_BEHALF_OF", add=False)
+
+    if d.getVar("SPDX_INCLUDE_BITBAKE_PARENT_BUILD") == "1":
+        # Since the Build objects are unique, we may as well set the creation
+        # time to the current time instead of the SOURCE_DATE_EPOCH fallback
+        objset.doc.creationInfo.created = datetime.now(timezone.utc)
+
+        # Each invocation of bitbake should have a unique ID since it is a
+        # unique build
+        nonce = os.urandom(16).hex()
+
+        build = objset.add_root(
+            oe.spdx30.build_Build(
+                _id=objset.new_spdxid(nonce, include_unihash=False),
+                creationInfo=objset.doc.creationInfo,
+                build_buildType=oe.sbom30.SPDX_BUILD_TYPE,
+            )
+        )
+        set_timestamp_now(d, build, "build_buildStartTime")
+
+        if host_import_key:
+            objset.new_scoped_relationship(
+                [build],
+                oe.spdx30.RelationshipType.hasHost,
+                oe.spdx30.LifecycleScopeType.build,
+                [objset.new_import("SPDX_BUILD_HOST")],
+            )
+
+        if invoked_by:
+            objset.add(invoked_by)
+            invoked_by_spdx = objset.new_scoped_relationship(
+                [build],
+                oe.spdx30.RelationshipType.invokedBy,
+                oe.spdx30.LifecycleScopeType.build,
+                [invoked_by],
+            )
+
+            if on_behalf_of:
+                objset.add(on_behalf_of)
+                objset.new_scoped_relationship(
+                    [on_behalf_of],
+                    oe.spdx30.RelationshipType.delegatedTo,
+                    oe.spdx30.LifecycleScopeType.build,
+                    invoked_by_spdx,
+                )
+
+        elif on_behalf_of:
+            bb.warn("SPDX_ON_BEHALF_OF has no effect if SPDX_INVOKED_BY is not set")
+
+    else:
+        if host_import_key:
+            bb.warn(
+                "SPDX_BUILD_HOST has no effect if SPDX_INCLUDE_BITBAKE_PARENT_BUILD is not set"
+            )
+
+        if invoked_by:
+            bb.warn(
+                "SPDX_INVOKED_BY has no effect if SPDX_INCLUDE_BITBAKE_PARENT_BUILD is not set"
+            )
+
+        if on_behalf_of:
+            bb.warn(
+                "SPDX_ON_BEHALF_OF has no effect if SPDX_INCLUDE_BITBAKE_PARENT_BUILD is not set"
+            )
+
+    for obj in objset.foreach_type(oe.spdx30.Element):
+        obj.extension.append(oe.sbom30.OELinkExtension(link_spdx_id=False))
+        obj.extension.append(oe.sbom30.OEIdAliasExtension())
+
+    oe.sbom30.write_jsonld_doc(d, objset, deploy_dir_spdx / "bitbake.spdx.json")
+
+
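+# Record the SPDX package of every listed package as an input to the given
+# build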
+def collect_build_package_inputs(d, objset, build, packages):
+    providers = oe.spdx_common.collect_package_providers(d)
+
+    build_deps = set()
+
+    for name in sorted(packages.keys()):
+        if name not in providers:
+            bb.fatal("Unable to find SPDX provider for '%s'" % name)
+
+        pkg_name, pkg_hashfn = providers[name]
+
+        # Look up the SPDX package written for each provider so it can be
+        # recorded as an input to the build
+        pkg_spdx, _ = oe.sbom30.find_root_obj_in_jsonld(
+            d,
+            "packages",
+            pkg_name,
+            oe.spdx30.software_Package,
+            software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.install,
+        )
+        build_deps.add(pkg_spdx._id)
+
+    if build_deps:
+        objset.new_scoped_relationship(
+            [build],
+            oe.spdx30.RelationshipType.hasInputs,
+            oe.spdx30.LifecycleScopeType.build,
+            sorted(list(build_deps)),
+        )
+
+
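+# Create the SPDX document for the image rootfs, with the installed packages
+# recorded as inputs to the rootfs build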
+def create_rootfs_spdx(d):
+    deploy_dir_spdx = Path(d.getVar("DEPLOY_DIR_SPDX"))
+    deploydir = Path(d.getVar("SPDXROOTFSDEPLOY"))
+    root_packages_file = Path(d.getVar("SPDX_ROOTFS_PACKAGES"))
+    image_basename = d.getVar("IMAGE_BASENAME")
+    machine = d.getVar("MACHINE")
+
+    with root_packages_file.open("r") as f:
+        packages = json.load(f)
+
+    objset = oe.sbom30.ObjectSet.new_objset(d, "%s-%s" % (image_basename, machine))
+
+    rootfs = objset.add_root(
+        oe.spdx30.software_Package(
+            _id=objset.new_spdxid("rootfs", image_basename),
+            creationInfo=objset.doc.creationInfo,
+            name=image_basename,
+            software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.archive,
+        )
+    )
+    set_timestamp_now(d, rootfs, "builtTime")
+
+    rootfs_build = objset.add_root(objset.new_task_build("rootfs", "rootfs"))
+    set_timestamp_now(d, rootfs_build, "build_buildEndTime")
+
+    objset.new_scoped_relationship(
+        [rootfs_build],
+        oe.spdx30.RelationshipType.hasOutputs,
+        oe.spdx30.LifecycleScopeType.build,
+        [rootfs],
+    )
+
+    collect_build_package_inputs(d, objset, rootfs_build, packages)
+
+    oe.sbom30.write_recipe_jsonld_doc(d, objset, "rootfs", deploydir)
+
+
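+# Create SPDX elements for each image file listed in the image output
+# manifest, linking them as outputs of their image builds with the rootfs as
+# input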
+def create_image_spdx(d):
+    image_deploy_dir = Path(d.getVar("IMGDEPLOYDIR"))
+    manifest_path = Path(d.getVar("IMAGE_OUTPUT_MANIFEST"))
+    spdx_work_dir = Path(d.getVar("SPDXIMAGEWORK"))
+
+    image_basename = d.getVar("IMAGE_BASENAME")
+    machine = d.getVar("MACHINE")
+
+    objset = oe.sbom30.ObjectSet.new_objset(d, "%s-%s" % (image_basename, machine))
+
+    with manifest_path.open("r") as f:
+        manifest = json.load(f)
+
+    builds = []
+    for task in manifest:
+        imagetype = task["imagetype"]
+        taskname = task["taskname"]
+
+        image_build = objset.add_root(
+            objset.new_task_build(taskname, "image/%s" % imagetype)
+        )
+        set_timestamp_now(d, image_build, "build_buildEndTime")
+        builds.append(image_build)
+
+        artifacts = []
+
+        for image in task["images"]:
+            image_filename = image["filename"]
+            image_path = image_deploy_dir / image_filename
+            a = objset.add_root(
+                oe.spdx30.software_File(
+                    _id=objset.new_spdxid("image", image_filename),
+                    creationInfo=objset.doc.creationInfo,
+                    name=image_filename,
+                    verifiedUsing=[
+                        oe.spdx30.Hash(
+                            algorithm=oe.spdx30.HashAlgorithm.sha256,
+                            hashValue=bb.utils.sha256_file(image_path),
+                        )
+                    ],
+                )
+            )
+            set_purposes(
+                d, a, "SPDX_IMAGE_PURPOSE:%s" % imagetype, "SPDX_IMAGE_PURPOSE"
+            )
+            set_timestamp_now(d, a, "builtTime")
+
+            artifacts.append(a)
+
+        if artifacts:
+            objset.new_scoped_relationship(
+                [image_build],
+                oe.spdx30.RelationshipType.hasOutputs,
+                oe.spdx30.LifecycleScopeType.build,
+                artifacts,
+            )
+
+    if builds:
+        rootfs_image, _ = oe.sbom30.find_root_obj_in_jsonld(
+            d,
+            "rootfs",
+            "%s-%s" % (image_basename, machine),
+            oe.spdx30.software_Package,
+            # TODO: Should we use a purpose to filter here?
+        )
+        objset.new_scoped_relationship(
+            builds,
+            oe.spdx30.RelationshipType.hasInputs,
+            oe.spdx30.LifecycleScopeType.build,
+            [rootfs_image._id],
+        )
+
+    objset.add_aliases()
+    objset.link()
+    oe.sbom30.write_recipe_jsonld_doc(d, objset, "image", spdx_work_dir)
+
+
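+# Assemble the final image SBOM from the rootfs and image documents and
+# write it to the SPDX image deploy directory, with a matching
+# IMAGE_LINK_NAME symlink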
+def create_image_sbom_spdx(d):
+    image_name = d.getVar("IMAGE_NAME")
+    image_basename = d.getVar("IMAGE_BASENAME")
+    image_link_name = d.getVar("IMAGE_LINK_NAME")
+    imgdeploydir = Path(d.getVar("SPDXIMAGEDEPLOYDIR"))
+    machine = d.getVar("MACHINE")
+
+    spdx_path = imgdeploydir / (image_name + ".spdx.json")
+
+    root_elements = []
+
+    # TODO: Do we need to add the rootfs or are the image files sufficient?
+    rootfs_image, _ = oe.sbom30.find_root_obj_in_jsonld(
+        d,
+        "rootfs",
+        "%s-%s" % (image_basename, machine),
+        oe.spdx30.software_Package,
+        # TODO: Should we use a purpose here?
+    )
+    root_elements.append(rootfs_image._id)
+
+    image_objset, _ = oe.sbom30.find_jsonld(
+        d, "image", "%s-%s" % (image_basename, machine), required=True
+    )
+    for o in image_objset.foreach_root(oe.spdx30.software_File):
+        root_elements.append(o._id)
+
+    objset, sbom = oe.sbom30.create_sbom(d, image_name, root_elements)
+
+    oe.sbom30.write_jsonld_doc(d, objset, spdx_path)
+
+    def make_image_link(target_path, suffix):
+        if image_link_name:
+            link = imgdeploydir / (image_link_name + suffix)
+            if link != target_path:
+                link.symlink_to(os.path.relpath(target_path, link.parent))
+
+    make_image_link(spdx_path, ".spdx.json")
+
+
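+# Create the SPDX document for an SDK rootfs, recording the installed
+# packages as inputs to the sdk-rootfs build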
+def sdk_create_spdx(d, sdk_type, spdx_work_dir, toolchain_outputname):
+    sdk_name = toolchain_outputname + "-" + sdk_type
+    sdk_packages = oe.sdk.sdk_list_installed_packages(d, sdk_type == "target")
+
+    objset = oe.sbom30.ObjectSet.new_objset(d, sdk_name)
+
+    sdk_rootfs = objset.add_root(
+        oe.spdx30.software_Package(
+            _id=objset.new_spdxid("sdk-rootfs", sdk_name),
+            creationInfo=objset.doc.creationInfo,
+            name=sdk_name,
+            software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.archive,
+        )
+    )
+    set_timestamp_now(d, sdk_rootfs, "builtTime")
+
+    sdk_build = objset.add_root(objset.new_task_build("sdk-rootfs", "sdk-rootfs"))
+    set_timestamp_now(d, sdk_build, "build_buildEndTime")
+
+    objset.new_scoped_relationship(
+        [sdk_build],
+        oe.spdx30.RelationshipType.hasOutputs,
+        oe.spdx30.LifecycleScopeType.build,
+        [sdk_rootfs],
+    )
+
+    collect_build_package_inputs(d, objset, sdk_build, sdk_packages)
+
+    objset.add_aliases()
+    oe.sbom30.write_jsonld_doc(d, objset, spdx_work_dir / "sdk-rootfs.spdx.json")
+
+
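+# Assemble the final SDK SBOM covering the SDK installer output files,
+# linked as outputs of an sdk-populate build with the SDK rootfs as input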
+def create_sdk_sbom(d, sdk_deploydir, spdx_work_dir, toolchain_outputname):
+    # Load the document written earlier
+    rootfs_objset = oe.sbom30.load_jsonld(
+        d, spdx_work_dir / "sdk-rootfs.spdx.json", required=True
+    )
+
+    # Create a new build for the SDK installer
+    sdk_build = rootfs_objset.new_task_build("sdk-populate", "sdk-populate")
+    set_timestamp_now(d, sdk_build, "build_buildEndTime")
+
+    rootfs = rootfs_objset.find_root(oe.spdx30.software_Package)
+    if rootfs is None:
+        bb.fatal("Unable to find rootfs artifact")
+
+    rootfs_objset.new_scoped_relationship(
+        [sdk_build],
+        oe.spdx30.RelationshipType.hasInputs,
+        oe.spdx30.LifecycleScopeType.build,
+        [rootfs],
+    )
+
+    files = set()
+    root_files = []
+
+    # NOTE: os.walk() does not follow symlinked directories; symlinked files
+    # are skipped explicitly below
+    for dirpath, dirnames, filenames in os.walk(sdk_deploydir):
+        for fn in filenames:
+            fpath = Path(dirpath) / fn
+            if not fpath.is_file() or fpath.is_symlink():
+                continue
+
+            relpath = str(fpath.relative_to(sdk_deploydir))
+
+            f = rootfs_objset.new_file(
+                rootfs_objset.new_spdxid("sdk-installer", relpath),
+                relpath,
+                fpath,
+            )
+            set_timestamp_now(d, f, "builtTime")
+
+            if fn.endswith(".manifest"):
+                f.software_primaryPurpose = oe.spdx30.software_SoftwarePurpose.manifest
+            elif fn.endswith(".testdata.json"):
+                f.software_primaryPurpose = (
+                    oe.spdx30.software_SoftwarePurpose.configuration
+                )
+            else:
+                set_purposes(d, f, "SPDX_SDK_PURPOSE")
+                root_files.append(f)
+
+            files.add(f)
+
+    if files:
+        rootfs_objset.new_scoped_relationship(
+            [sdk_build],
+            oe.spdx30.RelationshipType.hasOutputs,
+            oe.spdx30.LifecycleScopeType.build,
+            files,
+        )
+    else:
+        bb.warn(f"No SDK output files found in {sdk_deploydir}")
+
+    objset, sbom = oe.sbom30.create_sbom(
+        d, toolchain_outputname, sorted(list(files)), [rootfs_objset]
+    )
+
+    oe.sbom30.write_jsonld_doc(
+        d, objset, sdk_deploydir / (toolchain_outputname + ".spdx.json")
+    )