Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,10 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.rmi.UnexpectedException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.Future;
import net.snowflake.client.core.HttpUtil;
import net.snowflake.client.core.SFSession;
import net.snowflake.client.jdbc.SnowflakeConnectionV1;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.ingest.streaming.internal.fileTransferAgent.ObjectMapperFactory;
import net.snowflake.ingest.streaming.internal.fileTransferAgent.TelemetryThreadPool;
Expand All @@ -32,7 +27,6 @@
public class TelemetryClient implements Telemetry {
private static final SFLogger logger = SFLoggerFactory.getLogger(TelemetryClient.class);

private static final String SF_PATH_TELEMETRY = "/telemetry/send";
private static final String SF_PATH_TELEMETRY_SESSIONLESS = "/telemetry/send/sessionless";

// if the number of cached logs is larger than this threshold,
Expand All @@ -44,7 +38,6 @@ public class TelemetryClient implements Telemetry {
private final String serverUrl;
private final String telemetryUrl;

private final SFSession session;
private LinkedList<TelemetryData> logBatch;
private static final ObjectMapper mapper = ObjectMapperFactory.getObjectMapper();

Expand All @@ -67,23 +60,6 @@ public class TelemetryClient implements Telemetry {
// Retry timeout for the HTTP request
private static final int TELEMETRY_HTTP_RETRY_TIMEOUT_IN_SEC = 1000;

private TelemetryClient(SFSession session, int flushSize) {
this.session = session;
this.serverUrl = session.getUrl();
this.httpClient = null;

if (this.serverUrl.endsWith("/")) {
this.telemetryUrl =
this.serverUrl.substring(0, this.serverUrl.length() - 1) + SF_PATH_TELEMETRY;
} else {
this.telemetryUrl = this.serverUrl + SF_PATH_TELEMETRY;
}

this.logBatch = new LinkedList<>();
this.isClosed = false;
this.forceFlushSize = flushSize;
}

/**
* Constructor for creating a sessionless telemetry client
*
Expand All @@ -94,7 +70,6 @@ private TelemetryClient(SFSession session, int flushSize) {
*/
private TelemetryClient(
CloseableHttpClient httpClient, String serverUrl, String authType, int flushSize) {
this.session = null;
this.serverUrl = serverUrl;
this.httpClient = httpClient;

Expand Down Expand Up @@ -127,8 +102,7 @@ private TelemetryClient(
* @return whether client is enabled
*/
public boolean isTelemetryEnabled() {
return (this.session == null || this.session.isClientTelemetryEnabled())
&& this.isTelemetryServiceAvailable;
return this.isTelemetryServiceAvailable;
}

/** Disable any use of the client to add/send metrics */
Expand All @@ -137,54 +111,6 @@ public void disableTelemetry() {
this.isTelemetryServiceAvailable = false;
}

/**
* Initialize the telemetry connector
*
* @param conn connection with the session to use for the connector
* @param flushSize maximum size of telemetry batch before flush
* @return a telemetry connector
*/
public static Telemetry createTelemetry(Connection conn, int flushSize) {
try {
return createTelemetry(
(SFSession) conn.unwrap(SnowflakeConnectionV1.class).getSFBaseSession(), flushSize);
} catch (SQLException ex) {
logger.debug("Input connection is not a SnowflakeConnection", false);
return null;
}
}

/**
* Initialize the telemetry connector
*
* @param conn connection with the session to use for the connector
* @return a telemetry connector
*/
public static Telemetry createTelemetry(Connection conn) {
return createTelemetry(conn, DEFAULT_FORCE_FLUSH_SIZE);
}

/**
* Initialize the telemetry connector
*
* @param session session to use for telemetry dumps
* @return a telemetry connector
*/
public static Telemetry createTelemetry(SFSession session) {
return createTelemetry(session, DEFAULT_FORCE_FLUSH_SIZE);
}

/**
* Initialize the telemetry connector
*
* @param session session to use for telemetry dumps
* @param flushSize maximum size of telemetry batch before flush
* @return a telemetry connector
*/
public static Telemetry createTelemetry(SFSession session, int flushSize) {
return new TelemetryClient(session, flushSize);
}

/**
* Initialize the sessionless telemetry connector using KEYPAIR_JWT as the default auth type
*
Expand Down Expand Up @@ -330,14 +256,9 @@ private boolean sendBatch() throws IOException {
this.logBatch = new LinkedList<>();
}

if (this.session != null && this.session.isClosed()) {
throw new UnexpectedException("Session is closed when sending log");
}

if (!tmpList.isEmpty()) {
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
// session shared with JDBC
String payload = logsToString(tmpList);

logger.debugNoMask("Payload of telemetry is : " + payload);
Expand All @@ -346,35 +267,21 @@ private boolean sendBatch() throws IOException {
post.setEntity(new StringEntity(payload));
post.setHeader("Content-type", "application/json");

if (this.session == null) {
post.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + this.token);
post.setHeader("X-Snowflake-Authorization-Token-Type", this.authType);
post.setHeader(HttpHeaders.ACCEPT, "application/json");
} else {
post.setHeader(
HttpHeaders.AUTHORIZATION,
"Snowflake Token=\"" + this.session.getSessionToken() + "\"");
}
post.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + this.token);
post.setHeader("X-Snowflake-Authorization-Token-Type", this.authType);
post.setHeader(HttpHeaders.ACCEPT, "application/json");

String response = null;

try {
response =
this.session == null
? HttpUtil.executeGeneralRequest(
post,
TELEMETRY_HTTP_RETRY_TIMEOUT_IN_SEC,
0,
(int) HttpUtil.getSocketTimeout().toMillis(),
0,
this.httpClient)
: HttpUtil.executeGeneralRequest(
post,
TELEMETRY_HTTP_RETRY_TIMEOUT_IN_SEC,
0,
this.session.getHttpClientSocketTimeout(),
0,
this.session.getHttpClientKey());
HttpUtil.executeGeneralRequest(
post,
TELEMETRY_HTTP_RETRY_TIMEOUT_IN_SEC,
0,
(int) HttpUtil.getSocketTimeout().toMillis(),
0,
this.httpClient);
stopwatch.stop();
logger.debug(
"Sending telemetry took {} ms. Batch size: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ public boolean handleStorageException(

throw new SnowflakeSQLLoggedException(
queryId,
null,
SqlState.SYSTEM_ERROR,
ErrorCode.S3_OPERATION_ERROR.getMessageCode(),
ex1,
Expand All @@ -254,7 +253,6 @@ public boolean handleStorageException(
} else {
throw new SnowflakeSQLLoggedException(
queryId,
null,
SqlState.SYSTEM_ERROR,
ErrorCode.AWS_CLIENT_ERROR.getMessageCode(),
ex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ public boolean handleStorageException(
if (retryCount > gcsClient.getMaxRetries()) {
throw new SnowflakeSQLLoggedException(
queryId,
null,
SqlState.SYSTEM_ERROR,
ErrorCode.GCP_SERVICE_ERROR.getMessageCode(),
se,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ private SFPair<InputStream, Boolean> createUploadStream(
} catch (FileNotFoundException ex) {
logger.logError("Failed to open input file", ex);
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.INTERNAL_ERROR,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
ex,
Expand All @@ -317,7 +316,6 @@ private SFPair<InputStream, Boolean> createUploadStream(
} catch (IOException ex) {
logger.logError("Failed to open input stream", ex);
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.INTERNAL_ERROR,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
ex,
Expand Down Expand Up @@ -373,7 +371,6 @@ private static void handleAzureException(
if (retryCount > azClient.getMaxRetries()
|| ((StorageException) ex).getHttpStatusCode() == 404) {
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.SYSTEM_ERROR,
ErrorCode.AZURE_SERVICE_ERROR.getMessageCode(),
se,
Expand Down Expand Up @@ -410,7 +407,6 @@ private static void handleAzureException(
|| StorageClientUtil.getRootCause(ex) instanceof SocketTimeoutException) {
if (retryCount > azClient.getMaxRetries()) {
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.SYSTEM_ERROR,
ErrorCode.IO_ERROR.getMessageCode(),
ex,
Expand All @@ -424,7 +420,6 @@ private static void handleAzureException(
}
} else {
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.SYSTEM_ERROR,
ErrorCode.IO_ERROR.getMessageCode(),
ex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ public String upload(
String presignedUrl)
throws SnowflakeSQLException {
throw new SnowflakeSQLLoggedException(
null,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
SqlState.INTERNAL_ERROR,
/* session= */ "IcebergGCSClient.upload" + " only works with pre-signed URL.");
Expand Down Expand Up @@ -324,7 +323,6 @@ private SFPair<InputStream, Boolean> createUploadStream(
} catch (FileNotFoundException ex) {
logger.logError("Failed to open input file", ex);
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.INTERNAL_ERROR,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
ex,
Expand All @@ -333,7 +331,6 @@ private SFPair<InputStream, Boolean> createUploadStream(
} catch (IOException ex) {
logger.logError("Failed to open input stream", ex);
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.INTERNAL_ERROR,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
ex,
Expand Down Expand Up @@ -368,7 +365,6 @@ private void handleStorageException(Exception ex, int retryCount, String operati
// If we have exceeded the max number of retries, propagate the error
if (retryCount > getMaxRetries()) {
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.SYSTEM_ERROR,
ErrorCode.GCP_SERVICE_ERROR.getMessageCode(),
se,
Expand Down Expand Up @@ -403,7 +399,6 @@ private void handleStorageException(Exception ex, int retryCount, String operati
|| getRootCause(ex) instanceof SocketTimeoutException) {
if (retryCount > getMaxRetries()) {
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.SYSTEM_ERROR,
ErrorCode.IO_ERROR.getMessageCode(),
ex,
Expand All @@ -417,7 +412,6 @@ private void handleStorageException(Exception ex, int retryCount, String operati
}
} else {
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.SYSTEM_ERROR,
ErrorCode.IO_ERROR.getMessageCode(),
ex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ public ExecutorService newExecutor() {
}

throw new SnowflakeSQLLoggedException(
null,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
SqlState.INTERNAL_ERROR,
"Unexpected: upload unsuccessful without exception!");
Expand Down Expand Up @@ -433,7 +432,6 @@ private SFPair<InputStream, Boolean> createUploadStream(
} catch (FileNotFoundException ex) {
logger.logError("Failed to open input file", ex);
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.INTERNAL_ERROR,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
ex,
Expand All @@ -442,7 +440,6 @@ private SFPair<InputStream, Boolean> createUploadStream(
} catch (IOException ex) {
logger.logError("Failed to open input stream", ex);
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.INTERNAL_ERROR,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
ex,
Expand Down Expand Up @@ -486,7 +483,6 @@ private static void handleS3Exception(
// does not return the ExpiredToken error code.
// If session is null we cannot renew the token so throw the exception
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.SYSTEM_ERROR,
ErrorCode.S3_OPERATION_ERROR.getMessageCode(),
ex1,
Expand All @@ -498,7 +494,6 @@ private static void handleS3Exception(
extendedRequestId);
} else {
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.SYSTEM_ERROR,
ErrorCode.AWS_CLIENT_ERROR.getMessageCode(),
ex,
Expand Down Expand Up @@ -546,7 +541,6 @@ private static void handleS3Exception(
|| getRootCause(ex) instanceof SocketTimeoutException) {
if (retryCount > s3Client.getMaxRetries()) {
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.SYSTEM_ERROR,
ErrorCode.IO_ERROR.getMessageCode(),
ex,
Expand All @@ -560,7 +554,6 @@ private static void handleS3Exception(
}
} else {
throw new SnowflakeSQLLoggedException(
null /* session */,
SqlState.SYSTEM_ERROR,
ErrorCode.IO_ERROR.getMessageCode(),
ex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ default String uploadWithPresignedUrlWithoutConnection(
String presignedUrl)
throws SnowflakeSQLException {
throw new SnowflakeSQLLoggedException(
null,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
SqlState.INTERNAL_ERROR,
/* session= */ "uploadWithPresignedUrlWithoutConnection"
Expand Down
Loading
Loading