Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion BlocksScreen.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,10 @@ host: localhost
port: 7125

[screensaver]
timeout: 5000
timeout: 5000

[credentials]
username:

[udiskie]
mount_path: /media/
39 changes: 31 additions & 8 deletions BlocksScreen/helper_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@


import ctypes
import os
import enum
import hashlib
import logging
import os
import pathlib
import struct
import subprocess
import typing


try:
ctypes.cdll.LoadLibrary("libXext.so.6")
libxext = ctypes.CDLL("libXext.so.6")
Expand Down Expand Up @@ -348,11 +349,33 @@ def get_file_name(filename: typing.Optional[str]) -> str:
return parts[-1] if filename else ""


# def get_hash(data) -> hashlib._Hash:
# hash = hashlib.sha256()
# hash.update(data.encode())
# hash.digest()
# return hash
def is_process_running(process_name: str) -> bool:
"""Verify if `process_name` is running on the local machine

Args:
process_name (str): Process Name

Returns:
bool: True if running, False otherwise
"""
try:
subprocess.check_output(["pgrep", process_name])
return True
except subprocess.CalledProcessError:
return False


def sha256_checksum(filepath: str) -> str:
"""Calculates the `SHA256` of a given file

def digest_hash() -> None: ...
Args:
filepath (str): File location path

Returns:
str: SHA256 for a given file
"""
h = hashlib.sha256()
with open(filepath, "rb") as f:
while chunk := f.read(8192):
h.update(chunk)
return h.hexdigest()
84 changes: 84 additions & 0 deletions BlocksScreen/lib/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,23 @@
#
from __future__ import annotations

import configparser
import getpass
import logging
import os
import typing
from pathlib import Path
from typing import Optional

import events
from configfile import BlocksScreenConfig
from events import ReceivedFileData
from helper_methods import is_process_running
from lib.moonrakerComm import MoonWebSocket
from PyQt6 import QtCore, QtGui, QtWidgets

_logger = logging.getLogger(name="logs/BlocksScreen.log")


