From 71ada13afa00888aa26606efb8e396165fd88d06 Mon Sep 17 00:00:00 2001
From: Nils Braun
Date: Tue, 13 Oct 2020 11:10:46 +0200
Subject: [PATCH] Implement a SQL server speaking the presto protocol (#56)

* Implement a SQL server speaking the presto protocol

This server can be used to run dask-sql in a standalone application.
It is e.g. possible to run it in a dask cluster and answer SQL queries
from external clients.

* Stylefix

* Fixes to docker build setup
---
 .coveragerc                      |   1 -
 .github/workflows/deploy.yml     |  14 ++
 Dockerfile                       |  27 +++
 README.md                        |  30 ++-
 conda.yaml                       |   2 +
 dask_sql/__init__.py             |   1 +
 dask_sql/context.py              |   2 +-
 dask_sql/server/app.py           | 108 +++++++++
 dask_sql/server/handler.py       |  98 ---------
 dask_sql/server/messages.py      | 361 -------------------------------
 dask_sql/server/responses.py     | 110 ++++++++++
 dask_sql/utils.py                |  11 +-
 docs/index.rst                   |   1 +
 docs/pages/api.rst               |   2 +
 docs/pages/server.rst            |  96 ++++++++
 setup.py                         |   6 +-
 tests/integration/test_server.py | 110 ++++++++++
 17 files changed, 505 insertions(+), 475 deletions(-)
 create mode 100644 Dockerfile
 create mode 100644 dask_sql/server/app.py
 delete mode 100644 dask_sql/server/handler.py
 delete mode 100644 dask_sql/server/messages.py
 create mode 100644 dask_sql/server/responses.py
 create mode 100644 docs/pages/server.rst
 create mode 100644 tests/integration/test_server.py

diff --git a/.coveragerc b/.coveragerc
index a001d26cf..ce124286e 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -1,6 +1,5 @@
 [run]
 omit =
     tests/*
-    dask_sql/server/*
 branch = True
 [report]
diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
index 159f96636..b716bebfb 100644
--- a/.github/workflows/deploy.yml
+++ b/.github/workflows/deploy.yml
@@ -41,3 +41,17 @@ jobs:
       run: |
         python setup.py sdist bdist_wheel
         twine upload dist/*
+
+  push_to_registry:
+    name: Push Docker image to Docker Hub
+    runs-on: ubuntu-latest
+    steps:
+      - name: Check out the repo
+        uses: actions/checkout@v2
+      - name: Push to Docker Hub
+        uses: docker/build-push-action@v1
+        with:
+          username: ${{ secrets.DOCKER_USERNAME }}
+          password: ${{ secrets.DOCKER_PASSWORD }}
+          repository: nbraun/dask-sql
+          tag_with_ref: true
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 000000000..1c351bd7d
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,27 @@
+# Dockerfile for dask-sql running the SQL server
+# For more information, see https://dask-sql.readthedocs.io/.
+FROM continuumio/miniconda3:4.8.2
+LABEL author "Nils Braun"
+
+# Install dependencies for dask-sql
+COPY conda.yaml /opt/dask_sql/
+RUN /opt/conda/bin/conda install \
+    --file /opt/dask_sql/conda.yaml \
+    -c conda-forge
+
+# Build the java libraries
+COPY setup.py /opt/dask_sql/
+COPY .git /opt/dask_sql/.git
+COPY planner /opt/dask_sql/planner
+RUN cd /opt/dask_sql/ \
+    && python setup.py java
+
+# Install the python library
+COPY dask_sql /opt/dask_sql/dask_sql
+RUN cd /opt/dask_sql/ \
+    && pip install -e .
+
+# Set the script to execute
+EXPOSE 8080
+ENV JAVA_HOME /opt/conda
+ENTRYPOINT [ "/opt/conda/bin/python", "/opt/dask_sql/dask_sql/server/app.py" ]
\ No newline at end of file
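
With the image above built (or pulled from Docker Hub) and a container started via `docker run --rm -it -p 8080:8080 nbraun/dask-sql`, a quick smoke test from Python might look like this (a sketch; the `requests` dependency is an assumption of this example, not of `dask-sql`):

```python
# Sketch: smoke-test a running dask-sql container on localhost:8080.
import requests

# The server answers SQL statements POSTed as plain text to /v1/statement.
response = requests.post("http://localhost:8080/v1/statement", data="SELECT 1 + 1")
result = response.json()

print(result["columns"])  # presto-style column descriptions
print(result["data"])     # e.g. [[2]]
```
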
diff --git a/README.md b/README.md
index 662b07db2..39d93447c 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,12 @@ Some ideas for this project are coming from the very great [blazingSQL](https://
 
 Read more in the [documentation](https://dask-sql.readthedocs.io/en/latest/).
 
+You can try out `dask-sql` quickly by using the docker command
+
+    docker run --rm -it -p 8080:8080 nbraun/dask-sql
+
+See the information on the SQL server at the end of this page.
+
 ---
 **NOTE**
@@ -145,20 +151,24 @@ After the translation to a relational algebra is done (using `RelationalAlgebraG
 ## SQL Server
 
 `dask-sql` comes with a small test implementation for a SQL server.
-Instead of rebuilding a full ODBC driver, we re-use the [postgreSQL wire protocol](https://www.postgresql.org/docs/9.3/protocol-flow.html).
-It is - so far - just a proof of concept
+Instead of rebuilding a full ODBC driver, we re-use the [presto wire protocol](https://github.com/prestodb/presto/wiki/HTTP-Protocol).
+It is - so far - only the start of the development and is missing important concepts, such as
+authentication.
 
-You can test the sql postgres server by running
+You can test the SQL presto server by running
 
-    python dask_sql/server/handler.py
+    python dask_sql/server/app.py
 
-in one terminal. This will spin up a server on port 9876
-that looks similar to a normal postgres database to any postgres client
-(except that you can only do queries, no database creation etc.)
+or by using the created docker image
+
+    docker run --rm -it -p 8080:8080 nbraun/dask-sql
+
 
-You can test this for example with the default postgres client:
+in one terminal. This will spin up a server on port 8080 (by default)
+that looks similar to a normal presto database to any presto client.
+
+You can test this, for example, with the default [presto client](https://prestosql.io/docs/current/installation/cli.html):
 
-    psql -h localhost -p 9876
+    presto --server localhost:8080
 
 Now you can fire simple SQL queries (as no data is loaded by default):
 
@@ -167,3 +177,5 @@ Now you can fire simple SQL queries (as no data is loaded by default):
  --------
  2
  (1 row)
+
+You can find more information in the [documentation](https://dask-sql.readthedocs.io/en/latest/pages/server.html).
diff --git a/conda.yaml b/conda.yaml
index fdeaa8a0f..abfd05686 100644
--- a/conda.yaml
+++ b/conda.yaml
@@ -6,3 +6,5 @@ maven>=3.6.0
 pytest>=6.0.1
 pytest-cov>=2.10.1
 sphinx>=3.2.1
+fastapi>=0.61.1
+uvicorn>=0.11.3
diff --git a/dask_sql/__init__.py b/dask_sql/__init__.py
index 0cebf2bfd..4733d4f0a 100644
--- a/dask_sql/__init__.py
+++ b/dask_sql/__init__.py
@@ -1 +1,2 @@
 from .context import Context
+from .server.app import run_server
diff --git a/dask_sql/context.py b/dask_sql/context.py
index c195ce081..e1111c01f 100644
--- a/dask_sql/context.py
+++ b/dask_sql/context.py
@@ -293,7 +293,7 @@ def _prepare_schema(self):
         schema = DaskSchema(self.schema_name)
 
         if not self.tables:  # pragma: no cover
-            logger.warn("No tables are registered.")
+            logger.warning("No tables are registered.")
 
         for name, dc in self.tables.items():
             table = DaskTable(name)
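
The app added in `dask_sql/server/app.py` below answers a POST with a JSON document that presto clients walk by following `nextUri` until it disappears; here `nextUri` points to the `/v1/empty` helper endpoint, so the loop ends after one hop. A client round-trip might look like this (a sketch; `requests` and a server running on `localhost:8080` are assumptions of this example):

```python
# Sketch of how a presto HTTP client consumes the server's answer.
import requests


def fetch_all(server: str, sql: str):
    """POST a statement, then follow nextUri until the data is exhausted."""
    rows = []
    result = requests.post(f"{server}/v1/statement", data=sql).json()
    while True:
        if "error" in result:
            raise RuntimeError(result["error"]["message"])
        rows.extend(result.get("data", []))
        if "nextUri" not in result:
            # the /v1/empty response carries no nextUri, ending the loop
            return rows
        result = requests.get(result["nextUri"]).json()


print(fetch_all("http://localhost:8080", "SELECT 1 + 1"))  # [[2]]
```
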
+ """ + try: + sql = (await request.body()).decode().strip() + df = request.app.c.sql(sql) + + return DataResults(df, request=request) + except Exception as e: + return ErrorResults(e, request=request) + + +def run_server( + context: Context = None, host: str = "0.0.0.0", port: int = 8080 +): # pragma: no cover + """ + Run a HTTP server for answering SQL queries using ``dask-sql``. + It uses the `Presto Wire Protocol `_ + for communication. + This means, it has a single POST endpoint `v1/statement`, which answers + SQL queries (as string in the body) with the output as a JSON + (in the format described in the documentation above). + Every SQL expression that ``dask-sql`` understands can be used here. + + Note: + The presto protocol also includes some statistics on the query + in the response. + These statistics are currently only filled with placeholder variables. + + Args: + context (:obj:`dask_sql.Context`): If set, use this context instead of an empty one. + host (:obj:`str`): The host interface to listen on (defaults to all interfaces) + port (:obj:`int`): The port to listen on (defaults to 8080) + + Example: + It is possible to run an SQL server by using the CLI script in ``dask_sql.server.app`` + or by calling this function directly in your user code: + + .. code-block:: python + + from dask_sql import run_server + + # Create your pre-filled context + c = Context() + ... + + run_server(context=c) + + After starting the server, it is possible to send queries to it, e.g. with the + `presto CLI `_ + or via sqlalchemy (e.g. using the `PyHive `_ package): + + .. code-block:: python + + from sqlalchemy.engine import create_engine + engine = create_engine('presto://localhost:8080/') + + import pandas as pd + pd.read_sql_query("SELECT 1 + 1", con=engine) + + Of course, it is also possible to call the usual ``CREATE TABLE`` + commands. + """ + if context is None: + context = Context() + + app.c = context + + uvicorn.run(app, host=host, port=port) + + +if __name__ == "__main__": + parser = ArgumentParser() + parser.add_argument( + "--host", + default="0.0.0.0", + help="The host interface to listen on (defaults to all interfaces)", + ) + parser.add_argument( + "--port", default=8080, help="The port to listen on (defaults to 8080)" + ) + + args = parser.parse_args() + + run_server(host=args.host, port=args.port) diff --git a/dask_sql/server/handler.py b/dask_sql/server/handler.py deleted file mode 100644 index 284ee1a4a..000000000 --- a/dask_sql/server/handler.py +++ /dev/null @@ -1,98 +0,0 @@ -import socketserver -import logging - -from dask_sql.context import Context -from dask_sql.server import messages - -logging.basicConfig(level=logging.DEBUG) - - -class ObservableHandler(socketserver.BaseRequestHandler): - def read_socket(self): - logging.debug("Start listening for new data...") - data = self.request.recv(1024) - logging.debug("... 
-
-    def read_message(self, *classes):
-        data = self.read_socket()
-        for cls in classes:
-            try:
-                logging.debug(f"Trying to cast as {cls.__name__}")
-                return cls.from_data(data)
-            except Exception as e:
-                logging.debug(f"Failed!")
-                pass
-
-    def send_to_socket(self, data):
-        logging.debug("Sending {} bytes: {}".format(len(data), repr(data)))
-        return self.request.sendall(data)
-
-    def send_message(self, message):
-        return self.send_to_socket(message.to_struct())
-
-
-class Handler(ObservableHandler):
-    """
-    Answer the postgres wire protocol by giving the SQL query
-    to the dask_sql context and sending back the result.
-
-    Idea taken from https://stackoverflow.com/questions/335008/creating-a-custom-odbc-driver
-
-    The protocol is described here: https://www.postgresql.org/docs/9.3/protocol-flow.html
-    """
-
-    def handle(self):
-        logging.debug("Handling new connection")
-
-        self.read_message(messages.SSLRequest)
-        self.send_message(messages.EmptyNoticeMessage())
-
-        protocol_version, parameters = self.read_message(messages.StartupMessage)
-        logging.debug(f"Protocol: {protocol_version}, parameters: {parameters}")
-
-        # TODO: one could implement password authentication here
-        # or at least have different contexts for different users.
-        # So far we just send out "OK" without authentication
-        self.send_message(messages.AuthenticationOkMessage())
-
-        c = Context()
-
-        while True:
-            self.send_message(messages.ReadyForQueryMessage())
-            query = self.read_message(messages.QueryMessage, messages.AbortMessage)
-            if not query:
-                # The user has aborted
-                break
-
-            try:
-                # Execute the query agains the context
-                result = self._execute_query(query, c)
-
-                # Send out the result
-                self._send_out_result(result)
-            except:
-                # TODO
-                pass
-
-    def _execute_query(self, query, c):
-        assert len(query) == 1
-        query = query[0]
-        query = query.rstrip("; ")
-
-        df = c.sql(query)
-        result = df.compute()
-        return result
-
-    def _send_out_result(self, result):
-        self.send_message(messages.RowDescriptionMessage(result))
-        for index, row in result.iterrows():
-            self.send_message(messages.RowMessage(row))
-
-        self.send_message(messages.CommandCompleteMessage(result))
-
-
-if __name__ == "__main__":
-    socketserver.TCPServer.allow_reuse_address = True
-    with socketserver.TCPServer(("localhost", 9876), Handler) as server:
-        server.serve_forever()
diff --git a/dask_sql/server/messages.py b/dask_sql/server/messages.py
deleted file mode 100644
index c1105f866..000000000
--- a/dask_sql/server/messages.py
+++ /dev/null
@@ -1,361 +0,0 @@
-import struct
-import logging
-
-
-class Message:
-    """
-    Base class for a message in the postgres protocol.
-    Can be used to either construct a message from
-    arguments or by unpacking one from the received data.
-    """
-
-    def __init__(self, struct_type, indicator, message=None):
-        self.struct_type = struct_type
-        self.indicator = indicator
-        self.message = message
-
-        logging.debug(f"Creating {type(self).__name__}")
-        print(self.struct_type, self.indicator, self.message)
-
-    def to_struct(self):
-        """
-        Return the message as a packed struct in binary format.
-        The postgres protocol (see https://www.postgresql.org/docs/9.3/protocol-message-formats.html)
-        typically foresees an indicator byte,
-        the message length (without that byte) and the message itself.
-        Exception: if there is no message, only the indicator will be sent.
-        """
-        if self.message:
-            # Reduce by one, as indicator is not counted
-            return struct.pack(
-                self.struct_type,
-                self.indicator,
-                struct.calcsize(self.struct_type) - 1,
-                *self.message,
-            )
-        else:
-            return struct.pack(self.struct_type, self.indicator)
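
The framing implemented by the removed `Message` base class is easy to check by hand: an indicator byte, an int32 length that does not count the indicator, then the payload. For the `ReadyForQueryMessage` defined just below, this packs to the canonical postgres `ReadyForQuery` bytes (a worked example, runnable on its own):

```python
import struct

# ReadyForQueryMessage packs ("!cic", b"Z", [b"I"]): indicator 'Z',
# an int32 length excluding the indicator, and the status byte 'I' (idle).
packed = struct.pack("!cic", b"Z", struct.calcsize("!cic") - 1, b"I")
assert packed == b"Z\x00\x00\x00\x05I"  # 'Z', length 5, 'I'
```
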
- """ - if self.message: - # Reduce by one, as indicator is not counted - return struct.pack( - self.struct_type, - self.indicator, - struct.calcsize(self.struct_type) - 1, - *self.message, - ) - else: - return struct.pack(self.struct_type, self.indicator) - - -class ReadyForQueryMessage(Message): - """ - ReadyForQuery - - Byte1('Z') - Identifies the message type. ReadyForQuery is sent whenever the backend is ready for a new query cycle. - - Int32(5) - Length of message contents in bytes, including self. - - Byte1 - Current backend transaction status indicator. Possible values are 'I' if idle (not in a transaction block); 'T' if in a transaction block; or 'E' if in a failed transaction block (queries will be rejected until block is ended). - """ - - def __init__(self): - super().__init__("!cic", b"Z", [b"I"]) - - -class AuthenticationOkMessage(Message): - """ - AuthenticationOk - - Byte1('R') - Identifies the message as an authentication request. - - Int32(8) - Length of message contents in bytes, including self. - - Int32(0) - Specifies that the authentication was successful. - """ - - def __init__(self): - super().__init__("!cii", b"R", [0]) - - -class AuthenticationPassworkOkMessage(Message): - """ - AuthenticationPasswordOk - - Byte1('R') - Identifies the message as an authentication request. - - Int32(8) - Length of message contents in bytes, including self. - - Int32(3) - Specifies that a clear-text password is required. - """ - - def __init__(self): - super().__init__("!cii", b"R", [3]) - - -class SSLRequest(Message): - """ - SSLRequest (F) - Int32(8) - Length of message contents in bytes, including self. - - Int32(80877103) - The SSL request code. The value is chosen to contain 1234 in the most significant 16 bits, and 5679 in the least significant 16 bits. (To avoid confusion, this code must not be the same as any protocol version number.) - """ - - @staticmethod - def from_data(data): - msglen, sslcode = struct.unpack("!ii", data) - assert msglen == 8 - assert sslcode == 80877103 - return True - - -class StartupMessage(Message): - """ - Int32 - Length of message contents in bytes, including self. - - Int32(196608) - The protocol version number. The most significant 16 bits are the major version number (3 for the protocol described here). The least significant 16 bits are the minor version number (0 for the protocol described here). - - The protocol version number is followed by one or more pairs of parameter name and value strings. A zero byte is required as a terminator after the last name/value pair. Parameters can appear in any order. user is required, others are optional. Each parameter is specified as: - - String - The parameter name. Currently recognized names are: - - user - The database user name to connect as. Required; there is no default. - - database - The database to connect to. Defaults to the user name. - - options - Command-line arguments for the backend. (This is deprecated in favor of setting individual run-time parameters.) - - In addition to the above, other parameters may be listed. Parameter names beginning with _pq_. are reserved for use as protocol extensions, while others are treated as run-time parameters to be set at backend start time. Such settings will be applied during backend start (after parsing the command-line arguments if any) and will act as session defaults. - - String - The parameter value. 
- """ - - def __init__(self): - raise NotImplementedError - - @staticmethod - def from_data(data): - assert len(data) >= 8 - message_len, protoversion = struct.unpack("!ii", data[0:8]) - - assert message_len == len(data) - parameters_string = data[8:] - parameters_string = [p.decode() for p in parameters_string.split(b"\x00")] - - keys = filter(None, parameters_string[::2]) - values = filter(None, parameters_string[1::2]) - return protoversion, dict(zip(keys, values)) - - -class EmptyNoticeMessage(Message): - """ - NoticeResponse - - Byte1('N') - Identifies the message as a notice. - - Int32 - Length of message contents in bytes, including self. - - The message body consists of one or more identified fields, followed by a zero byte as a terminator. Fields can appear in any order. For each field there is the following: - - Byte1 - A code identifying the field type; if zero, this is the message terminator and no string follows. The presently defined field types are listed in Section 48.6. Since more field types might be added in future, frontends should silently ignore fields of unrecognized type. - - String - The field value. - """ - - def __init__(self): - super().__init__("!c", b"N") - - -class QueryMessage(Message): - """ - Byte1('Q') - Identifies the message as a simple query. - - Int32 - Length of message contents in bytes, including self. - - String - The query string itself. - - or - - Byte1('X') - Identifies the message as a termination. - - Int32(4) - Length of message contents in bytes, including self. - """ - - def __init__(self): - raise NotImplementedError - - @staticmethod - def from_data(data): - msgident, msglen = struct.unpack("!ci", data[0:5]) - - assert msgident == b"Q" - assert msglen == len(data) - 1 - return [query.decode() for query in data[5:].split(b"\x00") if query] - - -class AbortMessage(Message): - """ - Byte1('X') - Identifies the message as a termination. - - Int32(4) - Length of message contents in bytes, including self. - """ - - def __init__(self): - raise NotImplementedError - - @staticmethod - def from_data(data): - msgident, msglen = struct.unpack("!ci", data[0:5]) - - assert msgident == b"X" - assert msglen == len(data) - return False - - -class RowDescriptionMessage(Message): - """ - Byte1('T') - Identifies the message as a row description. - - Int32 - Length of message contents in bytes, including self. - - Int16 - Specifies the number of fields in a row (can be zero). - - Then, for each field, there is the following: - - String - The field name. - - Int32 - If the field can be identified as a column of a specific table, the object ID of the table; otherwise zero. - - Int16 - If the field can be identified as a column of a specific table, the attribute number of the column; otherwise zero. - - Int32 - The object ID of the field's data type. - - Int16 - The data type size (see pg_type.typlen). Note that negative values denote variable-width types. - - Int32 - The type modifier (see pg_attribute.atttypmod). The meaning of the modifier is type-specific. - - Int16 - The format code being used for the field. Currently will be zero (text) or one (binary). In a RowDescription returned from the statement variant of Describe, the format code is not yet known and will always be zero. 
- """ - - def __init__(self, df): - fieldnames = [str(col).encode() for col in df.columns] - - struct_type = "!cih" - fields = [len(fieldnames)] - for col in fieldnames: - struct_type += f"{len(col) + 1}sihihih" - fields += self._column_description(col) - - super().__init__(struct_type, b"T", fields) - - def _column_description(self, column_name): - tableid = 0 - columnid = 0 - datatypeid = 23 # TODO (23 = int4) - datatypesize = 4 # TODO (4 bytes) - typemodifier = -1 # TODO - format_code = 0 # 0=text 1=binary - return ( - column_name + b"\x00", - tableid, - columnid, - datatypeid, - datatypesize, - typemodifier, - format_code, - ) - - -class RowMessage(Message): - """ - Byte1('D') - Identifies the message as a data row. - - Int32 - Length of message contents in bytes, including self. - - Int16 - The number of column values that follow (possibly zero). - - Next, the following pair of fields appear for each column: - - Int32 - The length of the column value, in bytes (this count does not include itself). Can be zero. As a special case, -1 indicates a NULL column value. No value bytes follow in the NULL case. - - Byten - The value of the column, in the format indicated by the associated format code. n is the above length. - """ - - def __init__(self, row): - struct_type = "!cih" - fields = [len(row)] - - for field in row: - # TODO: we just convert to string. Not sure if this is the best thing to do - field_content = str(field).encode() - field_len = len(field_content) - struct_type += f"i{field_len}s" - fields += [field_len, field_content] - - super().__init__(struct_type, b"D", fields) - - -class CommandCompleteMessage(Message): - """ - Byte1('C') - Identifies the message as a command-completed response. - - Int32 - Length of message contents in bytes, including self. - - String - The command tag. This is usually a single word that identifies which SQL command was completed. - - For an INSERT command, the tag is INSERT oid rows, where rows is the number of rows inserted. oid is the object ID of the inserted row if rows is 1 and the target table has OIDs; otherwise oid is 0. - - For a DELETE command, the tag is DELETE rows where rows is the number of rows deleted. - - For an UPDATE command, the tag is UPDATE rows where rows is the number of rows updated. - - For a SELECT or CREATE TABLE AS command, the tag is SELECT rows where rows is the number of rows retrieved. - - For a MOVE command, the tag is MOVE rows where rows is the number of rows the cursor's position has been changed by. - - For a FETCH command, the tag is FETCH rows where rows is the number of rows that have been retrieved from the cursor. - - For a COPY command, the tag is COPY rows where rows is the number of rows copied. (Note: the row count appears only in PostgreSQL 8.2 and later.) 
- """ - - def __init__(self, df): - msg = f"SELECT {len(df)}\x00".encode() - struct_type = f"!ci{len(msg)}s" - - super().__init__(struct_type, b"C", [msg]) diff --git a/dask_sql/server/responses.py b/dask_sql/server/responses.py new file mode 100644 index 000000000..de61a3b1e --- /dev/null +++ b/dask_sql/server/responses.py @@ -0,0 +1,110 @@ +from typing import List +import uuid + +from fastapi import Request, FastAPI +import dask.dataframe as dd + +from dask_sql.mappings import python_to_sql_type + + +class StageStats: + def __init__(self): + self.stageId = "" + self.state = "" + self.done = True + self.nodes = 0 + self.totalSplits = 0 + self.queuedSplits = 0 + self.runningSplits = 0 + self.completedSplits = 0 + self.cpuTimeMillis = 0 + self.wallTimeMillis = 0 + self.processedRows = 0 + self.processedBytes = 0 + self.subStages = [] + + +class StatementStats: + def __init__(self): + self.state = "" + self.queued = False + self.scheduled = False + self.nodes = 0 + self.totalSplits = 0 + self.queuedSplits = 0 + self.runningSplits = 0 + self.completedSplits = 0 + self.cpuTimeMillis = 0 + self.wallTimeMillis = 0 + self.queuedTimeMillis = 0 + self.elapsedTimeMillis = 0 + self.processedRows = 0 + self.processedBytes = 0 + self.peakMemoryBytes = 0 + self.peakTotalMemoryBytes = 0 + self.peakTaskTotalMemoryBytes = 0 + self.spilledBytes = 0 + self.rootStage = StageStats() + + +class QueryResults: + def __init__(self, request: Request): + empty_url = str(request.url.replace(path=request.app.url_path_for("empty"))) + + self.id = str(uuid.uuid4()) + self.infoUri = empty_url + self.stats = StatementStats() + self.warnings = [] + + +class DataResults(QueryResults): + @staticmethod + def get_column_description(df): + sql_types = [str(python_to_sql_type(t)) for t in df.dtypes] + column_names = df.columns + return [ + { + "name": column_name, + "type": sql_type.lower(), + "typeSignature": {"rawType": sql_type.lower(), "arguments": []}, + } + for column_name, sql_type in zip(column_names, sql_types) + ] + + @staticmethod + def get_data_description(df): + return df.itertuples(index=False, name=None) + + def __init__(self, df: dd.DataFrame, request: Request): + super().__init__(request) + + if df is None: + return + + self.columns = self.get_column_description(df) + self.data = self.get_data_description(df) + self.nextUri = self.infoUri # use empty URL + self.partialCancelUri = self.infoUri # use empty URL + + +class ErrorResults(QueryResults): + def __init__(self, error: Exception, request: Request): + super().__init__(request) + + self.error = QueryError(error) + + +class QueryError: + def __init__(self, error: Exception): + self.message = str(error) + self.errorCode = 0 + self.errorName = str(type(error)) + self.errorType = "USER_ERROR" + + try: + self.errorLocation = { + "lineNumber": error.from_line + 1, + "columnNumber": error.from_col + 1, + } + except AttributeError: # pragma: no cover + pass diff --git a/dask_sql/utils.py b/dask_sql/utils.py index 855b2324b..143eb5934 100644 --- a/dask_sql/utils.py +++ b/dask_sql/utils.py @@ -53,7 +53,12 @@ def __init__(self, sql, validation_exception_string): Create a new exception out of the SQL query and the exception text raise by calcite. 
""" - message = self._extract_message(sql, validation_exception_string) + message, from_line, from_col = self._extract_message( + sql, validation_exception_string + ) + self.from_line = from_line + self.from_col = from_col + super().__init__(message) @staticmethod @@ -81,7 +86,7 @@ def _extract_message(self, sql, validation_exception_string): match = re.match(self.JAVA_MSG_REGEX, message) if not match: # Don't understand this message - just return it - return message + return message, 1, 1 match = match.groupdict() @@ -124,7 +129,7 @@ def _extract_message(self, sql, validation_exception_string): message += "The problem is probably somewhere here:\n" message += "\n\t" + "\n\t".join(sql) - return message + return message, from_line, from_col class LoggableDataFrame: diff --git a/docs/index.rst b/docs/index.rst index 7319f5275..40e45baec 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -69,6 +69,7 @@ but any other data (from disk, S3, API, hdfs) can be used. pages/sql pages/custom pages/api + pages/server pages/how_does_it_work diff --git a/docs/pages/api.rst b/docs/pages/api.rst index 8627bd5ee..0716705a8 100644 --- a/docs/pages/api.rst +++ b/docs/pages/api.rst @@ -5,3 +5,5 @@ API Documentation :members: :undoc-members: +.. autofunction:: dask_sql.run_server + diff --git a/docs/pages/server.rst b/docs/pages/server.rst new file mode 100644 index 000000000..b929524ec --- /dev/null +++ b/docs/pages/server.rst @@ -0,0 +1,96 @@ +SQL Server +========== + +``dask-sql`` comes with a small test implementation for a SQL server. +Instead of rebuilding a full ODBC driver, we re-use the `presto wire protocol `_. + +.. note:: + + It is - so far - only a start of the development and missing important concepts, such as + authentication. + + You need to install ``fastapi>=0.61.1`` and ``uvicorn>=0.11.3`` before running the server. + +You can test the sql presto server by running + +.. code-block:: bash + + python dask_sql/server/app.py + +in one terminal. This will spin up a server on port 8080 (by default). +The port and bind interfaces can be controlled with the ``--port`` and ``--host`` switch. + +The running server looks similar to a normal presto database to any presto client and can therefore be used +with any library, e.g. the `presto CLI client `_ or +``sqlalchemy`` via the `PyHive `_ package: + + +.. code-block:: bash + + presto --server localhost:8080 + +Now you can fire simple SQL queries (as no data is loaded by default): + +.. code-block:: + + => SELECT 1 + 1; + EXPR$0 + -------- + 2 + (1 row) + +Or via ``sqlalchemy`` (after having installed ``PyHive``): + +.. code-block:: python + + from sqlalchemy.engine import create_engine + engine = create_engine('presto://localhost:8080/') + + import pandas as pd + pd.read_sql_query("SELECT 1 + 1", con=engine) + +Of course, it is also possible to call the usual ``CREATE TABLE`` +commands. + + + +Run it in your own ``dask`` cluster +----------------------------------- + +The SQL server implementation in ``dask-sql`` allows you to run a SQL server as a service connected to your ``dask`` cluster. +This enables your users to run SQL command leveraging the full power of your ``dask`` cluster without the need to write python code +and allows also the usage of different non-python tools (such as BI tools) as long as they can speak the presto protocol. + +To run a standalone SQL server in your ``dask`` cluster, follow these three steps: + +1. Create a startup script to connect ``dask-sql`` to your cluster. 
+
+
+Run it in your own ``dask`` cluster
+-----------------------------------
+
+The SQL server implementation in ``dask-sql`` allows you to run a SQL server as a service connected to your ``dask`` cluster.
+This enables your users to run SQL commands leveraging the full power of your ``dask`` cluster without the need to write Python code
+and also allows the usage of different non-Python tools (such as BI tools), as long as they can speak the presto protocol.
+
+To run a standalone SQL server in your ``dask`` cluster, follow these three steps:
+
+1. Create a startup script to connect ``dask-sql`` to your cluster.
+   There exist many different ways to connect to a ``dask`` cluster (e.g. direct access to the scheduler,
+   dask gateway, ...). Choose the one suitable for your cluster and create a small startup script:
+
+   .. code-block:: python
+
+       # Connect to your cluster here, e.g.
+       from dask.distributed import Client
+       client = Client(scheduler_address)
+
+       # Maybe pre-fill the ``dask-sql`` context:
+       from dask_sql import Context
+       c = Context()
+       ...
+
+       # Then spin up the ``dask-sql`` server
+       from dask_sql import run_server
+       run_server(context=c)
+
+2. Deploy this script to your cluster as a service. How you do this depends on your cluster infrastructure (kubernetes, mesos, openshift, ...).
+   For example, you could create a docker image with a Dockerfile similar to this:
+
+   .. code-block:: dockerfile
+
+       FROM nbraun/dask-sql
+
+       COPY startup_script.py /opt/dask_sql/startup_script.py
+
+       ENTRYPOINT [ "/opt/conda/bin/python", "/opt/dask_sql/startup_script.py" ]
+
+3. After your service is deployed, you can use it in your applications as a "normal" presto database.
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 00f1b3b8c..8dc137b32 100755
--- a/setup.py
+++ b/setup.py
@@ -47,8 +47,10 @@ def run(self):
         setuptools.command.build_py.build_py.run(self)
 
 
-with open("README.md") as f:
-    long_description = f.read()
+long_description = ""
+if os.path.exists("README.md"):
+    with open("README.md") as f:
+        long_description = f.read()
 
 needs_sphinx = "build_sphinx" in sys.argv
 sphinx_requirements = ["sphinx>=3.2.1", "sphinx_rtd_theme"] if needs_sphinx else []
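
The integration tests below drive the app in-process through FastAPI's `TestClient`, without opening a socket. Stripped of the test scaffolding, the pattern looks like this (a sketch, assuming the java planner is built so that `Context.sql` works):

```python
from fastapi.testclient import TestClient

from dask_sql import Context
from dask_sql.server.app import app

app.c = Context()          # the same hook run_server() uses to attach a context
client = TestClient(app)   # drives the ASGI app directly, no network involved

result = client.post("/v1/statement", data="SELECT 1 + 1").json()
assert result["data"] == [[2]]
```
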
diff --git a/tests/integration/test_server.py b/tests/integration/test_server.py
new file mode 100644
index 000000000..755dc9436
--- /dev/null
+++ b/tests/integration/test_server.py
@@ -0,0 +1,110 @@
+import os
+import tempfile
+
+from fastapi.testclient import TestClient
+
+from dask_sql import Context
+from dask_sql.server.app import app
+from tests.integration.fixtures import DaskTestCase
+
+
+class TestServer(DaskTestCase):
+    def setUp(self):
+        super().setUp()
+
+        app.c = Context()
+        self.client = TestClient(app)
+
+        self.f = os.path.join(tempfile.gettempdir(), os.urandom(24).hex())
+
+    def tearDown(self):
+        super().tearDown()
+
+        if os.path.exists(self.f):
+            os.unlink(self.f)
+
+    def test_routes(self):
+        self.assertEqual(
+            self.client.post("/v1/statement", data="SELECT 1 + 1").status_code, 200
+        )
+        self.assertEqual(
+            self.client.get("/v1/statement", data="SELECT 1 + 1").status_code, 405
+        )
+        self.assertEqual(self.client.get("/v1/empty").status_code, 200)
+
+    def test_sql_query(self):
+        response = self.client.post("/v1/statement", data="SELECT 1 + 1")
+        self.assertEqual(response.status_code, 200)
+
+        result = response.json()
+
+        self.assertIn("columns", result)
+        self.assertIn("data", result)
+        self.assertEqual(
+            result["columns"],
+            [
+                {
+                    "name": "1 + 1",
+                    "type": "integer",
+                    "typeSignature": {"rawType": "integer", "arguments": []},
+                }
+            ],
+        )
+        self.assertEqual(result["data"], [[2]])
+        self.assertNotIn("error", result)
+
+    def test_wrong_sql_query(self):
+        response = self.client.post("/v1/statement", data="SELECT 1 + ")
+        self.assertEqual(response.status_code, 200)
+
+        result = response.json()
+
+        self.assertNotIn("columns", result)
+        self.assertNotIn("data", result)
+        self.assertIn("error", result)
+        self.assertIn("message", result["error"])
+        self.assertIn("errorLocation", result["error"])
+        self.assertEqual(
+            result["error"]["errorLocation"], {"lineNumber": 1, "columnNumber": 10}
+        )
+
+    def test_add_and_query(self):
+        self.df.to_csv(self.f, index=False)
+
+        response = self.client.post(
+            "/v1/statement",
+            data=f"""
+            CREATE TABLE
+                new_table
+            WITH (
+                location = '{self.f}',
+                format = 'csv'
+            )
+            """,
+        )
+        self.assertEqual(response.status_code, 200)
+
+        response = self.client.post("/v1/statement", data="SELECT * FROM new_table")
+        self.assertEqual(response.status_code, 200)
+
+        result = response.json()
+
+        self.assertIn("columns", result)
+        self.assertIn("data", result)
+        self.assertEqual(
+            result["columns"],
+            [
+                {
+                    "name": "a",
+                    "type": "double",
+                    "typeSignature": {"rawType": "double", "arguments": []},
+                },
+                {
+                    "name": "b",
+                    "type": "double",
+                    "typeSignature": {"rawType": "double", "arguments": []},
+                },
+            ],
+        )
+        self.assertEqual(len(result["data"]), 700)
+        self.assertNotIn("error", result)
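
The `errorLocation` assertion in `test_wrong_sql_query` ties together the 0-based `from_line`/`from_col` now attached to parsing exceptions in `dask_sql/utils.py` and the `+ 1` offsets applied in `QueryError`. A self-contained illustration (`DummyError` is a hypothetical stand-in for the calcite validation exception):

```python
from dask_sql.server.responses import QueryError


class DummyError(Exception):
    """Hypothetical stand-in for a parsing error carrying a location."""

    from_line = 0  # 0-based, as set in dask_sql/utils.py
    from_col = 9   # 0-based column of the dangling operator in "SELECT 1 + "


error = QueryError(DummyError("Validation failed"))
assert error.errorLocation == {"lineNumber": 1, "columnNumber": 10}
```
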