Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Add a context manager for spark session #123

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions spark8t/session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import pyspark
from lightkube import Client
from lightkube.core.exceptions import ApiError
from spark8t.services import K8sServiceAccountRegistry, LightKube
import socket

class SparkSession():

def __init__(self, app_name: str, namespace: str, username: str):
self.app_name = app_name
self.namespace = namespace
self.username = username
self.session = None

@property
def _pod_ip(self, ):
return socket.gethostbyname(socket.gethostname())


@property
def service_account(self, ):
interface = LightKube(None, None)
registry = K8sServiceAccountRegistry(interface)
try:
return registry.get(f"{self.namespace}:{self.username}")
except (ApiError, AttributeError):
return None

@property
def _sa_props(self, ):
if self.service_account is None:
return {}
return self.service_account.configurations.props

@property
def _extra_props(self, ) -> dict:
return {
"spark.driver.host": self._pod_ip
}

@property
def _k8s_master(self, ) -> str:
return Client().config.cluster.server

@property
def config(self, ) -> dict:
return self._sa_props | self._extra_props

def __enter__(self, ):
if self.session is not None:
return self.session

builder = pyspark.sql.SparkSession\
.builder\
.appName(self.app_name)\
.master(f"k8s://{self._k8s_master}")

for conf, val in self.config.items():
builder = builder.config(conf, val)
self.session = builder.getOrCreate()
return self.session


def __exit__(self, *args, **kwargs):
if self.session is not None:
self.session.stop()
Loading