Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type readFromDebeziumSchema struct {
Host string
Port string
MaxNumberOfRecords *int64
MaxTimeToRun *int64
ConnectionProperties []string
}

Expand Down Expand Up @@ -133,6 +134,13 @@ func MaxRecord(r int64) readOption {
}
}

// MaxTimeToRun specifies maximum number of milliseconds to run before stop.
func MaxTimeToRun(r int64) readOption {
return func(cfg *debeziumConfig) {
cfg.readSchema.MaxTimeToRun = &r
}
}

// ConnectionProperties specifies properties of the debezium connection passed as
// a string with format [propertyName=property;]*
func ConnectionProperties(cp []string) readOption {
Expand Down
3 changes: 2 additions & 1 deletion sdks/go/test/integration/io/xlang/debezium/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ func ReadPipeline(addr, username, password, dbname, host, port string, connector
p, s := beam.NewPipelineWithRoot()
result := debeziumio.Read(s.Scope("Read from debezium"), username, password, host, port,
connectorClass, reflectx.String, debeziumio.MaxRecord(maxrecords),
debeziumio.MaxTimeToRun(120000),
debeziumio.ConnectionProperties(connectionProperties), debeziumio.ExpansionAddr(addr))
expectedJson := `{"metadata":{"connector":"postgresql","version":"1.3.1.Final","name":"dbserver1","database":"inventory","schema":"inventory","table":"customers"},"before":null,"after":{"fields":{"last_name":"Thomas","id":1001,"first_name":"Sally","email":"sally.thomas@acme.com"}}}`
expectedJson := `{"metadata":{"connector":"postgresql","version":"3.1.3.Final","name":"beam-debezium-connector","database":"inventory","schema":"inventory","table":"customers"},"before":null,"after":{"fields":{"last_name":"Thomas","id":1001,"first_name":"Sally","email":"sally.thomas@acme.com"}}}`
expected := beam.Create(s, expectedJson)
passert.Equals(s, result, expected)
return p
Expand Down
3 changes: 1 addition & 2 deletions sdks/go/test/integration/io/xlang/debezium/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

const (
debeziumImage = "quay.io/debezium/example-postgres:latest"
debeziumImage = "quay.io/debezium/example-postgres:3.1.3.Final"
debeziumPort = "5432/tcp"
maxRetries = 5
)
Expand Down Expand Up @@ -82,7 +82,6 @@ func TestDebeziumIO_BasicRead(t *testing.T) {
connectionProperties := []string{
"database.dbname=inventory",
"database.server.name=dbserver1",
"database.include.list=inventory",
"include.schema.changes=false",
}
read := ReadPipeline(expansionAddr, username, password, dbname, host, port, debeziumio.PostgreSQL, 1, connectionProperties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public static class ReadBuilder
public static class Configuration extends CrossLanguageConfiguration {
private @Nullable List<String> connectionProperties;
private @Nullable Long maxNumberOfRecords;
private @Nullable Long maxTimeToRun;

public void setConnectionProperties(@Nullable List<String> connectionProperties) {
this.connectionProperties = connectionProperties;
Expand All @@ -85,6 +86,10 @@ public void setConnectionProperties(@Nullable List<String> connectionProperties)
public void setMaxNumberOfRecords(@Nullable Long maxNumberOfRecords) {
this.maxNumberOfRecords = maxNumberOfRecords;
}

public void setMaxTimeToRun(@Nullable Long maxTimeToRun) {
this.maxTimeToRun = maxTimeToRun;
}
}

@Override
Expand Down Expand Up @@ -114,6 +119,10 @@ public PTransform<PBegin, PCollection<String>> buildExternal(Configuration confi
readTransform.withMaxNumberOfRecords(configuration.maxNumberOfRecords.intValue());
}

if (configuration.maxTimeToRun != null) {
readTransform = readTransform.withMaxTimeToRun(configuration.maxTimeToRun);
}

return readTransform;
}
}
Expand Down
Loading