Skip to content

Commit

Permalink
feat: Add Valkey instrumentation support
Browse files Browse the repository at this point in the history
Add support for Valkey instrumentation

Add valkeycluster service

Add release notes

Reformat snapshots

Apply suggested changes

remove ddtrace/contrib/valkey

update suitespec to reflect movement of files

fixes for moving internal

Update releasenotes/notes/add-valkey-support-6cc9f41351dc0cd9.yaml

Update docs/integrations.rst

fix builddocs with init file

use mirror image for testing

add valkeycluster to services

see if more open pattern fixes

switch from entrypoint to command for valkey cluster cont

add environment ip value to container

use rediscluster container for db
  • Loading branch information
AhmadMasry authored and ZStriker19 committed Feb 3, 2025
1 parent 0111544 commit 36c3e7c
Show file tree
Hide file tree
Showing 63 changed files with 3,752 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/COMMIT_TEMPLATE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ feat/fix/docs/refactor/ci(xxx): commit title here
# mysqlpython, openai, opentelemetry, opentracer, profile, psycopg, pylibmc, pymemcache,
# pymongo, pymysql, pynamodb, pyodbc, pyramid, pytest, redis, rediscluster, requests, rq,
# sanic, snowflake, sourcecode, sqlalchemy, starlette, stdlib, structlog, subprocess,
# telemetry, test_logging, tornado, tracer, unittest, urllib3, vendor, vertica, wsgi,
# telemetry, test_logging, tornado, tracer, unittest, urllib3, valkey, vendor, vertica, wsgi,
# yaaredis
6 changes: 6 additions & 0 deletions .gitlab/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
redis:
name: registry.ddbuild.io/redis:7.0.7
alias: redis
valkey:
name: registry.ddbuild.io/images/mirror/valkey:8.0-alpine
alias: valkey
kafka:
name: registry.ddbuild.io/images/mirror/apache/kafka:3.8.0
alias: kafka
Expand All @@ -54,6 +57,9 @@
rediscluster:
name: registry.ddbuild.io/images/mirror/grokzen/redis-cluster:6.2.0
alias: rediscluster
valkeycluster:
name: registry.ddbuild.io/images/mirror/grokzen/redis-cluster:6.2.0
alias: valkeycluster
elasticsearch:
name: registry.ddbuild.io/images/mirror/library/elasticsearch:7.17.23
alias: elasticsearch
Expand Down
26 changes: 26 additions & 0 deletions .riot/requirements/11ac941.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/11ac941.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.1
exceptiongroup==1.2.2
hypothesis==6.45.0
importlib-metadata==8.5.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==5.0.0
pytest-mock==3.14.0
pytest-randomly==3.15.0
sortedcontainers==2.4.0
tomli==2.2.1
valkey==6.0.2
zipp==3.20.2
26 changes: 26 additions & 0 deletions .riot/requirements/1e98e9b.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/1e98e9b.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.10
exceptiongroup==1.2.2
hypothesis==6.45.0
importlib-metadata==8.5.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
tomli==2.2.1
valkey==6.0.2
zipp==3.21.0
22 changes: 22 additions & 0 deletions .riot/requirements/4aa2a2a.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# This file is autogenerated by pip-compile with Python 3.11
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/4aa2a2a.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.10
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
valkey==6.0.2
21 changes: 21 additions & 0 deletions .riot/requirements/7219cf4.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# This file is autogenerated by pip-compile with Python 3.13
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/7219cf4.in
#
attrs==24.3.0
coverage[toml]==7.6.10
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
valkey==6.0.2
21 changes: 21 additions & 0 deletions .riot/requirements/b96b665.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/b96b665.in
#
attrs==24.3.0
coverage[toml]==7.6.10
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
valkey==6.0.2
24 changes: 24 additions & 0 deletions .riot/requirements/dd68acc.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/dd68acc.in
#
async-timeout==5.0.1
attrs==24.3.0
coverage[toml]==7.6.10
exceptiongroup==1.2.2
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
pytest==8.3.4
pytest-asyncio==0.23.7
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
sortedcontainers==2.4.0
tomli==2.2.1
valkey==6.0.2
1 change: 1 addition & 0 deletions ddtrace/_monkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
"unittest": True,
"coverage": False,
"selenium": True,
"valkey": True,
}


Expand Down
8 changes: 8 additions & 0 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,11 @@ def _on_redis_command_post(ctx: core.ExecutionContext, rowcount):
ctx.span.set_metric(db.ROWCOUNT, rowcount)


def _on_valkey_command_post(ctx: core.ExecutionContext, rowcount):
if rowcount is not None:
ctx.span.set_metric(db.ROWCOUNT, rowcount)


def _on_test_visibility_enable(config) -> None:
from ddtrace.internal.ci_visibility import CIVisibility

Expand Down Expand Up @@ -758,6 +763,8 @@ def listen():
core.on("botocore.kinesis.GetRecords.post", _on_botocore_kinesis_getrecords_post)
core.on("redis.async_command.post", _on_redis_command_post)
core.on("redis.command.post", _on_redis_command_post)
core.on("valkey.async_command.post", _on_valkey_command_post)
core.on("valkey.command.post", _on_valkey_command_post)
core.on("azure.functions.request_call_modifier", _on_azure_functions_request_span_modifier)
core.on("azure.functions.start_response", _on_azure_functions_start_response)

