Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
* Replicated from snowflake-jdbc (v3.25.1)
* Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/jdbc/cloud/storage/GCSAccessStrategy.java
*
* Permitted differences: package, SFSession kept from JDBC temporarily,
* Permitted differences: package, SFSession removed (always null from callers),
* SFPair uses ingest version, all storage types use ingest versions (same package).
*/
package net.snowflake.ingest.streaming.internal.fileTransferAgent;

import java.io.File;
import java.io.InputStream;
import java.util.Map;
import net.snowflake.client.core.SFSession;
import net.snowflake.ingest.utils.SFPair;

interface GCSAccessStrategy {
Expand Down Expand Up @@ -40,7 +39,6 @@ boolean handleStorageException(
Exception ex,
int retryCount,
String operation,
SFSession session,
String command,
String queryId,
SnowflakeGCSClient gcsClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,9 @@
import com.amazonaws.services.s3.transfer.Upload;
import java.io.File;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import net.snowflake.client.core.SFBaseSession;
import net.snowflake.client.core.SFSession;
import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLogger;
import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLoggerFactory;
import net.snowflake.ingest.utils.SFPair;
Expand All @@ -42,7 +39,7 @@ class GCSAccessStrategyAwsSdk implements GCSAccessStrategy {
private static final SFLogger logger = SFLoggerFactory.getLogger(GCSAccessStrategyAwsSdk.class);
private final AmazonS3 amazonClient;

GCSAccessStrategyAwsSdk(StageInfo stage, SFBaseSession session)
GCSAccessStrategyAwsSdk(StageInfo stage)
throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException {
String accessToken = (String) stage.getCredentials().get("GCS_ACCESS_TOKEN");

Expand Down Expand Up @@ -74,21 +71,7 @@ class GCSAccessStrategyAwsSdk implements GCSAccessStrategy {
clientConfig
.getApacheHttpClientConfig()
.setSslSocketFactory(SnowflakeS3Client.getSSLConnectionSocketFactory());
if (session != null) {
S3HttpUtil.setProxyForS3(session.getHttpClientKey(), clientConfig);
} else {
S3HttpUtil.setSessionlessProxyForS3(stage.getProxyProperties(), clientConfig);
}

if (session instanceof SFSession) {
List<net.snowflake.client.jdbc.HttpHeadersCustomizer> headersCustomizers =
((SFSession) session).getHttpHeadersCustomizers();
if (headersCustomizers != null && !headersCustomizers.isEmpty()) {
amazonS3Builder.withRequestHandlers(
new net.snowflake.client.core.HeaderCustomizerHttpRequestInterceptor(
headersCustomizers));
}
}
S3HttpUtil.setSessionlessProxyForS3(stage.getProxyProperties(), clientConfig);

if (accessToken != null) {
amazonS3Builder.withCredentials(
Expand Down Expand Up @@ -237,7 +220,6 @@ public boolean handleStorageException(
Exception ex,
int retryCount,
String operation,
SFSession session,
String command,
String queryId,
SnowflakeGCSClient gcsClient)
Expand All @@ -256,30 +238,23 @@ public boolean handleStorageException(
if (ex instanceof AmazonServiceException) {
AmazonServiceException ex1 = (AmazonServiceException) ex;

// The AWS credentials might have expired when server returns error 400 and
// does not return the ExpiredToken error code.
// If session is null we cannot renew the token so throw the exception
if (ex1.getStatusCode() == HttpStatus.SC_BAD_REQUEST && session != null) {
SnowflakeFileTransferAgent.renewExpiredToken(session, command, gcsClient);
} else {
throw new SnowflakeSQLLoggedException(
queryId,
session,
SqlState.SYSTEM_ERROR,
ErrorCode.S3_OPERATION_ERROR.getMessageCode(),
ex1,
operation,
ex1.getErrorType().toString(),
ex1.getErrorCode(),
ex1.getMessage(),
ex1.getRequestId(),
extendedRequestId);
}
throw new SnowflakeSQLLoggedException(
Comment thread
sfc-gh-ggeng marked this conversation as resolved.
queryId,
null,
SqlState.SYSTEM_ERROR,
ErrorCode.S3_OPERATION_ERROR.getMessageCode(),
ex1,
operation,
ex1.getErrorType().toString(),
ex1.getErrorCode(),
ex1.getMessage(),
ex1.getRequestId(),
extendedRequestId);

} else {
throw new SnowflakeSQLLoggedException(
queryId,
session,
null,
SqlState.SYSTEM_ERROR,
ErrorCode.AWS_CLIENT_ERROR.getMessageCode(),
ex,
Expand Down Expand Up @@ -314,16 +289,11 @@ public boolean handleStorageException(
if (ex instanceof AmazonS3Exception) {
AmazonS3Exception s3ex = (AmazonS3Exception) ex;
if (s3ex.getErrorCode().equalsIgnoreCase(EXPIRED_AWS_TOKEN_ERROR_CODE)) {
// If session is null we cannot renew the token so throw the ExpiredToken exception
if (session != null) {
SnowflakeFileTransferAgent.renewExpiredToken(session, command, gcsClient);
} else {
throw new SnowflakeSQLException(
queryId,
s3ex.getErrorCode(),
CLOUD_STORAGE_CREDENTIALS_EXPIRED,
"S3 credentials have expired");
}
throw new SnowflakeSQLException(
queryId,
s3ex.getErrorCode(),
CLOUD_STORAGE_CREDENTIALS_EXPIRED,
"S3 credentials have expired");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.io.InputStream;
import java.nio.channels.Channels;
import java.util.Map;
import net.snowflake.client.core.SFSession;
import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLogger;
import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLoggerFactory;
import net.snowflake.ingest.utils.SFPair;
Expand All @@ -28,15 +27,15 @@ class GCSDefaultAccessStrategy implements GCSAccessStrategy {
private static final SFLogger logger = SFLoggerFactory.getLogger(GCSDefaultAccessStrategy.class);
private Storage gcsClient = null;

GCSDefaultAccessStrategy(StageInfo stage, SFSession session) {
GCSDefaultAccessStrategy(StageInfo stage) {
String accessToken = (String) stage.getCredentials().get("GCS_ACCESS_TOKEN");

if (accessToken != null) {
// We are authenticated with an oauth access token.
StorageOptions.Builder builder = StorageOptions.newBuilder();
overrideHost(stage, builder);

if (SnowflakeGCSClient.areDisabledGcsDefaultCredentials(session)) {
if (SnowflakeGCSClient.areDisabledGcsDefaultCredentials()) {
logger.debug(
"Adding explicit credentials to avoid default credential lookup by the GCS client");
builder.setCredentials(GoogleCredentials.create(new AccessToken(accessToken, null)));
Expand Down Expand Up @@ -176,7 +175,6 @@ public boolean handleStorageException(
Exception ex,
int retryCount,
String operation,
SFSession session,
String command,
String queryId,
SnowflakeGCSClient gcsClient)
Expand All @@ -191,7 +189,7 @@ public boolean handleStorageException(
if (retryCount > gcsClient.getMaxRetries()) {
throw new SnowflakeSQLLoggedException(
queryId,
session,
null,
SqlState.SYSTEM_ERROR,
ErrorCode.GCP_SERVICE_ERROR.getMessageCode(),
se,
Expand Down Expand Up @@ -223,17 +221,11 @@ public boolean handleStorageException(
}

if (se.getCode() == 401 && command != null) {
if (session != null) {
// A 401 indicates that the access token has expired,
// we need to refresh the GCS client with the new token
SnowflakeFileTransferAgent.renewExpiredToken(session, command, gcsClient);
} else {
throw new SnowflakeSQLException(
queryId,
se.getMessage(),
CLOUD_STORAGE_CREDENTIALS_EXPIRED,
"GCS credentials have expired");
}
throw new SnowflakeSQLException(
queryId,
se.getMessage(),
CLOUD_STORAGE_CREDENTIALS_EXPIRED,
"GCS credentials have expired");
}
}
return true;
Expand Down
Loading
Loading