31 changes: 31 additions & 0 deletions README.md
@@ -0,0 +1,31 @@

## Build jar
Install Java and Maven.

Then run:
```bash
mvn clean install
```

## Run jar

First, authenticate to generate Application Default Credentials (ADC):
```bash
gcloud auth application-default login
```
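
If credential lookup fails when the sampler runs, it can help to verify ADC resolution directly. A minimal sketch (the `AdcCheck` class is hypothetical, not part of this sample) using the google-auth-library that the BigQuery client pulls in transitively:
```java
import com.google.auth.oauth2.GoogleCredentials;

// Hypothetical standalone check, not part of this sample.
public class AdcCheck {
  public static void main(String[] args) throws Exception {
    // Resolves ADC from GOOGLE_APPLICATION_CREDENTIALS, the gcloud ADC
    // file, or the metadata server, in that order.
    GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
    credentials.refreshIfExpired();
    System.out.println("ADC resolved as " + credentials.getClass().getSimpleName());
  }
}
```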

Read data with Avro:
```bash
mvn compile exec:java \
-Dexec.mainClass="BigQueryStorageSampler" \
-Dexec.args="--parent 'bigquery-user-bensea' --table 'bigquery-user-bensea.bensea_readapi.users' --format 'Avro'"
```

Read data with Arrow. Arrow's memory module relies on internal `java.nio` APIs that recent JDKs encapsulate by default, so the package has to be opened explicitly:
```bash
export JDK_JAVA_OPTIONS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"

mvn compile exec:java \
-Dexec.mainClass="BigQueryStorageSampler" \
-Dexec.args="--parent 'bigquery-user-bensea' --table 'bigquery-user-bensea.bensea_readapi.users' --format 'Arrow'"
```
43 changes: 39 additions & 4 deletions pom.xml
@@ -1,7 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>Samples</groupId>
@@ -35,6 +34,22 @@
</plugins>
</build>

<!-- Used to build on Corp/CloudTop -->
<repositories>
<repository>
<id>central</id>
<name>Maven Central remote repository</name>
<url>artifactregistry://us-maven.pkg.dev/artifact-foundry-prod/maven-3p-trusted</url>
<layout>default</layout>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
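<!-- Note: the artifactregistry:// scheme is not handled by stock Maven; it
assumes the Artifact Registry Maven wagon
(com.google.cloud.artifactregistry:artifactregistry-maven-wagon) is
configured as a core extension. -->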

<dependencies>
<!-- https://mvnrepository.com/artifact/com.google.cloud/google-cloud-bigquerystorage -->
<dependency>
@@ -48,7 +63,27 @@
<artifactId>commons-cli</artifactId>
<version>1.5.0</version>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.5</version>
</dependency>
<!-- Source: https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-core</artifactId>
<version>18.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>18.3.0</version>
</dependency>
<!-- Source: https://mvnrepository.com/artifact/org.apache.arrow/arrow-vector -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>18.3.0</version>
</dependency>
</dependencies>

</project>
121 changes: 93 additions & 28 deletions src/main/java/BigQueryStorageSampler.java
@@ -11,20 +12,32 @@
import com.google.common.base.Stopwatch;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class BigQueryStorageSampler {

private static class TableReference {

static Pattern tableReferencePattern =
Pattern.compile("([\\w\\-]+)[\\:\\.]([^\\:\\.]+)\\.([^\\:\\.]+)");
static Pattern tableReferencePattern = Pattern.compile("([\\w\\-]+)[\\:\\.]([^\\:\\.]+)\\.([^\\:\\.]+)");

static TableReference parseFromString(String table) {
Matcher m = tableReferencePattern.matcher(table);
@@ -102,6 +114,12 @@ static class ReaderThread extends Thread {
long numTotalResponseBytes = 0;
long numTotalResponseRows = 0;

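// Per-thread decoding state. The schema arrives only in the first
// ReadRowsResponse, so these fields are initialized lazily; each thread
// owns its allocator, which is closed in run()'s finally block.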
private Schema avroSchema;
private GenericDatumReader<GenericRecord> datumReader;

private VectorSchemaRoot arrowRoot;
private RootAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);

public ReaderThread(ReadStream readStream, BigQueryReadClient client) {
this.readStream = readStream;
this.client = client;
@@ -112,22 +130,71 @@ public void run() {
readRows();
} catch (Exception e) {
System.err.println("Caught exception while calling ReadRows: " + e);
} finally {
if (arrowRoot != null) {
arrowRoot.close();
}
rootAllocator.close();
}
}

private void readRows() throws Exception {
ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build();
ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build();
Stopwatch stopwatch = Stopwatch.createStarted();
for (ReadRowsResponse response : client.readRowsCallable().call(readRowsRequest)) {
numResponses++;
numResponseBytes += response.getSerializedSize();
numResponseRows += response.getRowCount();
printPeriodicUpdate(stopwatch.elapsed(TimeUnit.MICROSECONDS));

// Unlike a pure end-to-end throughput test, this sample decodes the Avro
// record block or Arrow record batch in each response, which adds per-row
// overhead on top of the raw transfer.
// Initialize the data schema. The schema is only sent in the first response.
if (response.hasAvroSchema()) {
if (avroSchema == null) {
avroSchema = new Schema.Parser().parse(response.getAvroSchema().getSchema());
datumReader = new GenericDatumReader<>(avroSchema);
}
} else if (response.hasArrowSchema()) {
if (arrowRoot == null) {
// Deserialize the IPC schema message and create a root owned by this
// thread's allocator. ArrowStreamReader#close() closes the root it
// hands out, so the root must not be borrowed from a short-lived reader.
org.apache.arrow.vector.types.pojo.Schema arrowIpcSchema =
MessageSerializer.deserializeSchema(
new ReadChannel(Channels.newChannel(new ByteArrayInputStream(
response.getArrowSchema().getSerializedSchema().toByteArray()))));
arrowRoot = VectorSchemaRoot.create(arrowIpcSchema, rootAllocator);
}
}

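// Decode and print every row in this response. An Avro row block packs
// records back-to-back, so keep reading until the decoder reports end of
// input.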
if (datumReader != null && response.hasAvroRows()) {
BinaryDecoder decoder = DecoderFactory.get()
.binaryDecoder(
response.getAvroRows().getSerializedBinaryRows().toByteArray(), null);
while (!decoder.isEnd()) {
GenericRecord record = datumReader.read(null, decoder);
System.out.println(record.toString());
}
} else if (arrowRoot != null && response.hasArrowRecordBatch()) {
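// Deserialize the record batch into this thread's root, then print each
// row as {field: value, ...} by walking the field vectors.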
byte[] batchBytes = response.getArrowRecordBatch().getSerializedRecordBatch().toByteArray();
try (final ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(
new ReadChannel(Channels.newChannel(new ByteArrayInputStream(batchBytes))),
rootAllocator)) {
VectorLoader loader = new VectorLoader(arrowRoot);
loader.load(batch);
for (int i = 0; i < arrowRoot.getRowCount(); i++) {
StringBuilder sb = new StringBuilder();
sb.append("{");
for (int j = 0; j < arrowRoot.getFieldVectors().size(); j++) {
if (j > 0) {
sb.append(", ");
}
sb.append(arrowRoot.getSchema().getFields().get(j).getName());
sb.append(": ");
sb.append(arrowRoot.getFieldVectors().get(j).getObject(i));
}
sb.append("}");
System.out.println(sb.toString());
}
}
}
}

stopwatch.stop();
@@ -172,11 +239,11 @@ private static BigQueryReadClient getClient(BigQueryStorageSamplerOptions option
options.getChannelPoolOptions().getInitialChannelCount().ifPresent(
channelPoolSettingsBuilder::setInitialChannelCount);

// TODO(kmj): Can we fetch this directly from the stub settings rather than copying it?
InstantiatingGrpcChannelProvider.Builder channelProviderBuilder =
InstantiatingGrpcChannelProvider.newBuilder()
.setMaxInboundMessageSize(Integer.MAX_VALUE)
.setChannelPoolSettings(channelPoolSettingsBuilder.build());
// TODO(kmj): Can we fetch this directly from the stub settings rather than
// copying it?
InstantiatingGrpcChannelProvider.Builder channelProviderBuilder = InstantiatingGrpcChannelProvider.newBuilder()
.setMaxInboundMessageSize(Integer.MAX_VALUE)
.setChannelPoolSettings(channelPoolSettingsBuilder.build());

// Note: this setter method overrides the channel pool settings specified above.
options.getChannelsPerCpu().ifPresent(channelProviderBuilder::setChannelsPerCpu);
@@ -192,9 +259,8 @@ private static BigQueryReadClient getClient(BigQueryStorageSamplerOptions option
}
}));

BigQueryReadSettings.Builder settingsBuilder =
BigQueryReadSettings.newBuilder()
.setTransportChannelProvider(channelProviderBuilder.build());
BigQueryReadSettings.Builder settingsBuilder = BigQueryReadSettings.newBuilder()
.setTransportChannelProvider(channelProviderBuilder.build());

options.getEndpoint().ifPresent(settingsBuilder::setEndpoint);
options.getExecutorThreadCount().ifPresent(s -> settingsBuilder.setBackgroundExecutorProvider(
@@ -210,18 +276,17 @@ public static void main(String[] args) throws Exception {
System.out.println("Table: " + options.getTable());
System.out.println("Data format: " + options.getFormat());

CreateReadSessionRequest createReadSessionRequest =
CreateReadSessionRequest.newBuilder()
.setParent(
ProjectReference.parseFromString(options.getParent()).toResourceName())
.setReadSession(
ReadSession.newBuilder()
.setTable(
TableReference.parseFromString(options.getTable()).toResourceName())
.setDataFormat(
DataFormat.parseFromString(options.getFormat()).toProto()))
.setMaxStreamCount(options.getMaxStreams())
.build();
CreateReadSessionRequest createReadSessionRequest = CreateReadSessionRequest.newBuilder()
.setParent(
ProjectReference.parseFromString(options.getParent()).toResourceName())
.setReadSession(
ReadSession.newBuilder()
.setTable(
TableReference.parseFromString(options.getTable()).toResourceName())
.setDataFormat(
DataFormat.parseFromString(options.getFormat()).toProto()))
.setMaxStreamCount(options.getMaxStreams())
.build();
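// Note: maxStreamCount is an upper bound; the service may return fewer
// streams than requested, depending on the table's size and structure.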

ReadSession readSession;
long elapsedMillis;