@@ -197,7 +197,7 @@ class BaseConfig(object):
self.portlocks = {}
self.bitbake_e = ''
self.snapshot = False
- self.wictypes = ('wic', 'wic.vmdk', 'wic.qcow2', 'wic.vdi', "wic.vhd", "wic.vhdx")
+ self.wictypes = ('wic.zst', 'wic', 'wic.vmdk', 'wic.qcow2', 'wic.vdi', "wic.vhd", "wic.vhdx")
self.fstypes = ('ext2', 'ext3', 'ext4', 'jffs2', 'nfs', 'btrfs',
'cpio.gz', 'cpio', 'ramfs', 'tar.bz2', 'tar.gz',
'squashfs', 'squashfs-xz', 'squashfs-lzo',
@@ -418,6 +418,48 @@ class BaseConfig(object):
else:
raise RunQemuError("Unknown path arg %s" % p)
+ def uncompress_rootfs(self):
+ """Decompress ZST rootfs image if needed"""
+ if not self.rootfs or not self.fstype.endswith('.zst'):
+ return
+
+ # Ensure snapshot mode is active before allowing decompression.
+ if not self.snapshot:
+ raise RunQemuError(".zst images are only supported with snapshot mode. " \
+ "You can either use the \"snapshot\" option or use an uncompressed image.")
+
+ # Get the real path to the image to avoid issues when a symbolic link is passed.
+ # This ensures we always operate on the actual file.
+ image_path = os.path.realpath(self.rootfs)
+ # Extract target filename by removing .zst
+ image_dir = os.path.dirname(image_path)
+ uncompressed_name = os.path.basename(image_path).replace(".zst", "")
+ uncompressed_path = os.path.join(image_dir, uncompressed_name)
+
+ # If the decompressed image already exists (e.g., in the deploy directory),
+ # we use it directly to avoid overwriting artifacts generated by the build system.
+ # This prevents redundant decompression and preserves build outputs.
+ if os.path.exists(uncompressed_path):
+ logger.warning(f"Found existing decompressed image: {uncompressed_path}, Using it directly.")
+ else:
+ logger.info(f"Decompressing {self.rootfs} to {uncompressed_path}")
+ # Ensure the 'zstd' tool is installed before attempting to decompress.
+ if not shutil.which('zstd'):
+ raise RunQemuError(f"'zstd' is required to decompress {self.rootfs} but was not found in PATH")
+ try:
+ with open(uncompressed_path, 'wb') as out_file:
+ subprocess.check_call(['zstd', '-d', '-c', image_path], stdout=out_file)
+ except subprocess.CalledProcessError as e:
+ self.cleanup_files.append(uncompressed_path)
+ raise RunQemuError(f"Failed to decompress {self.rootfs}: {e}")
+
+ # Mark for deletion at the end
+ self.cleanup_files.append(uncompressed_path)
+
+ # Use the decompressed image as the rootfs
+ self.rootfs = uncompressed_path
+ self.fstype = self.fstype.removesuffix(".zst")
+
def check_arg_machine(self, arg):
"""Check whether it is a machine"""
if self.get('MACHINE') == arg:
@@ -1762,6 +1804,7 @@ def main():
config.check_args()
config.read_qemuboot()
config.check_and_set()
+ config.uncompress_rootfs()
# Check whether the combos is valid or not
config.validate_combos()
config.print_config()