diff --git a/flinkx-core/.gitignore b/flinkx-core/.gitignore index ca7ca55c4c..9803fe0b0e 100644 --- a/flinkx-core/.gitignore +++ b/flinkx-core/.gitignore @@ -11,3 +11,4 @@ target .classpath *.eclipse.* *.iml +/dependency-reduced-pom.xml diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/metrics/SimpleAccumulatorGauge.java b/flinkx-core/src/main/java/com/dtstack/flinkx/metrics/SimpleAccumulatorGauge.java index 5c52d9fee5..a4d470b66f 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/metrics/SimpleAccumulatorGauge.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/metrics/SimpleAccumulatorGauge.java @@ -18,21 +18,22 @@ package com.dtstack.flinkx.metrics; -import org.apache.flink.api.common.accumulators.SimpleAccumulator; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.metrics.Gauge; import java.io.Serializable; /** * company: www.dtstack.com + * * @author: toutian * create: 2019/3/21 */ public class SimpleAccumulatorGauge implements Gauge { - private SimpleAccumulator accumulator; + private Accumulator accumulator; - public SimpleAccumulatorGauge(SimpleAccumulator accumulator) { + public SimpleAccumulatorGauge(Accumulator accumulator) { this.accumulator = accumulator; } diff --git a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java index 5da390da37..ff7273944d 100644 --- a/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java +++ b/flinkx-rdb/src/main/java/com/dtstack/flinkx/rdb/inputformat/JdbcInputFormat.java @@ -21,6 +21,7 @@ import com.dtstack.flinkx.common.ColumnType; import com.dtstack.flinkx.constants.Metrics; import com.dtstack.flinkx.enums.EDatabaseType; +import com.dtstack.flinkx.metrics.SimpleAccumulatorGauge; import com.dtstack.flinkx.rdb.DatabaseInterface; import com.dtstack.flinkx.rdb.type.TypeConverterInterface; import com.dtstack.flinkx.rdb.util.DBUtil; @@ -36,6 +37,7 @@ import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.types.Row; import java.io.IOException; import java.sql.*; @@ -120,16 +122,19 @@ public void configure(Configuration configuration) { private void setMetric(){ Map> accumulatorMap = getRuntimeContext().getAllAccumulators(); + final MetricGroup flinkxOutputMetricGroup = getRuntimeContext().getMetricGroup().addGroup(Metrics.METRIC_GROUP_KEY_FLINKX, Metrics.METRIC_GROUP_VALUE_OUTPUT); if(!accumulatorMap.containsKey(Metrics.TABLE_COL)){ tableColAccumulator = new StringAccumulator(); tableColAccumulator.add(table + "-" + increCol); getRuntimeContext().addAccumulator(Metrics.TABLE_COL,tableColAccumulator); + flinkxOutputMetricGroup.gauge(Metrics.TABLE_COL, new SimpleAccumulatorGauge(tableColAccumulator)); } if(!accumulatorMap.containsKey(Metrics.END_LOCATION)){ endLocationAccumulator = new MaximumAccumulator(); getRuntimeContext().addAccumulator(Metrics.END_LOCATION,endLocationAccumulator); + flinkxOutputMetricGroup.gauge(Metrics.END_LOCATION, new SimpleAccumulatorGauge(endLocationAccumulator)); } if (startLocation != null){ @@ -138,6 +143,7 @@ private void setMetric(){ startLocationAccumulator = new StringAccumulator(); startLocationAccumulator.add(startLocation); getRuntimeContext().addAccumulator(Metrics.START_LOCATION,startLocationAccumulator); + flinkxOutputMetricGroup.gauge(Metrics.START_LOCATION, new SimpleAccumulatorGauge(startLocationAccumulator)); } }