Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Index.map functionality #2136

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
5ed61f2
Initial Index.map impl
awdavidson Apr 1, 2021
e2ac4f0
Initial Index.map impl
awdavidson Apr 1, 2021
ae9c3f3
Reformat
awdavidson Apr 1, 2021
1a55284
Add pd.Series compatibility
awdavidson Apr 2, 2021
13706cc
Avoid collects
awdavidson Apr 2, 2021
f48da2e
Update impl
awdavidson Apr 4, 2021
1f794f7
Clean up impl and add docs
awdavidson Apr 5, 2021
18e37c0
reformat
awdavidson Apr 5, 2021
3499aa0
reformat
awdavidson Apr 5, 2021
a949400
reformat
awdavidson Apr 5, 2021
af72e24
Reformat
awdavidson Apr 6, 2021
97e03d3
Reformat
awdavidson Apr 6, 2021
7c1c678
Fix comment
awdavidson Apr 6, 2021
ced7d97
Remove unused import
awdavidson Apr 6, 2021
694650a
Update
awdavidson Apr 6, 2021
f845001
Merge branch 'master' of github.com:databricks/koalas into feature/im…
awdavidson Apr 7, 2021
136bd4c
Add categorical mapping
awdavidson Apr 10, 2021
06b250a
Reformat
awdavidson Apr 11, 2021
0c74b95
Remove print statement
awdavidson Apr 11, 2021
0bd6b3e
Remove unused import
awdavidson Apr 11, 2021
e52a4ca
Final tweaks
awdavidson Apr 11, 2021
289b573
Remove unused import
awdavidson Apr 11, 2021
f0697ee
minor cast tweaks
awdavidson Apr 11, 2021
8d206d9
Reformat
awdavidson Apr 11, 2021
ea8fc7f
Fix docstring
awdavidson Apr 12, 2021
7f78833
Fix docstring
awdavidson Apr 12, 2021
a6cd83e
Fix docstring
awdavidson Apr 12, 2021
b57224d
Fix docstring
awdavidson Apr 12, 2021
fe338c8
Fix docstring
awdavidson Apr 12, 2021
491a57a
reformat
awdavidson Apr 12, 2021
3568d0a
reformat
awdavidson Apr 12, 2021
e7957bd
fix docstring
awdavidson Apr 12, 2021
751e77d
Fix docstring
awdavidson Apr 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion databricks/koalas/indexes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#

from functools import partial
from typing import Any, List, Optional, Tuple, Union
from typing import Any, Callable, List, Optional, Tuple, Union
import warnings

import pandas as pd
Expand Down Expand Up @@ -507,6 +507,28 @@ def to_numpy(self, dtype=None, copy=False) -> np.ndarray:
result = result.copy()
return result

def map(
    self, mapper: Union[dict, Callable[[Any], Any], pd.Series], na_action: Any = None
) -> "Index":
    """
    Map values using input correspondence (a dict, Series, or function).

    Parameters
    ----------
    mapper : function, dict, or pd.Series
        Mapping correspondence.
    na_action : {None, 'ignore'}
        Currently not supported.

    Returns
    -------
    applied : Index, inferred
        The output of the mapping function applied to the index.
    """
    # Imported locally to avoid a circular import between base and extension.
    from databricks.koalas.indexes.extension import MapExtension

    return MapExtension(index=self, na_action=na_action).map(mapper)

@property
def values(self) -> np.ndarray:
"""
Expand Down
81 changes: 78 additions & 3 deletions databricks/koalas/indexes/category.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
# limitations under the License.
#
from functools import partial
from typing import Any
from typing import Any, Callable, Union

import pandas as pd
from pandas.api.types import is_hashable
import numpy as np
from pandas.api.types import is_hashable, CategoricalDtype

from databricks import koalas as ks
from databricks.koalas.indexes.base import Index
Expand Down Expand Up @@ -103,7 +104,7 @@ def __new__(cls, data=None, categories=None, ordered=None, dtype=None, copy=Fals
if isinstance(data, (Series, Index)):
if dtype is None:
dtype = "category"
return Index(data, dtype=dtype, copy=copy, name=name)
return Index(data, dtype=dtype, ordered=ordered, copy=copy, name=name)

return ks.from_pandas(
pd.CategoricalIndex(
Expand Down Expand Up @@ -177,6 +178,80 @@ def ordered(self) -> bool:
"""
return self.dtype.ordered

