Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 188 additions & 123 deletions qsticky.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def __init__(self):
self.logger = self._setup_logger()
self.current_port: Optional[int] = None
self.session: Optional[aiohttp.ClientSession] = None
self.qbit_authenticated = False
self.base_url = f"{'https' if self.settings.qbittorrent_https else 'http'}://{self.settings.qbittorrent_host}:{self.settings.qbittorrent_port}"
self.gluetun_base_url = f"http://{self.settings.gluetun_host}:{self.settings.gluetun_port}"
self.start_time = datetime.now()
Expand Down Expand Up @@ -130,98 +131,186 @@ def _get_qbit_cookie_jar(self) -> Optional[aiohttp.CookieJar]:
async def get_current_qbit_port(self) -> Optional[int]:
self.logger.debug("Retrieving current qBittorrent port")
try:
async with self.session.get(f"{self.base_url}/api/v2/app/preferences", timeout=ClientTimeout(total=10)) as response:
if response.status == 200:
prefs = await response.json()
if prefs is None:
self.logger.error("Got None response from preferences API")
return None
port = prefs.get('listen_port')
self.logger.debug(f"Current qBittorrent port: {port}")
return port
else:
self.logger.error(f"Failed to get preferences: {response.status}")
status, content = await self._qbit_request(
"GET",
"/api/v2/app/preferences",
timeout=ClientTimeout(total=10)
)
if status == 200 and content is not None:
prefs = json.loads(content)
if prefs is None:
self.logger.error("Got None response from preferences API")
return None
port = prefs.get('listen_port')
self.logger.debug(f"Current qBittorrent port: {port}")
return port

self.logger.error(f"Failed to get preferences: {status}")
return None
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse preferences response: {str(e)}")
return None
except Exception as e:
self.logger.error(f"Error getting current port: {str(e)}")
return None

async def _init_session(self) -> None:
if self.session is None:
self.logger.debug("Initializing new aiohttp session")
timeout = ClientTimeout(
total=30,
connect=10,
sock_connect=10,
sock_read=10
)

# https://github.com/monstermuffin/qSticky/issues/53
ssl_context = None
if self.settings.qbittorrent_https:
if not self.settings.qbittorrent_verify_ssl:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
self.logger.debug("SSL verification disabled (default)")
else:
self.logger.debug("SSL verification enabled")

connector = aiohttp.TCPConnector(ssl=ssl_context)
self.session = aiohttp.ClientSession(
timeout=timeout,
connector=connector,
cookie_jar=self._get_qbit_cookie_jar()
)
self.logger.debug("Session initialized with timeouts")
if self.session is not None and not self.session.closed:
return

self.logger.debug("Initializing new qBittorrent aiohttp session")
timeout = ClientTimeout(
total=30,
connect=10,
sock_connect=10,
sock_read=10
)

# https://github.com/monstermuffin/qSticky/issues/53
ssl_context = None
if self.settings.qbittorrent_https:
if not self.settings.qbittorrent_verify_ssl:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
self.logger.debug("SSL verification disabled (default)")
else:
self.logger.debug("SSL verification enabled")

connector = aiohttp.TCPConnector(ssl=ssl_context)
self.session = aiohttp.ClientSession(
timeout=timeout,
connector=connector,
cookie_jar=self._get_qbit_cookie_jar()
)
self.qbit_authenticated = False
self.logger.debug("qBittorrent session initialized with timeouts")

async def _reset_qbit_session(self) -> None:
if self.session is not None and not self.session.closed:
await self.session.close()
self.logger.debug("Closed qBittorrent aiohttp session")

self.session = None
self.qbit_authenticated = False

async def _ensure_qbit_login(self) -> bool:
await self._init_session()

if self.qbit_authenticated:
return True

return await self._login()

async def _login(self) -> bool:
try:
await self._init_session()
async with self.session.post(
f"{self.base_url}/api/v2/auth/login",
data={
"username": self.settings.qbittorrent_user,
"password": self.settings.qbittorrent_pass
}
) as response:
if response.status == 200:
content = (await response.text()).strip()
if response.status == 200 and content == "Ok.":
if self.first_run or self.last_login_failed:
self.logger.info("Successfully logged in to qBittorrent")
self.last_login_failed = False
self.qbit_authenticated = True
self.health_status.healthy = True
self.health_status.last_error = None
return True
else:
self.logger.error(f"Login failed with status {response.status}")
self.health_status.healthy = False
self.health_status.last_error = f"Login failed: {response.status}"
self.last_login_failed = True
return False

self.logger.error(
f"Login failed with status {response.status}: {content or 'empty response'}"
)
self.health_status.healthy = False
self.health_status.last_error = (
f"Login failed: {response.status} {content}".strip()
)
self.last_login_failed = True
self.qbit_authenticated = False
return False
except Exception as e:
self.logger.error(f"Login error: {str(e)}")
self.health_status.healthy = False
self.health_status.last_error = f"Login error: {str(e)}"
self.last_login_failed = True
self.qbit_authenticated = False
return False

async def _qbit_request(
self,
method: str,
path: str,
*,
retry: bool = True,
**kwargs: Any
) -> tuple[Optional[int], Optional[str]]:
if not await self._ensure_qbit_login():
return None, None

try:
async with self.session.request(
method,
f"{self.base_url}{path}",
**kwargs
) as response:
content = await response.text()

