-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcommon.py
More file actions
171 lines (137 loc) · 5.37 KB
/
common.py
File metadata and controls
171 lines (137 loc) · 5.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import subprocess
import requests
from typing import Any, Callable, Iterable, Optional, TypeVar
import os
import sys
import asyncio
from pprint import pprint
T = TypeVar("T")
def identity(x: T) -> T:
"return the input unchanged"
return x
def fzf_iterable(iterable: Iterable) -> str:
"""
Given an iterable, shells out to fzf with all the values and returns the chosen value.
"""
fzf_command = ["fzf"]
# Prepare the input for fzf
input_str = "\n".join(str(item) for item in iterable)
# Call fzf and get the chosen value
with subprocess.Popen(
fzf_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True
) as proc:
stdout, _ = proc.communicate(input_str)
return stdout.strip()
def fzf_choose(
inp: Iterable,
display_func: Callable = identity,
output_func: Callable = identity,
sortby_func: Optional[Callable] = None,
**kwargs,
) -> Any:
"""
Input: any iterable (which is indexable)
* interactive prompt (fzf) *
Output: a single value from that iterable
TODO: add way to hide numbers on the left, aka do this without indexing
"""
choices = []
if sortby_func:
inp = sorted(inp, key=sortby_func)
for idx, val in enumerate(inp):
choices.append(f"{idx:2} {display_func(val)}")
choice_idx = int(fzf_iterable(choices, **kwargs).strip().split(" ")[0])
return output_func(inp[choice_idx])
def get_url(url: str, execute_js: bool = False):
"""
get a webpage's html, optionally render javascript
thanks to https://pypi.org/project/requests-html/
"""
from requests_html import HTMLResponse, HTMLSession
html_session = HTMLSession()
req = html_session.get(url)
if execute_js:
req.html.render(timeout=20)
return req
def get_processes():
"""
Parse the output of `ps aux` into a list of dictionaries representing the parsed
process information from each row of the output. Keys are mapped to column names,
parsed from the first line of the process' output.
:rtype: list[dict]
:returns: List of dictionaries, each representing a parsed row from the command output
"""
output = subprocess.Popen(["ps", "aux"], stdout=subprocess.PIPE).stdout.readlines()
headers = [
h for h in " ".join(output[0].decode("utf-8").strip().split()).split() if h
]
raw_data = map(lambda s: s.strip().split(None, len(headers) - 1), output[1:])
return [dict(zip(headers, r)) for r in raw_data]
def fetch_url_with_retry(
url,
max_retries=5,
request_timeout=10,
retry_timeout=3,
) -> Optional[requests.Response]:
"""
Fetch the content of a URL using the requests library with retry.
Parameters:
url (str): The URL to fetch the content from.
max_retries (int, optional): The maximum number of retry attempts. Default is 5.
retry_timeout (int, optional): The timeout in seconds between retry attempts. Default is 10.
request_timeout (int, optional): The timeout for the request in seconds. Default is 30.
Returns:
str: The content of the URL if successfully fetched, or None if all retry attempts failed.
"""
for retry in range(max_retries + 1):
try:
response = requests.get(url, timeout=request_timeout)
response.raise_for_status() # Raise an exception for non-200 status codes
return response
except (requests.RequestException, requests.HTTPError, requests.Timeout) as e:
print(f"Attempt {retry + 1}/{max_retries + 1} failed. Error: {e}")
if retry < max_retries:
print(f"Retrying in {retry_timeout} seconds...")
time.sleep(retry_timeout)
return None # Return None if all retry attempts fail
def tryInsertIntoPocketBase(record: dict = {}, db_name: str = "") -> None:
try:
insertIntoPocketBase(record, db_name)
except Exception as e:
print(f"Failed to insert into pocketbase: {e}")
def insertIntoPocketBase(record: dict = {}, db_name: str = "") -> None:
"""
Insert a record into PocketBase, specifically the bandcamps collection
params:
record: dict - the record to insert
example:
{
"url": "https://kevin.bandcamp.com/album/album",
"full_cmd": "yt-dlp...",
"cwd": "/home/kevin/Downloads",
"hostname": "kevin-arch"
}
"""
from pocketbase import PocketBase
POCKETBASE_URL = os.getenv("POCKETBASE_URL")
POCKETBASE_USERNAME = os.getenv("POCKETBASE_ADMIN_USERNAME")
POCKETBASE_PASSWORD = os.getenv("POCKETBASE_ADMIN_PASSWORD")
if not all([POCKETBASE_URL, POCKETBASE_USERNAME, POCKETBASE_PASSWORD]):
print("Missing environment variables")
sys.exit(1)
if not db_name:
print("No database name provided")
sys.exit(1)
if not all([POCKETBASE_URL, POCKETBASE_USERNAME, POCKETBASE_PASSWORD]):
print("Missing environment variables")
sys.exit(1)
pb = PocketBase(POCKETBASE_URL)
async def inner(params=record):
await pb.admins.auth.with_password(
email=POCKETBASE_USERNAME, password=POCKETBASE_PASSWORD
)
collection = pb.collection("bandcamps")
created = await collection.create(params=params)
# print what was created
pprint(created)
asyncio.run(inner(record))