diff --git a/README.md b/README.md
new file mode 100644
index 0000000..d08364f
--- /dev/null
+++ b/README.md
@@ -0,0 +1,31 @@
+
+## Build the jar
+Install Java and Maven.
+
+Then run:
+```bash
+mvn clean install
+```
+
+## Run the jar
+
+First, authenticate to get Application Default Credentials (ADC):
+```bash
+gcloud auth application-default login
+```
+
+Read data with Avro:
+```bash
+mvn compile exec:java \
+  -Dexec.mainClass="BigQueryStorageSampler" \
+  -Dexec.args="--parent 'bigquery-user-bensea' --table 'bigquery-user-bensea.bensea_readapi.users' --format 'Avro'"
+```
+
+Read data with Arrow:
+```bash
+export JDK_JAVA_OPTIONS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
+
+mvn compile exec:java \
+  -Dexec.mainClass="BigQueryStorageSampler" \
+  -Dexec.args="--parent 'bigquery-user-bensea' --table 'bigquery-user-bensea.bensea_readapi.users' --format 'Arrow'"
+```
diff --git a/pom.xml b/pom.xml
index 1220c8c..48449ce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,7 +1,6 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <artifactId>Samples</artifactId>
@@ -35,6 +34,22 @@
+
+  <repositories>
+    <repository>
+      <id>central</id>
+      <name>Maven Central remote repository</name>
+      <url>artifactregistry://us-maven.pkg.dev/artifact-foundry-prod/maven-3p-trusted</url>
+      <layout>default</layout>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
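+  <!-- Resolving artifacts from an artifactregistry:// URL requires the Artifact
+       Registry Maven wagon (com.google.cloud.artifactregistry:artifactregistry-maven-wagon)
+       registered as a build extension, e.g. in .mvn/extensions.xml. -->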
+
@@ -48,7 +63,27 @@
      <artifactId>commons-cli</artifactId>
      <version>1.5.0</version>
-
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.11.5</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-core</artifactId>
+      <version>18.3.0</version>
+    </dependency>
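+    <!-- arrow-memory-netty supplies the buffer allocator implementation at
+         runtime; arrow-memory-core alone provides only the allocation API. -->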
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-netty</artifactId>
+      <version>18.3.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>18.3.0</version>
+    </dependency>
-
diff --git a/src/main/java/BigQueryStorageSampler.java b/src/main/java/BigQueryStorageSampler.java
index fdf5890..e86443b 100644
--- a/src/main/java/BigQueryStorageSampler.java
+++ b/src/main/java/BigQueryStorageSampler.java
@@ -11,20 +11,32 @@
import com.google.common.base.Stopwatch;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
-
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
public class BigQueryStorageSampler {
private static class TableReference {
- static Pattern tableReferencePattern =
- Pattern.compile("([\\w\\-]+)[\\:\\.]([^\\:\\.]+)\\.([^\\:\\.]+)");
+ static Pattern tableReferencePattern = Pattern.compile("([\\w\\-]+)[\\:\\.]([^\\:\\.]+)\\.([^\\:\\.]+)");
static TableReference parseFromString(String table) {
Matcher m = tableReferencePattern.matcher(table);
@@ -102,6 +114,12 @@ static class ReaderThread extends Thread {
long numTotalResponseBytes = 0;
long numTotalResponseRows = 0;
+ private Schema avroSchema;
+    private GenericDatumReader<GenericRecord> datumReader;
+
+ private VectorSchemaRoot arrowRoot;
+ private RootAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
+
public ReaderThread(ReadStream readStream, BigQueryReadClient client) {
this.readStream = readStream;
this.client = client;
@@ -112,12 +130,16 @@ public void run() {
readRows();
} catch (Exception e) {
System.err.println("Caught exception while calling ReadRows: " + e);
+ } finally {
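+        // Release the Arrow root's off-heap buffers before closing the allocator;
+        // RootAllocator.close() fails if allocated buffers are still outstanding.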
+ if (arrowRoot != null) {
+ arrowRoot.close();
+ }
+ rootAllocator.close();
}
}
private void readRows() throws Exception {
- ReadRowsRequest readRowsRequest =
- ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build();
+ ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build();
Stopwatch stopwatch = Stopwatch.createStarted();
for (ReadRowsResponse response : client.readRowsCallable().call(readRowsRequest)) {
numResponses++;
@@ -125,9 +147,54 @@ private void readRows() throws Exception {
numResponseRows += response.getRowCount();
printPeriodicUpdate(stopwatch.elapsed(TimeUnit.MICROSECONDS));
- // This is just a simple end-to-end throughput test, so we don't decode the Avro record
- // block or Arrow record batch here. This may well have an impact on throughput in a
- // normal use case!
+        // Initialize decoding state from the schema, which is only sent in the
+        // first response of each stream.
+ if (response.hasAvroSchema()) {
+ if (avroSchema == null) {
+ avroSchema = new Schema.Parser().parse(response.getAvroSchema().getSchema());
+ datumReader = new GenericDatumReader<>(avroSchema);
+ }
+        } else if (response.hasArrowSchema()) {
+          if (arrowRoot == null) {
+            // Deserialize the Arrow IPC schema message and create a reusable,
+            // initially empty VectorSchemaRoot owned by this thread's allocator.
+            org.apache.arrow.vector.types.pojo.Schema arrowSchema = MessageSerializer.deserializeSchema(
+                new ReadChannel(Channels.newChannel(new ByteArrayInputStream(
+                    response.getArrowSchema().getSerializedSchema().toByteArray()))));
+            arrowRoot = VectorSchemaRoot.create(arrowSchema, rootAllocator);
+          }
+        }
+
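+        // AvroRows.serialized_binary_rows holds Avro-encoded records concatenated
+        // back to back with no file/container header, so a raw BinaryDecoder is
+        // looped until it reports end of input.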
+ if (datumReader != null && response.hasAvroRows()) {
+ BinaryDecoder decoder = DecoderFactory.get()
+ .binaryDecoder(
+ response.getAvroRows().getSerializedBinaryRows().toByteArray(), null);
+ while (!decoder.isEnd()) {
+ GenericRecord record = datumReader.read(null, decoder);
+ System.out.println(record.toString());
+ }
+ } else if (arrowRoot != null && response.hasArrowRecordBatch()) {
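+          // Each response carries one Arrow IPC RecordBatch message; VectorLoader
+          // maps its buffers into the reusable root, replacing the previous batch.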
+ byte[] batchBytes = response.getArrowRecordBatch().getSerializedRecordBatch().toByteArray();
+ try (final ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(
+ new ReadChannel(Channels.newChannel(new ByteArrayInputStream(batchBytes))),
+ rootAllocator)) {
+ VectorLoader loader = new VectorLoader(arrowRoot);
+ loader.load(batch);
+ for (int i = 0; i < arrowRoot.getRowCount(); i++) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ for (int j = 0; j < arrowRoot.getFieldVectors().size(); j++) {
+ if (j > 0) {
+ sb.append(", ");
+ }
+ sb.append(arrowRoot.getSchema().getFields().get(j).getName());
+ sb.append(": ");
+ sb.append(arrowRoot.getFieldVectors().get(j).getObject(i));
+ }
+ sb.append("}");
+ System.out.println(sb.toString());
+ }
+ }
+ }
}
stopwatch.stop();
@@ -172,11 +239,11 @@ private static BigQueryReadClient getClient(BigQueryStorageSamplerOptions option
options.getChannelPoolOptions().getInitialChannelCount().ifPresent(
channelPoolSettingsBuilder::setInitialChannelCount);
- // TODO(kmj): Can we fetch this directly from the stub settings rather than copying it?
- InstantiatingGrpcChannelProvider.Builder channelProviderBuilder =
- InstantiatingGrpcChannelProvider.newBuilder()
- .setMaxInboundMessageSize(Integer.MAX_VALUE)
- .setChannelPoolSettings(channelPoolSettingsBuilder.build());
+ // TODO(kmj): Can we fetch this directly from the stub settings rather than
+ // copying it?
+ InstantiatingGrpcChannelProvider.Builder channelProviderBuilder = InstantiatingGrpcChannelProvider.newBuilder()
+ .setMaxInboundMessageSize(Integer.MAX_VALUE)
+ .setChannelPoolSettings(channelPoolSettingsBuilder.build());
// Note: this setter method overrides the channel pool settings specified above.
options.getChannelsPerCpu().ifPresent(channelProviderBuilder::setChannelsPerCpu);
@@ -192,9 +259,8 @@ private static BigQueryReadClient getClient(BigQueryStorageSamplerOptions option
}
}));
- BigQueryReadSettings.Builder settingsBuilder =
- BigQueryReadSettings.newBuilder()
- .setTransportChannelProvider(channelProviderBuilder.build());
+ BigQueryReadSettings.Builder settingsBuilder = BigQueryReadSettings.newBuilder()
+ .setTransportChannelProvider(channelProviderBuilder.build());
options.getEndpoint().ifPresent(settingsBuilder::setEndpoint);
options.getExecutorThreadCount().ifPresent(s -> settingsBuilder.setBackgroundExecutorProvider(
@@ -210,18 +276,17 @@ public static void main(String[] args) throws Exception {
System.out.println("Table: " + options.getTable());
System.out.println("Data format: " + options.getFormat());
- CreateReadSessionRequest createReadSessionRequest =
- CreateReadSessionRequest.newBuilder()
- .setParent(
- ProjectReference.parseFromString(options.getParent()).toResourceName())
- .setReadSession(
- ReadSession.newBuilder()
- .setTable(
- TableReference.parseFromString(options.getTable()).toResourceName())
- .setDataFormat(
- DataFormat.parseFromString(options.getFormat()).toProto()))
- .setMaxStreamCount(options.getMaxStreams())
- .build();
+ CreateReadSessionRequest createReadSessionRequest = CreateReadSessionRequest.newBuilder()
+ .setParent(
+ ProjectReference.parseFromString(options.getParent()).toResourceName())
+ .setReadSession(
+ ReadSession.newBuilder()
+ .setTable(
+ TableReference.parseFromString(options.getTable()).toResourceName())
+ .setDataFormat(
+ DataFormat.parseFromString(options.getFormat()).toProto()))
+ .setMaxStreamCount(options.getMaxStreams())
+ .build();
ReadSession readSession;
long elapsedMillis;