Skip to content

Commit

Permalink
测试 promethues 数据的准确性
Browse files Browse the repository at this point in the history
  • Loading branch information
toutian committed Mar 21, 2019
1 parent 8e523c7 commit 24f6390
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
1 change: 1 addition & 0 deletions flinkx-core/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ target
.classpath
*.eclipse.*
*.iml
/dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@

package com.dtstack.flinkx.metrics;

import org.apache.flink.api.common.accumulators.SimpleAccumulator;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.metrics.Gauge;

import java.io.Serializable;

/**
* company: www.dtstack.com
*
* @author: toutian
* create: 2019/3/21
*/
public class SimpleAccumulatorGauge<T extends Serializable> implements Gauge<T> {

private SimpleAccumulator<T> accumulator;
private Accumulator<T, T> accumulator;

public SimpleAccumulatorGauge(SimpleAccumulator<T> accumulator) {
public SimpleAccumulatorGauge(Accumulator<T, T> accumulator) {
this.accumulator = accumulator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.dtstack.flinkx.common.ColumnType;
import com.dtstack.flinkx.constants.Metrics;
import com.dtstack.flinkx.enums.EDatabaseType;
import com.dtstack.flinkx.metrics.SimpleAccumulatorGauge;
import com.dtstack.flinkx.rdb.DatabaseInterface;
import com.dtstack.flinkx.rdb.type.TypeConverterInterface;
import com.dtstack.flinkx.rdb.util.DBUtil;
Expand All @@ -36,6 +37,7 @@
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.types.Row;
import java.io.IOException;
import java.sql.*;
Expand Down Expand Up @@ -120,16 +122,19 @@ public void configure(Configuration configuration) {

private void setMetric(){
Map<String, Accumulator<?, ?>> accumulatorMap = getRuntimeContext().getAllAccumulators();
final MetricGroup flinkxOutputMetricGroup = getRuntimeContext().getMetricGroup().addGroup(Metrics.METRIC_GROUP_KEY_FLINKX, Metrics.METRIC_GROUP_VALUE_OUTPUT);

if(!accumulatorMap.containsKey(Metrics.TABLE_COL)){
tableColAccumulator = new StringAccumulator();
tableColAccumulator.add(table + "-" + increCol);
getRuntimeContext().addAccumulator(Metrics.TABLE_COL,tableColAccumulator);
flinkxOutputMetricGroup.gauge(Metrics.TABLE_COL, new SimpleAccumulatorGauge<String>(tableColAccumulator));
}

if(!accumulatorMap.containsKey(Metrics.END_LOCATION)){
endLocationAccumulator = new MaximumAccumulator();
getRuntimeContext().addAccumulator(Metrics.END_LOCATION,endLocationAccumulator);
flinkxOutputMetricGroup.gauge(Metrics.END_LOCATION, new SimpleAccumulatorGauge<String>(endLocationAccumulator));
}

if (startLocation != null){
Expand All @@ -138,6 +143,7 @@ private void setMetric(){
startLocationAccumulator = new StringAccumulator();
startLocationAccumulator.add(startLocation);
getRuntimeContext().addAccumulator(Metrics.START_LOCATION,startLocationAccumulator);
flinkxOutputMetricGroup.gauge(Metrics.START_LOCATION, new SimpleAccumulatorGauge<String>(startLocationAccumulator));
}
}

Expand Down

0 comments on commit 24f6390

Please sign in to comment.