def map(
    self, mapper: Union[Callable[[Any], Any], dict, pd.Series], na_action: Any = None
) -> "CategoricalIndex":
    """
    Map values using input correspondence (a dict, Series, or function).

    Maps the values (their categories, not the codes) of the index to new
    categories.

    If a `dict` or :class:`~pandas.Series` is used any unmapped category is
    mapped to np.nan.

    Parameters
    ----------
    mapper : function, dict, or Series
        Mapping correspondence.
    na_action : {None, 'ignore'}
        Currently not supported.

    Returns
    -------
    CategoricalIndex
        Mapped index.

    Examples
    --------
    >>> kidx = ks.CategoricalIndex(['a', 'b', 'c'])
    >>> kidx # doctest: +NORMALIZE_WHITESPACE
    CategoricalIndex(['a', 'b', 'c'], categories=['a', 'b', 'c'],
                     ordered=False, dtype='category')
    >>> kidx.map(lambda x: x.upper()) # doctest: +NORMALIZE_WHITESPACE
    CategoricalIndex(['A', 'B', 'C'], categories=['A', 'B', 'C'],
                     ordered=False, dtype='category')
    >>> kidx.map({'a': 'first', 'b': 'second', 'c': 'third'}) # doctest: +NORMALIZE_WHITESPACE
    CategoricalIndex(['first', 'second', 'third'], categories=['first', 'second', 'third'],
                     ordered=False, dtype='category')
    >>> kidx = ks.CategoricalIndex(['a', 'b', 'c'], ordered=True)
    >>> kidx # doctest: +NORMALIZE_WHITESPACE
    CategoricalIndex(['a', 'b', 'c'], categories=['a', 'b', 'c'],
                     ordered=True, dtype='category')
    >>> kidx.map({'a': 3, 'b': 2, 'c': 1}) # doctest: +NORMALIZE_WHITESPACE
    CategoricalIndex([3, 2, 1], categories=[1, 2, 3], ordered=True, dtype='category')
    >>> kidx.map({'a': 'first', 'b': 'second', 'c': 'first'}) # doctest: +NORMALIZE_WHITESPACE
    CategoricalIndex(['first', 'second', 'first'], categories=['first', 'second'],
                     ordered=True, dtype='category')
    >>> kidx.map({'a': 'first', 'b': 'second'}) # doctest: +NORMALIZE_WHITESPACE
    CategoricalIndex(['first', 'second', nan], categories=['first', 'second'],
                     ordered=True, dtype='category')
    """

    from databricks.koalas.indexes.extension import MapExtension, getOrElse

    extension_mapper = MapExtension(index=self, na_action=na_action)
    # Only consumed in the pd.Series branch below; computed up front because it
    # must be derived before the mapper is normalised.
    return_type = extension_mapper._mapper_return_type(mapper)
    # Pull unique elements of the series and sort, so positions line up with
    # the (sorted) category positions.
    if isinstance(mapper, pd.Series):
        mapper = pd.Series(np.sort(mapper.unique()))

    unique_categories = self.dtype.categories.values
    # Build a mapping from category position (the categorical code) to the new
    # category value; missing entries default to np.nan (pandas behaviour).
    pos_dict = {}
    for pos, category in enumerate(unique_categories):
        if isinstance(mapper, dict):
            pos_dict[pos] = mapper.get(category, np.nan)  # type: ignore
        elif isinstance(mapper, pd.Series):
            pos_dict[pos] = getOrElse(mapper, pos, return_type)
        elif isinstance(mapper, ks.Series):
            raise NotImplementedError(
                "Currently do not support input of ks.Series in CategoricalIndex.map"
            )
        else:
            pos_dict[pos] = mapper(category)

    new_index = CategoricalIndex(extension_mapper.map(pos_dict))

    # Re-cast so the result keeps the original index's orderedness.
    return CategoricalIndex(
        new_index.astype(
            CategoricalDtype(categories=new_index.dtype.categories, ordered=self.ordered)
        )
    )

def __getattr__(self, item: str) -> Any:
if hasattr(MissingPandasLikeCategoricalIndex, item):
property_or_func = getattr(MissingPandasLikeCategoricalIndex, item)
Expand Down
180 changes: 180 additions & 0 deletions databricks/koalas/indexes/extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
#
# Copyright (C) 2019 Databricks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from datetime import datetime
from typing import Any, Callable, Union

