Skip to content

Commit

Permalink
Merge pull request #8 from datarevenue-berlin/ah-adlfs
Browse files Browse the repository at this point in the history
Implement azure data lake filesystem
  • Loading branch information
Alan Höng authored Nov 8, 2019
2 parents bcdc09b + c03daa8 commit 6dfcfae
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 12 deletions.
4 changes: 2 additions & 2 deletions .codebuild/buildspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ env:
parameter-store:
TWINE_PASSWORD: /CodeBuild/PyPi/Dev/pw
TWINE_USERNAME: /CodeBuild/PyPi/Dev/un
TWINE_REPOSITORY: /CodeBuild/PyPi/Dev/url
TWINE_REPOSITORY_URL: /CodeBuild/PyPi/Dev/url
phases:
install:
commands:
Expand All @@ -27,4 +27,4 @@ phases:
- make dist
post_build:
commands:
- bash .codebuild/release.sh
- bash .codebuild/release.sh
4 changes: 4 additions & 0 deletions drfs/filesystems/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@
except ImportError:
pass

# Optional backend: importing the module registers the Azure Data Lake ('adl')
# scheme in FILESYSTEMS; silently skipped when the azure SDK is not installed.
try:
    import drfs.filesystems.azure_datalake
except ImportError:
    pass
99 changes: 99 additions & 0 deletions drfs/filesystems/azure_datalake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from azure.datalake.store import lib, AzureDLFileSystem
from drfs.filesystems.base import FileSystemBase, FILESYSTEMS


