Skip to content

Commit

Permalink
[apache#5203] feat(client-python): porting partitions from java client (
Browse files Browse the repository at this point in the history
apache#5964)

### What changes were proposed in this pull request?

Porting `interface Partition`, `interface IdentityPartition`,
`interface ListPartition`, `interface RangePartition`, and `class
Partitions` from Java to Python.

Fix: apache#5203 

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

Unit tests.
  • Loading branch information
unknowntpo authored Jan 9, 2025
1 parent 31a60e5 commit e9d8ee7
Show file tree
Hide file tree
Showing 6 changed files with 536 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from abc import abstractmethod
from typing import List, Any

from .partition import Partition
from ..literals.literal import Literal


class IdentityPartition(Partition):
    """
    An identity partition represents a result of identity partitioning.

    For example, for the Hive partition
    ```
    PARTITION (dt='2008-08-08',country='us')
    ```
    its partition name is "dt=2008-08-08/country=us", field names are
    [["dt"], ["country"]] and values are ["2008-08-08", "us"].

    APIs that are still evolving towards becoming stable APIs, and can change
    from one feature release to another (0.5.0 to 0.6.0).
    """

    @abstractmethod
    def field_names(self) -> List[List[str]]:
        """
        Returns:
            List[List[str]]: A list of lists representing the field names of
            the identity partition.
        """
        pass

    @abstractmethod
    def values(self) -> List[Literal[Any]]:
        """
        Returns:
            List[Literal[Any]]: The values of the identity partition.
        """
        pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from abc import abstractmethod
from typing import List, Any

from gravitino.api.expressions.literals.literal import Literal
from gravitino.api.expressions.partitions.partition import Partition


class ListPartition(Partition):
    """
    A list partition represents a result of list partitioning.

    For example, for the list partition
    ```
    PARTITION p202204_California VALUES IN (
        ("2022-04-01", "Los Angeles"),
        ("2022-04-01", "San Francisco")
    )
    ```
    its name is "p202204_California" and lists are
    [["2022-04-01", "Los Angeles"], ["2022-04-01", "San Francisco"]].

    APIs that are still evolving towards becoming stable APIs, and can change
    from one feature release to another (0.5.0 to 0.6.0).
    """

    @abstractmethod
    def lists(self) -> List[List[Literal[Any]]]:
        """
        Returns:
            List[List[Literal[Any]]]: The values of the list partition.
        """
        pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from abc import ABC, abstractmethod
from typing import Dict


class Partition(ABC):
    """
    A partition represents a result of partitioning a table.

    The partition can be either a `IdentityPartition`, `ListPartition`, or
    `RangePartition`. It depends on the `Table.partitioning()`.

    APIs that are still evolving towards becoming stable APIs, and can change
    from one feature release to another (0.5.0 to 0.6.0).
    """

    @abstractmethod
    def name(self) -> str:
        """
        Returns:
            str: The name of the partition.
        """
        pass

    @abstractmethod
    def properties(self) -> Dict[str, str]:
        """
        Returns:
            Dict[str, str]: The properties of the partition, such as
            statistics, location, etc.
        """
        pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import List, Dict, Any, Optional

from gravitino.api.expressions.literals.literal import Literal
from gravitino.api.expressions.partitions.identity_partition import IdentityPartition
from gravitino.api.expressions.partitions.list_partition import ListPartition
from gravitino.api.expressions.partitions.partition import Partition
from gravitino.api.expressions.partitions.range_partition import RangePartition


class Partitions:
    """The helper class for partition expressions."""

    EMPTY_PARTITIONS: List[Partition] = []
    """
    An empty array of partitions
    """

    @staticmethod
    def range(
        name: str,
        upper: Literal[Any],
        lower: Literal[Any],
        properties: Optional[Dict[str, str]] = None,
    ) -> RangePartition:
        """
        Creates a range partition.

        Args:
            name: The name of the partition.
            upper: The upper bound of the partition.
            lower: The lower bound of the partition.
            properties: The properties of the partition. A missing/None value
                is replaced with an empty dict, matching `list` and `identity`.

        Returns:
            The created partition.
        """
        # Normalize None to {} so the resulting partition can always be hashed
        # (the implementation's __hash__ iterates over the properties items).
        return RangePartitionImpl(name, upper, lower, properties or {})

    @staticmethod
    def list(
        name: str,
        lists: List[List[Literal[Any]]],
        properties: Optional[Dict[str, str]] = None,
    ) -> ListPartition:
        """
        Creates a list partition.

        Args:
            name: The name of the partition.
            lists: The values of the list partition.
            properties: The properties of the partition. A missing/None value
                is replaced with an empty dict.

        Returns:
            The created partition.
        """
        return ListPartitionImpl(name, lists, properties or {})

    @staticmethod
    def identity(
        name: Optional[str],
        field_names: List[List[str]],
        values: List[Literal[Any]],
        properties: Optional[Dict[str, str]] = None,
    ) -> IdentityPartition:
        """
        Creates an identity partition.

        The `values` must correspond to the `field_names`.

        Args:
            name: The name of the partition.
            field_names: The field names of the identity partition.
            values: The value of the identity partition.
            properties: The properties of the partition. A missing/None value
                is replaced with an empty dict.

        Returns:
            The created partition.
        """
        return IdentityPartitionImpl(name, field_names, values, properties or {})


class RangePartitionImpl(RangePartition):
    """
    Represents a result of range partitioning.

    Stores the name, upper bound, lower bound and properties exactly as they
    were passed to the constructor and exposes them through accessor methods.
    """

    def __init__(
        self,
        name: str,
        upper: Literal[Any],
        lower: Literal[Any],
        properties: Optional[Dict[str, str]],
    ):
        """
        Args:
            name: The name of the partition.
            upper: The upper bound of the partition.
            lower: The lower bound of the partition.
            properties: The properties of the partition; may be None.
        """
        self._name = name
        self._upper = upper
        self._lower = lower
        self._properties = properties

    def upper(self) -> Literal[Any]:
        """Returns the upper bound of the partition."""
        return self._upper

    def lower(self) -> Literal[Any]:
        """Returns the lower bound of the partition."""
        return self._lower

    def name(self) -> str:
        """Returns the name of the partition."""
        return self._name

    def properties(self) -> Dict[str, str]:
        """Returns the properties as supplied to the constructor (None if None was given)."""
        return self._properties

    def __eq__(self, other: Any) -> bool:
        """Two range partitions are equal when name, bounds and properties all match."""
        if not isinstance(other, RangePartitionImpl):
            return False
        return (
            self._name == other._name
            and self._upper == other._upper
            and self._lower == other._lower
            and self._properties == other._properties
        )

    def __hash__(self) -> int:
        """Hash over the same fields that `__eq__` compares."""
        # The constructor accepts properties=None; hash it like an empty
        # mapping instead of failing on None.items().
        props = self._properties or {}
        return hash(
            (self._name, self._upper, self._lower, frozenset(props.items()))
        )


class ListPartitionImpl(ListPartition):
    """
    Represents a result of list partitioning.

    Stores the name, value lists and properties exactly as they were passed
    to the constructor and exposes them through accessor methods.
    """

    def __init__(
        self,
        name: str,
        lists: List[List[Literal[Any]]],
        properties: Optional[Dict[str, str]],
    ):
        """
        Args:
            name: The name of the partition.
            lists: The values of the list partition.
            properties: The properties of the partition; may be None.
        """
        self._name = name
        self._lists = lists
        self._properties = properties

    def lists(self) -> List[List[Literal[Any]]]:
        """Returns the values of the list partition."""
        return self._lists

    def name(self) -> str:
        """Returns the name of the partition."""
        return self._name

    def properties(self) -> Dict[str, str]:
        """Returns the properties as supplied to the constructor (None if None was given)."""
        return self._properties

    def __eq__(self, other: Any) -> bool:
        """Two list partitions are equal when name, lists and properties all match."""
        if not isinstance(other, ListPartitionImpl):
            return False
        return (
            self._name == other._name
            and self._lists == other._lists
            and self._properties == other._properties
        )

    def __hash__(self) -> int:
        """Hash over the same fields that `__eq__` compares."""
        # The constructor accepts properties=None; hash it like an empty
        # mapping instead of failing on None.items().
        props = self._properties or {}
        return hash(
            (
                self._name,
                # Lists are unhashable, so freeze the nested lists to tuples.
                tuple(tuple(values) for values in self._lists),
                frozenset(props.items()),
            )
        )


class IdentityPartitionImpl(IdentityPartition):
    """
    Represents a result of identity partitioning.

    Stores the name, field names, values and properties exactly as they were
    passed to the constructor and exposes them through accessor methods.
    """

    def __init__(
        self,
        name: Optional[str],
        field_names: List[List[str]],
        values: List[Literal[Any]],
        properties: Dict[str, str],
    ):
        """
        Args:
            name: The name of the partition (the `Partitions.identity` factory
                declares it as optional, so None may be passed through).
            field_names: The field names of the identity partition.
            values: The values of the identity partition.
            properties: The properties of the partition.
        """
        self._name = name
        self._field_names = field_names
        self._values = values
        self._properties = properties

    def field_names(self) -> List[List[str]]:
        """Returns the field names of the identity partition."""
        return self._field_names

    def values(self) -> List[Literal[Any]]:
        """Returns the values of the identity partition."""
        return self._values

    def name(self) -> str:
        """Returns the name of the partition as supplied to the constructor."""
        return self._name

    def properties(self) -> Dict[str, str]:
        """Returns the properties of the partition as supplied to the constructor."""
        return self._properties

    def __eq__(self, other: Any) -> bool:
        """Two identity partitions are equal when name, field names, values and properties all match."""
        if not isinstance(other, IdentityPartitionImpl):
            return False
        return (
            self._name == other._name
            and self._field_names == other._field_names
            and self._values == other._values
            and self._properties == other._properties
        )

    def __hash__(self) -> int:
        """Hash over the same fields that `__eq__` compares."""
        # Tolerate properties=None the same way the sibling partition
        # implementations do, rather than failing on None.items().
        props = self._properties or {}
        return hash(
            (
                self._name,
                # Lists are unhashable, so freeze them to tuples first.
                tuple(tuple(fn) for fn in self._field_names),
                tuple(self._values),
                frozenset(props.items()),
            )
        )
Loading

0 comments on commit e9d8ee7

Please sign in to comment.