import pandas as pd
import numpy as np

from pyspark.sql.functions import pandas_udf, PandasUDFType

import databricks.koalas as ks
from databricks.koalas.indexes.base import Index
from databricks.koalas.internal import SPARK_DEFAULT_INDEX_NAME
from databricks.koalas.typedef.typehints import as_spark_type, Dtype, Scalar


def getOrElse(input: pd.Series, pos, return_type: "Union[Scalar, Dtype]", default_value=np.nan):
    """
    Return ``input.loc[pos]``, falling back to ``default_value`` when the lookup fails.

    Parameters
    ----------
    input : pd.Series
        Series to look the position up in. (Name shadows the builtin ``input``;
        kept for backward compatibility with existing callers.)
    pos : Any
        Label passed to ``input.loc``.
    return_type : Scalar or Dtype
        Type used to cast a caller-supplied ``default_value``.
    default_value : Any, default np.nan
        Value returned when the lookup fails. np.nan is returned as-is;
        any other default is cast to ``return_type``.
    """
    try:
        return input.loc[pos]
    # Narrowed from a bare ``except:`` so SystemExit/KeyboardInterrupt propagate.
    except Exception:
        if default_value is not np.nan:
            # Cast the fallback so it matches the mapper's inferred return type.
            return return_type(default_value)  # type: ignore
        else:
            return default_value


# TODO: Implement na_action similar functionality to pandas
# NB: Passing return_type into class cause Serialisation errors; instead pass at method level
# TODO: Implement na_action similar functionality to pandas
# NB: Passing return_type into class cause Serialisation errors; instead pass at method level
class MapExtension:
    """
    Implements ``Index.map`` by turning a mapper (dict, function or pd.Series)
    into a pandas UDF applied over the index's Spark column.
    """

    def __init__(self, index, na_action: Any):
        self._index = index
        # na_action is accepted for pandas API parity but not implemented yet.
        if na_action is not None:
            raise NotImplementedError("Currently do not support na_action functionality")
        self._na_action = na_action

    def map(self, mapper: Union[dict, Callable[[Any], Any], pd.Series]) -> "Index":
        """
        Single callable/entry point to map Index values.

        Parameters
        ----------
        mapper: dict, function or pd.Series
            Mapping correspondence.

        Returns
        -------
        Index
        """
        if isinstance(mapper, dict):
            return self._map_dict(mapper)
        if isinstance(mapper, pd.Series):
            return self._map_series(mapper)
        if isinstance(mapper, ks.Series):
            raise NotImplementedError("Currently do not support input of ks.Series in Index.map")
        return self._map_lambda(mapper)

    def _map_dict(self, mapper: dict) -> "Index":
        """
        Helper method that maps Index values when the mapper is a dict.

        Parameters
        ----------
        mapper: dict
            Key-value pairs that are used to instruct mapping from index value
            to new value.

        Returns
        -------
        Index

        .. note:: Default return value for missing elements is np.nan
        """
        return_type = self._mapper_return_type(mapper)

        @pandas_udf(as_spark_type(return_type), PandasUDFType.SCALAR)
        def pyspark_mapper(col):
            # Unmapped index values default to np.nan, matching pandas.
            return col.apply(lambda i: mapper.get(i, np.nan))  # type: ignore

        return self._index._with_new_scol(pyspark_mapper(SPARK_DEFAULT_INDEX_NAME))

    def _map_series(self, mapper: pd.Series) -> "Index":
        """
        Helper method that maps Index values when the mapper is a pd.Series.

        Parameters
        ----------
        mapper: pandas.Series
            Series of (index, value) that is used to instruct mapping from
            index value to new value.

        Returns
        -------
        Index

        .. note:: Default return value for missing elements is np.nan
        """
        return_type = self._mapper_return_type(mapper)

        @pandas_udf(as_spark_type(return_type), PandasUDFType.SCALAR)
        def pyspark_mapper(col):
            return col.apply(lambda i: getOrElse(mapper, i, return_type))

        return self._index._with_new_scol(pyspark_mapper(SPARK_DEFAULT_INDEX_NAME))

    def _map_lambda(self, mapper: Callable[[Any], Any]) -> "Index":
        """
        Helper method that maps Index values when the mapper is a generic
        callable.

        Parameters
        ----------
        mapper: function
            Generic lambda function to apply to index.

        Returns
        -------
        Index
        """
        return_type = self._mapper_return_type(mapper)

        @pandas_udf(as_spark_type(return_type), PandasUDFType.SCALAR)
        def pyspark_mapper(col):
            return col.apply(mapper)

        return self._index._with_new_scol(scol=pyspark_mapper(SPARK_DEFAULT_INDEX_NAME))

    def _mapper_return_type(
        self, mapper: Union[dict, Callable[[Any], Any], pd.Series]
    ) -> "Union[Scalar, Dtype]":
        """
        Helper method to get the mapper's return type. The return type is
        required for the pandas_udf.

        Parameters
        ----------
        mapper: dict, function or pd.Series
            Mapping correspondence.

        Returns
        -------
        Scalar or Dtype
        """
        if isinstance(mapper, dict):
            # np.nan placeholders do not contribute to the inferred type;
            # heterogeneous value types fall back to str.
            types = list(set(type(value) for value in mapper.values() if value is not np.nan))
            return_type = types[0] if len(types) == 1 else str
        elif isinstance(mapper, pd.Series):
            # Pandas dtype('O') means pandas str
            return_type = str if mapper.dtype == np.dtype("object") else mapper.dtype
        else:
            if isinstance(self._index, ks.CategoricalIndex):
                return_type = self._index.categories.dtype
            else:
                # Probe the callable with the smallest index value.
                return_type = type(mapper(self._index.min()))

        # Handle pandas Timestamp - map to basic datetime
        if return_type == pd._libs.tslibs.timestamps.Timestamp:
            return_type = datetime
        return return_type
