Skip to content

Commit

Permalink
[computes] fixed session-type extraction for connectors
Browse files Browse the repository at this point in the history
The problem is that connector based query execution is not able to
reuse session to fetch results. The frontend is sending the correct
session_id but our session fetching logic got broken when the computes
was implemented. we are now looking for the session_type from
compute['name'] for computes, connector['name'] for connector and
then snippets['type'] for old config file based hive/impala sessions.

A related change is to make use of session for get_log and check_status
calls if the frontend is sending it.
  • Loading branch information
amitsrivastava committed Dec 18, 2024
1 parent ca87369 commit 79428fb
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 33 deletions.
27 changes: 17 additions & 10 deletions apps/beeswax/src/beeswax/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,27 @@ def tokenize_and_convert(item, key=None):
return sorted(collection, key=lambda i: tokenize_and_convert(i, key=key))


def is_compute(cluster):
def find_compute_in_cluster(cluster):
if not cluster:
return False
return None
connector = cluster.get('connector')
compute = cluster.get('compute')

def compute_check(x):
def _compute_check(x):
return x and x.get('type') in COMPUTE_TYPES
return compute_check(cluster) or compute_check(connector) or compute_check(compute)

return (
cluster if _compute_check(cluster)
else compute if _compute_check(compute)
else connector if _compute_check(connector) else None)


def extract_session_type(snippet):
compute = find_compute_in_cluster(snippet)
if compute and compute.get('name'):
return compute['name']
return snippet.get('type') if snippet else None



