From d4610df1b4e9bf680af5d33f8f6db506acd48003 Mon Sep 17 00:00:00 2001
From: Ben Hu
Date: Mon, 26 Jan 2026 10:02:16 -0800
Subject: [PATCH 1/4] Add build and run instructions

---
 README.md | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)
 create mode 100644 README.md

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..bc12ee1
--- /dev/null
+++ b/README.md
@@ -0,0 +1,22 @@
+
+## Build jar
+Install Java and Maven.
+
+Then run:
+```bash
+mvn clean install
+```
+
+## Run jar
+
+First, authenticate to obtain Application Default Credentials (ADC):
+```bash
+gcloud auth application-default login
+```
+
+Run the main class:
+```bash
+mvn compile exec:java \
+    -Dexec.mainClass="BigQueryStorageSampler" \
+    -Dexec.args="--parent 'bigquery-user-bensea' --table 'bigquery-user-bensea.bensea_readapi.users' --format 'Avro'"
+```

From 1d0c394f46542af69ae082203316d8e49e95ab2f Mon Sep 17 00:00:00 2001
From: Ben Hu
Date: Mon, 26 Jan 2026 11:48:37 -0800
Subject: [PATCH 2/4] Add printing for Avro data format

---
 pom.xml                                   | 26 ++++++--
 src/main/java/BigQueryStorageSampler.java | 73 ++++++++++++++---------
 2 files changed, 67 insertions(+), 32 deletions(-)

diff --git a/pom.xml b/pom.xml
index 1220c8c..262642d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,7 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
   <modelVersion>4.0.0</modelVersion>
   <artifactId>Samples</artifactId>
@@ -35,6 +34,22 @@
     </plugins>
   </build>
+
+  <repositories>
+    <repository>
+      <id>central</id>
+      <name>Maven Central remote repository</name>
+      <url>artifactregistry://us-maven.pkg.dev/artifact-foundry-prod/maven-3p-trusted</url>
+      <layout>default</layout>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
   <dependencies>
     <dependency>
@@ -48,7 +63,10 @@
       <artifactId>commons-cli</artifactId>
       <version>1.5.0</version>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.11.5</version>
+    </dependency>
   </dependencies>
 </project>
-
diff --git a/src/main/java/BigQueryStorageSampler.java b/src/main/java/BigQueryStorageSampler.java
index fdf5890..9425a2f 100644
--- a/src/main/java/BigQueryStorageSampler.java
+++ b/src/main/java/BigQueryStorageSampler.java
@@ -11,20 +11,23 @@
 import com.google.common.base.Stopwatch;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
 
 public class BigQueryStorageSampler {
 
   private static class TableReference {
-    static Pattern tableReferencePattern =
-        Pattern.compile("([\\w\\-]+)[\\:\\.]([^\\:\\.]+)\\.([^\\:\\.]+)");
+    static Pattern tableReferencePattern = Pattern.compile("([\\w\\-]+)[\\:\\.]([^\\:\\.]+)\\.([^\\:\\.]+)");
 
     static TableReference parseFromString(String table) {
       Matcher m = tableReferencePattern.matcher(table);
@@ -102,6 +105,9 @@ static class ReaderThread extends Thread {
     long numTotalResponseBytes = 0;
     long numTotalResponseRows = 0;
 
+    private Schema avroSchema;
+    private GenericDatumReader<GenericRecord> datumReader;
+
     public ReaderThread(ReadStream readStream, BigQueryReadClient client) {
       this.readStream = readStream;
       this.client = client;
@@ -116,8 +122,7 @@ public void run() {
     }
 
     private void readRows() throws Exception {
-      ReadRowsRequest readRowsRequest =
-          ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build();
+      ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build();
 
       Stopwatch stopwatch = Stopwatch.createStarted();
       for (ReadRowsResponse response : client.readRowsCallable().call(readRowsRequest)) {
         numResponses++;
@@ -125,9 +130,23 @@ private void readRows() throws Exception {
         numResponseRows += response.getRowCount();
 
         printPeriodicUpdate(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-        // This is just a simple end-to-end throughput test, so we don't decode the Avro record
-        // block or Arrow record batch here. This may well have an impact on throughput in a
-        // normal use case!
+        // The schema is only sent in the first response.
+        if (response.hasAvroSchema()) {
+          if (avroSchema == null) {
+            avroSchema = new Schema.Parser().parse(response.getAvroSchema().getSchema());
+            datumReader = new GenericDatumReader<>(avroSchema);
+          }
+        }
+
+        if (datumReader != null && response.hasAvroRows()) {
+          BinaryDecoder decoder = DecoderFactory.get()
+              .binaryDecoder(
+                  response.getAvroRows().getSerializedBinaryRows().toByteArray(), null);
+          while (!decoder.isEnd()) {
+            GenericRecord record = datumReader.read(null, decoder);
+            System.out.println(record.toString());
+          }
+        }
       }
 
       stopwatch.stop();
@@ -172,11 +191,11 @@ private static BigQueryReadClient getClient(BigQueryStorageSamplerOptions option
       options.getChannelPoolOptions().getInitialChannelCount().ifPresent(
           channelPoolSettingsBuilder::setInitialChannelCount);
 
-    // TODO(kmj): Can we fetch this directly from the stub settings rather than copying it?
-    InstantiatingGrpcChannelProvider.Builder channelProviderBuilder =
-        InstantiatingGrpcChannelProvider.newBuilder()
-            .setMaxInboundMessageSize(Integer.MAX_VALUE)
-            .setChannelPoolSettings(channelPoolSettingsBuilder.build());
+    // TODO(kmj): Can we fetch this directly from the stub settings rather than
+    // copying it?
+    InstantiatingGrpcChannelProvider.Builder channelProviderBuilder = InstantiatingGrpcChannelProvider.newBuilder()
+        .setMaxInboundMessageSize(Integer.MAX_VALUE)
+        .setChannelPoolSettings(channelPoolSettingsBuilder.build());
 
     // Note: this setter method overrides the channel pool settings specified above.
     options.getChannelsPerCpu().ifPresent(channelProviderBuilder::setChannelsPerCpu);
@@ -192,9 +211,8 @@ private static BigQueryReadClient getClient(BigQueryStorageSamplerOptions option
       }
     }));
 
-    BigQueryReadSettings.Builder settingsBuilder =
-        BigQueryReadSettings.newBuilder()
-            .setTransportChannelProvider(channelProviderBuilder.build());
+    BigQueryReadSettings.Builder settingsBuilder = BigQueryReadSettings.newBuilder()
+        .setTransportChannelProvider(channelProviderBuilder.build());
 
     options.getEndpoint().ifPresent(settingsBuilder::setEndpoint);
     options.getExecutorThreadCount().ifPresent(s -> settingsBuilder.setBackgroundExecutorProvider(
@@ -210,18 +228,17 @@ public static void main(String[] args) throws Exception {
     System.out.println("Table: " + options.getTable());
     System.out.println("Data format: " + options.getFormat());
 
-    CreateReadSessionRequest createReadSessionRequest =
-        CreateReadSessionRequest.newBuilder()
-            .setParent(
-                ProjectReference.parseFromString(options.getParent()).toResourceName())
-            .setReadSession(
-                ReadSession.newBuilder()
-                    .setTable(
-                        TableReference.parseFromString(options.getTable()).toResourceName())
-                    .setDataFormat(
-                        DataFormat.parseFromString(options.getFormat()).toProto()))
-            .setMaxStreamCount(options.getMaxStreams())
-            .build();
+    CreateReadSessionRequest createReadSessionRequest = CreateReadSessionRequest.newBuilder()
+        .setParent(
+            ProjectReference.parseFromString(options.getParent()).toResourceName())
+        .setReadSession(
+            ReadSession.newBuilder()
+                .setTable(
+                    TableReference.parseFromString(options.getTable()).toResourceName())
+                .setDataFormat(
+                    DataFormat.parseFromString(options.getFormat()).toProto()))
+        .setMaxStreamCount(options.getMaxStreams())
+        .build();
 
     ReadSession readSession;
     long elapsedMillis;

From fd016a76c8276e3217a323d79604011fde0d3c0c Mon Sep 17 00:00:00 2001
From: Ben Hu
Date: Mon, 26 Jan 2026 11:49:01 -0800
Subject: [PATCH 3/4] Nest Avro row decoding inside the schema check

---
 src/main/java/BigQueryStorageSampler.java | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/src/main/java/BigQueryStorageSampler.java b/src/main/java/BigQueryStorageSampler.java
index 9425a2f..74f5efd 100644
--- a/src/main/java/BigQueryStorageSampler.java
+++ b/src/main/java/BigQueryStorageSampler.java
@@ -130,21 +130,21 @@ private void readRows() throws Exception {
         numResponseRows += response.getRowCount();
 
         printPeriodicUpdate(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-        // The schema is only sent in the first response.
         if (response.hasAvroSchema()) {
+          // The schema is only sent in the first response.
           if (avroSchema == null) {
             avroSchema = new Schema.Parser().parse(response.getAvroSchema().getSchema());
             datumReader = new GenericDatumReader<>(avroSchema);
           }
-        }
 
-        if (datumReader != null && response.hasAvroRows()) {
-          BinaryDecoder decoder = DecoderFactory.get()
-              .binaryDecoder(
-                  response.getAvroRows().getSerializedBinaryRows().toByteArray(), null);
-          while (!decoder.isEnd()) {
-            GenericRecord record = datumReader.read(null, decoder);
-            System.out.println(record.toString());
+          if (datumReader != null && response.hasAvroRows()) {
+            BinaryDecoder decoder = DecoderFactory.get()
+                .binaryDecoder(
+                    response.getAvroRows().getSerializedBinaryRows().toByteArray(), null);
+            while (!decoder.isEnd()) {
+              GenericRecord record = datumReader.read(null, decoder);
+              System.out.println(record.toString());
+            }
           }
         }
       }

From 2a30b3b86fe77771a9ea12723255fcbe80488b95 Mon Sep 17 00:00:00 2001
From: Ben Hu
Date: Mon, 26 Jan 2026 12:20:56 -0800
Subject: [PATCH 4/4] Support Arrow format

---
 README.md                                 | 11 +++-
 pom.xml                                   | 17 ++++++
 src/main/java/BigQueryStorageSampler.java | 64 ++++++++++++++++++++---
 3 files changed, 83 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index bc12ee1..d08364f 100644
--- a/README.md
+++ b/README.md
@@ -14,9 +14,18 @@ First, authenticate to obtain Application Default Credentials (ADC):
 gcloud auth application-default login
 ```
 
-Run the main class:
+Read data with Avro:
 ```bash
 mvn compile exec:java \
     -Dexec.mainClass="BigQueryStorageSampler" \
     -Dexec.args="--parent 'bigquery-user-bensea' --table 'bigquery-user-bensea.bensea_readapi.users' --format 'Avro'"
 ```
+
+Read data with Arrow:
+```bash
+export JDK_JAVA_OPTIONS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
+
+mvn compile exec:java \
+    -Dexec.mainClass="BigQueryStorageSampler" \
+    -Dexec.args="--parent 'bigquery-user-bensea' --table 'bigquery-user-bensea.bensea_readapi.users' --format 'Arrow'"
+```
diff --git a/pom.xml b/pom.xml
index 262642d..48449ce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,5 +68,22 @@
       <artifactId>avro</artifactId>
       <version>1.11.5</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-core</artifactId>
+      <version>18.3.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-netty</artifactId>
+      <version>18.3.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>18.3.0</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/src/main/java/BigQueryStorageSampler.java b/src/main/java/BigQueryStorageSampler.java
index 74f5efd..e86443b 100644
--- a/src/main/java/BigQueryStorageSampler.java
+++ b/src/main/java/BigQueryStorageSampler.java
@@ -11,12 +11,21 @@
 import com.google.common.base.Stopwatch;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.channels.Channels;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -108,6 +117,9 @@ static class ReaderThread extends Thread {
     private Schema avroSchema;
     private GenericDatumReader<GenericRecord> datumReader;
 
+    private VectorSchemaRoot arrowRoot;
+    private RootAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
+
     public ReaderThread(ReadStream readStream, BigQueryReadClient client) {
       this.readStream = readStream;
       this.client = client;
@@ -118,6 +130,11 @@ public void run() {
         readRows();
       } catch (Exception e) {
         System.err.println("Caught exception while calling ReadRows: " + e);
+      } finally {
+        if (arrowRoot != null) {
+          arrowRoot.close();
+        }
+        rootAllocator.close();
       }
     }
@@ -130,20 +147,51 @@ private void readRows() throws Exception {
         numResponseRows += response.getRowCount();
 
         printPeriodicUpdate(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        // Initialize data schema.
+        // The schema is only sent in the first response.
         if (response.hasAvroSchema()) {
-          // The schema is only sent in the first response.
           if (avroSchema == null) {
             avroSchema = new Schema.Parser().parse(response.getAvroSchema().getSchema());
             datumReader = new GenericDatumReader<>(avroSchema);
           }
+        } else if (response.hasArrowSchema()) {
+          if (arrowRoot == null) {
+            ByteArrayInputStream schemaStream = new ByteArrayInputStream(
+                response.getArrowSchema().getSerializedSchema().toByteArray());
+            try (ArrowStreamReader reader = new ArrowStreamReader(schemaStream, rootAllocator)) {
+              arrowRoot = reader.getVectorSchemaRoot();
+            }
+          }
+        }
 
-          if (datumReader != null && response.hasAvroRows()) {
-            BinaryDecoder decoder = DecoderFactory.get()
-                .binaryDecoder(
-                    response.getAvroRows().getSerializedBinaryRows().toByteArray(), null);
-            while (!decoder.isEnd()) {
-              GenericRecord record = datumReader.read(null, decoder);
-              System.out.println(record.toString());
+        if (datumReader != null && response.hasAvroRows()) {
+          BinaryDecoder decoder = DecoderFactory.get()
+              .binaryDecoder(
+                  response.getAvroRows().getSerializedBinaryRows().toByteArray(), null);
+          while (!decoder.isEnd()) {
+            GenericRecord record = datumReader.read(null, decoder);
+            System.out.println(record.toString());
+          }
+        } else if (arrowRoot != null && response.hasArrowRecordBatch()) {
+          byte[] batchBytes = response.getArrowRecordBatch().getSerializedRecordBatch().toByteArray();
+          try (final ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(
+              new ReadChannel(Channels.newChannel(new ByteArrayInputStream(batchBytes))),
+              rootAllocator)) {
+            VectorLoader loader = new VectorLoader(arrowRoot);
+            loader.load(batch);
+            for (int i = 0; i < arrowRoot.getRowCount(); i++) {
+              StringBuilder sb = new StringBuilder();
+              sb.append("{");
+              for (int j = 0; j < arrowRoot.getFieldVectors().size(); j++) {
+                if (j > 0) {
+                  sb.append(", ");
+                }
+                sb.append(arrowRoot.getSchema().getFields().get(j).getName());
+                sb.append(": ");
+                sb.append(arrowRoot.getFieldVectors().get(j).getObject(i));
+              }
+              sb.append("}");
+              System.out.println(sb.toString());
             }
           }
         }
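
For reference, the Avro path these patches add can be exercised without a BigQuery connection. Below is a minimal sketch (the class name `AvroRowDecodeDemo` and the toy two-field schema are invented for illustration, not part of the patches): it concatenates a few binary-encoded records the same way `AvroRows.serialized_binary_rows` does, then drains them with the same `DecoderFactory`/`isEnd()` loop used in `readRows()`.

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Hypothetical demo class; mirrors the decode loop in BigQueryStorageSampler.
public class AvroRowDecodeDemo {
  public static void main(String[] args) throws Exception {
    // Toy schema standing in for the one the first ReadRowsResponse carries.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"users\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"long\"},"
            + "{\"name\":\"name\",\"type\":\"string\"}]}");

    // Produce a block of back-to-back binary-encoded records, the same shape
    // as AvroRows.serialized_binary_rows.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
    for (long i = 0; i < 3; i++) {
      GenericRecord record = new GenericData.Record(schema);
      record.put("id", i);
      record.put("name", "user-" + i);
      writer.write(record, encoder);
    }
    encoder.flush();

    // Decode exactly as the sampler does: reuse one reader, loop until drained.
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    while (!decoder.isEnd()) {
      System.out.println(reader.read(null, decoder));
    }
  }
}
```

Keeping a single `GenericDatumReader` per stream, as PATCH 2 does, avoids re-parsing the schema on every response.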
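The Arrow path in PATCH 4 pairs `MessageSerializer.deserializeRecordBatch` with a `VectorLoader` that loads each batch into one long-lived `VectorSchemaRoot`. The sketch below is a self-contained round trip of that pattern under the same arrow-vector/arrow-memory dependencies; the one-column schema and the `ArrowBatchDecodeDemo` name are made up for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import java.util.List;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

// Hypothetical demo class; mirrors the batch handling added in PATCH 4.
public class ArrowBatchDecodeDemo {
  public static void main(String[] args) throws Exception {
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
      Schema schema = new Schema(List.of(Field.nullable("id", new ArrowType.Int(64, true))));

      // Build and serialize a small batch, standing in for
      // ReadRowsResponse.arrow_record_batch.serialized_record_batch.
      byte[] serialized;
      try (VectorSchemaRoot source = VectorSchemaRoot.create(schema, allocator)) {
        BigIntVector id = (BigIntVector) source.getVector("id");
        for (int i = 0; i < 3; i++) {
          id.setSafe(i, 100L + i);
        }
        source.setRowCount(3);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ArrowRecordBatch batch = new VectorUnloader(source).getRecordBatch()) {
          MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch);
        }
        serialized = out.toByteArray();
      }

      // Deserialize and load, the same way readRows() handles each response.
      try (VectorSchemaRoot target = VectorSchemaRoot.create(schema, allocator);
          ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(
              new ReadChannel(Channels.newChannel(new ByteArrayInputStream(serialized))),
              allocator)) {
        new VectorLoader(target).load(batch);
        System.out.print(target.contentToTSVString());
      }
    }
  }
}
```

On recent JDKs this needs the same `--add-opens` setting the README exports before running with `--format 'Arrow'`.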