7 changes: 6 additions & 1 deletion databricks/koalas/indexes/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from distutils.version import LooseVersion
from functools import partial
from typing import Any, Optional, Tuple, Union, cast
from typing import Any, Callable, Optional, Tuple, Union, cast
import warnings

import pandas as pd
Expand Down Expand Up @@ -158,6 +158,11 @@ def _with_new_scol(self, scol: spark.Column, *, dtype=None):
def _align_and_column_op(self, f, *args) -> Index:
    # MultiIndex has no single flat column to align on, so column-wise
    # alignment operations are explicitly unsupported.
    raise NotImplementedError("Not supported for type MultiIndex")

def map(
    self, mapper: Union[dict, Callable[[Any], Any], pd.Series], na_action: Any = None
) -> "Index":
    """
    Map values using input correspondence.

    Not supported for MultiIndex.

    Raises
    ------
    NotImplementedError
        Always; ``map`` is not implemented for MultiIndex.
    """
    raise NotImplementedError("Not supported for type MultiIndex")

def any(self, *args, **kwargs) -> None:
    # Explicitly unsupported for MultiIndex: always rejects with TypeError,
    # regardless of the arguments passed.
    raise TypeError("cannot perform any with this index type: MultiIndex")

Expand Down
3 changes: 0 additions & 3 deletions databricks/koalas/missing/indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class MissingPandasLikeIndex(object):
is_ = _unsupported_function("is_")
is_lexsorted_for_tuple = _unsupported_function("is_lexsorted_for_tuple")
join = _unsupported_function("join")
map = _unsupported_function("map")
putmask = _unsupported_function("putmask")
ravel = _unsupported_function("ravel")
reindex = _unsupported_function("reindex")
Expand Down Expand Up @@ -132,7 +131,6 @@ class MissingPandasLikeCategoricalIndex(MissingPandasLikeIndex):
set_categories = _unsupported_function("set_categories", cls="CategoricalIndex")
as_ordered = _unsupported_function("as_ordered", cls="CategoricalIndex")
as_unordered = _unsupported_function("as_unordered", cls="CategoricalIndex")
map = _unsupported_function("map", cls="CategoricalIndex")


class MissingPandasLikeMultiIndex(object):
Expand Down Expand Up @@ -161,7 +159,6 @@ class MissingPandasLikeMultiIndex(object):
is_lexsorted = _unsupported_function("is_lexsorted")
is_lexsorted_for_tuple = _unsupported_function("is_lexsorted_for_tuple")
join = _unsupported_function("join")
map = _unsupported_function("map")
putmask = _unsupported_function("putmask")
ravel = _unsupported_function("ravel")
reindex = _unsupported_function("reindex")
Expand Down
Loading