Skip to content

Commit

Permalink
Merge pull request #153 from octoenergy/add_databricks_db_client
Browse files Browse the repository at this point in the history
Add databricks db client
  • Loading branch information
j0nnyr0berts authored Mar 23, 2021
2 parents a377abf + 019aa5a commit 5bddda0
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 5 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.0.15] - 2021-03-23
### Added
- Add Databricks db client

## [0.0.14] - 2021-03-17
### Fix
- Temporarily pin sqlalchemy to <1.4 due to deprecated ResultProxy interface
Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ The supported url protocols are:
* `sftp://path/to/file`
* `http://host.com/path/to/resource`
* `https://host.com/path/to/resource`
* `databricks+pyodbc://host/database`
* `postgresql://host/database::table` will allow you to write from a csv format into a database with the same column names (note that the table goes after `::` :warning:).

You can add the credentials for any of the urls in order to access protected resources.
Expand Down Expand Up @@ -287,6 +288,23 @@ The token file has been saved in a default location '~/.tentaclio_google_drive.j
The `credentials.json` file is no longer needed; feel free to delete it.


## Configuring access to Databricks

In order to use Tentaclio to connect to a Databricks cluster or SQL endpoint, it is necessary to install the required
[ODBC driver](https://databricks.com/spark/odbc-drivers-download) for your operating system.

Once installed, it is possible to access Databricks as you would any supported URL protocol. However,
it is likely that you will have to pass some [additional variables](https://docs.databricks.com/integrations/bi/jdbc-odbc-bi.html)
in the URL query string, including the path to the installed driver.

For example, if your Databricks connection requires you to set `DRIVER` and `HTTPPath` values,
the URL should look like this:

```
databricks+pyodbc://<token>@<host>/<database>?DRIVER=<path/to/driver>&HTTPPath=<http_path>
```


## Quick note on protocols structural subtyping.

In order to abstract concrete dependencies from the implementation of data-related functions (or in any part of the system really) we use typed [protocols](https://mypy.readthedocs.io/en/latest/protocols.html#simple-user-defined-protocols). This allows a more flexible dependency injection than using subclassing or [more complex approaches](http://code.activestate.com/recipes/413268/). This idea is heavily inspired by how this exact thing is done in [go](https://www.youtube.com/watch?v=ifBUfIb7kdo). Learn more about this principle in our [tech blog](https://tech.octopus.energy/news/2019/03/21/python-interfaces-a-la-go.html).
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from setuptools.command.install import install


VERSION = "0.0.14"
VERSION = "0.0.15"

REPO_ROOT = pathlib.Path(__file__).parent

Expand Down Expand Up @@ -40,6 +40,8 @@ def run(self):
"requests",
# Postgres
"psycopg2-binary",
# Databricks
"pyodbc",
# Sqlalchemy
"sqlalchemy<1.4",
# Athena
Expand Down
1 change: 1 addition & 0 deletions src/tentaclio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
# Db registry
DB_REGISTRY.register("postgresql", PostgresClient)
DB_REGISTRY.register("awsathena+rest", AthenaClient)
DB_REGISTRY.register("databricks+pyodbc", DatabricksClient)

COPIER_REGISTRY.register("s3+s3", S3Client("s3://"))

Expand Down
1 change: 1 addition & 0 deletions src/tentaclio/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
from .base_client import * # noqa
from .local_fs_client import * # noqa
from .google_drive_client import * # noqa
from .databricks_client import * # noqa
39 changes: 39 additions & 0 deletions src/tentaclio/clients/databricks_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Databricks query client."""
from typing import Dict
import urllib

from sqlalchemy.engine import Connection, create_engine

from . import sqla_client


class DatabricksClient(sqla_client.SQLAlchemyClient):
    """Databricks client, backed by a pyodbc + SQLAlchemy connection.

    The client URL is translated into an ODBC connection string: the URL
    username is sent as the password (``PWD``) with the fixed user name
    ``token``, and any URL query parameters (e.g. ``DRIVER``, ``HTTPPath``)
    are passed through as additional ODBC attributes.
    """

    def _connect(self) -> Connection:
        """Return a new connection, creating the engine on first use.

        The engine is created through SQLAlchemy's ``mssql+pyodbc`` URL form,
        with the url-quoted ODBC string handed over via ``odbc_connect``.
        """
        if self.engine is None:
            # Only build the connection string when an engine actually has to
            # be created; a cached engine already carries it.
            odbc_connection_map = self._build_odbc_connection_dict()
            connection_url = build_odbc_connection_string(**odbc_connection_map)
            self.engine = create_engine(
                f"mssql+pyodbc:///?odbc_connect={connection_url}"
            )
        return self.engine.connect()

    def _build_odbc_connection_dict(self) -> Dict:
        """Map the client URL onto ODBC connection attributes.

        Returns:
            A dict of ODBC attribute names to values. Query-string parameters
            are applied last, so they override the attributes derived from
            the URL itself.
        """
        url_attributes = {
            "UID": "token",
            "PWD": self.username,
            "HOST": self.host,
            "PORT": self.port,
            "Schema": self.database,
        }
        # Drop components missing from the URL so they are not rendered as the
        # literal text "None" (e.g. "PORT=None") in the connection string.
        # Empty strings are kept on purpose (an empty database yields Schema="").
        # NOTE(review): assumes absent URL components are exposed as None --
        # confirm against the URL class.
        odbc_connection_string_map = {
            key: value for key, value in url_attributes.items() if value is not None
        }
        if self.url.query:
            odbc_connection_string_map.update(self.url.query)
        return odbc_connection_string_map


def build_odbc_connection_string(**kwargs) -> str:
    """Build a url formatted odbc connection string from kwargs.

    Each keyword argument becomes a ``key=value`` pair; pairs are joined with
    ``;`` in the order given and the whole string is percent-encoded.

    Args:
        **kwargs: ODBC attribute names mapped to their values. Values are
            rendered with ``str()`` formatting.

    Returns:
        The percent-encoded connection string (``/`` is left unescaped, as it
        is the default safe character of ``urllib.parse.quote``). An empty
        string when no attributes are given.
    """
    # Import the submodule explicitly: a bare `import urllib` does not
    # guarantee that `urllib.parse` has been loaded.
    from urllib.parse import quote

    connection_url = ";".join(f"{key}={value}" for key, value in kwargs.items())
    return quote(connection_url)
8 changes: 4 additions & 4 deletions src/tentaclio/clients/sqla_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,19 @@ def delete_schema(self, meta_data: MetaData) -> None:
# Query methods:

@decorators.check_conn
def query(self, sql_query: str, params: dict = None, **kwargs) -> result.ResultProxy:
def query(self, sql_query: str, **kwargs) -> result.ResultProxy:
"""Execute a read-only SQL query, and return results.
This will not commit any changes to the database.
"""
return self.conn.execute(sql_query, params=params, **kwargs)
return self.conn.execute(sql_query, **kwargs)

@decorators.check_conn
def execute(self, sql_query: str, params: dict = None, **kwargs) -> None:
def execute(self, sql_query: str, **kwargs) -> None:
"""Execute a raw SQL query command."""
trans = self.conn.begin()
try:
self.conn.execute(sql_query, params=params, **kwargs)
self.conn.execute(sql_query, **kwargs)
except Exception:
trans.rollback()
raise
Expand Down
61 changes: 61 additions & 0 deletions tests/unit/clients/test_databricks_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import pytest
from tentaclio.clients.databricks_client import (
build_odbc_connection_string,
DatabricksClient,
)
from typing import Dict


# ODBC attributes every test URL below is expected to produce from its
# credentials, host, port and path.
_URL_ATTRIBUTES = {
    "UID": "token",
    "PWD": "my_t0k3n",
    "HOST": "db_host",
    "PORT": 443,
    "Schema": "database",
}


@pytest.mark.parametrize(
    "url, expected",
    [
        # Plain URL: only the attributes derived from the URL itself.
        (
            "databricks+pyodbc://my_t0k3n@db_host:443/database",
            dict(_URL_ATTRIBUTES),
        ),
        # No database in the path: Schema is kept as an empty string.
        (
            "databricks+pyodbc://my_t0k3n@db_host:443/",
            {**_URL_ATTRIBUTES, "Schema": ""},
        ),
        # Query parameters are passed through as extra (string) attributes.
        (
            "databricks+pyodbc://my_t0k3n@db_host:443/database"
            "?HTTPPath=sql/protocolv1/&AuthMech=3&SparkServerType=3"
            "&ThriftTransport=2&SSL=1&IgnoreTransactions=1&DRIVER=/path/to/driver",
            {
                **_URL_ATTRIBUTES,
                "AuthMech": "3",
                "HTTPPath": "sql/protocolv1/",
                "IgnoreTransactions": "1",
                "SSL": "1",
                "ThriftTransport": "2",
                "SparkServerType": "3",
                "DRIVER": "/path/to/driver",
            },
        ),
    ],
)
def test_build_odbc_connection_dict(url: str, expected: Dict):
    """The client URL is mapped onto the expected ODBC attribute dict."""
    client = DatabricksClient(url)
    assert client._build_odbc_connection_dict() == expected


def test_build_odbc_connection_string():
    """Attributes are joined as ``key=value;...`` and percent-encoded."""
    expected = "UID%3Duser%3BPWD%3Dp%40ssw0rd%3BHOST%3Ddb_host%3BPORT%3D443"
    actual = build_odbc_connection_string(
        UID="user", PWD="p@ssw0rd", HOST="db_host", PORT=443
    )
    assert actual == expected

0 comments on commit 5bddda0

Please sign in to comment.