class AzureDataLakeFileSystem(FileSystemBase):
    """drfs wrapper around ``azure.datalake.store.AzureDLFileSystem``.

    Azure Data Lake paths look like ``adl://STORE_NAME/folder/file.ext``.
    The native client neither understands schemes (``supports_scheme`` is
    False) nor store names embedded in paths, so every public method first
    extracts the store name from the path, (re)connects the wrapped
    filesystem to that store and passes on the bare path.  Methods that
    return paths (``ls``/``walk``/``glob``) re-attach the store name so
    callers get fully qualified remote paths back.
    """

    fs_cls = AzureDLFileSystem
    scheme = "adl"
    is_remote = True
    supports_scheme = False

    def __init__(self, tenant_id=None, client_id=None, client_secret=None, **kwargs):
        """Authenticate against Azure AD and build the wrapped filesystem.

        Parameters
        ----------
        tenant_id, client_id, client_secret: str, optional
            Service-principal credentials forwarded to ``lib.auth``.
        **kwargs
            Extra options forwarded to ``AzureDLFileSystem``.
        """
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.kwargs = kwargs
        token = lib.auth(
            tenant_id=self.tenant_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        self.kwargs["token"] = token
        # Deliberately not calling super().__init__: the store name is part of
        # each path, so the connection is (re)established per call in _connect.
        self.fs = AzureDLFileSystem(**self.kwargs)

    def _parse_store_name(self, path):
        """Split ``adl://STORE/sub/path`` into ``(STORE, '/sub/path')``.

        Raises
        ------
        ValueError
            If the path carries no store name (empty host part).
        """
        from drfs.path import RemotePath

        if not isinstance(path, RemotePath):
            path = RemotePath(path)

        store_name, path = path.hostname, path.path
        if store_name == "":
            raise ValueError(
                "Can't connect without store name. Please provide the path in the "
                "following form: 'adl://STORE_NAME/folder/file.extension'!"
            )
        return store_name, path

    def _connect(self, path):
        """Point the wrapped fs at the store encoded in *path*; return bare path."""
        self.fs.kwargs["store_name"], path = self._parse_store_name(path)
        self.fs.connect()
        return path

    def _add_store_name(self, p):
        """Re-insert the currently connected store name into RemotePath *p*.

        NOTE(review): assumes ``p.parts[0]`` is a scheme-qualified drive like
        ``'adl://host'`` whose third '/'-separated token is the host — confirm
        against drfs.path.RemotePath.
        """
        from drfs.path import RemotePath

        parts = p.parts
        part0 = parts[0].split("/")[2]
        drv = parts[0].replace(part0, self.fs.kwargs["store_name"])
        return RemotePath(drv, part0, *parts[1:])

    def ls(self, path, *args, **kwargs):
        """List *path*; returns paths with the store name re-attached."""
        path = self._connect(path)
        return [self._add_store_name(p) for p in super().ls(path, *args, **kwargs)]

    def open(self, path, *args, **kwargs):
        """Open a single file on the store encoded in *path*.

        Bug fix: this used to wrap the result in a list comprehension (copied
        from ``ls``), which iterated the returned file object line by line
        instead of returning it.
        """
        path = self._connect(path)
        return super().open(path, *args, **kwargs)

    def exists(self, path, *args, **kwargs):
        """Return True if *path* exists.

        Bug fix: the boolean result is returned as-is; iterating it in a list
        comprehension (previous behavior) raised TypeError.
        """
        path = self._connect(path)
        return super().exists(path, *args, **kwargs)

    def remove(self, path, *args, **kwargs):
        """Delete *path*; returns whatever the wrapped filesystem returns."""
        path = self._connect(path)
        return super().remove(path, *args, **kwargs)

    def mv(self, path, *args, **kwargs):
        """Move/rename *path*; returns whatever the wrapped filesystem returns."""
        path = self._connect(path)
        return super().mv(path, *args, **kwargs)

    def makedirs(self, path, *args, **kwargs):
        """Create directory *path* (and parents if supported)."""
        path = self._connect(path)
        return super().makedirs(path, *args, **kwargs)

    def rmdir(self, path, *args, **kwargs):
        """Remove directory *path*."""
        path = self._connect(path)
        return super().rmdir(path, *args, **kwargs)

    def info(self, path, *args, **kwargs):
        """Return the metadata dict for *path*.

        Bug fix: previously the dict was fed through a list comprehension,
        silently returning a list of its keys.
        """
        path = self._connect(path)
        return super().info(path, *args, **kwargs)

    def walk(self, *args, **kwargs):
        """Recursively list files below ``args[0]``; store name re-attached."""
        arg0 = self._connect(args[0])
        return [
            self._add_store_name(p) for p in super().walk(arg0, *args[1:], **kwargs)
        ]

    def glob(self, *args, **kwargs):
        """Expand the pattern ``args[0]``; store name re-attached to results."""
        arg0 = self._connect(args[0])
        return [
            self._add_store_name(p) for p in super().glob(arg0, *args[1:], **kwargs)
        ]


FILESYSTEMS[AzureDataLakeFileSystem.scheme] = AzureDataLakeFileSystem
38 changes: 36 additions & 2 deletions drfs/filesystems/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,36 @@
Which filesystem to use is usually inferred from the path/protocol.
"""
from .util import allow_pathlib, return_pathlib, return_schemes
from .util import allow_pathlib, return_pathlib, return_schemes, maybe_remove_scheme

FILESYSTEMS = {}


class FileSystemBase:
"""File System Base
This is a class that wraps other file system classes to provide a consistent API
across all file system implementations.
Attributes
----------
fs_cls: Type
wrapped class, will be managed and instantiated by a subclass of this class.
scheme: str
which scheme to use for paths on this storage provider e.g. gcs, s3, file, adl.
is_remote: bool
should be set to true if the filesystem is a remote filesystem.
supports_scheme: bool
should be set to true if the underlying filesystem supports uris
containing a scheme. If set to false this class will strip the scheme as
specified in the above attribute. E.g. `adl://store/some/file` will be received
as `/store/some/file`.
"""

fs_cls = None # type: type
scheme = None # type: str
is_remote = None # type: bool
supports_scheme = True # type: bool

def __init__(self, *args, **kwargs):
if self.fs_cls is None:
Expand All @@ -40,58 +60,72 @@ def __init__(self, *args, **kwargs):
self.fs = self.fs_cls(*args, **kwargs)

@allow_pathlib
@maybe_remove_scheme
def open(self, path, *args, **kwargs):
    """Open *path* on the wrapped filesystem and return its file-like object."""
    return self.fs.open(path, *args, **kwargs)

@allow_pathlib
@maybe_remove_scheme
def exists(self, path, *args, **kwargs):
    """Return the wrapped filesystem's existence check for *path*."""
    return self.fs.exists(path, *args, **kwargs)

@return_pathlib
@return_schemes
@allow_pathlib
@maybe_remove_scheme
def ls(self, path, *args, **kwargs):
    """List *path*; decorators turn results into scheme-qualified pathlib paths."""
    return self.fs.ls(path, *args, **kwargs)

@allow_pathlib
@maybe_remove_scheme
def remove(self, path, *args, **kwargs):
    """Delete *path*, falling back to ``rm`` for backends without ``remove``."""
    try:
        return self.fs.remove(path, *args, **kwargs)
    except AttributeError:
        # NOTE(review): this also swallows AttributeErrors raised *inside*
        # fs.remove, retrying via fs.rm — confirm that is intended.
        return self.fs.rm(path, *args, **kwargs)

@allow_pathlib
@maybe_remove_scheme
def copy(self, path, *args, **kwargs):
    """Copy *path*, falling back to ``cp`` for backends without ``copy``."""
    try:
        return self.fs.copy(path, *args, **kwargs)
    except AttributeError:
        return self.fs.cp(path, *args, **kwargs)

@allow_pathlib
@maybe_remove_scheme
def mv(self, path, *args, **kwargs):
    """Move/rename *path* via the wrapped filesystem."""
    return self.fs.mv(path, *args, **kwargs)

@allow_pathlib
@maybe_remove_scheme
def makedirs(self, path, *args, **kwargs):
return self.fs.makedirs(path, *args, **kwargs)
try:
return self.fs.makedirs(path, *args, **kwargs)
except AttributeError:
return self.fs.mkdir(path, *args, **kwargs)

@allow_pathlib
@maybe_remove_scheme
def rmdir(self, path, *args, **kwargs):
    """Remove the directory at *path* via the wrapped filesystem."""
    return self.fs.rmdir(path, *args, **kwargs)

@allow_pathlib
@maybe_remove_scheme
def info(self, path, *args, **kwargs):
    """Return the wrapped filesystem's metadata for *path*."""
    return self.fs.info(path, *args, **kwargs)

@return_pathlib
@return_schemes
@allow_pathlib
@maybe_remove_scheme
def walk(self, *args, **kwargs):
    """Recursively list below ``args[0]``; results become scheme-qualified paths."""
    return self.fs.walk(*args, **kwargs)

@return_pathlib
@return_schemes
@allow_pathlib
@maybe_remove_scheme
def glob(self, *args, **kwargs):
    """Expand the pattern ``args[0]``; results become scheme-qualified paths."""
    return self.fs.glob(*args, **kwargs)

Expand Down
14 changes: 13 additions & 1 deletion drfs/filesystems/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from functools import partial, wraps

from drfs import settings
from drfs.util import prepend_scheme
from drfs.util import prepend_scheme, remove_scheme


def get_fs(path, opts=None, rtype='instance'):
Expand Down Expand Up @@ -100,3 +100,15 @@ def wrapper(self, path, *args, **kwargs):
res = prepend_scheme(self.scheme, res)
return res
return wrapper


def maybe_remove_scheme(func):
    """Strip URI schemes from path arguments when the wrapped filesystem
    does not support scheme-qualified paths (``supports_scheme`` is False).

    All positional and keyword arguments are passed through ``remove_scheme``
    with ``raise_=False``; non-path values are expected to pass through
    unchanged — TODO confirm against ``drfs.util.remove_scheme``.
    """
    @wraps(func)
    def wrapper(self, path, *args, **kwargs):
        if not self.supports_scheme:
            path = remove_scheme(path, raise_=False)
            args = [remove_scheme(a, raise_=False) for a in args]
            kwargs = {k: remove_scheme(v, raise_=False) for k, v in kwargs.items()}
        # Bug fix: the stripped values were previously bound to new names
        # (``new_args``/``new_kwargs``) only inside the branch above, so any
        # filesystem WITH scheme support raised NameError on every call.
        return func(self, path, *args, **kwargs)
    return wrapper
35 changes: 35 additions & 0 deletions drfs/luigi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import os

from drfs.filesystems import get_fs

try:
from luigi.target import FileSystemTarget
except ImportError:
raise ImportError('Could not import luigi library. Try installing it.')


class FileTarget(FileSystemTarget):
    # Luigi target usable with any drfs-supported storage backend; the
    # concrete filesystem is inferred from the path's scheme via get_fs.

    def __init__(self, path, **kwargs):
        """Target for any kind of storage. Infers file system automatically.
        Parameters
        ----------
        path: str
            Path to the file.
        **kwargs
            Will be used as filesystem options. (Options from settings are used
            by default, you can overwrite them here.)
        """
        super(FileTarget, self).__init__(str(path))
        self.storage_options = kwargs

    @property
    def fs(self):
        # Resolved lazily on every access so the scheme in self.path and any
        # storage_options changes always take effect.
        return get_fs(self.path, opts=self.storage_options, rtype='instance')

    def open(self, *args, **kwargs):
        """Open this target's path on its filesystem (mode etc. pass through)."""
        return self.fs.open(self.path, *args, **kwargs)

    def makedirs(self, *args, **kwargs):
        """Create the parent directory of the target path (return value dropped)."""
        self.fs.makedirs(os.path.dirname(self.path), *args, **kwargs)
4 changes: 2 additions & 2 deletions drfs/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ def _get_nodes(self):
)
nodes = [n for n in nodes if not n[0].startswith('__')]
return nodes
def __str__(self):

