Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 51 additions & 39 deletions immudb_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@
from pathlib import Path
from time import sleep
from traceback import format_exc
from typing import IO, Any, Dict, List, Optional, Union
from typing import (
IO,
Any,
Dict,
Optional,
Union,
)
from urllib.parse import urlparse

from git import Repo
Expand All @@ -21,6 +27,13 @@
Dict = Dict[str, Any]


POSSIBLE_EXC_DETAILS = (
'Connection timed out',
'Socker closed',
'Connection reset by peer',
)


class ImmudbWrapper(ImmudbClient):
def __init__(
self,
Expand Down Expand Up @@ -78,41 +91,36 @@ def __init__(
)
self.login()

def retry(possible_exc_details: Optional[List[str]] = None):
if not possible_exc_details:
possible_exc_details = []

def wrapper(func):
@wraps(func)
def wrapped(self, *args, **kwargs):
max_retries = self.max_retries
last_exc = Exception()
while max_retries:
try:
return func(self, *args, **kwargs)
except _InactiveRpcError as exc:
exc_details = exc.details()
last_exc = exc
if exc_details and any(
detail in exc_details
for detail in possible_exc_details
):
max_retries -= 1
self.logger.error(
'Running the "%s" function again after %d'
' seconds',
func.__name__,
self.retry_timeout,
)
sleep(self.retry_timeout)
continue
raise
raise last_exc

return wrapped

return wrapper

def retry(func):
@wraps(func)
def wrapped(self, *args, **kwargs):
max_retries = self.max_retries
last_exc = Exception()
while max_retries:
try:
return func(self, *args, **kwargs)
except _InactiveRpcError as exc:
exc_details = exc.details()
last_exc = exc
if exc_details and any(
detail in exc_details
for detail in POSSIBLE_EXC_DETAILS
):
max_retries -= 1
self.logger.error(
'Running the "%s" function again after %d'
' seconds',
func.__name__,
self.retry_timeout,
)
sleep(self.retry_timeout)
continue
raise
raise last_exc

return wrapped

@retry
def login(self):
encoded_database = self.encode(self.database)
super().login(
Expand Down Expand Up @@ -195,7 +203,11 @@ def get_size_format(
return f'{value:.2f} Y{suffix}'

def get_directory_size(self, path: Union[str, os.PathLike]) -> int:
return sum(file.stat().st_size for file in Path(path).rglob('*') if file.exists())
return sum(
file.stat().st_size
for file in Path(path).rglob('*')
if file.exists()
)

def get_file_size(self, file_path: Union[str, os.PathLike]) -> int:
return Path(file_path).stat().st_size
Expand Down Expand Up @@ -347,7 +359,7 @@ def verified_set(
except RpcError:
return {'error': format_exc()}

@retry(possible_exc_details=['Connection timed out', 'Socket closed'])
@retry
def notarize(
self,
key: str,
Expand Down Expand Up @@ -420,7 +432,7 @@ def notarize_git_repo(
value=payload,
)

@retry(possible_exc_details=['Connection timed out', 'Socket closed'])
@retry
def authenticate(
self,
key: Union[str, bytes],
Expand Down
Loading