Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,8 @@
<ignoredDependency>org.apache.commons:commons-compress</ignoredDependency>
<ignoredDependency>org.apache.commons:commons-configuration2</ignoredDependency>
<ignoredDependency>io.grpc:grpc-netty</ignoredDependency>
<!-- JDBC unused in production after FQN cleanup, demoted to test in Step 11d -->
<ignoredDependency>net.snowflake:snowflake-jdbc-thin</ignoredDependency>
</ignoredDependencies>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Replicated from snowflake-jdbc (v3.25.1)
* Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/jdbc/cloud/storage/AwsSdkGCPSigner.java
*
* Permitted differences: package. @SnowflakeJdbcInternalApi removed.
*/
package net.snowflake.ingest.streaming.internal.fileTransferAgent;

import com.amazonaws.SignableRequest;
import com.amazonaws.auth.AWS4Signer;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.http.HttpMethodName;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class AwsSdkGCPSigner extends AWS4Signer {
private static final Map<String, String> headerMap =
new HashMap<String, String>() {
{
put("x-amz-storage-class", "x-goog-storage-class");
put("x-amz-acl", "x-goog-acl");
put("x-amz-date", "x-goog-date");
put("x-amz-copy-source", "x-goog-copy-source");
put("x-amz-metadata-directive", "x-goog-metadata-directive");
put("x-amz-copy-source-if-match", "x-goog-copy-source-if-match");
put("x-amz-copy-source-if-none-match", "x-goog-copy-source-if-none-match");
put("x-amz-copy-source-if-unmodified-since", "x-goog-copy-source-if-unmodified-since");
put("x-amz-copy-source-if-modified-since", "x-goog-copy-source-if-modified-since");
}
};

@Override
public void sign(SignableRequest<?> request, AWSCredentials credentials) {
if (credentials.getAWSAccessKeyId() != null && !"".equals(credentials.getAWSAccessKeyId())) {
request.addHeader("Authorization", "Bearer " + credentials.getAWSAccessKeyId());
}

if (request.getHttpMethod() == HttpMethodName.GET) {
request.addHeader("Accept-Encoding", "gzip,deflate");
}

Map<String, String> headerCopy =
request.getHeaders().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

for (Map.Entry<String, String> entry : headerCopy.entrySet()) {
String entryKey = entry.getKey().toLowerCase();
if (headerMap.containsKey(entryKey)) {
request.addHeader(headerMap.get(entryKey), entry.getValue());
} else if (entryKey.startsWith("x-amz-meta-")) {
request.addHeader(entryKey.replace("x-amz-meta-", "x-goog-meta-"), entry.getValue());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ boolean handleStorageException(
String command,
String queryId,
SnowflakeGCSClient gcsClient)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException;
throws SnowflakeSQLException;

void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ class GCSAccessStrategyAwsSdk implements GCSAccessStrategy {
private static final SFLogger logger = SFLoggerFactory.getLogger(GCSAccessStrategyAwsSdk.class);
private final AmazonS3 amazonClient;

GCSAccessStrategyAwsSdk(StageInfo stage)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
GCSAccessStrategyAwsSdk(StageInfo stage) throws SnowflakeSQLException {
String accessToken = (String) stage.getCredentials().get("GCS_ACCESS_TOKEN");

Optional<String> oEndpoint = stage.gcsCustomEndpoint();
Expand All @@ -64,9 +63,10 @@ class GCSAccessStrategyAwsSdk implements GCSAccessStrategy {
ClientConfiguration clientConfig = new ClientConfiguration();

SignerFactory.registerSigner(
"net.snowflake.client.jdbc.cloud.storage.AwsSdkGCPSigner",
net.snowflake.client.jdbc.cloud.storage.AwsSdkGCPSigner.class);
clientConfig.setSignerOverride("net.snowflake.client.jdbc.cloud.storage.AwsSdkGCPSigner");
"net.snowflake.ingest.streaming.internal.fileTransferAgent.AwsSdkGCPSigner",
net.snowflake.ingest.streaming.internal.fileTransferAgent.AwsSdkGCPSigner.class);
clientConfig.setSignerOverride(
"net.snowflake.ingest.streaming.internal.fileTransferAgent.AwsSdkGCPSigner");

clientConfig
.getApacheHttpClientConfig()
Expand Down Expand Up @@ -223,7 +223,7 @@ public boolean handleStorageException(
String command,
String queryId,
SnowflakeGCSClient gcsClient)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
throws SnowflakeSQLException {
if (ex instanceof AmazonClientException) {
logger.debug("GCSAccessStrategyAwsSdk: " + ex.getMessage());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public boolean handleStorageException(
String command,
String queryId,
SnowflakeGCSClient gcsClient)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
throws SnowflakeSQLException {
if (ex instanceof StorageException) {
// NOTE: this code path only handle Access token based operation,
// presigned URL is not covered. Presigned Url do not raise
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Replicated from snowflake-jdbc: net.snowflake.client.core.HttpClientSettingsKey
* Replicated from snowflake-jdbc: HttpClientSettingsKey
* Tag: v3.25.1
* Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/core/HttpClientSettingsKey.java
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ private SnowflakeAzureClient() {}
* required to decrypt/encrypt content in stage
*/
public static SnowflakeAzureClient createSnowflakeAzureClient(
StageInfo stage, RemoteStoreFileEncryptionMaterial encMat)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
StageInfo stage, RemoteStoreFileEncryptionMaterial encMat) throws SnowflakeSQLException {
logger.debug(
"Initializing Snowflake Azure client with encryption: {}",
encMat != null ? "true" : "false");
Expand All @@ -102,9 +101,7 @@ public static SnowflakeAzureClient createSnowflakeAzureClient(
* @throws IllegalArgumentException when invalid credentials are used
*/
private void setupAzureClient(StageInfo stage, RemoteStoreFileEncryptionMaterial encMat)
throws IllegalArgumentException,
SnowflakeSQLException,
net.snowflake.client.jdbc.SnowflakeSQLException {
throws IllegalArgumentException, SnowflakeSQLException, SnowflakeSQLException {
// Save the client creation parameters so that we can reuse them,
// to reset the Azure client.
this.stageInfo = stage;
Expand Down Expand Up @@ -189,8 +186,7 @@ public int getEncryptionKeySize() {
* @throws SnowflakeSQLException failure to renew the client
*/
@Override
public void renew(Map<?, ?> stageCredentials)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
public void renew(Map<?, ?> stageCredentials) throws SnowflakeSQLException {
logger.debug("Renewing the Azure client");
stageInfo.setCredentials(stageCredentials);
setupAzureClient(stageInfo, encMat);
Expand Down Expand Up @@ -302,7 +298,7 @@ public void download(
String stageRegion,
String presignedUrl,
String queryId)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
throws SnowflakeSQLException {
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
String localFilePath = localLocation + localFileSep + destFileName;
Expand Down Expand Up @@ -416,7 +412,7 @@ public InputStream downloadToStream(
String stageRegion,
String presignedUrl,
String queryId)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
throws SnowflakeSQLException {
logger.debug(
"Staring download of file from Azure stage path: {} to input stream", stageFilePath);
Stopwatch stopwatch = new Stopwatch();
Expand Down Expand Up @@ -528,7 +524,7 @@ public void upload(
String stageRegion,
String presignedUrl,
String queryId)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
throws SnowflakeSQLException {
logger.info(
StorageHelper.getStartUploadLog(
"Azure", uploadFromStream, inputStream, fileBackedOutputStream, srcFile, destFileName));
Expand Down Expand Up @@ -649,7 +645,7 @@ public void upload(
@Override
public void handleStorageException(
Exception ex, int retryCount, String operation, String command, String queryId)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
throws SnowflakeSQLException {
handleAzureException(ex, retryCount, operation, command, this, queryId);
}

Expand Down Expand Up @@ -755,7 +751,7 @@ private static void handleAzureException(
String command,
SnowflakeAzureClient azClient,
String queryId)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
throws SnowflakeSQLException {

// no need to retry if it is invalid key exception
if (ex.getCause() instanceof InvalidKeyException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,9 +638,9 @@ public static void uploadWithoutConnection(SnowflakeFileTransferConfig config) t
String streamingIngestClientKey = config.getStreamingIngestClientKey();

// Create HttpClient key
net.snowflake.client.core.HttpClientSettingsKey key =
net.snowflake.client.jdbc.SnowflakeUtil.convertProxyPropertiesToHttpClientKey(
net.snowflake.client.core.OCSPMode.FAIL_OPEN, proxyProperties);
HttpClientSettingsKey key =
StorageClientUtil.convertProxyPropertiesToHttpClientKey(
net.snowflake.ingest.utils.OCSPMode.FAIL_OPEN, proxyProperties);

StageInfo stageInfo = metadata.getStageInfo();
stageInfo.setProxyProperties(proxyProperties);
Expand Down Expand Up @@ -860,7 +860,7 @@ private static void pushFileToRemoteStoreWithPresignedUrl(
FileCompressionType compressionType,
SnowflakeStorageClient initialClient,
int networkTimeoutInMilli,
net.snowflake.client.core.HttpClientSettingsKey ocspModeAndProxyKey,
HttpClientSettingsKey ocspModeAndProxyKey,
int parallel,
File srcFile,
boolean uploadFromStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ private SnowflakeGCSClient() {}
* required to decrypt/encrypt content in stage
*/
public static SnowflakeGCSClient createSnowflakeGCSClient(
StageInfo stage, RemoteStoreFileEncryptionMaterial encMat)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
StageInfo stage, RemoteStoreFileEncryptionMaterial encMat) throws SnowflakeSQLException {
logger.debug(
"Initializing Snowflake GCS client with encryption: {}", encMat != null ? "true" : "false");
SnowflakeGCSClient sfGcsClient = new SnowflakeGCSClient();
Expand Down Expand Up @@ -137,8 +136,7 @@ public boolean requirePresignedUrl() {
}

@Override
public void renew(Map<?, ?> stageCredentials)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
public void renew(Map<?, ?> stageCredentials) throws SnowflakeSQLException {
logger.debug("Renewing the Snowflake GCS client");
stageInfo.setCredentials(stageCredentials);
setupGCSClient(stageInfo, encMat);
Expand Down Expand Up @@ -196,7 +194,7 @@ public void download(
String stageRegion,
String presignedUrl,
String queryId)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
throws SnowflakeSQLException {
String localFilePath = localLocation + localFileSep + destFileName;
logger.debug(
"Staring download of file from GCS stage path: {} to {}", stageFilePath, localFilePath);
Expand Down Expand Up @@ -379,7 +377,7 @@ public InputStream downloadToStream(
String stageRegion,
String presignedUrl,
String queryId)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
throws SnowflakeSQLException {
logger.debug("Staring download of file from GCS stage path: {} to input stream", stageFilePath);
int retryCount = 0;
Stopwatch stopwatch = new Stopwatch();
Expand Down Expand Up @@ -549,7 +547,7 @@ public InputStream downloadToStream(
@Override
public void uploadWithPresignedUrlWithoutConnection(
int networkTimeoutInMilli,
net.snowflake.client.core.HttpClientSettingsKey ocspModeAndProxyKey,
HttpClientSettingsKey ocspModeAndProxyKey,
int parallelism,
boolean uploadFromStream,
String remoteStorageLocation,
Expand All @@ -561,7 +559,7 @@ public void uploadWithPresignedUrlWithoutConnection(
String stageRegion,
String presignedUrl,
String queryId)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
throws SnowflakeSQLException {
logger.info(
StorageHelper.getStartUploadLog(
"GCS", uploadFromStream, inputStream, fileBackedOutputStream, srcFile, destFileName));
Expand Down Expand Up @@ -662,7 +660,7 @@ public void upload(
String stageRegion,
String presignedUrl,
String queryId)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
throws SnowflakeSQLException {
logger.info(
StorageHelper.getStartUploadLog(
"GCS", uploadFromStream, inputStream, fileBackedOutputStream, srcFile, destFileName));
Expand Down Expand Up @@ -813,7 +811,7 @@ private void uploadWithDownScopedToken(
long contentLength,
InputStream content,
String queryId)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
throws SnowflakeSQLException {
logger.debug("Uploading file {} to bucket {}", destFileName, remoteStorageLocation);
try {
this.gcsAccessStrategy.uploadWithDownScopedToken(
Expand Down Expand Up @@ -862,9 +860,9 @@ private void uploadWithPresignedUrl(
Map<String, String> metadata,
InputStream content,
String presignedUrl,
net.snowflake.client.core.HttpClientSettingsKey ocspAndProxyKey,
HttpClientSettingsKey ocspAndProxyKey,
String queryId)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
throws SnowflakeSQLException {
try {
URIBuilder uriBuilder = new URIBuilder(presignedUrl);

Expand Down Expand Up @@ -1037,7 +1035,7 @@ private SFPair<InputStream, Boolean> createUploadStream(
@Override
public void handleStorageException(
Exception ex, int retryCount, String operation, String command, String queryId)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
throws SnowflakeSQLException {
// no need to retry if it is invalid key exception
if (ex.getCause() instanceof InvalidKeyException) {
// Most likely cause is that the unlimited strength policy files are not installed
Expand Down Expand Up @@ -1172,9 +1170,7 @@ public String getDigestMetadata(StorageObjectMetadata meta) {
* @throws IllegalArgumentException when invalid credentials are used
*/
private void setupGCSClient(StageInfo stage, RemoteStoreFileEncryptionMaterial encMat)
throws IllegalArgumentException,
SnowflakeSQLException,
net.snowflake.client.jdbc.SnowflakeSQLException {
throws IllegalArgumentException, SnowflakeSQLException, SnowflakeSQLException {
// Save the client creation parameters so that we can reuse them,
// to reset the GCS client.
this.stageInfo = stage;
Expand Down Expand Up @@ -1244,8 +1240,7 @@ public String getStreamingIngestClientKey(StorageObjectMetadata meta) {
* use JDBC's HttpClientSettingsKey type. Will be removed once the full storage stack import swap
* is complete.
*/
private static HttpClientSettingsKey toIngestKey(
net.snowflake.client.core.HttpClientSettingsKey jdbcKey) {
private static HttpClientSettingsKey toIngestKey(HttpClientSettingsKey jdbcKey) {
if (jdbcKey == null) {
return null;
}
Expand Down
Loading
Loading