diff --git a/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go b/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go index eb43fd741752..547aba0ceb99 100644 --- a/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go +++ b/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go @@ -75,6 +75,7 @@ type readFromDebeziumSchema struct { Host string Port string MaxNumberOfRecords *int64 + MaxTimeToRun *int64 ConnectionProperties []string } @@ -133,6 +134,13 @@ func MaxRecord(r int64) readOption { } } +// MaxTimeToRun specifies maximum number of milliseconds to run before stop. +func MaxTimeToRun(r int64) readOption { + return func(cfg *debeziumConfig) { + cfg.readSchema.MaxTimeToRun = &r + } +} + // ConnectionProperties specifies properties of the debezium connection passed as // a string with format [propertyName=property;]* func ConnectionProperties(cp []string) readOption { diff --git a/sdks/go/test/integration/io/xlang/debezium/debezium.go b/sdks/go/test/integration/io/xlang/debezium/debezium.go index 26e4d974abf4..e1b9bab963c3 100644 --- a/sdks/go/test/integration/io/xlang/debezium/debezium.go +++ b/sdks/go/test/integration/io/xlang/debezium/debezium.go @@ -29,8 +29,9 @@ func ReadPipeline(addr, username, password, dbname, host, port string, connector p, s := beam.NewPipelineWithRoot() result := debeziumio.Read(s.Scope("Read from debezium"), username, password, host, port, connectorClass, reflectx.String, debeziumio.MaxRecord(maxrecords), + debeziumio.MaxTimeToRun(120000), debeziumio.ConnectionProperties(connectionProperties), debeziumio.ExpansionAddr(addr)) - expectedJson := `{"metadata":{"connector":"postgresql","version":"1.3.1.Final","name":"dbserver1","database":"inventory","schema":"inventory","table":"customers"},"before":null,"after":{"fields":{"last_name":"Thomas","id":1001,"first_name":"Sally","email":"sally.thomas@acme.com"}}}` + expectedJson := `{"metadata":{"connector":"postgresql","version":"3.1.3.Final","name":"beam-debezium-connector","database":"inventory","schema":"inventory","table":"customers"},"before":null,"after":{"fields":{"last_name":"Thomas","id":1001,"first_name":"Sally","email":"sally.thomas@acme.com"}}}` expected := beam.Create(s, expectedJson) passert.Equals(s, result, expected) return p diff --git a/sdks/go/test/integration/io/xlang/debezium/debezium_test.go b/sdks/go/test/integration/io/xlang/debezium/debezium_test.go index 208a062f9436..a4850d4a3a33 100644 --- a/sdks/go/test/integration/io/xlang/debezium/debezium_test.go +++ b/sdks/go/test/integration/io/xlang/debezium/debezium_test.go @@ -34,7 +34,7 @@ import ( ) const ( - debeziumImage = "quay.io/debezium/example-postgres:latest" + debeziumImage = "quay.io/debezium/example-postgres:3.1.3.Final" debeziumPort = "5432/tcp" maxRetries = 5 ) @@ -82,7 +82,6 @@ func TestDebeziumIO_BasicRead(t *testing.T) { connectionProperties := []string{ "database.dbname=inventory", "database.server.name=dbserver1", - "database.include.list=inventory", "include.schema.changes=false", } read := ReadPipeline(expansionAddr, username, password, dbname, host, port, debeziumio.PostgreSQL, 1, connectionProperties) diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java index eb6732180b08..22a34ae2654b 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java @@ -77,6 +77,7 @@ public static class ReadBuilder public static class Configuration extends CrossLanguageConfiguration { private @Nullable List connectionProperties; private @Nullable Long maxNumberOfRecords; + private @Nullable Long maxTimeToRun; public void setConnectionProperties(@Nullable List connectionProperties) { this.connectionProperties = connectionProperties; @@ -85,6 +86,10 @@ public void setConnectionProperties(@Nullable List connectionProperties) public void setMaxNumberOfRecords(@Nullable Long maxNumberOfRecords) { this.maxNumberOfRecords = maxNumberOfRecords; } + + public void setMaxTimeToRun(@Nullable Long maxTimeToRun) { + this.maxTimeToRun = maxTimeToRun; + } } @Override @@ -114,6 +119,10 @@ public PTransform> buildExternal(Configuration confi readTransform.withMaxNumberOfRecords(configuration.maxNumberOfRecords.intValue()); } + if (configuration.maxTimeToRun != null) { + readTransform = readTransform.withMaxTimeToRun(configuration.maxTimeToRun); + } + return readTransform; } }