Commit
Added link to binder repository (forked from @raybellwaves) (#72)

* Added link to binder repository (forked from @raybellwaves) and two notebooks
* Move to the new dask-sql API
1 parent 8e472ce, commit 05b08ae
Showing 3 changed files with 335 additions and 0 deletions.
@@ -0,0 +1,159 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Custom Functions"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Apart from the SQL functions that are already implemented in `dask-sql`, it is possible to add custom functions and aggregations.\n",
    "Have a look at [the documentation](https://dask-sql.readthedocs.io/en/latest/pages/custom.html) for more information."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import dask.dataframe as dd\n",
    "import dask.datasets\n",
    "from dask_sql.context import Context"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We use generated test data for this notebook:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "c = Context()\n",
    "\n",
    "df = dask.datasets.timeseries().reset_index().persist()\n",
    "c.create_table(\"timeseries\", df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a first step, we will create a scalar function that calculates the absolute value of a column.\n",
    "(Please note that this can also be done via the `ABS` function in SQL.)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The input to the function will be a Dask series\n",
    "def my_abs(x):\n",
    "    return x.abs()\n",
    "\n",
    "# As SQL is a typed language, we need to specify all types\n",
    "c.register_function(my_abs, \"MY_ABS\", parameters=[(\"x\", np.float64)], return_type=np.float64)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We are now able to use our new function in all queries:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "c.sql(\"\"\"\n",
    "    SELECT\n",
    "        x, y, MY_ABS(x) AS \"abs_x\", MY_ABS(y) AS \"abs_y\"\n",
    "    FROM\n",
    "        \"timeseries\"\n",
    "    WHERE\n",
    "        MY_ABS(x * y) > 0.5\n",
    "\"\"\").compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we will register an aggregation, which takes a column as input and returns a single value.\n",
    "An aggregation needs to be an instance of `dask.dataframe.Aggregation` (see the [Dask documentation](https://docs.dask.org/en/latest/dataframe-groupby.html#aggregate))."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "my_sum = dd.Aggregation(\"MY_SUM\", lambda x: x.sum(), lambda x: x.sum())\n",
    "\n",
    "c.register_aggregation(my_sum, \"MY_SUM\", [(\"x\", np.float64)], np.float64)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "c.sql(\"\"\"\n",
    "    SELECT\n",
    "        name, MY_SUM(x) AS \"my_sum\"\n",
    "    FROM\n",
    "        \"timeseries\"\n",
    "    GROUP BY\n",
    "        name\n",
    "    LIMIT 10\n",
    "\"\"\").compute()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
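The `MY_SUM` aggregation above is built from two callables: a "chunk" step applied to each partition and an "agg" step that combines the per-partition results. To make that split-apply-combine flow concrete without needing a Dask cluster, here is a minimal plain-pandas sketch that mimics it on two hand-made "partitions" (the frames and values are invented for illustration):

```python
import pandas as pd

# Two hand-made "partitions", standing in for the partitions of a
# Dask dataframe.
part1 = pd.DataFrame({"name": ["a", "b", "a"], "x": [1.0, 2.0, 3.0]})
part2 = pd.DataFrame({"name": ["b", "a"], "x": [4.0, 5.0]})

# Chunk step: sum x per name inside each partition independently,
# mirroring MY_SUM's first lambda (x.sum()).
chunks = [p.groupby("name")["x"].sum() for p in (part1, part2)]

# Agg step: combine the per-partition sums into the final result,
# mirroring MY_SUM's second lambda (again x.sum()).
total = pd.concat(chunks).groupby(level=0).sum()
print(total)  # a -> 9.0, b -> 6.0
```

Summation is associative, which is why the same lambda works for both steps; aggregations such as a mean need different chunk and agg functions (e.g. carrying sums and counts through the chunk step).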
@@ -0,0 +1,175 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# dask-sql Introduction"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`dask-sql` lets you query your (Dask) data using plain SQL.\n",
    "You can find more information on the usage in the [documentation](https://dask-sql.readthedocs.io/)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask_sql import Context\n",
    "from dask.datasets import timeseries\n",
    "from dask.distributed import Client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a first step, we will create a Dask client to connect to a local Dask cluster (which is started implicitly).\n",
    "You can open the dashboard by clicking on the shown link (in Binder, it is already open on the left)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client = Client()\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we create a context to hold the registered tables.\n",
    "You typically only do this once in your application."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "c = Context()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Load the data and register it in the context. This will give the table a name.\n",
    "In this example, we generate random data.\n",
    "It is also possible to load data from files, S3, HDFS, etc.\n",
    "Have a look at [Data Loading](https://dask-sql.readthedocs.io/en/latest/pages/data_input.html) for more information."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = timeseries()\n",
    "c.create_table(\"timeseries\", df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now execute a SQL query.\n",
    "The result is a Dask dataframe.\n",
    "\n",
    "The query looks for the id with the highest x for each name (this is just random test data, but you could think of it as looking for outliers in sensor data)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "result = c.sql(\"\"\"\n",
    "    SELECT\n",
    "        lhs.name,\n",
    "        lhs.id,\n",
    "        lhs.x\n",
    "    FROM\n",
    "        timeseries AS lhs\n",
    "    JOIN\n",
    "        (\n",
    "            SELECT\n",
    "                name AS max_name,\n",
    "                MAX(x) AS max_x\n",
    "            FROM timeseries\n",
    "            GROUP BY name\n",
    "        ) AS rhs\n",
    "    ON\n",
    "        lhs.name = rhs.max_name AND\n",
    "        lhs.x = rhs.max_x\n",
    "\"\"\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we can show the result:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "result.compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "...or use it in any other Dask computation:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "result.x.mean().compute()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
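The self-join in the introduction notebook (each row matched against the per-name `MAX(x)`) can be hard to parse in SQL. As a sanity check of the logic only, here is a hedged plain-pandas sketch of the same "row with the highest x per name" pattern on a tiny invented frame (the notebook itself runs on `dask.datasets.timeseries`):

```python
import pandas as pd

# Tiny hand-made stand-in for the timeseries table.
df = pd.DataFrame({
    "name": ["a", "a", "b", "b"],
    "id":   [1, 2, 3, 4],
    "x":    [0.1, 0.9, 0.5, 0.2],
})

# Subquery equivalent: per-name maximum of x.
max_x = (
    df.groupby("name", as_index=False)["x"].max()
      .rename(columns={"x": "max_x"})
)

# Join equivalent: keep the rows whose x equals their name's maximum.
result = df.merge(max_x, left_on=["name", "x"], right_on=["name", "max_x"])
print(result[["name", "id", "x"]])  # rows with id 2 and id 3
```

Note that, like the SQL join, this returns every tied row if a name's maximum occurs more than once.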