Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/readme #8

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ test:
format:
black -l 99 src
black -l 99 tests
isort -rc src
isort -rc tests
isort src
isort tests

# Deployment
package:
Expand Down
4 changes: 2 additions & 2 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ pytest = "*"
pytest-cov = "*"
pytest-mock = "*"
pytest-lazy-fixture = "*"
boto3 = "*"
moto = "==1.3.13" # https://github.com/spulec/moto/issues/1941
boto3 = "*"
moto = "*"
545 changes: 258 additions & 287 deletions Pipfile.lock

Large diffs are not rendered by default.

18 changes: 4 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ The module provides the following commands:
|`cp`/`copy`|2|copy (duplicate) all matched files to new location|
|`mv`/`move`|2|move (rename) all matched files|
|`rm`/`remove`|1| remove all matched files|
|`ls`/`list`/`iter`|1| list all matched files|


Eeach takes one or two patterns, as well as the `dryrun` argument.

Expand All @@ -58,24 +60,12 @@ Eeach takes one or two patterns, as well as the `dryrun` argument.
### General-purpose generators
| command | usecase |
| --- | --- |
| `iter`| iterate over all matching filenames, e.g. to read each file |
| `iter`/`ls`| iterate over all matching filenames, e.g. to read each file |
| `iterformats` | iterate over all matched `format dictionaries`, e.g. to collect all Hive key values |

`s3migrate.iter(pattern)` will yield file names `filename` matching `pattern`. This allows custom file processing logic downstream.

`s3migrate.iterformats(pattern)` will instead yield dictionaries `fmt_dict` such that `pattarn.format(**fmt_dict)` is equivalent to the matched `filename`.

## Dry run mode
Dry run mode allows testing your patterns without performing any destructive operations.

With `dryrun=True` (default), information about operations to be performed is logged at `INFO` and `DEBUG` level - make sure
to set your logging accordingly, e.g. inside a Jupyter Notebook:


```python
import logging

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.handlers = [logging.StreamHandler()]
```
Dry run mode allows testing your patterns without performing any destructive operations using `dryrun=True`.
150 changes: 123 additions & 27 deletions src/s3migrate/api.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,82 @@
import logging
from typing import Sequence, Union

import s3fs
from tqdm.autonotebook import tqdm
import fsspec

from .paths import immutable_base
from . import patterns


logger = logging.getLogger(__name__)


__all__ = ["cp", "copy", "mv", "move", "rm", "remove", "iter", "iterformats"]


FS = s3fs.S3FileSystem()
PROTOCOL = "s3"


def _yield_candidates(fmt_in):
base_path = immutable_base(fmt_in)
logger.info("Looking for objects in %s", base_path)
for (dirpath, dirnames, filenames) in FS.walk(base_path): # FixMe: once tio is updated
for filename in filenames:
yield PROTOCOL + "://" + dirpath + FS.sep + filename
__all__ = ["ls", "list", "cp", "copy", "mv", "move", "rm", "remove", "iter", "iterformats"]


def _strip_prefix(path, prefix):
"""Strip prefix if matched exactly."""
if path.startswith(prefix):
path = path[len(prefix):]
return path


def _get_fs_sep_prefix_paths(path: Union[str, Sequence[str]]):
"""Identify separator and prefix for various filesystems.

In the case of local files, prefix is set to "" if original input path
is not prefixed with the file:// prefix.
"""
fs, _, paths = fsspec.get_fs_token_paths(path)
protocol = fs.protocol
if not isinstance(protocol, str):
protocol, *_ = protocol
prefix = protocol + "://"
if isinstance(path, str):
first_path = path
else:
first_path = path[0]
if protocol == "file" and not first_path.startswith(prefix): # local file notation
prefix = ""
try:
sep = fs.pathsep
except AttributeError:
sep = "/"
return fs, sep, prefix, paths


def _yield_candidates(path_fmt, fs=None, sep=None, prefix=None):
if not fs:
fs, sep, prefix, [path_fmt] = _get_fs_sep_prefix_paths(path_fmt)

parts = path_fmt.split(sep)
parts_immutable = ["{" not in part for part in parts]
try:
first_mutable = parts_immutable.index(False)
except ValueError: # path has no templates
if fs.isfile(path_fmt):
yield prefix + path_fmt
return

immutable_base = sep.join(parts[:first_mutable])
this_fmt = sep.join(parts[: first_mutable + 1])
remaining_fmt = sep.join(parts[first_mutable + 1:])
if remaining_fmt:
remaining_fmt = sep + remaining_fmt
try:
entries = fs.ls(immutable_base)
except FileNotFoundError:
return
for entry in entries:
fmt_dict = patterns.get_fmt_match_dict(entry.rstrip(sep), this_fmt)
if fmt_dict is not None:
yield from _yield_candidates(
this_fmt.format(**fmt_dict) + remaining_fmt, fs=fs, sep=sep, prefix=prefix
)


def iterformats(fmt_in):
candidate_files = _yield_candidates(fmt_in)
total, found = 0, 0
for path_in in tqdm(candidate_files):
for path_in in candidate_files:
total += 1
fmt_dict = patterns.get_fmt_match_dict(path_in, fmt_in)
if fmt_dict is not None:
Expand All @@ -44,32 +92,80 @@ def iter(fmt_in):
yield fmt_in.format(**fmt)


def copy(fmt_in, fmt_out, dryrun=True):
def _bin_op(fs_op_fn_getter, op_description: str, fmt_in, fmt_out, dryrun=True):
"""Shared functionality for move/copy."""
fs_in, sep, prefix_in, _ = _get_fs_sep_prefix_paths(fmt_in)
fs_out, sep, prefix_out, _ = _get_fs_sep_prefix_paths(fmt_out)
if fs_out != fs_in:
raise NotImplementedError("Can not copy between differen filesystems.")
else:
fs = fs_in
_op_fn = fs_op_fn_getter(fs)

