Skip to content

Refactor StreamGraphTranslator and GlutenOperators#32

Open
lgbo-ustc wants to merge 3 commits intoflink_202512from
flink_202512_lgbo
Open

Refactor StreamGraphTranslator and GlutenOperators#32
lgbo-ustc wants to merge 3 commits intoflink_202512from
flink_202512_lgbo

Conversation

@lgbo-ustc
Copy link

@lgbo-ustc lgbo-ustc commented Dec 24, 2025

What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

重构了 StreamGraphTranslator

StreamGraphTranslator 代码实现进行了一定的简化。 多个 operator 不再合并为一个,但仍然预留了可合并的可能。

重构了 GlutenOperator 的具体实现

之前为每个 Operator 实现了两个版本, 如 GlutenOneInputOperator 和 GlutenVectorOneInputVector。在这里,通过泛型统一到一种实现,只根据输入的数据类型采取不同的转换操作。

解决 RowVector 在算子间如何传递的问题

RowVector在算子间传递涉及到

  • 设置合适的序列化/反序列化实现
  • 设置合理的 key selector
  • 合理安排RowVector的释放位置,避免出现重复释放或者未释放的问题

How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

@github-actions
Copy link

Thanks for opening a pull request!

Could you open an issue for this pull request on Github Issues?

https://github.com/apache/incubator-gluten/issues

Then could you also rename commit message and pull request title in the following format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}

See also:

@lgbo-ustc lgbo-ustc force-pushed the flink_202512_lgbo branch 3 times, most recently from 2e7fb8c to b147241 Compare December 25, 2025 01:53
Copy link

@KevinyhZou KevinyhZou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check

sourceContext.collect(row);
if (element.isRecord()) {
StatefulRecord record = element.asRecord();
if (outClass.isAssignableFrom(RowData.class)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为什么要使用两种output?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

下游算子的两种情况

  • 下游算子不能offload。转成rowdata,再传输
  • 下游算子可以offload。直接传递rowvector的指针


@Override
public StatefulRecord deserialize(DataInputView source) throws IOException {
LOG.error("xxx deserialize");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

去掉

import java.io.Serializable;

// This bridge is used to convert the input data to RowVector.
public class VectorInputBridge<IN> implements Serializable {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉这块最好实现一个类:

StatefulRowData implements RowData {
 private StatefulRecord record;

 public int getInt() {
   return record.getInt();
  } 
......

这样各个operator 的输入输出就可以统一使用RowData类型,也不用专门搞一个Bridge 来转换

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个估计有点复杂,后面再看看

StatefulRecord record,
BufferAllocator allocator,
RowType outputType) {
if (outClass.isAssignableFrom(RowData.class)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上

@github-actions
Copy link

github-actions bot commented Jan 4, 2026

Run Gluten Clickhouse CI on x86

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants