Skip to content

Commit

Permalink
Fusion iceberg egress (#42)
Browse the repository at this point in the history
* Add egress SQL commands

* Add line-continuations to Fusion SQL

* Fix where clause

* Rename egress module

* Add management API classes; add storage profile commands

* Add tags

* Fix name

* Fix name

* Change catalog and storage profiles

* Update create egress syntax

* Refactor egress Fusion SQL commands

* Assign workspace group manager

* Fix region

* Fix f-strings

* Fix casing

* Fix casing

* Use region ID

* Add region codes

* Add region codes

* Finish implementation of egress Fusion SQL commands

* Use region name as-is

* Change to START EGRESS

* Add egress status

* Fix URL

* Fix parameter

* Update help text

* Update grammar

* Rename egress to export

* Fix doc

* Remove unneeded parameters

* Remove AWS codes

* Add debugging info

* Add status message

* Fix jobs Fusion SQL commands
  • Loading branch information
kesmit13 authored Nov 18, 2024
1 parent 0e2c713 commit 05796db
Show file tree
Hide file tree
Showing 5 changed files with 619 additions and 26 deletions.
86 changes: 68 additions & 18 deletions singlestoredb/fusion/handler.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Iterable
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple

from parsimonious import Grammar
Expand All @@ -23,16 +24,20 @@

CORE_GRAMMAR = r'''
ws = ~r"(\s+|(\s*/\*.*\*/\s*)+)"
qs = ~r"\"([^\"]*)\"|'([^\']*)'|`([^\`]*)`|([A-Za-z0-9_\-\.]+)"
number = ~r"[-+]?(\d*\.)?\d+(e[-+]?\d+)?"i
integer = ~r"-?\d+"
qs = ~r"\"([^\"]*)\"|'([^\']*)'|([A-Za-z0-9_\-\.]+)|`([^\`]+)`" ws*
number = ~r"[-+]?(\d*\.)?\d+(e[-+]?\d+)?"i ws*
integer = ~r"-?\d+" ws*
comma = ws* "," ws*
eq = ws* "=" ws*
open_paren = ws* "(" ws*
close_paren = ws* ")" ws*
open_repeats = ws* ~r"[\(\[\{]" ws*
close_repeats = ws* ~r"[\)\]\}]" ws*
select = ~r"SELECT"i ws+ ~r".+" ws*
table = ~r"(?:([A-Za-z0-9_\-]+)|`([^\`]+)`)(?:\.(?:([A-Za-z0-9_\-]+)|`([^\`]+)`))?" ws*
column = ~r"(?:([A-Za-z0-9_\-]+)|`([^\`]+)`)(?:\.(?:([A-Za-z0-9_\-]+)|`([^\`]+)`))?" ws*
link_name = ~r"(?:([A-Za-z0-9_\-]+)|`([^\`]+)`)(?:\.(?:([A-Za-z0-9_\-]+)|`([^\`]+)`))?" ws*
catalog_name = ~r"(?:([A-Za-z0-9_\-]+)|`([^\`]+)`)(?:\.(?:([A-Za-z0-9_\-]+)|`([^\`]+)`))?" ws*
json = ws* json_object ws*
json_object = ~r"{\s*" json_members? ~r"\s*}"
Expand Down Expand Up @@ -65,6 +70,10 @@
'<integer>': '',
'<number>': '',
'<json>': '',
'<table>': '',
'<column>': '',
'<catalog-name>': '',
'<link-name>': '',
}

BUILTIN_DEFAULTS = { # type: ignore
Expand Down Expand Up @@ -226,9 +235,13 @@ def build_syntax(grammar: str) -> str:
# Split on ';' on a line by itself
cmd, end = grammar.split(';', 1)

rules = {}
name = ''
rules: Dict[str, Any] = {}
for line in end.split('\n'):
line = line.strip()
if line.startswith('&'):
rules[name] += '\n' + line
continue
if not line:
continue
name, value = line.split('=', 1)
Expand All @@ -239,10 +252,16 @@ def build_syntax(grammar: str) -> str:
while re.search(r' [a-z0-9_]+\b', cmd):
cmd = re.sub(r' ([a-z0-9_]+)\b', functools.partial(expand_rules, rules), cmd)

def add_indent(m: Any) -> str:
return ' ' + (len(m.group(1)) * ' ')

# Indent line-continuations
cmd = re.sub(r'^(\&+)\s*', add_indent, cmd, flags=re.M)

cmd = textwrap.dedent(cmd).rstrip() + ';'
cmd = re.sub(r' +', ' ', cmd)
cmd = re.sub(r'^ ', ' ', cmd, flags=re.M)
cmd = re.sub(r'\s+,\.\.\.', ',...', cmd)
cmd = re.sub(r'(\S) +', r'\1 ', cmd)
cmd = re.sub(r'<comma>', ',', cmd)
cmd = re.sub(r'\s+,\s*\.\.\.', ',...', cmd)

return cmd

Expand Down Expand Up @@ -399,9 +418,15 @@ def process_grammar(
help_txt = build_help(syntax_txt, full_grammar)
grammar = build_cmd(grammar)

# Remove line-continuations
grammar = re.sub(r'\n\s*&+', r'', grammar)

# Make sure grouping characters all have whitespace around them
grammar = re.sub(r' *(\[|\{|\||\}|\]) *', r' \1 ', grammar)

grammar = re.sub(r'\(', r' open_paren ', grammar)
grammar = re.sub(r'\)', r' close_paren ', grammar)

for line in grammar.split('\n'):
if not line.strip():
continue
Expand All @@ -418,7 +443,7 @@ def process_grammar(
sql = re.sub(r'\]\s+\[', r' | ', sql)

# Lower-case keywords and make them case-insensitive
sql = re.sub(r'(\b|@+)([A-Z0-9]+)\b', lower_and_regex, sql)
sql = re.sub(r'(\b|@+)([A-Z0-9_]+)\b', lower_and_regex, sql)

# Convert literal strings to 'qs'
sql = re.sub(r"'[^']+'", r'qs', sql)
Expand Down Expand Up @@ -461,12 +486,18 @@ def process_grammar(
sql = re.sub(r'\s+ws$', r' ws*', sql)
sql = re.sub(r'\s+ws\s+\(', r' ws* (', sql)
sql = re.sub(r'\)\s+ws\s+', r') ws* ', sql)
sql = re.sub(r'\s+ws\s+', r' ws+ ', sql)
sql = re.sub(r'\s+ws\s+', r' ws* ', sql)
sql = re.sub(r'\?\s+ws\+', r'? ws*', sql)

# Remove extra ws around eq
sql = re.sub(r'ws\+\s*eq\b', r'eq', sql)

# Remove optional groupings when mandatory groupings are specified
sql = re.sub(r'open_paren\s+ws\*\s+open_repeats\?', r'open_paren', sql)
sql = re.sub(r'close_repeats\?\s+ws\*\s+close_paren', r'close_paren', sql)
sql = re.sub(r'open_paren\s+open_repeats\?', r'open_paren', sql)
sql = re.sub(r'close_repeats\?\s+close_paren', r'close_paren', sql)

out.append(f'{op} = {sql}')

for k, v in list(rules.items()):
Expand Down Expand Up @@ -548,6 +579,7 @@ class SQLHandler(NodeVisitor):

def __init__(self, connection: Connection):
self.connection = connection
self._handled: Set[str] = set()

@classmethod
def compile(cls, grammar: str = '') -> None:
Expand Down Expand Up @@ -614,12 +646,16 @@ def execute(self, sql: str) -> result.FusionSQLResult:
)

type(self).compile()
self._handled = set()
try:
params = self.visit(type(self).grammar.parse(sql))
for k, v in params.items():
params[k] = self.validate_rule(k, v)

res = self.run(params)

self._handled = set()

if res is not None:
res.format_results(self.connection)
return res
Expand Down Expand Up @@ -666,16 +702,20 @@ def visit_qs(self, node: Node, visited_children: Iterable[Any]) -> Any:
"""Quoted strings."""
if node is None:
return None
return node.match.group(1) or node.match.group(2) or \
node.match.group(3) or node.match.group(4)
return flatten(visited_children)[0]

def visit_compound(self, node: Node, visited_children: Iterable[Any]) -> Any:
    """
    Compound name.

    Parameters
    ----------
    node : Node
        Parse-tree node being visited (not inspected here)
    visited_children : Iterable[Any]
        Results produced by visiting the node's children

    Returns
    -------
    Any
        The first item of the flattened child results

    """
    # Intentionally silent: a visitor method must not emit debugging output
    # to stdout while a statement is being parsed.
    return flatten(visited_children)[0]

def visit_number(self, node: Node, visited_children: Iterable[Any]) -> Any:
"""Numeric value."""
return float(node.match.group(0))
return float(flatten(visited_children)[0])

def visit_integer(self, node: Node, visited_children: Iterable[Any]) -> Any:
"""Integer value."""
return int(node.match.group(0))
return int(flatten(visited_children)[0])

def visit_ws(self, node: Node, visited_children: Iterable[Any]) -> Any:
"""Whitespace and comments."""
Expand Down Expand Up @@ -804,19 +844,29 @@ def generic_visit(self, node: Node, visited_children: Iterable[Any]) -> Any:
if node.expr_name.endswith('_cmd'):
final = merge_dicts(flatten(visited_children)[n_keywords:])
for k, v in type(self).rule_info.items():
if k.endswith('_cmd') or k.endswith('_'):
if k.endswith('_cmd') or k.endswith('_') or k.startswith('_'):
continue
if k not in final:
if k not in final and k not in self._handled:
final[k] = BUILTIN_DEFAULTS.get(k, v['default'])
return final

# Filter out stray empty strings
out = [x for x in flatten(visited_children)[n_keywords:] if x]

if repeats or len(out) > 1:
return {node.expr_name: out}
# Remove underscore prefixes from rule name
key_name = re.sub(r'^_+', r'', node.expr_name)

return {node.expr_name: out[0] if out else True}
if repeats or len(out) > 1:
self._handled.add(node.expr_name)
# If all outputs are dicts, merge them
if len(out) > 1 and not repeats:
is_dicts = [x for x in out if isinstance(x, dict)]
if len(is_dicts) == len(out):
return {key_name: merge_dicts(out)}
return {key_name: out}

self._handled.add(node.expr_name)
return {key_name: out[0] if out else True}

if hasattr(node, 'match'):
if not visited_children and not node.match.groups():
Expand Down
Loading

0 comments on commit 05796db

Please sign in to comment.