diff --git a/.plans/JDBC_REMOVAL_PLAN.md b/.plans/JDBC_REMOVAL_PLAN.md index c6a4cb46b..346da843c 100644 --- a/.plans/JDBC_REMOVAL_PLAN.md +++ b/.plans/JDBC_REMOVAL_PLAN.md @@ -278,8 +278,12 @@ is replaced. | Step 9c — Swap telemetry imports | ✅ Open | #1131 | | Step 10a — Replicate SFException, ExecTimeTelemetryData, etc. | ✅ Open | #1132 | | Step 10b — Swap SFException imports | ✅ Open | #1134 | -| Step 10c — Remove SFSession/SFBaseSession | ⬜ TODO | — | -| Step 10d — Demote JDBC to test scope | ⬜ TODO | — | +| Step 10c — Remove SFSession from storage stack | ✅ Open | #1135 | +| Step 10c2 — Remove SFSession from exceptions + telemetry | ✅ Open | #1136 | +| Step 11a — Replace JDBC HTTP calls with HttpRequestHelper | ⬜ TODO | — | +| Step 11b — Remove FQN SnowflakeSQLException from throws | ⬜ TODO | — | +| Step 11c — Clean up remaining FQN JDBC references | ⬜ TODO | — | +| Step 11d — Demote JDBC to test scope | ⬜ TODO | — | **Closed PRs:** #1117 (reverted 7b approach), #1122 (reverted 8c approach) **Other PRs:** #1118 (error/exception tests on master), #1133 (Maven retry config) @@ -616,34 +620,74 @@ NOT swapped — they interact with JDBC's `RestRequest.executeWithRetries()`. --- -### Step 10c — Remove SFSession/SFBaseSession ⬜ TODO +### Step 10c — Remove SFSession from storage stack ✅ Open (PR #1135) -SFSession/SFBaseSession are always null from ingest callers. Not feasible -to replicate (1498+1404 lines, 156-class transitive closure = 40K lines). -Need to remove these parameter types. +**Done:** Remove SFSession/SFBaseSession parameters and dead session-based +code from storage clients, interface, strategies, factory, agent, config. +Session was always null from ingest callers. -336 lines. --- -### Step 10d — Demote JDBC to test scope ⬜ TODO +### Step 10c2 — Remove SFSession from exceptions + telemetry ✅ Open (PR #1136) -Remaining 27 JDBC imports after Step 10b (all unreplicable due to massive -dependency chains): -- `SFSession`/`SFBaseSession` (15) — parameter types, always null -- `HttpUtil` (2) — GCS client + TelemetryClient -- `RestRequest` (1) — GCS client -- `SnowflakeConnectionV1` (1) — TelemetryClient session path -- `SnowflakeSQLException` (JDBC's, 1) — TelemetryClient -- `ExecTimeTelemetryData`/`HttpResponseContextDto` (2) — GCS client - (replicated but interact with JDBC RestRequest) -- IB `Telemetry`/`TelemetryField`/`TelemetryUtil` (3) — - interact with session.getTelemetryClient() -- `SFSession` in `SnowflakeSQLLoggedException` (2) — parameter types +**Done:** Remove SFSession/SFBaseSession from SnowflakeSQLLoggedException +(all 15 constructors) and TelemetryClient (session-based code). Remove IB +telemetry dead code. Update all callers (~12 files). -339 lines. -Then: +After 10c2: 6 JDBC imports remain + ~70 FQN JDBC references in throws/params. + +--- + +### Step 11a — Replace JDBC HTTP calls with HttpRequestHelper ⬜ TODO + +Create `HttpRequestHelper` utility with retry logic (replaces JDBC's +`RestRequest.executeWithRetries` and `HttpUtil.executeGeneralRequest`). +Replace the 6 remaining JDBC imports: + +- `TelemetryClient`: replace `HttpUtil.executeGeneralRequest()` + + `SnowflakeSQLException` catch +- `SnowflakeGCSClient`: replace `HttpUtil.getHttpClientWithoutDecompression()`, + `HttpUtil.getHttpClient()`, `RestRequest.executeWithRetries()`, + `HttpUtil.getSocketTimeout()`. Remove `ExecTimeTelemetryData`, + `HttpResponseContextDto`, `RestRequest` imports. + +--- + +### Step 11b — Remove FQN SnowflakeSQLException from throws ⬜ TODO + +Mechanical removal of `, net.snowflake.client.jdbc.SnowflakeSQLException` +from ~47 throws clauses across all replicated storage clients, interface, +strategies, factory, and GCS client. + +--- + +### Step 11c — Clean up remaining FQN JDBC references ⬜ TODO + +Swap remaining FQN JDBC type references to ingest versions: +- `net.snowflake.client.core.HttpClientSettingsKey` → `HttpClientSettingsKey` + (same package, 8 occurrences in S3HttpUtil, SnowflakeFileTransferAgent, + SnowflakeGCSClient, SnowflakeStorageClient) +- `net.snowflake.client.core.HttpProtocol` → `HttpProtocol` (same package, + 1 occurrence in S3HttpUtil) +- `net.snowflake.client.core.OCSPMode` → `net.snowflake.ingest.utils.OCSPMode` + (2 occurrences in SnowflakeFileTransferAgent) +- `net.snowflake.client.jdbc.SnowflakeUtil.convertProxyPropertiesToHttpClientKey` + → `StorageClientUtil.convertProxyPropertiesToHttpClientKey` (2 occurrences + in SnowflakeFileTransferAgent) +- `static import net.snowflake.client.core.HttpUtil.setSessionlessProxyForAzure` + → replicate method in StorageClientUtil (4 occurrences in SnowflakeAzureClient) +- `net.snowflake.client.jdbc.cloud.storage.AwsSdkGCPSigner` — JDBC class + reference in GCSAccessStrategyAwsSdk (string constant + class reference, + 3 occurrences) + +--- + +### Step 11d — Demote JDBC to test scope ⬜ TODO + +After all FQN references are cleaned up: 1. Demote `snowflake-jdbc-thin` to `test` scope in `pom.xml` 2. Remove JDBC shade relocation rules from Maven Shade plugin -3. Remove `snowflake-jdbc-thin` from `public_pom.xml` -4. Run full test suite +3. Run full test suite --- diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/DecorrelatedJitterBackoff.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/DecorrelatedJitterBackoff.java new file mode 100644 index 000000000..5aeaaa287 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/DecorrelatedJitterBackoff.java @@ -0,0 +1,39 @@ +/* + * Replicated from snowflake-jdbc (v3.25.1) + * Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/util/DecorrelatedJitterBackoff.java + * + * Permitted differences: package. + */ +package net.snowflake.ingest.streaming.internal.fileTransferAgent; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * Decorrelated Jitter backoff + * + *

