| Message ID | 20260331132430.781647-3-ross.burton@arm.com |
|---|---|
| State | Under Review |
| Headers | show |
| Series | [1/3] classes/vex: remove | expand |
Please adapt build scripts on AB before merging this. Peter > -----Original Message----- > From: openembedded-core@lists.openembedded.org <openembedded- > core@lists.openembedded.org> On Behalf Of Ross Burton via > lists.openembedded.org > Sent: Tuesday, March 31, 2026 3:24 PM > To: openembedded-core@lists.openembedded.org > Subject: [OE-core] [PATCH 3/3] classes/cve-check: remove class > > It's been long known that the cve-check class in oe-core is not that > usable in the real world, for more details see "Future of CVE scanning > in Yocto"[1]. This mail proposed an alternative direction that included > a CVE scanning tool that can be ran both during the build and afterwards, > so that periodic scans of a previously build image is possible. > > Last year, Bootlin wrote sbom-cve-check[2] and I compared this to my > proposal in "Comparing cve-check with sbom-cve-check"[3], concluding > that this is likely the missing piece. > > Support for sbom-cve-check has been merged into oe-core, and the > cve-check class is now obsolete. So that we don't have to maintain it for > the four-year lifecycle of the Wrynose release, delete it. > > This patch also deletes the database fetcher recipes, and the test cases > that were specific to cve-check. Note that the oe.cve_check library > still exists as this is used by the SPDX classes. > > [1] https://lore.kernel.org/openembedded-core/7D6E419E-A7AE-4324-966C- > 3552C586E452@arm.com/ > [2] https://github.com/bootlin/sbom-cve-check > [3] https://lore.kernel.org/openembedded-core/2CD10DD9-FB2A-4B10-B98A- > 85918EB6B4B7@arm.com/ > > Signed-off-by: Ross Burton <ross.burton@arm.com> > --- > meta/classes/cve-check.bbclass | 570 ------------------ > meta/conf/distro/include/maintainers.inc | 1 - > meta/conf/documentation.conf | 2 - > meta/lib/oeqa/selftest/cases/cve_check.py | 172 ------ > .../recipes-core/meta/cve-update-db-native.bb | 421 ------------- > .../meta/cve-update-nvd2-native.bb | 422 ------------- > 6 files changed, 1588 deletions(-) > delete mode 100644 meta/classes/cve-check.bbclass > delete mode 100644 meta/recipes-core/meta/cve-update-db-native.bb > delete mode 100644 meta/recipes-core/meta/cve-update-nvd2-native.bb > > diff --git a/meta/classes/cve-check.bbclass b/meta/classes/cve-check.bbclass > deleted file mode 100644 > index c63ebd56e16..00000000000 > --- a/meta/classes/cve-check.bbclass > +++ /dev/null > @@ -1,570 +0,0 @@ > -# > -# Copyright OpenEmbedded Contributors > -# > -# SPDX-License-Identifier: MIT > -# > - > -# This class is used to check recipes against public CVEs. > -# > -# In order to use this class just inherit the class in the > -# local.conf file and it will add the cve_check task for > -# every recipe. The task can be used per recipe, per image, > -# or using the special cases "world" and "universe". The > -# cve_check task will print a warning for every unpatched > -# CVE found and generate a file in the recipe WORKDIR/cve > -# directory. If an image is build it will generate a report > -# in DEPLOY_DIR_IMAGE for all the packages used. > -# > -# Example: > -# bitbake -c cve_check openssl > -# bitbake core-image-sato > -# bitbake -k -c cve_check universe > -# > -# DISCLAIMER > -# > -# This class/tool is meant to be used as support and not > -# the only method to check against CVEs. Running this tool > -# doesn't guarantee your packages are free of CVEs. > - > -# The product name that the CVE database uses defaults to BPN, but may need to > -# be overriden per recipe (for example tiff.bb sets CVE_PRODUCT=libtiff). > -CVE_PRODUCT ??= "${BPN}" > -CVE_VERSION ??= "${PV}" > - > -# Possible database sources: NVD1, NVD2, FKIE > -NVD_DB_VERSION ?= "FKIE" > - > -# Use different file names for each database source, as they synchronize at different > moments, so may be slightly different > -CVE_CHECK_DB_FILENAME ?= "${@'nvdcve_2-2.db' if > d.getVar('NVD_DB_VERSION') == 'NVD2' else 'nvdcve_1-3.db' if > d.getVar('NVD_DB_VERSION') == 'NVD1' else 'nvdfkie_1-1.db'}" > -CVE_CHECK_DB_FETCHER ?= "${@'cve-update-nvd2-native' if > d.getVar('NVD_DB_VERSION') == 'NVD2' else 'cve-update-db-native'}" > -CVE_CHECK_DB_DIR ?= "${STAGING_DIR}/CVE_CHECK" > -CVE_CHECK_DB_FILE ?= > "${CVE_CHECK_DB_DIR}/${CVE_CHECK_DB_FILENAME}" > -CVE_CHECK_DB_FILE_LOCK ?= "${CVE_CHECK_DB_FILE}.lock" > - > -CVE_CHECK_SUMMARY_DIR ?= "${LOG_DIR}/cve" > -CVE_CHECK_SUMMARY_FILE_NAME ?= "cve-summary" > -CVE_CHECK_SUMMARY_FILE_NAME_JSON = "cve-summary.json" > -CVE_CHECK_SUMMARY_INDEX_PATH = "${CVE_CHECK_SUMMARY_DIR}/cve- > summary-index.txt" > - > -CVE_CHECK_LOG_JSON ?= "${T}/cve.json" > - > -CVE_CHECK_DIR ??= "${DEPLOY_DIR}/cve" > -CVE_CHECK_RECIPE_FILE_JSON ?= "${CVE_CHECK_DIR}/${PN}_cve.json" > -CVE_CHECK_MANIFEST_JSON_SUFFIX ?= "json" > -CVE_CHECK_MANIFEST_JSON ?= > "${IMGDEPLOYDIR}/${IMAGE_NAME}.${CVE_CHECK_MANIFEST_JSON_SUFFIX > }" > -CVE_CHECK_COPY_FILES ??= "1" > -CVE_CHECK_CREATE_MANIFEST ??= "1" > - > -# Report Patched or Ignored CVEs > -CVE_CHECK_REPORT_PATCHED ??= "1" > - > -CVE_CHECK_SHOW_WARNINGS ??= "1" > - > -# Provide JSON output > -CVE_CHECK_FORMAT_JSON ??= "1" > - > -# Check for packages without CVEs (no issues or missing product name) > -CVE_CHECK_COVERAGE ??= "1" > - > -# Skip CVE Check for packages (PN) > -CVE_CHECK_SKIP_RECIPE ?= "" > - > -# Replace NVD DB check status for a given CVE. Each of CVE has to be > mentioned > -# separately with optional detail and description for this status. > -# > -# CVE_STATUS[CVE-1234-0001] = "not-applicable-platform: Issue only applies on > Windows" > -# CVE_STATUS[CVE-1234-0002] = "fixed-version: Fixed externally" > -# > -# Settings the same status and reason for multiple CVEs is possible > -# via CVE_STATUS_GROUPS variable. > -# > -# CVE_STATUS_GROUPS = "CVE_STATUS_WIN CVE_STATUS_PATCHED" > -# > -# CVE_STATUS_WIN = "CVE-1234-0001 CVE-1234-0003" > -# CVE_STATUS_WIN[status] = "not-applicable-platform: Issue only applies on > Windows" > -# CVE_STATUS_PATCHED = "CVE-1234-0002 CVE-1234-0004" > -# CVE_STATUS_PATCHED[status] = "fixed-version: Fixed externally" > -# > -# All possible CVE statuses could be found in cve-check-map.conf > -# CVE_CHECK_STATUSMAP[not-applicable-platform] = "Ignored" > -# CVE_CHECK_STATUSMAP[fixed-version] = "Patched" > -# > -# CVE_CHECK_IGNORE is deprecated and CVE_STATUS has to be used instead. > -# Keep CVE_CHECK_IGNORE until other layers migrate to new variables > -CVE_CHECK_IGNORE ?= "" > - > -# Layers to be excluded > -CVE_CHECK_LAYER_EXCLUDELIST ??= "" > - > -# Layers to be included > -CVE_CHECK_LAYER_INCLUDELIST ??= "" > - > - > -# set to "alphabetical" for version using single alphabetical character as increment > release > -CVE_VERSION_SUFFIX ??= "" > - > -python () { > - from oe.cve_check import extend_cve_status > - extend_cve_status(d) > - > - nvd_database_type = d.getVar("NVD_DB_VERSION") > - if nvd_database_type not in ("NVD1", "NVD2", "FKIE"): > - bb.erroronce("Malformed NVD_DB_VERSION, must be one of: NVD1, NVD2, > FKIE. Defaulting to NVD2") > - d.setVar("NVD_DB_VERSION", "NVD2") > -} > - > -def generate_json_report(d, out_path, link_path): > - if os.path.exists(d.getVar("CVE_CHECK_SUMMARY_INDEX_PATH")): > - import json > - from oe.cve_check import cve_check_merge_jsons, update_symlinks > - > - bb.note("Generating JSON CVE summary") > - index_file = d.getVar("CVE_CHECK_SUMMARY_INDEX_PATH") > - summary = {"version":"1", "package": []} > - with open(index_file) as f: > - filename = f.readline() > - while filename: > - with open(filename.rstrip()) as j: > - data = json.load(j) > - cve_check_merge_jsons(summary, data) > - filename = f.readline() > - > - summary["package"].sort(key=lambda d: d['name']) > - > - with open(out_path, "w") as f: > - json.dump(summary, f, indent=2) > - > - update_symlinks(out_path, link_path) > - > -python cve_save_summary_handler () { > - import shutil > - import datetime > - from oe.cve_check import update_symlinks > - > - cve_summary_name = d.getVar("CVE_CHECK_SUMMARY_FILE_NAME") > - cvelogpath = d.getVar("CVE_CHECK_SUMMARY_DIR") > - bb.utils.mkdirhier(cvelogpath) > - > - timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S') > - > - if d.getVar("CVE_CHECK_FORMAT_JSON") == "1": > - json_summary_link_name = os.path.join(cvelogpath, > d.getVar("CVE_CHECK_SUMMARY_FILE_NAME_JSON")) > - json_summary_name = os.path.join(cvelogpath, "%s-%s.json" % > (cve_summary_name, timestamp)) > - generate_json_report(d, json_summary_name, json_summary_link_name) > - bb.plain("Complete CVE JSON report summary created at: %s" % > json_summary_link_name) > -} > - > -addhandler cve_save_summary_handler > -cve_save_summary_handler[eventmask] = "bb.event.BuildCompleted" > - > -python do_cve_check () { > - """ > - Check recipe for patched and unpatched CVEs > - """ > - from oe.cve_check import get_patched_cves > - > - with bb.utils.fileslocked([d.getVar("CVE_CHECK_DB_FILE_LOCK")], > shared=True): > - if os.path.exists(d.getVar("CVE_CHECK_DB_FILE")): > - try: > - patched_cves = get_patched_cves(d) > - except FileNotFoundError: > - bb.fatal("Failure in searching patches") > - cve_data, status = check_cves(d, patched_cves) > - if len(cve_data) or (d.getVar("CVE_CHECK_COVERAGE") == "1" and > status): > - get_cve_info(d, cve_data) > - cve_write_data(d, cve_data, status) > - else: > - bb.note("No CVE database found, skipping CVE check") > - > -} > - > -addtask cve_check before do_build > -do_cve_check[depends] = "${CVE_CHECK_DB_FETCHER}:do_unpack" > -do_cve_check[nostamp] = "1" > - > -python cve_check_cleanup () { > - """ > - Delete the file used to gather all the CVE information. > - """ > - bb.utils.remove(e.data.getVar("CVE_CHECK_SUMMARY_INDEX_PATH")) > -} > - > -addhandler cve_check_cleanup > -cve_check_cleanup[eventmask] = "bb.event.BuildCompleted" > - > -python cve_check_write_rootfs_manifest () { > - """ > - Create CVE manifest when building an image > - """ > - > - import shutil > - import json > - from oe.rootfs import image_list_installed_packages > - from oe.cve_check import cve_check_merge_jsons, update_symlinks > - > - if d.getVar("CVE_CHECK_COPY_FILES") == "1": > - deploy_file_json = d.getVar("CVE_CHECK_RECIPE_FILE_JSON") > - if os.path.exists(deploy_file_json): > - bb.utils.remove(deploy_file_json) > - > - # Create a list of relevant recipies > - recipies = set() > - for pkg in list(image_list_installed_packages(d)): > - pkg_info = os.path.join(d.getVar('PKGDATA_DIR'), > - 'runtime-reverse', pkg) > - pkg_data = oe.packagedata.read_pkgdatafile(pkg_info) > - recipies.add(pkg_data["PN"]) > - > - bb.note("Writing rootfs CVE manifest") > - deploy_dir = d.getVar("IMGDEPLOYDIR") > - link_name = d.getVar("IMAGE_LINK_NAME") > - > - json_data = {"version":"1", "package": []} > - text_data = "" > - enable_json = d.getVar("CVE_CHECK_FORMAT_JSON") == "1" > - > - save_pn = d.getVar("PN") > - > - for pkg in recipies: > - # To be able to use the CVE_CHECK_RECIPE_FILE_JSON variable we have > to evaluate > - # it with the different PN names set each time. > - d.setVar("PN", pkg) > - > - if enable_json: > - pkgfilepath = d.getVar("CVE_CHECK_RECIPE_FILE_JSON") > - if os.path.exists(pkgfilepath): > - with open(pkgfilepath) as j: > - data = json.load(j) > - cve_check_merge_jsons(json_data, data) > - > - d.setVar("PN", save_pn) > - > - if enable_json: > - manifest_name_suffix = > d.getVar("CVE_CHECK_MANIFEST_JSON_SUFFIX") > - manifest_name = d.getVar("CVE_CHECK_MANIFEST_JSON") > - > - with open(manifest_name, "w") as f: > - json.dump(json_data, f, indent=2) > - > - if link_name: > - link_path = os.path.join(deploy_dir, "%s.%s" % (link_name, > manifest_name_suffix)) > - update_symlinks(manifest_name, link_path) > - > - bb.plain("Image CVE JSON report stored in: %s" % manifest_name) > -} > - > -ROOTFS_POSTPROCESS_COMMAND:prepend = > "${@'cve_check_write_rootfs_manifest ' if > d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}" > -do_rootfs[recrdeptask] += "${@'do_cve_check' if > d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}" > -do_populate_sdk[recrdeptask] += "${@'do_cve_check' if > d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}" > - > -def cve_is_ignored(d, cve_data, cve): > - if cve not in cve_data: > - return False > - if cve_data[cve]['abbrev-status'] == "Ignored": > - return True > - return False > - > -def cve_is_patched(d, cve_data, cve): > - if cve not in cve_data: > - return False > - if cve_data[cve]['abbrev-status'] == "Patched": > - return True > - return False > - > -def cve_update(d, cve_data, cve, entry): > - # If no entry, just add it > - if cve not in cve_data: > - cve_data[cve] = entry > - return > - # If we are updating, there might be change in the status > - bb.debug(1, "Trying CVE entry update for %s from %s to %s" % (cve, > cve_data[cve]['abbrev-status'], entry['abbrev-status'])) > - if cve_data[cve]['abbrev-status'] == "Unknown": > - cve_data[cve] = entry > - return > - if cve_data[cve]['abbrev-status'] == entry['abbrev-status']: > - return > - # Update like in {'abbrev-status': 'Patched', 'status': 'version-not-in-range'} to > {'abbrev-status': 'Unpatched', 'status': 'version-in-range'} > - if entry['abbrev-status'] == "Unpatched" and cve_data[cve]['abbrev-status'] == > "Patched": > - if entry['status'] == "version-in-range" and cve_data[cve]['status'] == "version- > not-in-range": > - # New result from the scan, vulnerable > - cve_data[cve] = entry > - bb.debug(1, "CVE entry %s update from Patched to Unpatched from the > scan result" % cve) > - return > - if entry['abbrev-status'] == "Patched" and cve_data[cve]['abbrev-status'] == > "Unpatched": > - if entry['status'] == "version-not-in-range" and cve_data[cve]['status'] == > "version-in-range": > - # Range does not match the scan, but we already have a vulnerable match, > ignore > - bb.debug(1, "CVE entry %s update from Patched to Unpatched from the > scan result - not applying" % cve) > - return > - # If we have an "Ignored", it has a priority > - if cve_data[cve]['abbrev-status'] == "Ignored": > - bb.debug(1, "CVE %s not updating because Ignored" % cve) > - return > - bb.warn("Unhandled CVE entry update for %s from %s to %s" % (cve, > cve_data[cve], entry)) > - > -def check_cves(d, cve_data): > - """ > - Connect to the NVD database and find unpatched cves. > - """ > - from oe.cve_check import Version, convert_cve_version, decode_cve_status > - > - pn = d.getVar("PN") > - real_pv = d.getVar("PV") > - suffix = d.getVar("CVE_VERSION_SUFFIX") > - > - cves_status = [] > - cves_in_recipe = False > - # CVE_PRODUCT can contain more than one product (eg. curl/libcurl) > - products = d.getVar("CVE_PRODUCT").split() > - # If this has been unset then we're not scanning for CVEs here (for example, > image recipes) > - if not products: > - return ([], []) > - pv = d.getVar("CVE_VERSION").split("+git")[0] > - > - # If the recipe has been skipped/ignored we return empty lists > - if pn in d.getVar("CVE_CHECK_SKIP_RECIPE").split(): > - bb.note("Recipe has been skipped by cve-check") > - return ([], []) > - > - import sqlite3 > - db_file = d.expand("file:${CVE_CHECK_DB_FILE}?mode=ro") > - conn = sqlite3.connect(db_file, uri=True) > - > - # For each of the known product names (e.g. curl has CPEs using curl and > libcurl)... > - for product in products: > - cves_in_product = False > - if ":" in product: > - vendor, product = product.split(":", 1) > - else: > - vendor = "%" > - > - # Find all relevant CVE IDs. > - cve_cursor = conn.execute("SELECT DISTINCT ID FROM PRODUCTS > WHERE PRODUCT IS ? AND VENDOR LIKE ?", (product, vendor)) > - for cverow in cve_cursor: > - cve = cverow[0] > - > - # Write status once only for each product > - if not cves_in_product: > - cves_status.append([product, True]) > - cves_in_product = True > - cves_in_recipe = True > - > - if cve_is_ignored(d, cve_data, cve): > - bb.note("%s-%s ignores %s" % (product, pv, cve)) > - continue > - elif cve_is_patched(d, cve_data, cve): > - bb.note("%s has been patched" % (cve)) > - continue > - > - vulnerable = False > - ignored = False > - > - product_cursor = conn.execute("SELECT * FROM PRODUCTS WHERE ID > IS ? AND PRODUCT IS ? AND VENDOR LIKE ?", (cve, product, vendor)) > - for row in product_cursor: > - (_, _, _, version_start, operator_start, version_end, operator_end) = row > - #bb.debug(2, "Evaluating row " + str(row)) > - if cve_is_ignored(d, cve_data, cve): > - ignored = True > - > - version_start = convert_cve_version(version_start) > - version_end = convert_cve_version(version_end) > - > - if (operator_start == '=' and pv == version_start) or version_start == '-': > - vulnerable = True > - else: > - if operator_start: > - try: > - vulnerable_start = (operator_start == '>=' and Version(pv,suffix) > >= Version(version_start,suffix)) > - vulnerable_start |= (operator_start == '>' and Version(pv,suffix) > > Version(version_start,suffix)) > - except: > - bb.warn("%s: Failed to compare %s %s %s for %s" % > - (product, pv, operator_start, version_start, cve)) > - vulnerable_start = False > - else: > - vulnerable_start = False > - > - if operator_end: > - try: > - vulnerable_end = (operator_end == '<=' and Version(pv,suffix) > <= Version(version_end,suffix) ) > - vulnerable_end |= (operator_end == '<' and Version(pv,suffix) < > Version(version_end,suffix) ) > - except: > - bb.warn("%s: Failed to compare %s %s %s for %s" % > - (product, pv, operator_end, version_end, cve)) > - vulnerable_end = False > - else: > - vulnerable_end = False > - > - if operator_start and operator_end: > - vulnerable = vulnerable_start and vulnerable_end > - else: > - vulnerable = vulnerable_start or vulnerable_end > - > - if vulnerable: > - if ignored: > - bb.note("%s is ignored in %s-%s" % (cve, pn, real_pv)) > - cve_update(d, cve_data, cve, {"abbrev-status": "Ignored"}) > - else: > - bb.note("%s-%s is vulnerable to %s" % (pn, real_pv, cve)) > - cve_update(d, cve_data, cve, {"abbrev-status": "Unpatched", > "status": "version-in-range"}) > - break > - product_cursor.close() > - > - if not vulnerable: > - bb.note("%s-%s is not vulnerable to %s" % (pn, real_pv, cve)) > - cve_update(d, cve_data, cve, {"abbrev-status": "Patched", "status": > "version-not-in-range"}) > - cve_cursor.close() > - > - if not cves_in_product: > - bb.note("No CVE records found for product %s, pn %s" % (product, pn)) > - cves_status.append([product, False]) > - > - conn.close() > - > - if not cves_in_recipe: > - bb.note("No CVE records for products in recipe %s" % (pn)) > - > - if d.getVar("CVE_CHECK_SHOW_WARNINGS") == "1": > - unpatched_cves = [cve for cve in cve_data if cve_data[cve]["abbrev-status"] > == "Unpatched"] > - if unpatched_cves: > - bb.warn("Found unpatched CVE (%s)" % " ".join(unpatched_cves)) > - > - return (cve_data, cves_status) > - > -def get_cve_info(d, cve_data): > - """ > - Get CVE information from the database. > - """ > - > - import sqlite3 > - > - db_file = d.expand("file:${CVE_CHECK_DB_FILE}?mode=ro") > - conn = sqlite3.connect(db_file, uri=True) > - > - for cve in cve_data: > - cursor = conn.execute("SELECT * FROM NVD WHERE ID IS ?", (cve,)) > - for row in cursor: > - # The CVE itdelf has been added already > - if row[0] not in cve_data: > - bb.note("CVE record %s not present" % row[0]) > - continue > - #cve_data[row[0]] = {} > - cve_data[row[0]]["NVD-summary"] = row[1] > - cve_data[row[0]]["NVD-scorev2"] = row[2] > - cve_data[row[0]]["NVD-scorev3"] = row[3] > - cve_data[row[0]]["NVD-scorev4"] = row[4] > - cve_data[row[0]]["NVD-modified"] = row[5] > - cve_data[row[0]]["NVD-vector"] = row[6] > - cve_data[row[0]]["NVD-vectorString"] = row[7] > - cursor.close() > - conn.close() > - > -def cve_check_write_json_output(d, output, direct_file, deploy_file, manifest_file): > - """ > - Write CVE information in the JSON format: to WORKDIR; and to > - CVE_CHECK_DIR, if CVE manifest if enabled, write fragment > - files that will be assembled at the end in cve_check_write_rootfs_manifest. > - """ > - > - import json > - > - write_string = json.dumps(output, indent=2) > - with open(direct_file, "w") as f: > - bb.note("Writing file %s with CVE information" % direct_file) > - f.write(write_string) > - > - if d.getVar("CVE_CHECK_COPY_FILES") == "1": > - bb.utils.mkdirhier(os.path.dirname(deploy_file)) > - with open(deploy_file, "w") as f: > - f.write(write_string) > - > - if d.getVar("CVE_CHECK_CREATE_MANIFEST") == "1": > - cvelogpath = d.getVar("CVE_CHECK_SUMMARY_DIR") > - index_path = d.getVar("CVE_CHECK_SUMMARY_INDEX_PATH") > - bb.utils.mkdirhier(cvelogpath) > - fragment_file = os.path.basename(deploy_file) > - fragment_path = os.path.join(cvelogpath, fragment_file) > - with open(fragment_path, "w") as f: > - f.write(write_string) > - with open(index_path, "a+") as f: > - f.write("%s\n" % fragment_path) > - > -def cve_write_data_json(d, cve_data, cve_status): > - """ > - Prepare CVE data for the JSON format, then write it. > - """ > - > - output = {"version":"1", "package": []} > - nvd_link = "https://nvd.nist.gov/vuln/detail/" > - > - fdir_name = d.getVar("FILE_DIRNAME") > - layer = fdir_name.split("/")[-3] > - > - include_layers = d.getVar("CVE_CHECK_LAYER_INCLUDELIST").split() > - exclude_layers = d.getVar("CVE_CHECK_LAYER_EXCLUDELIST").split() > - > - report_all = d.getVar("CVE_CHECK_REPORT_PATCHED") == "1" > - > - if exclude_layers and layer in exclude_layers: > - return > - > - if include_layers and layer not in include_layers: > - return > - > - product_data = [] > - for s in cve_status: > - p = {"product": s[0], "cvesInRecord": "Yes"} > - if s[1] == False: > - p["cvesInRecord"] = "No" > - product_data.append(p) > - > - package_version = "%s%s" % (d.getVar("EXTENDPE"), d.getVar("PV")) > - package_data = { > - "name" : d.getVar("PN"), > - "layer" : layer, > - "version" : package_version, > - "products": product_data > - } > - > - cve_list = [] > - > - for cve in sorted(cve_data): > - if not report_all and (cve_data[cve]["abbrev-status"] == "Patched" or > cve_data[cve]["abbrev-status"] == "Ignored"): > - continue > - issue_link = "%s%s" % (nvd_link, cve) > - > - cve_item = { > - "id" : cve, > - "status" : cve_data[cve]["abbrev-status"], > - "link": issue_link, > - } > - if 'NVD-summary' in cve_data[cve]: > - cve_item["summary"] = cve_data[cve]["NVD-summary"] > - cve_item["scorev2"] = cve_data[cve]["NVD-scorev2"] > - cve_item["scorev3"] = cve_data[cve]["NVD-scorev3"] > - cve_item["scorev4"] = cve_data[cve]["NVD-scorev4"] > - cve_item["modified"] = cve_data[cve]["NVD-modified"] > - cve_item["vector"] = cve_data[cve]["NVD-vector"] > - cve_item["vectorString"] = cve_data[cve]["NVD-vectorString"] > - if 'status' in cve_data[cve]: > - cve_item["detail"] = cve_data[cve]["status"] > - if 'justification' in cve_data[cve]: > - cve_item["description"] = cve_data[cve]["justification"] > - if 'resource' in cve_data[cve]: > - cve_item["patch-file"] = cve_data[cve]["resource"] > - cve_list.append(cve_item) > - > - package_data["issue"] = cve_list > - output["package"].append(package_data) > - > - direct_file = d.getVar("CVE_CHECK_LOG_JSON") > - deploy_file = d.getVar("CVE_CHECK_RECIPE_FILE_JSON") > - manifest_file = d.getVar("CVE_CHECK_SUMMARY_FILE_NAME_JSON") > - > - cve_check_write_json_output(d, output, direct_file, deploy_file, manifest_file) > - > -def cve_write_data(d, cve_data, status): > - """ > - Write CVE data in each enabled format. > - """ > - > - if d.getVar("CVE_CHECK_FORMAT_JSON") == "1": > - cve_write_data_json(d, cve_data, status) > diff --git a/meta/conf/distro/include/maintainers.inc > b/meta/conf/distro/include/maintainers.inc > index 1bd43211e23..a429320b88b 100644 > --- a/meta/conf/distro/include/maintainers.inc > +++ b/meta/conf/distro/include/maintainers.inc > @@ -140,7 +140,6 @@ RECIPE_MAINTAINER:pn-cryptodev-module = "Robert > Yang <liezhi.yang@windriver.com> > RECIPE_MAINTAINER:pn-cryptodev-tests = "Robert Yang > <liezhi.yang@windriver.com>" > RECIPE_MAINTAINER:pn-cups = "Chen Qi <Qi.Chen@windriver.com>" > RECIPE_MAINTAINER:pn-curl = "Robert Joslyn <robert.joslyn@redrectangle.org>" > -RECIPE_MAINTAINER:pn-cve-update-nvd2-native = "Ross Burton > <ross.burton@arm.com>" > RECIPE_MAINTAINER:pn-db = "Unassigned <unassigned@yoctoproject.org>" > RECIPE_MAINTAINER:pn-dbus = "Chen Qi <Qi.Chen@windriver.com>" > RECIPE_MAINTAINER:pn-dbus-glib = "Chen Qi <Qi.Chen@windriver.com>" > diff --git a/meta/conf/documentation.conf b/meta/conf/documentation.conf > index 1853676fa06..9d429ba9a31 100644 > --- a/meta/conf/documentation.conf > +++ b/meta/conf/documentation.conf > @@ -121,8 +121,6 @@ CONFLICT_MACHINE_FEATURES[doc] = "When a recipe > inherits the features_check clas > CORE_IMAGE_EXTRA_INSTALL[doc] = "Specifies the list of packages to be > added to the image. You should only set this variable in the conf/local.conf file in the > Build Directory." > COREBASE[doc] = "Specifies the parent directory of the OpenEmbedded Core > Metadata layer (i.e. meta)." > CONF_VERSION[doc] = "Tracks the version of local.conf. Increased each time > build/conf/ changes incompatibly." > -CVE_CHECK_LAYER_EXCLUDELIST[doc] = "Defines which layers to exclude from > cve-check scanning" > -CVE_CHECK_LAYER_INCLUDELIST[doc] = "Defines which layers to include during > cve-check scanning" > > #D > > diff --git a/meta/lib/oeqa/selftest/cases/cve_check.py > b/meta/lib/oeqa/selftest/cases/cve_check.py > index 511e4b81b41..891a7de3317 100644 > --- a/meta/lib/oeqa/selftest/cases/cve_check.py > +++ b/meta/lib/oeqa/selftest/cases/cve_check.py > @@ -4,10 +4,7 @@ > # SPDX-License-Identifier: MIT > # > > -import json > -import os > from oeqa.selftest.case import OESelftestTestCase > -from oeqa.utils.commands import bitbake, get_bb_vars > > class CVECheck(OESelftestTestCase): > > @@ -325,172 +322,3 @@ class CVECheck(OESelftestTestCase): > ), > {"CVE-2019-6461", "CVE-2019-6462", "CVE-2019-6463", "CVE-2019-6464"}, > ) > - > - def test_recipe_report_json(self): > - config = """ > -INHERIT += "cve-check" > -CVE_CHECK_FORMAT_JSON = "1" > -""" > - self.write_config(config) > - > - vars = get_bb_vars(["CVE_CHECK_SUMMARY_DIR", > "CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) > - summary_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], > vars["CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) > - recipe_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], "m4- > native_cve.json") > - > - try: > - os.remove(summary_json) > - os.remove(recipe_json) > - except FileNotFoundError: > - pass > - > - bitbake("m4-native -c cve_check") > - > - def check_m4_json(filename): > - with open(filename) as f: > - report = json.load(f) > - self.assertEqual(report["version"], "1") > - self.assertEqual(len(report["package"]), 1) > - package = report["package"][0] > - self.assertEqual(package["name"], "m4-native") > - found_cves = { issue["id"]: issue["status"] for issue in package["issue"]} > - self.assertIn("CVE-2008-1687", found_cves) > - self.assertEqual(found_cves["CVE-2008-1687"], "Patched") > - > - self.assertExists(summary_json) > - check_m4_json(summary_json) > - self.assertExists(recipe_json) > - check_m4_json(recipe_json) > - > - > - def test_image_json(self): > - config = """ > -INHERIT += "cve-check" > -CVE_CHECK_FORMAT_JSON = "1" > -""" > - self.write_config(config) > - > - vars = get_bb_vars(["CVE_CHECK_DIR", "CVE_CHECK_SUMMARY_DIR", > "CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) > - report_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], > vars["CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) > - print(report_json) > - try: > - os.remove(report_json) > - except FileNotFoundError: > - pass > - > - bitbake("core-image-minimal-initramfs") > - self.assertExists(report_json) > - > - # Check that the summary report lists at least one package > - with open(report_json) as f: > - report = json.load(f) > - self.assertEqual(report["version"], "1") > - self.assertGreater(len(report["package"]), 1) > - > - # Check that a random recipe wrote a recipe report to deploy/cve/ > - recipename = report["package"][0]["name"] > - recipe_report = os.path.join(vars["CVE_CHECK_DIR"], recipename + > "_cve.json") > - self.assertExists(recipe_report) > - with open(recipe_report) as f: > - report = json.load(f) > - self.assertEqual(report["version"], "1") > - self.assertEqual(len(report["package"]), 1) > - self.assertEqual(report["package"][0]["name"], recipename) > - > - > - def test_recipe_report_json_unpatched(self): > - config = """ > -INHERIT += "cve-check" > -CVE_CHECK_FORMAT_JSON = "1" > -CVE_CHECK_REPORT_PATCHED = "0" > -""" > - self.write_config(config) > - > - vars = get_bb_vars(["CVE_CHECK_SUMMARY_DIR", > "CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) > - summary_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], > vars["CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) > - recipe_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], "m4- > native_cve.json") > - > - try: > - os.remove(summary_json) > - os.remove(recipe_json) > - except FileNotFoundError: > - pass > - > - bitbake("m4-native -c cve_check") > - > - def check_m4_json(filename): > - with open(filename) as f: > - report = json.load(f) > - self.assertEqual(report["version"], "1") > - self.assertEqual(len(report["package"]), 1) > - package = report["package"][0] > - self.assertEqual(package["name"], "m4-native") > - #m4 had only Patched CVEs, so the issues array will be empty > - self.assertEqual(package["issue"], []) > - > - self.assertExists(summary_json) > - check_m4_json(summary_json) > - self.assertExists(recipe_json) > - check_m4_json(recipe_json) > - > - > - def test_recipe_report_json_ignored(self): > - config = """ > -INHERIT += "cve-check" > -CVE_CHECK_FORMAT_JSON = "1" > -CVE_CHECK_REPORT_PATCHED = "1" > -""" > - self.write_config(config) > - > - vars = get_bb_vars(["CVE_CHECK_SUMMARY_DIR", > "CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) > - summary_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], > vars["CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) > - recipe_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], > "logrotate_cve.json") > - > - try: > - os.remove(summary_json) > - os.remove(recipe_json) > - except FileNotFoundError: > - pass > - > - bitbake("logrotate -c cve_check") > - > - def check_m4_json(filename): > - with open(filename) as f: > - report = json.load(f) > - self.assertEqual(report["version"], "1") > - self.assertEqual(len(report["package"]), 1) > - package = report["package"][0] > - self.assertEqual(package["name"], "logrotate") > - found_cves = {} > - for issue in package["issue"]: > - found_cves[issue["id"]] = { > - "status" : issue["status"], > - "detail" : issue["detail"] if "detail" in issue else "", > - "description" : issue["description"] if "description" in issue else "" > - } > - # m4 CVE should not be in logrotate > - self.assertNotIn("CVE-2008-1687", found_cves) > - # logrotate has both Patched and Ignored CVEs > - detail = "version-not-in-range" > - self.assertIn("CVE-2011-1098", found_cves) > - self.assertEqual(found_cves["CVE-2011-1098"]["status"], "Patched") > - self.assertEqual(found_cves["CVE-2011-1098"]["detail"], detail) > - self.assertEqual(len(found_cves["CVE-2011-1098"]["description"]), 0) > - detail = "not-applicable-platform" > - description = "CVE is debian, gentoo or SUSE specific on the way logrotate > was installed/used" > - self.assertIn("CVE-2011-1548", found_cves) > - self.assertEqual(found_cves["CVE-2011-1548"]["status"], "Ignored") > - self.assertEqual(found_cves["CVE-2011-1548"]["detail"], detail) > - self.assertEqual(found_cves["CVE-2011-1548"]["description"], description) > - self.assertIn("CVE-2011-1549", found_cves) > - self.assertEqual(found_cves["CVE-2011-1549"]["status"], "Ignored") > - self.assertEqual(found_cves["CVE-2011-1549"]["detail"], detail) > - self.assertEqual(found_cves["CVE-2011-1549"]["description"], description) > - self.assertIn("CVE-2011-1550", found_cves) > - self.assertEqual(found_cves["CVE-2011-1550"]["status"], "Ignored") > - self.assertEqual(found_cves["CVE-2011-1550"]["detail"], detail) > - self.assertEqual(found_cves["CVE-2011-1550"]["description"], description) > - > - self.assertExists(summary_json) > - check_m4_json(summary_json) > - self.assertExists(recipe_json) > - check_m4_json(recipe_json) > diff --git a/meta/recipes-core/meta/cve-update-db-native.bb b/meta/recipes- > core/meta/cve-update-db-native.bb > deleted file mode 100644 > index 01f942dcdbf..00000000000 > --- a/meta/recipes-core/meta/cve-update-db-native.bb > +++ /dev/null > @@ -1,421 +0,0 @@ > -SUMMARY = "Updates the NVD CVE database" > -LICENSE = "MIT" > - > -INHIBIT_DEFAULT_DEPS = "1" > - > -inherit native > - > -deltask do_patch > -deltask do_configure > -deltask do_compile > -deltask do_install > -deltask do_populate_sysroot > - > -NVDCVE_URL ?= "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-" > -FKIE_URL ?= "https://github.com/fkie-cad/nvd-json-data- > feeds/releases/latest/download/CVE-" > - > -# CVE database update interval, in seconds. By default: once a day (23*60*60). > -# Use 0 to force the update > -# Use a negative value to skip the update > -CVE_DB_UPDATE_INTERVAL ?= "82800" > - > -# Timeout for blocking socket operations, such as the connection attempt. > -CVE_SOCKET_TIMEOUT ?= "60" > - > -CVE_CHECK_DB_DLDIR_FILE ?= > "${DL_DIR}/CVE_CHECK2/${CVE_CHECK_DB_FILENAME}" > -CVE_CHECK_DB_DLDIR_LOCK ?= "${CVE_CHECK_DB_DLDIR_FILE}.lock" > -CVE_CHECK_DB_TEMP_FILE ?= "${CVE_CHECK_DB_FILE}.tmp" > - > -python () { > - if not bb.data.inherits_class("cve-check", d): > - raise bb.parse.SkipRecipe("Skip recipe when cve-check class is not loaded.") > -} > - > -python do_fetch() { > - """ > - Update NVD database with json data feed > - """ > - import bb.utils > - import bb.progress > - import shutil > - > - bb.utils.export_proxies(d) > - > - db_file = d.getVar("CVE_CHECK_DB_DLDIR_FILE") > - db_dir = os.path.dirname(db_file) > - db_tmp_file = d.getVar("CVE_CHECK_DB_TEMP_FILE") > - > - cleanup_db_download(db_tmp_file) > - > - # The NVD database changes once a day, so no need to update more frequently > - # Allow the user to force-update > - try: > - import time > - update_interval = int(d.getVar("CVE_DB_UPDATE_INTERVAL")) > - if update_interval < 0: > - bb.note("CVE database update skipped") > - if not os.path.exists(db_file): > - bb.error("CVE database %s not present, database fetch/update skipped" > % db_file) > - return > - curr_time = time.time() > - database_time = os.path.getmtime(db_file) > - bb.note("Current time: %s; DB time: %s" % (time.ctime(curr_time), > time.ctime(database_time))) > - if curr_time < database_time: > - bb.warn("Database time is in the future, force DB update") > - elif curr_time - database_time < update_interval: > - bb.note("CVE database recently updated, skipping") > - return > - > - except OSError: > - pass > - > - if bb.utils.to_boolean(d.getVar("BB_NO_NETWORK")): > - bb.error("BB_NO_NETWORK attempted to disable fetch, this recipe uses > CVE_DB_UPDATE_INTERVAL to control download, set to '-1' to disable fetch or > update") > - > - bb.utils.mkdirhier(db_dir) > - bb.utils.mkdirhier(os.path.dirname(db_tmp_file)) > - if os.path.exists(db_file): > - shutil.copy2(db_file, db_tmp_file) > - > - if update_db_file(db_tmp_file, d): > - # Update downloaded correctly, we can swap files. To avoid potential > - # NFS caching issues, ensure that the destination file has a new inode > - # number. We do this in two steps as the downloads directory may be on > - # a different filesystem to tmpdir we're working in. > - new_file = "%s.new" % (db_file) > - shutil.move(db_tmp_file, new_file) > - os.rename(new_file, db_file) > - else: > - # Update failed, do not modify the database > - bb.warn("CVE database update failed") > - os.remove(db_tmp_file) > -} > - > -do_fetch[lockfiles] += "${CVE_CHECK_DB_DLDIR_LOCK}" > -do_fetch[file-checksums] = "" > -do_fetch[vardeps] = "" > - > -python do_unpack() { > - import shutil > - shutil.copyfile(d.getVar("CVE_CHECK_DB_DLDIR_FILE"), > d.getVar("CVE_CHECK_DB_FILE")) > -} > -do_unpack[lockfiles] += "${CVE_CHECK_DB_DLDIR_LOCK} > ${CVE_CHECK_DB_FILE_LOCK}" > - > -def cleanup_db_download(db_tmp_file): > - """ > - Cleanup the download space from possible failed downloads > - """ > - > - # Clean-up the temporary file downloads, we can remove both journal > - # and the temporary database > - if os.path.exists("{0}-journal".format(db_tmp_file)): > - os.remove("{0}-journal".format(db_tmp_file)) > - if os.path.exists(db_tmp_file): > - os.remove(db_tmp_file) > - > -def db_file_names(d, year, is_nvd): > - if is_nvd: > - year_url = d.getVar('NVDCVE_URL') + str(year) > - meta_url = year_url + ".meta" > - json_url = year_url + ".json.gz" > - return json_url, meta_url > - year_url = d.getVar('FKIE_URL') + str(year) > - meta_url = year_url + ".meta" > - json_url = year_url + ".json.xz" > - return json_url, meta_url > - > -def host_db_name(d, is_nvd): > - if is_nvd: > - return "nvd.nist.gov" > - return "github.com" > - > -def db_decompress(d, data, is_nvd): > - import gzip, lzma > - > - if is_nvd: > - return gzip.decompress(data).decode('utf-8') > - # otherwise > - return lzma.decompress(data) > - > -def update_db_file(db_tmp_file, d): > - """ > - Update the given database file > - """ > - import bb.progress > - import bb.utils > - from datetime import date > - import sqlite3 > - import urllib > - > - YEAR_START = 2002 > - cve_socket_timeout = int(d.getVar("CVE_SOCKET_TIMEOUT")) > - is_nvd = d.getVar("NVD_DB_VERSION") == "NVD1" > - > - # Connect to database > - conn = sqlite3.connect(db_tmp_file) > - initialize_db(conn) > - > - with bb.progress.ProgressHandler(d) as ph, > open(os.path.join(d.getVar("TMPDIR"), 'cve_check'), 'a') as cve_f: > - total_years = date.today().year + 1 - YEAR_START > - for i, year in enumerate(range(YEAR_START, date.today().year + 1)): > - bb.note("Updating %d" % year) > - ph.update((float(i + 1) / total_years) * 100) > - json_url, meta_url = db_file_names(d, year, is_nvd) > - > - # Retrieve meta last modified date > - try: > - response = urllib.request.urlopen(meta_url, timeout=cve_socket_timeout) > - except urllib.error.URLError as e: > - cve_f.write('Warning: CVE db update error, Unable to fetch CVE > data.\n\n') > - bb.warn("Failed to fetch CVE data (%s)" % e) > - import socket > - result = socket.getaddrinfo(host_db_name(d, is_nvd), 443, > proto=socket.IPPROTO_TCP) > - bb.warn("Host IPs are %s" % (", ".join(t[4][0] for t in result))) > - return False > - > - if response: > - for line in response.read().decode("utf-8").splitlines(): > - key, value = line.split(":", 1) > - if key == "lastModifiedDate": > - last_modified = value > - break > - else: > - bb.warn("Cannot parse CVE metadata, update failed") > - return False > - > - # Compare with current db last modified date > - cursor = conn.execute("select DATE from META where YEAR = ?", (year,)) > - meta = cursor.fetchone() > - cursor.close() > - > - if not meta or meta[0] != last_modified: > - bb.note("Updating entries") > - # Clear products table entries corresponding to current year > - conn.execute("delete from PRODUCTS where ID like ?", ('CVE-%d%%' > % year,)).close() > - > - # Update db with current year json file > - try: > - response = urllib.request.urlopen(json_url, > timeout=cve_socket_timeout) > - if response: > - update_db(d, conn, db_decompress(d, response.read(), is_nvd)) > - conn.execute("insert or replace into META values (?, ?)", [year, > last_modified]).close() > - except urllib.error.URLError as e: > - cve_f.write('Warning: CVE db update error, CVE data is outdated.\n\n') > - bb.warn("Cannot parse CVE data (%s), update failed" % e.reason) > - return False > - else: > - bb.debug(2, "Already up to date (last modified %s)" % last_modified) > - # Update success, set the date to cve_check file. > - if year == date.today().year: > - cve_f.write('CVE database update : %s\n\n' % date.today()) > - > - conn.commit() > - conn.close() > - return True > - > -def initialize_db(conn): > - with conn: > - c = conn.cursor() > - > - c.execute("CREATE TABLE IF NOT EXISTS META (YEAR INTEGER > UNIQUE, DATE TEXT)") > - > - c.execute("CREATE TABLE IF NOT EXISTS NVD (ID TEXT UNIQUE, > SUMMARY TEXT, \ > - SCOREV2 TEXT, SCOREV3 TEXT, SCOREV4 TEXT, MODIFIED > INTEGER, VECTOR TEXT, VECTORSTRING TEXT)") > - > - c.execute("CREATE TABLE IF NOT EXISTS PRODUCTS (ID TEXT, \ > - VENDOR TEXT, PRODUCT TEXT, VERSION_START TEXT, > OPERATOR_START TEXT, \ > - VERSION_END TEXT, OPERATOR_END TEXT)") > - c.execute("CREATE INDEX IF NOT EXISTS PRODUCT_ID_IDX on > PRODUCTS(ID);") > - > - c.close() > - > -def parse_node_and_insert(conn, node, cveId, is_nvd): > - # Parse children node if needed > - for child in node.get('children', ()): > - parse_node_and_insert(conn, child, cveId, is_nvd) > - > - def cpe_generator(is_nvd): > - match_string = "cpeMatch" > - cpe_string = 'criteria' > - if is_nvd: > - match_string = "cpe_match" > - cpe_string = 'cpe23Uri' > - > - for cpe in node.get(match_string, ()): > - if not cpe['vulnerable']: > - return > - cpe23 = cpe.get(cpe_string) > - if not cpe23: > - return > - cpe23 = cpe23.split(':') > - if len(cpe23) < 6: > - return > - vendor = cpe23[3] > - product = cpe23[4] > - version = cpe23[5] > - > - if cpe23[6] == '*' or cpe23[6] == '-': > - version_suffix = "" > - else: > - version_suffix = "_" + cpe23[6] > - > - if version != '*' and version != '-': > - # Version is defined, this is a '=' match > - yield [cveId, vendor, product, version + version_suffix, '=', '', ''] > - elif version == '-': > - # no version information is available > - yield [cveId, vendor, product, version, '', '', ''] > - else: > - # Parse start version, end version and operators > - op_start = '' > - op_end = '' > - v_start = '' > - v_end = '' > - > - if 'versionStartIncluding' in cpe: > - op_start = '>=' > - v_start = cpe['versionStartIncluding'] > - > - if 'versionStartExcluding' in cpe: > - op_start = '>' > - v_start = cpe['versionStartExcluding'] > - > - if 'versionEndIncluding' in cpe: > - op_end = '<=' > - v_end = cpe['versionEndIncluding'] > - > - if 'versionEndExcluding' in cpe: > - op_end = '<' > - v_end = cpe['versionEndExcluding'] > - > - if op_start or op_end or v_start or v_end: > - yield [cveId, vendor, product, v_start, op_start, v_end, op_end] > - else: > - # This is no version information, expressed differently. > - # Save processing by representing as -. > - yield [cveId, vendor, product, '-', '', '', ''] > - > - conn.executemany("insert into PRODUCTS values (?, ?, ?, ?, ?, ?, ?)", > cpe_generator(is_nvd)).close() > - > -def update_db_nvdjson(conn, jsondata): > - import json > - root = json.loads(jsondata) > - > - for elt in root['CVE_Items']: > - if not elt['impact']: > - continue > - > - accessVector = None > - vectorString = None > - cvssv2 = 0.0 > - cvssv3 = 0.0 > - cvssv4 = 0.0 > - cveId = elt['cve']['CVE_data_meta']['ID'] > - cveDesc = elt['cve']['description']['description_data'][0]['value'] > - date = elt['lastModifiedDate'] > - try: > - accessVector = elt['impact']['baseMetricV2']['cvssV2']['accessVector'] > - vectorString = elt['impact']['baseMetricV2']['cvssV2']['vectorString'] > - cvssv2 = elt['impact']['baseMetricV2']['cvssV2']['baseScore'] > - except KeyError: > - cvssv2 = 0.0 > - try: > - accessVector = accessVector or > elt['impact']['baseMetricV3']['cvssV3']['attackVector'] > - vectorString = vectorString or > elt['impact']['baseMetricV3']['cvssV3']['vectorString'] > - cvssv3 = elt['impact']['baseMetricV3']['cvssV3']['baseScore'] > - except KeyError: > - accessVector = accessVector or "UNKNOWN" > - cvssv3 = 0.0 > - > - conn.execute("insert or replace into NVD values (?, ?, ?, ?, ?, ?, ?, ?)", > - [cveId, cveDesc, cvssv2, cvssv3, cvssv4, date, accessVector, > vectorString]).close() > - > - configurations = elt['configurations']['nodes'] > - for config in configurations: > - parse_node_and_insert(conn, config, cveId, True) > - > -def get_metric_entry(metric): > - primaries = [c for c in metric if c['type'] == "Primary"] > - secondaries = [c for c in metric if c['type'] == "Secondary"] > - if len(primaries) > 0: > - return primaries[0] > - elif len(secondaries) > 0: > - return secondaries[0] > - return None > - > -def update_db_fkie(conn, jsondata): > - import json > - root = json.loads(jsondata) > - > - for elt in root['cve_items']: > - if 'vulnStatus' not in elt or elt['vulnStatus'] == 'Rejected': > - continue > - > - if 'configurations' not in elt: > - continue > - > - accessVector = None > - vectorString = None > - cvssv2 = 0.0 > - cvssv3 = 0.0 > - cvssv4 = 0.0 > - cveId = elt['id'] > - cveDesc = elt['descriptions'][0]['value'] > - date = elt['lastModified'] > - try: > - if 'cvssMetricV2' in elt['metrics']: > - entry = get_metric_entry(elt['metrics']['cvssMetricV2']) > - if entry: > - accessVector = entry['cvssData']['accessVector'] > - vectorString = entry['cvssData']['vectorString'] > - cvssv2 = entry['cvssData']['baseScore'] > - except KeyError: > - cvssv2 = 0.0 > - try: > - if 'cvssMetricV30' in elt['metrics']: > - entry = get_metric_entry(elt['metrics']['cvssMetricV30']) > - if entry: > - accessVector = entry['cvssData']['attackVector'] > - vectorString = entry['cvssData']['vectorString'] > - cvssv3 = entry['cvssData']['baseScore'] > - except KeyError: > - accessVector = accessVector or "UNKNOWN" > - cvssv3 = 0.0 > - try: > - if 'cvssMetricV31' in elt['metrics']: > - entry = get_metric_entry(elt['metrics']['cvssMetricV31']) > - if entry: > - accessVector = entry['cvssData']['attackVector'] > - vectorString = entry['cvssData']['vectorString'] > - cvssv3 = entry['cvssData']['baseScore'] > - except KeyError: > - accessVector = accessVector or "UNKNOWN" > - cvssv3 = 0.0 > - try: > - if 'cvssMetricV40' in elt['metrics']: > - entry = get_metric_entry(elt['metrics']['cvssMetricV40']) > - if entry: > - accessVector = entry['cvssData']['attackVector'] > - vectorString = entry['cvssData']['vectorString'] > - cvssv4 = entry['cvssData']['baseScore'] > - except KeyError: > - accessVector = accessVector or "UNKNOWN" > - cvssv4 = 0.0 > - > - conn.execute("insert or replace into NVD values (?, ?, ?, ?, ?, ?, ?, ?)", > - [cveId, cveDesc, cvssv2, cvssv3, cvssv4, date, accessVector, > vectorString]).close() > - > - for config in elt['configurations']: > - # This is suboptimal as it doesn't handle AND/OR and negate, but is better > than nothing > - for node in config.get("nodes") or []: > - parse_node_and_insert(conn, node, cveId, False) > - > -def update_db(d, conn, jsondata): > - if (d.getVar("NVD_DB_VERSION") == "FKIE"): > - return update_db_fkie(conn, jsondata) > - else: > - return update_db_nvdjson(conn, jsondata) > - > -do_fetch[nostamp] = "1" > - > -EXCLUDE_FROM_WORLD = "1" > diff --git a/meta/recipes-core/meta/cve-update-nvd2-native.bb b/meta/recipes- > core/meta/cve-update-nvd2-native.bb > deleted file mode 100644 > index 41c34ba0d01..00000000000 > --- a/meta/recipes-core/meta/cve-update-nvd2-native.bb > +++ /dev/null > @@ -1,422 +0,0 @@ > -SUMMARY = "Updates the NVD CVE database" > -LICENSE = "MIT" > - > -# Important note: > -# This product uses the NVD API but is not endorsed or certified by the NVD. > - > -INHIBIT_DEFAULT_DEPS = "1" > - > -inherit native > - > -deltask do_patch > -deltask do_configure > -deltask do_compile > -deltask do_install > -deltask do_populate_sysroot > - > -NVDCVE_URL ?= "https://services.nvd.nist.gov/rest/json/cves/2.0" > - > -# If you have a NVD API key (https://nvd.nist.gov/developers/request-an-api-key) > -# then setting this to get higher rate limits. > -NVDCVE_API_KEY ?= "" > - > -# CVE database update interval, in seconds. By default: once a day (23*60*60). > -# Use 0 to force the update > -# Use a negative value to skip the update > -CVE_DB_UPDATE_INTERVAL ?= "82800" > - > -# CVE database incremental update age threshold, in seconds. If the database is > -# older than this threshold, do a full re-download, else, do an incremental > -# update. By default: the maximum allowed value from NVD: 120 days > (120*24*60*60) > -# Use 0 to force a full download. > -CVE_DB_INCR_UPDATE_AGE_THRES ?= "10368000" > - > -# Number of attempts for each http query to nvd server before giving up > -CVE_DB_UPDATE_ATTEMPTS ?= "5" > - > -CVE_CHECK_DB_DLDIR_FILE ?= > "${DL_DIR}/CVE_CHECK2/${CVE_CHECK_DB_FILENAME}" > -CVE_CHECK_DB_DLDIR_LOCK ?= "${CVE_CHECK_DB_DLDIR_FILE}.lock" > -CVE_CHECK_DB_TEMP_FILE ?= "${CVE_CHECK_DB_FILE}.tmp" > - > -python () { > - if not bb.data.inherits_class("cve-check", d): > - raise bb.parse.SkipRecipe("Skip recipe when cve-check class is not loaded.") > -} > - > -python do_fetch() { > - """ > - Update NVD database with API 2.0 > - """ > - import bb.utils > - import bb.progress > - import shutil > - > - bb.utils.export_proxies(d) > - > - db_file = d.getVar("CVE_CHECK_DB_DLDIR_FILE") > - db_dir = os.path.dirname(db_file) > - db_tmp_file = d.getVar("CVE_CHECK_DB_TEMP_FILE") > - > - cleanup_db_download(db_tmp_file) > - # By default let's update the whole database (since time 0) > - database_time = 0 > - > - # The NVD database changes once a day, so no need to update more frequently > - # Allow the user to force-update > - try: > - import time > - update_interval = int(d.getVar("CVE_DB_UPDATE_INTERVAL")) > - if update_interval < 0: > - bb.note("CVE database update skipped") > - if not os.path.exists(db_file): > - bb.error("CVE database %s not present, database fetch/update skipped" > % db_file) > - return > - curr_time = time.time() > - database_time = os.path.getmtime(db_file) > - bb.note("Current time: %s; DB time: %s" % (time.ctime(curr_time), > time.ctime(database_time))) > - if curr_time < database_time: > - bb.warn("Database time is in the future, force DB update") > - database_time = 0 > - elif curr_time - database_time < update_interval: > - bb.note("CVE database recently updated, skipping") > - return > - > - except OSError: > - pass > - > - if bb.utils.to_boolean(d.getVar("BB_NO_NETWORK")): > - bb.error("BB_NO_NETWORK attempted to disable fetch, this recipe uses > CVE_DB_UPDATE_INTERVAL to control download, set to '-1' to disable fetch or > update") > - > - bb.utils.mkdirhier(db_dir) > - bb.utils.mkdirhier(os.path.dirname(db_tmp_file)) > - if os.path.exists(db_file): > - shutil.copy2(db_file, db_tmp_file) > - > - if update_db_file(db_tmp_file, d, database_time): > - # Update downloaded correctly, we can swap files. To avoid potential > - # NFS caching issues, ensure that the destination file has a new inode > - # number. We do this in two steps as the downloads directory may be on > - # a different filesystem to tmpdir we're working in. > - new_file = "%s.new" % (db_file) > - shutil.move(db_tmp_file, new_file) > - os.rename(new_file, db_file) > - else: > - # Update failed, do not modify the database > - bb.warn("CVE database update failed") > - os.remove(db_tmp_file) > -} > - > -do_fetch[lockfiles] += "${CVE_CHECK_DB_DLDIR_LOCK}" > -do_fetch[file-checksums] = "" > -do_fetch[vardeps] = "" > - > -python do_unpack() { > - import shutil > - shutil.copyfile(d.getVar("CVE_CHECK_DB_DLDIR_FILE"), > d.getVar("CVE_CHECK_DB_FILE")) > -} > -do_unpack[lockfiles] += "${CVE_CHECK_DB_DLDIR_LOCK} > ${CVE_CHECK_DB_FILE_LOCK}" > - > -def cleanup_db_download(db_tmp_file): > - """ > - Cleanup the download space from possible failed downloads > - """ > - > - # Clean-up the temporary file downloads, we can remove both journal > - # and the temporary database > - if os.path.exists("{0}-journal".format(db_tmp_file)): > - os.remove("{0}-journal".format(db_tmp_file)) > - if os.path.exists(db_tmp_file): > - os.remove(db_tmp_file) > - > -def nvd_request_wait(attempt, min_wait): > - return min(((2 * attempt) + min_wait), 30) > - > -def nvd_request_next(url, attempts, api_key, args, min_wait): > - """ > - Request next part of the NVD database > - NVD API documentation: https://nvd.nist.gov/developers/vulnerabilities > - """ > - > - import urllib.request > - import urllib.parse > - import gzip > - import http > - import time > - > - request = urllib.request.Request(url + "?" + urllib.parse.urlencode(args)) > - if api_key: > - request.add_header("apiKey", api_key) > - bb.note("Requesting %s" % request.full_url) > - > - for attempt in range(attempts): > - try: > - r = urllib.request.urlopen(request) > - > - if (r.headers['content-encoding'] == 'gzip'): > - buf = r.read() > - raw_data = gzip.decompress(buf) > - else: > - raw_data = r.read().decode("utf-8") > - > - r.close() > - > - except Exception as e: > - wait_time = nvd_request_wait(attempt, min_wait) > - bb.note("CVE database: received error (%s)" % (e)) > - bb.note("CVE database: retrying download after %d seconds. attempted > (%d/%d)" % (wait_time, attempt+1, attempts)) > - time.sleep(wait_time) > - pass > - else: > - return raw_data > - else: > - # We failed at all attempts > - return None > - > -def update_db_file(db_tmp_file, d, database_time): > - """ > - Update the given database file > - """ > - import bb.progress > - import bb.utils > - import datetime > - import sqlite3 > - import json > - > - # Connect to database > - conn = sqlite3.connect(db_tmp_file) > - initialize_db(conn) > - > - req_args = {'startIndex': 0} > - > - incr_update_threshold = int(d.getVar("CVE_DB_INCR_UPDATE_AGE_THRES")) > - if database_time != 0: > - database_date = datetime.datetime.fromtimestamp(database_time, > tz=datetime.timezone.utc) > - today_date = datetime.datetime.now(tz=datetime.timezone.utc) > - delta = today_date - database_date > - if incr_update_threshold == 0: > - bb.note("CVE database: forced full update") > - elif delta < datetime.timedelta(seconds=incr_update_threshold): > - bb.note("CVE database: performing partial update") > - # The maximum range for time is 120 days > - if delta > datetime.timedelta(days=120): > - bb.error("CVE database: Trying to do an incremental update on a larger > than supported range") > - req_args['lastModStartDate'] = database_date.isoformat() > - req_args['lastModEndDate'] = today_date.isoformat() > - else: > - bb.note("CVE database: file too old, forcing a full update") > - else: > - bb.note("CVE database: no preexisting database, do a full download") > - > - with bb.progress.ProgressHandler(d) as ph, > open(os.path.join(d.getVar("TMPDIR"), 'cve_check'), 'a') as cve_f: > - > - bb.note("Updating entries") > - index = 0 > - url = d.getVar("NVDCVE_URL") > - api_key = d.getVar("NVDCVE_API_KEY") or None > - attempts = int(d.getVar("CVE_DB_UPDATE_ATTEMPTS")) > - > - # Recommended by NVD > - wait_time = 6 > - if api_key: > - wait_time = 2 > - > - while True: > - req_args['startIndex'] = index > - raw_data = nvd_request_next(url, attempts, api_key, req_args, wait_time) > - if raw_data is None: > - # We haven't managed to download data > - return False > - > - # hack for json5 style responses > - if raw_data[-3:] == ',]}': > - bb.note("Removing trailing ',' from nvd response") > - raw_data = raw_data[:-3] + ']}' > - > - data = json.loads(raw_data) > - > - index = data["startIndex"] > - total = data["totalResults"] > - per_page = data["resultsPerPage"] > - bb.note("Got %d entries" % per_page) > - for cve in data["vulnerabilities"]: > - update_db(conn, cve) > - > - index += per_page > - ph.update((float(index) / (total+1)) * 100) > - if index >= total: > - break > - > - # Recommended by NVD > - time.sleep(wait_time) > - > - # Update success, set the date to cve_check file. > - cve_f.write('CVE database update : %s\n\n' % datetime.date.today()) > - > - conn.commit() > - conn.close() > - return True > - > -def initialize_db(conn): > - with conn: > - c = conn.cursor() > - > - c.execute("CREATE TABLE IF NOT EXISTS META (YEAR INTEGER > UNIQUE, DATE TEXT)") > - > - c.execute("CREATE TABLE IF NOT EXISTS NVD (ID TEXT UNIQUE, > SUMMARY TEXT, \ > - SCOREV2 TEXT, SCOREV3 TEXT, SCOREV4 TEXT, MODIFIED > INTEGER, VECTOR TEXT, VECTORSTRING TEXT)") > - > - c.execute("CREATE TABLE IF NOT EXISTS PRODUCTS (ID TEXT, \ > - VENDOR TEXT, PRODUCT TEXT, VERSION_START TEXT, > OPERATOR_START TEXT, \ > - VERSION_END TEXT, OPERATOR_END TEXT)") > - c.execute("CREATE INDEX IF NOT EXISTS PRODUCT_ID_IDX on > PRODUCTS(ID);") > - > - c.close() > - > -def parse_node_and_insert(conn, node, cveId): > - > - def cpe_generator(): > - for cpe in node.get('cpeMatch', ()): > - if not cpe['vulnerable']: > - return > - cpe23 = cpe.get('criteria') > - if not cpe23: > - return > - cpe23 = cpe23.split(':') > - if len(cpe23) < 6: > - return > - vendor = cpe23[3] > - product = cpe23[4] > - version = cpe23[5] > - > - if cpe23[6] == '*' or cpe23[6] == '-': > - version_suffix = "" > - else: > - version_suffix = "_" + cpe23[6] > - > - if version != '*' and version != '-': > - # Version is defined, this is a '=' match > - yield [cveId, vendor, product, version + version_suffix, '=', '', ''] > - elif version == '-': > - # no version information is available > - yield [cveId, vendor, product, version, '', '', ''] > - else: > - # Parse start version, end version and operators > - op_start = '' > - op_end = '' > - v_start = '' > - v_end = '' > - > - if 'versionStartIncluding' in cpe: > - op_start = '>=' > - v_start = cpe['versionStartIncluding'] > - > - if 'versionStartExcluding' in cpe: > - op_start = '>' > - v_start = cpe['versionStartExcluding'] > - > - if 'versionEndIncluding' in cpe: > - op_end = '<=' > - v_end = cpe['versionEndIncluding'] > - > - if 'versionEndExcluding' in cpe: > - op_end = '<' > - v_end = cpe['versionEndExcluding'] > - > - if op_start or op_end or v_start or v_end: > - yield [cveId, vendor, product, v_start, op_start, v_end, op_end] > - else: > - # This is no version information, expressed differently. > - # Save processing by representing as -. > - yield [cveId, vendor, product, '-', '', '', ''] > - > - conn.executemany("insert into PRODUCTS values (?, ?, ?, ?, ?, ?, ?)", > cpe_generator()).close() > - > -def update_db(conn, elt): > - """ > - Update a single entry in the on-disk database > - """ > - > - accessVector = None > - vectorString = None > - cveId = elt['cve']['id'] > - if elt['cve'].get('vulnStatus') == "Rejected": > - c = conn.cursor() > - c.execute("delete from PRODUCTS where ID = ?;", [cveId]) > - c.execute("delete from NVD where ID = ?;", [cveId]) > - c.close() > - return > - cveDesc = "" > - for desc in elt['cve']['descriptions']: > - if desc['lang'] == 'en': > - cveDesc = desc['value'] > - date = elt['cve']['lastModified'] > - > - # Extract maximum CVSS scores from all sources (Primary and Secondary) > - cvssv2 = 0.0 > - try: > - # Iterate through all cvssMetricV2 entries and find the maximum score > - for metric in elt['cve']['metrics']['cvssMetricV2']: > - score = metric['cvssData']['baseScore'] > - if score > cvssv2: > - cvssv2 = score > - accessVector = metric['cvssData']['accessVector'] > - vectorString = metric['cvssData']['vectorString'] > - except KeyError: > - pass > - > - cvssv3 = 0.0 > - try: > - # Iterate through all cvssMetricV30 entries and find the maximum score > - for metric in elt['cve']['metrics']['cvssMetricV30']: > - score = metric['cvssData']['baseScore'] > - if score > cvssv3: > - cvssv3 = score > - accessVector = accessVector or metric['cvssData']['attackVector'] > - vectorString = vectorString or metric['cvssData']['vectorString'] > - except KeyError: > - pass > - > - try: > - # Iterate through all cvssMetricV31 entries and find the maximum score > - for metric in elt['cve']['metrics']['cvssMetricV31']: > - score = metric['cvssData']['baseScore'] > - if score > cvssv3: > - cvssv3 = score > - accessVector = accessVector or metric['cvssData']['attackVector'] > - vectorString = vectorString or metric['cvssData']['vectorString'] > - except KeyError: > - pass > - > - cvssv4 = 0.0 > - try: > - # Iterate through all cvssMetricV40 entries and find the maximum score > - for metric in elt['cve']['metrics']['cvssMetricV40']: > - score = metric['cvssData']['baseScore'] > - if score > cvssv4: > - cvssv4 = score > - accessVector = accessVector or metric['cvssData']['attackVector'] > - vectorString = vectorString or metric['cvssData']['vectorString'] > - except KeyError: > - pass > - > - accessVector = accessVector or "UNKNOWN" > - vectorString = vectorString or "UNKNOWN" > - > - conn.execute("insert or replace into NVD values (?, ?, ?, ?, ?, ?, ?, ?)", > - [cveId, cveDesc, cvssv2, cvssv3, cvssv4, date, accessVector, > vectorString]).close() > - > - try: > - # Remove any pre-existing CVE configuration. Even for partial database > - # update, those will be repopulated. This ensures that old > - # configuration is not kept for an updated CVE. > - conn.execute("delete from PRODUCTS where ID = ?", [cveId]).close() > - for config in elt['cve']['configurations']: > - # This is suboptimal as it doesn't handle AND/OR and negate, but is better > than nothing > - for node in config["nodes"]: > - parse_node_and_insert(conn, node, cveId) > - except KeyError: > - bb.note("CVE %s has no configurations" % cveId) > - > -do_fetch[nostamp] = "1" > - > -EXCLUDE_FROM_WORLD = "1" > -- > 2.43.0
diff --git a/meta/classes/cve-check.bbclass b/meta/classes/cve-check.bbclass deleted file mode 100644 index c63ebd56e16..00000000000 --- a/meta/classes/cve-check.bbclass +++ /dev/null @@ -1,570 +0,0 @@ -# -# Copyright OpenEmbedded Contributors -# -# SPDX-License-Identifier: MIT -# - -# This class is used to check recipes against public CVEs. -# -# In order to use this class just inherit the class in the -# local.conf file and it will add the cve_check task for -# every recipe. The task can be used per recipe, per image, -# or using the special cases "world" and "universe". The -# cve_check task will print a warning for every unpatched -# CVE found and generate a file in the recipe WORKDIR/cve -# directory. If an image is build it will generate a report -# in DEPLOY_DIR_IMAGE for all the packages used. -# -# Example: -# bitbake -c cve_check openssl -# bitbake core-image-sato -# bitbake -k -c cve_check universe -# -# DISCLAIMER -# -# This class/tool is meant to be used as support and not -# the only method to check against CVEs. Running this tool -# doesn't guarantee your packages are free of CVEs. - -# The product name that the CVE database uses defaults to BPN, but may need to -# be overriden per recipe (for example tiff.bb sets CVE_PRODUCT=libtiff). -CVE_PRODUCT ??= "${BPN}" -CVE_VERSION ??= "${PV}" - -# Possible database sources: NVD1, NVD2, FKIE -NVD_DB_VERSION ?= "FKIE" - -# Use different file names for each database source, as they synchronize at different moments, so may be slightly different -CVE_CHECK_DB_FILENAME ?= "${@'nvdcve_2-2.db' if d.getVar('NVD_DB_VERSION') == 'NVD2' else 'nvdcve_1-3.db' if d.getVar('NVD_DB_VERSION') == 'NVD1' else 'nvdfkie_1-1.db'}" -CVE_CHECK_DB_FETCHER ?= "${@'cve-update-nvd2-native' if d.getVar('NVD_DB_VERSION') == 'NVD2' else 'cve-update-db-native'}" -CVE_CHECK_DB_DIR ?= "${STAGING_DIR}/CVE_CHECK" -CVE_CHECK_DB_FILE ?= "${CVE_CHECK_DB_DIR}/${CVE_CHECK_DB_FILENAME}" -CVE_CHECK_DB_FILE_LOCK ?= "${CVE_CHECK_DB_FILE}.lock" - -CVE_CHECK_SUMMARY_DIR ?= "${LOG_DIR}/cve" -CVE_CHECK_SUMMARY_FILE_NAME ?= "cve-summary" -CVE_CHECK_SUMMARY_FILE_NAME_JSON = "cve-summary.json" -CVE_CHECK_SUMMARY_INDEX_PATH = "${CVE_CHECK_SUMMARY_DIR}/cve-summary-index.txt" - -CVE_CHECK_LOG_JSON ?= "${T}/cve.json" - -CVE_CHECK_DIR ??= "${DEPLOY_DIR}/cve" -CVE_CHECK_RECIPE_FILE_JSON ?= "${CVE_CHECK_DIR}/${PN}_cve.json" -CVE_CHECK_MANIFEST_JSON_SUFFIX ?= "json" -CVE_CHECK_MANIFEST_JSON ?= "${IMGDEPLOYDIR}/${IMAGE_NAME}.${CVE_CHECK_MANIFEST_JSON_SUFFIX}" -CVE_CHECK_COPY_FILES ??= "1" -CVE_CHECK_CREATE_MANIFEST ??= "1" - -# Report Patched or Ignored CVEs -CVE_CHECK_REPORT_PATCHED ??= "1" - -CVE_CHECK_SHOW_WARNINGS ??= "1" - -# Provide JSON output -CVE_CHECK_FORMAT_JSON ??= "1" - -# Check for packages without CVEs (no issues or missing product name) -CVE_CHECK_COVERAGE ??= "1" - -# Skip CVE Check for packages (PN) -CVE_CHECK_SKIP_RECIPE ?= "" - -# Replace NVD DB check status for a given CVE. Each of CVE has to be mentioned -# separately with optional detail and description for this status. -# -# CVE_STATUS[CVE-1234-0001] = "not-applicable-platform: Issue only applies on Windows" -# CVE_STATUS[CVE-1234-0002] = "fixed-version: Fixed externally" -# -# Settings the same status and reason for multiple CVEs is possible -# via CVE_STATUS_GROUPS variable. -# -# CVE_STATUS_GROUPS = "CVE_STATUS_WIN CVE_STATUS_PATCHED" -# -# CVE_STATUS_WIN = "CVE-1234-0001 CVE-1234-0003" -# CVE_STATUS_WIN[status] = "not-applicable-platform: Issue only applies on Windows" -# CVE_STATUS_PATCHED = "CVE-1234-0002 CVE-1234-0004" -# CVE_STATUS_PATCHED[status] = "fixed-version: Fixed externally" -# -# All possible CVE statuses could be found in cve-check-map.conf -# CVE_CHECK_STATUSMAP[not-applicable-platform] = "Ignored" -# CVE_CHECK_STATUSMAP[fixed-version] = "Patched" -# -# CVE_CHECK_IGNORE is deprecated and CVE_STATUS has to be used instead. -# Keep CVE_CHECK_IGNORE until other layers migrate to new variables -CVE_CHECK_IGNORE ?= "" - -# Layers to be excluded -CVE_CHECK_LAYER_EXCLUDELIST ??= "" - -# Layers to be included -CVE_CHECK_LAYER_INCLUDELIST ??= "" - - -# set to "alphabetical" for version using single alphabetical character as increment release -CVE_VERSION_SUFFIX ??= "" - -python () { - from oe.cve_check import extend_cve_status - extend_cve_status(d) - - nvd_database_type = d.getVar("NVD_DB_VERSION") - if nvd_database_type not in ("NVD1", "NVD2", "FKIE"): - bb.erroronce("Malformed NVD_DB_VERSION, must be one of: NVD1, NVD2, FKIE. Defaulting to NVD2") - d.setVar("NVD_DB_VERSION", "NVD2") -} - -def generate_json_report(d, out_path, link_path): - if os.path.exists(d.getVar("CVE_CHECK_SUMMARY_INDEX_PATH")): - import json - from oe.cve_check import cve_check_merge_jsons, update_symlinks - - bb.note("Generating JSON CVE summary") - index_file = d.getVar("CVE_CHECK_SUMMARY_INDEX_PATH") - summary = {"version":"1", "package": []} - with open(index_file) as f: - filename = f.readline() - while filename: - with open(filename.rstrip()) as j: - data = json.load(j) - cve_check_merge_jsons(summary, data) - filename = f.readline() - - summary["package"].sort(key=lambda d: d['name']) - - with open(out_path, "w") as f: - json.dump(summary, f, indent=2) - - update_symlinks(out_path, link_path) - -python cve_save_summary_handler () { - import shutil - import datetime - from oe.cve_check import update_symlinks - - cve_summary_name = d.getVar("CVE_CHECK_SUMMARY_FILE_NAME") - cvelogpath = d.getVar("CVE_CHECK_SUMMARY_DIR") - bb.utils.mkdirhier(cvelogpath) - - timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S') - - if d.getVar("CVE_CHECK_FORMAT_JSON") == "1": - json_summary_link_name = os.path.join(cvelogpath, d.getVar("CVE_CHECK_SUMMARY_FILE_NAME_JSON")) - json_summary_name = os.path.join(cvelogpath, "%s-%s.json" % (cve_summary_name, timestamp)) - generate_json_report(d, json_summary_name, json_summary_link_name) - bb.plain("Complete CVE JSON report summary created at: %s" % json_summary_link_name) -} - -addhandler cve_save_summary_handler -cve_save_summary_handler[eventmask] = "bb.event.BuildCompleted" - -python do_cve_check () { - """ - Check recipe for patched and unpatched CVEs - """ - from oe.cve_check import get_patched_cves - - with bb.utils.fileslocked([d.getVar("CVE_CHECK_DB_FILE_LOCK")], shared=True): - if os.path.exists(d.getVar("CVE_CHECK_DB_FILE")): - try: - patched_cves = get_patched_cves(d) - except FileNotFoundError: - bb.fatal("Failure in searching patches") - cve_data, status = check_cves(d, patched_cves) - if len(cve_data) or (d.getVar("CVE_CHECK_COVERAGE") == "1" and status): - get_cve_info(d, cve_data) - cve_write_data(d, cve_data, status) - else: - bb.note("No CVE database found, skipping CVE check") - -} - -addtask cve_check before do_build -do_cve_check[depends] = "${CVE_CHECK_DB_FETCHER}:do_unpack" -do_cve_check[nostamp] = "1" - -python cve_check_cleanup () { - """ - Delete the file used to gather all the CVE information. - """ - bb.utils.remove(e.data.getVar("CVE_CHECK_SUMMARY_INDEX_PATH")) -} - -addhandler cve_check_cleanup -cve_check_cleanup[eventmask] = "bb.event.BuildCompleted" - -python cve_check_write_rootfs_manifest () { - """ - Create CVE manifest when building an image - """ - - import shutil - import json - from oe.rootfs import image_list_installed_packages - from oe.cve_check import cve_check_merge_jsons, update_symlinks - - if d.getVar("CVE_CHECK_COPY_FILES") == "1": - deploy_file_json = d.getVar("CVE_CHECK_RECIPE_FILE_JSON") - if os.path.exists(deploy_file_json): - bb.utils.remove(deploy_file_json) - - # Create a list of relevant recipies - recipies = set() - for pkg in list(image_list_installed_packages(d)): - pkg_info = os.path.join(d.getVar('PKGDATA_DIR'), - 'runtime-reverse', pkg) - pkg_data = oe.packagedata.read_pkgdatafile(pkg_info) - recipies.add(pkg_data["PN"]) - - bb.note("Writing rootfs CVE manifest") - deploy_dir = d.getVar("IMGDEPLOYDIR") - link_name = d.getVar("IMAGE_LINK_NAME") - - json_data = {"version":"1", "package": []} - text_data = "" - enable_json = d.getVar("CVE_CHECK_FORMAT_JSON") == "1" - - save_pn = d.getVar("PN") - - for pkg in recipies: - # To be able to use the CVE_CHECK_RECIPE_FILE_JSON variable we have to evaluate - # it with the different PN names set each time. - d.setVar("PN", pkg) - - if enable_json: - pkgfilepath = d.getVar("CVE_CHECK_RECIPE_FILE_JSON") - if os.path.exists(pkgfilepath): - with open(pkgfilepath) as j: - data = json.load(j) - cve_check_merge_jsons(json_data, data) - - d.setVar("PN", save_pn) - - if enable_json: - manifest_name_suffix = d.getVar("CVE_CHECK_MANIFEST_JSON_SUFFIX") - manifest_name = d.getVar("CVE_CHECK_MANIFEST_JSON") - - with open(manifest_name, "w") as f: - json.dump(json_data, f, indent=2) - - if link_name: - link_path = os.path.join(deploy_dir, "%s.%s" % (link_name, manifest_name_suffix)) - update_symlinks(manifest_name, link_path) - - bb.plain("Image CVE JSON report stored in: %s" % manifest_name) -} - -ROOTFS_POSTPROCESS_COMMAND:prepend = "${@'cve_check_write_rootfs_manifest ' if d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}" -do_rootfs[recrdeptask] += "${@'do_cve_check' if d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}" -do_populate_sdk[recrdeptask] += "${@'do_cve_check' if d.getVar('CVE_CHECK_CREATE_MANIFEST') == '1' else ''}" - -def cve_is_ignored(d, cve_data, cve): - if cve not in cve_data: - return False - if cve_data[cve]['abbrev-status'] == "Ignored": - return True - return False - -def cve_is_patched(d, cve_data, cve): - if cve not in cve_data: - return False - if cve_data[cve]['abbrev-status'] == "Patched": - return True - return False - -def cve_update(d, cve_data, cve, entry): - # If no entry, just add it - if cve not in cve_data: - cve_data[cve] = entry - return - # If we are updating, there might be change in the status - bb.debug(1, "Trying CVE entry update for %s from %s to %s" % (cve, cve_data[cve]['abbrev-status'], entry['abbrev-status'])) - if cve_data[cve]['abbrev-status'] == "Unknown": - cve_data[cve] = entry - return - if cve_data[cve]['abbrev-status'] == entry['abbrev-status']: - return - # Update like in {'abbrev-status': 'Patched', 'status': 'version-not-in-range'} to {'abbrev-status': 'Unpatched', 'status': 'version-in-range'} - if entry['abbrev-status'] == "Unpatched" and cve_data[cve]['abbrev-status'] == "Patched": - if entry['status'] == "version-in-range" and cve_data[cve]['status'] == "version-not-in-range": - # New result from the scan, vulnerable - cve_data[cve] = entry - bb.debug(1, "CVE entry %s update from Patched to Unpatched from the scan result" % cve) - return - if entry['abbrev-status'] == "Patched" and cve_data[cve]['abbrev-status'] == "Unpatched": - if entry['status'] == "version-not-in-range" and cve_data[cve]['status'] == "version-in-range": - # Range does not match the scan, but we already have a vulnerable match, ignore - bb.debug(1, "CVE entry %s update from Patched to Unpatched from the scan result - not applying" % cve) - return - # If we have an "Ignored", it has a priority - if cve_data[cve]['abbrev-status'] == "Ignored": - bb.debug(1, "CVE %s not updating because Ignored" % cve) - return - bb.warn("Unhandled CVE entry update for %s from %s to %s" % (cve, cve_data[cve], entry)) - -def check_cves(d, cve_data): - """ - Connect to the NVD database and find unpatched cves. - """ - from oe.cve_check import Version, convert_cve_version, decode_cve_status - - pn = d.getVar("PN") - real_pv = d.getVar("PV") - suffix = d.getVar("CVE_VERSION_SUFFIX") - - cves_status = [] - cves_in_recipe = False - # CVE_PRODUCT can contain more than one product (eg. curl/libcurl) - products = d.getVar("CVE_PRODUCT").split() - # If this has been unset then we're not scanning for CVEs here (for example, image recipes) - if not products: - return ([], []) - pv = d.getVar("CVE_VERSION").split("+git")[0] - - # If the recipe has been skipped/ignored we return empty lists - if pn in d.getVar("CVE_CHECK_SKIP_RECIPE").split(): - bb.note("Recipe has been skipped by cve-check") - return ([], []) - - import sqlite3 - db_file = d.expand("file:${CVE_CHECK_DB_FILE}?mode=ro") - conn = sqlite3.connect(db_file, uri=True) - - # For each of the known product names (e.g. curl has CPEs using curl and libcurl)... - for product in products: - cves_in_product = False - if ":" in product: - vendor, product = product.split(":", 1) - else: - vendor = "%" - - # Find all relevant CVE IDs. - cve_cursor = conn.execute("SELECT DISTINCT ID FROM PRODUCTS WHERE PRODUCT IS ? AND VENDOR LIKE ?", (product, vendor)) - for cverow in cve_cursor: - cve = cverow[0] - - # Write status once only for each product - if not cves_in_product: - cves_status.append([product, True]) - cves_in_product = True - cves_in_recipe = True - - if cve_is_ignored(d, cve_data, cve): - bb.note("%s-%s ignores %s" % (product, pv, cve)) - continue - elif cve_is_patched(d, cve_data, cve): - bb.note("%s has been patched" % (cve)) - continue - - vulnerable = False - ignored = False - - product_cursor = conn.execute("SELECT * FROM PRODUCTS WHERE ID IS ? AND PRODUCT IS ? AND VENDOR LIKE ?", (cve, product, vendor)) - for row in product_cursor: - (_, _, _, version_start, operator_start, version_end, operator_end) = row - #bb.debug(2, "Evaluating row " + str(row)) - if cve_is_ignored(d, cve_data, cve): - ignored = True - - version_start = convert_cve_version(version_start) - version_end = convert_cve_version(version_end) - - if (operator_start == '=' and pv == version_start) or version_start == '-': - vulnerable = True - else: - if operator_start: - try: - vulnerable_start = (operator_start == '>=' and Version(pv,suffix) >= Version(version_start,suffix)) - vulnerable_start |= (operator_start == '>' and Version(pv,suffix) > Version(version_start,suffix)) - except: - bb.warn("%s: Failed to compare %s %s %s for %s" % - (product, pv, operator_start, version_start, cve)) - vulnerable_start = False - else: - vulnerable_start = False - - if operator_end: - try: - vulnerable_end = (operator_end == '<=' and Version(pv,suffix) <= Version(version_end,suffix) ) - vulnerable_end |= (operator_end == '<' and Version(pv,suffix) < Version(version_end,suffix) ) - except: - bb.warn("%s: Failed to compare %s %s %s for %s" % - (product, pv, operator_end, version_end, cve)) - vulnerable_end = False - else: - vulnerable_end = False - - if operator_start and operator_end: - vulnerable = vulnerable_start and vulnerable_end - else: - vulnerable = vulnerable_start or vulnerable_end - - if vulnerable: - if ignored: - bb.note("%s is ignored in %s-%s" % (cve, pn, real_pv)) - cve_update(d, cve_data, cve, {"abbrev-status": "Ignored"}) - else: - bb.note("%s-%s is vulnerable to %s" % (pn, real_pv, cve)) - cve_update(d, cve_data, cve, {"abbrev-status": "Unpatched", "status": "version-in-range"}) - break - product_cursor.close() - - if not vulnerable: - bb.note("%s-%s is not vulnerable to %s" % (pn, real_pv, cve)) - cve_update(d, cve_data, cve, {"abbrev-status": "Patched", "status": "version-not-in-range"}) - cve_cursor.close() - - if not cves_in_product: - bb.note("No CVE records found for product %s, pn %s" % (product, pn)) - cves_status.append([product, False]) - - conn.close() - - if not cves_in_recipe: - bb.note("No CVE records for products in recipe %s" % (pn)) - - if d.getVar("CVE_CHECK_SHOW_WARNINGS") == "1": - unpatched_cves = [cve for cve in cve_data if cve_data[cve]["abbrev-status"] == "Unpatched"] - if unpatched_cves: - bb.warn("Found unpatched CVE (%s)" % " ".join(unpatched_cves)) - - return (cve_data, cves_status) - -def get_cve_info(d, cve_data): - """ - Get CVE information from the database. - """ - - import sqlite3 - - db_file = d.expand("file:${CVE_CHECK_DB_FILE}?mode=ro") - conn = sqlite3.connect(db_file, uri=True) - - for cve in cve_data: - cursor = conn.execute("SELECT * FROM NVD WHERE ID IS ?", (cve,)) - for row in cursor: - # The CVE itdelf has been added already - if row[0] not in cve_data: - bb.note("CVE record %s not present" % row[0]) - continue - #cve_data[row[0]] = {} - cve_data[row[0]]["NVD-summary"] = row[1] - cve_data[row[0]]["NVD-scorev2"] = row[2] - cve_data[row[0]]["NVD-scorev3"] = row[3] - cve_data[row[0]]["NVD-scorev4"] = row[4] - cve_data[row[0]]["NVD-modified"] = row[5] - cve_data[row[0]]["NVD-vector"] = row[6] - cve_data[row[0]]["NVD-vectorString"] = row[7] - cursor.close() - conn.close() - -def cve_check_write_json_output(d, output, direct_file, deploy_file, manifest_file): - """ - Write CVE information in the JSON format: to WORKDIR; and to - CVE_CHECK_DIR, if CVE manifest if enabled, write fragment - files that will be assembled at the end in cve_check_write_rootfs_manifest. - """ - - import json - - write_string = json.dumps(output, indent=2) - with open(direct_file, "w") as f: - bb.note("Writing file %s with CVE information" % direct_file) - f.write(write_string) - - if d.getVar("CVE_CHECK_COPY_FILES") == "1": - bb.utils.mkdirhier(os.path.dirname(deploy_file)) - with open(deploy_file, "w") as f: - f.write(write_string) - - if d.getVar("CVE_CHECK_CREATE_MANIFEST") == "1": - cvelogpath = d.getVar("CVE_CHECK_SUMMARY_DIR") - index_path = d.getVar("CVE_CHECK_SUMMARY_INDEX_PATH") - bb.utils.mkdirhier(cvelogpath) - fragment_file = os.path.basename(deploy_file) - fragment_path = os.path.join(cvelogpath, fragment_file) - with open(fragment_path, "w") as f: - f.write(write_string) - with open(index_path, "a+") as f: - f.write("%s\n" % fragment_path) - -def cve_write_data_json(d, cve_data, cve_status): - """ - Prepare CVE data for the JSON format, then write it. - """ - - output = {"version":"1", "package": []} - nvd_link = "https://nvd.nist.gov/vuln/detail/" - - fdir_name = d.getVar("FILE_DIRNAME") - layer = fdir_name.split("/")[-3] - - include_layers = d.getVar("CVE_CHECK_LAYER_INCLUDELIST").split() - exclude_layers = d.getVar("CVE_CHECK_LAYER_EXCLUDELIST").split() - - report_all = d.getVar("CVE_CHECK_REPORT_PATCHED") == "1" - - if exclude_layers and layer in exclude_layers: - return - - if include_layers and layer not in include_layers: - return - - product_data = [] - for s in cve_status: - p = {"product": s[0], "cvesInRecord": "Yes"} - if s[1] == False: - p["cvesInRecord"] = "No" - product_data.append(p) - - package_version = "%s%s" % (d.getVar("EXTENDPE"), d.getVar("PV")) - package_data = { - "name" : d.getVar("PN"), - "layer" : layer, - "version" : package_version, - "products": product_data - } - - cve_list = [] - - for cve in sorted(cve_data): - if not report_all and (cve_data[cve]["abbrev-status"] == "Patched" or cve_data[cve]["abbrev-status"] == "Ignored"): - continue - issue_link = "%s%s" % (nvd_link, cve) - - cve_item = { - "id" : cve, - "status" : cve_data[cve]["abbrev-status"], - "link": issue_link, - } - if 'NVD-summary' in cve_data[cve]: - cve_item["summary"] = cve_data[cve]["NVD-summary"] - cve_item["scorev2"] = cve_data[cve]["NVD-scorev2"] - cve_item["scorev3"] = cve_data[cve]["NVD-scorev3"] - cve_item["scorev4"] = cve_data[cve]["NVD-scorev4"] - cve_item["modified"] = cve_data[cve]["NVD-modified"] - cve_item["vector"] = cve_data[cve]["NVD-vector"] - cve_item["vectorString"] = cve_data[cve]["NVD-vectorString"] - if 'status' in cve_data[cve]: - cve_item["detail"] = cve_data[cve]["status"] - if 'justification' in cve_data[cve]: - cve_item["description"] = cve_data[cve]["justification"] - if 'resource' in cve_data[cve]: - cve_item["patch-file"] = cve_data[cve]["resource"] - cve_list.append(cve_item) - - package_data["issue"] = cve_list - output["package"].append(package_data) - - direct_file = d.getVar("CVE_CHECK_LOG_JSON") - deploy_file = d.getVar("CVE_CHECK_RECIPE_FILE_JSON") - manifest_file = d.getVar("CVE_CHECK_SUMMARY_FILE_NAME_JSON") - - cve_check_write_json_output(d, output, direct_file, deploy_file, manifest_file) - -def cve_write_data(d, cve_data, status): - """ - Write CVE data in each enabled format. - """ - - if d.getVar("CVE_CHECK_FORMAT_JSON") == "1": - cve_write_data_json(d, cve_data, status) diff --git a/meta/conf/distro/include/maintainers.inc b/meta/conf/distro/include/maintainers.inc index 1bd43211e23..a429320b88b 100644 --- a/meta/conf/distro/include/maintainers.inc +++ b/meta/conf/distro/include/maintainers.inc @@ -140,7 +140,6 @@ RECIPE_MAINTAINER:pn-cryptodev-module = "Robert Yang <liezhi.yang@windriver.com> RECIPE_MAINTAINER:pn-cryptodev-tests = "Robert Yang <liezhi.yang@windriver.com>" RECIPE_MAINTAINER:pn-cups = "Chen Qi <Qi.Chen@windriver.com>" RECIPE_MAINTAINER:pn-curl = "Robert Joslyn <robert.joslyn@redrectangle.org>" -RECIPE_MAINTAINER:pn-cve-update-nvd2-native = "Ross Burton <ross.burton@arm.com>" RECIPE_MAINTAINER:pn-db = "Unassigned <unassigned@yoctoproject.org>" RECIPE_MAINTAINER:pn-dbus = "Chen Qi <Qi.Chen@windriver.com>" RECIPE_MAINTAINER:pn-dbus-glib = "Chen Qi <Qi.Chen@windriver.com>" diff --git a/meta/conf/documentation.conf b/meta/conf/documentation.conf index 1853676fa06..9d429ba9a31 100644 --- a/meta/conf/documentation.conf +++ b/meta/conf/documentation.conf @@ -121,8 +121,6 @@ CONFLICT_MACHINE_FEATURES[doc] = "When a recipe inherits the features_check clas CORE_IMAGE_EXTRA_INSTALL[doc] = "Specifies the list of packages to be added to the image. You should only set this variable in the conf/local.conf file in the Build Directory." COREBASE[doc] = "Specifies the parent directory of the OpenEmbedded Core Metadata layer (i.e. meta)." CONF_VERSION[doc] = "Tracks the version of local.conf. Increased each time build/conf/ changes incompatibly." -CVE_CHECK_LAYER_EXCLUDELIST[doc] = "Defines which layers to exclude from cve-check scanning" -CVE_CHECK_LAYER_INCLUDELIST[doc] = "Defines which layers to include during cve-check scanning" #D diff --git a/meta/lib/oeqa/selftest/cases/cve_check.py b/meta/lib/oeqa/selftest/cases/cve_check.py index 511e4b81b41..891a7de3317 100644 --- a/meta/lib/oeqa/selftest/cases/cve_check.py +++ b/meta/lib/oeqa/selftest/cases/cve_check.py @@ -4,10 +4,7 @@ # SPDX-License-Identifier: MIT # -import json -import os from oeqa.selftest.case import OESelftestTestCase -from oeqa.utils.commands import bitbake, get_bb_vars class CVECheck(OESelftestTestCase): @@ -325,172 +322,3 @@ class CVECheck(OESelftestTestCase): ), {"CVE-2019-6461", "CVE-2019-6462", "CVE-2019-6463", "CVE-2019-6464"}, ) - - def test_recipe_report_json(self): - config = """ -INHERIT += "cve-check" -CVE_CHECK_FORMAT_JSON = "1" -""" - self.write_config(config) - - vars = get_bb_vars(["CVE_CHECK_SUMMARY_DIR", "CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) - summary_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], vars["CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) - recipe_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], "m4-native_cve.json") - - try: - os.remove(summary_json) - os.remove(recipe_json) - except FileNotFoundError: - pass - - bitbake("m4-native -c cve_check") - - def check_m4_json(filename): - with open(filename) as f: - report = json.load(f) - self.assertEqual(report["version"], "1") - self.assertEqual(len(report["package"]), 1) - package = report["package"][0] - self.assertEqual(package["name"], "m4-native") - found_cves = { issue["id"]: issue["status"] for issue in package["issue"]} - self.assertIn("CVE-2008-1687", found_cves) - self.assertEqual(found_cves["CVE-2008-1687"], "Patched") - - self.assertExists(summary_json) - check_m4_json(summary_json) - self.assertExists(recipe_json) - check_m4_json(recipe_json) - - - def test_image_json(self): - config = """ -INHERIT += "cve-check" -CVE_CHECK_FORMAT_JSON = "1" -""" - self.write_config(config) - - vars = get_bb_vars(["CVE_CHECK_DIR", "CVE_CHECK_SUMMARY_DIR", "CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) - report_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], vars["CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) - print(report_json) - try: - os.remove(report_json) - except FileNotFoundError: - pass - - bitbake("core-image-minimal-initramfs") - self.assertExists(report_json) - - # Check that the summary report lists at least one package - with open(report_json) as f: - report = json.load(f) - self.assertEqual(report["version"], "1") - self.assertGreater(len(report["package"]), 1) - - # Check that a random recipe wrote a recipe report to deploy/cve/ - recipename = report["package"][0]["name"] - recipe_report = os.path.join(vars["CVE_CHECK_DIR"], recipename + "_cve.json") - self.assertExists(recipe_report) - with open(recipe_report) as f: - report = json.load(f) - self.assertEqual(report["version"], "1") - self.assertEqual(len(report["package"]), 1) - self.assertEqual(report["package"][0]["name"], recipename) - - - def test_recipe_report_json_unpatched(self): - config = """ -INHERIT += "cve-check" -CVE_CHECK_FORMAT_JSON = "1" -CVE_CHECK_REPORT_PATCHED = "0" -""" - self.write_config(config) - - vars = get_bb_vars(["CVE_CHECK_SUMMARY_DIR", "CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) - summary_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], vars["CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) - recipe_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], "m4-native_cve.json") - - try: - os.remove(summary_json) - os.remove(recipe_json) - except FileNotFoundError: - pass - - bitbake("m4-native -c cve_check") - - def check_m4_json(filename): - with open(filename) as f: - report = json.load(f) - self.assertEqual(report["version"], "1") - self.assertEqual(len(report["package"]), 1) - package = report["package"][0] - self.assertEqual(package["name"], "m4-native") - #m4 had only Patched CVEs, so the issues array will be empty - self.assertEqual(package["issue"], []) - - self.assertExists(summary_json) - check_m4_json(summary_json) - self.assertExists(recipe_json) - check_m4_json(recipe_json) - - - def test_recipe_report_json_ignored(self): - config = """ -INHERIT += "cve-check" -CVE_CHECK_FORMAT_JSON = "1" -CVE_CHECK_REPORT_PATCHED = "1" -""" - self.write_config(config) - - vars = get_bb_vars(["CVE_CHECK_SUMMARY_DIR", "CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) - summary_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], vars["CVE_CHECK_SUMMARY_FILE_NAME_JSON"]) - recipe_json = os.path.join(vars["CVE_CHECK_SUMMARY_DIR"], "logrotate_cve.json") - - try: - os.remove(summary_json) - os.remove(recipe_json) - except FileNotFoundError: - pass - - bitbake("logrotate -c cve_check") - - def check_m4_json(filename): - with open(filename) as f: - report = json.load(f) - self.assertEqual(report["version"], "1") - self.assertEqual(len(report["package"]), 1) - package = report["package"][0] - self.assertEqual(package["name"], "logrotate") - found_cves = {} - for issue in package["issue"]: - found_cves[issue["id"]] = { - "status" : issue["status"], - "detail" : issue["detail"] if "detail" in issue else "", - "description" : issue["description"] if "description" in issue else "" - } - # m4 CVE should not be in logrotate - self.assertNotIn("CVE-2008-1687", found_cves) - # logrotate has both Patched and Ignored CVEs - detail = "version-not-in-range" - self.assertIn("CVE-2011-1098", found_cves) - self.assertEqual(found_cves["CVE-2011-1098"]["status"], "Patched") - self.assertEqual(found_cves["CVE-2011-1098"]["detail"], detail) - self.assertEqual(len(found_cves["CVE-2011-1098"]["description"]), 0) - detail = "not-applicable-platform" - description = "CVE is debian, gentoo or SUSE specific on the way logrotate was installed/used" - self.assertIn("CVE-2011-1548", found_cves) - self.assertEqual(found_cves["CVE-2011-1548"]["status"], "Ignored") - self.assertEqual(found_cves["CVE-2011-1548"]["detail"], detail) - self.assertEqual(found_cves["CVE-2011-1548"]["description"], description) - self.assertIn("CVE-2011-1549", found_cves) - self.assertEqual(found_cves["CVE-2011-1549"]["status"], "Ignored") - self.assertEqual(found_cves["CVE-2011-1549"]["detail"], detail) - self.assertEqual(found_cves["CVE-2011-1549"]["description"], description) - self.assertIn("CVE-2011-1550", found_cves) - self.assertEqual(found_cves["CVE-2011-1550"]["status"], "Ignored") - self.assertEqual(found_cves["CVE-2011-1550"]["detail"], detail) - self.assertEqual(found_cves["CVE-2011-1550"]["description"], description) - - self.assertExists(summary_json) - check_m4_json(summary_json) - self.assertExists(recipe_json) - check_m4_json(recipe_json) diff --git a/meta/recipes-core/meta/cve-update-db-native.bb b/meta/recipes-core/meta/cve-update-db-native.bb deleted file mode 100644 index 01f942dcdbf..00000000000 --- a/meta/recipes-core/meta/cve-update-db-native.bb +++ /dev/null @@ -1,421 +0,0 @@ -SUMMARY = "Updates the NVD CVE database" -LICENSE = "MIT" - -INHIBIT_DEFAULT_DEPS = "1" - -inherit native - -deltask do_patch -deltask do_configure -deltask do_compile -deltask do_install -deltask do_populate_sysroot - -NVDCVE_URL ?= "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-" -FKIE_URL ?= "https://github.com/fkie-cad/nvd-json-data-feeds/releases/latest/download/CVE-" - -# CVE database update interval, in seconds. By default: once a day (23*60*60). -# Use 0 to force the update -# Use a negative value to skip the update -CVE_DB_UPDATE_INTERVAL ?= "82800" - -# Timeout for blocking socket operations, such as the connection attempt. -CVE_SOCKET_TIMEOUT ?= "60" - -CVE_CHECK_DB_DLDIR_FILE ?= "${DL_DIR}/CVE_CHECK2/${CVE_CHECK_DB_FILENAME}" -CVE_CHECK_DB_DLDIR_LOCK ?= "${CVE_CHECK_DB_DLDIR_FILE}.lock" -CVE_CHECK_DB_TEMP_FILE ?= "${CVE_CHECK_DB_FILE}.tmp" - -python () { - if not bb.data.inherits_class("cve-check", d): - raise bb.parse.SkipRecipe("Skip recipe when cve-check class is not loaded.") -} - -python do_fetch() { - """ - Update NVD database with json data feed - """ - import bb.utils - import bb.progress - import shutil - - bb.utils.export_proxies(d) - - db_file = d.getVar("CVE_CHECK_DB_DLDIR_FILE") - db_dir = os.path.dirname(db_file) - db_tmp_file = d.getVar("CVE_CHECK_DB_TEMP_FILE") - - cleanup_db_download(db_tmp_file) - - # The NVD database changes once a day, so no need to update more frequently - # Allow the user to force-update - try: - import time - update_interval = int(d.getVar("CVE_DB_UPDATE_INTERVAL")) - if update_interval < 0: - bb.note("CVE database update skipped") - if not os.path.exists(db_file): - bb.error("CVE database %s not present, database fetch/update skipped" % db_file) - return - curr_time = time.time() - database_time = os.path.getmtime(db_file) - bb.note("Current time: %s; DB time: %s" % (time.ctime(curr_time), time.ctime(database_time))) - if curr_time < database_time: - bb.warn("Database time is in the future, force DB update") - elif curr_time - database_time < update_interval: - bb.note("CVE database recently updated, skipping") - return - - except OSError: - pass - - if bb.utils.to_boolean(d.getVar("BB_NO_NETWORK")): - bb.error("BB_NO_NETWORK attempted to disable fetch, this recipe uses CVE_DB_UPDATE_INTERVAL to control download, set to '-1' to disable fetch or update") - - bb.utils.mkdirhier(db_dir) - bb.utils.mkdirhier(os.path.dirname(db_tmp_file)) - if os.path.exists(db_file): - shutil.copy2(db_file, db_tmp_file) - - if update_db_file(db_tmp_file, d): - # Update downloaded correctly, we can swap files. To avoid potential - # NFS caching issues, ensure that the destination file has a new inode - # number. We do this in two steps as the downloads directory may be on - # a different filesystem to tmpdir we're working in. - new_file = "%s.new" % (db_file) - shutil.move(db_tmp_file, new_file) - os.rename(new_file, db_file) - else: - # Update failed, do not modify the database - bb.warn("CVE database update failed") - os.remove(db_tmp_file) -} - -do_fetch[lockfiles] += "${CVE_CHECK_DB_DLDIR_LOCK}" -do_fetch[file-checksums] = "" -do_fetch[vardeps] = "" - -python do_unpack() { - import shutil - shutil.copyfile(d.getVar("CVE_CHECK_DB_DLDIR_FILE"), d.getVar("CVE_CHECK_DB_FILE")) -} -do_unpack[lockfiles] += "${CVE_CHECK_DB_DLDIR_LOCK} ${CVE_CHECK_DB_FILE_LOCK}" - -def cleanup_db_download(db_tmp_file): - """ - Cleanup the download space from possible failed downloads - """ - - # Clean-up the temporary file downloads, we can remove both journal - # and the temporary database - if os.path.exists("{0}-journal".format(db_tmp_file)): - os.remove("{0}-journal".format(db_tmp_file)) - if os.path.exists(db_tmp_file): - os.remove(db_tmp_file) - -def db_file_names(d, year, is_nvd): - if is_nvd: - year_url = d.getVar('NVDCVE_URL') + str(year) - meta_url = year_url + ".meta" - json_url = year_url + ".json.gz" - return json_url, meta_url - year_url = d.getVar('FKIE_URL') + str(year) - meta_url = year_url + ".meta" - json_url = year_url + ".json.xz" - return json_url, meta_url - -def host_db_name(d, is_nvd): - if is_nvd: - return "nvd.nist.gov" - return "github.com" - -def db_decompress(d, data, is_nvd): - import gzip, lzma - - if is_nvd: - return gzip.decompress(data).decode('utf-8') - # otherwise - return lzma.decompress(data) - -def update_db_file(db_tmp_file, d): - """ - Update the given database file - """ - import bb.progress - import bb.utils - from datetime import date - import sqlite3 - import urllib - - YEAR_START = 2002 - cve_socket_timeout = int(d.getVar("CVE_SOCKET_TIMEOUT")) - is_nvd = d.getVar("NVD_DB_VERSION") == "NVD1" - - # Connect to database - conn = sqlite3.connect(db_tmp_file) - initialize_db(conn) - - with bb.progress.ProgressHandler(d) as ph, open(os.path.join(d.getVar("TMPDIR"), 'cve_check'), 'a') as cve_f: - total_years = date.today().year + 1 - YEAR_START - for i, year in enumerate(range(YEAR_START, date.today().year + 1)): - bb.note("Updating %d" % year) - ph.update((float(i + 1) / total_years) * 100) - json_url, meta_url = db_file_names(d, year, is_nvd) - - # Retrieve meta last modified date - try: - response = urllib.request.urlopen(meta_url, timeout=cve_socket_timeout) - except urllib.error.URLError as e: - cve_f.write('Warning: CVE db update error, Unable to fetch CVE data.\n\n') - bb.warn("Failed to fetch CVE data (%s)" % e) - import socket - result = socket.getaddrinfo(host_db_name(d, is_nvd), 443, proto=socket.IPPROTO_TCP) - bb.warn("Host IPs are %s" % (", ".join(t[4][0] for t in result))) - return False - - if response: - for line in response.read().decode("utf-8").splitlines(): - key, value = line.split(":", 1) - if key == "lastModifiedDate": - last_modified = value - break - else: - bb.warn("Cannot parse CVE metadata, update failed") - return False - - # Compare with current db last modified date - cursor = conn.execute("select DATE from META where YEAR = ?", (year,)) - meta = cursor.fetchone() - cursor.close() - - if not meta or meta[0] != last_modified: - bb.note("Updating entries") - # Clear products table entries corresponding to current year - conn.execute("delete from PRODUCTS where ID like ?", ('CVE-%d%%' % year,)).close() - - # Update db with current year json file - try: - response = urllib.request.urlopen(json_url, timeout=cve_socket_timeout) - if response: - update_db(d, conn, db_decompress(d, response.read(), is_nvd)) - conn.execute("insert or replace into META values (?, ?)", [year, last_modified]).close() - except urllib.error.URLError as e: - cve_f.write('Warning: CVE db update error, CVE data is outdated.\n\n') - bb.warn("Cannot parse CVE data (%s), update failed" % e.reason) - return False - else: - bb.debug(2, "Already up to date (last modified %s)" % last_modified) - # Update success, set the date to cve_check file. - if year == date.today().year: - cve_f.write('CVE database update : %s\n\n' % date.today()) - - conn.commit() - conn.close() - return True - -def initialize_db(conn): - with conn: - c = conn.cursor() - - c.execute("CREATE TABLE IF NOT EXISTS META (YEAR INTEGER UNIQUE, DATE TEXT)") - - c.execute("CREATE TABLE IF NOT EXISTS NVD (ID TEXT UNIQUE, SUMMARY TEXT, \ - SCOREV2 TEXT, SCOREV3 TEXT, SCOREV4 TEXT, MODIFIED INTEGER, VECTOR TEXT, VECTORSTRING TEXT)") - - c.execute("CREATE TABLE IF NOT EXISTS PRODUCTS (ID TEXT, \ - VENDOR TEXT, PRODUCT TEXT, VERSION_START TEXT, OPERATOR_START TEXT, \ - VERSION_END TEXT, OPERATOR_END TEXT)") - c.execute("CREATE INDEX IF NOT EXISTS PRODUCT_ID_IDX on PRODUCTS(ID);") - - c.close() - -def parse_node_and_insert(conn, node, cveId, is_nvd): - # Parse children node if needed - for child in node.get('children', ()): - parse_node_and_insert(conn, child, cveId, is_nvd) - - def cpe_generator(is_nvd): - match_string = "cpeMatch" - cpe_string = 'criteria' - if is_nvd: - match_string = "cpe_match" - cpe_string = 'cpe23Uri' - - for cpe in node.get(match_string, ()): - if not cpe['vulnerable']: - return - cpe23 = cpe.get(cpe_string) - if not cpe23: - return - cpe23 = cpe23.split(':') - if len(cpe23) < 6: - return - vendor = cpe23[3] - product = cpe23[4] - version = cpe23[5] - - if cpe23[6] == '*' or cpe23[6] == '-': - version_suffix = "" - else: - version_suffix = "_" + cpe23[6] - - if version != '*' and version != '-': - # Version is defined, this is a '=' match - yield [cveId, vendor, product, version + version_suffix, '=', '', ''] - elif version == '-': - # no version information is available - yield [cveId, vendor, product, version, '', '', ''] - else: - # Parse start version, end version and operators - op_start = '' - op_end = '' - v_start = '' - v_end = '' - - if 'versionStartIncluding' in cpe: - op_start = '>=' - v_start = cpe['versionStartIncluding'] - - if 'versionStartExcluding' in cpe: - op_start = '>' - v_start = cpe['versionStartExcluding'] - - if 'versionEndIncluding' in cpe: - op_end = '<=' - v_end = cpe['versionEndIncluding'] - - if 'versionEndExcluding' in cpe: - op_end = '<' - v_end = cpe['versionEndExcluding'] - - if op_start or op_end or v_start or v_end: - yield [cveId, vendor, product, v_start, op_start, v_end, op_end] - else: - # This is no version information, expressed differently. - # Save processing by representing as -. - yield [cveId, vendor, product, '-', '', '', ''] - - conn.executemany("insert into PRODUCTS values (?, ?, ?, ?, ?, ?, ?)", cpe_generator(is_nvd)).close() - -def update_db_nvdjson(conn, jsondata): - import json - root = json.loads(jsondata) - - for elt in root['CVE_Items']: - if not elt['impact']: - continue - - accessVector = None - vectorString = None - cvssv2 = 0.0 - cvssv3 = 0.0 - cvssv4 = 0.0 - cveId = elt['cve']['CVE_data_meta']['ID'] - cveDesc = elt['cve']['description']['description_data'][0]['value'] - date = elt['lastModifiedDate'] - try: - accessVector = elt['impact']['baseMetricV2']['cvssV2']['accessVector'] - vectorString = elt['impact']['baseMetricV2']['cvssV2']['vectorString'] - cvssv2 = elt['impact']['baseMetricV2']['cvssV2']['baseScore'] - except KeyError: - cvssv2 = 0.0 - try: - accessVector = accessVector or elt['impact']['baseMetricV3']['cvssV3']['attackVector'] - vectorString = vectorString or elt['impact']['baseMetricV3']['cvssV3']['vectorString'] - cvssv3 = elt['impact']['baseMetricV3']['cvssV3']['baseScore'] - except KeyError: - accessVector = accessVector or "UNKNOWN" - cvssv3 = 0.0 - - conn.execute("insert or replace into NVD values (?, ?, ?, ?, ?, ?, ?, ?)", - [cveId, cveDesc, cvssv2, cvssv3, cvssv4, date, accessVector, vectorString]).close() - - configurations = elt['configurations']['nodes'] - for config in configurations: - parse_node_and_insert(conn, config, cveId, True) - -def get_metric_entry(metric): - primaries = [c for c in metric if c['type'] == "Primary"] - secondaries = [c for c in metric if c['type'] == "Secondary"] - if len(primaries) > 0: - return primaries[0] - elif len(secondaries) > 0: - return secondaries[0] - return None - -def update_db_fkie(conn, jsondata): - import json - root = json.loads(jsondata) - - for elt in root['cve_items']: - if 'vulnStatus' not in elt or elt['vulnStatus'] == 'Rejected': - continue - - if 'configurations' not in elt: - continue - - accessVector = None - vectorString = None - cvssv2 = 0.0 - cvssv3 = 0.0 - cvssv4 = 0.0 - cveId = elt['id'] - cveDesc = elt['descriptions'][0]['value'] - date = elt['lastModified'] - try: - if 'cvssMetricV2' in elt['metrics']: - entry = get_metric_entry(elt['metrics']['cvssMetricV2']) - if entry: - accessVector = entry['cvssData']['accessVector'] - vectorString = entry['cvssData']['vectorString'] - cvssv2 = entry['cvssData']['baseScore'] - except KeyError: - cvssv2 = 0.0 - try: - if 'cvssMetricV30' in elt['metrics']: - entry = get_metric_entry(elt['metrics']['cvssMetricV30']) - if entry: - accessVector = entry['cvssData']['attackVector'] - vectorString = entry['cvssData']['vectorString'] - cvssv3 = entry['cvssData']['baseScore'] - except KeyError: - accessVector = accessVector or "UNKNOWN" - cvssv3 = 0.0 - try: - if 'cvssMetricV31' in elt['metrics']: - entry = get_metric_entry(elt['metrics']['cvssMetricV31']) - if entry: - accessVector = entry['cvssData']['attackVector'] - vectorString = entry['cvssData']['vectorString'] - cvssv3 = entry['cvssData']['baseScore'] - except KeyError: - accessVector = accessVector or "UNKNOWN" - cvssv3 = 0.0 - try: - if 'cvssMetricV40' in elt['metrics']: - entry = get_metric_entry(elt['metrics']['cvssMetricV40']) - if entry: - accessVector = entry['cvssData']['attackVector'] - vectorString = entry['cvssData']['vectorString'] - cvssv4 = entry['cvssData']['baseScore'] - except KeyError: - accessVector = accessVector or "UNKNOWN" - cvssv4 = 0.0 - - conn.execute("insert or replace into NVD values (?, ?, ?, ?, ?, ?, ?, ?)", - [cveId, cveDesc, cvssv2, cvssv3, cvssv4, date, accessVector, vectorString]).close() - - for config in elt['configurations']: - # This is suboptimal as it doesn't handle AND/OR and negate, but is better than nothing - for node in config.get("nodes") or []: - parse_node_and_insert(conn, node, cveId, False) - -def update_db(d, conn, jsondata): - if (d.getVar("NVD_DB_VERSION") == "FKIE"): - return update_db_fkie(conn, jsondata) - else: - return update_db_nvdjson(conn, jsondata) - -do_fetch[nostamp] = "1" - -EXCLUDE_FROM_WORLD = "1" diff --git a/meta/recipes-core/meta/cve-update-nvd2-native.bb b/meta/recipes-core/meta/cve-update-nvd2-native.bb deleted file mode 100644 index 41c34ba0d01..00000000000 --- a/meta/recipes-core/meta/cve-update-nvd2-native.bb +++ /dev/null @@ -1,422 +0,0 @@ -SUMMARY = "Updates the NVD CVE database" -LICENSE = "MIT" - -# Important note: -# This product uses the NVD API but is not endorsed or certified by the NVD. - -INHIBIT_DEFAULT_DEPS = "1" - -inherit native - -deltask do_patch -deltask do_configure -deltask do_compile -deltask do_install -deltask do_populate_sysroot - -NVDCVE_URL ?= "https://services.nvd.nist.gov/rest/json/cves/2.0" - -# If you have a NVD API key (https://nvd.nist.gov/developers/request-an-api-key) -# then setting this to get higher rate limits. -NVDCVE_API_KEY ?= "" - -# CVE database update interval, in seconds. By default: once a day (23*60*60). -# Use 0 to force the update -# Use a negative value to skip the update -CVE_DB_UPDATE_INTERVAL ?= "82800" - -# CVE database incremental update age threshold, in seconds. If the database is -# older than this threshold, do a full re-download, else, do an incremental -# update. By default: the maximum allowed value from NVD: 120 days (120*24*60*60) -# Use 0 to force a full download. -CVE_DB_INCR_UPDATE_AGE_THRES ?= "10368000" - -# Number of attempts for each http query to nvd server before giving up -CVE_DB_UPDATE_ATTEMPTS ?= "5" - -CVE_CHECK_DB_DLDIR_FILE ?= "${DL_DIR}/CVE_CHECK2/${CVE_CHECK_DB_FILENAME}" -CVE_CHECK_DB_DLDIR_LOCK ?= "${CVE_CHECK_DB_DLDIR_FILE}.lock" -CVE_CHECK_DB_TEMP_FILE ?= "${CVE_CHECK_DB_FILE}.tmp" - -python () { - if not bb.data.inherits_class("cve-check", d): - raise bb.parse.SkipRecipe("Skip recipe when cve-check class is not loaded.") -} - -python do_fetch() { - """ - Update NVD database with API 2.0 - """ - import bb.utils - import bb.progress - import shutil - - bb.utils.export_proxies(d) - - db_file = d.getVar("CVE_CHECK_DB_DLDIR_FILE") - db_dir = os.path.dirname(db_file) - db_tmp_file = d.getVar("CVE_CHECK_DB_TEMP_FILE") - - cleanup_db_download(db_tmp_file) - # By default let's update the whole database (since time 0) - database_time = 0 - - # The NVD database changes once a day, so no need to update more frequently - # Allow the user to force-update - try: - import time - update_interval = int(d.getVar("CVE_DB_UPDATE_INTERVAL")) - if update_interval < 0: - bb.note("CVE database update skipped") - if not os.path.exists(db_file): - bb.error("CVE database %s not present, database fetch/update skipped" % db_file) - return - curr_time = time.time() - database_time = os.path.getmtime(db_file) - bb.note("Current time: %s; DB time: %s" % (time.ctime(curr_time), time.ctime(database_time))) - if curr_time < database_time: - bb.warn("Database time is in the future, force DB update") - database_time = 0 - elif curr_time - database_time < update_interval: - bb.note("CVE database recently updated, skipping") - return - - except OSError: - pass - - if bb.utils.to_boolean(d.getVar("BB_NO_NETWORK")): - bb.error("BB_NO_NETWORK attempted to disable fetch, this recipe uses CVE_DB_UPDATE_INTERVAL to control download, set to '-1' to disable fetch or update") - - bb.utils.mkdirhier(db_dir) - bb.utils.mkdirhier(os.path.dirname(db_tmp_file)) - if os.path.exists(db_file): - shutil.copy2(db_file, db_tmp_file) - - if update_db_file(db_tmp_file, d, database_time): - # Update downloaded correctly, we can swap files. To avoid potential - # NFS caching issues, ensure that the destination file has a new inode - # number. We do this in two steps as the downloads directory may be on - # a different filesystem to tmpdir we're working in. - new_file = "%s.new" % (db_file) - shutil.move(db_tmp_file, new_file) - os.rename(new_file, db_file) - else: - # Update failed, do not modify the database - bb.warn("CVE database update failed") - os.remove(db_tmp_file) -} - -do_fetch[lockfiles] += "${CVE_CHECK_DB_DLDIR_LOCK}" -do_fetch[file-checksums] = "" -do_fetch[vardeps] = "" - -python do_unpack() { - import shutil - shutil.copyfile(d.getVar("CVE_CHECK_DB_DLDIR_FILE"), d.getVar("CVE_CHECK_DB_FILE")) -} -do_unpack[lockfiles] += "${CVE_CHECK_DB_DLDIR_LOCK} ${CVE_CHECK_DB_FILE_LOCK}" - -def cleanup_db_download(db_tmp_file): - """ - Cleanup the download space from possible failed downloads - """ - - # Clean-up the temporary file downloads, we can remove both journal - # and the temporary database - if os.path.exists("{0}-journal".format(db_tmp_file)): - os.remove("{0}-journal".format(db_tmp_file)) - if os.path.exists(db_tmp_file): - os.remove(db_tmp_file) - -def nvd_request_wait(attempt, min_wait): - return min(((2 * attempt) + min_wait), 30) - -def nvd_request_next(url, attempts, api_key, args, min_wait): - """ - Request next part of the NVD database - NVD API documentation: https://nvd.nist.gov/developers/vulnerabilities - """ - - import urllib.request - import urllib.parse - import gzip - import http - import time - - request = urllib.request.Request(url + "?" + urllib.parse.urlencode(args)) - if api_key: - request.add_header("apiKey", api_key) - bb.note("Requesting %s" % request.full_url) - - for attempt in range(attempts): - try: - r = urllib.request.urlopen(request) - - if (r.headers['content-encoding'] == 'gzip'): - buf = r.read() - raw_data = gzip.decompress(buf) - else: - raw_data = r.read().decode("utf-8") - - r.close() - - except Exception as e: - wait_time = nvd_request_wait(attempt, min_wait) - bb.note("CVE database: received error (%s)" % (e)) - bb.note("CVE database: retrying download after %d seconds. attempted (%d/%d)" % (wait_time, attempt+1, attempts)) - time.sleep(wait_time) - pass - else: - return raw_data - else: - # We failed at all attempts - return None - -def update_db_file(db_tmp_file, d, database_time): - """ - Update the given database file - """ - import bb.progress - import bb.utils - import datetime - import sqlite3 - import json - - # Connect to database - conn = sqlite3.connect(db_tmp_file) - initialize_db(conn) - - req_args = {'startIndex': 0} - - incr_update_threshold = int(d.getVar("CVE_DB_INCR_UPDATE_AGE_THRES")) - if database_time != 0: - database_date = datetime.datetime.fromtimestamp(database_time, tz=datetime.timezone.utc) - today_date = datetime.datetime.now(tz=datetime.timezone.utc) - delta = today_date - database_date - if incr_update_threshold == 0: - bb.note("CVE database: forced full update") - elif delta < datetime.timedelta(seconds=incr_update_threshold): - bb.note("CVE database: performing partial update") - # The maximum range for time is 120 days - if delta > datetime.timedelta(days=120): - bb.error("CVE database: Trying to do an incremental update on a larger than supported range") - req_args['lastModStartDate'] = database_date.isoformat() - req_args['lastModEndDate'] = today_date.isoformat() - else: - bb.note("CVE database: file too old, forcing a full update") - else: - bb.note("CVE database: no preexisting database, do a full download") - - with bb.progress.ProgressHandler(d) as ph, open(os.path.join(d.getVar("TMPDIR"), 'cve_check'), 'a') as cve_f: - - bb.note("Updating entries") - index = 0 - url = d.getVar("NVDCVE_URL") - api_key = d.getVar("NVDCVE_API_KEY") or None - attempts = int(d.getVar("CVE_DB_UPDATE_ATTEMPTS")) - - # Recommended by NVD - wait_time = 6 - if api_key: - wait_time = 2 - - while True: - req_args['startIndex'] = index - raw_data = nvd_request_next(url, attempts, api_key, req_args, wait_time) - if raw_data is None: - # We haven't managed to download data - return False - - # hack for json5 style responses - if raw_data[-3:] == ',]}': - bb.note("Removing trailing ',' from nvd response") - raw_data = raw_data[:-3] + ']}' - - data = json.loads(raw_data) - - index = data["startIndex"] - total = data["totalResults"] - per_page = data["resultsPerPage"] - bb.note("Got %d entries" % per_page) - for cve in data["vulnerabilities"]: - update_db(conn, cve) - - index += per_page - ph.update((float(index) / (total+1)) * 100) - if index >= total: - break - - # Recommended by NVD - time.sleep(wait_time) - - # Update success, set the date to cve_check file. - cve_f.write('CVE database update : %s\n\n' % datetime.date.today()) - - conn.commit() - conn.close() - return True - -def initialize_db(conn): - with conn: - c = conn.cursor() - - c.execute("CREATE TABLE IF NOT EXISTS META (YEAR INTEGER UNIQUE, DATE TEXT)") - - c.execute("CREATE TABLE IF NOT EXISTS NVD (ID TEXT UNIQUE, SUMMARY TEXT, \ - SCOREV2 TEXT, SCOREV3 TEXT, SCOREV4 TEXT, MODIFIED INTEGER, VECTOR TEXT, VECTORSTRING TEXT)") - - c.execute("CREATE TABLE IF NOT EXISTS PRODUCTS (ID TEXT, \ - VENDOR TEXT, PRODUCT TEXT, VERSION_START TEXT, OPERATOR_START TEXT, \ - VERSION_END TEXT, OPERATOR_END TEXT)") - c.execute("CREATE INDEX IF NOT EXISTS PRODUCT_ID_IDX on PRODUCTS(ID);") - - c.close() - -def parse_node_and_insert(conn, node, cveId): - - def cpe_generator(): - for cpe in node.get('cpeMatch', ()): - if not cpe['vulnerable']: - return - cpe23 = cpe.get('criteria') - if not cpe23: - return - cpe23 = cpe23.split(':') - if len(cpe23) < 6: - return - vendor = cpe23[3] - product = cpe23[4] - version = cpe23[5] - - if cpe23[6] == '*' or cpe23[6] == '-': - version_suffix = "" - else: - version_suffix = "_" + cpe23[6] - - if version != '*' and version != '-': - # Version is defined, this is a '=' match - yield [cveId, vendor, product, version + version_suffix, '=', '', ''] - elif version == '-': - # no version information is available - yield [cveId, vendor, product, version, '', '', ''] - else: - # Parse start version, end version and operators - op_start = '' - op_end = '' - v_start = '' - v_end = '' - - if 'versionStartIncluding' in cpe: - op_start = '>=' - v_start = cpe['versionStartIncluding'] - - if 'versionStartExcluding' in cpe: - op_start = '>' - v_start = cpe['versionStartExcluding'] - - if 'versionEndIncluding' in cpe: - op_end = '<=' - v_end = cpe['versionEndIncluding'] - - if 'versionEndExcluding' in cpe: - op_end = '<' - v_end = cpe['versionEndExcluding'] - - if op_start or op_end or v_start or v_end: - yield [cveId, vendor, product, v_start, op_start, v_end, op_end] - else: - # This is no version information, expressed differently. - # Save processing by representing as -. - yield [cveId, vendor, product, '-', '', '', ''] - - conn.executemany("insert into PRODUCTS values (?, ?, ?, ?, ?, ?, ?)", cpe_generator()).close() - -def update_db(conn, elt): - """ - Update a single entry in the on-disk database - """ - - accessVector = None - vectorString = None - cveId = elt['cve']['id'] - if elt['cve'].get('vulnStatus') == "Rejected": - c = conn.cursor() - c.execute("delete from PRODUCTS where ID = ?;", [cveId]) - c.execute("delete from NVD where ID = ?;", [cveId]) - c.close() - return - cveDesc = "" - for desc in elt['cve']['descriptions']: - if desc['lang'] == 'en': - cveDesc = desc['value'] - date = elt['cve']['lastModified'] - - # Extract maximum CVSS scores from all sources (Primary and Secondary) - cvssv2 = 0.0 - try: - # Iterate through all cvssMetricV2 entries and find the maximum score - for metric in elt['cve']['metrics']['cvssMetricV2']: - score = metric['cvssData']['baseScore'] - if score > cvssv2: - cvssv2 = score - accessVector = metric['cvssData']['accessVector'] - vectorString = metric['cvssData']['vectorString'] - except KeyError: - pass - - cvssv3 = 0.0 - try: - # Iterate through all cvssMetricV30 entries and find the maximum score - for metric in elt['cve']['metrics']['cvssMetricV30']: - score = metric['cvssData']['baseScore'] - if score > cvssv3: - cvssv3 = score - accessVector = accessVector or metric['cvssData']['attackVector'] - vectorString = vectorString or metric['cvssData']['vectorString'] - except KeyError: - pass - - try: - # Iterate through all cvssMetricV31 entries and find the maximum score - for metric in elt['cve']['metrics']['cvssMetricV31']: - score = metric['cvssData']['baseScore'] - if score > cvssv3: - cvssv3 = score - accessVector = accessVector or metric['cvssData']['attackVector'] - vectorString = vectorString or metric['cvssData']['vectorString'] - except KeyError: - pass - - cvssv4 = 0.0 - try: - # Iterate through all cvssMetricV40 entries and find the maximum score - for metric in elt['cve']['metrics']['cvssMetricV40']: - score = metric['cvssData']['baseScore'] - if score > cvssv4: - cvssv4 = score - accessVector = accessVector or metric['cvssData']['attackVector'] - vectorString = vectorString or metric['cvssData']['vectorString'] - except KeyError: - pass - - accessVector = accessVector or "UNKNOWN" - vectorString = vectorString or "UNKNOWN" - - conn.execute("insert or replace into NVD values (?, ?, ?, ?, ?, ?, ?, ?)", - [cveId, cveDesc, cvssv2, cvssv3, cvssv4, date, accessVector, vectorString]).close() - - try: - # Remove any pre-existing CVE configuration. Even for partial database - # update, those will be repopulated. This ensures that old - # configuration is not kept for an updated CVE. - conn.execute("delete from PRODUCTS where ID = ?", [cveId]).close() - for config in elt['cve']['configurations']: - # This is suboptimal as it doesn't handle AND/OR and negate, but is better than nothing - for node in config["nodes"]: - parse_node_and_insert(conn, node, cveId) - except KeyError: - bb.note("CVE %s has no configurations" % cveId) - -do_fetch[nostamp] = "1" - -EXCLUDE_FROM_WORLD = "1"
It's been long known that the cve-check class in oe-core is not that usable in the real world, for more details see "Future of CVE scanning in Yocto"[1]. This mail proposed an alternative direction that included a CVE scanning tool that can be ran both during the build and afterwards, so that periodic scans of a previously build image is possible. Last year, Bootlin wrote sbom-cve-check[2] and I compared this to my proposal in "Comparing cve-check with sbom-cve-check"[3], concluding that this is likely the missing piece. Support for sbom-cve-check has been merged into oe-core, and the cve-check class is now obsolete. So that we don't have to maintain it for the four-year lifecycle of the Wrynose release, delete it. This patch also deletes the database fetcher recipes, and the test cases that were specific to cve-check. Note that the oe.cve_check library still exists as this is used by the SPDX classes. [1] https://lore.kernel.org/openembedded-core/7D6E419E-A7AE-4324-966C-3552C586E452@arm.com/ [2] https://github.com/bootlin/sbom-cve-check [3] https://lore.kernel.org/openembedded-core/2CD10DD9-FB2A-4B10-B98A-85918EB6B4B7@arm.com/ Signed-off-by: Ross Burton <ross.burton@arm.com> --- meta/classes/cve-check.bbclass | 570 ------------------ meta/conf/distro/include/maintainers.inc | 1 - meta/conf/documentation.conf | 2 - meta/lib/oeqa/selftest/cases/cve_check.py | 172 ------ .../recipes-core/meta/cve-update-db-native.bb | 421 ------------- .../meta/cve-update-nvd2-native.bb | 422 ------------- 6 files changed, 1588 deletions(-) delete mode 100644 meta/classes/cve-check.bbclass delete mode 100644 meta/recipes-core/meta/cve-update-db-native.bb delete mode 100644 meta/recipes-core/meta/cve-update-nvd2-native.bb