Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support s3://, memory:// and file:// filesystems #6

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ test:
format:
black -l 99 src
black -l 99 tests
isort -rc src
isort -rc tests
isort src
isort tests

# Deployment
package:
Expand Down
4 changes: 2 additions & 2 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ pytest = "*"
pytest-cov = "*"
pytest-mock = "*"
pytest-lazy-fixture = "*"
boto3 = "*"
moto = "==1.3.13" # https://github.com/spulec/moto/issues/1941
boto3 = "*"
moto = "*"
545 changes: 258 additions & 287 deletions Pipfile.lock

Large diffs are not rendered by default.

98 changes: 82 additions & 16 deletions src/s3migrate/api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import logging

import s3fs
import fsspec
from tqdm.autonotebook import tqdm

from .paths import immutable_base
from . import patterns
from .paths import immutable_base


logger = logging.getLogger(__name__)
Expand All @@ -13,16 +13,35 @@
__all__ = ["cp", "copy", "mv", "move", "rm", "remove", "iter", "iterformats"]


FS = s3fs.S3FileSystem()
PROTOCOL = "s3"
def _strip_prefix(path, prefix):
"""Strip prefix if matched exactly."""
if path.startswith(prefix):
path = path[len(prefix):]
return path


def _get_fs_sep_prefix(fmt_in):
    """Resolve the filesystem, path separator and URL prefix for *fmt_in*.

    Args:
        fmt_in: Path or URL (format string), e.g. ``"s3://bucket/{key}"``.

    Returns:
        A ``(fs, sep, prefix)`` tuple: ``fs`` is the filesystem object
        returned by ``fsspec.get_fs_token_paths``, ``sep`` is the separator
        used to join path components, and ``prefix`` is ``"<protocol>://"``
        (or ``""`` for a ``file`` path given without an explicit
        ``file://``).
    """
    fs, _, _ = fsspec.get_fs_token_paths(fmt_in)

    # ``fs.protocol`` is either a single name or a collection of names;
    # in the latter case the first one is used.
    protocol = fs.protocol
    if not isinstance(protocol, str):
        protocol, *_ = protocol

    prefix = protocol + "://"
    # Plain local paths must round-trip without gaining a "file://" prefix.
    if protocol == "file" and not fmt_in.startswith(prefix):
        prefix = ""

    # Fall back to "/" when the filesystem does not expose ``pathsep``.
    # NOTE(review): confirm ``pathsep`` (rather than ``sep``) is the
    # attribute intended here.
    sep = getattr(fs, "pathsep", "/")
    return fs, sep, prefix


def _yield_candidates(fmt_in):
    """Yield the full path of every file found under the base of *fmt_in*.

    The base is whatever ``immutable_base`` returns for the format string.
    The filesystem resolved for *fmt_in* is walked from that base, and each
    file is yielded as ``prefix + dirpath + sep + filename``.

    Args:
        fmt_in: Path or URL format string describing the files of interest.

    Yields:
        One path string per file found by the walk.
    """
    base_path = immutable_base(fmt_in)
    logger.info("Looking for objects in %s", base_path)
    # Resolve the filesystem from the format string instead of relying on a
    # hard-coded module-level S3 filesystem.
    fs, sep, prefix = _get_fs_sep_prefix(fmt_in)
    for dirpath, _dirnames, filenames in fs.walk(base_path):
        for filename in filenames:
            yield prefix + dirpath + sep + filename


def iterformats(fmt_in):
Expand All @@ -44,30 +63,77 @@ def iter(fmt_in):
yield fmt_in.format(**fmt)


def _bin_op(fs_op_fn_getter, op_description: str, fmt_in, fmt_out, dryrun=True):
    """Apply a two-path filesystem operation to every match of *fmt_in*.

    Shared functionality for move/copy.

    Args:
        fs_op_fn_getter: Callable that receives the filesystem object and
            returns the ``(path_in, path_out)`` operation to run on it.
        op_description: Verb used in the debug log line, e.g. ``"Copying"``.
        fmt_in: Format string describing the existing locations.
        fmt_out: Format string describing the target locations; it is
            formatted with the same fields as each *fmt_in* match.
        dryrun: When true (the default), only log what would be done.

    Raises:
        NotImplementedError: If *fmt_in* and *fmt_out* resolve to different
            filesystems.
    """
    fs_in, _, prefix_in = _get_fs_sep_prefix(fmt_in)
    fs_out, sep, prefix_out = _get_fs_sep_prefix(fmt_out)
    if fs_out != fs_in:
        raise NotImplementedError("Can not copy between different filesystems.")
    fs = fs_in
    _op_fn = fs_op_fn_getter(fs)

    for fmt in iterformats(fmt_in):
        path_in = fmt_in.format(**fmt)
        path_out = fmt_out.format(**fmt)
        # Lazy %-style arguments: the message is only built if DEBUG is on.
        logger.debug("%s %s to %s", op_description, path_in, path_out)
        if dryrun:
            continue
        # Create the parent "directory" of the target before operating.
        path_out_dir_only = path_out.rpartition(sep)[0]
        fs.makedirs(path_out_dir_only, exist_ok=True)
        # The operation itself is given paths without the protocol prefix.
        _op_fn(
            _strip_prefix(path_in, prefix_in),
            _strip_prefix(path_out, prefix_out),
        )


