9
9
from hunter .config import Config
10
10
from hunter .data_selector import DataSelector
11
11
from hunter .graphite import DataPoint , Graphite , GraphiteError
12
+ from hunter .postgres import Postgres
12
13
from hunter .series import Metric , Series
13
14
from hunter .test_config import (
14
15
CsvMetric ,
15
16
CsvTestConfig ,
16
17
GraphiteTestConfig ,
17
18
HistoStatTestConfig ,
19
+ PostgresMetric ,
20
+ PostgresTestConfig ,
18
21
TestConfig ,
19
22
)
20
23
from hunter .util import (
@@ -431,17 +434,122 @@ def fetch_all_metric_names(self, test: HistoStatTestConfig) -> List[str]:
431
434
return metric_names
432
435
433
436
437
+ class PostgresImporter :
438
+ __postgres : Postgres
439
+
440
+ def __init__ (self , postgres : Postgres ):
441
+ self .__postgres = postgres
442
+
443
+ @staticmethod
444
+ def __selected_metrics (
445
+ defined_metrics : Dict [str , PostgresMetric ], selected_metrics : Optional [List [str ]]
446
+ ) -> Dict [str , PostgresMetric ]:
447
+
448
+ if selected_metrics is not None :
449
+ return {name : defined_metrics [name ] for name in selected_metrics }
450
+ else :
451
+ return defined_metrics
452
+
453
+ def fetch_data (self , test_conf : TestConfig , selector : DataSelector = DataSelector ()) -> Series :
454
+ if not isinstance (test_conf , PostgresTestConfig ):
455
+ raise ValueError ("Expected PostgresTestConfig" )
456
+
457
+ if selector .branch :
458
+ raise ValueError ("Postgres tests don't support branching yet" )
459
+
460
+ since_time = selector .since_time
461
+ until_time = selector .until_time
462
+ if since_time .timestamp () > until_time .timestamp ():
463
+ raise DataImportError (
464
+ f"Invalid time range: ["
465
+ f"{ format_timestamp (int (since_time .timestamp ()))} , "
466
+ f"{ format_timestamp (int (until_time .timestamp ()))} ]"
467
+ )
468
+ metrics = self .__selected_metrics (test_conf .metrics , selector .metrics )
469
+
470
+ columns , rows = self .__postgres .fetch_data (test_conf .query )
471
+
472
+ # Decide which columns to fetch into which components of the result:
473
+ try :
474
+ time_index : int = columns .index (test_conf .time_column )
475
+ attr_indexes : List [int ] = [columns .index (c ) for c in test_conf .attributes ]
476
+ metric_names = [m .name for m in metrics .values ()]
477
+ metric_columns = [m .column for m in metrics .values ()]
478
+ metric_indexes : List [int ] = [columns .index (c ) for c in metric_columns ]
479
+ except ValueError as err :
480
+ raise DataImportError (f"Column not found { err .args [0 ]} " )
481
+
482
+ time : List [int ] = []
483
+ data : Dict [str , List [float ]] = {}
484
+ for n in metric_names :
485
+ data [n ] = []
486
+ attributes : Dict [str , List [str ]] = {}
487
+ for i in attr_indexes :
488
+ attributes [columns [i ]] = []
489
+
490
+ for row in rows :
491
+ ts : datetime = row [time_index ]
492
+ if since_time is not None and ts < since_time :
493
+ continue
494
+ if until_time is not None and ts >= until_time :
495
+ continue
496
+ time .append (int (ts .timestamp ()))
497
+
498
+ # Read metric values. Note we can still fail on conversion to float,
499
+ # because the user is free to override the column selection and thus
500
+ # they may select a column that contains non-numeric data:
501
+ for (name , i ) in zip (metric_names , metric_indexes ):
502
+ try :
503
+ data [name ].append (float (row [i ]))
504
+ except ValueError as err :
505
+ raise DataImportError (
506
+ "Could not convert value in column " + headers [i ] + ": " + err .args [0 ]
507
+ )
508
+
509
+ # Attributes are just copied as-is, with no conversion:
510
+ for i in attr_indexes :
511
+ attributes [columns [i ]].append (row [i ])
512
+
513
+ # Convert metrics to series.Metrics
514
+ metrics = {m .name : Metric (m .direction , m .scale ) for m in metrics .values ()}
515
+
516
+ # Leave last n points:
517
+ time = time [- selector .last_n_points :]
518
+ tmp = data
519
+ data = {}
520
+ for k , v in tmp .items ():
521
+ data [k ] = v [- selector .last_n_points :]
522
+ tmp = attributes
523
+ attributes = {}
524
+ for k , v in tmp .items ():
525
+ attributes [k ] = v [- selector .last_n_points :]
526
+
527
+ return Series (
528
+ test_conf .name ,
529
+ branch = None ,
530
+ time = time ,
531
+ metrics = metrics ,
532
+ data = data ,
533
+ attributes = attributes ,
534
+ )
535
+
536
+ def fetch_all_metric_names (self , test_conf : PostgresTestConfig ) -> List [str ]:
537
+ return [m for m in test_conf .metrics .keys ()]
538
+
539
+
434
540
class Importers :
435
541
__config : Config
436
542
__csv_importer : Optional [CsvImporter ]
437
543
__graphite_importer : Optional [GraphiteImporter ]
438
544
__histostat_importer : Optional [HistoStatImporter ]
545
+ __postgres_importer : Optional [PostgresImporter ]
439
546
440
547
def __init__ (self , config : Config ):
441
548
self .__config = config
442
549
self .__csv_importer = None
443
550
self .__graphite_importer = None
444
551
self .__histostat_importer = None
552
+ self .__postgres_importer = None
445
553
446
554
def csv_importer (self ) -> CsvImporter :
447
555
if self .__csv_importer is None :
@@ -458,12 +566,19 @@ def histostat_importer(self) -> HistoStatImporter:
458
566
self .__histostat_importer = HistoStatImporter ()
459
567
return self .__histostat_importer
460
568
569
+ def postgres_importer (self ) -> PostgresImporter :
570
+ if self .__postgres_importer is None :
571
+ self .__postgres_importer = PostgresImporter (Postgres (self .__config .postgres ))
572
+ return self .__postgres_importer
573
+
461
574
def get (self , test : TestConfig ) -> Importer :
462
575
if isinstance (test , CsvTestConfig ):
463
576
return self .csv_importer ()
464
577
elif isinstance (test , GraphiteTestConfig ):
465
578
return self .graphite_importer ()
466
579
elif isinstance (test , HistoStatTestConfig ):
467
580
return self .histostat_importer ()
581
+ elif isinstance (test , PostgresTestConfig ):
582
+ return self .postgres_importer ()
468
583
else :
469
584
raise ValueError (f"Unsupported test type { type (test )} " )
0 commit comments