def op_fn(path_in, path_out, prefix_in=prefix_in, prefix_out=prefix_out):
path_in = _strip_prefix(path_in, prefix_in)
path_out = _strip_prefix(path_out, prefix_out)
_op_fn(path_in, path_out)

for fmt in iterformats(fmt_in):
path_in = fmt_in.format(**fmt)
path_out = fmt_out.format(**fmt)
logger.debug("Copying %s to %s", path_in, path_out)
logger.debug(f"{op_description} %s to %s", path_in, path_out)
if not dryrun:
FS.copy_basic(path_in, path_out)
path_out_dir_only = sep.join(path_out.split(sep)[:-1])
fs.makedirs(path_out_dir_only, exist_ok=True)
op_fn(path_in, path_out)


def copy(fmt_in, fmt_out, dryrun=True):
"""Copy files to new parametrised locations."""

def _copy_fn_getter(fs):
try:
_copy_fn = fs.copy_basic
except AttributeError:
_copy_fn = fs.copy
return _copy_fn

return _bin_op(
fs_op_fn_getter=_copy_fn_getter,
op_description="Copying",
fmt_in=fmt_in,
fmt_out=fmt_out,
dryrun=dryrun,
)


def move(fmt_in, fmt_out, dryrun=True):
for fmt in iterformats(fmt_in):
path_in = fmt_in.format(**fmt)
path_out = fmt_out.format(**fmt)
logger.debug("Moving %s to %s", path_in, path_out)
if not dryrun:
FS.move(path_in, path_out)
"""Move files to new parametrised locations."""

def _move_fn_getter(fs):
return fs.move

return _bin_op(
fs_op_fn_getter=_move_fn_getter,
op_description="Moving",
fmt_in=fmt_in,
fmt_out=fmt_out,
dryrun=dryrun,
)


def remove(fmt_in, dryrun=True):
fs, _, prefix_in, _ = _get_fs_sep_prefix_paths(fmt_in)

def rm(path_in):
path_in = _strip_prefix(path_in, prefix_in)
fs.rm(path_in)

for fmt in iterformats(fmt_in):
path_in = fmt_in.format(**fmt)
logger.debug("Removing %s", path_in)
if not dryrun:
FS.rm(path_in)
rm(path_in)


ls = list = iter
cp = copy
mv = move
rm = remove
20 changes: 13 additions & 7 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import pytest
from pytest_lazyfixture import lazy_fixture

import moto
import boto3

import fsspec
import moto
import pytest
from pytest_lazyfixture import lazy_fixture


@pytest.fixture
def s3_writable_url(monkeypatch):
"""Returns a writable S3 URL."""
test_bucket_name = "test_bucket"
with moto.mock_s3():
conn = boto3.resource('s3', region_name='us-east-1')
conn = boto3.resource("s3", region_name="us-east-1")
conn.create_bucket(Bucket=test_bucket_name)
url = f"s3://{test_bucket_name}"
yield url
Expand All @@ -26,6 +24,13 @@ def local_writable_url(tmp_path):
yield url


@pytest.fixture
def memory_writable_url():
"""Return a writable in-memory URL."""
url = "memory://temp"
yield url


TREE = [
"2018-01-01/type=one/file.ext",
"2018-01-02/type=two/file.ext",
Expand All @@ -37,7 +42,8 @@ def local_writable_url(tmp_path):
@pytest.fixture(
params=[
lazy_fixture("s3_writable_url"),
# lazy_fixture("local_writable_url")
lazy_fixture("local_writable_url"),
lazy_fixture("memory_writable_url"),
]
)
def file_tree(request):
Expand Down
42 changes: 42 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,51 @@
import s3migrate


def test_yield_candidates(file_tree):
"""Test that pruning is done early."""
base_url, files = file_tree
pattern = base_url + "/{ds}/type=three/{filename}.ext"
candidates = list(s3migrate.api._yield_candidates(pattern))
assert len(candidates) == 2


def test_iterformats(file_tree):
base_url, files = file_tree
pattern = base_url + "/{ds}/type={type}/{filename}.ext"
for idx_found, fmt in enumerate(s3migrate.iterformats(pattern)):
assert pattern.format(**fmt) in files
assert idx_found + 1 == len(files)


def test_copy(file_tree):
base_url, files = file_tree
fmt_in = base_url + "/{ds}/type={type}/{filename}.ext"
fmt_out = base_url + "/year={ds}/{filename}.{type}"

s3migrate.copy(fmt_in, fmt_out, dryrun=False)

new_files = list(s3migrate.iter(fmt_out))
assert len(new_files) == len(files)


def test_move(file_tree):
base_url, files = file_tree
fmt_in = base_url + "/{ds}/type={type}/{filename}.ext"
fmt_out = base_url + "/year={ds}/{filename}.{type}"

s3migrate.move(fmt_in, fmt_out, dryrun=False)

old_files = list(s3migrate.iter(fmt_in))
new_files = list(s3migrate.iter(fmt_out))
assert len(old_files) == 0
assert len(new_files) == len(files)


def test_remove(file_tree):
base_url, files = file_tree
fmt_in = base_url + "/{ds}/type={type}/{filename}.ext"

s3migrate.remove(fmt_in, dryrun=False)

old_files = list(s3migrate.iter(fmt_in))
assert len(old_files) == 0
1 change: 1 addition & 0 deletions tests/test_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
("bucket/key", "bucket/key"),
("bucket/{key}", "bucket"),
("bucket/key={key}", "bucket"),
("s3://bucket/key={key}", "s3://bucket"),
],
)
def test_fmt_string_to_regex_pattern(path_fmt, base):
Expand Down