'''
Expand All @@ -119,13 +131,8 @@ def find_compute(cluster=None, user=None, dialect=None, namespace_id=None):
connector = cluster.get('connector')
compute = cluster.get('compute')

def compute_check(x):
return x and x.get('type') in COMPUTE_TYPES

# Pick the most probable compute object
selected_compute = (cluster if compute_check(cluster)
else compute if compute_check(compute)
else connector if compute_check(connector) else None)
selected_compute = find_compute_in_cluster(cluster)

# If found, we will attempt to reload it, first by id then by name
if selected_compute:
Expand Down
12 changes: 6 additions & 6 deletions apps/beeswax/src/beeswax/server/dbms.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from kazoo.client import KazooClient

from azure.abfs import abfspath
from beeswax.common import apply_natural_sort, is_compute
from beeswax.common import apply_natural_sort, find_compute_in_cluster
from beeswax.conf import (
APPLY_NATURAL_SORT_MAX,
AUTH_PASSWORD,
Expand Down Expand Up @@ -164,7 +164,7 @@ def get(user, query_server=None, cluster=None):


def get_query_server_config(name='beeswax', connector=None):
if connector and (has_connectors() or is_compute(connector)):
if connector and (has_connectors() or find_compute_in_cluster(connector)):
LOG.debug("Query via connector %s (%s)" % (name, connector.get('type')))
query_server = get_query_server_config_via_connector(connector)
else:
Expand Down Expand Up @@ -1042,14 +1042,14 @@ def use(self, database, session=None):
query = hql_query('USE `%s`' % database)
return self.client.use(query, session=session)

def get_log(self, query_handle, start_over=True):
return self.client.get_log(query_handle, start_over)
def get_log(self, query_handle, start_over=True, session=None):
return self.client.get_log(query_handle, start_over, session=session)

def get_state(self, handle):
return self.client.get_state(handle)

def get_operation_status(self, handle):
return self.client.get_operation_status(handle)
def get_operation_status(self, handle, session=None):
return self.client.get_operation_status(handle, session=session)

def execute_and_wait(self, query, timeout_sec=30.0, sleep_interval=0.5):
"""
Expand Down
2 changes: 1 addition & 1 deletion apps/beeswax/src/beeswax/server/hive_metastore_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def get_state(self, handle):
def close(self, handle):
pass

def get_operation_status(self, handle):
def get_operation_status(self, handle, session=None):
return MockFinishedOperation()

def get_default_configuration(self, *args, **kwargs):
Expand Down
18 changes: 9 additions & 9 deletions apps/beeswax/src/beeswax/server/hive_server2_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -1103,18 +1103,18 @@ def fetch_log(self, operation_handle, orientation=TFetchOrientation.FETCH_NEXT,

return '\n'.join(lines)

def get_operation_status(self, operation_handle):
def get_operation_status(self, operation_handle, session=None):
req = TGetOperationStatusReq(operationHandle=operation_handle)
(res, session) = self.call(self._client.GetOperationStatus, req)
(res, session) = self.call(self._client.GetOperationStatus, req, session=session)
return res

def get_log(self, operation_handle):
def get_log(self, operation_handle, session=None):
try:
req = TGetLogReq(operationHandle=operation_handle)
(res, session) = self.call(self._client.GetLog, req)
(res, session) = self.call(self._client.GetLog, req, session=session)
return res.log
except Exception as e:
if 'Invalid query handle' in str(e):
if 'Invalid query handle' in str(e) or 'Invalid or unknown query handle' in str(e):
message = 'Invalid query handle'
LOG.error('%s: %s' % (message, e))
else:
Expand Down Expand Up @@ -1436,9 +1436,9 @@ def get_state(self, handle):
res = self._client.get_operation_status(operationHandle)
return HiveServerQueryHistory.STATE_MAP[res.operationState]

def get_operation_status(self, handle):
def get_operation_status(self, handle, session=None):
operationHandle = handle.get_rpc_handle()
return self._client.get_operation_status(operationHandle)
return self._client.get_operation_status(operationHandle, session=session)

def use(self, query, session=None):
data = self._client.execute_query(query, session=session)
Expand Down Expand Up @@ -1482,11 +1482,11 @@ def close_session(self, session):
def dump_config(self):
return 'Does not exist in HS2'

def get_log(self, handle, start_over=True):
def get_log(self, handle, start_over=True, session=None):
operationHandle = handle.get_rpc_handle()

if beeswax_conf.USE_GET_LOG_API.get() or self.query_server.get('dialect') == 'impala':
return self._client.get_log(operationHandle)
return self._client.get_log(operationHandle, session=session)
else:
if start_over:
orientation = TFetchOrientation.FETCH_FIRST
Expand Down
3 changes: 2 additions & 1 deletion apps/jobbrowser/src/jobbrowser/apis/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from babel import localtime
from django.utils.translation import gettext as _

from beeswax.common import extract_session_type
from desktop.lib import export_csvxls
from impala.conf import COORDINATOR_UI_SPNEGO
from jobbrowser.apis.base_api import Api
Expand All @@ -54,7 +55,7 @@ def _get_api(user, cluster=None):
server_url = compute['options'].get('api_url')
else:
# TODO: multi computes if snippet.get('compute') or snippet['type'] has computes
application = cluster['compute']['type'] if cluster.get('compute') else cluster.get('interface', 'impala')
application = extract_session_type(cluster) or 'impala'
session = Session.objects.get_session(user, application=application)
server_url = _get_impala_server_url(session)
return get_impalad_api(user=user, url=server_url)
Expand Down
6 changes: 3 additions & 3 deletions desktop/libs/notebook/src/notebook/connectors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from django.utils.encoding import smart_str
from django.utils.translation import gettext as _

from beeswax.common import find_compute, is_compute
from beeswax.common import find_compute, find_compute_in_cluster
from desktop.auth.backend import is_admin
from desktop.conf import TASK_SERVER, has_connectors, is_cdw_compute_enabled
from desktop.lib import export_csvxls
Expand Down Expand Up @@ -402,7 +402,7 @@ def patch_snippet_for_connector(snippet, user=None):
Connector backward compatibility switcher.
# TODO Connector unification
"""
if is_compute(snippet):
if find_compute_in_cluster(snippet):
snippet['connector'] = find_compute(cluster=snippet, user=user)
if snippet['connector'] and snippet['connector'].get('dialect'):
snippet['dialect'] = snippet['connector']['dialect']
Expand Down Expand Up @@ -433,7 +433,7 @@ def get_api(request, snippet):
if has_connectors() and snippet.get('type') == 'hello' and is_admin(request.user):
LOG.debug('Using the interpreter from snippet')
interpreter = snippet.get('interpreter')
elif is_cdw_compute_enabled():
elif find_compute_in_cluster(snippet):
LOG.debug("Finding the compute from db using snippet: %s" % snippet)
interpreter = find_compute(cluster=snippet, user=request.user)
if interpreter is None:
Expand Down
5 changes: 2 additions & 3 deletions desktop/libs/notebook/src/notebook/connectors/hiveserver2.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from django.urls import reverse
from django.utils.translation import gettext as _

from beeswax.common import is_compute
from beeswax.common import extract_session_type
from desktop.auth.backend import is_admin
from desktop.conf import USE_DEFAULT_CONFIGURATION, has_connectors
from desktop.lib.conf import BoundConfig
Expand Down Expand Up @@ -321,8 +321,7 @@ def execute(self, notebook, snippet):
db = self._get_db(snippet, interpreter=self.interpreter)

statement = self._get_current_statement(notebook, snippet)
compute = snippet.get('compute', {})
session_type = compute['name'] if is_compute(snippet) and compute.get('name') else snippet['type']
session_type = extract_session_type(snippet)
session = self._get_session(notebook, session_type)

query = self._prepare_hql_query(snippet, statement['statement'], session)
Expand Down

0 comments on commit 79428fb

Please sign in to comment.