Expand Down Expand Up @@ -786,6 +793,7 @@ def listen():
"botocore.patched_stepfunctions_api_call",
"botocore.patched_bedrock_api_call",
"redis.command",
"valkey.command",
"rq.queue.enqueue_job",
"rq.traced_queue_fetch_job",
"rq.worker.perform_job",
Expand Down
96 changes: 96 additions & 0 deletions ddtrace/_trace/utils_valkey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""
Some utils used by the dogtrace valkey integration
"""

from contextlib import contextmanager
from typing import List
from typing import Optional

from ddtrace.constants import _ANALYTICS_SAMPLE_RATE_KEY
from ddtrace.constants import SPAN_KIND
from ddtrace.constants import SPAN_MEASURED_KEY
from ddtrace.contrib import trace_utils
from ddtrace.contrib.internal.valkey_utils import _extract_conn_tags
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import db
from ddtrace.ext import valkey as valkeyx
from ddtrace.internal import core
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.schema import schematize_cache_operation
from ddtrace.internal.utils.formats import stringify_cache_args


format_command_args = stringify_cache_args


def _set_span_tags(
span, pin, config_integration, args: Optional[List], instance, query: Optional[List], is_cluster: bool = False
):
span.set_tag_str(SPAN_KIND, SpanKind.CLIENT)
span.set_tag_str(COMPONENT, config_integration.integration_name)
span.set_tag_str(db.SYSTEM, valkeyx.APP)
span.set_tag(SPAN_MEASURED_KEY)
if query is not None:
span_name = schematize_cache_operation(valkeyx.RAWCMD, cache_provider=valkeyx.APP) # type: ignore[operator]
span.set_tag_str(span_name, query)
if pin.tags:
span.set_tags(pin.tags)
# some valkey clients do not have a connection_pool attribute (ex. aiovalkey v1.3)
if not is_cluster and hasattr(instance, "connection_pool"):
span.set_tags(_extract_conn_tags(instance.connection_pool.connection_kwargs))
if args is not None:
span.set_metric(valkeyx.ARGS_LEN, len(args))
else:
for attr in ("command_stack", "_command_stack"):
if hasattr(instance, attr):
span.set_metric(valkeyx.PIPELINE_LEN, len(getattr(instance, attr)))
# set analytics sample rate if enabled
span.set_tag(_ANALYTICS_SAMPLE_RATE_KEY, config_integration.get_analytics_sample_rate())


@contextmanager
def _instrument_valkey_cmd(pin, config_integration, instance, args):
query = stringify_cache_args(args, cmd_max_len=config_integration.cmd_max_length)
with core.context_with_data(
"valkey.command",
span_name=schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
pin=pin,
service=trace_utils.ext_service(pin, config_integration),
span_type=SpanTypes.VALKEY,
resource=query.split(" ")[0] if config_integration.resource_only_command else query,
) as ctx, ctx.span as span:
_set_span_tags(span, pin, config_integration, args, instance, query)
yield ctx


@contextmanager
def _instrument_valkey_execute_pipeline(pin, config_integration, cmds, instance, is_cluster=False):
cmd_string = resource = "\n".join(cmds)
if config_integration.resource_only_command:
resource = "\n".join([cmd.split(" ")[0] for cmd in cmds])

with pin.tracer.trace(
schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
resource=resource,
service=trace_utils.ext_service(pin, config_integration),
span_type=SpanTypes.VALKEY,
) as span:
_set_span_tags(span, pin, config_integration, None, instance, cmd_string)
yield span


@contextmanager
def _instrument_valkey_execute_async_cluster_pipeline(pin, config_integration, cmds, instance):
cmd_string = resource = "\n".join(cmds)
if config_integration.resource_only_command:
resource = "\n".join([cmd.split(" ")[0] for cmd in cmds])

with pin.tracer.trace(
schematize_cache_operation(valkeyx.CMD, cache_provider=valkeyx.APP),
resource=resource,
service=trace_utils.ext_service(pin, config_integration),
span_type=SpanTypes.VALKEY,
) as span:
_set_span_tags(span, pin, config_integration, None, instance, cmd_string)
yield span
36 changes: 36 additions & 0 deletions ddtrace/contrib/internal/valkey/asyncio_patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from ddtrace import config
from ddtrace._trace.utils_valkey import _instrument_valkey_cmd
from ddtrace._trace.utils_valkey import _instrument_valkey_execute_async_cluster_pipeline
from ddtrace._trace.utils_valkey import _instrument_valkey_execute_pipeline
from ddtrace.contrib.internal.valkey_utils import _run_valkey_command_async
from ddtrace.internal.utils.formats import stringify_cache_args
from ddtrace.trace import Pin


async def instrumented_async_execute_command(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)

with _instrument_valkey_cmd(pin, config.valkey, instance, args) as ctx:
return await _run_valkey_command_async(ctx=ctx, func=func, args=args, kwargs=kwargs)


async def instrumented_async_execute_pipeline(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)

cmds = [stringify_cache_args(c, cmd_max_len=config.valkey.cmd_max_length) for c, _ in instance.command_stack]
with _instrument_valkey_execute_pipeline(pin, config.valkey, cmds, instance):
return await func(*args, **kwargs)


async def instrumented_async_execute_cluster_pipeline(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)

cmds = [stringify_cache_args(c.args, cmd_max_len=config.valkey.cmd_max_length) for c in instance._command_stack]
with _instrument_valkey_execute_async_cluster_pipeline(pin, config.valkey, cmds, instance):
return await func(*args, **kwargs)
Loading

0 comments on commit 36c3e7c

Please sign in to comment.