class Files(QtCore.QObject):
request_file_list = QtCore.pyqtSignal([], [str], name="api-get-files-list")
Expand Down Expand Up @@ -42,6 +51,8 @@ def __init__(
self.files: list = []
self.directories: list = []
self.files_metadata: dict = {}
self._current_username: Optional[str] = None
self._udiskie_mouting_path: Optional[str] = None
self.request_file_list.connect(slot=self.ws.api.get_file_list)
self.request_file_list[str].connect(slot=self.ws.api.get_file_list)
self.request_dir_info.connect(slot=self.ws.api.get_dir_information)
Expand All @@ -51,12 +62,35 @@ def __init__(
self.request_files_thumbnails.connect(slot=self.ws.api.get_gcode_thumbnail)
self.request_file_download.connect(slot=self.ws.api.download_file)
QtWidgets.QApplication.instance().installEventFilter(self) # type: ignore
self.parse_config(parent.config)
self.check_usb_symlink_local()

@property
def file_list(self):
"""Available files list"""
return self.files

def parse_config(self, config: BlocksScreenConfig) -> None:
"""Parses the credential configs to get the user username"""
try:
self.credentials_config = config.get_section("credentials", fallback=None)
if self.credentials_config:
self._current_username = self.credentials_config.get(
"username", parser=str, default=""
)
except configparser.NoSectionError as e:
_logger.info("Error: %s", e)
self._current_username = ""
try:
self.udiskie = config.get_section("udiskie", fallback=None)
if self.udiskie:
self._udiskie_mouting_path = self.udiskie.get(
"mount_path", parser=str, default=""
)
except configparser.NoSectionError as e:
_logger.info("Error: %s", e)
self._udiskie_mouting_path = ""

def handle_message_received(self, method: str, data, params: dict) -> None:
"""Handle file related messages received by moonraker"""
if "server.files.list" in method:
Expand Down Expand Up @@ -178,3 +212,53 @@ def event(self, a0: QtCore.QEvent) -> bool:
self.handle_message_received(a0.method, a0.data, a0.params)
return True
return super().event(a0)

def check_usb_symlink_local(self) -> None:
"""Check if the symlink from /media/<username> to
~/printer_data/gcodes/USB/<username> exists and create it if not"""
username = self._current_username
if self._current_username == "":
try:
username = getpass.getuser()
except Exception as e:
_logger.info("Error retrieving username: %s", e)

home = Path.home()
if self._udiskie_mouting_path == "":
mouting_dir = Path("/media/") / username
else:
mouting_dir = Path(self._udiskie_mouting_path + username)

dir_path = home / "printer_data/gcodes/USB/"
symlink_src = dir_path / username
if not dir_path.exists():
try:
dir_path.mkdir(parents=True, exist_ok=True)
except PermissionError:
_logger.info(
"Permission denied — adjust directory permissions or run with elevated privileges"
)
except Exception as e:
_logger.info("Error: %s", e.with_traceback)

if not mouting_dir.exists():
try:
mouting_dir.mkdir(parents=True, exist_ok=True)
except PermissionError:
_logger.info(
"Permission denied — adjust directory permissions or run with elevated privileges"
)
except Exception as e:
_logger.info("Error: %s ", e.with_traceback)

if not os.path.islink(symlink_src) and not is_process_running("udiskie"):
try:
os.symlink(mouting_dir, symlink_src, target_is_directory=True)
except PermissionError:
_logger.info(
"Permission denied — adjust directory permissions or run with elevated privileges"
)
except FileNotFoundError:
_logger.info("Directory not found")
except Exception as e:
_logger.info("Error: %s", e.with_traceback)
27 changes: 24 additions & 3 deletions BlocksScreen/lib/moonrakerComm.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ def __init__(self, parent: QtCore.QObject) -> None:
self.klippy_state_signal.connect(self.api.request_printer_info)
_logger.info("Websocket object initialized")

@property
def moonRest(self) -> MoonRest:
"""Returns the current moonrestAPI object"""
return self._moonRest

@QtCore.pyqtSlot(name="retry_wb_conn")
def retry_wb_conn(self):
"""Retry websocket connection"""
Expand Down Expand Up @@ -123,7 +128,7 @@ def connect(self) -> bool:
)
# TODO Handle if i cannot connect to moonraker, request server.info and see if i get a result
try:
_oneshot_token = self._moonRest.get_oneshot_token()
_oneshot_token = self.moonRest.get_oneshot_token()
if _oneshot_token is None:
raise OneShotTokenError("Unable to retrieve oneshot token")
except Exception as e:
Expand Down Expand Up @@ -362,6 +367,10 @@ def request_temperature_cached_data(self, include_monitors: bool = False):
params={"include_monitors": include_monitors},
)

def request_server_info(self):
"""Requested printer information"""
return self._ws.send_request(method="server.config")

@QtCore.pyqtSlot(name="query_printer_info")
def request_printer_info(self):
"""Requested printer information"""
Expand Down Expand Up @@ -446,6 +455,10 @@ def restart_service(self, service):
method="machine.services.restart", params={"service": service}
)

def system_info(self):
"""Returns a top level System Info object containing various attributes that report info"""
return self._ws.send_request(method="machine.system_info")

@QtCore.pyqtSlot(name="firmware_restart")
def firmware_restart(self):
"""Request Klipper firmware restart
Expand Down Expand Up @@ -560,12 +573,12 @@ def download_file(self, root: str, filename: str):
filename (str): file to download

Returns:
_type_: _description_
dict: The body of the response contains the contents of the requested file.
"""
if not isinstance(filename, str) or not isinstance(root, str):
return False

return self._ws._moonRest.get_request(f"/server/files/{root}/{filename}")
return self._ws.moonRest.get_request(f"/server/files/{root}/{filename}")

@QtCore.pyqtSlot(name="api-get-dir-info")
@QtCore.pyqtSlot(str, name="api-get-dir-info")
Expand Down Expand Up @@ -789,6 +802,14 @@ def rollback_update(self, name: str):
method="machine,update.rollback", params={"name": name}
)

def get_user(self):
"""Request current username"""
return self._ws.send_request(method="access.get_user")

def get_user_list(self):
"""Request users list"""
return self._ws.send_request(method="access.users.list")

def history_list(self, limit, start, since, before, order):
"""Request Job history list"""
raise NotImplementedError
Expand Down
Loading
Loading