diff --git a/clipnotify.c b/clipnotify.c new file mode 100644 index 0000000..c5de7ac --- /dev/null +++ b/clipnotify.c @@ -0,0 +1,96 @@ +#include +#include +#include +#include +#include +#include +#include + +static enum selections { + NONE = 0, + SELECTION_CLIPBOARD = (1 << 0), + SELECTION_PRIMARY = (1 << 1), + SELECTION_SECONDARY = (1 << 2) +} selections = NONE; + +static int loop; + +int main(int argc, char *argv[]) { + static const char *usage = + "%s: Notify by exiting on clipboard events.\n\n" + " -l Instead of exiting, print a newline when a new selection is available.\n" + " -s The selection to use. Available selections:\n" + " clipboard, primary, secondary\n" + " The default is to monitor clipboard and primary.\n"; + Display *disp; + Window root; + Atom clip; + XEvent evt; + int opt; + + while ((opt = getopt(argc, argv, "hs:l")) != -1) { + switch (opt) { + case 'h': + printf(usage, argv[0]); + return EXIT_SUCCESS; + case 'l': + loop = 1; + break; + case 's': { + char *token = strtok(optarg, ","); + while (token != NULL) { + if (strcmp(token, "clipboard") == 0) { + selections |= SELECTION_CLIPBOARD; + } else if (strcmp(token, "primary") == 0) { + selections |= SELECTION_PRIMARY; + } else if (strcmp(token, "secondary") == 0) { + selections |= SELECTION_SECONDARY; + } else { + fprintf(stderr, "Unknown selection '%s'\n", token); + return EXIT_FAILURE; + } + token = strtok(NULL, ","); + } + break; + } + default: + fprintf(stderr, usage, argv[0]); + return EXIT_FAILURE; + } + } + + disp = XOpenDisplay(NULL); + if (!disp) { + fprintf(stderr, "Can't open X display\n"); + return EXIT_FAILURE; + } + + root = DefaultRootWindow(disp); + + clip = XInternAtom(disp, "CLIPBOARD", False); + + /* <= 1.0.2 backwards compatibility */ + if (!selections) + selections = SELECTION_CLIPBOARD | SELECTION_PRIMARY; + + if (selections & SELECTION_CLIPBOARD) + XFixesSelectSelectionInput(disp, root, clip, + XFixesSetSelectionOwnerNotifyMask); + if (selections & SELECTION_PRIMARY) + XFixesSelectSelectionInput(disp, root, XA_PRIMARY, + XFixesSetSelectionOwnerNotifyMask); + if (selections & SELECTION_SECONDARY) + XFixesSelectSelectionInput(disp, root, XA_SECONDARY, + XFixesSetSelectionOwnerNotifyMask); + + if (loop) { + (void)setvbuf(stdout, NULL, _IONBF, 0); + do { + XNextEvent(disp, &evt); + printf("\n"); + } while (1); + } else { + XNextEvent(disp, &evt); + } + XCloseDisplay(disp); +} diff --git a/clipsync.py b/clipsync.py new file mode 100644 index 0000000..985c31c --- /dev/null +++ b/clipsync.py @@ -0,0 +1,423 @@ +#!/usr/bin/env python3 +"""Mirror X selections (e.g. inside Xvfb) into the Wayland clipboard.""" + +from __future__ import annotations + +import argparse +import hashlib +import os +import queue +import shutil +import signal +import subprocess +import sys +import threading +import time +from dataclasses import dataclass +from typing import Dict, Iterable, List, Optional, Sequence, Callable + +DESKTOP = os.environ.get("XDG_CURRENT_DESKTOP", "") +DEFAULT_INTERVAL = float(os.environ.get("CLIPSYNC_INTERVAL", "0.3")) +DEFAULT_SELECTIONS: Sequence[str] = ("clipboard", "primary") +DEFAULT_TEXT_MIME = "text/plain;charset=utf-8" +DEFAULT_BINARY_MIME = "application/octet-stream" +IGNORED_TARGETS = {"targets", "timestamp", "multiple", "save_targets"} +IMAGE_TARGET_PREFS = [ + "image/png", + "image/jpeg", + "image/webp", + "image/bmp", + "image/tiff", + "image/gif", +] +FILE_TARGET_PREFS = [ + "text/uri-list", +] +if DESKTOP == "GNOME": + FILE_TARGET_PREFS.insert(0, "x-special/gnome-copied-files") +elif DESKTOP == "KDE": + FILE_TARGET_PREFS.insert(0, "application/x-kde-cutselection") +TEXT_TARGET_PREFS = [ + "text/plain;charset=utf-8", + "text/plain", + "text/plain; charset=utf-8", + "text/plain;charset=UTF-8", + "utf8_string", + "string", + "text", + "text/html", + "text/richtext", +] +TYPE_ALIASES = { + "utf8_string": DEFAULT_TEXT_MIME, + "text": DEFAULT_TEXT_MIME, + "string": "text/plain", + "text/plain; charset=utf-8": DEFAULT_TEXT_MIME, + "text/plain;charset=utf-8": DEFAULT_TEXT_MIME, + "text/plain;charset=utf-16": "text/plain", + "text/plain;charset=utf16": "text/plain", +} + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Continuously mirror the X clipboard and primary selection into the Wayland clipboard via wl-copy.", + ) + parser.add_argument( + "--interval", + type=float, + default=DEFAULT_INTERVAL, + help="Maximum wait (seconds) before checking for shutdown. Defaults to %(default)s.", + ) + parser.add_argument( + "--display", + default=os.environ.get("DISPLAY"), + help="X display/socket to talk to. Defaults to $DISPLAY.", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Emit verbose debug logs.", + ) + parser.add_argument( + "command", + nargs=argparse.REMAINDER, + help="Command to exec after the sync loop starts. Leave empty to keep syncing until interrupted.", + ) + return parser + + +@dataclass +class ClipboardSelection: + x_target: str + wl_type: str + category: str + + +def require_binary(name: str) -> None: + if shutil.which(name) is None: + print(f"error: {name} not found in PATH", file=sys.stderr) + sys.exit(1) + + +def log(message: str, *, debug: bool = False, enabled: bool = False) -> None: + if debug and not enabled: + return + prefix = "[debug] " if debug else "" + print(prefix + message, file=sys.stderr) + + +def lower_map(targets: Iterable[str]) -> Dict[str, str]: + return {t.lower(): t for t in targets} + + +def pick_preferred( + targets: Sequence[str], + preferences: Sequence[str], + *, + extra_predicate: Optional[Callable] = None, +) -> Optional[str]: + mapping = lower_map(targets) + for pref in preferences: + key = pref.lower() + if key in mapping: + return mapping[key] + if extra_predicate: + for original in targets: + if extra_predicate(original.lower()): + return original + return None + + +def classify_targets(targets: Sequence[str]) -> Optional[ClipboardSelection]: + cleaned = [t for t in targets if t and t.lower() not in IGNORED_TARGETS] + if not cleaned: + return None + + def alias_for(target: str, *, default: str) -> str: + return TYPE_ALIASES.get( + target.lower(), target if target.startswith("text/") else default + ) + + image_target = pick_preferred( + cleaned, + IMAGE_TARGET_PREFS, + extra_predicate=lambda t: t.startswith("image/"), + ) + if image_target: + return ClipboardSelection(image_target, image_target, "image") + + file_target = pick_preferred( + cleaned, + FILE_TARGET_PREFS, + ) + if file_target: + wl_type = TYPE_ALIASES.get(file_target.lower(), file_target) + return ClipboardSelection(file_target, wl_type, "files") + + text_target = pick_preferred( + cleaned, + TEXT_TARGET_PREFS, + extra_predicate=lambda t: t.startswith("text/") + or t in {"utf8_string", "string"}, + ) + if text_target: + wl_type = alias_for(text_target, default=DEFAULT_TEXT_MIME) + return ClipboardSelection(text_target, wl_type, "text") + + fallback = cleaned[0] + wl_type = TYPE_ALIASES.get(fallback.lower(), DEFAULT_BINARY_MIME) + return ClipboardSelection(fallback, wl_type, "binary") + + +def run_xclip( + selection: str, + args: Sequence[str], + *, + env: Dict[str, str], + text: bool = False, +) -> subprocess.CompletedProcess: + base = ["xclip", "-selection", selection] + return subprocess.run( + base + list(args), + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + text=text, + timeout=5, + ) + + +def current_targets( + selection: str, env: Dict[str, str], *, debug_enabled: bool +) -> Sequence[str]: + try: + proc = run_xclip(selection, ["-out", "-target", "TARGETS"], env=env, text=True) + except subprocess.CalledProcessError as exc: + log( + f"failed to query TARGETS for {selection}: {exc.stderr.strip() or exc}", + debug=True, + enabled=debug_enabled, + ) + return [] + return [line.strip() for line in proc.stdout.splitlines() if line.strip()] + + +def read_target( + selection: str, + target: str, + env: Dict[str, str], + *, + debug_enabled: bool, +) -> Optional[bytes]: + try: + proc = run_xclip(selection, ["-out", "-target", target], env=env, text=False) + return proc.stdout + except subprocess.CalledProcessError as exc: + err = exc.stderr.decode().strip() if exc.stderr else str(exc) + log( + f"failed to read target {target} for {selection}: {err}", + debug=True, + enabled=debug_enabled, + ) + return None + + +def copy_to_wayland(data: bytes, wl_type: str) -> bool: + try: + subprocess.run( + ["wl-copy", "--type", wl_type], + input=data, + check=True, + ) + return True + except subprocess.CalledProcessError as exc: + log(f"wl-copy failed: {exc}", enabled=True) + return False + + +def fingerprint(target: str, data: bytes) -> str: + return hashlib.sha256(target.encode("utf-8") + b"\0" + data).hexdigest() + + +def handle_selection( + selection: str, + env: Dict[str, str], + last_marks: Dict[str, Optional[str]], + last_wayland_mark: Optional[str], + debug: bool, +) -> Optional[str]: + targets = current_targets(selection, env, debug_enabled=debug) + info = classify_targets(targets) + if not info: + if last_marks.get(selection) is not None: + log(f"{selection} selection empty; waiting", debug=True, enabled=debug) + last_marks[selection] = None + return last_wayland_mark + + payload = read_target(selection, info.x_target, env, debug_enabled=debug) + if payload is None: + return last_wayland_mark + + mark = fingerprint(info.wl_type, payload) + if mark == last_marks.get(selection) or mark == last_wayland_mark: + last_marks[selection] = mark + return last_wayland_mark + + if copy_to_wayland(payload, info.wl_type): + last_marks[selection] = mark + log( + f"Wayland clipboard updated from {selection} ({info.category}) via '{info.wl_type}' ({len(payload)} bytes)", + enabled=True, + ) + return mark + + return last_wayland_mark + + +def clipnotify_worker( + selection: str, + env: Dict[str, str], + event_queue: "queue.Queue[str]", + stop_event: threading.Event, + debug: bool, +) -> None: + event_queue.put(selection) + args = ["clipnotify", "-s", selection, "-l"] + while not stop_event.is_set(): + try: + proc = subprocess.Popen( + args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + env=env, + ) + except FileNotFoundError: + log("clipnotify not found in PATH", enabled=True) + stop_event.set() + return + + try: + while not stop_event.is_set(): + line = proc.stdout.readline() + if not line: + if proc.poll() is not None: + err = proc.stderr.read().strip() if proc.stderr else "" + log( + f"clipnotify for {selection} exited unexpectedly: {err or 'no details'}", + enabled=True, + ) + stop_event.set() + return + continue + event_queue.put(selection) + finally: + if proc.poll() is None: + proc.terminate() + try: + proc.wait(timeout=0.5) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=0.5) + time.sleep(0.1) + + +def mirror_clipboards( + env: Dict[str, str], + interval: float, + debug: bool, + stop_event: threading.Event, +) -> None: + selections = list(DEFAULT_SELECTIONS) + event_queue: "queue.Queue[str]" = queue.Queue() + watchers: List[threading.Thread] = [] + + for sel in selections: + thread = threading.Thread( + target=clipnotify_worker, + args=(sel, env, event_queue, stop_event, debug), + daemon=True, + ) + thread.start() + watchers.append(thread) + + last_marks: Dict[str, Optional[str]] = {sel: None for sel in selections} + last_wayland_mark: Optional[str] = None + + while not stop_event.is_set(): + try: + selection = event_queue.get(timeout=interval) + except queue.Empty: + continue + if selection not in last_marks: + continue + last_wayland_mark = handle_selection( + selection, env, last_marks, last_wayland_mark, debug + ) + + for thread in watchers: + thread.join(timeout=interval) + + +def run_wrapped_command(command: Sequence[str]) -> int: + if not command: + return 0 + proc = subprocess.Popen(command) + try: + return proc.wait() + except KeyboardInterrupt: + proc.send_signal(signal.SIGINT) + return proc.wait() + + +def main() -> int: + parser = build_parser() + args = parser.parse_args() + + require_binary("xclip") + require_binary("wl-copy") + require_binary("clipnotify") + + if not args.display: + print( + "error: --display was not provided and $DISPLAY is unset", file=sys.stderr + ) + return 1 + + env = dict(os.environ) + env["DISPLAY"] = args.display + + command = list(args.command) + if command and command[0] == "--": + command = command[1:] + + stop_event = threading.Event() + worker = threading.Thread( + target=mirror_clipboards, + args=(env, args.interval, args.debug, stop_event), + daemon=True, + ) + worker.start() + + exit_code = 0 + try: + if command: + exit_code = run_wrapped_command(command) + else: + while not stop_event.wait(1): + continue + except KeyboardInterrupt: + log("Interrupted, exiting...", enabled=True) + exit_code = 130 + finally: + stop_event.set() + worker.join(timeout=args.interval * 2) + + return exit_code + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/com.qq.QQ.yaml b/com.qq.QQ.yaml index cde5363..8f442f1 100644 --- a/com.qq.QQ.yaml +++ b/com.qq.QQ.yaml @@ -92,11 +92,63 @@ modules: # https://github.com/flathub/org.flatpak.Builder.BaseApp/blob/branch/25.08/xvfb.json - xvfb.json + # https://github.com/flathub/io.neovim.nvim/blob/a10af66097bbfff4a98286d215f183d4909aa1e6/io.neovim.nvim.yaml#L186 + - name: xclip + sources: + - type: archive + url: https://github.com/astrand/xclip/archive/0.13.tar.gz + sha256: ca5b8804e3c910a66423a882d79bf3c9450b875ac8528791fb60ec9de667f758 + cleanup: + - /share/man + - /bin/xclip-*file + modules: + - name: libXmu + config-opts: + - --disable-static + - --disable-docs + sources: + - type: archive + url: https://www.x.org/releases/individual/lib/libXmu-1.1.3.tar.bz2 + sha256: 9c343225e7c3dc0904f2122b562278da5fed639b1b5e880d25111561bac5b731 + cleanup: + - /include + - /lib/pkgconfig + - "*.la" + + # https://github.com/flathub/io.neovim.nvim/blob/a10af66097bbfff4a98286d215f183d4909aa1e6/io.neovim.nvim.yaml#L169 + - name: wl-clipboard + buildsystem: meson + config-opts: + - -Dzshcompletiondir=no + - -Dfishcompletiondir=no + sources: + - type: archive + url: https://github.com/bugaevc/wl-clipboard/archive/v2.2.1.tar.gz + sha256: 6eb8081207fb5581d1d82c4bcd9587205a31a3d47bea3ebeb7f41aa1143783eb + x-checker-data: + type: anitya + project-id: 49082 + url-template: https://github.com/bugaevc/wl-clipboard/archive/v$version.tar.gz + + cleanup: + - /share/man + + # https://github.com/cdown/clipnotify + - name: clipnotify + buildsystem: simple + build-commands: + - gcc -o clipnotify clipnotify.c -O2 -lX11 -lXfixes + - install -Dm755 clipnotify $FLATPAK_DEST/bin/clipnotify + sources: + - type: file + path: clipnotify.c + - name: qq buildsystem: simple build-commands: - install apply_extra $FLATPAK_DEST/bin - install -Dm755 qq.sh $FLATPAK_DEST/bin/qq + - install -Dm755 clipsync.py $FLATPAK_DEST/bin/clipsync.py - install -Dm644 com.qq.QQ.metainfo.xml -t $FLATPAK_DEST/share/metainfo - install -Dm644 com.qq.QQ.png -t $FLATPAK_DEST/share/icons/hicolor/512x512/apps - install -Dm644 com.qq.QQ.desktop -t $FLATPAK_DEST/share/applications @@ -119,7 +171,7 @@ modules: - | if [ -z "$(ls -A /tmp/.X11-unix 2>/dev/null)" ] && [ -n "$WAYLAND_DISPLAY" ]; then echo "X11 socket is not available, using Wayland + Xvfb..." - exec xvfb-run zypak-wrapper /app/extra/QQ/qq "${FLAGS[@]}" "$@" + exec xvfb-run clipsync.py zypak-wrapper /app/extra/QQ/qq "${FLAGS[@]}" "$@" else exec zypak-wrapper /app/extra/QQ/qq "${FLAGS[@]}" "$@" fi @@ -140,6 +192,9 @@ modules: - type: file path: com.qq.QQ.desktop + + - type: file + path: clipsync.py - type: extra-data filename: qq.deb