new file mode 100644
@@ -0,0 +1,141 @@
+# Copyright (C) 2024-2025 Weidmueller Interface GmbH & Co. KG
+# Stefan Herbrechtsmeier <stefan.herbrechtsmeier@weidmueller.com>
+#
+# SPDX-License-Identifier: MIT
+#
+import base64
+import glob
+import json
+import os
+import shutil
+import urllib.parse
+import bb
+import oe.vendor
+from bb.fetch2 import URI
+from . import ResolveError
+
+DEFAULT_REGISTRY = "https://registry.npmjs.org"
+VENDOR_TYPE = "npm"
+
+def determine_uri_path(path, name, version):
+ return f"{path.rstrip('/')}/{name}/-/{name.split('/')[-1]}-{version}.tgz"
+
+def determine_downloadfilename(name, version):
+ filename = f"{name.replace('/', '-')}-{version}.tgz"
+ return oe.vendor.determine_downloadfilename(VENDOR_TYPE, filename)
+
+def extend_uri(uri, name, version, subdir, checksum_name=None,
+ checksum_value=None):
+ params = uri.params
+ params["subdir"] = subdir
+ params["downloadfilename"] = determine_downloadfilename(name, version)
+ params["striplevel"] = "1"
+ if checksum_name and checksum_value:
+ params[checksum_name] = checksum_value
+
+def determine_src_uri(registry, name, version, subdir):
+ uri = URI(registry)
+ uri.path = determine_uri_path(uri.path, name, version)
+ extend_uri(uri, name, version, subdir)
+ return str(uri)
+
+def parse_lock_file(lock_file, function, dev, bundle):
+ try:
+ with open(lock_file, "r") as f:
+ package_lock = json.load(f)
+ except Exception as e:
+ raise ResolveError(f"Invalid file: {str(e)}", lock_file)
+
+ packages = package_lock.get("packages")
+ if not packages:
+ raise ResolveError("Invalid file format", lock_file)
+
+ for location, data in packages.items():
+ # Skip empty main and local link target packages
+ if not location.startswith('node_modules/'):
+ continue
+ elif not dev and data.get("dev", False):
+ continue
+ elif not bundle and data.get("inBundle", False):
+ continue
+ name = location.split('node_modules/')[-1]
+ function(name, data, location)
+
+def resolve_src_uris(lock_file, registry, base_subdir, dev=False):
+ src_uris = []
+
+ def resolve_src_uri(name, data, location):
+ integrity = data.get("integrity")
+ resolved = data.get("resolved")
+ name = data.get("name", name)
+ version = data.get("version")
+ link = data.get("link", False)
+
+ if integrity:
+ algorithm, value = integrity.split("-", maxsplit=1)
+ checksum_name = f"{algorithm}sum"
+ checksum_value = base64.b64decode(value).hex()
+
+ if resolved.startswith(DEFAULT_REGISTRY):
+ resolved = resolved.replace(DEFAULT_REGISTRY, registry)
+
+ subdir = os.path.join(base_subdir, location)
+
+ # Skip link sources
+ if link:
+ return
+
+ # Handle registry sources
+ elif version and integrity:
+ # Handle duplicate dependencies without url
+ if not resolved:
+ return
+
+ uri = URI(resolved)
+ params = uri.params
+ params["name"] = name
+ params["version"] = version
+ params["vendor"] = VENDOR_TYPE
+ extend_uri(uri, name, version, subdir, checksum_name,
+ checksum_value)
+
+ # Handle http tarball sources
+ elif resolved.startswith("http") and integrity:
+ uri = URI(resolved)
+ params = uri.params
+ params["name"] = name
+ params["subdir"] = subdir
+ params["striplevel"] = "1"
+ params[checksum_name] = checksum_value
+
+ # Skip local tarball
+ elif resolved.startswith("file"):
+ return
+
+ # Handle git sources
+ elif resolved.startswith("git"):
+ resolved = resolved.replace("+ssh://git@github.com", "+https://github.com")
+ repository, _, revision = resolved.partition("#")
+ uri = URI(repository)
+ params = uri.params
+ scheme, _, protocol = uri.scheme.partition("+")
+ if protocol:
+ if protocol == "ssh" and uri.user == "git":
+ protocol = "https"
+ uri.user = ""
+ params["protocol"] = protocol
+ uri.scheme = scheme
+ params["nobranch"] = "1"
+ params["subdir"] = subdir
+ params["rev"] = revision
+
+ else:
+ raise ResolveError(f"Unsupported dependency: {name}", lock_file)
+
+ src_uri = str(uri)
+ src_uri = urllib.parse.unquote(src_uri)
+ src_uris.append(src_uri)
+
+ parse_lock_file(lock_file, resolve_src_uri, dev, False)
+
+ return src_uris