From 4bb0eb5f412b403d4811dbbefa631dbb1307aa8c Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Fri, 20 Jun 2025 10:04:09 +0000 Subject: [PATCH 1/7] support tps metric for source task --- .../runtime/metrics/SourceTaskMetrics.java | 73 +++++++++++++++++++ .../operators/GlutenSourceFunction.java | 25 +++++++ 2 files changed, 98 insertions(+) create mode 100644 gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java new file mode 100644 index 000000000000..b02eb2ed323a --- /dev/null +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.table.runtime.metrics; + +import io.github.zhztheplayer.velox4j.query.SerialTask; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import java.util.List; + +public class SourceTaskMetrics { + + private final String keyOperatorType = "operatorType"; + private final String sourceOperatorName = "TableScan"; + private final String keyInputRows = "rawInputRows"; + private final String keyInputBytes = "rawInputBytes"; + private final long metricUpdateInterval = 2000; + private long sourceRecordsOut = 0; + private long sourceBytesOut = 0; + private long lastUpdateTime = System.currentTimeMillis(); + + private static final SourceTaskMetrics instance = new SourceTaskMetrics(); + + private SourceTaskMetrics() {} + + public static SourceTaskMetrics getInstance() { + return instance; + } + + public long getSourceRecordsOut() { + return sourceRecordsOut; + } + + public long getSourceBytesOut() { + return sourceBytesOut; + } + + public boolean updateMetrics(SerialTask task, List planIds) { + long currentTime = System.currentTimeMillis(); + if (currentTime - lastUpdateTime < metricUpdateInterval) { + return false; + } + for (String planId : planIds) { + try { + ObjectNode planStats = task.collectStats().planStats(planId); + JsonNode jsonNode = planStats.get(keyOperatorType); + if (jsonNode.asText().equals(sourceOperatorName)) { + sourceRecordsOut = planStats.get(keyInputRows).asInt(); + sourceBytesOut = planStats.get(keyInputBytes).asInt(); + } + } catch (Exception e) { + + } + } + lastUpdateTime = currentTime; + return true; + } +} diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java index 8bbabb0ea39b..7dd026b25cc7 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java @@ -16,6 +16,7 @@ */ package org.apache.gluten.table.runtime.operators; +import org.apache.gluten.table.runtime.metrics.SourceTaskMetrics; import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor; import io.github.zhztheplayer.velox4j.Velox4j; @@ -33,6 +34,8 @@ import io.github.zhztheplayer.velox4j.session.Session; import io.github.zhztheplayer.velox4j.type.RowType; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.table.data.RowData; @@ -57,6 +60,9 @@ public class GlutenSourceFunction extends RichParallelSourceFunction { private Query query; BufferAllocator allocator; private MemoryManager memoryManager; + private Counter sourceNumRecordsOut; + private Counter sourceNumBytesOut; + private SourceTaskMetrics sourceTaskMetrics; public GlutenSourceFunction( PlanNode planNode, RowType outputType, String id, ConnectorSplit split) { @@ -82,6 +88,24 @@ public ConnectorSplit getConnectorSplit() { return split; } + @Override + public void open(Configuration parameters) throws Exception { + sourceNumRecordsOut = + getRuntimeContext().getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); + sourceNumBytesOut = + getRuntimeContext().getMetricGroup().getIOMetricGroup().getNumBytesOutCounter(); + sourceTaskMetrics = SourceTaskMetrics.getInstance(); + } + + private void updateSourceMetrics(SerialTask task) { + if (sourceTaskMetrics.updateMetrics(task, List.of(id))) { + long numRecordsOut = sourceTaskMetrics.getSourceRecordsOut(); + long numBytesOut = sourceTaskMetrics.getSourceBytesOut(); + sourceNumRecordsOut.inc(numRecordsOut - sourceNumRecordsOut.getCount()); + sourceNumBytesOut.inc(numBytesOut - sourceNumBytesOut.getCount()); + } + } + @Override public void run(SourceContext sourceContext) throws Exception { LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode)); @@ -108,6 +132,7 @@ public void run(SourceContext sourceContext) throws Exception { LOG.info("Velox task finished"); break; } + updateSourceMetrics(task); } task.close(); From fa8d81acdcfd67a86273568d8dea3397a0cb93cc Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Fri, 20 Jun 2025 10:15:36 +0000 Subject: [PATCH 2/7] return false when metrics update exception --- .../apache/gluten/table/runtime/metrics/SourceTaskMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java index b02eb2ed323a..3473e6c655d3 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java @@ -64,7 +64,7 @@ public boolean updateMetrics(SerialTask task, List planIds) { sourceBytesOut = planStats.get(keyInputBytes).asInt(); } } catch (Exception e) { - + return false; } } lastUpdateTime = currentTime; From 36a86fa56069508e03c9e0f95959ab33e5e36404 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Fri, 20 Jun 2025 10:21:03 +0000 Subject: [PATCH 3/7] use source id --- .../runtime/metrics/SourceTaskMetrics.java | 22 ++++++++----------- .../operators/GlutenSourceFunction.java | 2 +- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java index 3473e6c655d3..8e4460eed2a7 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java @@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import java.util.List; - public class SourceTaskMetrics { private final String keyOperatorType = "operatorType"; @@ -50,22 +48,20 @@ public long getSourceBytesOut() { return sourceBytesOut; } - public boolean updateMetrics(SerialTask task, List planIds) { + public boolean updateMetrics(SerialTask task, String planId) { long currentTime = System.currentTimeMillis(); if (currentTime - lastUpdateTime < metricUpdateInterval) { return false; } - for (String planId : planIds) { - try { - ObjectNode planStats = task.collectStats().planStats(planId); - JsonNode jsonNode = planStats.get(keyOperatorType); - if (jsonNode.asText().equals(sourceOperatorName)) { - sourceRecordsOut = planStats.get(keyInputRows).asInt(); - sourceBytesOut = planStats.get(keyInputBytes).asInt(); - } - } catch (Exception e) { - return false; + try { + ObjectNode planStats = task.collectStats().planStats(planId); + JsonNode jsonNode = planStats.get(keyOperatorType); + if (jsonNode.asText().equals(sourceOperatorName)) { + sourceRecordsOut = planStats.get(keyInputRows).asInt(); + sourceBytesOut = planStats.get(keyInputBytes).asInt(); } + } catch (Exception e) { + return false; } lastUpdateTime = currentTime; return true; diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java index 7dd026b25cc7..56096fe417aa 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java @@ -98,7 +98,7 @@ public void open(Configuration parameters) throws Exception { } private void updateSourceMetrics(SerialTask task) { - if (sourceTaskMetrics.updateMetrics(task, List.of(id))) { + if (sourceTaskMetrics.updateMetrics(task, id)) { long numRecordsOut = sourceTaskMetrics.getSourceRecordsOut(); long numBytesOut = sourceTaskMetrics.getSourceBytesOut(); sourceNumRecordsOut.inc(numRecordsOut - sourceNumRecordsOut.getCount()); From c118f9334d3b85020ce7f6b3d3da312af08dac96 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Tue, 23 Sep 2025 07:24:47 +0000 Subject: [PATCH 4/7] fix reviews --- .../operators/GlutenSourceFunction.java | 23 +------------------ .../operators/GlutenVectorSourceFunction.java | 22 ++++++++++++++++++ 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java index 118ec52b0eea..0d7c11b82731 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java @@ -16,7 +16,6 @@ */ package org.apache.gluten.table.runtime.operators; -import org.apache.gluten.table.runtime.metrics.SourceTaskMetrics; import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor; import io.github.zhztheplayer.velox4j.Velox4j; @@ -36,7 +35,6 @@ import io.github.zhztheplayer.velox4j.type.RowType; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Counter; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.table.data.RowData; @@ -62,9 +60,6 @@ public class GlutenSourceFunction extends RichParallelSourceFunction { private Query query; BufferAllocator allocator; private MemoryManager memoryManager; - private Counter sourceNumRecordsOut; - private Counter sourceNumBytesOut; - private SourceTaskMetrics sourceTaskMetrics; public GlutenSourceFunction( StatefulPlanNode planNode, @@ -94,22 +89,7 @@ public ConnectorSplit getConnectorSplit() { } @Override - public void open(Configuration parameters) throws Exception { - sourceNumRecordsOut = - getRuntimeContext().getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter(); - sourceNumBytesOut = - getRuntimeContext().getMetricGroup().getIOMetricGroup().getNumBytesOutCounter(); - sourceTaskMetrics = SourceTaskMetrics.getInstance(); - } - - private void updateSourceMetrics(SerialTask task) { - if (sourceTaskMetrics.updateMetrics(task, id)) { - long numRecordsOut = sourceTaskMetrics.getSourceRecordsOut(); - long numBytesOut = sourceTaskMetrics.getSourceBytesOut(); - sourceNumRecordsOut.inc(numRecordsOut - sourceNumRecordsOut.getCount()); - sourceNumBytesOut.inc(numBytesOut - sourceNumBytesOut.getCount()); - } - } + public void open(Configuration parameters) throws Exception {} @Override public void run(SourceContext sourceContext) throws Exception { @@ -140,7 +120,6 @@ public void run(SourceContext sourceContext) throws Exception { LOG.info("Velox task finished"); break; } - updateSourceMetrics(task); } task.close(); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java index a4d6745163f3..b79fcdc58e87 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java @@ -16,6 +16,8 @@ */ package org.apache.gluten.table.runtime.operators; +import org.apache.gluten.table.runtime.metrics.SourceTaskMetrics; + import io.github.zhztheplayer.velox4j.Velox4j; import io.github.zhztheplayer.velox4j.config.Config; import io.github.zhztheplayer.velox4j.config.ConnectorConfig; @@ -33,6 +35,7 @@ import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -65,6 +68,9 @@ public class GlutenVectorSourceFunction extends RichParallelSourceFunction sourceContext) throws Exception { LOG.info("Velox task finished"); break; } + updateSourceMetrics(task); } task.close(); From b6b813cb2fe73c6082658cbd5306f74b1265d011 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Tue, 23 Sep 2025 07:26:52 +0000 Subject: [PATCH 5/7] remove useless changes --- .../gluten/table/runtime/operators/GlutenSourceFunction.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java index 0d7c11b82731..ddcd5cae9436 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java @@ -34,7 +34,6 @@ import io.github.zhztheplayer.velox4j.stateful.StatefulElement; import io.github.zhztheplayer.velox4j.type.RowType; -import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.table.data.RowData; @@ -88,9 +87,6 @@ public ConnectorSplit getConnectorSplit() { return split; } - @Override - public void open(Configuration parameters) throws Exception {} - @Override public void run(SourceContext sourceContext) throws Exception { LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode)); From 67714dd9cc4696a5cdb18f0ca4b25faecc903530 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Tue, 23 Sep 2025 08:30:00 +0000 Subject: [PATCH 6/7] optimize code --- .../runtime/metrics/SourceTaskMetrics.java | 30 ++++++++----------- .../operators/GlutenVectorSourceFunction.java | 23 ++------------ 2 files changed, 15 insertions(+), 38 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java index 8e4460eed2a7..5bbbb028b6a4 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java @@ -18,6 +18,9 @@ import io.github.zhztheplayer.velox4j.query.SerialTask; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.OperatorMetricGroup; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -28,24 +31,13 @@ public class SourceTaskMetrics { private final String keyInputRows = "rawInputRows"; private final String keyInputBytes = "rawInputBytes"; private final long metricUpdateInterval = 2000; - private long sourceRecordsOut = 0; - private long sourceBytesOut = 0; + private Counter sourceNumRecordsOut; + private Counter sourceNumBytesOut; private long lastUpdateTime = System.currentTimeMillis(); - private static final SourceTaskMetrics instance = new SourceTaskMetrics(); - - private SourceTaskMetrics() {} - - public static SourceTaskMetrics getInstance() { - return instance; - } - - public long getSourceRecordsOut() { - return sourceRecordsOut; - } - - public long getSourceBytesOut() { - return sourceBytesOut; + public SourceTaskMetrics(OperatorMetricGroup metricGroup) { + sourceNumRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); + sourceNumBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter(); } public boolean updateMetrics(SerialTask task, String planId) { @@ -57,8 +49,10 @@ public boolean updateMetrics(SerialTask task, String planId) { ObjectNode planStats = task.collectStats().planStats(planId); JsonNode jsonNode = planStats.get(keyOperatorType); if (jsonNode.asText().equals(sourceOperatorName)) { - sourceRecordsOut = planStats.get(keyInputRows).asInt(); - sourceBytesOut = planStats.get(keyInputBytes).asInt(); + long numRecordsOut = planStats.get(keyInputRows).asInt(); + long numBytesOut = planStats.get(keyInputBytes).asInt(); + sourceNumRecordsOut.inc(numRecordsOut - sourceNumRecordsOut.getCount()); + sourceNumBytesOut.inc(numBytesOut - sourceNumBytesOut.getCount()); } } catch (Exception e) { return false; diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java index b79fcdc58e87..7ea06cc58ca1 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java @@ -35,7 +35,6 @@ import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -68,9 +67,7 @@ public class GlutenVectorSourceFunction extends RichParallelSourceFunction sourceContext) throws Exception { LOG.info("Velox task finished"); break; } - updateSourceMetrics(task); + taskMetrics.updateMetrics(task, id); } task.close(); From a68f1ca5ed66a1c17d92f2b8a56d71939781c7fe Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Fri, 10 Oct 2025 03:01:08 +0000 Subject: [PATCH 7/7] fix ci --- .../table/runtime/operators/GlutenVectorSourceFunction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java index edfefd1ac222..472bd0bfed0a 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java @@ -16,8 +16,8 @@ */ package org.apache.gluten.table.runtime.operators; -import org.apache.gluten.table.runtime.metrics.SourceTaskMetrics; import org.apache.gluten.table.runtime.config.VeloxQueryConfig; +import org.apache.gluten.table.runtime.metrics.SourceTaskMetrics; import io.github.zhztheplayer.velox4j.Velox4j; import io.github.zhztheplayer.velox4j.config.ConnectorConfig;