if response.status in (401, 403) and retry:
self.logger.warning(
f"qBittorrent request to {path} returned {response.status}, recreating session"
)
await self._reset_qbit_session()
return await self._qbit_request(
method,
path,
retry=False,
**kwargs
)

return response.status, content
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if retry:
self.logger.warning(
f"qBittorrent request to {path} failed: {str(e)}, recreating session"
)
await self._reset_qbit_session()
return await self._qbit_request(
method,
path,
retry=False,
**kwargs
)

self.logger.error(f"qBittorrent request to {path} failed: {str(e)}")
return None, None

async def _update_port(self, new_port: int) -> bool:
if not isinstance(new_port, int) or new_port < 1024 or new_port > 65535:
self.logger.error(f"Invalid port value: {new_port}")
return False

try:
async with self.session.post(
f"{self.base_url}/api/v2/app/setPreferences",
status, _ = await self._qbit_request(
"POST",
"/api/v2/app/setPreferences",
data={'json': f'{{"listen_port":{new_port}}}'}
) as response:
if response.status == 200:
verified_port = await self.get_current_qbit_port()
if verified_port == new_port:
self.current_port = new_port
self.health_status.last_port_change = datetime.now()
return True
else:
self.logger.error(f"Port verification failed: expected {new_port}, got {verified_port}")
return False
else:
self.logger.error(f"Failed to update port: {response.status}")
return False
)
if status == 200:
verified_port = await self.get_current_qbit_port()
if verified_port == new_port:
self.current_port = new_port
self.health_status.last_port_change = datetime.now()
return True

self.logger.error(f"Port verification failed: expected {new_port}, got {verified_port}")
return False

self.logger.error(f"Failed to update port: {status}")
return False
except Exception as e:
self.logger.error(f"Port update error: {str(e)}")
return False
Expand Down Expand Up @@ -305,67 +394,46 @@ async def _get_forwarded_port(self) -> Optional[int]:
return None

async def handle_port_change(self) -> None:
# https://github.com/monstermuffin/qSticky/issues/53
ssl_context = None
if self.settings.qbittorrent_https:
if not self.settings.qbittorrent_verify_ssl:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

connector = aiohttp.TCPConnector(ssl=ssl_context)
async with aiohttp.ClientSession(
timeout=ClientTimeout(total=30),
connector=connector,
cookie_jar=self._get_qbit_cookie_jar()
) as session:
self.session = session
try:
new_port = await self._get_forwarded_port()
if not new_port:
self.health_status.healthy = False
return

if not await self._login():
self.health_status.healthy = False
return

current_qbit_port = await self.get_current_qbit_port()
if current_qbit_port is None:
self.health_status.healthy = False
return

self.current_port = new_port
self.health_status.healthy = True

if current_qbit_port != new_port:
self.logger.info(f"Port change needed: {current_qbit_port} -> {new_port}")
if await self._update_port(new_port):
self.health_status.last_port_change = datetime.now()
verified_port = await self.get_current_qbit_port()
if verified_port == new_port:
self.logger.info(f"Successfully updated port to {new_port}")
self.current_port = new_port
else:
self.logger.error(f"Port change verification failed - expected {new_port}, got {verified_port}")
self.health_status.healthy = False
self.health_status.last_error = "Port change verification failed"
else:
if self.first_run:
self.logger.info(f"Initial port check: {new_port} already set correctly")
try:
new_port = await self._get_forwarded_port()
if not new_port:
self.health_status.healthy = False
return

current_qbit_port = await self.get_current_qbit_port()
if current_qbit_port is None:
self.health_status.healthy = False
return

self.current_port = new_port
self.health_status.healthy = True

if current_qbit_port != new_port:
self.logger.info(f"Port change needed: {current_qbit_port} -> {new_port}")
if await self._update_port(new_port):
self.health_status.last_port_change = datetime.now()
verified_port = await self.get_current_qbit_port()
if verified_port == new_port:
self.logger.info(f"Successfully updated port to {new_port}")
self.current_port = new_port
else:
self.logger.debug(f"Port {new_port} already set correctly")
self.current_port = current_qbit_port
self.logger.error(f"Port change verification failed - expected {new_port}, got {verified_port}")
self.health_status.healthy = False
self.health_status.last_error = "Port change verification failed"
else:
if self.first_run:
self.logger.info(f"Initial port check: {new_port} already set correctly")
else:
self.logger.debug(f"Port {new_port} already set correctly")
self.current_port = current_qbit_port

await self.update_health_file()
self.first_run = False
await self.update_health_file()
self.first_run = False

except Exception as e:
self.health_status.healthy = False
self.health_status.last_error = str(e)
await self.update_health_file()
finally:
self.session = None
except Exception as e:
self.health_status.healthy = False
self.health_status.last_error = str(e)
await self.update_health_file()

async def get_health(self) -> Dict[str, Any]:
now = datetime.now()
Expand Down Expand Up @@ -459,9 +527,7 @@ async def watch_port(self) -> None:
await asyncio.sleep(5)

async def cleanup(self) -> None:
if self.session:
await self.session.close()
self.logger.debug("Closed aiohttp session")
await self._reset_qbit_session()
try:
if os.path.exists(self.health_file):
os.remove(self.health_file)
Expand All @@ -479,8 +545,7 @@ def setup_signal_handlers(self):
async def shutdown(self):
self.logger.info("Starting graceful shutdown...")
self.shutdown_event.set()
if self.session:
await self.session.close()
await self._reset_qbit_session()
self.logger.info("Shutdown complete")

async def main() -> None:
Expand Down