From 86378901631bcbf8ed2592ce985b18a3fa188204 Mon Sep 17 00:00:00 2001 From: Gibbs Geng Date: Thu, 2 Apr 2026 06:07:23 +0000 Subject: [PATCH] [SNOW-3249917] JDBC removal Step 10c: Remove SFSession from storage stack MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove SFSession/SFBaseSession parameters and dead session-based code from the storage client stack. Session was always null from ingest callers. Storage clients (SnowflakeS3Client, SnowflakeAzureClient, SnowflakeGCSClient): - Remove SFSession/SFBaseSession fields and constructor params - Remove dead `if (session != null)` proxy/headers/permissions blocks (keep sessionless path) - Remove dead `if (session != null) { renewExpiredToken(...) }` blocks - Simplify getMaxRetries() to return default (no session property lookup) - Replace session.isOwnerOnlyStageFilePermissionsEnabled() with false Interface + strategies: - SnowflakeStorageClient: remove SFSession from all method signatures - GCSAccessStrategy/GCSAccessStrategyAwsSdk/GCSDefaultAccessStrategy: remove SFSession from constructors and handleStorageException - StorageClientFactory: remove SFSession/SFBaseSession params Agent + config: - SnowflakeFileTransferAgent: remove session from getStageInfo(), pushFileToRemoteStore(), compressStreamWithGZIP/NoDigest(). Remove dead renewExpiredToken(), parseCommandInGS(), getLocalFilePathFromCommand() methods. - SnowflakeFileTransferConfig: remove SFSession field, getter, setter Remaining SFSession imports (3): SnowflakeSQLLoggedException (constructor params), TelemetryClient (session-based code) — to be handled next. Co-Authored-By: Claude Opus 4.6 --- .../fileTransferAgent/GCSAccessStrategy.java | 4 +- .../GCSAccessStrategyAwsSdk.java | 70 +++----- .../GCSDefaultAccessStrategy.java | 24 +-- .../SnowflakeAzureClient.java | 111 ++++-------- .../SnowflakeFileTransferAgent.java | 169 ++---------------- .../SnowflakeFileTransferConfig.java | 15 +- .../fileTransferAgent/SnowflakeGCSClient.java | 126 +++++-------- .../fileTransferAgent/SnowflakeS3Client.java | 137 +++++--------- .../SnowflakeStorageClient.java | 31 +--- .../StorageClientFactory.java | 27 +-- 10 files changed, 189 insertions(+), 525 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/GCSAccessStrategy.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/GCSAccessStrategy.java index 2f4f47b34..4ce9b261f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/GCSAccessStrategy.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/GCSAccessStrategy.java @@ -2,7 +2,7 @@ * Replicated from snowflake-jdbc (v3.25.1) * Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/jdbc/cloud/storage/GCSAccessStrategy.java * - * Permitted differences: package, SFSession kept from JDBC temporarily, + * Permitted differences: package, SFSession removed (always null from callers), * SFPair uses ingest version, all storage types use ingest versions (same package). */ package net.snowflake.ingest.streaming.internal.fileTransferAgent; @@ -10,7 +10,6 @@ import java.io.File; import java.io.InputStream; import java.util.Map; -import net.snowflake.client.core.SFSession; import net.snowflake.ingest.utils.SFPair; interface GCSAccessStrategy { @@ -40,7 +39,6 @@ boolean handleStorageException( Exception ex, int retryCount, String operation, - SFSession session, String command, String queryId, SnowflakeGCSClient gcsClient) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/GCSAccessStrategyAwsSdk.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/GCSAccessStrategyAwsSdk.java index 32e985bf9..e4f182f5e 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/GCSAccessStrategyAwsSdk.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/GCSAccessStrategyAwsSdk.java @@ -27,12 +27,9 @@ import com.amazonaws.services.s3.transfer.Upload; import java.io.File; import java.io.InputStream; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import net.snowflake.client.core.SFBaseSession; -import net.snowflake.client.core.SFSession; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLogger; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLoggerFactory; import net.snowflake.ingest.utils.SFPair; @@ -42,7 +39,7 @@ class GCSAccessStrategyAwsSdk implements GCSAccessStrategy { private static final SFLogger logger = SFLoggerFactory.getLogger(GCSAccessStrategyAwsSdk.class); private final AmazonS3 amazonClient; - GCSAccessStrategyAwsSdk(StageInfo stage, SFBaseSession session) + GCSAccessStrategyAwsSdk(StageInfo stage) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { String accessToken = (String) stage.getCredentials().get("GCS_ACCESS_TOKEN"); @@ -74,21 +71,7 @@ class GCSAccessStrategyAwsSdk implements GCSAccessStrategy { clientConfig .getApacheHttpClientConfig() .setSslSocketFactory(SnowflakeS3Client.getSSLConnectionSocketFactory()); - if (session != null) { - S3HttpUtil.setProxyForS3(session.getHttpClientKey(), clientConfig); - } else { - S3HttpUtil.setSessionlessProxyForS3(stage.getProxyProperties(), clientConfig); - } - - if (session instanceof SFSession) { - List headersCustomizers = - ((SFSession) session).getHttpHeadersCustomizers(); - if (headersCustomizers != null && !headersCustomizers.isEmpty()) { - amazonS3Builder.withRequestHandlers( - new net.snowflake.client.core.HeaderCustomizerHttpRequestInterceptor( - headersCustomizers)); - } - } + S3HttpUtil.setSessionlessProxyForS3(stage.getProxyProperties(), clientConfig); if (accessToken != null) { amazonS3Builder.withCredentials( @@ -237,7 +220,6 @@ public boolean handleStorageException( Exception ex, int retryCount, String operation, - SFSession session, String command, String queryId, SnowflakeGCSClient gcsClient) @@ -256,30 +238,23 @@ public boolean handleStorageException( if (ex instanceof AmazonServiceException) { AmazonServiceException ex1 = (AmazonServiceException) ex; - // The AWS credentials might have expired when server returns error 400 and - // does not return the ExpiredToken error code. - // If session is null we cannot renew the token so throw the exception - if (ex1.getStatusCode() == HttpStatus.SC_BAD_REQUEST && session != null) { - SnowflakeFileTransferAgent.renewExpiredToken(session, command, gcsClient); - } else { - throw new SnowflakeSQLLoggedException( - queryId, - session, - SqlState.SYSTEM_ERROR, - ErrorCode.S3_OPERATION_ERROR.getMessageCode(), - ex1, - operation, - ex1.getErrorType().toString(), - ex1.getErrorCode(), - ex1.getMessage(), - ex1.getRequestId(), - extendedRequestId); - } + throw new SnowflakeSQLLoggedException( + queryId, + null, + SqlState.SYSTEM_ERROR, + ErrorCode.S3_OPERATION_ERROR.getMessageCode(), + ex1, + operation, + ex1.getErrorType().toString(), + ex1.getErrorCode(), + ex1.getMessage(), + ex1.getRequestId(), + extendedRequestId); } else { throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.SYSTEM_ERROR, ErrorCode.AWS_CLIENT_ERROR.getMessageCode(), ex, @@ -314,16 +289,11 @@ public boolean handleStorageException( if (ex instanceof AmazonS3Exception) { AmazonS3Exception s3ex = (AmazonS3Exception) ex; if (s3ex.getErrorCode().equalsIgnoreCase(EXPIRED_AWS_TOKEN_ERROR_CODE)) { - // If session is null we cannot renew the token so throw the ExpiredToken exception - if (session != null) { - SnowflakeFileTransferAgent.renewExpiredToken(session, command, gcsClient); - } else { - throw new SnowflakeSQLException( - queryId, - s3ex.getErrorCode(), - CLOUD_STORAGE_CREDENTIALS_EXPIRED, - "S3 credentials have expired"); - } + throw new SnowflakeSQLException( + queryId, + s3ex.getErrorCode(), + CLOUD_STORAGE_CREDENTIALS_EXPIRED, + "S3 credentials have expired"); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/GCSDefaultAccessStrategy.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/GCSDefaultAccessStrategy.java index a3357bbbf..8494a2720 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/GCSDefaultAccessStrategy.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/GCSDefaultAccessStrategy.java @@ -19,7 +19,6 @@ import java.io.InputStream; import java.nio.channels.Channels; import java.util.Map; -import net.snowflake.client.core.SFSession; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLogger; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLoggerFactory; import net.snowflake.ingest.utils.SFPair; @@ -28,7 +27,7 @@ class GCSDefaultAccessStrategy implements GCSAccessStrategy { private static final SFLogger logger = SFLoggerFactory.getLogger(GCSDefaultAccessStrategy.class); private Storage gcsClient = null; - GCSDefaultAccessStrategy(StageInfo stage, SFSession session) { + GCSDefaultAccessStrategy(StageInfo stage) { String accessToken = (String) stage.getCredentials().get("GCS_ACCESS_TOKEN"); if (accessToken != null) { @@ -36,7 +35,7 @@ class GCSDefaultAccessStrategy implements GCSAccessStrategy { StorageOptions.Builder builder = StorageOptions.newBuilder(); overrideHost(stage, builder); - if (SnowflakeGCSClient.areDisabledGcsDefaultCredentials(session)) { + if (SnowflakeGCSClient.areDisabledGcsDefaultCredentials()) { logger.debug( "Adding explicit credentials to avoid default credential lookup by the GCS client"); builder.setCredentials(GoogleCredentials.create(new AccessToken(accessToken, null))); @@ -176,7 +175,6 @@ public boolean handleStorageException( Exception ex, int retryCount, String operation, - SFSession session, String command, String queryId, SnowflakeGCSClient gcsClient) @@ -191,7 +189,7 @@ public boolean handleStorageException( if (retryCount > gcsClient.getMaxRetries()) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.SYSTEM_ERROR, ErrorCode.GCP_SERVICE_ERROR.getMessageCode(), se, @@ -223,17 +221,11 @@ public boolean handleStorageException( } if (se.getCode() == 401 && command != null) { - if (session != null) { - // A 401 indicates that the access token has expired, - // we need to refresh the GCS client with the new token - SnowflakeFileTransferAgent.renewExpiredToken(session, command, gcsClient); - } else { - throw new SnowflakeSQLException( - queryId, - se.getMessage(), - CLOUD_STORAGE_CREDENTIALS_EXPIRED, - "GCS credentials have expired"); - } + throw new SnowflakeSQLException( + queryId, + se.getMessage(), + CLOUD_STORAGE_CREDENTIALS_EXPIRED, + "GCS credentials have expired"); } } return true; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeAzureClient.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeAzureClient.java index 2978dc7ab..fb3b76c48 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeAzureClient.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeAzureClient.java @@ -6,11 +6,10 @@ * ErrorCode/SqlState/SnowflakeSQLException/SnowflakeSQLLoggedException use ingest versions, * all storage types use ingest versions (same package). * SnowflakeUtil static methods replaced with StorageClientUtil equivalents. - * SFSession/SFBaseSession/SFSessionProperty kept from JDBC temporarily. + * SFSession/SFBaseSession removed (always null from callers). */ package net.snowflake.ingest.streaming.internal.fileTransferAgent; -import static net.snowflake.client.core.HttpUtil.setProxyForAzure; import static net.snowflake.client.core.HttpUtil.setSessionlessProxyForAzure; import static net.snowflake.ingest.streaming.internal.fileTransferAgent.ErrorCode.CLOUD_STORAGE_CREDENTIALS_EXPIRED; import static net.snowflake.ingest.streaming.internal.fileTransferAgent.StorageClientUtil.systemGetProperty; @@ -50,12 +49,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import net.snowflake.client.core.SFBaseSession; -import net.snowflake.client.core.SFSession; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLogger; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLoggerFactory; import net.snowflake.ingest.utils.SFPair; -import net.snowflake.ingest.utils.SFSessionProperty; import net.snowflake.ingest.utils.Stopwatch; import org.apache.commons.io.IOUtils; @@ -73,7 +69,6 @@ public class SnowflakeAzureClient implements SnowflakeStorageClient { private CloudBlobClient azStorageClient; private static final SFLogger logger = SFLoggerFactory.getLogger(SnowflakeAzureClient.class); private OperationContext opContext = null; - private SFBaseSession session; private SnowflakeAzureClient() {} ; @@ -85,13 +80,13 @@ private SnowflakeAzureClient() {} * required to decrypt/encrypt content in stage */ public static SnowflakeAzureClient createSnowflakeAzureClient( - StageInfo stage, RemoteStoreFileEncryptionMaterial encMat, SFBaseSession sfSession) + StageInfo stage, RemoteStoreFileEncryptionMaterial encMat) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { logger.debug( "Initializing Snowflake Azure client with encryption: {}", encMat != null ? "true" : "false"); SnowflakeAzureClient azureClient = new SnowflakeAzureClient(); - azureClient.setupAzureClient(stage, encMat, sfSession); + azureClient.setupAzureClient(stage, encMat); return azureClient; } @@ -106,8 +101,7 @@ public static SnowflakeAzureClient createSnowflakeAzureClient( * required to decrypt/encrypt content in stage * @throws IllegalArgumentException when invalid credentials are used */ - private void setupAzureClient( - StageInfo stage, RemoteStoreFileEncryptionMaterial encMat, SFBaseSession sfSession) + private void setupAzureClient(StageInfo stage, RemoteStoreFileEncryptionMaterial encMat) throws IllegalArgumentException, SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { @@ -115,7 +109,6 @@ private void setupAzureClient( // to reset the Azure client. this.stageInfo = stage; this.encMat = encMat; - this.session = sfSession; logger.debug("Setting up the Azure client ", false); @@ -140,7 +133,7 @@ private void setupAzureClient( if (encryptionKeySize != 128 && encryptionKeySize != 192 && encryptionKeySize != 256) { throw new SnowflakeSQLLoggedException( QueryIdHelper.queryIdFromEncMatOr(encMat, null), - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "unsupported key size", @@ -149,11 +142,7 @@ private void setupAzureClient( } this.azStorageClient = new CloudBlobClient(storageEndpoint, azCreds); opContext = new OperationContext(); - if (session != null) { - setProxyForAzure(session.getHttpClientKey(), opContext); - } else { - setSessionlessProxyForAzure(stage.getProxyProperties(), opContext); - } + setSessionlessProxyForAzure(stage.getProxyProperties(), opContext); } catch (URISyntaxException ex) { throw new IllegalArgumentException("invalid_azure_credentials"); } @@ -162,12 +151,6 @@ private void setupAzureClient( // Returns the Max number of retry attempts @Override public int getMaxRetries() { - if (session != null - && session - .getConnectionPropertiesMap() - .containsKey(SFSessionProperty.PUT_GET_MAX_RETRIES)) { - return (int) session.getConnectionPropertiesMap().get(SFSessionProperty.PUT_GET_MAX_RETRIES); - } return 25; } @@ -211,7 +194,7 @@ public void renew(Map stageCredentials) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { logger.debug("Renewing the Azure client"); stageInfo.setCredentials(stageCredentials); - setupAzureClient(stageInfo, encMat, session); + setupAzureClient(stageInfo, encMat); } /** shuts down the client */ @@ -298,7 +281,6 @@ public StorageObjectMetadata getObjectMetadata(String remoteStorageLocation, Str /** * Download a file from remote storage. * - * @param session session object * @param command command to download file * @param localLocation local file path * @param destFileName destination file name @@ -312,7 +294,6 @@ public StorageObjectMetadata getObjectMetadata(String remoteStorageLocation, Str */ @Override public void download( - SFSession session, String command, String localLocation, String destFileName, @@ -339,8 +320,7 @@ public void download( transferOptions.setConcurrentRequestCount(parallelism); blob.downloadToFile(localFilePath, null, transferOptions, opContext); - StorageClientUtil.assureOnlyUserAccessibleFilePermissions( - localFile, session.isOwnerOnlyStageFilePermissionsEnabled()); + StorageClientUtil.assureOnlyUserAccessibleFilePermissions(localFile, false); stopwatch.stop(); long downloadMillis = stopwatch.elapsedMillis(); @@ -356,7 +336,7 @@ public void download( if (!userDefinedMetadata.containsKey(AZ_ENCRYPTIONDATAPROP)) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Encryption data not found in the metadata of a file being downloaded"); @@ -370,7 +350,7 @@ public void download( if (key == null || iv == null) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "File metadata incomplete"); @@ -406,13 +386,13 @@ public void download( } catch (Exception ex) { logger.debug("Download unsuccessful {}", ex); - handleAzureException(ex, ++retryCount, "download", session, command, this, queryId); + handleAzureException(ex, ++retryCount, "download", command, this, queryId); } } while (retryCount <= getMaxRetries()); throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Unexpected: download unsuccessful without exception!"); @@ -421,7 +401,6 @@ public void download( /** * Download a file from remote storage * - * @param session session object * @param command command to download file * @param parallelism number of threads for parallel downloading * @param remoteStorageLocation remote storage location, i.e. bucket for s3 @@ -434,7 +413,6 @@ public void download( */ @Override public InputStream downloadToStream( - SFSession session, String command, int parallelism, String remoteStorageLocation, @@ -465,7 +443,7 @@ public InputStream downloadToStream( if (!userDefinedMetadata.containsKey(AZ_ENCRYPTIONDATAPROP)) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Encryption data not found in the metadata of a file being downloaded"); @@ -478,7 +456,7 @@ public InputStream downloadToStream( if (key == null || iv == null) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "File metadata incomplete"); @@ -514,13 +492,13 @@ public InputStream downloadToStream( } catch (Exception ex) { logger.debug("Downloading unsuccessful {}", ex); - handleAzureException(ex, ++retryCount, "download", session, command, this, queryId); + handleAzureException(ex, ++retryCount, "download", command, this, queryId); } } while (retryCount < getMaxRetries()); throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Unexpected: download unsuccessful without exception!"); @@ -529,7 +507,6 @@ public InputStream downloadToStream( /** * Upload a file/stream to remote storage * - * @param session session object * @param command upload command * @param parallelism number of threads for parallel uploading * @param uploadFromStream true if upload source is stream @@ -546,7 +523,6 @@ public InputStream downloadToStream( */ @Override public void upload( - SFSession session, String command, int parallelism, boolean uploadFromStream, @@ -629,7 +605,7 @@ public void upload( return; } catch (Exception ex) { - handleAzureException(ex, ++retryCount, "upload", session, command, this, queryId); + handleAzureException(ex, ++retryCount, "upload", command, this, queryId); if (uploadFromStream && fileBackedOutputStream == null) { throw new SnowflakeSQLException( @@ -673,21 +649,15 @@ public void upload( * @param retryCount current number of retries, incremented by the caller before each call * @param operation string that indicates the function/operation that was taking place, when the * exception was raised, for example "upload" - * @param session the current SFSession object used by the client * @param command the command attempted at the time of the exception * @param queryId last query id * @throws SnowflakeSQLException exceptions not handled */ @Override public void handleStorageException( - Exception ex, - int retryCount, - String operation, - SFSession session, - String command, - String queryId) + Exception ex, int retryCount, String operation, String command, String queryId) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { - handleAzureException(ex, retryCount, operation, session, command, this, queryId); + handleAzureException(ex, retryCount, operation, command, this, queryId); } private SFPair createUploadStream( @@ -731,7 +701,7 @@ private SFPair createUploadStream( logger.error("Failed to encrypt input", ex); throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), ex, @@ -755,7 +725,7 @@ private SFPair createUploadStream( logger.error("Failed to open input file", ex); throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), ex, @@ -765,7 +735,7 @@ private SFPair createUploadStream( logger.error("Failed to open input stream", ex); throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), ex, @@ -784,7 +754,6 @@ private SFPair createUploadStream( * @param retryCount current number of retries, incremented by the caller before each call * @param operation string that indicates the function/operation that was taking place, when the * exception was raised, for example "upload" - * @param session the current SFSession object used by the client * @param command the command attempted at the time of the exception * @param azClient the current Snowflake Azure client object * @throws SnowflakeSQLException exceptions not handled @@ -793,7 +762,6 @@ private static void handleAzureException( Exception ex, int retryCount, String operation, - SFSession session, String command, SnowflakeAzureClient azClient, String queryId) @@ -809,7 +777,7 @@ private static void handleAzureException( // If there is no space left in the download location, java.io.IOException is thrown. // Don't retry. if (StorageClientUtil.getRootCause(ex) instanceof IOException) { - StorageClientUtil.throwNoSpaceLeftError(session, operation, ex, queryId); + StorageClientUtil.throwNoSpaceLeftError(null, operation, ex, queryId); } if (ex instanceof StorageException) { @@ -817,17 +785,12 @@ private static void handleAzureException( if (((StorageException) ex).getHttpStatusCode() == 403) { // A 403 indicates that the SAS token has expired, - // we need to refresh the Azure client with the new token - if (session != null) { - SnowflakeFileTransferAgent.renewExpiredToken(session, command, azClient); - } else { - // If session is null we cannot renew the token so throw the ExpiredToken exception - throw new SnowflakeSQLException( - queryId, - se.getErrorCode(), - CLOUD_STORAGE_CREDENTIALS_EXPIRED, - "Azure credentials may have expired"); - } + // we cannot renew without a session so throw the ExpiredToken exception + throw new SnowflakeSQLException( + queryId, + se.getErrorCode(), + CLOUD_STORAGE_CREDENTIALS_EXPIRED, + "Azure credentials may have expired"); } // If we have exceeded the max number of retries, propagate the error // no need for back off and retry if the file does not exist @@ -835,7 +798,7 @@ private static void handleAzureException( || ((StorageException) ex).getHttpStatusCode() == 404) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.SYSTEM_ERROR, ErrorCode.AZURE_SERVICE_ERROR.getMessageCode(), se, @@ -869,8 +832,12 @@ private static void handleAzureException( if (se.getHttpStatusCode() == 403) { // A 403 indicates that the SAS token has expired, - // we need to refresh the Azure client with the new token - SnowflakeFileTransferAgent.renewExpiredToken(session, command, azClient); + // we cannot renew without a session so throw the ExpiredToken exception + throw new SnowflakeSQLException( + queryId, + se.getErrorCode(), + CLOUD_STORAGE_CREDENTIALS_EXPIRED, + "Azure credentials may have expired"); } } } else { @@ -879,7 +846,7 @@ private static void handleAzureException( if (retryCount > azClient.getMaxRetries()) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), ex, @@ -894,7 +861,7 @@ private static void handleAzureException( } else { throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), ex, @@ -989,7 +956,7 @@ private SimpleEntry parseEncryptionData(String jsonEncryptionDat } catch (Exception ex) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), ex, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeFileTransferAgent.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeFileTransferAgent.java index 6266142bb..041d859ce 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeFileTransferAgent.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeFileTransferAgent.java @@ -42,7 +42,6 @@ import java.util.Properties; import java.util.Set; import java.util.zip.GZIPOutputStream; -import net.snowflake.client.core.SFSession; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLogger; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLoggerFactory; import org.apache.commons.io.IOUtils; @@ -97,7 +96,7 @@ static List getEncryptionMaterial( return encryptionMaterial; } - static StageInfo getStageInfo(JsonNode jsonNode, SFSession session) throws SnowflakeSQLException { + static StageInfo getStageInfo(JsonNode jsonNode) throws SnowflakeSQLException { String queryId = jsonNode.path("data").path("queryId").asText(); // more parameters common to upload/download @@ -201,20 +200,13 @@ static StageInfo getStageInfo(JsonNode jsonNode, SFSession session) throws Snowf setupUseVirtualUrl(jsonNode, stageInfo); if (stageInfo.getStageType() == StageInfo.StageType.S3) { - if (session == null) { - // This node's value is set if PUT is used without Session. (For Snowpipe Streaming, we rely - // on a response from a server to have this field set to use S3RegionalURL) - JsonNode useS3RegionalURLNode = - jsonNode.path("data").path("stageInfo").path("useS3RegionalUrl"); - if (!useS3RegionalURLNode.isMissingNode()) { - boolean useS3RegionalUrl = useS3RegionalURLNode.asBoolean(false); - stageInfo.setUseS3RegionalUrl(useS3RegionalUrl); - } - } else { - // Update StageInfo to reflect use of S3 regional URL. - // This is required for connecting to S3 over privatelink when the - // target stage is in us-east-1 - stageInfo.setUseS3RegionalUrl(session.getUseRegionalS3EndpointsForPresignedURL()); + // This node's value is set if PUT is used without Session. (For Snowpipe Streaming, we rely + // on a response from a server to have this field set to use S3RegionalURL) + JsonNode useS3RegionalURLNode = + jsonNode.path("data").path("stageInfo").path("useS3RegionalUrl"); + if (!useS3RegionalURLNode.isMissingNode()) { + boolean useS3RegionalUrl = useS3RegionalURLNode.asBoolean(false); + stageInfo.setUseS3RegionalUrl(useS3RegionalUrl); } } @@ -337,7 +329,7 @@ public static List getFileTransferMetadatas( final Set sourceFiles = expandFileNames(srcLocations, queryId); - StageInfo stageInfo = getStageInfo(jsonNode, null /*SFSession*/); + StageInfo stageInfo = getStageInfo(jsonNode); List result = new ArrayList<>(); if (stageInfo.getStageType() != StageInfo.StageType.GCS @@ -475,118 +467,8 @@ static Set expandFileNames(String[] filePathList, String queryId) return result; } - /** - * Replicated from SnowflakeFileTransferAgent.parseCommandInGS. Source: - * https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java - */ - private static com.fasterxml.jackson.databind.JsonNode parseCommandInGS( - net.snowflake.client.core.SFStatement statement, String command) - throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { - Object result = null; - // send the command to GS - try { - result = - statement.executeHelper( - command, - "application/json", - null, // bindValues - false, // describeOnly - false, // internal - false, // async - new net.snowflake.client.core - .ExecTimeTelemetryData()); // OOB telemetry timing queries - } catch (net.snowflake.client.core.SFException ex) { - throw new SnowflakeSQLException( - ex.getQueryId(), ex, ex.getSqlState(), ex.getVendorCode(), ex.getParams()); - } - - com.fasterxml.jackson.databind.JsonNode jsonNode = - (com.fasterxml.jackson.databind.JsonNode) result; - - logger.debug( - "Response: {}", net.snowflake.client.util.SecretDetector.maskSecrets(jsonNode.toString())); - - net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(jsonNode); - return jsonNode; - } - - /** - * Replicated from SnowflakeFileTransferAgent.getLocalFilePathFromCommand. Source: - * https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java - */ - private static String getLocalFilePathFromCommand(String command, boolean unescape) { - if (command == null) { - logger.error("null command", false); - return null; - } - - if (command.indexOf(FILE_PROTOCOL) < 0) { - logger.error("file:// prefix not found in command: {}", command); - return null; - } - - int localFilePathBeginIdx = command.indexOf(FILE_PROTOCOL) + FILE_PROTOCOL.length(); - boolean isLocalFilePathQuoted = - (localFilePathBeginIdx > FILE_PROTOCOL.length()) - && (command.charAt(localFilePathBeginIdx - 1 - FILE_PROTOCOL.length()) == '\''); - - // the ending index is exclusive - int localFilePathEndIdx = 0; - String localFilePath = ""; - - if (isLocalFilePathQuoted) { - // look for the matching quote - localFilePathEndIdx = command.indexOf("'", localFilePathBeginIdx); - if (localFilePathEndIdx > localFilePathBeginIdx) { - localFilePath = command.substring(localFilePathBeginIdx, localFilePathEndIdx); - } - // unescape backslashes to match the file name from GS - if (unescape) { - localFilePath = localFilePath.replaceAll("\\\\\\\\", "\\\\"); - } - } else { - // look for the first space or new line or semi colon - java.util.List indexList = new java.util.ArrayList<>(); - char[] delimiterChars = {' ', '\n', ';'}; - for (int i = 0; i < delimiterChars.length; i++) { - int charIndex = command.indexOf(delimiterChars[i], localFilePathBeginIdx); - if (charIndex != -1) { - indexList.add(charIndex); - } - } - - localFilePathEndIdx = indexList.isEmpty() ? -1 : java.util.Collections.min(indexList); - - if (localFilePathEndIdx > localFilePathBeginIdx) { - localFilePath = command.substring(localFilePathBeginIdx, localFilePathEndIdx); - } else if (localFilePathEndIdx == -1) { - localFilePath = command.substring(localFilePathBeginIdx); - } - } - - return localFilePath; - } - private static final String FILE_PROTOCOL = "file://"; - /** - * Replicated from SnowflakeFileTransferAgent.renewExpiredToken. Source: - * https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java - */ - public static void renewExpiredToken( - net.snowflake.client.core.SFSession session, String command, SnowflakeStorageClient client) - throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { - net.snowflake.client.core.SFStatement statement = - new net.snowflake.client.core.SFStatement(session); - com.fasterxml.jackson.databind.JsonNode jsonNode = parseCommandInGS(statement, command); - String queryId = jsonNode.path("data").path("queryId").asText(); - java.util.Map stageCredentials = extractStageCreds(jsonNode, queryId); - - // renew client with the fresh token - logger.debug("Renewing expired access token"); - client.renew(stageCredentials); - } - // ---- Inner classes and methods replicated from JDBC for uploadWithoutConnection ---- static class InputStreamWithMetadata { @@ -626,8 +508,7 @@ public static remoteLocation extractLocationAndPath(String stageLocationPath) { } private static InputStreamWithMetadata compressStreamWithGZIP( - InputStream inputStream, net.snowflake.client.core.SFBaseSession session, String queryId) - throws SnowflakeSQLException { + InputStream inputStream, String queryId) throws SnowflakeSQLException { FileBackedOutputStream tempStream = new FileBackedOutputStream(MAX_BUFFER_SIZE, true); try { @@ -667,7 +548,7 @@ private static InputStreamWithMetadata compressStreamWithGZIP( throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), ex, @@ -677,8 +558,7 @@ private static InputStreamWithMetadata compressStreamWithGZIP( @Deprecated private static InputStreamWithMetadata compressStreamWithGZIPNoDigest( - InputStream inputStream, net.snowflake.client.core.SFBaseSession session, String queryId) - throws SnowflakeSQLException { + InputStream inputStream, String queryId) throws SnowflakeSQLException { try { FileBackedOutputStream tempStream = new FileBackedOutputStream(MAX_BUFFER_SIZE, true); @@ -709,7 +589,7 @@ private static InputStreamWithMetadata compressStreamWithGZIPNoDigest( throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), ex, @@ -789,8 +669,8 @@ public static void uploadWithoutConnection(SnowflakeFileTransferConfig config) t if (requireCompress) { InputStreamWithMetadata compressedSizeAndStream = (encMat == null - ? compressStreamWithGZIPNoDigest(uploadStream, /* session= */ null, null) - : compressStreamWithGZIP(uploadStream, /* session= */ null, encMat.getQueryId())); + ? compressStreamWithGZIPNoDigest(uploadStream, null) + : compressStreamWithGZIP(uploadStream, encMat.getQueryId())); fileBackedOutputStream = compressedSizeAndStream.fileBackedOutputStream; @@ -826,7 +706,7 @@ public static void uploadWithoutConnection(SnowflakeFileTransferConfig config) t uploadSize); SnowflakeStorageClient initialClient = - StorageClientFactory.getFactory().createClient(stageInfo, 1, encMat, /* session= */ null); + StorageClientFactory.getFactory().createClient(stageInfo, 1, encMat); // Normal flow will never hit here. This is only for testing purposes if (isInjectedFileTransferExceptionEnabled()) { @@ -846,7 +726,6 @@ public static void uploadWithoutConnection(SnowflakeFileTransferConfig config) t digest, (requireCompress ? FileCompressionType.GZIP : null), initialClient, - config.getSession(), config.getCommand(), 1, fileToUpload, @@ -910,7 +789,6 @@ private static void pushFileToRemoteStore( String digest, FileCompressionType compressionType, SnowflakeStorageClient initialClient, - net.snowflake.client.core.SFSession session, String command, int parallel, File srcFile, @@ -954,22 +832,7 @@ private static void pushFileToRemoteStore( try { String presignedUrl = ""; - if (initialClient.requirePresignedUrl()) { - // need to replace file://mypath/myfile?.csv with file://mypath/myfile1.csv.gz - String localFilePath = getLocalFilePathFromCommand(command, false); - String commandWithExactPath = command.replace(localFilePath, origDestFileName); - // then hand that to GS to get the actual presigned URL we'll use - net.snowflake.client.core.SFStatement statement = - new net.snowflake.client.core.SFStatement(session); - com.fasterxml.jackson.databind.JsonNode jsonNode = - parseCommandInGS(statement, commandWithExactPath); - - if (!jsonNode.path("data").path("stageInfo").path("presignedUrl").isMissingNode()) { - presignedUrl = jsonNode.path("data").path("stageInfo").path("presignedUrl").asText(); - } - } initialClient.upload( - session, command, parallel, uploadFromStream, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeFileTransferConfig.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeFileTransferConfig.java index db7e54cd7..1ff00e19f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeFileTransferConfig.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeFileTransferConfig.java @@ -3,13 +3,12 @@ * Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferConfig.java * * Permitted differences: package, OCSPMode uses ingest version, - * @SnowflakeOrgInternalApi removed. SFSession kept from JDBC temporarily. + * @SnowflakeOrgInternalApi removed. SFSession removed (always null from callers). */ package net.snowflake.ingest.streaming.internal.fileTransferAgent; import java.io.InputStream; import java.util.Properties; -import net.snowflake.client.core.SFSession; import net.snowflake.ingest.utils.OCSPMode; /** @@ -24,7 +23,6 @@ public class SnowflakeFileTransferConfig { private Properties proxyProperties; private String prefix; private String destFileName; - private SFSession session; // Optional, added for S3 and Azure (always null from ingest callers) private String command; // Optional, added for S3 and Azure private boolean useS3RegionalUrl; // only for S3 us-east-1 private link deployments private String streamingIngestClientName; @@ -40,7 +38,6 @@ public SnowflakeFileTransferConfig(Builder builder) { this.proxyProperties = builder.proxyProperties; this.prefix = builder.prefix; this.destFileName = builder.destFileName; - this.session = builder.session; this.command = builder.command; this.useS3RegionalUrl = builder.useS3RegionalUrl; this.streamingIngestClientKey = builder.streamingIngestClientKey; @@ -80,10 +77,6 @@ public String getDestFileName() { return destFileName; } - public SFSession getSession() { - return session; - } - public String getCommand() { return command; } @@ -114,7 +107,6 @@ public static class Builder { private Properties proxyProperties = null; private String prefix = null; private String destFileName = null; - private SFSession session = null; private String command = null; private boolean useS3RegionalUrl = false; // only for S3 us-east-1 private link deployments private String streamingIngestClientName; @@ -185,11 +177,6 @@ public Builder setDestFileName(String destFileName) { return this; } - public Builder setSFSession(SFSession session) { - this.session = session; - return this; - } - public Builder setCommand(String command) { this.command = command; return this; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeGCSClient.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeGCSClient.java index e6c452993..e12e9b5bb 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeGCSClient.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeGCSClient.java @@ -6,7 +6,7 @@ * ErrorCode/SqlState/SnowflakeSQLException/SnowflakeSQLLoggedException use ingest versions, * all storage types use ingest versions (same package). * SnowflakeUtil static methods replaced with StorageClientUtil equivalents. - * SFSession/SFBaseSession kept from JDBC temporarily. + * SFSession/SFBaseSession removed (always null from callers). * HttpClientSettingsKey uses ingest version (same package). */ package net.snowflake.ingest.streaming.internal.fileTransferAgent; @@ -41,13 +41,11 @@ import net.snowflake.client.core.ExecTimeTelemetryData; import net.snowflake.client.core.HttpResponseContextDto; import net.snowflake.client.core.HttpUtil; -import net.snowflake.client.core.SFSession; import net.snowflake.client.jdbc.RestRequest; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.ArgSupplier; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLogger; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLoggerFactory; import net.snowflake.ingest.utils.SFPair; -import net.snowflake.ingest.utils.SFSessionProperty; import net.snowflake.ingest.utils.Stopwatch; import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; @@ -73,7 +71,6 @@ public class SnowflakeGCSClient implements SnowflakeStorageClient { private int encryptionKeySize = 0; // used for PUTs private StageInfo stageInfo; private RemoteStoreFileEncryptionMaterial encMat; - private SFSession session = null; private GCSAccessStrategy gcsAccessStrategy = null; private static final SFLogger logger = SFLoggerFactory.getLogger(SnowflakeGCSClient.class); @@ -87,12 +84,12 @@ private SnowflakeGCSClient() {} * required to decrypt/encrypt content in stage */ public static SnowflakeGCSClient createSnowflakeGCSClient( - StageInfo stage, RemoteStoreFileEncryptionMaterial encMat, SFSession session) + StageInfo stage, RemoteStoreFileEncryptionMaterial encMat) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { logger.debug( "Initializing Snowflake GCS client with encryption: {}", encMat != null ? "true" : "false"); SnowflakeGCSClient sfGcsClient = new SnowflakeGCSClient(); - sfGcsClient.setupGCSClient(stage, encMat, session); + sfGcsClient.setupGCSClient(stage, encMat); return sfGcsClient; } @@ -100,12 +97,6 @@ public static SnowflakeGCSClient createSnowflakeGCSClient( // Returns the Max number of retry attempts @Override public int getMaxRetries() { - if (session != null - && session - .getConnectionPropertiesMap() - .containsKey(SFSessionProperty.PUT_GET_MAX_RETRIES)) { - return (int) session.getConnectionPropertiesMap().get(SFSessionProperty.PUT_GET_MAX_RETRIES); - } return 25; } @@ -153,7 +144,7 @@ public void renew(Map stageCredentials) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { logger.debug("Renewing the Snowflake GCS client"); stageInfo.setCredentials(stageCredentials); - setupGCSClient(stageInfo, encMat, session); + setupGCSClient(stageInfo, encMat); } @Override @@ -186,7 +177,6 @@ public StorageObjectMetadata getObjectMetadata(String remoteStorageLocation, Str /** * Download a file from remote storage. * - * @param session session object * @param command command to download file * @param localLocation local file path * @param destFileName destination file name @@ -200,7 +190,6 @@ public StorageObjectMetadata getObjectMetadata(String remoteStorageLocation, Str */ @Override public void download( - SFSession session, String command, String localLocation, String destFileName, @@ -232,18 +221,16 @@ public void download( logger.debug("Fetching result: {}", scrubPresignedUrl(presignedUrl)); - CloseableHttpClient httpClient = - HttpUtil.getHttpClientWithoutDecompression( - session.getHttpClientKey(), session.getHttpHeadersCustomizers()); + CloseableHttpClient httpClient = HttpUtil.getHttpClientWithoutDecompression(null, null); // Get the file on storage using the presigned url HttpResponseContextDto responseDto = RestRequest.executeWithRetries( httpClient, httpRequest, - session.getNetworkTimeoutInMilli() / 1000, // retry timeout + 0, // retry timeout (dead code path, session was always null) 0, - session.getHttpClientSocketTimeout(), + 0, // socket timeout (dead code path, session was always null) getMaxRetries(), 0, // no socket timeout injection null, // no canceling @@ -270,8 +257,7 @@ public void download( outStream.flush(); outStream.close(); bodyStream.close(); - StorageClientUtil.assureOnlyUserAccessibleFilePermissions( - localFile, session.isOwnerOnlyStageFilePermissionsEnabled()); + StorageClientUtil.assureOnlyUserAccessibleFilePermissions(localFile, false); if (isEncrypting()) { Map userDefinedHeaders = createCaseInsensitiveMap(response.getAllHeaders()); @@ -287,21 +273,20 @@ public void download( logger.debug("Download successful", false); } catch (IOException ex) { logger.debug("Download unsuccessful {}", ex); - handleStorageException(ex, ++retryCount, "download", session, command, queryId); + handleStorageException(ex, ++retryCount, "download", command, queryId); } } else { Exception ex = new HttpResponseException( response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity())); - handleStorageException(ex, ++retryCount, "download", session, command, queryId); + handleStorageException(ex, ++retryCount, "download", command, queryId); } } else { Map userDefinedMetadata = this.gcsAccessStrategy.download( parallelism, remoteStorageLocation, stageFilePath, localFile); - StorageClientUtil.assureOnlyUserAccessibleFilePermissions( - localFile, session.isOwnerOnlyStageFilePermissionsEnabled()); + StorageClientUtil.assureOnlyUserAccessibleFilePermissions(localFile, false); stopwatch.stop(); downloadMillis = stopwatch.elapsedMillis(); logger.debug("Download successful", false); @@ -324,7 +309,7 @@ public void download( if (key == null || iv == null) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "File metadata incomplete"); @@ -349,7 +334,7 @@ public void download( logger.error("Error decrypting file", ex); throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Cannot decrypt file"); @@ -365,13 +350,13 @@ public void download( return; } catch (Exception ex) { logger.debug("Download unsuccessful {}", ex); - handleStorageException(ex, ++retryCount, "download", session, command, queryId); + handleStorageException(ex, ++retryCount, "download", command, queryId); } } while (retryCount <= getMaxRetries()); throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Unexpected: download unsuccessful without exception!"); @@ -380,7 +365,6 @@ public void download( /** * Download a file from remote storage * - * @param session session object * @param command command to download file * @param parallelism number of threads for parallel downloading * @param remoteStorageLocation remote storage location, i.e. bucket for s3 @@ -393,7 +377,6 @@ public void download( */ @Override public InputStream downloadToStream( - SFSession session, String command, int parallelism, String remoteStorageLocation, @@ -422,18 +405,16 @@ public InputStream downloadToStream( logger.debug("Fetching result: {}", scrubPresignedUrl(presignedUrl)); - CloseableHttpClient httpClient = - HttpUtil.getHttpClientWithoutDecompression( - session.getHttpClientKey(), session.getHttpHeadersCustomizers()); + CloseableHttpClient httpClient = HttpUtil.getHttpClientWithoutDecompression(null, null); // Put the file on storage using the presigned url HttpResponse response = RestRequest.executeWithRetries( httpClient, httpRequest, - session.getNetworkTimeoutInMilli() / 1000, // retry timeout + 0, // retry timeout (dead code path, session was always null) 0, - session.getHttpClientSocketTimeout(), + 0, // socket timeout (dead code path, session was always null) getMaxRetries(), 0, // no socket timeout injection null, // no canceling @@ -467,14 +448,14 @@ public InputStream downloadToStream( logger.debug("Download successful", false); } catch (IOException ex) { logger.debug("Download unsuccessful {}", ex); - handleStorageException(ex, ++retryCount, "download", session, command, queryId); + handleStorageException(ex, ++retryCount, "download", command, queryId); } } else { Exception ex = new HttpResponseException( response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity())); - handleStorageException(ex, ++retryCount, "download", session, command, queryId); + handleStorageException(ex, ++retryCount, "download", command, queryId); } } else { SFPair> pair = @@ -525,7 +506,7 @@ public InputStream downloadToStream( logger.error("Error decrypting file", ex); throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Cannot decrypt file"); @@ -541,13 +522,13 @@ public InputStream downloadToStream( return inputStream; } catch (Exception ex) { logger.debug("Download unsuccessful {}", ex); - handleStorageException(ex, ++retryCount, "download", session, command, queryId); + handleStorageException(ex, ++retryCount, "download", command, queryId); } } while (retryCount <= getMaxRetries()); throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Unexpected: download unsuccessful without exception!"); @@ -659,7 +640,6 @@ public void uploadWithPresignedUrlWithoutConnection( /** * Upload a file/stream to remote storage * - * @param session session object * @param command upload command * @param parallelism [ not used by the GCP implementation ] * @param uploadFromStream true if upload source is stream @@ -676,7 +656,6 @@ public void uploadWithPresignedUrlWithoutConnection( */ @Override public void upload( - SFSession session, String command, int parallelism, boolean uploadFromStream, @@ -716,13 +695,13 @@ public void upload( if (!isNullOrEmpty(presignedUrl)) { logger.debug("Starting upload with downscope token", false); uploadWithPresignedUrl( - session.getNetworkTimeoutInMilli(), - session.getHttpClientSocketTimeout(), + 0, // dead code path, session was always null + 0, // dead code path, session was always null meta.getContentEncoding(), meta.getUserMetadata(), uploadStreamInfo.left, presignedUrl, - session.getHttpClientKey(), + null, // dead code path, session was always null queryId); stopwatch.stop(); logger.debug("Upload successful", false); @@ -785,12 +764,12 @@ public void upload( return; } catch (Exception ex) { - handleStorageException(ex, ++retryCount, "upload", session, command, queryId); + handleStorageException(ex, ++retryCount, "upload", command, queryId); if (uploadFromStream && fileBackedOutputStream == null) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), ex, @@ -818,7 +797,7 @@ public void upload( throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Unexpected: upload unsuccessful without exception!"); @@ -855,7 +834,7 @@ private void uploadWithDownScopedToken( content, queryId); } catch (Exception e) { - handleStorageException(e, 0, "upload", session, queryId); + handleStorageException(e, 0, "upload", queryId); SnowflakeSQLException wrappedException; if (e instanceof SnowflakeSQLException) { wrappedException = (SnowflakeSQLException) e; @@ -864,7 +843,7 @@ private void uploadWithDownScopedToken( wrappedException = new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), e, @@ -918,8 +897,7 @@ private void uploadWithPresignedUrl( InputStreamEntity contentEntity = new InputStreamEntity(content, -1); httpRequest.setEntity(contentEntity); - CloseableHttpClient httpClient = - HttpUtil.getHttpClient(ocspAndProxyKey, session.getHttpHeadersCustomizers()); + CloseableHttpClient httpClient = HttpUtil.getHttpClient(ocspAndProxyKey, null); // Put the file on storage using the presigned url HttpResponse response = @@ -949,19 +927,19 @@ private void uploadWithPresignedUrl( new HttpResponseException( response.getStatusLine().getStatusCode(), EntityUtils.toString(response.getEntity())); - handleStorageException(ex, 0, "upload", session, null, queryId); + handleStorageException(ex, 0, "upload", null, queryId); } } catch (URISyntaxException e) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Unexpected: upload presigned URL invalid"); } catch (Exception e) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Unexpected: upload with presigned url failed"); @@ -1024,7 +1002,7 @@ private SFPair createUploadStream( logger.error("Failed to encrypt input", ex); throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), ex, @@ -1048,7 +1026,7 @@ private SFPair createUploadStream( logger.error("Failed to open input file", ex); throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), ex, @@ -1058,7 +1036,7 @@ private SFPair createUploadStream( logger.error("Failed to open input stream", ex); throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), ex, @@ -1071,12 +1049,7 @@ private SFPair createUploadStream( @Override public void handleStorageException( - Exception ex, - int retryCount, - String operation, - SFSession session, - String command, - String queryId) + Exception ex, int retryCount, String operation, String command, String queryId) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { // no need to retry if it is invalid key exception if (ex.getCause() instanceof InvalidKeyException) { @@ -1088,18 +1061,18 @@ public void handleStorageException( // If there is no space left in the download location, java.io.IOException is thrown. // Don't retry. if (getRootCause(ex) instanceof IOException) { - StorageClientUtil.throwNoSpaceLeftError(session, operation, ex, queryId); + StorageClientUtil.throwNoSpaceLeftError(null, operation, ex, queryId); } if (this.gcsAccessStrategy.handleStorageException( - ex, retryCount, operation, session, command, queryId, this)) { + ex, retryCount, operation, command, queryId, this)) { // exception is handled in gcsAccessStrategy.handleStorageException } else if (ex instanceof InterruptedException || getRootCause(ex) instanceof SocketTimeoutException) { if (retryCount > getMaxRetries()) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), ex, @@ -1114,7 +1087,7 @@ public void handleStorageException( } else { throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), ex, @@ -1213,8 +1186,7 @@ public String getDigestMetadata(StorageObjectMetadata meta) { * required to decrypt/encrypt content in stage * @throws IllegalArgumentException when invalid credentials are used */ - private void setupGCSClient( - StageInfo stage, RemoteStoreFileEncryptionMaterial encMat, SFSession session) + private void setupGCSClient(StageInfo stage, RemoteStoreFileEncryptionMaterial encMat) throws IllegalArgumentException, SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { @@ -1222,7 +1194,6 @@ private void setupGCSClient( // to reset the GCS client. this.stageInfo = stage; this.encMat = encMat; - this.session = session; logger.debug("Setting up the GCS client ", false); @@ -1230,9 +1201,9 @@ private void setupGCSClient( boolean overrideAwsAccessStrategy = Boolean.valueOf(System.getenv("SNOWFLAKE_GCS_FORCE_VIRTUAL_STYLE_DOMAINS")); if (stage.getUseVirtualUrl() || overrideAwsAccessStrategy) { - this.gcsAccessStrategy = new GCSAccessStrategyAwsSdk(stage, session); + this.gcsAccessStrategy = new GCSAccessStrategyAwsSdk(stage); } else { - this.gcsAccessStrategy = new GCSDefaultAccessStrategy(stage, session); + this.gcsAccessStrategy = new GCSDefaultAccessStrategy(stage); } if (encMat != null) { @@ -1253,9 +1224,8 @@ private void setupGCSClient( } } - protected static boolean areDisabledGcsDefaultCredentials(SFSession session) { - return session != null && session.getDisableGcsDefaultCredentials() - || convertSystemPropertyToBooleanValue(DISABLE_GCS_DEFAULT_CREDENTIALS_PROPERTY_NAME, true); + protected static boolean areDisabledGcsDefaultCredentials() { + return convertSystemPropertyToBooleanValue(DISABLE_GCS_DEFAULT_CREDENTIALS_PROPERTY_NAME, true); } private static boolean isSuccessStatusCode(int code) { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeS3Client.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeS3Client.java index f6a65d8fa..86372196b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeS3Client.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeS3Client.java @@ -9,7 +9,7 @@ * StorageObjectSummaryCollection/S3ObjectMetadata/HttpHeadersCustomizer/ * HeaderCustomizerHttpRequestInterceptor/FileCompressionType use ingest versions (same package). * SnowflakeUtil static methods replaced with StorageClientUtil equivalents. - * SFSession/SFBaseSession kept from JDBC temporarily (always null from callers). + * SFSession/SFBaseSession removed (always null from callers). * @SnowflakeJdbcInternalApi removed. * CLOUD_STORAGE_CREDENTIALS_EXPIRED imported from ErrorCode. */ @@ -65,13 +65,10 @@ import java.util.concurrent.ExecutorService; import javax.crypto.SecretKey; import javax.crypto.spec.SecretKeySpec; -import net.snowflake.client.core.SFBaseSession; -import net.snowflake.client.core.SFSession; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLogger; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLoggerFactory; import net.snowflake.ingest.utils.HttpUtil; import net.snowflake.ingest.utils.SFPair; -import net.snowflake.ingest.utils.SFSessionProperty; import net.snowflake.ingest.utils.Stopwatch; import org.apache.commons.io.IOUtils; import org.apache.http.HttpStatus; @@ -98,7 +95,6 @@ public class SnowflakeS3Client implements SnowflakeStorageClient { private Properties proxyProperties = null; private String stageRegion = null; private String stageEndPoint = null; // FIPS endpoint, if needed - private SFBaseSession session = null; private boolean isClientSideEncrypted = true; private boolean isUseS3RegionalUrl = false; @@ -113,14 +109,12 @@ public SnowflakeS3Client( String stageRegion, String stageEndPoint, boolean isClientSideEncrypted, - SFBaseSession session, boolean useS3RegionalUrl) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { logger.debug( "Initializing Snowflake S3 client with encryption: {}, client side encrypted: {}", encMat != null, isClientSideEncrypted); - this.session = session; this.isUseS3RegionalUrl = useS3RegionalUrl; setupSnowflakeS3Client( stageCredentials, @@ -129,8 +123,7 @@ public SnowflakeS3Client( proxyProperties, stageRegion, stageEndPoint, - isClientSideEncrypted, - session); + isClientSideEncrypted); } private void setupSnowflakeS3Client( @@ -140,8 +133,7 @@ private void setupSnowflakeS3Client( Properties proxyProperties, String stageRegion, String stageEndPoint, - boolean isClientSideEncrypted, - SFBaseSession session) + boolean isClientSideEncrypted) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { // Save the client creation parameters so that we can reuse them, // to reset the AWS client. We won't save the awsCredentials since @@ -151,7 +143,6 @@ private void setupSnowflakeS3Client( this.encMat = encMat; this.proxyProperties = proxyProperties; this.stageEndPoint = stageEndPoint; // FIPS endpoint, if needed - this.session = session; this.isClientSideEncrypted = isClientSideEncrypted; logger.debug("Setting up AWS client ", false); @@ -169,11 +160,7 @@ private void setupSnowflakeS3Client( clientConfig.withSignerOverride("AWSS3V4SignerType"); clientConfig.getApacheHttpClientConfig().setSslSocketFactory(getSSLConnectionSocketFactory()); - if (session != null) { - S3HttpUtil.setProxyForS3(session.getHttpClientKey(), clientConfig); - } else { - S3HttpUtil.setSessionlessProxyForS3(proxyProperties, clientConfig); - } + S3HttpUtil.setSessionlessProxyForS3(proxyProperties, clientConfig); AmazonS3Builder amazonS3Builder = AmazonS3Client.builder(); if (encMat != null) { byte[] decodedKey = Base64.getDecoder().decode(encMat.getQueryStageMasterKey()); @@ -201,7 +188,7 @@ private void setupSnowflakeS3Client( } else { throw new SnowflakeSQLLoggedException( QueryIdHelper.queryIdFromEncMatOr(encMat, null), - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "unsupported key size", @@ -233,16 +220,6 @@ private void setupSnowflakeS3Client( // Explicitly force to use virtual address style amazonS3Builder.withPathStyleAccessEnabled(false); - if (session instanceof SFSession) { - List headersCustomizers = - ((SFSession) session).getHttpHeadersCustomizers(); - if (headersCustomizers != null && !headersCustomizers.isEmpty()) { - amazonS3Builder.withRequestHandlers( - new net.snowflake.client.core.HeaderCustomizerHttpRequestInterceptor( - headersCustomizers)); - } - } - amazonClient = (AmazonS3) amazonS3Builder.build(); } @@ -253,12 +230,6 @@ static String getDomainSuffixForRegionalUrl(String regionName) { // Returns the Max number of retry attempts @Override public int getMaxRetries() { - if (session != null - && session - .getConnectionPropertiesMap() - .containsKey(SFSessionProperty.PUT_GET_MAX_RETRIES)) { - return (int) session.getConnectionPropertiesMap().get(SFSessionProperty.PUT_GET_MAX_RETRIES); - } return 25; } @@ -304,8 +275,7 @@ public void renew(Map stageCredentials) this.proxyProperties, this.stageRegion, this.stageEndPoint, - this.isClientSideEncrypted, - this.session); + this.isClientSideEncrypted); } @Override @@ -331,7 +301,6 @@ public StorageObjectMetadata getObjectMetadata(String remoteStorageLocation, Str /** * Download a file from S3. * - * @param session session object * @param command command to download file * @param localLocation local file path * @param destFileName destination file name @@ -347,7 +316,6 @@ public StorageObjectMetadata getObjectMetadata(String remoteStorageLocation, Str */ @Override public void download( - SFSession session, String command, String localLocation, String destFileName, @@ -396,8 +364,7 @@ public ExecutorService newExecutor() { String iv = metaMap.get(AMZ_IV); myDownload.waitForCompletion(); - StorageClientUtil.assureOnlyUserAccessibleFilePermissions( - localFile, session.isOwnerOnlyStageFilePermissionsEnabled()); + StorageClientUtil.assureOnlyUserAccessibleFilePermissions(localFile, false); stopwatch.stop(); long downloadMillis = stopwatch.elapsedMillis(); @@ -406,7 +373,7 @@ public ExecutorService newExecutor() { if (key == null || iv == null) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "File metadata incomplete"); @@ -442,7 +409,7 @@ public ExecutorService newExecutor() { return; } catch (Exception ex) { - handleS3Exception(ex, ++retryCount, "download", session, command, this, queryId); + handleS3Exception(ex, ++retryCount, "download", command, this, queryId); } finally { if (tx != null) { @@ -453,7 +420,7 @@ public ExecutorService newExecutor() { throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Unexpected: download unsuccessful without exception!"); @@ -462,7 +429,6 @@ public ExecutorService newExecutor() { /** * Download a file from remote storage * - * @param session session object * @param command command to download file * @param parallelism number of threads for parallel downloading * @param remoteStorageLocation remote storage location, i.e. bucket for s3 @@ -475,7 +441,6 @@ public ExecutorService newExecutor() { */ @Override public InputStream downloadToStream( - SFSession session, String command, int parallelism, String remoteStorageLocation, @@ -506,7 +471,7 @@ public InputStream downloadToStream( if (key == null || iv == null) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "File metadata incomplete"); @@ -539,13 +504,13 @@ public InputStream downloadToStream( } return stream; } catch (Exception ex) { - handleS3Exception(ex, ++retryCount, "download", session, command, this, queryId); + handleS3Exception(ex, ++retryCount, "download", command, this, queryId); } } while (retryCount <= getMaxRetries()); throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Unexpected: download unsuccessful without exception!"); @@ -554,7 +519,6 @@ public InputStream downloadToStream( /** * Upload a file (-stream) to S3. * - * @param session session object * @param command upload command * @param parallelism number of threads do parallel uploading * @param uploadFromStream true if upload source is stream @@ -571,7 +535,6 @@ public InputStream downloadToStream( */ @Override public void upload( - SFSession session, String command, int parallelism, boolean uploadFromStream, @@ -676,7 +639,7 @@ public ExecutorService newExecutor() { return; } catch (Exception ex) { - handleS3Exception(ex, ++retryCount, "upload", session, command, this, queryId); + handleS3Exception(ex, ++retryCount, "upload", command, this, queryId); if (uploadFromStream && fileBackedOutputStream == null) { throw new SnowflakeSQLException( queryId, @@ -710,7 +673,7 @@ public ExecutorService newExecutor() { throw new SnowflakeSQLLoggedException( queryId, - session, + null, ErrorCode.INTERNAL_ERROR.getMessageCode(), SqlState.INTERNAL_ERROR, "Unexpected: upload unsuccessful without exception!"); @@ -758,7 +721,7 @@ private SFPair createUploadStream( logger.error("Failed to encrypt input", ex); throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), ex, @@ -785,7 +748,7 @@ private SFPair createUploadStream( logger.error("Failed to open input file", ex); throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), ex, @@ -795,7 +758,7 @@ private SFPair createUploadStream( logger.error("Failed to open input stream", ex); throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.INTERNAL_ERROR, ErrorCode.INTERNAL_ERROR.getMessageCode(), ex, @@ -808,21 +771,15 @@ private SFPair createUploadStream( @Override public void handleStorageException( - Exception ex, - int retryCount, - String operation, - SFSession session, - String command, - String queryId) + Exception ex, int retryCount, String operation, String command, String queryId) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { - handleS3Exception(ex, retryCount, operation, session, command, this, queryId); + handleS3Exception(ex, retryCount, operation, command, this, queryId); } private static void handleS3Exception( Exception ex, int retryCount, String operation, - SFSession session, String command, SnowflakeS3Client s3Client, String queryId) @@ -837,7 +794,7 @@ private static void handleS3Exception( // If there is no space left in the download location, java.io.IOException is thrown. // Don't retry. if (getRootCause(ex) instanceof IOException) { - StorageClientUtil.throwNoSpaceLeftError(session, operation, ex, queryId); + StorageClientUtil.throwNoSpaceLeftError(null, operation, ex, queryId); } // Don't retry if max retries has been reached or the error code is 404/400 @@ -854,30 +811,23 @@ private static void handleS3Exception( if (ex instanceof AmazonServiceException) { AmazonServiceException ex1 = (AmazonServiceException) ex; - // The AWS credentials might have expired when server returns error 400 and - // does not return the ExpiredToken error code. - // If session is null we cannot renew the token so throw the exception - if (ex1.getStatusCode() == HttpStatus.SC_BAD_REQUEST && session != null) { - SnowflakeFileTransferAgent.renewExpiredToken(session, command, s3Client); - } else { - throw new SnowflakeSQLLoggedException( - queryId, - session, - SqlState.SYSTEM_ERROR, - ErrorCode.S3_OPERATION_ERROR.getMessageCode(), - ex1, - operation, - ex1.getErrorType().toString(), - ex1.getErrorCode(), - ex1.getMessage(), - ex1.getRequestId(), - extendedRequestId); - } + throw new SnowflakeSQLLoggedException( + queryId, + null, + SqlState.SYSTEM_ERROR, + ErrorCode.S3_OPERATION_ERROR.getMessageCode(), + ex1, + operation, + ex1.getErrorType().toString(), + ex1.getErrorCode(), + ex1.getMessage(), + ex1.getRequestId(), + extendedRequestId); } else { throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.SYSTEM_ERROR, ErrorCode.AWS_CLIENT_ERROR.getMessageCode(), ex, @@ -912,16 +862,11 @@ private static void handleS3Exception( if (ex instanceof AmazonS3Exception) { AmazonS3Exception s3ex = (AmazonS3Exception) ex; if (s3ex.getErrorCode().equalsIgnoreCase(EXPIRED_AWS_TOKEN_ERROR_CODE)) { - // If session is null we cannot renew the token so throw the ExpiredToken exception - if (session != null) { - SnowflakeFileTransferAgent.renewExpiredToken(session, command, s3Client); - } else { - throw new SnowflakeSQLException( - queryId, - s3ex.getErrorCode(), - CLOUD_STORAGE_CREDENTIALS_EXPIRED, - "S3 credentials have expired"); - } + throw new SnowflakeSQLException( + queryId, + s3ex.getErrorCode(), + CLOUD_STORAGE_CREDENTIALS_EXPIRED, + "S3 credentials have expired"); } } } @@ -931,7 +876,7 @@ private static void handleS3Exception( if (retryCount > s3Client.getMaxRetries()) { throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), ex, @@ -946,7 +891,7 @@ private static void handleS3Exception( } else { throw new SnowflakeSQLLoggedException( queryId, - session, + null, SqlState.SYSTEM_ERROR, ErrorCode.IO_ERROR.getMessageCode(), ex, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeStorageClient.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeStorageClient.java index 12c42fb3e..b7dc6dfdb 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeStorageClient.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeStorageClient.java @@ -5,7 +5,7 @@ * Permitted differences: package, @SnowflakeJdbcInternalApi removed, * ErrorCode/SqlState/SnowflakeSQLException/SnowflakeSQLLoggedException/MatDesc/ * FileBackedOutputStream/StorageObjectMetadata/HttpClientSettingsKey use ingest versions - * (same package). SFSession kept from JDBC temporarily (always null from callers). + * (same package). SFSession/SFBaseSession removed (always null from callers). * StorageObjectSummary/StorageObjectSummaryCollection/StorageProviderException use ingest versions. */ package net.snowflake.ingest.streaming.internal.fileTransferAgent; @@ -13,7 +13,6 @@ import java.io.File; import java.io.InputStream; import java.util.Map; -import net.snowflake.client.core.SFSession; /** Interface for storage client provider implementations */ public interface SnowflakeStorageClient { @@ -91,12 +90,11 @@ StorageObjectMetadata getObjectMetadata(String remoteStorageLocation, String pre /** * Download a file from remote storage. * - * @deprecated use {@link #download(SFSession, String, String, String, int, String, String, - * String, String, String)} + * @deprecated use {@link #download(String, String, String, int, String, String, String, String, + * String)} */ @Deprecated default void download( - SFSession connection, String command, String localLocation, String destFileName, @@ -107,7 +105,6 @@ default void download( String presignedUrl) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { download( - connection, command, localLocation, destFileName, @@ -122,7 +119,6 @@ default void download( /** * Download a file from remote storage. * - * @param connection connection object * @param command command to download file * @param localLocation local file path * @param destFileName destination file name @@ -135,7 +131,6 @@ default void download( * @throws SnowflakeSQLException download failure */ void download( - SFSession connection, String command, String localLocation, String destFileName, @@ -152,7 +147,6 @@ void download( */ @Deprecated default InputStream downloadToStream( - SFSession connection, String command, int parallelism, String remoteStorageLocation, @@ -161,7 +155,6 @@ default InputStream downloadToStream( String presignedUrl) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { return downloadToStream( - connection, command, parallelism, remoteStorageLocation, @@ -174,7 +167,6 @@ default InputStream downloadToStream( /** * Download a file from remote storage * - * @param connection connection object * @param command command to download file * @param parallelism number of threads for parallel downloading * @param remoteStorageLocation remote storage location, i.e. bucket for s3 @@ -186,7 +178,6 @@ default InputStream downloadToStream( * @throws SnowflakeSQLException when download failure */ InputStream downloadToStream( - SFSession connection, String command, int parallelism, String remoteStorageLocation, @@ -201,7 +192,6 @@ InputStream downloadToStream( */ @Deprecated default void upload( - SFSession connection, String command, int parallelism, boolean uploadFromStream, @@ -215,7 +205,6 @@ default void upload( String presignedUrl) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { upload( - connection, command, parallelism, uploadFromStream, @@ -233,7 +222,6 @@ default void upload( /** * Upload a file (-stream) to remote storage * - * @param connection connection object * @param command upload command * @param parallelism number of threads do parallel uploading * @param uploadFromStream true if upload source is stream @@ -249,7 +237,6 @@ default void upload( * @throws SnowflakeSQLException if upload failed even after retry */ void upload( - SFSession connection, String command, int parallelism, boolean uploadFromStream, @@ -336,9 +323,9 @@ default void uploadWithPresignedUrlWithoutConnection( */ @Deprecated default void handleStorageException( - Exception ex, int retryCount, String operation, SFSession connection, String command) + Exception ex, int retryCount, String operation, String command) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { - handleStorageException(ex, retryCount, operation, connection, command, null); + handleStorageException(ex, retryCount, operation, command, null); } /** @@ -348,18 +335,12 @@ default void handleStorageException( * @param retryCount current number of retries, incremented by the caller before each call * @param operation string that indicates the function/operation that was taking place, when the * exception was raised, for example "upload" - * @param connection the current SFSession object used by the client * @param command the command attempted at the time of the exception * @param queryId last query id * @throws SnowflakeSQLException exceptions not handled */ void handleStorageException( - Exception ex, - int retryCount, - String operation, - SFSession connection, - String command, - String queryId) + Exception ex, int retryCount, String operation, String command, String queryId) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException; /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/StorageClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/StorageClientFactory.java index d58dfcf55..e1c540297 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/StorageClientFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/StorageClientFactory.java @@ -4,7 +4,7 @@ * * Permitted differences: package, SFLogger uses ingest's replicated version, * HttpUtil.isSocksProxyDisabled() uses ingest's HttpUtil, SFBaseSession/SFSession - * kept from JDBC temporarily (always null from callers), all storage types use + * removed (always null from callers), all storage types use * ingest versions (same package). */ package net.snowflake.ingest.streaming.internal.fileTransferAgent; @@ -12,8 +12,6 @@ import com.amazonaws.ClientConfiguration; import java.util.Map; import java.util.Properties; -import net.snowflake.client.core.SFBaseSession; -import net.snowflake.client.core.SFSession; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLogger; import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLoggerFactory; import net.snowflake.ingest.utils.HttpUtil; @@ -49,21 +47,17 @@ public static StorageClientFactory getFactory() { * @param stage the stage properties * @param parallel the degree of parallelism to be used by the client * @param encMat encryption material for the client - * @param session SFSession * @return a SnowflakeStorageClient interface to the instance created * @throws SnowflakeSQLException if any error occurs */ public SnowflakeStorageClient createClient( - StageInfo stage, int parallel, RemoteStoreFileEncryptionMaterial encMat, SFSession session) + StageInfo stage, int parallel, RemoteStoreFileEncryptionMaterial encMat) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { logger.debug("Creating storage client. Client type: {}", stage.getStageType().name()); switch (stage.getStageType()) { case S3: - boolean useS3RegionalUrl = - stage.getUseS3RegionalUrl() - || stage.getUseRegionalUrl() - || session != null && session.getUseRegionalS3EndpointsForPresignedURL(); + boolean useS3RegionalUrl = stage.getUseS3RegionalUrl() || stage.getUseRegionalUrl(); return createS3Client( stage.getCredentials(), parallel, @@ -72,14 +66,13 @@ public SnowflakeStorageClient createClient( stage.getRegion(), stage.getEndPoint(), stage.getIsClientSideEncrypted(), - session, useS3RegionalUrl); case AZURE: - return createAzureClient(stage, encMat, session); + return createAzureClient(stage, encMat); case GCS: - return createGCSClient(stage, encMat, session); + return createGCSClient(stage, encMat); default: // We don't create a storage client for FS_LOCAL, @@ -112,7 +105,6 @@ private SnowflakeS3Client createS3Client( String stageRegion, String stageEndPoint, boolean isClientSideEncrypted, - SFBaseSession session, boolean useS3RegionalUrl) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { final int S3_TRANSFER_MAX_RETRIES = 3; @@ -151,7 +143,6 @@ private SnowflakeS3Client createS3Client( stageRegion, stageEndPoint, isClientSideEncrypted, - session, useS3RegionalUrl); } catch (Exception ex) { logger.debug("Exception creating s3 client", ex); @@ -197,14 +188,14 @@ public StorageObjectMetadata createStorageMetadataObj(StageInfo.StageType stageT * @return the SnowflakeS3Client instance created */ private SnowflakeAzureClient createAzureClient( - StageInfo stage, RemoteStoreFileEncryptionMaterial encMat, SFBaseSession session) + StageInfo stage, RemoteStoreFileEncryptionMaterial encMat) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { logger.debug("Creating Azure client with encryption: {}", (encMat == null ? "no" : "yes")); SnowflakeAzureClient azureClient; try { - azureClient = SnowflakeAzureClient.createSnowflakeAzureClient(stage, encMat, session); + azureClient = SnowflakeAzureClient.createSnowflakeAzureClient(stage, encMat); } catch (Exception ex) { logger.debug("Exception creating Azure Storage client", ex); throw ex; @@ -222,14 +213,14 @@ private SnowflakeAzureClient createAzureClient( * @return the SnowflakeGCSClient instance created */ private SnowflakeGCSClient createGCSClient( - StageInfo stage, RemoteStoreFileEncryptionMaterial encMat, SFSession session) + StageInfo stage, RemoteStoreFileEncryptionMaterial encMat) throws SnowflakeSQLException, net.snowflake.client.jdbc.SnowflakeSQLException { logger.debug("Creating GCS client with encryption: {}", (encMat == null ? "no" : "yes")); SnowflakeGCSClient gcsClient; try { - gcsClient = SnowflakeGCSClient.createSnowflakeGCSClient(stage, encMat, session); + gcsClient = SnowflakeGCSClient.createSnowflakeGCSClient(stage, encMat); } catch (Exception ex) { logger.debug("Exception creating GCS Storage client", ex); throw ex;