https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ + */ +public class DecorrelatedJitterBackoff { + private final long base; + private final long cap; + + public DecorrelatedJitterBackoff(long base, long cap) { + this.base = base; + this.cap = cap; + } + + public long nextSleepTime(long sleep) { + long correctedSleep = sleep <= base ? base + 1 : sleep; + return Math.min(cap, ThreadLocalRandom.current().nextLong(base, correctedSleep)); + } + + public long getJitterForLogin(long currentTime) { + double multiplicationFactor = chooseRandom(-1, 1); + long jitter = (long) (multiplicationFactor * currentTime * 0.5); + return jitter; + } + + public double chooseRandom(double min, double max) { + return min + (Math.random() * (max - min)); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/HttpExecutingContext.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/HttpExecutingContext.java new file mode 100644 index 000000000..8a9c07cb6 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/HttpExecutingContext.java @@ -0,0 +1,311 @@ +/* + * Replicated from snowflake-jdbc (v3.25.1) + * Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/core/HttpExecutingContext.java + * + * Permitted differences: package declaration, import swaps for already-replicated classes, + * @SnowflakeJdbcInternalApi annotation removed. + */ +package net.snowflake.ingest.streaming.internal.fileTransferAgent; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class HttpExecutingContext { + + // min backoff in milli before we retry due to transient issues + private static final long minBackoffMillis = 1000; + + // max backoff in milli before we retry due to transient issues + // we double the backoff after each retry till we reach the max backoff + private static final long maxBackoffMillis = 16000; + + // retry at least once even if timeout limit has been reached + private static final int MIN_RETRY_COUNT = 1; + + // retry at least once even if timeout limit has been reached + private static final int DEFAULT_RETRY_TIMEOUT = 300; + + private final String requestId; + private final String requestInfoScrubbed; + private final long startTime; + // start time for each request, + // used for keeping track how much time we have spent + // due to network issues so that we can compare against the user + // specified network timeout to make sure we do not retry infinitely + // when there are transient network/GS issues + private long startTimePerRequest; + // Used to indicate that this is a login/auth request and will be using the new retry strategy. + private boolean isLoginRequest; + // Tracks the total time spent handling transient network issues and retries during HTTP requests + private long elapsedMilliForTransientIssues; + private long retryTimeout; + private long authTimeout; + private DecorrelatedJitterBackoff backoff; + private long backoffInMillis; + private int origSocketTimeout; + private String breakRetryReason; + private String breakRetryEventName; + private String lastStatusCodeForRetry; + private int retryCount; + private int maxRetries; + private boolean noRetry; + private int injectSocketTimeout; + private boolean retryHTTP403; + private boolean shouldRetry; + private boolean skipRetriesBecauseOf200; // todo create skip retry reason enum + private boolean withoutCookies; + private boolean includeRetryParameters; + private boolean includeRequestGuid; + private boolean unpackResponse; + private AtomicBoolean canceling; + + public HttpExecutingContext(String requestIdStr, String requestInfoScrubbed) { + this.requestId = requestIdStr; + this.requestInfoScrubbed = requestInfoScrubbed; + this.startTime = System.currentTimeMillis(); + this.startTimePerRequest = startTime; + this.backoff = new DecorrelatedJitterBackoff(getMinBackoffInMillis(), getMaxBackoffInMilli()); + this.backoffInMillis = minBackoffMillis; + } + + public String getRequestId() { + return requestId; + } + + public long getStartTime() { + return startTime; + } + + public long getStartTimePerRequest() { + return startTimePerRequest; + } + + public void setStartTimePerRequest(long startTimePerRequest) { + this.startTimePerRequest = startTimePerRequest; + } + + public boolean isLoginRequest() { + return isLoginRequest; + } + + public void setLoginRequest(boolean loginRequest) { + isLoginRequest = loginRequest; + } + + public long getElapsedMilliForTransientIssues() { + return elapsedMilliForTransientIssues; + } + + public long getRetryTimeoutInMilliseconds() { + return retryTimeout * 1000; + } + + public long getRetryTimeout() { + return retryTimeout; + } + + public void setRetryTimeout(long retryTimeout) { + this.retryTimeout = retryTimeout; + } + + public long getMinBackoffInMillis() { + return minBackoffMillis; + } + + public long getBackoffInMillis() { + return backoffInMillis; + } + + public void setBackoffInMillis(long backoffInMillis) { + this.backoffInMillis = backoffInMillis; + } + + public long getMaxBackoffInMilli() { + return maxBackoffMillis; + } + + public long getAuthTimeout() { + return authTimeout; + } + + public long getAuthTimeoutInMilliseconds() { + return authTimeout * 1000; + } + + public void setAuthTimeout(long authTimeout) { + this.authTimeout = authTimeout; + } + + public DecorrelatedJitterBackoff getBackoff() { + return backoff; + } + + public void setBackoff(DecorrelatedJitterBackoff backoff) { + this.backoff = backoff; + } + + public int getOrigSocketTimeout() { + return origSocketTimeout; + } + + public void setOrigSocketTimeout(int origSocketTimeout) { + this.origSocketTimeout = origSocketTimeout; + } + + public String getBreakRetryReason() { + return breakRetryReason; + } + + public void setBreakRetryReason(String breakRetryReason) { + this.breakRetryReason = breakRetryReason; + } + + public String getBreakRetryEventName() { + return breakRetryEventName; + } + + public void setBreakRetryEventName(String breakRetryEventName) { + this.breakRetryEventName = breakRetryEventName; + } + + public String getLastStatusCodeForRetry() { + return lastStatusCodeForRetry; + } + + public void setLastStatusCodeForRetry(String lastStatusCodeForRetry) { + this.lastStatusCodeForRetry = lastStatusCodeForRetry; + } + + public int getRetryCount() { + return retryCount; + } + + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } + + public void resetRetryCount() { + this.retryCount = 0; + } + + public void incrementRetryCount() { + this.retryCount++; + } + + public int getMaxRetries() { + return maxRetries; + } + + public void setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + } + + public String getRequestInfoScrubbed() { + return requestInfoScrubbed; + } + + public boolean isNoRetry() { + return noRetry; + } + + public void setNoRetry(boolean noRetry) { + this.noRetry = noRetry; + } + + public boolean isRetryHTTP403() { + return retryHTTP403; + } + + public void setRetryHTTP403(boolean retryHTTP403) { + this.retryHTTP403 = retryHTTP403; + } + + public boolean isShouldRetry() { + return shouldRetry; + } + + public void setShouldRetry(boolean shouldRetry) { + this.shouldRetry = shouldRetry; + } + + public void increaseElapsedMilliForTransientIssues(long elapsedMilliForLastCall) { + this.elapsedMilliForTransientIssues += elapsedMilliForLastCall; + } + + public boolean elapsedTimeExceeded() { + return elapsedMilliForTransientIssues > getRetryTimeoutInMilliseconds(); + } + + public boolean moreThanMinRetries() { + return retryCount >= MIN_RETRY_COUNT; + } + + public boolean maxRetriesExceeded() { + return maxRetries > 0 && retryCount >= maxRetries; + } + + public boolean socketOrConnectTimeoutReached() { + return authTimeout > 0 + && elapsedMilliForTransientIssues > getAuthTimeoutInMilliseconds() + && (origSocketTimeout == 0 || elapsedMilliForTransientIssues < origSocketTimeout); + } + + public AtomicBoolean getCanceling() { + return canceling; + } + + public void setCanceling(AtomicBoolean canceling) { + this.canceling = canceling; + } + + public boolean isIncludeRequestGuid() { + return includeRequestGuid; + } + + public void setIncludeRequestGuid(boolean includeRequestGuid) { + this.includeRequestGuid = includeRequestGuid; + } + + public boolean isWithoutCookies() { + return withoutCookies; + } + + public void setWithoutCookies(boolean withoutCookies) { + this.withoutCookies = withoutCookies; + } + + public int isInjectSocketTimeout() { + return injectSocketTimeout; + } + + public void setInjectSocketTimeout(int injectSocketTimeout) { + this.injectSocketTimeout = injectSocketTimeout; + } + + public int getInjectSocketTimeout() { + return injectSocketTimeout; + } + + public boolean isIncludeRetryParameters() { + return includeRetryParameters; + } + + public boolean isUnpackResponse() { + return unpackResponse; + } + + public void setUnpackResponse(boolean unpackResponse) { + this.unpackResponse = unpackResponse; + } + + public void setIncludeRetryParameters(boolean includeRetryParameters) { + this.includeRetryParameters = includeRetryParameters; + } + + public boolean isSkipRetriesBecauseOf200() { + return skipRetriesBecauseOf200; + } + + public void setSkipRetriesBecauseOf200(boolean skipRetriesBecauseOf200) { + this.skipRetriesBecauseOf200 = skipRetriesBecauseOf200; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/HttpExecutingContextBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/HttpExecutingContextBuilder.java new file mode 100644 index 000000000..d990a9e2a --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/HttpExecutingContextBuilder.java @@ -0,0 +1,285 @@ +/* + * Replicated from snowflake-jdbc (v3.25.1) + * Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/core/HttpExecutingContextBuilder.java + * + * Permitted differences: package declaration, @SnowflakeJdbcInternalApi annotation removed. + */ +package net.snowflake.ingest.streaming.internal.fileTransferAgent; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Builder class for {@link HttpExecutingContext}. Provides a fluent interface for constructing + * HttpExecutingContext instances with many optional parameters. + */ +public class HttpExecutingContextBuilder { + private final String requestId; + private final String requestInfoScrubbed; + private long retryTimeout; + private long authTimeout; + private int origSocketTimeout; + private int maxRetries; + private int injectSocketTimeout; + private AtomicBoolean canceling; + private boolean withoutCookies; + private boolean includeRetryParameters; + private boolean includeRequestGuid; + private boolean retryHTTP403; + private boolean noRetry; + private boolean unpackResponse; + private boolean isLoginRequest; + + /** + * Creates a new builder instance with required parameters. + * + * @param requestId Request ID for logging and tracking + * @param requestInfoScrubbed Scrubbed request info for logging + */ + public HttpExecutingContextBuilder(String requestId, String requestInfoScrubbed) { + this.requestId = requestId; + this.requestInfoScrubbed = requestInfoScrubbed; + } + + /** + * Copy constructor to create a new builder from an existing HttpExecutingContext. + * + * @param context The context to copy settings from + */ + public HttpExecutingContextBuilder(HttpExecutingContext context) { + this.requestId = context.getRequestId(); + this.requestInfoScrubbed = context.getRequestInfoScrubbed(); + this.retryTimeout = context.getRetryTimeout(); + this.authTimeout = context.getAuthTimeout(); + this.origSocketTimeout = context.getOrigSocketTimeout(); + this.maxRetries = context.getMaxRetries(); + this.injectSocketTimeout = context.getInjectSocketTimeout(); + this.canceling = context.getCanceling(); + this.withoutCookies = context.isWithoutCookies(); + this.includeRetryParameters = context.isIncludeRetryParameters(); + this.includeRequestGuid = context.isIncludeRequestGuid(); + this.retryHTTP403 = context.isRetryHTTP403(); + this.noRetry = context.isNoRetry(); + this.unpackResponse = context.isUnpackResponse(); + this.isLoginRequest = context.isLoginRequest(); + } + + /** + * Creates a new builder for a login request with common defaults. + * + * @param requestId Request ID for logging and tracking + * @param requestInfoScrubbed Scrubbed request info for logging + * @return A new builder instance configured for login requests + */ + public static HttpExecutingContextBuilder forLogin(String requestId, String requestInfoScrubbed) { + return new HttpExecutingContextBuilder(requestId, requestInfoScrubbed) + .loginRequest(true) + .includeRequestGuid(true) + .retryHTTP403(true); + } + + /** + * Creates a new builder for a query request with common defaults. + * + * @param requestId Request ID for logging and tracking + * @param requestInfoScrubbed Scrubbed request info for logging + * @return A new builder instance configured for query requests + */ + public static HttpExecutingContextBuilder forQuery(String requestId, String requestInfoScrubbed) { + return new HttpExecutingContextBuilder(requestId, requestInfoScrubbed) + .includeRetryParameters(true) + .includeRequestGuid(true) + .unpackResponse(true); + } + + /** + * Creates a new builder for a simple HTTP request with minimal retry settings. + * + * @param requestId Request ID for logging and tracking + * @param requestInfoScrubbed Scrubbed request info for logging + * @return A new builder instance configured for simple requests + */ + public static HttpExecutingContextBuilder forSimpleRequest( + String requestId, String requestInfoScrubbed) { + return new HttpExecutingContextBuilder(requestId, requestInfoScrubbed) + .noRetry(true) + .includeRequestGuid(true); + } + + /** + * Creates a new builder with default settings for retryable requests. + * + * @param requestId Request ID for logging and tracking + * @param requestInfoScrubbed Scrubbed request info for logging + * @return A new builder instance with default retry settings + */ + public static HttpExecutingContextBuilder withRequest( + String requestId, String requestInfoScrubbed) { + return new HttpExecutingContextBuilder(requestId, requestInfoScrubbed); + } + + /** + * Sets the retry timeout in seconds. + * + * @param retryTimeout Retry timeout in seconds + * @return this builder instance + */ + public HttpExecutingContextBuilder retryTimeout(long retryTimeout) { + this.retryTimeout = retryTimeout; + return this; + } + + /** + * Sets the authentication timeout in seconds. + * + * @param authTimeout Authentication timeout in seconds + * @return this builder instance + */ + public HttpExecutingContextBuilder authTimeout(long authTimeout) { + this.authTimeout = authTimeout; + return this; + } + + /** + * Sets the original socket timeout in milliseconds. + * + * @param origSocketTimeout Socket timeout in milliseconds + * @return this builder instance + */ + public HttpExecutingContextBuilder origSocketTimeout(int origSocketTimeout) { + this.origSocketTimeout = origSocketTimeout; + return this; + } + + /** + * Sets the maximum number of retries. + * + * @param maxRetries Maximum number of retries + * @return this builder instance + */ + public HttpExecutingContextBuilder maxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + /** + * Sets the injected socket timeout for testing. + * + * @param injectSocketTimeout Socket timeout to inject + * @return this builder instance + */ + public HttpExecutingContextBuilder injectSocketTimeout(int injectSocketTimeout) { + this.injectSocketTimeout = injectSocketTimeout; + return this; + } + + /** + * Sets the canceling flag. + * + * @param canceling AtomicBoolean for cancellation + * @return this builder instance + */ + public HttpExecutingContextBuilder canceling(AtomicBoolean canceling) { + this.canceling = canceling; + return this; + } + + /** + * Sets whether to disable cookies. + * + * @param withoutCookies true to disable cookies + * @return this builder instance + */ + public HttpExecutingContextBuilder withoutCookies(boolean withoutCookies) { + this.withoutCookies = withoutCookies; + return this; + } + + /** + * Sets whether to include retry parameters in requests. + * + * @param includeRetryParameters true to include retry parameters + * @return this builder instance + */ + public HttpExecutingContextBuilder includeRetryParameters(boolean includeRetryParameters) { + this.includeRetryParameters = includeRetryParameters; + return this; + } + + /** + * Sets whether to include request GUID. + * + * @param includeRequestGuid true to include request GUID + * @return this builder instance + */ + public HttpExecutingContextBuilder includeRequestGuid(boolean includeRequestGuid) { + this.includeRequestGuid = includeRequestGuid; + return this; + } + + /** + * Sets whether to retry on HTTP 403 errors. + * + * @param retryHTTP403 true to retry on HTTP 403 + * @return this builder instance + */ + public HttpExecutingContextBuilder retryHTTP403(boolean retryHTTP403) { + this.retryHTTP403 = retryHTTP403; + return this; + } + + /** + * Sets whether to disable retries. + * + * @param noRetry true to disable retries + * @return this builder instance + */ + public HttpExecutingContextBuilder noRetry(boolean noRetry) { + this.noRetry = noRetry; + return this; + } + + /** + * Sets whether to unpack the response. + * + * @param unpackResponse true to unpack response + * @return this builder instance + */ + public HttpExecutingContextBuilder unpackResponse(boolean unpackResponse) { + this.unpackResponse = unpackResponse; + return this; + } + + /** + * Sets whether this is a login request. + * + * @param isLoginRequest true if this is a login request + * @return this builder instance + */ + public HttpExecutingContextBuilder loginRequest(boolean isLoginRequest) { + this.isLoginRequest = isLoginRequest; + return this; + } + + /** + * Builds and returns a new HttpExecutingContext instance with the configured parameters. + * + * @return A new HttpExecutingContext instance + */ + public HttpExecutingContext build() { + HttpExecutingContext context = new HttpExecutingContext(requestId, requestInfoScrubbed); + context.setRetryTimeout(retryTimeout); + context.setAuthTimeout(authTimeout); + context.setOrigSocketTimeout(origSocketTimeout); + context.setMaxRetries(maxRetries); + context.setInjectSocketTimeout(injectSocketTimeout); + context.setCanceling(canceling); + context.setWithoutCookies(withoutCookies); + context.setIncludeRetryParameters(includeRetryParameters); + context.setIncludeRequestGuid(includeRequestGuid); + context.setRetryHTTP403(retryHTTP403); + context.setNoRetry(noRetry); + context.setUnpackResponse(unpackResponse); + context.setLoginRequest(isLoginRequest); + return context; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/OCSPErrorCode.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/OCSPErrorCode.java new file mode 100644 index 000000000..43afa4da0 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/OCSPErrorCode.java @@ -0,0 +1,26 @@ +/* + * Replicated from snowflake-jdbc (v3.25.1) + * Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/jdbc/OCSPErrorCode.java + * + * Permitted differences: package declaration. + */ +package net.snowflake.ingest.streaming.internal.fileTransferAgent; + +public enum OCSPErrorCode { + CERTIFICATE_STATUS_GOOD, + CERTIFICATE_STATUS_REVOKED, + CERTIFICATE_STATUS_UNKNOWN, + OCSP_CACHE_DOWNLOAD_TIMEOUT, + OCSP_RESPONSE_FETCH_TIMEOUT, + OCSP_RESPONSE_FETCH_FAILURE, + INVALID_CACHE_SERVER_URL, + EXPIRED_OCSP_SIGNING_CERTIFICATE, + INVALID_CERTIFICATE_SIGNATURE, + INVALID_OCSP_RESPONSE_SIGNATURE, + INVALID_OCSP_RESPONSE_VALIDITY, + INVALID_OCSP_RESPONSE, + REVOCATION_CHECK_FAILURE, + INVALID_SSD, + NO_OCSP_URL_ATTACHED, + NO_ROOTCA_FOUND +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/RestRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/RestRequest.java new file mode 100644 index 000000000..f3cb54a04 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/RestRequest.java @@ -0,0 +1,1305 @@ +/* + * Replicated from snowflake-jdbc (v3.25.1) + * Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/jdbc/RestRequest.java + * + * Permitted differences: package declaration, import swaps for already-replicated classes, + * @SnowflakeJdbcInternalApi annotation removed. + * SnowflakeUtil.isNullOrEmpty -> StorageClientUtil.isNullOrEmpty (already in package). + * SnowflakeUtil.logResponseDetails -> FQN net.snowflake.client.jdbc.SnowflakeUtil.logResponseDetails. + * SessionUtil.isNewRetryStrategyRequest -> FQN net.snowflake.client.core.SessionUtil.isNewRetryStrategyRequest. + * HttpUtil references kept as FQN net.snowflake.client.core.HttpUtil (to be replicated in next PR). + */ +package net.snowflake.ingest.streaming.internal.fileTransferAgent; + +import static net.snowflake.ingest.streaming.internal.fileTransferAgent.StorageClientUtil.isNullOrEmpty; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import javax.net.ssl.SSLHandshakeException; +import javax.net.ssl.SSLKeyException; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLProtocolException; +import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.ArgSupplier; +import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.Event; +import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.EventUtil; +import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLogger; +import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLoggerFactory; +import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SecretDetector; +import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.UUIDUtils; +import net.snowflake.ingest.utils.Stopwatch; +import org.apache.commons.io.IOUtils; +import org.apache.http.Header; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; + +/** + * This is an abstraction on top of http client. + * + *

Currently it only has one method for retrying http request execution so that the same logic + * doesn't have to be replicated at difference places where retry is needed. + */ +public class RestRequest { + private static final SFLogger logger = SFLoggerFactory.getLogger(RestRequest.class); + + // Request guid per HTTP request + private static final String SF_REQUEST_GUID = "request_guid"; + + // min backoff in milli before we retry due to transient issues + private static final long minBackoffInMilli = 1000; + + // max backoff in milli before we retry due to transient issues + // we double the backoff after each retry till we reach the max backoff + private static final long maxBackoffInMilli = 16000; + + // retry at least once even if timeout limit has been reached + private static final int MIN_RETRY_COUNT = 1; + + static final String ERROR_FIELD_NAME = "error"; + static final String ERROR_USE_DPOP_NONCE = "use_dpop_nonce"; + static final String DPOP_NONCE_HEADER_NAME = "dpop-nonce"; + + static final Set> sslExceptions = + new HashSet<>( + Arrays.asList( + SSLHandshakeException.class, + SSLKeyException.class, + SSLPeerUnverifiedException.class, + SSLProtocolException.class)); + + /** + * Execute an HTTP request with retry logic. + * + * @param httpClient client object used to communicate with other machine + * @param httpRequest request object contains all the request information + * @param retryTimeout : retry timeout (in seconds) + * @param authTimeout : authenticator specific timeout (in seconds) + * @param socketTimeout : curl timeout (in ms) + * @param maxRetries : max retry count for the request + * @param injectSocketTimeout : simulate socket timeout + * @param canceling canceling flag + * @param withoutCookies whether the cookie spec should be set to IGNORE or not + * @param includeRetryParameters whether to include retry parameters in retried requests. Only + * needs to be true for JDBC statement execution (query requests to Snowflake server). + * @param includeRequestGuid whether to include request_guid parameter + * @param retryHTTP403 whether to retry on HTTP 403 or not should be executed before and/or after + * the retry + * @return HttpResponse Object get from server + * @throws net.snowflake.ingest.streaming.internal.fileTransferAgent.SnowflakeSQLException Request + * timeout Exception or Illegal State Exception i.e. connection is already shutdown etc + */ + public static CloseableHttpResponse execute( + CloseableHttpClient httpClient, + HttpRequestBase httpRequest, + long retryTimeout, + long authTimeout, + int socketTimeout, + int maxRetries, + int injectSocketTimeout, + AtomicBoolean canceling, + boolean withoutCookies, + boolean includeRetryParameters, + boolean includeRequestGuid, + boolean retryHTTP403, + ExecTimeTelemetryData execTimeTelemetryData) + throws SnowflakeSQLException { + return execute( + httpClient, + httpRequest, + retryTimeout, + authTimeout, + socketTimeout, + maxRetries, + injectSocketTimeout, + canceling, + withoutCookies, + includeRetryParameters, + includeRequestGuid, + retryHTTP403, + false, // noRetry + execTimeTelemetryData, + null); + } + + /** + * Execute an HTTP request with retry logic. + * + * @param httpClient client object used to communicate with other machine + * @param httpRequest request object contains all the request information + * @param retryTimeout : retry timeout (in seconds) + * @param authTimeout : authenticator specific timeout (in seconds) + * @param socketTimeout : curl timeout (in ms) + * @param maxRetries : max retry count for the request + * @param injectSocketTimeout : simulate socket timeout + * @param canceling canceling flag + * @param withoutCookies whether the cookie spec should be set to IGNORE or not + * @param includeRetryParameters whether to include retry parameters in retried requests. Only + * needs to be true for JDBC statement execution (query requests to Snowflake server). + * @param includeRequestGuid whether to include request_guid parameter + * @param retryHTTP403 whether to retry on HTTP 403 or not should be executed before and/or after + * the retry + * @return HttpResponse Object get from server + * @throws net.snowflake.ingest.streaming.internal.fileTransferAgent.SnowflakeSQLException Request + * timeout Exception or Illegal State Exception i.e. connection is already shutdown etc + */ + public static CloseableHttpResponse execute( + CloseableHttpClient httpClient, + HttpRequestBase httpRequest, + long retryTimeout, + long authTimeout, + int socketTimeout, + int maxRetries, + int injectSocketTimeout, + AtomicBoolean canceling, + boolean withoutCookies, + boolean includeRetryParameters, + boolean includeRequestGuid, + boolean retryHTTP403, + boolean noRetry, + ExecTimeTelemetryData execTimeData) + throws SnowflakeSQLException { + return execute( + httpClient, + httpRequest, + retryTimeout, + authTimeout, + socketTimeout, + maxRetries, + injectSocketTimeout, + canceling, + withoutCookies, + includeRetryParameters, + includeRequestGuid, + retryHTTP403, + noRetry, + execTimeData, + null); + } + + /** + * Execute an HTTP request with retry logic. + * + * @param httpClient client object used to communicate with other machine + * @param httpRequest request object contains all the request information + * @param retryTimeout : retry timeout (in seconds) + * @param authTimeout : authenticator specific timeout (in seconds) + * @param socketTimeout : curl timeout (in ms) + * @param maxRetries : max retry count for the request + * @param injectSocketTimeout : simulate socket timeout + * @param canceling canceling flag + * @param withoutCookies whether the cookie spec should be set to IGNORE or not + * @param includeRetryParameters whether to include retry parameters in retried requests. Only + * needs to be true for JDBC statement execution (query requests to Snowflake server). + * @param includeRequestGuid whether to include request_guid parameter + * @param retryHTTP403 whether to retry on HTTP 403 or not + * @param execTimeData ExecTimeTelemetryData should be executed before and/or after the retry + * @return HttpResponse Object get from server + * @throws net.snowflake.ingest.streaming.internal.fileTransferAgent.SnowflakeSQLException Request + * timeout Exception or Illegal State Exception i.e. connection is already shutdown etc + */ + public static CloseableHttpResponse execute( + CloseableHttpClient httpClient, + HttpRequestBase httpRequest, + long retryTimeout, + long authTimeout, + int socketTimeout, + int maxRetries, + int injectSocketTimeout, + AtomicBoolean canceling, + boolean withoutCookies, + boolean includeRetryParameters, + boolean includeRequestGuid, + boolean retryHTTP403, + ExecTimeTelemetryData execTimeData, + RetryContextManager retryContextManager) + throws SnowflakeSQLException { + return execute( + httpClient, + httpRequest, + retryTimeout, + authTimeout, + socketTimeout, + maxRetries, + injectSocketTimeout, + canceling, + withoutCookies, + includeRetryParameters, + includeRequestGuid, + retryHTTP403, + false, // noRetry + execTimeData, + retryContextManager); + } + + /** + * Execute an HTTP request with retry logic. + * + * @param httpClient client object used to communicate with other machine + * @param httpRequest request object contains all the request information + * @param retryTimeout : retry timeout (in seconds) + * @param authTimeout : authenticator specific timeout (in seconds) + * @param socketTimeout : curl timeout (in ms) + * @param maxRetries : max retry count for the request + * @param injectSocketTimeout : simulate socket timeout + * @param canceling canceling flag + * @param withoutCookies whether the cookie spec should be set to IGNORE or not + * @param includeRetryParameters whether to include retry parameters in retried requests. Only + * needs to be true for JDBC statement execution (query requests to Snowflake server). + * @param includeRequestGuid whether to include request_guid parameter + * @param retryHTTP403 whether to retry on HTTP 403 or not + * @param noRetry should we disable retry on non-successful http resp code + * @param execTimeData ExecTimeTelemetryData + * @param retryManager RetryContextManager - object allowing to optionally pass custom logic that + * should be executed before and/or after the retry + * @return HttpResponse Object get from server + * @throws net.snowflake.ingest.streaming.internal.fileTransferAgent.SnowflakeSQLException Request + * timeout Exception or Illegal State Exception i.e. connection is already shutdown etc + */ + public static CloseableHttpResponse execute( + CloseableHttpClient httpClient, + HttpRequestBase httpRequest, + long retryTimeout, + long authTimeout, + int socketTimeout, + int maxRetries, + int injectSocketTimeout, + AtomicBoolean canceling, + boolean withoutCookies, + boolean includeRetryParameters, + boolean includeRequestGuid, + boolean retryHTTP403, + boolean noRetry, + ExecTimeTelemetryData execTimeData, + RetryContextManager retryManager) + throws SnowflakeSQLException { + return executeWithRetries( + httpClient, + httpRequest, + retryTimeout, + authTimeout, + socketTimeout, + maxRetries, + injectSocketTimeout, + canceling, // no canceling + withoutCookies, // no cookie + includeRetryParameters, // no retry + includeRequestGuid, // no request_guid + retryHTTP403, // retry on HTTP 403 + noRetry, + new ExecTimeTelemetryData()) + .getHttpResponse(); + } + + static long getNewBackoffInMilli( + long previousBackoffInMilli, + boolean isLoginRequest, + DecorrelatedJitterBackoff decorrelatedJitterBackoff, + int retryCount, + long retryTimeoutInMilliseconds, + long elapsedMilliForTransientIssues) { + long backoffInMilli; + if (isLoginRequest) { + long jitteredBackoffInMilli = + decorrelatedJitterBackoff.getJitterForLogin(previousBackoffInMilli); + backoffInMilli = + (long) + decorrelatedJitterBackoff.chooseRandom( + jitteredBackoffInMilli + previousBackoffInMilli, + Math.pow(2, retryCount) + jitteredBackoffInMilli); + } else { + + backoffInMilli = decorrelatedJitterBackoff.nextSleepTime(previousBackoffInMilli); + } + + backoffInMilli = Math.min(maxBackoffInMilli, Math.max(previousBackoffInMilli, backoffInMilli)); + + if (retryTimeoutInMilliseconds > 0 + && (elapsedMilliForTransientIssues + backoffInMilli) > retryTimeoutInMilliseconds) { + // If the timeout will be reached before the next backoff, just use the remaining + // time (but cannot be negative) - this is the only place when backoff is not in range + // min-max. + backoffInMilli = + Math.max( + 0, + Math.min( + backoffInMilli, retryTimeoutInMilliseconds - elapsedMilliForTransientIssues)); + logger.debug( + "We are approaching retry timeout {}ms, setting backoff to {}ms", + retryTimeoutInMilliseconds, + backoffInMilli); + } + return backoffInMilli; + } + + static boolean isNonRetryableHTTPCode(CloseableHttpResponse response, boolean retryHTTP403) { + return (response != null) + && (response.getStatusLine().getStatusCode() < 500 + || // service unavailable + response.getStatusLine().getStatusCode() >= 600) + && // gateway timeout + response.getStatusLine().getStatusCode() != 408 + && // retry + response.getStatusLine().getStatusCode() != 429 + && // request timeout + (!retryHTTP403 || response.getStatusLine().getStatusCode() != 403); + } + + private static boolean isCertificateRevoked(Exception ex) { + if (ex == null) { + return false; + } + Throwable ex0 = getRootCause(ex); + if (!(ex0 instanceof SFOCSPException)) { + return false; + } + SFOCSPException cause = (SFOCSPException) ex0; + return cause.getErrorCode() == OCSPErrorCode.CERTIFICATE_STATUS_REVOKED; + } + + private static Throwable getRootCause(Throwable ex) { + Throwable ex0 = ex; + while (ex0.getCause() != null) { + ex0 = ex0.getCause(); + } + return ex0; + } + + private static void setRequestConfig( + HttpRequestBase httpRequest, + boolean withoutCookies, + int injectSocketTimeout, + String requestIdStr, + long authTimeoutInMilli) { + if (withoutCookies) { + httpRequest.setConfig(net.snowflake.client.core.HttpUtil.getRequestConfigWithoutCookies()); + } + + // For first call, simulate a socket timeout by setting socket timeout + // to the injected socket timeout value + if (injectSocketTimeout != 0) { + // test code path + logger.debug( + "{}Injecting socket timeout by setting socket timeout to {} ms", + requestIdStr, + injectSocketTimeout); + httpRequest.setConfig( + net.snowflake.client.core.HttpUtil.getDefaultRequestConfigWithSocketTimeout( + injectSocketTimeout, withoutCookies)); + } + + // When the auth timeout is set, set the socket timeout as the authTimeout + // so that it can be renewed in time and pass it to the http request configuration. + if (authTimeoutInMilli > 0) { + int requestSocketAndConnectTimeout = (int) authTimeoutInMilli; + logger.debug( + "{}Setting auth timeout as the socket timeout: {} ms", requestIdStr, authTimeoutInMilli); + httpRequest.setConfig( + net.snowflake.client.core.HttpUtil.getDefaultRequestConfigWithSocketAndConnectTimeout( + requestSocketAndConnectTimeout, withoutCookies)); + } + } + + private static void setRequestURI( + HttpRequestBase httpRequest, + String requestIdStr, + boolean includeRetryParameters, + boolean includeRequestGuid, + int retryCount, + String lastStatusCodeForRetry, + long startTime, + String requestInfoScrubbed) + throws URISyntaxException { + /* + * Add retryCount if the first request failed + * GS can use the parameter for optimization. Specifically GS + * will only check metadata database to see if a query has been running + * for a retry request. This way for the majority of query requests + * which are not part of retry we don't have to pay the performance + * overhead of looking up in metadata database. + */ + URIBuilder builder = new URIBuilder(httpRequest.getURI()); + // If HTAP + if ("true".equalsIgnoreCase(System.getenv("HTAP_SIMULATION")) + && builder.getPathSegments().contains("query-request")) { + logger.debug("{}Setting htap simulation", requestIdStr); + builder.setParameter("target", "htap_simulation"); + } + if (includeRetryParameters && retryCount > 0) { + updateRetryParameters(builder, retryCount, lastStatusCodeForRetry, startTime); + } + + if (includeRequestGuid) { + UUID guid = UUIDUtils.getUUID(); + logger.debug("{}Request {} guid: {}", requestIdStr, requestInfoScrubbed, guid.toString()); + // Add request_guid for better tracing + builder.setParameter(SF_REQUEST_GUID, guid.toString()); + } + + httpRequest.setURI(builder.build()); + } + + /** + * Execute an HTTP request with retry logic. + * + * @param httpClient client object used to communicate with other machine + * @param httpRequest request object contains all the request information + * @param retryTimeout : retry timeout (in seconds) + * @param authTimeout : authenticator specific timeout (in seconds) + * @param socketTimeout : curl timeout (in ms) + * @param maxRetries : max retry count for the request + * @param injectSocketTimeout : simulate socket timeout + * @param canceling canceling flag + * @param withoutCookies whether the cookie spec should be set to IGNORE or not + * @param includeRetryParameters whether to include retry parameters in retried requests. Only + * needs to be true for JDBC statement execution (query requests to Snowflake server). + * @param includeRequestGuid whether to include request_guid parameter + * @param retryHTTP403 whether to retry on HTTP 403 or not + * @return HttpResponseContextDto Object get from server or exception + * @throws net.snowflake.ingest.streaming.internal.fileTransferAgent.SnowflakeSQLException Request + * timeout Exception or Illegal State Exception i.e. connection is already shutdown etc + */ + public static HttpResponseContextDto executeWithRetries( + CloseableHttpClient httpClient, + HttpRequestBase httpRequest, + long retryTimeout, + long authTimeout, + int socketTimeout, + int maxRetries, + int injectSocketTimeout, + AtomicBoolean canceling, + boolean withoutCookies, + boolean includeRetryParameters, + boolean includeRequestGuid, + boolean retryHTTP403, + boolean unpackResponse, + ExecTimeTelemetryData execTimeTelemetryData) + throws SnowflakeSQLException { + return executeWithRetries( + httpClient, + httpRequest, + retryTimeout, + authTimeout, + socketTimeout, + maxRetries, + injectSocketTimeout, + canceling, + withoutCookies, + includeRetryParameters, + includeRequestGuid, + retryHTTP403, + false, + unpackResponse, + execTimeTelemetryData); + } + + /** + * Execute an HTTP request with retry logic. + * + * @param httpClient client object used to communicate with other machine + * @param httpRequest request object contains all the request information + * @param retryTimeout : retry timeout (in seconds) + * @param authTimeout : authenticator specific timeout (in seconds) + * @param socketTimeout : curl timeout (in ms) + * @param maxRetries : max retry count for the request + * @param injectSocketTimeout : simulate socket timeout + * @param canceling canceling flag + * @param withoutCookies whether the cookie spec should be set to IGNORE or not + * @param includeRetryParameters whether to include retry parameters in retried requests. Only + * needs to be true for JDBC statement execution (query requests to Snowflake server). + * @param includeRequestGuid whether to include request_guid parameter + * @param retryHTTP403 whether to retry on HTTP 403 or not + * @param execTimeTelemetryData ExecTimeTelemetryData should be executed before and/or after the + * retry + * @return HttpResponseContextDto Object get from server or exception + * @throws net.snowflake.ingest.streaming.internal.fileTransferAgent.SnowflakeSQLException Request + * timeout Exception or Illegal State Exception i.e. connection is already shutdown etc + */ + public static HttpResponseContextDto executeWithRetries( + CloseableHttpClient httpClient, + HttpRequestBase httpRequest, + long retryTimeout, + long authTimeout, + int socketTimeout, + int maxRetries, + int injectSocketTimeout, + AtomicBoolean canceling, + boolean withoutCookies, + boolean includeRetryParameters, + boolean includeRequestGuid, + boolean retryHTTP403, + boolean noRetry, + boolean unpackResponse, + ExecTimeTelemetryData execTimeTelemetryData) + throws SnowflakeSQLException { + String requestIdStr = URLUtil.getRequestIdLogStr(httpRequest.getURI()); + String requestInfoScrubbed = SecretDetector.maskSASToken(httpRequest.toString()); + HttpExecutingContext context = + HttpExecutingContextBuilder.withRequest(requestIdStr, requestInfoScrubbed) + .retryTimeout(retryTimeout) + .authTimeout(authTimeout) + .origSocketTimeout(socketTimeout) + .maxRetries(maxRetries) + .injectSocketTimeout(injectSocketTimeout) + .canceling(canceling) + .withoutCookies(withoutCookies) + .includeRetryParameters(includeRetryParameters) + .includeRequestGuid(includeRequestGuid) + .retryHTTP403(retryHTTP403) + .noRetry(noRetry) + .unpackResponse(unpackResponse) + .loginRequest( + net.snowflake.client.core.SessionUtil.isNewRetryStrategyRequest(httpRequest)) + .build(); + return executeWithRetries(httpClient, httpRequest, context, execTimeTelemetryData, null); + } + + /** + * Execute an HTTP request with retry logic. + * + * @param httpClient client object used to communicate with other machine + * @param httpRequest request object contains all the request information + * @param execTimeData ExecTimeTelemetryData should be executed before and/or after the retry + * @param retryManager RetryManager containing extra actions used during retries + * @return HttpResponseContextDto Object get from server or exception + * @throws net.snowflake.ingest.streaming.internal.fileTransferAgent.SnowflakeSQLException Request + * timeout Exception or Illegal State Exception i.e. connection is already shutdown etc + */ + public static HttpResponseContextDto executeWithRetries( + CloseableHttpClient httpClient, + HttpRequestBase httpRequest, + HttpExecutingContext httpExecutingContext, + ExecTimeTelemetryData execTimeData, + RetryContextManager retryManager) + throws SnowflakeSQLException { + Stopwatch networkComunnicationStapwatch = null; + Stopwatch requestReponseStopWatch = null; + HttpResponseContextDto responseDto = new HttpResponseContextDto(); + + if (logger.isDebugEnabled()) { + networkComunnicationStapwatch = new Stopwatch(); + networkComunnicationStapwatch.start(); + logger.debug( + "{}Executing rest request: {}, retry timeout: {}, socket timeout: {}, max retries: {}," + + " inject socket timeout: {}, canceling: {}, without cookies: {}, include retry" + + " parameters: {}, include request guid: {}, retry http 403: {}, no retry: {}", + httpExecutingContext.getRequestId(), + httpExecutingContext.getRequestInfoScrubbed(), + httpExecutingContext.getRetryTimeoutInMilliseconds(), + httpExecutingContext.getOrigSocketTimeout(), + httpExecutingContext.getMaxRetries(), + httpExecutingContext.isInjectSocketTimeout(), + httpExecutingContext.getCanceling(), + httpExecutingContext.isWithoutCookies(), + httpExecutingContext.isIncludeRetryParameters(), + httpExecutingContext.isIncludeRequestGuid(), + httpExecutingContext.isRetryHTTP403(), + httpExecutingContext.isNoRetry()); + } + if (httpExecutingContext.isLoginRequest()) { + logger.debug( + "{}Request is a login/auth request. Using new retry strategy", + httpExecutingContext.getRequestId()); + } + + RestRequest.setRequestConfig( + httpRequest, + httpExecutingContext.isWithoutCookies(), + httpExecutingContext.getInjectSocketTimeout(), + httpExecutingContext.getRequestId(), + httpExecutingContext.getAuthTimeoutInMilliseconds()); + + // try request till we get a good response or retry timeout + while (true) { + logger.debug( + "{}Retry count: {}, max retries: {}, retry timeout: {} s, backoff: {} ms. Attempting" + + " request: {}", + httpExecutingContext.getRequestId(), + httpExecutingContext.getRetryCount(), + httpExecutingContext.getMaxRetries(), + httpExecutingContext.getRetryTimeout(), + httpExecutingContext.getMinBackoffInMillis(), + httpExecutingContext.getRequestInfoScrubbed()); + try { + // update start time + httpExecutingContext.setStartTimePerRequest(System.currentTimeMillis()); + + RestRequest.setRequestURI( + httpRequest, + httpExecutingContext.getRequestId(), + httpExecutingContext.isIncludeRetryParameters(), + httpExecutingContext.isIncludeRequestGuid(), + httpExecutingContext.getRetryCount(), + httpExecutingContext.getLastStatusCodeForRetry(), + httpExecutingContext.getStartTime(), + httpExecutingContext.getRequestInfoScrubbed()); + + execTimeData.setHttpClientStart(); + CloseableHttpResponse response = httpClient.execute(httpRequest); + responseDto.setHttpResponse(response); + execTimeData.setHttpClientEnd(); + } catch (Exception ex) { + responseDto.setSavedEx(handlingNotRetryableException(ex, httpExecutingContext)); + } finally { + // Reset the socket timeout to its original value if it is not the + // very first iteration. + if (httpExecutingContext.getInjectSocketTimeout() != 0 + && httpExecutingContext.getRetryCount() == 0) { + // test code path + httpRequest.setConfig( + net.snowflake.client.core.HttpUtil.getDefaultRequestConfigWithSocketTimeout( + httpExecutingContext.getOrigSocketTimeout(), + httpExecutingContext.isWithoutCookies())); + } + } + boolean shouldSkipRetry = + shouldSkipRetryWithLoggedReason(httpRequest, responseDto, httpExecutingContext); + httpExecutingContext.setShouldRetry(!shouldSkipRetry); + + if (httpExecutingContext.isUnpackResponse() + && responseDto.getHttpResponse() != null + && responseDto.getHttpResponse().getStatusLine().getStatusCode() + == 200) { // todo extract getter for statusCode + processHttpResponse(httpExecutingContext, execTimeData, responseDto); + } + + if (!httpExecutingContext.isShouldRetry()) { + if (responseDto.getHttpResponse() == null) { + if (responseDto.getSavedEx() != null) { + logger.error( + "{}Returning null response. Cause: {}, request: {}", + httpExecutingContext.getRequestId(), + getRootCause(responseDto.getSavedEx()), + httpExecutingContext.getRequestInfoScrubbed()); + } else { + logger.error( + "{}Returning null response for request: {}", + httpExecutingContext.getRequestId(), + httpExecutingContext.getRequestInfoScrubbed()); + } + } else if (responseDto.getHttpResponse().getStatusLine().getStatusCode() != 200) { + logger.error( + "{}Error response: HTTP Response code: {}, request: {}", + httpExecutingContext.getRequestId(), + responseDto.getHttpResponse().getStatusLine().getStatusCode(), + httpExecutingContext.getRequestInfoScrubbed()); + responseDto.setSavedEx( + new SnowflakeSQLException( + SqlState.IO_ERROR, + ErrorCode.NETWORK_ERROR.getMessageCode(), + "HTTP status=" + + ((responseDto.getHttpResponse() != null) + ? responseDto.getHttpResponse().getStatusLine().getStatusCode() + : "null response"))); + } else if ((responseDto.getHttpResponse() == null + || responseDto.getHttpResponse().getStatusLine().getStatusCode() != 200)) { + sendTelemetryEvent( + httpRequest, + httpExecutingContext, + responseDto.getHttpResponse(), + responseDto.getSavedEx()); + } + break; + } else { + prepareRetry(httpRequest, httpExecutingContext, retryManager, responseDto); + } + } + + logger.debug( + "{}Execution of request {} took {} ms with total of {} retries", + httpExecutingContext.getRequestId(), + httpExecutingContext.getRequestInfoScrubbed(), + networkComunnicationStapwatch == null + ? "n/a" + : networkComunnicationStapwatch.elapsedMillis(), + httpExecutingContext.getRetryCount()); + + httpExecutingContext.resetRetryCount(); + if (logger.isDebugEnabled() && networkComunnicationStapwatch != null) { + networkComunnicationStapwatch.stop(); + } + if (responseDto.getSavedEx() != null) { + Exception savedEx = responseDto.getSavedEx(); + if (savedEx instanceof SnowflakeSQLException) { + throw (SnowflakeSQLException) savedEx; + } else { + throw new SnowflakeSQLException( + savedEx, + ErrorCode.NETWORK_ERROR, + "Exception encountered for HTTP request: " + savedEx.getMessage()); + } + } + return responseDto; + } + + private static void processHttpResponse( + HttpExecutingContext httpExecutingContext, + ExecTimeTelemetryData execTimeData, + HttpResponseContextDto responseDto) { + CloseableHttpResponse response = responseDto.getHttpResponse(); + try { + String responseText; + responseText = verifyAndUnpackResponse(response, execTimeData); + httpExecutingContext.setShouldRetry(false); + responseDto.setUnpackedCloseableHttpResponse(responseText); + } catch (IOException ex) { + boolean skipRetriesBecauseOf200 = httpExecutingContext.isSkipRetriesBecauseOf200(); + boolean retryReasonDifferentThan200 = + !httpExecutingContext.isShouldRetry() && skipRetriesBecauseOf200; + httpExecutingContext.setShouldRetry(retryReasonDifferentThan200); + responseDto.setSavedEx(ex); + } + } + + private static void updateRetryParameters( + URIBuilder builder, int retryCount, String lastStatusCodeForRetry, long startTime) { + builder.setParameter("retryCount", String.valueOf(retryCount)); + builder.setParameter("retryReason", lastStatusCodeForRetry); + builder.setParameter("clientStartTime", String.valueOf(startTime)); + } + + private static void prepareRetry( + HttpRequestBase httpRequest, + HttpExecutingContext httpExecutingContext, + RetryContextManager retryManager, + HttpResponseContextDto dto) + throws SnowflakeSQLException { + // Potentially retryable error + logRequestResult( + dto.getHttpResponse(), + httpExecutingContext.getRequestId(), + httpExecutingContext.getRequestInfoScrubbed(), + dto.getSavedEx()); + + // get the elapsed time for the last request + // elapsed in millisecond for last call, used for calculating the + // remaining amount of time to sleep: + // (backoffInMilli - elapsedMilliForLastCall) + long elapsedMilliForLastCall = + System.currentTimeMillis() - httpExecutingContext.getStartTimePerRequest(); + + if (httpExecutingContext.socketOrConnectTimeoutReached()) + /* socket timeout not reached */ { + /* connect timeout not reached */ + // check if this is a login-request + if (String.valueOf(httpRequest.getURI()).contains("login-request")) { + throw new SnowflakeSQLException( + ErrorCode.AUTHENTICATOR_REQUEST_TIMEOUT, + httpExecutingContext.getRetryCount(), + true, + httpExecutingContext.getElapsedMilliForTransientIssues() / 1000); + } + } + + // sleep for backoff - elapsed amount of time + sleepForBackoffAndPrepareNext(elapsedMilliForLastCall, httpExecutingContext); + + httpExecutingContext.incrementRetryCount(); + httpExecutingContext.setLastStatusCodeForRetry( + dto.getHttpResponse() == null + ? "0" + : String.valueOf(dto.getHttpResponse().getStatusLine().getStatusCode())); + // If the request failed with any other retry-able error and auth timeout is reached + // increase the retry count and throw special exception to renew the token before retrying. + + RetryContextManager.RetryHook retryManagerHook = null; + if (retryManager != null) { + retryManagerHook = retryManager.getRetryHook(); + retryManager + .getRetryContext() + .setElapsedTimeInMillis(httpExecutingContext.getElapsedMilliForTransientIssues()) + .setRetryTimeoutInMillis(httpExecutingContext.getRetryTimeoutInMilliseconds()); + } + + // Make sure that any authenticator specific info that needs to be + // updated gets updated before the next retry. Ex - OKTA OTT, JWT token + // Aim is to achieve this using RetryContextManager, but raising + // AUTHENTICATOR_REQUEST_TIMEOUT Exception is still supported as well. In both cases the + // retried request must be aware of the elapsed time not to exceed the timeout limit. + if (retryManagerHook == RetryContextManager.RetryHook.ALWAYS_BEFORE_RETRY) { + retryManager.executeRetryCallbacks(httpRequest); + } + + if (httpExecutingContext.getAuthTimeout() > 0 + && httpExecutingContext.getElapsedMilliForTransientIssues() + >= httpExecutingContext.getAuthTimeout()) { + throw new SnowflakeSQLException( + ErrorCode.AUTHENTICATOR_REQUEST_TIMEOUT, + httpExecutingContext.getRetryCount(), + false, + httpExecutingContext.getElapsedMilliForTransientIssues() / 1000); + } + + int numOfRetryToTriggerTelemetry = + TelemetryService.getInstance().getNumOfRetryToTriggerTelemetry(); + if (httpExecutingContext.getRetryCount() == numOfRetryToTriggerTelemetry) { + TelemetryService.getInstance() + .logHttpRequestTelemetryEvent( + String.format("HttpRequestRetry%dTimes", numOfRetryToTriggerTelemetry), + httpRequest, + httpExecutingContext.getInjectSocketTimeout(), + httpExecutingContext.getCanceling(), + httpExecutingContext.isWithoutCookies(), + httpExecutingContext.isIncludeRetryParameters(), + httpExecutingContext.isIncludeRequestGuid(), + dto.getHttpResponse(), + dto.getSavedEx(), + httpExecutingContext.getBreakRetryReason(), + httpExecutingContext.getRetryTimeout(), + httpExecutingContext.getRetryCount(), + SqlState.IO_ERROR, + ErrorCode.NETWORK_ERROR.getMessageCode()); + } + dto.setSavedEx(null); + httpExecutingContext.setSkipRetriesBecauseOf200(false); + + // release connection before retry + httpRequest.releaseConnection(); + } + + private static void sendTelemetryEvent( + HttpRequestBase httpRequest, + HttpExecutingContext httpExecutingContext, + CloseableHttpResponse response, + Exception savedEx) { + String eventName; + if (response == null) { + eventName = "NullResponseHttpError"; + } else { + if (response.getStatusLine() == null) { + eventName = "NullResponseStatusLine"; + } else { + eventName = String.format("HttpError%d", response.getStatusLine().getStatusCode()); + } + } + TelemetryService.getInstance() + .logHttpRequestTelemetryEvent( + eventName, + httpRequest, + httpExecutingContext.getInjectSocketTimeout(), + httpExecutingContext.getCanceling(), + httpExecutingContext.isWithoutCookies(), + httpExecutingContext.isIncludeRetryParameters(), + httpExecutingContext.isIncludeRequestGuid(), + response, + savedEx, + httpExecutingContext.getBreakRetryReason(), + httpExecutingContext.getRetryTimeout(), + httpExecutingContext.getRetryCount(), + null, + 0); + } + + private static void sleepForBackoffAndPrepareNext( + long elapsedMilliForLastCall, HttpExecutingContext context) { + if (context.getMinBackoffInMillis() > elapsedMilliForLastCall) { + try { + logger.debug( + "{}Retry request {}: sleeping for {} ms", + context.getRequestId(), + context.getRequestInfoScrubbed(), + context.getBackoffInMillis()); + Thread.sleep(context.getBackoffInMillis()); + } catch (InterruptedException ex1) { + logger.debug( + "{}Backoff sleep before retrying login got interrupted", context.getRequestId()); + } + context.increaseElapsedMilliForTransientIssues(context.getBackoffInMillis()); + context.setBackoffInMillis( + getNewBackoffInMilli( + context.getBackoffInMillis(), + context.isLoginRequest(), + context.getBackoff(), + context.getRetryCount(), + context.getRetryTimeoutInMilliseconds(), + context.getElapsedMilliForTransientIssues())); + } + } + + private static void logRequestResult( + CloseableHttpResponse response, + String requestIdStr, + String requestInfoScrubbed, + Exception savedEx) { + if (response != null) { + logger.debug( + "{}HTTP response not ok: status code: {}, request: {}", + requestIdStr, + response.getStatusLine().getStatusCode(), + requestInfoScrubbed); + } else if (savedEx != null) { + logger.debug( + "{}Null response for cause: {}, request: {}", + requestIdStr, + getRootCause(savedEx).getMessage(), + requestInfoScrubbed); + } else { + logger.debug("{}Null response for request: {}", requestIdStr, requestInfoScrubbed); + } + } + + private static void checkForDPoPNonceError(CloseableHttpResponse response) throws IOException { + String errorResponse = EntityUtils.toString(response.getEntity()); + if (!isNullOrEmpty(errorResponse)) { + ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper(); + JsonNode rootNode = objectMapper.readTree(errorResponse); + JsonNode errorNode = rootNode.get(ERROR_FIELD_NAME); + if (errorNode != null + && errorNode.isValueNode() + && errorNode.isTextual() + && errorNode.textValue().equals(ERROR_USE_DPOP_NONCE)) { + throw new SnowflakeUseDPoPNonceException( + response.getFirstHeader(DPOP_NONCE_HEADER_NAME).getValue()); + } + } + } + + static Exception handlingNotRetryableException( + Exception ex, HttpExecutingContext httpExecutingContext) throws SnowflakeSQLLoggedException { + Exception savedEx = null; + if (ex instanceof IllegalStateException) { + throw new SnowflakeSQLLoggedException( + ErrorCode.INVALID_STATE, ex, /* session= */ ex.getMessage()); + } else if (isExceptionInGroup(ex, sslExceptions) && !isProtocolVersionError(ex)) { + String formattedMsg = + ex.getMessage() + + "\n" + + "Verify that the hostnames and portnumbers in SYSTEM$ALLOWLIST are added to your" + + " firewall's allowed list.\n" + + "To troubleshoot your connection further, you can refer to this article:\n" + + "https://docs.snowflake.com/en/user-guide/client-connectivity-troubleshooting/overview"; + + throw new SnowflakeSQLLoggedException(ErrorCode.NETWORK_ERROR, ex, formattedMsg); + } else if (ex instanceof Exception) { + savedEx = ex; + // if the request took more than socket timeout log a warning + long currentMillis = System.currentTimeMillis(); + if ((currentMillis - httpExecutingContext.getStartTimePerRequest()) + > net.snowflake.client.core.HttpUtil.getSocketTimeout().toMillis()) { + logger.warn( + "{}HTTP request took longer than socket timeout {} ms: {} ms", + httpExecutingContext.getRequestId(), + net.snowflake.client.core.HttpUtil.getSocketTimeout().toMillis(), + (currentMillis - httpExecutingContext.getStartTimePerRequest())); + } + StringWriter sw = new StringWriter(); + savedEx.printStackTrace(new PrintWriter(sw)); + logger.debug( + "{}Exception encountered for: {}, {}, {}", + httpExecutingContext.getRequestId(), + httpExecutingContext.getRequestInfoScrubbed(), + ex.getLocalizedMessage(), + (ArgSupplier) sw::toString); + } + return ex; + } + + static boolean isExceptionInGroup(Exception e, Set> group) { + for (Class clazz : group) { + if (clazz.isInstance(e)) { + return true; + } + } + return false; + } + + static boolean isProtocolVersionError(Exception e) { + return e.getMessage() != null + && e.getMessage().contains("Received fatal alert: protocol_version"); + } + + private static boolean handleCertificateRevoked( + Exception savedEx, HttpExecutingContext httpExecutingContext, boolean skipRetrying) { + if (!skipRetrying && RestRequest.isCertificateRevoked(savedEx)) { + String msg = "Unknown reason"; + Throwable rootCause = RestRequest.getRootCause(savedEx); + msg = + rootCause.getMessage() != null && !rootCause.getMessage().isEmpty() + ? rootCause.getMessage() + : msg; + logger.debug( + "{}Error response not retryable, " + msg + ", request: {}", + httpExecutingContext.getRequestId(), + httpExecutingContext.getRequestInfoScrubbed()); + EventUtil.triggerBasicEvent( + Event.EventType.NETWORK_ERROR, + msg + ", Request: " + httpExecutingContext.getRequestInfoScrubbed(), + false); + + httpExecutingContext.setBreakRetryReason("certificate revoked error"); + httpExecutingContext.setBreakRetryEventName("HttpRequestRetryVertificateRevoked"); + httpExecutingContext.setShouldRetry(false); + return true; + } + return skipRetrying; + } + + private static boolean handleNonRetryableHttpCode( + HttpResponseContextDto dto, HttpExecutingContext httpExecutingContext, boolean skipRetrying) { + CloseableHttpResponse response = dto.getHttpResponse(); + if (!skipRetrying && isNonRetryableHTTPCode(response, httpExecutingContext.isRetryHTTP403())) { + String msg = "Unknown reason"; + if (response != null) { + logger.debug( + "{}HTTP response code for request {}: {}", + httpExecutingContext.getRequestId(), + httpExecutingContext.getRequestInfoScrubbed(), + response.getStatusLine().getStatusCode()); + msg = + "StatusCode: " + + response.getStatusLine().getStatusCode() + + ", Reason: " + + response.getStatusLine().getReasonPhrase(); + } else if (dto.getSavedEx() != null) // may be null. + { + Throwable rootCause = RestRequest.getRootCause(dto.getSavedEx()); + msg = rootCause.getMessage(); + } + + if (response == null || response.getStatusLine().getStatusCode() != 200) { + logger.debug( + "{}Error response not retryable, " + msg + ", request: {}", + httpExecutingContext.getRequestId(), + httpExecutingContext.getRequestInfoScrubbed()); + EventUtil.triggerBasicEvent( + Event.EventType.NETWORK_ERROR, + msg + ", Request: " + httpExecutingContext.getRequestInfoScrubbed(), + false); + } + httpExecutingContext.setBreakRetryReason("status code does not need retry"); + httpExecutingContext.setShouldRetry(false); + httpExecutingContext.setSkipRetriesBecauseOf200( + response.getStatusLine().getStatusCode() == 200); + + try { + if (response == null || response.getStatusLine().getStatusCode() != 200) { + logger.error( + "Error executing request: {}", httpExecutingContext.getRequestInfoScrubbed()); + + if (response != null + && response.getStatusLine().getStatusCode() == 400 + && response.getEntity() != null) { + checkForDPoPNonceError(response); + } + + logResponseDetails(response, logger); + + if (response != null) { + EntityUtils.consume(response.getEntity()); + } + + // We throw here exception if timeout was reached for login + dto.setSavedEx( + new SnowflakeSQLException( + SqlState.IO_ERROR, + ErrorCode.NETWORK_ERROR.getMessageCode(), + "HTTP status=" + + ((response != null) + ? response.getStatusLine().getStatusCode() + : "null response"))); + } + } catch (IOException e) { + dto.setSavedEx( + new SnowflakeSQLException( + SqlState.IO_ERROR, + ErrorCode.NETWORK_ERROR.getMessageCode(), + "Exception details: " + e.getMessage())); + } + return true; + } + return skipRetrying; + } + + private static void logTelemetryEvent( + HttpRequestBase request, + CloseableHttpResponse response, + Exception savedEx, + HttpExecutingContext httpExecutingContext) { + TelemetryService.getInstance() + .logHttpRequestTelemetryEvent( + httpExecutingContext.getBreakRetryEventName(), + request, + httpExecutingContext.getInjectSocketTimeout(), + httpExecutingContext.getCanceling(), + httpExecutingContext.isWithoutCookies(), + httpExecutingContext.isIncludeRetryParameters(), + httpExecutingContext.isIncludeRequestGuid(), + response, + savedEx, + httpExecutingContext.getBreakRetryReason(), + httpExecutingContext.getRetryTimeout(), + httpExecutingContext.getRetryCount(), + SqlState.IO_ERROR, + ErrorCode.NETWORK_ERROR.getMessageCode()); + } + + private static boolean handleMaxRetriesExceeded( + HttpExecutingContext httpExecutingContext, boolean skipRetrying) { + if (!skipRetrying && httpExecutingContext.maxRetriesExceeded()) { + logger.error( + "{}Stop retrying as max retries have been reached for request: {}! Max retry count: {}", + httpExecutingContext.getRequestId(), + httpExecutingContext.getRequestInfoScrubbed(), + httpExecutingContext.getMaxRetries()); + + httpExecutingContext.setBreakRetryReason("max retries reached"); + httpExecutingContext.setBreakRetryEventName("HttpRequestRetryLimitExceeded"); + httpExecutingContext.setShouldRetry(false); + return true; + } + return skipRetrying; + } + + private static boolean handleElapsedTimeoutExceeded( + HttpExecutingContext httpExecutingContext, boolean skipRetrying) { + if (!skipRetrying && httpExecutingContext.getRetryTimeoutInMilliseconds() > 0) { + // Check for retry time-out. + // increment total elapsed due to transient issues + long elapsedMilliForLastCall = + System.currentTimeMillis() - httpExecutingContext.getStartTimePerRequest(); + httpExecutingContext.increaseElapsedMilliForTransientIssues(elapsedMilliForLastCall); + + // check if the total elapsed time for transient issues has exceeded + // the retry timeout and we retry at least the min, if so, we will not + // retry + if (httpExecutingContext.elapsedTimeExceeded() && httpExecutingContext.moreThanMinRetries()) { + logger.error( + "{}Stop retrying since elapsed time due to network " + + "issues has reached timeout. " + + "Elapsed: {} ms, timeout: {} ms", + httpExecutingContext.getRequestId(), + httpExecutingContext.getElapsedMilliForTransientIssues(), + httpExecutingContext.getRetryTimeoutInMilliseconds()); + + httpExecutingContext.setBreakRetryReason("retry timeout"); + httpExecutingContext.setBreakRetryEventName("HttpRequestRetryTimeout"); + httpExecutingContext.setShouldRetry(false); + return true; + } + } + return skipRetrying; + } + + private static boolean handleCancelingSignal( + HttpExecutingContext httpExecutingContext, boolean skipRetrying) { + if (!skipRetrying + && httpExecutingContext.getCanceling() != null + && httpExecutingContext.getCanceling().get()) { + logger.debug( + "{}Stop retrying since canceling is requested", httpExecutingContext.getRequestId()); + httpExecutingContext.setBreakRetryReason("canceling is requested"); + httpExecutingContext.setShouldRetry(false); + return true; + } + return skipRetrying; + } + + private static boolean handleNoRetryFlag( + HttpExecutingContext httpExecutingContext, boolean skipRetrying) { + if (!skipRetrying && httpExecutingContext.isNoRetry()) { + logger.debug( + "{}HTTP retry disabled for this request. noRetry: {}", + httpExecutingContext.getRequestId(), + httpExecutingContext.isNoRetry()); + httpExecutingContext.setBreakRetryReason("retry is disabled"); + httpExecutingContext.resetRetryCount(); + httpExecutingContext.setShouldRetry(false); + return true; + } + return skipRetrying; + } + + private static boolean shouldSkipRetryWithLoggedReason( + HttpRequestBase request, + HttpResponseContextDto responseDto, + HttpExecutingContext httpExecutingContext) { + CloseableHttpResponse response = responseDto.getHttpResponse(); + Exception savedEx = responseDto.getSavedEx(); + List> conditions = + Arrays.asList( + skipRetrying -> handleNoRetryFlag(httpExecutingContext, skipRetrying), + skipRetrying -> handleCancelingSignal(httpExecutingContext, skipRetrying), + skipRetrying -> handleElapsedTimeoutExceeded(httpExecutingContext, skipRetrying), + skipRetrying -> handleMaxRetriesExceeded(httpExecutingContext, skipRetrying), + skipRetrying -> handleCertificateRevoked(savedEx, httpExecutingContext, skipRetrying), + skipRetrying -> + handleNonRetryableHttpCode(responseDto, httpExecutingContext, skipRetrying)); + + // Process each condition using Stream + boolean skipRetrying = + conditions.stream().reduce(Function::andThen).orElse(Function.identity()).apply(false); + + // Log telemetry + logTelemetryEvent(request, response, savedEx, httpExecutingContext); + + return skipRetrying; + } + + /** + * Inlined from SnowflakeUtil.logResponseDetails — cannot call JDBC's version because it expects + * net.snowflake.client.log.SFLogger, not our replicated SFLogger. + */ + private static void logResponseDetails(HttpResponse response, SFLogger logger) { + if (response == null) { + logger.error("null response", false); + return; + } + + // log the response + if (response.getStatusLine() != null) { + logger.error("Response status line reason: {}", response.getStatusLine().getReasonPhrase()); + } + + // log each header from response + Header[] headers = response.getAllHeaders(); + if (headers != null) { + for (Header header : headers) { + logger.debug("Header name: {}, value: {}", header.getName(), header.getValue()); + } + } + + // log response + if (response.getEntity() != null) { + try { + StringWriter writer = new StringWriter(); + BufferedReader bufferedReader = + new BufferedReader(new InputStreamReader((response.getEntity().getContent()))); + IOUtils.copy(bufferedReader, writer); + logger.error("Response content: {}", writer.toString()); + } catch (IOException ex) { + logger.error("Failed to read content due to exception: " + "{}", ex.getMessage()); + } + } + } + + private static String verifyAndUnpackResponse( + CloseableHttpResponse response, ExecTimeTelemetryData execTimeData) throws IOException { + try (StringWriter writer = new StringWriter()) { + execTimeData.setResponseIOStreamStart(); + try (InputStream ins = response.getEntity().getContent()) { + IOUtils.copy(ins, writer, "UTF-8"); + } + + execTimeData.setResponseIOStreamEnd(); + return writer.toString(); + } finally { + IOUtils.closeQuietly(response); + } + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/RetryContext.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/RetryContext.java new file mode 100644 index 000000000..c2996ae73 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/RetryContext.java @@ -0,0 +1,44 @@ +/* + * Replicated from snowflake-jdbc (v3.25.1) + * Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/jdbc/RetryContext.java + * + * Permitted differences: package declaration, @SnowflakeJdbcInternalApi annotation removed. + */ +package net.snowflake.ingest.streaming.internal.fileTransferAgent; + +/** RetryContext stores information about an ongoing request's retrying process. */ +public class RetryContext { + static final int SECONDS_TO_MILLIS_FACTOR = 1000; + private long elapsedTimeInMillis; + private long retryTimeoutInMillis; + private long retryCount; + + public RetryContext() {} + + public RetryContext setElapsedTimeInMillis(long elapsedTimeInMillis) { + this.elapsedTimeInMillis = elapsedTimeInMillis; + return this; + } + + public RetryContext setRetryTimeoutInMillis(long retryTimeoutInMillis) { + this.retryTimeoutInMillis = retryTimeoutInMillis; + return this; + } + + public RetryContext setRetryCount(long retryCount) { + this.retryCount = retryCount; + return this; + } + + private long getRemainingRetryTimeoutInMillis() { + return retryTimeoutInMillis - elapsedTimeInMillis; + } + + public long getRemainingRetryTimeoutInSeconds() { + return (getRemainingRetryTimeoutInMillis()) / SECONDS_TO_MILLIS_FACTOR; + } + + public long getRetryCount() { + return retryCount; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/RetryContextManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/RetryContextManager.java new file mode 100644 index 000000000..60a5abe4e --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/RetryContextManager.java @@ -0,0 +1,90 @@ +/* + * Replicated from snowflake-jdbc (v3.25.1) + * Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/jdbc/RetryContextManager.java + * + * Permitted differences: package declaration, import swaps for already-replicated classes, + * @SnowflakeJdbcInternalApi annotation removed. + */ +package net.snowflake.ingest.streaming.internal.fileTransferAgent; + +import java.util.ArrayList; +import java.util.List; +import org.apache.http.client.methods.HttpRequestBase; + +/** + * RetryContextManager lets you register logic (as callbacks) that will be re-executed during a + * retry of a request. + */ +public class RetryContextManager { + + // List of retry callbacks that will be executed in the order they were registered. + private final List< + ThrowingBiFunction> + retryCallbacks = new ArrayList<>(); + + // A RetryHook flag that can be used by client code to decide when (or if) callbacks should be + // executed. + private final RetryHook retryHook; + private RetryContext retryContext; + + /** Enumeration for different retry hook strategies. */ + public enum RetryHook { + /** Always execute the registered retry callbacks on every retry. */ + ALWAYS_BEFORE_RETRY, + } + + /** Default constructor using ALWAYS_BEFORE_RETRY as the default retry hook. */ + public RetryContextManager() { + this(RetryHook.ALWAYS_BEFORE_RETRY); + } + + /** + * Constructor that accepts a specific RetryHook. + * + * @param retryHook the retry hook strategy. + */ + public RetryContextManager(RetryHook retryHook) { + this.retryHook = retryHook; + this.retryContext = new RetryContext(); + } + + /** + * Registers a retry callback that will be executed on each retry. + * + * @param callback A RetryCallback encapsulating the logic to be replayed on retry. + * @return the current instance for fluent chaining. + */ + public RetryContextManager registerRetryCallback( + ThrowingBiFunction + callback) { + retryCallbacks.add(callback); + return this; + } + + /** + * Executes all registered retry callbacks in the order they were added, before reattempting the + * operation. + * + * @param requestToRetry the HTTP request to retry. + * @throws SnowflakeSQLException if an error occurs during callback execution. + */ + public void executeRetryCallbacks(HttpRequestBase requestToRetry) throws SnowflakeSQLException { + for (ThrowingBiFunction + callback : retryCallbacks) { + retryContext = callback.apply(requestToRetry, retryContext); + } + } + + /** + * Returns the configured RetryHook. + * + * @return the retry hook. + */ + public RetryHook getRetryHook() { + return retryHook; + } + + public RetryContext getRetryContext() { + return retryContext; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SFOCSPException.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SFOCSPException.java new file mode 100644 index 000000000..0513569b3 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SFOCSPException.java @@ -0,0 +1,41 @@ +/* + * Replicated from snowflake-jdbc (v3.25.1) + * Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/core/SFOCSPException.java + * + * Permitted differences: package declaration, import swaps for already-replicated classes. + */ +package net.snowflake.ingest.streaming.internal.fileTransferAgent; + +import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLogger; +import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLoggerFactory; + +public class SFOCSPException extends Throwable { + private static final SFLogger logger = SFLoggerFactory.getLogger(SFOCSPException.class); + + private static final long serialVersionUID = 1L; + + private final OCSPErrorCode errorCode; + + public SFOCSPException(OCSPErrorCode errorCode, String errorMsg) { + this(errorCode, errorMsg, null); + } + + public SFOCSPException(OCSPErrorCode errorCode, String errorMsg, Throwable cause) { + super(errorMsg); + this.errorCode = errorCode; + if (cause != null) { + this.initCause(cause); + } + } + + public OCSPErrorCode getErrorCode() { + return errorCode; + } + + @Override + public String toString() { + return super.toString() + + (getErrorCode() != null ? ", errorCode = " + getErrorCode() : "") + + (getMessage() != null ? ", errorMsg = " + getMessage() : ""); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeUseDPoPNonceException.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeUseDPoPNonceException.java new file mode 100644 index 000000000..dd901214a --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/SnowflakeUseDPoPNonceException.java @@ -0,0 +1,20 @@ +/* + * Replicated from snowflake-jdbc (v3.25.1) + * Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/jdbc/SnowflakeUseDPoPNonceException.java + * + * Permitted differences: package declaration, @SnowflakeJdbcInternalApi annotation removed. + */ +package net.snowflake.ingest.streaming.internal.fileTransferAgent; + +public class SnowflakeUseDPoPNonceException extends RuntimeException { + + private final String nonce; + + public SnowflakeUseDPoPNonceException(String nonce) { + this.nonce = nonce; + } + + public String getNonce() { + return nonce; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/ThrowingBiFunction.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/ThrowingBiFunction.java new file mode 100644 index 000000000..d9745268b --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/ThrowingBiFunction.java @@ -0,0 +1,12 @@ +/* + * Replicated from snowflake-jdbc (v3.25.1) + * Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/util/ThrowingBiFunction.java + * + * Permitted differences: package declaration, @SnowflakeJdbcInternalApi annotation removed. + */ +package net.snowflake.ingest.streaming.internal.fileTransferAgent; + +@FunctionalInterface +public interface ThrowingBiFunction { + R apply(A a, B b) throws T; +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/URLUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/URLUtil.java new file mode 100644 index 000000000..db2c3391d --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/fileTransferAgent/URLUtil.java @@ -0,0 +1,82 @@ +/* + * Replicated from snowflake-jdbc (v3.25.1) + * Source: https://github.com/snowflakedb/snowflake-jdbc/blob/v3.25.1/src/main/java/net/snowflake/client/core/URLUtil.java + * + * Permitted differences: package declaration, import swaps for already-replicated classes, + * @SnowflakeJdbcInternalApi annotation removed. + * SFSession.SF_QUERY_REQUEST_ID inlined as a local constant (value: "requestId"). + */ +package net.snowflake.ingest.streaming.internal.fileTransferAgent; + +import java.io.UnsupportedEncodingException; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; +import javax.annotation.Nullable; +import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLogger; +import net.snowflake.ingest.streaming.internal.fileTransferAgent.log.SFLoggerFactory; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; + +public class URLUtil { + + private static final SFLogger logger = SFLoggerFactory.getLogger(URLUtil.class); + + // Inlined from SFSession.SF_QUERY_REQUEST_ID + private static final String SF_QUERY_REQUEST_ID = "requestId"; + + static final String validURLPattern = + "^http(s?)\\:\\/\\/[0-9a-zA-Z]([-.\\w]*[0-9a-zA-Z@:])*(:(0-9)*)*(\\/?)([a-zA-Z0-9\\-\\.\\?\\,\\&\\(\\)\\/\\\\\\+&%\\$#_=@]*)?$"; + static final Pattern pattern = Pattern.compile(validURLPattern); + + public static boolean isValidURL(String url) { + try { + Matcher matcher = pattern.matcher(url); + return matcher.find(); + } catch (PatternSyntaxException pex) { + logger.debug("The URL REGEX is invalid. Falling back to basic sanity test"); + try { + new URL(url).toURI(); + return true; + } catch (MalformedURLException mex) { + logger.debug("The URL " + url + ", is invalid"); + return false; + } catch (URISyntaxException uex) { + logger.debug("The URL " + url + ", is invalid"); + return false; + } + } + } + + @Nullable + public static String urlEncode(String target) throws UnsupportedEncodingException { + String encodedTarget; + try { + encodedTarget = URLEncoder.encode(target, StandardCharsets.UTF_8.toString()); + } catch (UnsupportedEncodingException uex) { + logger.debug("The string to be encoded- " + target + ", is invalid"); + return null; + } + return encodedTarget; + } + + public static String getRequestId(URI uri) { + return URLEncodedUtils.parse(uri, StandardCharsets.UTF_8).stream() + .filter(p -> p.getName().equals(SF_QUERY_REQUEST_ID)) + .findFirst() + .map(NameValuePair::getValue) + .orElse(null); + } + + public static String getRequestIdLogStr(URI uri) { + String requestId = getRequestId(uri); + + return requestId == null ? "" : "[requestId=" + requestId + "] "; + } +}