Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make acquire compression configurable #185

Merged
merged 6 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions acquire/acquire.py
Original file line number Diff line number Diff line change
Expand Up @@ -1782,6 +1782,7 @@ def acquire_target(target: Target, args: argparse.Namespace, output_ts: Optional
output = OUTPUTS[args.output_type](
output_path,
compress=args.compress,
compression_method=args.compress_method,
encrypt=args.encrypt,
public_key=args.public_key,
)
Expand Down
6 changes: 4 additions & 2 deletions acquire/outputs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from acquire.outputs.dir import DirectoryOutput
from acquire.outputs.tar import TarOutput
from acquire.outputs.zip import ZipOutput
from acquire.outputs.tar import TAR_COMPRESSION_METHODS, TarOutput
from acquire.outputs.zip import ZIP_COMPRESSION_METHODS, ZipOutput

__all__ = ["DirectoryOutput", "TarOutput", "ZipOutput"]

OUTPUTS = {"tar": TarOutput, "dir": DirectoryOutput, "zip": ZipOutput}

COMPRESSION_METHODS = {*TAR_COMPRESSION_METHODS, *ZIP_COMPRESSION_METHODS}
11 changes: 9 additions & 2 deletions acquire/outputs/tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
from acquire.crypt import EncryptedStream
from acquire.outputs.base import Output

TAR_COMPRESSION_METHODS = {"gzip": "gz", "bzip2": "bz2", "xz": "xz"}


class TarOutput(Output):
"""Tar archive acquire output format. Output can be compressed and/or encrypted.

Args:
path: The path to write the tar archive to.
compress: Whether to compress the tar archive.
compression_method: Compression method to use (Default: gzip). Supports "gzip", "bzip2", "xz".
encrypt: Whether to encrypt the tar archive.
public_key: The RSA public key to encrypt the header with.
"""
Expand All @@ -23,15 +26,19 @@ def __init__(
self,
path: Path,
compress: bool = False,
compression_method: str = "gzip",
encrypt: bool = False,
public_key: Optional[bytes] = None,
) -> None:
self.compression = None
ext = ".tar" if ".tar" not in path.suffixes else ""
mode = "w|" if encrypt else "w:"

if compress:
ext += ".gz" if ".gz" not in path.suffixes else ""
mode += "gz"
self.compression = TAR_COMPRESSION_METHODS.get(compression_method, "gz")

ext += f".{self.compression}" if f".{self.compression}" not in path.suffixes else ""
mode += self.compression

if encrypt:
ext += ".enc"
Expand Down
57 changes: 38 additions & 19 deletions acquire/outputs/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
from acquire.crypt import EncryptedStream
from acquire.outputs.base import Output

ZIP_COMPRESSION_METHODS = {"deflate": zipfile.ZIP_DEFLATED, "bzip2": zipfile.ZIP_BZIP2, "lzma": zipfile.ZIP_LZMA}


class ZipOutput(Output):
"""Zip archive acquire output format. Output can be compressed and/or encrypted.

Args:
path: The path to write the zip archive to.
compress: Whether to compress the zip archive.
compression_method: Compression method to use (Default: Deflate). Supports "deflate", "bzip2", "lzma".
encrypt: Whether to encrypt the zip archive.
public_key: The RSA public key to encrypt the header with.
"""
Expand All @@ -26,6 +29,7 @@ def __init__(
self,
path: Path,
compress: bool = False,
compression_method: str = "deflate",
encrypt: bool = False,
public_key: Optional[bytes] = None,
) -> None:
Expand All @@ -38,7 +42,7 @@ def __init__(
self.path = path.with_suffix(path.suffix + ext)

if compress:
self.compression = zipfile.ZIP_DEFLATED
self.compression = ZIP_COMPRESSION_METHODS.get(compression_method, zipfile.ZIP_DEFLATED)
else:
self.compression = zipfile.ZIP_STORED

Expand Down Expand Up @@ -78,32 +82,19 @@ def write(
info.compress_type = self.compression

if entry:
info.external_attr = self._get_external_attr(entry)

if entry.is_symlink():
# System which created ZIP archive, 3 = Unix; 0 = Windows
# Windows does not have symlinks, so this must be a unixoid system
info.create_system = 3

# The Python zipfile module accepts the 16-bit "Mode" field (that stores st_mode field from
# struct stat, containing user/group/other permissions, setuid/setgid and symlink info, etc) of the
# ASi extra block for Unix as bits 16-31 of the external_attr
unix_st_mode = (
stat.S_IFLNK
| stat.S_IRUSR
| stat.S_IWUSR
| stat.S_IXUSR
| stat.S_IRGRP
| stat.S_IWGRP
| stat.S_IXGRP
| stat.S_IROTH
| stat.S_IWOTH
| stat.S_IXOTH
)
info.external_attr = unix_st_mode << 16

lstat = entry.lstat()
if lstat:
# Python zipfile module does not support timestamps before 1980
dt = datetime.fromtimestamp(lstat.st_mtime)
info.date_time = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)
year = max(dt.year, 1980)
info.date_time = (year, dt.month, dt.day, dt.hour, dt.minute, dt.second)

with self.archive.open(info, "w") as zfh:
shutil.copyfileobj(fh, zfh)
Expand All @@ -113,3 +104,31 @@ def close(self) -> None:
self.archive.close()
if self._fh:
self._fh.close()

def _get_external_attr(self, entry: FilesystemEntry) -> int:
    """Build the zip ``external_attr`` value describing the type of ``entry``.

    The Python zipfile module accepts the 16-bit Unix "Mode" field (the ``st_mode`` value from
    ``struct stat``: file type, permissions, setuid/setgid, ...) in bits 16-31 of ``external_attr``.

    Args:
        entry: The filesystem entry that is being written to the archive.

    Returns:
        The file type bits of the entry combined with read/write/execute permissions for
        user, group and other, shifted into the upper 16 bits.
    """
    # A symlink takes precedence over a directory; everything else is stored as a regular file.
    if entry.is_symlink():
        file_type = stat.S_IFLNK
    elif entry.is_dir():
        file_type = stat.S_IFDIR
    else:
        file_type = stat.S_IFREG

    # rwx for user, group and other (equivalent to OR-ing the nine individual permission bits).
    permissions = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO

    return (file_type | permissions) << 16
24 changes: 22 additions & 2 deletions acquire/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

from dissect.target import Target

from acquire.outputs import OUTPUTS
from acquire.outputs import (
COMPRESSION_METHODS,
OUTPUTS,
TAR_COMPRESSION_METHODS,
ZIP_COMPRESSION_METHODS,
)
from acquire.uploaders.plugin_registry import UploaderRegistry


Expand Down Expand Up @@ -75,7 +80,7 @@
parser.add_argument(
"-ot",
"--output-type",
choices=OUTPUTS.keys(),
choices=OUTPUTS,
default="tar",
help="output type (default: tar)",
)
Expand All @@ -84,6 +89,11 @@
action=argparse.BooleanOptionalAction,
help="compress output (if supported by the output type)",
)
parser.add_argument(
"--compress-method",
choices=COMPRESSION_METHODS,
pyrco marked this conversation as resolved.
Show resolved Hide resolved
help="compression method (if supported by the output type)",
)
parser.add_argument(
"--encrypt",
action=argparse.BooleanOptionalAction,
Expand Down Expand Up @@ -320,6 +330,16 @@
if not args.children and args.skip_parent:
raise ValueError("--skip-parent can only be set with --children")

if args.compress:
if (args.output_type == "zip" and args.compress_method) and args.compress_method not in ZIP_COMPRESSION_METHODS:
raise ValueError(

Check warning on line 335 in acquire/utils.py

View check run for this annotation

Codecov / codecov/patch

acquire/utils.py#L334-L335

Added lines #L334 - L335 were not covered by tests
f"Invalid compression method for zip, allowed are: {', '.join(ZIP_COMPRESSION_METHODS.keys())}"
)
if (args.output_type == "tar" and args.compress_method) and args.compress_method not in TAR_COMPRESSION_METHODS:
raise ValueError(

Check warning on line 339 in acquire/utils.py

View check run for this annotation

Codecov / codecov/patch

acquire/utils.py#L338-L339

Added lines #L338 - L339 were not covered by tests
f"Invalid compression method for tar, allowed are: {', '.join(TAR_COMPRESSION_METHODS.keys())}"
)


def get_user_name() -> str:
try:
Expand Down
8 changes: 5 additions & 3 deletions tests/test_outputs_tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from acquire.outputs import TarOutput


@pytest.fixture
def tar_output(tmp_path: Path) -> TarOutput:
return TarOutput(tmp_path)
@pytest.fixture(params=[(True, "gzip"), (True, "bzip2"), (True, "xz"), (False, None)])
def tar_output(tmp_path: Path, request: pytest.FixtureRequest) -> TarOutput:
    """Provide a ``TarOutput`` for each parametrized case: gzip, bzip2, xz and uncompressed."""
    use_compression, method = request.param
    output = TarOutput(
        tmp_path,
        compress=use_compression,
        compression_method=method,
    )
    return output


@pytest.mark.parametrize(
Expand All @@ -28,6 +29,7 @@ def test_tar_output_write_entry(mock_fs: VirtualFilesystem, tar_output: TarOutpu
tar_file = tarfile.open(tar_output.path)
files = tar_file.getmembers()

assert tar_output.path.suffix == f".{tar_output.compression}" if tar_output.compression else ".tar"
assert len(files) == 1

file = files[0]
Expand Down
47 changes: 47 additions & 0 deletions tests/test_outputs_zip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import stat
import zipfile
from pathlib import Path

import pytest
from dissect.target.filesystem import VirtualFilesystem

from acquire.outputs import ZipOutput


@pytest.fixture(params=[(True, "deflate"), (True, "bzip2"), (True, "lzma"), (False, None)])
def zip_output(tmp_path: Path, request: pytest.FixtureRequest) -> ZipOutput:
    """Provide a ``ZipOutput`` for each parametrized case: deflate, bzip2, lzma and uncompressed."""
    use_compression, method = request.param
    output = ZipOutput(
        tmp_path,
        compress=use_compression,
        compression_method=method,
    )
    return output


@pytest.mark.parametrize(
    "entry_name",
    [
        "/foo/bar/some-file",
        "/foo/bar/some-symlink",
        "/foo/bar/some-dir",
    ],
)
def test_zip_output_write_entry(mock_fs: VirtualFilesystem, zip_output: ZipOutput, entry_name: str) -> None:
    """Write one entry through ``ZipOutput`` and verify its name and file type in the archive."""
    entry = mock_fs.get(entry_name)

    assert zip_output.compression == zip_output.archive.compression
    zip_output.write_entry(entry_name, entry)
    zip_output.close()

    # Read the archive back inside a context manager so the handle is always closed,
    # even when one of the assertions below fails.
    with zipfile.ZipFile(zip_output.path, mode="r") as zip_file:
        files = zip_file.filelist

    assert len(files) == 1

    file = files[0]
    assert file.filename == entry_name

    # The Unix st_mode is stored in the upper 16 bits of external_attr.
    file_type = file.external_attr >> 16

    # zipfile only supports is_dir(). we have all the information we need to determine the file type in 'external_attr'
    if entry.is_dir():
        assert stat.S_ISDIR(file_type)
    elif entry.is_symlink():
        assert stat.S_ISLNK(file_type)
    elif entry.is_file():
        assert stat.S_ISREG(file_type)
9 changes: 5 additions & 4 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,10 +491,11 @@ def test_utils_normalize_path(
if os == "windows":
case_sensitive = False

with patch.object(mock_target, "os", new=os), patch.object(
mock_target.fs, "_case_sensitive", new=case_sensitive
), patch.object(mock_target.fs, "_alt_separator", new=("\\" if os == "windows" else "/")), patch.dict(
mock_target.props, {"sysvol_drive": sysvol}
with (
patch.object(mock_target, "os", new=os),
patch.object(mock_target.fs, "_case_sensitive", new=case_sensitive),
patch.object(mock_target.fs, "_alt_separator", new=("\\" if os == "windows" else "/")),
patch.dict(mock_target.props, {"sysvol_drive": sysvol}),
):
if as_path:
path = TargetPath(mock_target.fs, path)
Expand Down
Loading