def __repr__(self):
res = ""
for node_name, node_value in self._get_nodes():
if isinstance(node_value, DRPathMixin):
Expand Down
53 changes: 53 additions & 0 deletions drfs/tests/filesystems/test_azure_data_lake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import pytest
from azure.datalake.store import AzureDLFileSystem
from mock import MagicMock

from drfs.filesystems import azure_datalake


@pytest.fixture(autouse=True)
def mock_azure_fs_native(monkeypatch):
    """Replace the native Azure client with a MagicMock for every test.

    The mock's ls/glob return a fixed listing, and token acquisition is
    stubbed out so no network access happens.
    """
    listing = [
        'folder/directory/file.txt',
        'folder/directory/file2.txt',
        'folder/directory/file3.txt',
    ]
    fs = MagicMock(spec=AzureDLFileSystem)
    fs.ls.return_value = list(listing)
    fs.glob.return_value = list(listing)
    fs.kwargs = {}
    monkeypatch.setattr(azure_datalake, 'AzureDLFileSystem', MagicMock(return_value=fs))
    monkeypatch.setattr(azure_datalake.lib, 'auth', lambda *args, **kwargs: 'token')


def test_custom_connect():
    """_connect extracts the store name and strips scheme + host from the path."""
    filesystem = azure_datalake.AzureDataLakeFileSystem()
    stripped = filesystem._connect('adl://intvanprofi/some/path.txt')
    assert filesystem.fs.kwargs['store_name'] == 'intvanprofi'
    assert not stripped.startswith('adl://intvanprofi')


def test_ls():
    """ls passes a scheme-less path to the native client and re-attaches host/scheme."""
    filesystem = azure_datalake.AzureDataLakeFileSystem()
    listing = filesystem.ls('adl://intvanprofi/some/path/to/directory')

    filesystem.fs.ls.assert_called_once_with('/some/path/to/directory')
    for item in listing:
        assert item.hostname == 'intvanprofi'
        assert item.scheme == 'adl'


def test_glob():
    """glob passes a scheme-less pattern to the native client and re-attaches host/scheme."""
    filesystem = azure_datalake.AzureDataLakeFileSystem()
    matches = filesystem.glob('adl://intvanprofi/some/path/to/*.csv')

    filesystem.fs.glob.assert_called_once_with('/some/path/to/*.csv')

    for item in matches:
        assert item.hostname == 'intvanprofi'
        assert item.scheme == 'adl'
Loading

0 comments on commit 6dfcfae

Please sign in to comment.