def copy(fmt_in, fmt_out, dryrun=True):
    """Copy files to new parametrised locations."""

    def _pick_copy(fs):
        # Use ``copy_basic`` when the filesystem has it, else plain ``copy``.
        return fs.copy_basic if hasattr(fs, "copy_basic") else fs.copy

    return _bin_op(
        fs_op_fn_getter=_pick_copy,
        op_description="Copying",
        fmt_in=fmt_in,
        fmt_out=fmt_out,
        dryrun=dryrun,
    )


def move(fmt_in, fmt_out, dryrun=True):
    """Move files to new parametrised locations.

    Args:
        fmt_in: Format string describing the existing locations.
        fmt_out: Format string describing the target locations.
        dryrun: When true (the default), only log what would be moved.

    Raises:
        NotImplementedError: If *fmt_in* and *fmt_out* resolve to different
            filesystems (raised by ``_bin_op``).
    """

    def _move_fn_getter(fs):
        return fs.move

    # All iteration, logging and dry-run handling lives in ``_bin_op``; no
    # separate loop over a module-level filesystem is run here.
    return _bin_op(
        fs_op_fn_getter=_move_fn_getter,
        op_description="Moving",
        fmt_in=fmt_in,
        fmt_out=fmt_out,
        dryrun=dryrun,
    )


def remove(fmt_in, dryrun=True):
    """Remove every file matching *fmt_in*.

    Args:
        fmt_in: Format string describing the files to delete.
        dryrun: When true (the default), only log what would be removed.
    """
    fs, _, prefix_in = _get_fs_sep_prefix(fmt_in)

    for fmt in iterformats(fmt_in):
        path_in = fmt_in.format(**fmt)
        logger.debug("Removing %s", path_in)
        if not dryrun:
            # Each match is removed exactly once, through the filesystem
            # resolved from ``fmt_in`` and with the protocol prefix dropped.
            fs.rm(_strip_prefix(path_in, prefix_in))


cp = copy
Expand Down
20 changes: 13 additions & 7 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import pytest
from pytest_lazyfixture import lazy_fixture

import moto
import boto3

import fsspec
import moto
import pytest
from pytest_lazyfixture import lazy_fixture


@pytest.fixture
def s3_writable_url(monkeypatch):
"""Returns a writable S3 URL."""
test_bucket_name = "test_bucket"
with moto.mock_s3():
conn = boto3.resource('s3', region_name='us-east-1')
conn = boto3.resource("s3", region_name="us-east-1")
conn.create_bucket(Bucket=test_bucket_name)
url = f"s3://{test_bucket_name}"
yield url
Expand All @@ -26,6 +24,13 @@ def local_writable_url(tmp_path):
yield url


@pytest.fixture
def memory_writable_url():
    """Provide the base URL of a writable in-memory location."""
    yield "memory://temp"


TREE = [
"2018-01-01/type=one/file.ext",
"2018-01-02/type=two/file.ext",
Expand All @@ -37,7 +42,8 @@ def local_writable_url(tmp_path):
@pytest.fixture(
params=[
lazy_fixture("s3_writable_url"),
# lazy_fixture("local_writable_url")
lazy_fixture("local_writable_url"),
lazy_fixture("memory_writable_url"),
]
)
def file_tree(request):
Expand Down
34 changes: 34 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,37 @@ def test_iterformats(file_tree):
for idx_found, fmt in enumerate(s3migrate.iterformats(pattern)):
assert pattern.format(**fmt) in files
assert idx_found + 1 == len(files)


def test_copy(file_tree):
    """Copying yields as many target files as there were source files."""
    root, source_files = file_tree
    src_pattern = root + "/{ds}/type={type}/{filename}.ext"
    dst_pattern = root + "/year={ds}/{filename}.{type}"

    s3migrate.copy(src_pattern, dst_pattern, dryrun=False)

    copied = list(s3migrate.iter(dst_pattern))
    assert len(copied) == len(source_files)


def test_move(file_tree):
    """Moving empties the source pattern and fills the target pattern."""
    root, source_files = file_tree
    src_pattern = root + "/{ds}/type={type}/{filename}.ext"
    dst_pattern = root + "/year={ds}/{filename}.{type}"

    s3migrate.move(src_pattern, dst_pattern, dryrun=False)

    remaining = list(s3migrate.iter(src_pattern))
    moved = list(s3migrate.iter(dst_pattern))
    assert len(remaining) == 0
    assert len(moved) == len(source_files)


def test_remove(file_tree):
    """Removing leaves nothing that matches the source pattern."""
    root, _source_files = file_tree
    src_pattern = root + "/{ds}/type={type}/{filename}.ext"

    s3migrate.remove(src_pattern, dryrun=False)

    remaining = list(s3migrate.iter(src_pattern))
    assert len(remaining) == 0
assert len(old_files) == 0
1 change: 1 addition & 0 deletions tests/test_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
("bucket/key", "bucket/key"),
("bucket/{key}", "bucket"),
("bucket/key={key}", "bucket"),
("s3://bucket/key={key}", "s3://bucket"),
],
)
def test_fmt_string_to_regex_pattern(path_fmt, base):
Expand Down