Skip to content

Commit

Permalink
add gauge for count
Browse files Browse the repository at this point in the history
  • Loading branch information
toutian committed Mar 21, 2019
1 parent 5b31f77 commit 8e523c7
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ public class Metrics {
public static String START_LOCATION = "startLocation";

public static String TABLE_COL = "tableCol";

public static String METRIC_GROUP_KEY_FLINKX = "flinkx";
public static String METRIC_GROUP_VALUE_INPUT = "input";
public static String METRIC_GROUP_VALUE_OUTPUT = "output";

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ public void open(InputSplit inputSplit) throws IOException {
jobName = vars.get(Metrics.JOB_NAME);
}

inputMetric = new InputMetric(getRuntimeContext());

numReadCounter = getRuntimeContext().getLongCounter(Metrics.NUM_READS);

inputMetric = new InputMetric(getRuntimeContext(), numReadCounter);

openInternal(inputSplit);

if (StringUtils.isNotBlank(this.monitorUrls) && this.bytes > 0) {
Expand All @@ -79,7 +79,6 @@ public void open(InputSplit inputSplit) throws IOException {
@Override
public Row nextRecord(Row row) throws IOException {
numReadCounter.add(1);
inputMetric.getNumRead().inc();

if(byteRateLimiter != null) {
byteRateLimiter.acquire();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,25 @@
package com.dtstack.flinkx.metrics;

import com.dtstack.flinkx.constants.Metrics;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

/**
* company: www.dtstack.com
* author: toutian
* @author: toutian
* create: 2019/3/18
*/
public class InputMetric {

private Counter numRead;

private RuntimeContext runtimeContext;

public InputMetric(RuntimeContext runtimeContext) {
public InputMetric(RuntimeContext runtimeContext, LongCounter numRead) {
this.runtimeContext = runtimeContext;

initMetric();
}

private void initMetric() {
numRead = getRuntimeContext().getMetricGroup().counter(Metrics.NUM_READS);
}
final MetricGroup flinkxInput = getRuntimeContext().getMetricGroup().addGroup(Metrics.METRIC_GROUP_KEY_FLINKX, Metrics.METRIC_GROUP_VALUE_INPUT);

public Counter getNumRead() {
return numRead;
flinkxInput.gauge(Metrics.NUM_READS, new SimpleAccumulatorGauge<Long>(numRead));
}

private RuntimeContext getRuntimeContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -19,67 +19,36 @@
package com.dtstack.flinkx.metrics;

import com.dtstack.flinkx.constants.Metrics;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

/**
* company: www.dtstack.com
* author: toutian
*
* @author: toutian
* create: 2019/3/18
*/
public class OutputMetric {

private Counter numErrors;
private Counter numNullErrors;
private Counter numDuplicateErrors;
private Counter numConversionErrors;
private Counter numOtherErrors;
private Counter numWrite;

private transient RuntimeContext runtimeContext;

public OutputMetric(RuntimeContext runtimeContext) {
public OutputMetric(RuntimeContext runtimeContext, IntCounter numErrors, IntCounter numNullErrors,
IntCounter numDuplicateErrors, IntCounter numConversionErrors, IntCounter numOtherErrors, LongCounter numWrite) {
this.runtimeContext = runtimeContext;

initMetric();
}

private void initMetric() {
numErrors = getRuntimeContext().getMetricGroup().counter(Metrics.NUM_ERRORS);
numNullErrors = getRuntimeContext().getMetricGroup().counter(Metrics.NUM_NULL_ERRORS);
numDuplicateErrors = getRuntimeContext().getMetricGroup().counter(Metrics.NUM_DUPLICATE_ERRORS);
numConversionErrors = getRuntimeContext().getMetricGroup().counter(Metrics.NUM_CONVERSION_ERRORS);
numOtherErrors = getRuntimeContext().getMetricGroup().counter(Metrics.NUM_OTHER_ERRORS);
numWrite = getRuntimeContext().getMetricGroup().counter(Metrics.NUM_WRITES);
}

public Counter getNumErrors() {
return numErrors;
}

public Counter getNumNullErrors() {
return numNullErrors;
}

public Counter getNumDuplicateErrors() {
return numDuplicateErrors;
}

public Counter getNumConversionErrors() {
return numConversionErrors;
}

public Counter getNumOtherErrors() {
return numOtherErrors;
}
final MetricGroup flinkxOutput = getRuntimeContext().getMetricGroup().addGroup(Metrics.METRIC_GROUP_KEY_FLINKX, Metrics.METRIC_GROUP_VALUE_OUTPUT);

public Counter getNumWrite() {
return numWrite;
flinkxOutput.gauge(Metrics.NUM_ERRORS, new SimpleAccumulatorGauge<Integer>(numErrors));
flinkxOutput.gauge(Metrics.NUM_NULL_ERRORS, new SimpleAccumulatorGauge<Integer>(numNullErrors));
flinkxOutput.gauge(Metrics.NUM_DUPLICATE_ERRORS, new SimpleAccumulatorGauge<Integer>(numDuplicateErrors));
flinkxOutput.gauge(Metrics.NUM_CONVERSION_ERRORS, new SimpleAccumulatorGauge<Integer>(numConversionErrors));
flinkxOutput.gauge(Metrics.NUM_OTHER_ERRORS, new SimpleAccumulatorGauge<Integer>(numOtherErrors));
flinkxOutput.gauge(Metrics.NUM_WRITES, new SimpleAccumulatorGauge<Long>(numWrite));
}

private RuntimeContext getRuntimeContext() {
return runtimeContext;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.flinkx.metrics;

import org.apache.flink.api.common.accumulators.SimpleAccumulator;
import org.apache.flink.metrics.Gauge;

import java.io.Serializable;

/**
* company: www.dtstack.com
* @author: toutian
* create: 2019/3/21
*/
public class SimpleAccumulatorGauge<T extends Serializable> implements Gauge<T> {

private SimpleAccumulator<T> accumulator;

public SimpleAccumulatorGauge(SimpleAccumulator<T> accumulator) {
this.accumulator = accumulator;
}

@Override
public T getValue() {
return accumulator.getLocalValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ public void open(int taskNumber, int numTasks) throws IOException {
context = (StreamingRuntimeContext) getRuntimeContext();
this.numTasks = numTasks;

outputMetric = new OutputMetric(context);

//错误记录数
errCounter = context.getIntCounter(Metrics.NUM_ERRORS);
nullErrCounter = context.getIntCounter(Metrics.NUM_NULL_ERRORS);
Expand All @@ -178,6 +176,8 @@ public void open(int taskNumber, int numTasks) throws IOException {
//总记录数
numWriteCounter = context.getLongCounter(Metrics.NUM_WRITES);

outputMetric = new OutputMetric(context, errCounter, nullErrCounter, duplicateErrCounter, conversionErrCounter, otherErrCounter, numWriteCounter);

Map<String, String> vars = context.getMetricGroup().getAllVariables();

if(vars != null && vars.get(Metrics.JOB_NAME) != null) {
Expand Down Expand Up @@ -241,10 +241,8 @@ protected void writeSingleRecord(Row row) {

// 总记录数加1
numWriteCounter.add(1);
outputMetric.getNumWrite().inc();
} catch(WriteRecordException e) {
errCounter.add(1);
outputMetric.getNumErrors().inc();
String errMsg = e.getMessage();

int pos = e.getColIndex();
Expand All @@ -261,16 +259,12 @@ protected void writeSingleRecord(Row row) {
String errorType = dirtyDataManager.writeData(row, e);
if (ERR_NULL_POINTER.equals(errorType)){
nullErrCounter.add(1);
outputMetric.getNumNullErrors().inc();
} else if(ERR_FORMAT_TRANSFORM.equals(errorType)){
conversionErrCounter.add(1);
outputMetric.getNumConversionErrors().inc();
} else if(ERR_PRIMARY_CONFLICT.equals(errorType)){
duplicateErrCounter.add(1);
outputMetric.getNumDuplicateErrors().inc();
} else {
otherErrCounter.add(1);
outputMetric.getNumOtherErrors().inc();
}
}

Expand Down

0 comments on commit 8e523c7

Please sign in to comment.