diff mbox series

[1/2] fetch2: validate deb/ipk data member names

Message ID 20260518145909.1132755-2-anders.heimer@est.tech
State New
Headers show
Series fetch2: harden deb/ipk unpack command argument | expand

Commit Message

Anders Heimer May 18, 2026, 2:59 p.m. UTC
The deb/ipk unpack path selects a data archive member from 'ar -t'
output and then passes that member name to a shell command. Previously,
any member beginning with data.tar. was selected.

Only select known deb/ipk data archive member names when datafile is
created. Quote the package path used in the shell command as it can come
from the local fetch path.

Add local fetcher regression coverage for quoted package filenames,
valid compressed data members, and unsupported or unsafe data member
names.

Signed-off-by: Anders Heimer <anders.heimer@est.tech>
---
 lib/bb/fetch2/__init__.py | 10 +++++---
 lib/bb/tests/fetch.py     | 53 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 60 insertions(+), 3 deletions(-)
diff mbox series

Patch

diff --git a/lib/bb/fetch2/__init__.py b/lib/bb/fetch2/__init__.py
index b6cb4c530..dc93e64a9 100644
--- a/lib/bb/fetch2/__init__.py
+++ b/lib/bb/fetch2/__init__.py
@@ -23,6 +23,7 @@  import collections
 import subprocess
 import pickle
 import errno
+import shlex
 import bb.utils
 import bb.checksum
 import bb.process
@@ -1592,16 +1593,19 @@  class FetchMethod(object):
             elif file.endswith('.deb') or file.endswith('.ipk'):
                 output = subprocess.check_output(['ar', '-t', file], preexec_fn=subprocess_setup)
                 datafile = None
+                valid_datafiles = ('data.tar', 'data.tar.gz', 'data.tar.xz',
+                                   'data.tar.zst', 'data.tar.bz2', 'data.tar.lzma')
                 if output:
                     for line in output.decode().splitlines():
-                        if line.startswith('data.tar.') or line == 'data.tar':
+                        if line in valid_datafiles:
                             datafile = line
                             break
                     else:
-                        raise UnpackError("Unable to unpack deb/ipk package - does not contain data.tar* file", urldata.url)
+                        raise UnpackError("Unable to unpack deb/ipk package - does not contain supported data.tar* file", urldata.url)
                 else:
                     raise UnpackError("Unable to unpack deb/ipk package - could not list contents", urldata.url)
-                cmd = 'ar x %s %s && %s -p -f %s && rm %s' % (file, datafile, tar_cmd, datafile, datafile)
+                quoted_datafile = shlex.quote(datafile)
+                cmd = 'ar x %s %s && %s -p -f %s && rm %s' % (shlex.quote(file), quoted_datafile, tar_cmd, quoted_datafile, quoted_datafile)
 
         # If 'subdir' param exists, create a dir and use it as destination for unpack cmd
         if 'subdir' in urldata.parm:
diff --git a/lib/bb/tests/fetch.py b/lib/bb/tests/fetch.py
index 969e5f876..bece59e62 100644
--- a/lib/bb/tests/fetch.py
+++ b/lib/bb/tests/fetch.py
@@ -16,6 +16,7 @@  import tempfile
 import collections
 import os
 import signal
+import subprocess
 import tarfile
 from bb.fetch2 import URI
 import bb
@@ -740,6 +741,34 @@  class FetcherLocalTest(FetcherTest):
         bb.process.run('tar cjf archive.tar.bz2 -C dir .', cwd=self.localsrcdir)
         self.d.setVar("FILESPATH", self.localsrcdir)
 
+    def make_ar_package(self, package_name, data_member="data.tar"):
+        if not shutil.which("ar"):
+            self.skipTest("ar not installed")
+
+        workdir = tempfile.mkdtemp(dir=self.tempdir)
+        payload = os.path.join(workdir, "payload")
+        with open(payload, "w") as f:
+            f.write("payload\n")
+
+        data_path = os.path.join(workdir, data_member)
+        mode = "w:gz" if data_member.endswith(".gz") else "w"
+        with tarfile.open(data_path, mode) as archive:
+            archive.add(payload, arcname="payload")
+
+        with open(os.path.join(workdir, "debian-binary"), "w") as f:
+            f.write("2.0\n")
+
+        control = os.path.join(workdir, "control")
+        with open(control, "w") as f:
+            f.write("Package: fetch-test\nVersion: 1\nArchitecture: all\n")
+        with tarfile.open(os.path.join(workdir, "control.tar"), "w") as archive:
+            archive.add(control, arcname="control")
+
+        package_path = os.path.join(self.localsrcdir, package_name)
+        subprocess.check_call(["ar", "r", package_path, "debian-binary", "control.tar", data_member],
+                              cwd=workdir, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+        return package_name
+
     def fetchUnpack(self, uris):
         fetcher = bb.fetch.Fetch(uris, self.d)
         fetcher.download()
@@ -813,6 +842,30 @@  class FetcherLocalTest(FetcherTest):
         tree = self.fetchUnpack(['file://archive.tar.bz2;subdir=bar;striplevel=1'])
         self.assertEqual(tree, ['bar/c', 'bar/d', 'bar/subdir/e'])
 
+    def test_local_deb_quoted_filename(self):
+        package = self.make_ar_package("archive$(id).deb")
+        tree = self.fetchUnpack(['file://%s' % package])
+        self.assertEqual(tree, ['payload'])
+
+    def test_local_ipk_gz_data_member(self):
+        package = self.make_ar_package("archive.ipk", data_member="data.tar.gz")
+        tree = self.fetchUnpack(['file://%s' % package])
+        self.assertEqual(tree, ['payload'])
+
+    def test_local_deb_rejects_unknown_data_member_suffix(self):
+        package = self.make_ar_package("archive.deb", data_member="data.tar.foo")
+        with self.assertRaises(bb.fetch2.UnpackError) as context:
+            self.fetchUnpack(['file://%s' % package])
+
+        self.assertIn("does not contain supported data.tar* file", str(context.exception))
+
+    def test_local_deb_rejects_unsafe_data_member(self):
+        package = self.make_ar_package("archive.deb", data_member="data.tar.xz;id")
+        with self.assertRaises(bb.fetch2.UnpackError) as context:
+            self.fetchUnpack(['file://%s' % package])
+
+        self.assertIn("does not contain supported data.tar* file", str(context.exception))
+
     def dummyGitTest(self, suffix):
         # Create dummy local Git repo
         src_dir = tempfile.mkdtemp(dir=self.tempdir,