 import clickhouse_connect
 from cachetools import TTLCache

-from datadog_checks.base import AgentCheck, ConfigurationError, is_affirmative
+from datadog_checks.base import AgentCheck
 from datadog_checks.base.utils.db import QueryManager
 from datadog_checks.base.utils.db.utils import TagManager, default_json_event_encoding

 from . import queries
 from .__about__ import __version__
 from .completed_query_samples import ClickhouseCompletedQuerySamples
+from .config import build_config
 from .statement_samples import ClickhouseStatementSamples
 from .statements import ClickhouseStatementMetrics
 from .utils import ErrorSanitizer
@@ -31,44 +32,32 @@ class ClickhouseCheck(AgentCheck):
     def __init__(self, name, init_config, instances):
         super(ClickhouseCheck, self).__init__(name, init_config, instances)

-        self._server = self.instance.get('server', '')
-        self._port = self.instance.get('port')
-        self._db = self.instance.get('db', 'default')
-        self._user = self.instance.get('username', self.instance.get('user', 'default'))
-        self._password = self.instance.get('password', '')
-        self._connect_timeout = float(self.instance.get('connect_timeout', 10))
-        self._read_timeout = float(self.instance.get('read_timeout', 10))
-        self._compression = self.instance.get('compression', False)
-        self._tls_verify = is_affirmative(self.instance.get('tls_verify', False))
-        self._tls_ca_cert = self.instance.get('tls_ca_cert', None)
-        self._verify = self.instance.get('verify', True)
-
-        # Single endpoint mode configuration
-        # When true, uses clusterAllReplicas() to query system tables across all nodes
-        self._single_endpoint_mode = is_affirmative(self.instance.get('single_endpoint_mode', False))
-
-        # DBM-related properties
+        # Build typed configuration
+        config, validation_result = build_config(self)
+        self._config = config
+        self._validation_result = validation_result
+
+        # Log validation warnings (errors will be raised in validate_config)
+        for warning in validation_result.warnings:
+            self.log.warning(warning)
+
+        # DBM-related properties (computed lazily)
         self._resolved_hostname = None
         self._database_identifier = None
         self._agent_hostname = None

-        # Database instance metadata collection interval
-        self._database_instance_collection_interval = float(
-            self.instance.get('database_instance_collection_interval', 300)
-        )
-
         # _database_instance_emitted: limit the collection and transmission of the database instance metadata
         self._database_instance_emitted = TTLCache(
             maxsize=1,
-            ttl=self._database_instance_collection_interval,
+            ttl=self._config.database_instance_collection_interval,
         )

         # Initialize TagManager for tag management (similar to MySQL)
         self.tag_manager = TagManager()
-        self.tag_manager.set_tags_from_list(self.instance.get('tags', []), replace=True)
+        self.tag_manager.set_tags_from_list(self._config.tags, replace=True)
         self._add_core_tags()

-        self._error_sanitizer = ErrorSanitizer(self._password)
+        self._error_sanitizer = ErrorSanitizer(self._config.password)
         self.check_initializations.append(self.validate_config)

         # We'll connect on the first check run
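The new `.config` module is not part of this diff, so its exact shape is an assumption. From the attributes the check uses (`config.password`, `config.tags`, `validation_result.warnings`, and the `valid`/`errors` fields checked later in `validate_config`), a minimal sketch of the contract might look like:

```python
# Hypothetical sketch only -- the real .config module is outside this diff.
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ValidationResult:
    valid: bool = True
    errors: List[str] = field(default_factory=list)    # first error is raised
    warnings: List[str] = field(default_factory=list)  # logged, non-fatal


@dataclass
class ClickhouseConfig:
    server: str = ''
    port: Optional[int] = None
    db: str = 'default'
    username: str = 'default'
    password: str = ''
    tags: List[str] = field(default_factory=list)
    dbm: bool = False
    single_endpoint_mode: bool = False
    database_instance_collection_interval: float = 300.0
    # ...plus TLS, timeout, compression, and per-feature DBM sections


def build_config(check):
    """Translate check.instance into (typed config, validation result)."""
    # Omitted; the defaults above mirror the removed self.instance.get() calls.
    raise NotImplementedError
```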
@@ -92,64 +81,25 @@ def __init__(self, name, init_config, instances):
         self.check_initializations.append(self._query_manager.compile_queries)

         # Initialize DBM components if enabled
-        self._dbm_enabled = is_affirmative(self.instance.get('dbm', False))
+        self._init_dbm_components()

+    def _init_dbm_components(self):
+        """Initialize DBM components based on typed configuration."""
         # Initialize query metrics (from system.query_log - analogous to pg_stat_statements)
-        self._query_metrics_config = self.instance.get('query_metrics', {})
-        if self._dbm_enabled and self._query_metrics_config.get('enabled', True):
-            # Create a simple config object for query metrics
-            class QueryMetricsConfig:
-                def __init__(self, config_dict):
-                    self.enabled = config_dict.get('enabled', True)
-                    self.collection_interval = config_dict.get('collection_interval', 10)
-                    self.run_sync = config_dict.get('run_sync', False)
-                    self.full_statement_text_cache_max_size = config_dict.get(
-                        'full_statement_text_cache_max_size', 10000
-                    )
-                    self.full_statement_text_samples_per_hour_per_query = config_dict.get(
-                        'full_statement_text_samples_per_hour_per_query', 1
-                    )
-
-            self.statement_metrics = ClickhouseStatementMetrics(self, QueryMetricsConfig(self._query_metrics_config))
+        if self._config.dbm and self._config.query_metrics.enabled:
+            self.statement_metrics = ClickhouseStatementMetrics(self, self._config.query_metrics)
         else:
             self.statement_metrics = None

         # Initialize query samples (from system.processes - analogous to pg_stat_activity)
-        self._query_samples_config = self.instance.get('query_samples', {})
-        if self._dbm_enabled and self._query_samples_config.get('enabled', True):
-            # Create a simple config object for statement samples
-            class QuerySamplesConfig:
-                def __init__(self, config_dict):
-                    self.enabled = config_dict.get('enabled', True)
-                    self.collection_interval = config_dict.get('collection_interval', 1)
-                    self.run_sync = config_dict.get('run_sync', False)
-                    self.samples_per_hour_per_query = config_dict.get('samples_per_hour_per_query', 15)
-                    self.seen_samples_cache_maxsize = config_dict.get('seen_samples_cache_maxsize', 10000)
-                    # Activity snapshot configuration
-                    self.activity_enabled = config_dict.get('activity_enabled', True)
-                    self.activity_collection_interval = config_dict.get('activity_collection_interval', 10)
-                    self.activity_max_rows = config_dict.get('activity_max_rows', 1000)
-
-            self.statement_samples = ClickhouseStatementSamples(self, QuerySamplesConfig(self._query_samples_config))
+        if self._config.dbm and self._config.query_samples.enabled:
+            self.statement_samples = ClickhouseStatementSamples(self, self._config.query_samples)
         else:
             self.statement_samples = None

         # Initialize completed query samples (from system.query_log - completed queries)
-        self._completed_query_samples_config = self.instance.get('completed_query_samples', {})
-        if self._dbm_enabled and self._completed_query_samples_config.get('enabled', True):
-            # Create a simple config object for completed query samples
-            class CompletedQuerySamplesConfig:
-                def __init__(self, config_dict):
-                    self.enabled = config_dict.get('enabled', True)
-                    self.collection_interval = config_dict.get('collection_interval', 10)
-                    self.run_sync = config_dict.get('run_sync', False)
-                    self.samples_per_hour_per_query = config_dict.get('samples_per_hour_per_query', 15)
-                    self.seen_samples_cache_maxsize = config_dict.get('seen_samples_cache_maxsize', 10000)
-                    self.max_samples_per_collection = config_dict.get('max_samples_per_collection', 1000)
-
-            self.completed_query_samples = ClickhouseCompletedQuerySamples(
-                self, CompletedQuerySamplesConfig(self._completed_query_samples_config)
-            )
+        if self._config.dbm and self._config.completed_query_samples.enabled:
+            self.completed_query_samples = ClickhouseCompletedQuerySamples(self, self._config.completed_query_samples)
         else:
             self.completed_query_samples = None

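The removed ad-hoc classes enumerate the exact fields and defaults each DBM section carries, so the typed sections on `self._config` presumably mirror them. A minimal sketch for `query_metrics` (assumed, not shown in this diff):

```python
# Assumed shape of the typed query_metrics section; field names and defaults
# are taken verbatim from the removed QueryMetricsConfig class above.
from dataclasses import dataclass


@dataclass
class QueryMetricsConfig:
    enabled: bool = True
    collection_interval: int = 10
    run_sync: bool = False
    full_statement_text_cache_max_size: int = 10000
    full_statement_text_samples_per_hour_per_query: int = 1
```

`query_samples` and `completed_query_samples` would carry the other removed defaults (activity settings, sample-rate caps, cache sizes) the same way.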
@@ -163,12 +113,25 @@ def _add_core_tags(self):
         Add tags that should be attached to every metric/event.
         These are core identification tags for the ClickHouse instance.
         """
-        self.tag_manager.set_tag("server", self._server, replace=True)
-        self.tag_manager.set_tag("port", str(self._port), replace=True)
-        self.tag_manager.set_tag("db", self._db, replace=True)
+        self.tag_manager.set_tag("server", self._config.server, replace=True)
+        self.tag_manager.set_tag("port", str(self._config.port), replace=True)
+        self.tag_manager.set_tag("db", self._config.db, replace=True)
         self.tag_manager.set_tag("database_hostname", self.reported_hostname, replace=True)
         self.tag_manager.set_tag("database_instance", self.database_identifier, replace=True)

+    def validate_config(self):
+        """
+        Validate the configuration and raise an error if invalid.
+        This is called during check initialization.
+        """
+        from datadog_checks.base import ConfigurationError
+
+        if not self._validation_result.valid:
+            for error in self._validation_result.errors:
+                self.log.error(str(error))
+            if self._validation_result.errors:
+                raise ConfigurationError(str(self._validation_result.errors[0]))
+
     def _send_database_instance_metadata(self):
         """Send database instance metadata to the metadata intake."""
         if self.database_identifier not in self._database_instance_emitted:
@@ -186,21 +149,21 @@ def _send_database_instance_metadata(self):

         event = {
             "host": self.reported_hostname,
-            "port": self._port,
+            "port": self._config.port,
             "database_instance": self.database_identifier,
             "database_hostname": self.reported_hostname,
             "agent_version": datadog_agent.get_version(),
             "ddagenthostname": self.agent_hostname,
             "dbms": "clickhouse",
             "kind": "database_instance",
-            "collection_interval": self._database_instance_collection_interval,
+            "collection_interval": self._config.database_instance_collection_interval,
             "dbms_version": version,
             "integration_version": __version__,
             "tags": tags_no_db,
             "timestamp": time() * 1000,
             "metadata": {
-                "dbm": self._dbm_enabled,
-                "connection_host": self._server,
+                "dbm": self._config.dbm,
+                "connection_host": self._config.server,
             },
         }

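Filled in with hypothetical values, the emitted event would look roughly like:

```python
# Illustrative payload only (hypothetical values), mirroring the dict above.
{
    "host": "clickhouse.example.com",
    "port": 8123,
    "database_instance": "clickhouse.example.com:8123:default",
    "database_hostname": "clickhouse.example.com",
    "dbms": "clickhouse",
    "kind": "database_instance",
    "collection_interval": 300,
    "metadata": {"dbm": True, "connection_host": "clickhouse.example.com"},
    # plus agent/integration versions, tags, and a millisecond timestamp
}
```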
@@ -241,15 +204,15 @@ def execute_query_raw(self, query):

     def _get_debug_tags(self):
         """Return debug tags for metrics"""
-        return ['server:{}'.format(self._server)]
+        return ['server:{}'.format(self._config.server)]

     @property
     def reported_hostname(self):
         """
         Get the hostname to be reported in metrics and events.
         """
         if self._resolved_hostname is None:
-            self._resolved_hostname = self._server
+            self._resolved_hostname = self._config.server
         return self._resolved_hostname

     @property
@@ -266,7 +229,7 @@ def database_identifier(self):
         """
         if self._database_identifier is None:
             # Create a unique identifier based on server, port, and database name
-            self._database_identifier = "{}:{}:{}".format(self._server, self._port, self._db)
+            self._database_identifier = "{}:{}:{}".format(self._config.server, self._config.port, self._config.db)
         return self._database_identifier

     @property
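For concreteness, with hypothetical connection values the format string above yields:

```python
# Illustrative only -- hypothetical values.
"{}:{}:{}".format('clickhouse.example.com', 8123, 'default')
# -> 'clickhouse.example.com:8123:default'
```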
@@ -278,7 +241,7 @@ def is_single_endpoint_mode(self):
         across all nodes in the cluster, since replicas are abstracted behind a single
         endpoint (e.g., load balancer or managed service like ClickHouse Cloud).
         """
-        return self._single_endpoint_mode
+        return self._config.single_endpoint_mode

     def get_system_table(self, table_name):
         """
@@ -299,24 +262,14 @@ def get_system_table(self, table_name):
         >>> self.get_system_table('query_log')
         "system.query_log"  # Direct connection
         """
-        if self._single_endpoint_mode:
+        if self._config.single_endpoint_mode:
             # Single endpoint mode: Use clusterAllReplicas to query all nodes
             # The cluster name is 'default' for ClickHouse Cloud and most setups
             return f"clusterAllReplicas('default', system.{table_name})"
         else:
             # Direct connection: Query the local system table directly
             return f"system.{table_name}"

-    def validate_config(self):
-        if not self._server:
-            raise ConfigurationError('the `server` setting is required')
-
-        # Validate compression type
-        if self._compression and self._compression not in ['lz4', 'zstd', 'br', 'gzip']:
-            raise ConfigurationError(
-                f'Invalid compression type "{self._compression}". Valid values are: lz4, zstd, br, gzip'
-            )
-
     def ping_clickhouse(self):
         return self._client.ping()

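Callers interpolate the returned table expression into their SQL, so the same statement transparently fans out across replicas in single endpoint mode. An illustrative (hypothetical) usage:

```python
# Illustrative only; query and query_duration_ms are standard
# system.query_log columns.
table = self.get_system_table('query_log')
sql = f"SELECT query, query_duration_ms FROM {table} LIMIT 10"
# single_endpoint_mode=True  -> FROM clusterAllReplicas('default', system.query_log)
# single_endpoint_mode=False -> FROM system.query_log
```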
@@ -338,20 +291,22 @@ def connect(self):
         self._client = None

         try:
+            # Convert compression None to False for get_client
+            compress = self._config.compression if self._config.compression else False
             client = clickhouse_connect.get_client(
                 # https://clickhouse.com/docs/integrations/python#connection-arguments
-                host=self._server,
-                port=self._port,
-                username=self._user,
-                password=self._password,
-                database=self._db,
-                secure=self._tls_verify,
-                connect_timeout=self._connect_timeout,
-                send_receive_timeout=self._read_timeout,
+                host=self._config.server,
+                port=self._config.port,
+                username=self._config.username,
+                password=self._config.password,
+                database=self._config.db,
+                connect_timeout=self._config.connect_timeout,
+                send_receive_timeout=self._config.read_timeout,
+                secure=self._config.tls_verify,
+                ca_cert=self._config.tls_ca_cert,
+                verify=self._config.verify,
                 client_name=f'datadog-{self.check_id}',
-                compress=self._compression,
-                ca_cert=self._tls_ca_cert,
-                verify=self._verify,
+                compress=compress,
                 # https://clickhouse.com/docs/integrations/language-clients/python/driver-api#multi-threaded-applications
                 autogenerate_session_id=False,
                 # https://clickhouse.com/docs/integrations/python#settings-argument
@@ -373,19 +328,21 @@ def create_dbm_client(self):
         This prevents concurrent query errors when multiple jobs run simultaneously.
         """
         try:
+            # Convert compression None to False for get_client
+            compress = self._config.compression if self._config.compression else False
             client = clickhouse_connect.get_client(
-                host=self._server,
-                port=self._port,
-                username=self._user,
-                password=self._password,
-                database=self._db,
-                secure=self._tls_verify,
-                connect_timeout=self._connect_timeout,
-                send_receive_timeout=self._read_timeout,
+                host=self._config.server,
+                port=self._config.port,
+                username=self._config.username,
+                password=self._config.password,
+                database=self._config.db,
+                secure=self._config.tls_verify,
+                connect_timeout=self._config.connect_timeout,
+                send_receive_timeout=self._config.read_timeout,
                 client_name=f'datadog-dbm-{self.check_id}',
-                compress=self._compression,
-                ca_cert=self._tls_ca_cert,
-                verify=self._verify,
+                compress=compress,
+                ca_cert=self._config.tls_ca_cert,
+                verify=self._config.verify,
                 settings={},
             )
             return client
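Per the docstring, the point of `create_dbm_client` is connection isolation: each long-running DBM job should hold its own client rather than sharing `self._client`. A hypothetical job-side sketch:

```python
# Hypothetical sketch of a DBM job using its own dedicated client.
class ExampleDbmJob:
    def __init__(self, check):
        self._check = check
        self._client = None

    def run(self):
        if self._client is None:
            # One client per job avoids concurrent-query errors on a shared client
            self._client = self._check.create_dbm_client()
        rows = self._client.query('SELECT 1').result_rows
        return rows
```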