From dc258e041ce472a76f7b9f8550ed271312f26648 Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Fri, 12 Sep 2025 15:47:23 +0530 Subject: [PATCH 01/12] SK-2292 retry on connection --- skyflow/vault/_client.py | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/skyflow/vault/_client.py b/skyflow/vault/_client.py index e426f59f..b9d858fa 100644 --- a/skyflow/vault/_client.py +++ b/skyflow/vault/_client.py @@ -45,7 +45,6 @@ def __init__(self, config: Configuration): def insert(self, records: dict, options: InsertOptions = InsertOptions()): interface = InterfaceName.INSERT.value log_info(InfoMessages.INSERT_TRIGGERED.value, interface=interface) - self._checkConfig(interface) jsonBody = getInsertRequestBody(records, options) @@ -57,16 +56,35 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()): "sky-metadata": json.dumps(getMetrics()) } - response = requests.post(requestURL, data=jsonBody, headers=headers) - processedResponse = processResponse(response) - result, partial = convertResponse(records, processedResponse, options) - if partial: - log_error(SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, interface) - elif 'records' not in result: - log_error(SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, interface) - else: - log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface) - return result + # Use for-loop for retry logic, avoid code repetition + for attempt in range(2): + try: + # If jsonBody is a dict, use json=, else use data= + if isinstance(jsonBody, dict): + response = requests.post(requestURL, json=jsonBody, headers=headers) + else: + response = requests.post(requestURL, data=jsonBody, headers=headers) + processedResponse = processResponse(response) + result, partial = convertResponse(records, processedResponse, options) + if partial: + log_error(SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, interface) + raise SkyflowError(SkyflowErrorCodes.PARTIAL_SUCCESS, SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, result, interface=interface) + if 'records' not in result: + log_error(SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, interface) + raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, result, interface=interface) + log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface) + return result + except requests.exceptions.ConnectionError as err: + log_error(f'Connection error inserting record: {err}', interface) + if attempt == 0: + log_info("Retrying record...", interface) + continue + else: + raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, f"Connection error after retry: {err}", interface=interface) + except Exception as err: + log_error(f'Unexpected error in insert: {err}', interface) + raise + def detokenize(self, records: dict, options: DetokenizeOptions = DetokenizeOptions()): interface = InterfaceName.DETOKENIZE.value From adafc1cc1b618f4d26c868fb764b490a92147580 Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Fri, 12 Sep 2025 15:51:48 +0530 Subject: [PATCH 02/12] SK-2292 set default retry --- skyflow/vault/_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/skyflow/vault/_client.py b/skyflow/vault/_client.py index b9d858fa..58e28d05 100644 --- a/skyflow/vault/_client.py +++ b/skyflow/vault/_client.py @@ -55,9 +55,9 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()): "Authorization": "Bearer " + self.storedToken, "sky-metadata": json.dumps(getMetrics()) } - + max_retries = 1 # Use for-loop for retry logic, avoid code repetition - for attempt in range(2): + for attempt in range(max_retries): try: # If jsonBody is a dict, use json=, else use data= if isinstance(jsonBody, dict): From b3d791cafb37e6e1d0748dde5b389dd07d737178 Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Fri, 12 Sep 2025 18:48:11 +0530 Subject: [PATCH 03/12] SK-2293 max retry set to 3 --- skyflow/vault/_client.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/skyflow/vault/_client.py b/skyflow/vault/_client.py index 58e28d05..785c42eb 100644 --- a/skyflow/vault/_client.py +++ b/skyflow/vault/_client.py @@ -55,15 +55,12 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()): "Authorization": "Bearer " + self.storedToken, "sky-metadata": json.dumps(getMetrics()) } - max_retries = 1 + max_retries = 3 # Use for-loop for retry logic, avoid code repetition - for attempt in range(max_retries): + for attempt in range(max_retries+1): try: # If jsonBody is a dict, use json=, else use data= - if isinstance(jsonBody, dict): - response = requests.post(requestURL, json=jsonBody, headers=headers) - else: - response = requests.post(requestURL, data=jsonBody, headers=headers) + response = requests.post(requestURL, data=jsonBody, headers=headers) processedResponse = processResponse(response) result, partial = convertResponse(records, processedResponse, options) if partial: @@ -75,16 +72,10 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()): log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface) return result except requests.exceptions.ConnectionError as err: - log_error(f'Connection error inserting record: {err}', interface) - if attempt == 0: - log_info("Retrying record...", interface) + if attempt < max_retries: continue else: - raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, f"Connection error after retry: {err}", interface=interface) - except Exception as err: - log_error(f'Unexpected error in insert: {err}', interface) - raise - + raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, f"Error occurred: {err}", interface=interface) def detokenize(self, records: dict, options: DetokenizeOptions = DetokenizeOptions()): interface = InterfaceName.DETOKENIZE.value From bb6c87450d2c2a82f0a1ab305f01979ce0e616f2 Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Fri, 12 Sep 2025 14:30:46 +0000 Subject: [PATCH 04/12] [AUTOMATED] Public Release - 1.15.2 --- setup.py | 2 +- skyflow/version.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index c7756728..89ba87d0 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ if sys.version_info < (3, 7): raise RuntimeError("skyflow requires Python 3.7+") -current_version = '1.15.1' +current_version = '1.15.2' setup( name='skyflow', diff --git a/skyflow/version.py b/skyflow/version.py index 4d6b1a07..82208a4e 100644 --- a/skyflow/version.py +++ b/skyflow/version.py @@ -1 +1 @@ -SDK_VERSION = '1.15.1' \ No newline at end of file +SDK_VERSION = '1.15.2' \ No newline at end of file From 5700874e2eda09325f94dd3c561c2d0b9d7cad87 Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Fri, 12 Sep 2025 21:52:59 +0530 Subject: [PATCH 05/12] SK-2293 retry on every exception --- skyflow/vault/_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skyflow/vault/_client.py b/skyflow/vault/_client.py index 785c42eb..1c1236c2 100644 --- a/skyflow/vault/_client.py +++ b/skyflow/vault/_client.py @@ -71,7 +71,7 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()): raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, result, interface=interface) log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface) return result - except requests.exceptions.ConnectionError as err: + except Exception as err: if attempt < max_retries: continue else: From 3b47b5ca3d6d4da3e2819f690f4924e6933a8f55 Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Fri, 12 Sep 2025 16:31:59 +0000 Subject: [PATCH 06/12] [AUTOMATED] Public Release - 1.15.3 --- setup.py | 2 +- skyflow/version.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 89ba87d0..9351f06a 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ if sys.version_info < (3, 7): raise RuntimeError("skyflow requires Python 3.7+") -current_version = '1.15.2' +current_version = '1.15.3' setup( name='skyflow', diff --git a/skyflow/version.py b/skyflow/version.py index 82208a4e..30ae0eae 100644 --- a/skyflow/version.py +++ b/skyflow/version.py @@ -1 +1 @@ -SDK_VERSION = '1.15.2' \ No newline at end of file +SDK_VERSION = '1.15.3' \ No newline at end of file From 922f98c977145227dc50b4117243557cf994dc48 Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Fri, 12 Sep 2025 22:39:07 +0530 Subject: [PATCH 07/12] SK-2293 retry on every exception --- skyflow/vault/_client.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/skyflow/vault/_client.py b/skyflow/vault/_client.py index 1c1236c2..e21dcbab 100644 --- a/skyflow/vault/_client.py +++ b/skyflow/vault/_client.py @@ -75,7 +75,10 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()): if attempt < max_retries: continue else: - raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, f"Error occurred: {err}", interface=interface) + if isinstance(err, SkyflowError): + raise err + else: + raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, f"Error occurred: {err}", interface=interface) def detokenize(self, records: dict, options: DetokenizeOptions = DetokenizeOptions()): interface = InterfaceName.DETOKENIZE.value From 450db9d8e16bb6194b7e4a9a536e9e96ab607d1a Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Fri, 12 Sep 2025 17:15:17 +0000 Subject: [PATCH 08/12] [AUTOMATED] Public Release - 1.15.4 --- setup.py | 2 +- skyflow/version.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 9351f06a..323fa31d 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ if sys.version_info < (3, 7): raise RuntimeError("skyflow requires Python 3.7+") -current_version = '1.15.3' +current_version = '1.15.4' setup( name='skyflow', diff --git a/skyflow/version.py b/skyflow/version.py index 30ae0eae..5c7ae5de 100644 --- a/skyflow/version.py +++ b/skyflow/version.py @@ -1 +1 @@ -SDK_VERSION = '1.15.3' \ No newline at end of file +SDK_VERSION = '1.15.4' \ No newline at end of file From 373c6bb90c870c8fd1e14f54d2712614d9f1939e Mon Sep 17 00:00:00 2001 From: skyflow-shravan Date: Sun, 14 Sep 2025 18:59:50 +0530 Subject: [PATCH 09/12] SK-2285 insert with requests session --- skyflow/vault/_client.py | 52 +++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/skyflow/vault/_client.py b/skyflow/vault/_client.py index e21dcbab..5e1d7d81 100644 --- a/skyflow/vault/_client.py +++ b/skyflow/vault/_client.py @@ -5,6 +5,8 @@ import types import requests import asyncio +from requests.packages.urllib3.util.retry import Retry +from requests.adapters import HTTPAdapter from skyflow.vault._insert import getInsertRequestBody, processResponse, convertResponse from skyflow.vault._update import sendUpdateRequests, createUpdateResponseBody from skyflow.vault._config import Configuration, ConnectionConfig, DeleteOptions, DetokenizeOptions, GetOptions, InsertOptions, UpdateOptions, QueryOptions @@ -36,6 +38,16 @@ def __init__(self, config: Configuration): raise SkyflowError(SkyflowErrorCodes.INVALID_INPUT, SkyflowErrorMessages.TOKEN_PROVIDER_ERROR.value % ( str(type(config.tokenProvider))), interface=interface) + retry_strategy = Retry( + total=3, + backoff_factor=0.5, + status_forcelist=[500, 502, 503, 504], + ) + + self.session = requests.Session() + adapter = HTTPAdapter(pool_connections=1, pool_maxsize=20, pool_block=True, max_retries=retry_strategy) + self.session.mount("https://", adapter) + self.vaultID = config.vaultID self.vaultURL = config.vaultURL.rstrip('/') self.tokenProvider = config.tokenProvider @@ -55,30 +67,22 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()): "Authorization": "Bearer " + self.storedToken, "sky-metadata": json.dumps(getMetrics()) } - max_retries = 3 - # Use for-loop for retry logic, avoid code repetition - for attempt in range(max_retries+1): - try: - # If jsonBody is a dict, use json=, else use data= - response = requests.post(requestURL, data=jsonBody, headers=headers) - processedResponse = processResponse(response) - result, partial = convertResponse(records, processedResponse, options) - if partial: - log_error(SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, interface) - raise SkyflowError(SkyflowErrorCodes.PARTIAL_SUCCESS, SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, result, interface=interface) - if 'records' not in result: - log_error(SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, interface) - raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, result, interface=interface) - log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface) - return result - except Exception as err: - if attempt < max_retries: - continue - else: - if isinstance(err, SkyflowError): - raise err - else: - raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, f"Error occurred: {err}", interface=interface) + # response = requests.post(requestURL, data=jsonBody, headers=headers) + response = self.session.post( + requestURL, + data=jsonBody, + headers=headers, + ) + processedResponse = processResponse(response) + print(">>> processedResponse", processedResponse) + result, partial = convertResponse(records, processedResponse, options) + if partial: + log_error(SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, interface) + elif 'records' not in result: + log_error(SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, interface) + else: + log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface) + return result def detokenize(self, records: dict, options: DetokenizeOptions = DetokenizeOptions()): interface = InterfaceName.DETOKENIZE.value From dbd4af2c6bb9f5ae807d2d7e1fb2721ed7af8bc9 Mon Sep 17 00:00:00 2001 From: skyflow-shravan Date: Mon, 15 Sep 2025 14:14:17 +0530 Subject: [PATCH 10/12] SK-2294 requests session --- skyflow/vault/_client.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/skyflow/vault/_client.py b/skyflow/vault/_client.py index 5e1d7d81..b728b7d7 100644 --- a/skyflow/vault/_client.py +++ b/skyflow/vault/_client.py @@ -40,12 +40,14 @@ def __init__(self, config: Configuration): retry_strategy = Retry( total=3, + connect=5, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504], + raise_on_status=True, ) self.session = requests.Session() - adapter = HTTPAdapter(pool_connections=1, pool_maxsize=20, pool_block=True, max_retries=retry_strategy) + adapter = HTTPAdapter(pool_connections=1, pool_maxsize=20, pool_block=False, max_retries=retry_strategy) self.session.mount("https://", adapter) self.vaultID = config.vaultID @@ -65,16 +67,18 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()): self.storedToken, self.tokenProvider, interface) headers = { "Authorization": "Bearer " + self.storedToken, - "sky-metadata": json.dumps(getMetrics()) + "sky-metadata": json.dumps(getMetrics()), + # "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36' } # response = requests.post(requestURL, data=jsonBody, headers=headers) response = self.session.post( requestURL, data=jsonBody, headers=headers, + # timeout=(5, 300), ) processedResponse = processResponse(response) - print(">>> processedResponse", processedResponse) + print(">>> processedResponse local: ", processedResponse) result, partial = convertResponse(records, processedResponse, options) if partial: log_error(SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, interface) From 6be388bdbcfda510b24e1ed95f5dd64f8bfa213f Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Tue, 16 Sep 2025 13:02:53 +0530 Subject: [PATCH 11/12] SK-2293 check the response history --- skyflow/vault/_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/skyflow/vault/_client.py b/skyflow/vault/_client.py index b728b7d7..fb249b77 100644 --- a/skyflow/vault/_client.py +++ b/skyflow/vault/_client.py @@ -5,7 +5,7 @@ import types import requests import asyncio -from requests.packages.urllib3.util.retry import Retry +from urllib3.util.retry import Retry from requests.adapters import HTTPAdapter from skyflow.vault._insert import getInsertRequestBody, processResponse, convertResponse from skyflow.vault._update import sendUpdateRequests, createUpdateResponseBody @@ -77,6 +77,7 @@ def insert(self, records: dict, options: InsertOptions = InsertOptions()): headers=headers, # timeout=(5, 300), ) + print(">>> raw response: ", response.history) processedResponse = processResponse(response) print(">>> processedResponse local: ", processedResponse) result, partial = convertResponse(records, processedResponse, options) From 8a2b4e5bf0484453864025690956637076c74655 Mon Sep 17 00:00:00 2001 From: skyflow-bharti Date: Tue, 16 Sep 2025 07:35:17 +0000 Subject: [PATCH 12/12] [AUTOMATED] Private Release 1.15.4.dev0+ef26f0d --- setup.py | 2 +- skyflow/version.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 323fa31d..af42de74 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ if sys.version_info < (3, 7): raise RuntimeError("skyflow requires Python 3.7+") -current_version = '1.15.4' +current_version = '1.15.4.dev0+ef26f0d' setup( name='skyflow', diff --git a/skyflow/version.py b/skyflow/version.py index 9d9a8747..37d07012 100644 --- a/skyflow/version.py +++ b/skyflow/version.py @@ -1 +1 @@ -SDK_VERSION = '1.15.4' +SDK_VERSION = '1.15.4.dev0+ef26f0d'