Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support s3://, memory:// and file:// filesystems #6

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Support and test s3://, file:// and memory:// file systems
ig248 committed Aug 11, 2020
commit c1c620a4294f0a84d7830009791c5f2f03f81769
98 changes: 82 additions & 16 deletions src/s3migrate/api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import logging

import s3fs
import fsspec
from tqdm.autonotebook import tqdm

from .paths import immutable_base
from . import patterns
from .paths import immutable_base


logger = logging.getLogger(__name__)
@@ -13,16 +13,35 @@
__all__ = ["cp", "copy", "mv", "move", "rm", "remove", "iter", "iterformats"]


FS = s3fs.S3FileSystem()
PROTOCOL = "s3"
def _strip_prefix(path, prefix):
"""Strip prefix if matched exactly."""
if path.startswith(prefix):
path = path[len(prefix):]
return path


def _get_fs_sep_prefix(fmt_in):
fs, _, _ = fsspec.get_fs_token_paths(fmt_in)
protocol = fs.protocol
if not isinstance(protocol, str):
protocol, *_ = protocol
prefix = protocol + "://"
if protocol == "file" and not fmt_in.startswith(prefix):
prefix = ""
try:
sep = fs.pathsep
except AttributeError:
sep = "/"
return fs, sep, prefix


def _yield_candidates(fmt_in):
base_path = immutable_base(fmt_in)
logger.info("Looking for objects in %s", base_path)
for (dirpath, dirnames, filenames) in FS.walk(base_path): # FixMe: once tio is updated
fs, sep, prefix = _get_fs_sep_prefix(fmt_in)
for (dirpath, dirnames, filenames) in fs.walk(base_path):
for filename in filenames:
yield PROTOCOL + "://" + dirpath + FS.sep + filename
yield prefix + dirpath + sep + filename


def iterformats(fmt_in):
@@ -44,30 +63,77 @@ def iter(fmt_in):
yield fmt_in.format(**fmt)


def copy(fmt_in, fmt_out, dryrun=True):
def _bin_op(fs_op_fn_getter, op_description: str, fmt_in, fmt_out, dryrun=True):
"""Shared functionality for move/copy."""
fs_in, sep, prefix_in = _get_fs_sep_prefix(fmt_in)
fs_out, sep, prefix_out = _get_fs_sep_prefix(fmt_out)
if fs_out != fs_in:
raise NotImplementedError("Can not copy between differen filesystems.")
else:
fs = fs_in
_op_fn = fs_op_fn_getter(fs)

def op_fn(path_in, path_out, prefix_in=prefix_in, prefix_out=prefix_out):
path_in = _strip_prefix(path_in, prefix_in)
path_out = _strip_prefix(path_out, prefix_out)
_op_fn(path_in, path_out)

for fmt in iterformats(fmt_in):
path_in = fmt_in.format(**fmt)
path_out = fmt_out.format(**fmt)
logger.debug("Copying %s to %s", path_in, path_out)
logger.debug(f"{op_description} %s to %s", path_in, path_out)
if not dryrun:
FS.copy_basic(path_in, path_out)
path_out_dir_only = sep.join(path_out.split(sep)[:-1])
fs.makedirs(path_out_dir_only, exist_ok=True)
op_fn(path_in, path_out)


def copy(fmt_in, fmt_out, dryrun=True):
"""Copy files to new parametrised locations."""

def _copy_fn_getter(fs):
try:
_copy_fn = fs.copy_basic
except AttributeError:
_copy_fn = fs.copy
return _copy_fn

return _bin_op(
fs_op_fn_getter=_copy_fn_getter,
op_description="Copying",
fmt_in=fmt_in,
fmt_out=fmt_out,
dryrun=dryrun,
)


def move(fmt_in, fmt_out, dryrun=True):
for fmt in iterformats(fmt_in):
path_in = fmt_in.format(**fmt)
path_out = fmt_out.format(**fmt)
logger.debug("Moving %s to %s", path_in, path_out)
if not dryrun:
FS.move(path_in, path_out)
"""Move files to new parametrised locations."""

def _move_fn_getter(fs):
return fs.move

return _bin_op(
fs_op_fn_getter=_move_fn_getter,
op_description="Moving",
fmt_in=fmt_in,
fmt_out=fmt_out,
dryrun=dryrun,
)


def remove(fmt_in, dryrun=True):
fs, _, prefix_in = _get_fs_sep_prefix(fmt_in)

def rm(path_in):
path_in = _strip_prefix(path_in, prefix_in)
fs.rm(path_in)

for fmt in iterformats(fmt_in):
path_in = fmt_in.format(**fmt)
logger.debug("Removing %s", path_in)
if not dryrun:
FS.rm(path_in)
rm(path_in)


cp = copy
20 changes: 13 additions & 7 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import pytest
from pytest_lazyfixture import lazy_fixture

import moto
import boto3

import fsspec
import moto
import pytest
from pytest_lazyfixture import lazy_fixture


@pytest.fixture
def s3_writable_url(monkeypatch):
"""Returns a writable S3 URL."""
test_bucket_name = "test_bucket"
with moto.mock_s3():
conn = boto3.resource('s3', region_name='us-east-1')
conn = boto3.resource("s3", region_name="us-east-1")
conn.create_bucket(Bucket=test_bucket_name)
url = f"s3://{test_bucket_name}"
yield url
@@ -26,6 +24,13 @@ def local_writable_url(tmp_path):
yield url


@pytest.fixture
def memory_writable_url():
"""Return a writable in-memory URL."""
url = "memory://temp"
yield url


TREE = [
"2018-01-01/type=one/file.ext",
"2018-01-02/type=two/file.ext",
@@ -37,7 +42,8 @@ def local_writable_url(tmp_path):
@pytest.fixture(
params=[
lazy_fixture("s3_writable_url"),
# lazy_fixture("local_writable_url")
lazy_fixture("local_writable_url"),
lazy_fixture("memory_writable_url"),
]
)
def file_tree(request):
34 changes: 34 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -7,3 +7,37 @@ def test_iterformats(file_tree):
for idx_found, fmt in enumerate(s3migrate.iterformats(pattern)):
assert pattern.format(**fmt) in files
assert idx_found + 1 == len(files)


def test_copy(file_tree):
base_url, files = file_tree
fmt_in = base_url + "/{ds}/type={type}/{filename}.ext"
fmt_out = base_url + "/year={ds}/{filename}.{type}"

s3migrate.copy(fmt_in, fmt_out, dryrun=False)

new_files = list(s3migrate.iter(fmt_out))
assert len(new_files) == len(files)


def test_move(file_tree):
base_url, files = file_tree
fmt_in = base_url + "/{ds}/type={type}/{filename}.ext"
fmt_out = base_url + "/year={ds}/{filename}.{type}"

s3migrate.move(fmt_in, fmt_out, dryrun=False)

old_files = list(s3migrate.iter(fmt_in))
new_files = list(s3migrate.iter(fmt_out))
assert len(old_files) == 0
assert len(new_files) == len(files)


def test_remove(file_tree):
base_url, files = file_tree
fmt_in = base_url + "/{ds}/type={type}/{filename}.ext"

s3migrate.remove(fmt_in, dryrun=False)

old_files = list(s3migrate.iter(fmt_in))
assert len(old_files) == 0
1 change: 1 addition & 0 deletions tests/test_paths.py
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@
("bucket/key", "bucket/key"),
("bucket/{key}", "bucket"),
("bucket/key={key}", "bucket"),
("s3://bucket/key={key}", "s3://bucket"),
],
)
def test_fmt_string_to_regex_pattern(path_fmt, base):