Skip to content

Commit

Permalink
feat: formula queries in EndpointTraceItemTable (#6844)
Browse files Browse the repository at this point in the history
This PR implements support for formulas in the TraceItemTable endpoint.
It is relevant to this ticket
https://github.com/orgs/getsentry/projects/284/views/1?pane=issue&itemId=85243940&issue=getsentry%7Ceap-planning%7C27
It enables queries such as `sum(my_attribute) / count(my_attribute)`
which weren't possible before

## Major changes
* the column to expression conversion logic used to take place
[here](https://github.com/getsentry/snuba/pull/6844/files#diff-e1e06d7f875a7c2870cc11bc4301dd6ab9fba73263c76260452c0b3176f66110L141-L187)
I extracted this logic into 2 separate new functions:
`_get_reliability_context_columns` and `_column_to_expression`. (i.e.
the logic stayed the same, but it is now inside these functions)
* I then extended `_column_to_expression` to support formulas
https://github.com/getsentry/snuba/pull/6844/files#diff-e1e06d7f875a7c2870cc11bc4301dd6ab9fba73263c76260452c0b3176f66110R180-R191
* I also had to extend the existing result conversion and order by logic
to support formulas
https://github.com/getsentry/snuba/pull/6844/files#diff-e1e06d7f875a7c2870cc11bc4301dd6ab9fba73263c76260452c0b3176f66110R287-R288

## Testing
I wrote 3 new tests for this feature: 
* one that tests a simple formula on aggregates `sum(my_attribute) /
count(my_attribute)`
* one that is the same as above but uses extrapolation, to ensure formulas
work with extrapolation.
* one that tests formulas on attributes without aggregation
`my_attribute + my_other_attribute`

## design decisions
* reliabilities don't work with formulas; if you do a formula on
extrapolated aggregates you will not get any reliability information
back. This decision was made because of the increased complexity it
would add to support. If needed we can implement support for this in the
future.
* I realized while implementing this that formulas using constants, such
as `my_attribute * 10`, are not supported. If we need support for this it
must be implemented as a follow-up, and will require further
modification of our protobuf grammar.
* it only supports spans not uptime or logs
  • Loading branch information
kylemumma authored Feb 4, 2025
1 parent ef5469f commit 8c1787b
Show file tree
Hide file tree
Showing 7 changed files with 332 additions and 50 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ python-rapidjson==1.8
redis==4.5.4
sentry-arroyo==2.19.12
sentry-kafka-schemas==0.1.129
sentry-protos==0.1.57
sentry-protos==0.1.58
sentry-redis-tools==0.3.0
sentry-relay==0.9.5
sentry-sdk==2.18.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
extract_response_meta,
setup_trace_query_settings,
)
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.resolvers import ResolverTimeSeries
from snuba.web.rpc.v1.resolvers.R_eap_spans.common.aggregation import (
ExtrapolationContext,
Expand Down Expand Up @@ -297,6 +298,9 @@ def trace_item_type(cls) -> TraceItemType.ValueType:
return TraceItemType.TRACE_ITEM_TYPE_SPAN

def resolve(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse:
if len(in_msg.expressions) > 0:
raise BadSnubaRPCRequestException("expressions field not yet implemented")

snuba_request = _build_snuba_request(in_msg)
res = run_query(
dataset=PluggableDataset(name="eap", all_entities=[]),
Expand Down
139 changes: 91 additions & 48 deletions snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
AggregationComparisonFilter,
AggregationFilter,
Column,
TraceItemColumnValues,
TraceItemTableRequest,
TraceItemTableResponse,
Expand Down Expand Up @@ -55,6 +56,13 @@

_DEFAULT_ROW_LIMIT = 10_000

# Maps each supported BinaryFormula operator from the protobuf to the
# corresponding Snuba expression builder. Operators absent here (e.g.
# OP_UNSPECIFIED) are not supported by this resolver.
OP_TO_EXPR = {
    Column.BinaryFormula.OP_ADD: f.plus,
    Column.BinaryFormula.OP_SUBTRACT: f.minus,
    Column.BinaryFormula.OP_MULTIPLY: f.multiply,
    Column.BinaryFormula.OP_DIVIDE: f.divide,
}


def aggregation_filter_to_expression(agg_filter: AggregationFilter) -> Expression:
op_to_expr = {
Expand Down Expand Up @@ -125,9 +133,81 @@ def _convert_order_by(
expression=aggregation_to_expression(x.column.aggregation),
)
)
elif x.column.HasField("formula"):
res.append(
OrderBy(
direction=direction,
expression=_formula_to_expression(x.column.formula),
)
)
return res


def _get_reliability_context_columns(column: Column) -> list[SelectedExpression]:
    """
    Return the extra columns that must be selected so that the reliability of
    an extrapolated aggregate can be computed. For any column that is not a
    sample-weighted extrapolated aggregation, no extra columns are needed and
    an empty list is returned.
    """
    # Only aggregation columns can carry an extrapolation mode.
    if not column.HasField("aggregation"):
        return []
    if (
        column.aggregation.extrapolation_mode
        != ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED
    ):
        return []

    extra_columns: list[SelectedExpression] = []

    # The confidence interval column may not exist for every aggregate.
    confidence_col = get_confidence_interval_column(column.aggregation)
    if confidence_col is not None:
        extra_columns.append(
            SelectedExpression(
                name=confidence_col.alias,
                expression=confidence_col,
            )
        )

    # Average sample rate and count are always required for reliability.
    sample_rate_col = get_average_sample_rate_column(column.aggregation)
    extra_columns.append(
        SelectedExpression(
            name=sample_rate_col.alias,
            expression=sample_rate_col,
        )
    )
    count_col = get_count_column(column.aggregation)
    extra_columns.append(
        SelectedExpression(name=count_col.alias, expression=count_col)
    )
    return extra_columns


def _formula_to_expression(formula: Column.BinaryFormula) -> Expression:
    """
    Translate a BinaryFormula protobuf into a Snuba function Expression by
    recursively converting its left and right operands.

    Raises BadSnubaRPCRequestException if the formula uses an operator that is
    not in OP_TO_EXPR (e.g. OP_UNSPECIFIED), instead of leaking a raw KeyError
    to the caller.
    """
    op_expr = OP_TO_EXPR.get(formula.op)
    if op_expr is None:
        raise BadSnubaRPCRequestException(
            f"Unsupported binary formula operator: {formula.op}"
        )
    return op_expr(
        _column_to_expression(formula.left),
        _column_to_expression(formula.right),
    )


def _column_to_expression(column: Column) -> Expression:
    """
    Given a column protobuf object, translates it into a Expression object and returns it.
    """
    if column.HasField("key"):
        return attribute_key_to_expression(column.key)
    if column.HasField("aggregation"):
        # aggregation label may not be set and the column label takes priority anyways.
        return replace(
            aggregation_to_expression(column.aggregation), alias=column.label
        )
    if column.HasField("formula"):
        return replace(_formula_to_expression(column.formula), alias=column.label)
    raise BadSnubaRPCRequestException(
        "Column is not one of: aggregate, attribute key, or formula"
    )


def _build_query(request: TraceItemTableRequest) -> Query:
# TODO: This is hardcoded still
entity = Entity(
Expand All @@ -138,54 +218,15 @@ def _build_query(request: TraceItemTableRequest) -> Query:

selected_columns = []
for column in request.columns:
if column.HasField("key"):
key_col = attribute_key_to_expression(column.key)
# The key_col expression alias may differ from the column label. That is okay
# the attribute key name is used in the groupby, the column label is just the name of
# the returned attribute value
selected_columns.append(
SelectedExpression(name=column.label, expression=key_col)
)
elif column.HasField("aggregation"):
function_expr = aggregation_to_expression(column.aggregation)
# aggregation label may not be set and the column label takes priority anyways.
function_expr = replace(function_expr, alias=column.label)
selected_columns.append(
SelectedExpression(name=column.label, expression=function_expr)
)

if (
column.aggregation.extrapolation_mode
== ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED
):
confidence_interval_column = get_confidence_interval_column(
column.aggregation
)
if confidence_interval_column is not None:
selected_columns.append(
SelectedExpression(
name=confidence_interval_column.alias,
expression=confidence_interval_column,
)
)

average_sample_rate_column = get_average_sample_rate_column(
column.aggregation
)
count_column = get_count_column(column.aggregation)
selected_columns.append(
SelectedExpression(
name=average_sample_rate_column.alias,
expression=average_sample_rate_column,
)
)
selected_columns.append(
SelectedExpression(name=count_column.alias, expression=count_column)
)
else:
raise BadSnubaRPCRequestException(
"Column is neither an aggregate or an attribute"
# The key_col expression alias may differ from the column label. That is okay
# the attribute key name is used in the groupby, the column label is just the name of
# the returned attribute value
selected_columns.append(
SelectedExpression(
name=column.label, expression=_column_to_expression(column)
)
)
selected_columns.extend(_get_reliability_context_columns(column))

res = Query(
from_clause=entity,
Expand Down Expand Up @@ -255,9 +296,11 @@ def _convert_results(
converters[column.label] = lambda x: AttributeValue(val_double=float(x))
elif column.HasField("aggregation"):
converters[column.label] = lambda x: AttributeValue(val_double=float(x))
elif column.HasField("formula"):
converters[column.label] = lambda x: AttributeValue(val_double=float(x))
else:
raise BadSnubaRPCRequestException(
"column is neither an attribute or aggregation"
"column is not one of: attribute, aggregation, or formula"
)

res: defaultdict[str, TraceItemColumnValues] = defaultdict(TraceItemColumnValues)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def _build_query(request: TraceItemTableRequest) -> Query:
)
else:
raise BadSnubaRPCRequestException(
"requested attribute is not a column (aggregation and formula not supported for logs)"
)

res = Query(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ def _build_query(request: TraceItemTableRequest) -> Query:
selected_columns.append(
SelectedExpression(name=column.label, expression=function_expr)
)
elif column.HasField("formula"):
raise BadSnubaRPCRequestException(
"formulas are not supported for uptime checks"
)
else:
raise BadSnubaRPCRequestException(
"Column is neither an aggregate or an attribute"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2307,6 +2307,161 @@ def test_sparse_aggregate(self, setup_teardown: Any) -> None:
),
]

def test_agg_formula(self, setup_teardown: Any) -> None:
    """
    ensures formulas of aggregates work
    ex sum(my_attribute) / count(my_attribute)
    """
    # Fixture: presumably the third argument is the number of identical spans
    # written (TODO confirm against write_eap_span). The expected result below
    # relies on sum = 6*10 + 7*2 = 74 and count = 10 + 2 = 12.
    span_ts = BASE_TIME - timedelta(minutes=1)
    write_eap_span(span_ts, {"kyles_measurement": 6}, 10)
    write_eap_span(span_ts, {"kyles_measurement": 7}, 2)

    # Query window covers the last hour so the spans written above are included.
    ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
    hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
    message = TraceItemTableRequest(
        meta=RequestMeta(
            project_ids=[1, 2, 3],
            organization_id=1,
            cogs_category="something",
            referrer="something",
            start_timestamp=Timestamp(seconds=hour_ago),
            end_timestamp=ts,
            trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
        ),
        # Restrict to spans that actually carry the attribute being aggregated.
        filter=TraceItemFilter(
            exists_filter=ExistsFilter(
                key=AttributeKey(
                    type=AttributeKey.TYPE_DOUBLE, name="kyles_measurement"
                )
            )
        ),
        # Single requested column: a formula dividing two aggregates.
        columns=[
            Column(
                formula=Column.BinaryFormula(
                    op=Column.BinaryFormula.OP_DIVIDE,
                    left=Column(
                        aggregation=AttributeAggregation(
                            aggregate=Function.FUNCTION_SUM,
                            key=AttributeKey(
                                type=AttributeKey.TYPE_DOUBLE,
                                name="kyles_measurement",
                            ),
                        ),
                        label="sum(kyles_measurement)",
                    ),
                    right=Column(
                        aggregation=AttributeAggregation(
                            aggregate=Function.FUNCTION_COUNT,
                            key=AttributeKey(
                                type=AttributeKey.TYPE_DOUBLE,
                                name="kyles_measurement",
                            ),
                        ),
                        label="count(kyles_measurement)",
                    ),
                ),
                label="sum(kyles_measurement) / count(kyles_measurement)",
            ),
        ],
        limit=1,
    )
    response = EndpointTraceItemTable().execute(message)
    # One row: the average of kyles_measurement, 74 / 12.
    assert response.column_values == [
        TraceItemColumnValues(
            attribute_name="sum(kyles_measurement) / count(kyles_measurement)",
            results=[
                AttributeValue(val_double=(74 / 12)),
            ],
        ),
    ]

def test_non_agg_formula(self, setup_teardown: Any) -> None:
    """
    ensures formulas of non-aggregates work
    ex: my_attribute + my_other_attribute
    """
    # Fixture: per-span sums are -1+1=0 (x4 spans), 3+2=5 (x2 spans) and
    # 10+3=13 (x1 span) — these are the seven values asserted below.
    span_ts = BASE_TIME - timedelta(minutes=1)
    write_eap_span(span_ts, {"kyles_measurement": -1, "my_other_attribute": 1}, 4)
    write_eap_span(span_ts, {"kyles_measurement": 3, "my_other_attribute": 2}, 2)
    write_eap_span(span_ts, {"kyles_measurement": 10, "my_other_attribute": 3}, 1)

    # Query window covers the last hour so the spans written above are included.
    ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
    hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
    message = TraceItemTableRequest(
        meta=RequestMeta(
            project_ids=[1, 2, 3],
            organization_id=1,
            cogs_category="something",
            referrer="something",
            start_timestamp=Timestamp(seconds=hour_ago),
            end_timestamp=ts,
            trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
        ),
        filter=TraceItemFilter(
            exists_filter=ExistsFilter(
                key=AttributeKey(
                    type=AttributeKey.TYPE_DOUBLE, name="kyles_measurement"
                )
            )
        ),
        # Single requested column: attribute + attribute, no aggregation.
        columns=[
            Column(
                formula=Column.BinaryFormula(
                    op=Column.BinaryFormula.OP_ADD,
                    left=Column(
                        key=AttributeKey(
                            type=AttributeKey.TYPE_DOUBLE, name="kyles_measurement"
                        )
                    ),
                    right=Column(
                        key=AttributeKey(
                            type=AttributeKey.TYPE_DOUBLE, name="my_other_attribute"
                        )
                    ),
                ),
                label="kyles_measurement + my_other_attribute",
            ),
        ],
        # Order by the same formula so the expected results are deterministic
        # (ascending by the computed sum).
        order_by=[
            TraceItemTableRequest.OrderBy(
                column=Column(
                    formula=Column.BinaryFormula(
                        op=Column.BinaryFormula.OP_ADD,
                        left=Column(
                            key=AttributeKey(
                                type=AttributeKey.TYPE_DOUBLE,
                                name="kyles_measurement",
                            )
                        ),
                        right=Column(
                            key=AttributeKey(
                                type=AttributeKey.TYPE_DOUBLE,
                                name="my_other_attribute",
                            )
                        ),
                    ),
                    label="kyles_measurement + my_other_attribute",
                )
            ),
        ],
        limit=50,
    )
    response = EndpointTraceItemTable().execute(message)
    # Seven rows (one per span), sorted ascending: 0,0,0,0,5,5,13.
    assert response.column_values == [
        TraceItemColumnValues(
            attribute_name="kyles_measurement + my_other_attribute",
            results=[
                AttributeValue(val_double=0),
                AttributeValue(val_double=0),
                AttributeValue(val_double=0),
                AttributeValue(val_double=0),
                AttributeValue(val_double=5),
                AttributeValue(val_double=5),
                AttributeValue(val_double=13),
            ],
        ),
    ]


class TestUtils:
def test_apply_labels_to_columns_backward_compat(self) -> None:
Expand Down
Loading

0 comments on commit 8c1787b

Please sign in to comment.