-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
717 lines (615 loc) · 28.4 KB
/
main.py
File metadata and controls
717 lines (615 loc) · 28.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
import os
import requests
import logging
from fastmcp import FastMCP
# --- Configuration ---
# Connection settings are read from the environment. A trailing slash on the
# base URL is dropped so that f"{VIKUNJA_URL}/api/v1/..." never yields "//api".
VIKUNJA_URL = (os.getenv("VIKUNJA_URL") or "").rstrip("/")
VIKUNJA_USERNAME = os.getenv("VIKUNJA_USERNAME")
VIKUNJA_PASSWORD = os.getenv("VIKUNJA_PASSWORD")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
DEBUG_TASK_MATCHES = os.getenv("DEBUG_TASK_MATCHES", "false").lower() in ("1", "true", "yes")

# --- MCP Application Setup ---
mcp = FastMCP()
session = requests.Session()

# Configure basic logging. Level can be controlled with LOG_LEVEL env var.
level = getattr(logging, LOG_LEVEL.upper(), logging.INFO)
if not isinstance(level, int):
    # LOG_LEVEL named some other attribute of the logging module (for example
    # BASIC_FORMAT); fall back instead of letting basicConfig() raise.
    level = logging.INFO
logging.basicConfig(level=level, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
if DEBUG_TASK_MATCHES:
    logger.info("DEBUG_TASK_MATCHES is enabled: per-task match attempts will be logged at INFO level")

# --- Input Validation ---
if not all([VIKUNJA_URL, VIKUNJA_USERNAME, VIKUNJA_PASSWORD]):
    # SystemExit with a string argument prints it to stderr and exits with
    # status 1, keeping stdout clean and not relying on the site-provided exit().
    raise SystemExit(
        "Error: Please set the VIKUNJA_URL, VIKUNJA_USERNAME, and VIKUNJA_PASSWORD environment variables."
    )
@mcp.tool()
def login():
    """
    Authenticates with the Vikunja API to get a session token.

    Posts VIKUNJA_USERNAME / VIKUNJA_PASSWORD to the login endpoint and, on
    success, stores the returned token as a Bearer Authorization header on the
    shared module-level session so later tool calls are authenticated.
    :return: A human-readable status string (success or failure reason).
    """
    # No `global` statement is needed: the session object is mutated, never rebound.
    try:
        response = session.post(
            f"{VIKUNJA_URL}/api/v1/login",
            json={"username": VIKUNJA_USERNAME, "password": VIKUNJA_PASSWORD},
        )
        response.raise_for_status()
        token = response.json().get("token")
    except requests.exceptions.RequestException as e:
        # Log like the other tools do so failed logins are visible server-side.
        logger.exception("login: request failed")
        return f"Login failed: {e}"

    if not token:
        return "Login failed: Token not found in response."

    session.headers.update({"Authorization": f"Bearer {token}"})
    return "Login successful. Token stored for session."
@mcp.tool()
def search_tasks(query: str):
    """
    Searches for tasks in Vikunja.

    All tasks are downloaded page by page and filtered client-side: a task
    matches when ANY whitespace-separated term of the query occurs
    (case-insensitively) in its title or description.
    :param query: The search string to use for finding tasks. An empty or
        blank query returns every task.
    :return: A list of task dicts, or an error/help string.
    """
    if "Authorization" not in session.headers:
        return "Please run the 'login' command first."

    # DEBUG_TASK_MATCHES is announced at startup as raising per-task match
    # traces to INFO; honour it here instead of always logging at DEBUG.
    trace = logger.info if DEBUG_TASK_MATCHES else logger.debug

    try:
        # Fetch all tasks and filter client-side by title/description.
        logger.info("search_tasks: fetching all tasks from %s", f"{VIKUNJA_URL}/api/v1/tasks/all")
        tasks = []
        page = 1
        per_page = 50
        while True:
            # `per_page` is the page-size parameter in Vikunja's API docs; the
            # previously sent `limit` key is not a documented parameter.
            # NOTE(review): confirm against the API docs of the server version in use.
            response = session.get(
                f"{VIKUNJA_URL}/api/v1/tasks/all",
                params={"page": page, "per_page": per_page},
            )
            response.raise_for_status()
            data = response.json()
            # A JSON `null` body is treated as an empty page rather than crashing.
            batch = data if isinstance(data, list) else (data or {}).get("tasks", [])
            if not batch:
                break
            tasks.extend(batch)
            page += 1
    except requests.exceptions.RequestException as e:
        logger.exception("search_tasks: request failed")
        return f"Error searching tasks: {e}"

    q = (query or "").strip()
    logger.info("search_tasks: raw query='%s'", q)
    if not q:
        logger.info("search_tasks: empty query, returning %d tasks", len(tasks))
        return tasks

    terms = [t.lower() for t in q.split() if t]
    logger.info("search_tasks: parsed terms=%s", terms)

    def matches(task: dict) -> bool:
        """Return True as soon as one search term is found in the task."""
        title = (task.get("title") or "")
        description = (task.get("description") or "")
        title_l = title.lower()
        desc_l = description.lower()
        for term in terms:
            if term in title_l or term in desc_l:
                trace("search_tasks: task id=%s matched term='%s' (title=%r, description=%r)", task.get("id"), term, title, description)
                return True
            trace("search_tasks: task id=%s did not match term='%s' (title=%r)", task.get("id"), term, title)
        return False

    filtered = [t for t in tasks if matches(t)]
    logger.info("search_tasks: fetched=%d filtered=%d", len(tasks), len(filtered))
    return filtered
@mcp.tool()
def active_tasks(project_id: int = None, is_favorite: bool = None):
    """
    Returns a list of active tasks (where 'done' is False) from Vikunja.

    All tasks are downloaded page by page and filtered client-side.
    :param project_id: Optional project ID to filter tasks by project.
    :param is_favorite: Optional filter to return only tasks matching favorite status.
    :return: A list of task dicts, or an error/help string.
    """
    if "Authorization" not in session.headers:
        return "Please run the 'login' command first."

    try:
        logger.info("active_tasks: fetching all tasks from %s", f"{VIKUNJA_URL}/api/v1/tasks/all")
        all_tasks = []
        page = 1
        per_page = 50
        while True:
            # `per_page` is the page-size parameter in Vikunja's API docs; the
            # previously sent `limit` key is not a documented parameter.
            # NOTE(review): confirm against the API docs of the server version in use.
            response = session.get(
                f"{VIKUNJA_URL}/api/v1/tasks/all",
                params={"page": page, "per_page": per_page},
            )
            response.raise_for_status()
            data = response.json()
            # A JSON `null` body is treated as an empty page rather than crashing.
            tasks = data if isinstance(data, list) else (data or {}).get("tasks", [])
            if not tasks:
                break
            all_tasks.extend(tasks)
            page += 1
    except requests.exceptions.RequestException as e:
        logger.exception("active_tasks: request failed")
        return f"Error retrieving active tasks: {e}"

    # Filter active tasks (not done)
    active = [t for t in all_tasks if not t.get("done", False)]

    # If project_id provided, further filter by task's project_id
    # (both snake_case and camelCase keys are accepted).
    if project_id is not None:
        before = len(active)
        active = [t for t in active if t.get("project_id") == project_id or t.get("projectID") == project_id]
        logger.info("active_tasks: filtered by project_id=%s before=%d after=%d", project_id, before, len(active))

    # If is_favorite provided, filter by is_favorite / is_favourite field
    if is_favorite is not None:
        before = len(active)
        wanted = bool(is_favorite)

        def fav_val(task):
            """Read the favorite flag under either spelling, defaulting to False."""
            return bool(task.get("is_favorite", task.get("is_favourite", False)))

        active = [t for t in active if fav_val(t) == wanted]
        logger.info("active_tasks: filtered by is_favorite=%s before=%d after=%d", is_favorite, before, len(active))

    logger.info("active_tasks: fetched=%d active=%d", len(all_tasks), len(active))
    return active
@mcp.tool()
def get_task_details(task_id: int):
    """
    Fetches one task from Vikunja and renders it as a multi-line text summary.
    :param task_id: The ID of the task to look up.
    """
    if "Authorization" not in session.headers:
        return "Please run the 'login' command first."

    # Helpers to produce compact readable strings for nested structures
    def join_names(items):
        """Comma-join entries, preferring name, then username, then id."""
        if not items:
            return "[]"
        return ", ".join(
            str(it.get("name") or it.get("username") or it.get("id") or it)
            if isinstance(it, dict)
            else str(it)
            for it in items
        )

    def labels_list(labels):
        """Comma-join labels, preferring title, then name, then id."""
        if not labels:
            return "[]"
        return ", ".join(
            str(l.get("title") or l.get("name") or l.get("id") or l)
            if isinstance(l, dict)
            else str(l)
            for l in labels
        )

    try:
        response = session.get(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}")
        response.raise_for_status()
        details = response.json()
    except requests.exceptions.RequestException as e:
        logger.exception("get_task_details: request failed for task_id=%s", task_id)
        return f"Error looking up details for task: {e}"

    if not details:
        return "No details found for this task."

    def field(key):
        """Scalar lookup that shows 'N/A' when the key is absent."""
        return details.get(key, "N/A")

    # Identity and audit fields.
    fetched_id, title, identifier = field("id"), field("title"), field("identifier")
    created, updated = field("created"), field("updated")
    created_by = details.get("created_by") or {}
    created_by_str = f"{created_by.get('name','N/A')} (id={created_by.get('id','N/A')}, email={created_by.get('email','N/A')}, username={created_by.get('username','N/A')})"
    description = field("description")

    # Progress and scheduling fields.
    done, done_at, percent_done = field("done"), field("done_at"), field("percent_done")
    due_date, start_date, end_date = field("due_date"), field("start_date"), field("end_date")
    repeat_mode, repeat_after = field("repeat_mode"), field("repeat_after")

    # Placement and presentation fields.
    project_id, bucket_id = field("project_id"), field("bucket_id")
    index, position, priority = field("index"), field("position"), field("priority")
    is_favorite = details.get("is_favorite", details.get("is_favourite", "N/A"))
    hex_color = field("hex_color")
    cover_image_attachment_id = field("cover_image_attachment_id")

    # Nested collections may come back as null; normalize them to empty
    # containers so len() and iteration below are safe.
    buckets = details.get("buckets") or []
    assignees = details.get("assignees") or []
    attachments = details.get("attachments") or []
    comments = details.get("comments") or []
    labels = details.get("labels") or []
    reminders = details.get("reminders") or []
    reactions = details.get("reactions") or {}
    related_tasks = details.get("related_tasks") or {}
    subscription = details.get("subscription") or {}

    result = [
        f"ID: {fetched_id}, Identifier: {identifier}, Title: {title}",
        f"Created: {created} by {created_by_str}, Updated: {updated}",
        f"Done: {done} (done_at={done_at}), Percent Done: {percent_done}, Priority: {priority}, Favorite: {is_favorite}",
        f"Due Date: {due_date}, Start Date: {start_date}, End Date: {end_date}, Repeat Mode: {repeat_mode}, Repeat After: {repeat_after}",
        f"Project ID: {project_id}, Bucket ID: {bucket_id}, Buckets: {len(buckets)} items, Index: {index}, Position: {position}",
        f"Labels: {labels_list(labels)}, Hex Color: {hex_color}",
        f"Assignees: {join_names(assignees)}, Attachments Count: {len(attachments)}, Cover Image Attachment ID: {cover_image_attachment_id}",
        f"Comments Count: {len(comments)}, Reminders Count: {len(reminders)}",
        f"Reactions: {reactions}, Related Tasks: {related_tasks}, Subscription: {subscription}",
        f"Description: {description}",
    ]
    return "\n".join(result)
@mcp.tool()
def add_task(project_id: int, title: str, description: str = ""):
    """
    Adds a new task to a Vikunja project.
    :param project_id: The ID of the project to add the task to.
    :param title: The title of the new task. Must not be empty or blank.
    :param description: An optional description for the task.
    :return: The created task as returned by the API, or an error/help string.
    """
    if "Authorization" not in session.headers:
        return "Please run the 'login' command first."
    # Same guard the title-update tool applies: reject blank titles locally
    # instead of sending a request that cannot produce a usable task.
    if not (title or "").strip():
        return "Title cannot be empty."

    task_payload = {
        "project_id": project_id,
        "title": title,
        "description": description,
    }
    try:
        response = session.put(
            f"{VIKUNJA_URL}/api/v1/projects/{project_id}/tasks",
            json=task_payload,
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        # Log with traceback, matching the other task tools.
        logger.exception("add_task: request failed for project_id=%s", project_id)
        return f"Error adding task: {e}"
@mcp.tool()
def update_task_title(task_id: int, title: str):
    """
    Updates an existing title on a task.

    The current task is fetched first and sent back in full with only the
    title changed, the same read-modify-write approach update_task_details
    uses, so that fields missing from the request body are not reset.
    :param task_id: The ID of the task.
    :param title: The updated title text. Must not be empty or blank.
    :return: The updated task as returned by the API, or an error/help string.
    """
    if "Authorization" not in session.headers:
        return "Please run the 'login' command first."
    if not (title or "").strip():
        return "Title cannot be empty."

    task_url = f"{VIKUNJA_URL}/api/v1/tasks/{task_id}"
    try:
        current = session.get(task_url)
        current.raise_for_status()
        task_payload = current.json()
        if not isinstance(task_payload, dict):
            # Unexpected body shape: fall back to the minimal payload.
            task_payload = {}
        # NOTE(review): a title-only body is believed to make Vikunja clear
        # omitted fields (description, due date, ...); confirm for your version.
        task_payload["title"] = title

        response = session.post(task_url, json=task_payload)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logger.exception("update_task_title: request failed for task_id=%s", task_id)
        return f"Error updating task title: {e}"
@mcp.tool()
def update_task_description(task_id: int, description: str):
    """
    Updates an existing description on a task.

    The current task is fetched first and sent back in full with only the
    description changed, the same read-modify-write approach
    update_task_details uses, so that fields missing from the request body
    are not reset.
    :param task_id: The ID of the task.
    :param description: The updated description text. Must not be empty or blank.
    :return: The updated task as returned by the API, or an error/help string.
    """
    if "Authorization" not in session.headers:
        return "Please run the 'login' command first."
    if not (description or "").strip():
        return "Description cannot be empty."

    task_url = f"{VIKUNJA_URL}/api/v1/tasks/{task_id}"
    try:
        current = session.get(task_url)
        current.raise_for_status()
        task_payload = current.json()
        if not isinstance(task_payload, dict):
            # Unexpected body shape: fall back to the minimal payload.
            task_payload = {}
        # NOTE(review): a description-only body is believed to make Vikunja
        # clear omitted fields (due date, priority, ...); confirm for your version.
        task_payload["description"] = description

        response = session.post(task_url, json=task_payload)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logger.exception("update_task_description: request failed for task_id=%s", task_id)
        return f"Error updating task description: {e}"
@mcp.tool()
def delete_task(task_id: int):
    """
    Removes a task from Vikunja.
    :param task_id: The ID of the task to delete.
    """
    authenticated = "Authorization" in session.headers
    if not authenticated:
        return "Please run the 'login' command first."

    endpoint = f"{VIKUNJA_URL}/api/v1/tasks/{task_id}"
    try:
        reply = session.delete(endpoint)
        reply.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.exception("delete_task: request failed for task_id=%s", task_id)
        return f"Error deleting task: {e}"
    else:
        # Hand the API's JSON body straight back to the caller.
        return reply.json()
@mcp.tool()
def close_task(task_id: int):
    """
    Closes (marks as done) a task in Vikunja.

    The current task is fetched first and sent back in full with only `done`
    set, the same read-modify-write approach update_task_details uses, so
    that fields missing from the request body are not reset.
    :param task_id: The ID of the task to close.
    :return: The updated task as returned by the API, or an error/help string.
    """
    if "Authorization" not in session.headers:
        return "Please run the 'login' command first."

    task_url = f"{VIKUNJA_URL}/api/v1/tasks/{task_id}"
    try:
        current = session.get(task_url)
        current.raise_for_status()
        task_payload = current.json()
        if not isinstance(task_payload, dict):
            # Unexpected body shape: fall back to the minimal payload.
            task_payload = {}
        # NOTE(review): a {"done": true}-only body is believed to make Vikunja
        # clear omitted fields (description, due date, ...); confirm for your version.
        task_payload["done"] = True

        response = session.post(task_url, json=task_payload)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        # Log with traceback, matching the other task tools.
        logger.exception("close_task: request failed for task_id=%s", task_id)
        return f"Error closing task: {e}"
@mcp.tool()
def comment_task(task_id: int, description: str):
    """
    Posts a new comment on a Vikunja task.
    :param task_id: The ID of the task to comment on.
    :param description: The comment text; blank text is rejected locally.
    """
    if "Authorization" not in session.headers:
        return "Please run the 'login' command first."

    stripped = (description or "").strip()
    if not stripped:
        return "Comment description cannot be empty."

    comments_url = f"{VIKUNJA_URL}/api/v1/tasks/{task_id}/comments"
    try:
        # The original (unstripped) text is what gets stored.
        reply = session.put(comments_url, json={"comment": description})
        reply.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.exception("comment_task: request failed for task_id=%s", task_id)
        return f"Error adding comment to task: {e}"
    else:
        return reply.json()
@mcp.tool()
def lookup_comment_task(task_id: int):
    """
    Returns the comments of a Vikunja task, one formatted line per comment.
    :param task_id: The ID of the task whose comments are listed.
    """
    if "Authorization" not in session.headers:
        return "Please run the 'login' command first."

    def describe(comment):
        """Format one comment; missing fields are shown as 'N/A'."""
        comment_id = comment.get("id", "N/A")
        comment_text = comment.get("comment", "N/A")
        comment_created = comment.get("created", "N/A")
        comment_author = comment.get("author", "N/A")
        return f"ID: {comment_id}, Created: {comment_created}, Author: {comment_author}, Comment: {comment_text}"

    try:
        response = session.get(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}/comments")
        response.raise_for_status()
        comments = response.json()
    except requests.exceptions.RequestException as e:
        logger.exception("lookup_comment_task: request failed for task_id=%s", task_id)
        return f"Error looking up comments to task: {e}"

    if not comments:
        return "No comments found for this task."
    return "\n".join(describe(comment) for comment in comments)
@mcp.tool()
def update_comment(task_id: int, comment_id: int, comment: str):
    """
    Replaces the text of an existing comment on a task.
    :param task_id: The ID of the task the comment belongs to.
    :param comment_id: The ID of the comment to update.
    :param comment: The new comment text; blank text is rejected locally.
    """
    if "Authorization" not in session.headers:
        return "Please run the 'login' command first."

    has_text = bool((comment or "").strip())
    if not has_text:
        return "Comment cannot be empty."

    comment_url = f"{VIKUNJA_URL}/api/v1/tasks/{task_id}/comments/{comment_id}"
    try:
        reply = session.post(comment_url, json={"comment": comment})
        reply.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.exception("update_comment: request failed for task_id=%s, comment_id=%s", task_id, comment_id)
        return f"Error updating comment for task: {e}"
    else:
        return reply.json()
@mcp.tool()
def lookup_project():
    """
    Lists the projects returned by the Vikunja projects endpoint, one
    "ID: ..., Name: ..." line per project.
    """
    if "Authorization" not in session.headers:
        return "Please run the 'login' command first."

    try:
        response = session.get(f"{VIKUNJA_URL}/api/v1/projects")
        response.raise_for_status()
        projects = response.json()
    except requests.exceptions.RequestException as e:
        return f"Error retrieving projects: {e}"

    if not projects:
        return "No projects found."

    def describe(project):
        """Format one project; a missing title is shown as 'Untitled'."""
        project_id = project.get("id", "N/A")
        project_title = project.get("title", "Untitled")
        return f"ID: {project_id}, Name: {project_title}"

    return "\n".join(map(describe, projects))
@mcp.tool()
def create_project(title: str, description: str = "", is_favorite: bool = False):
    """
    Creates a new project in Vikunja.
    :param title: The title of the project. Must not be empty or blank.
    :param description: Optional description for the project.
    :param is_favorite: Whether the project should be marked as favorite.
    :return: The created project as returned by the API, or an error/help string.
    """
    if "Authorization" not in session.headers:
        return "Please run the 'login' command first."
    if not (title or "").strip():
        return "Project title cannot be empty."

    payload = {
        "title": title,
        "description": description,
        # Use the same `is_favorite` key that update_task_details writes; the
        # British spelling previously sent here is presumably ignored by the
        # server. NOTE(review): confirm against the Vikunja project model.
        "is_favorite": bool(is_favorite),
    }
    try:
        response = session.put(
            f"{VIKUNJA_URL}/api/v1/projects",
            json=payload,
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        logger.exception("create_project: request failed for title=%s", title)
        return f"Error creating project: {e}"
def _label_to_object(item):
    """Normalize one label spec (dict, int id, or title) into a label dict."""
    if isinstance(item, dict):
        if 'title' in item or 'id' in item:
            return item
        if 'name' in item:
            # Rename 'name' to 'title', the key the label payloads here use.
            renamed = dict(item)
            renamed['title'] = renamed.pop('name')
            return renamed
        # Unknown dict shape - keep a shallow copy as-is.
        return dict(item)
    # Integers are likely label IDs
    if isinstance(item, int):
        return {'id': item}
    # Otherwise treat as a title string
    return {'title': str(item)}


def _normalize_labels(labels):
    """Turn the `labels` argument (comma-separated str, list, or single item) into label dicts."""
    if isinstance(labels, str):
        return [_label_to_object(part.strip()) for part in labels.split(',') if part.strip()]
    if isinstance(labels, list):
        return [_label_to_object(entry) for entry in labels]
    return [_label_to_object(labels)]


def _create_label(label_title, title_to_id):
    """Create a label by title and return the dict to send in the bulk payload."""
    # Fallback used when no id can be obtained (some servers may accept this).
    fallback = {'title': label_title, 'description': label_title}
    try:
        logger.info("update_task_details: creating missing label '%s' via %s/labels", label_title, VIKUNJA_URL)
        create_resp = session.put(
            f"{VIKUNJA_URL}/api/v1/labels",
            json={"title": label_title, "description": label_title},
        )
        create_resp.raise_for_status()
        created = create_resp.json() or {}
    except requests.exceptions.RequestException as ce:
        resp = getattr(ce, 'response', None)
        if resp is not None:
            logger.error("update_task_details: label creation returned status=%s body=%s for title=%s", resp.status_code, resp.text, label_title)
        else:
            logger.exception("update_task_details: label creation request failed for title=%s", label_title)
        return fallback

    created_id = created.get('id') if isinstance(created, dict) else None
    if not created_id:
        logger.warning("update_task_details: label created but no id returned for title='%s' - including title in bulk payload", label_title)
        return fallback

    # Remember the new id so a repeated title in the same call reuses it.
    title_to_id[label_title.lower()] = created_id
    logger.info("update_task_details: created label '%s' id=%s", label_title, created_id)
    return {'id': created_id}


def _sync_task_labels(task_id, label_items):
    """Resolve label dicts to ids (creating missing ones) and post them to the task's bulk labels endpoint."""
    logger.info("update_task_details: resolving existing labels from %s/labels", VIKUNJA_URL)
    # Fetch the labels the labels endpoint returns to try to reuse existing label IDs
    existing_resp = session.get(f"{VIKUNJA_URL}/api/v1/labels")
    existing_resp.raise_for_status()
    existing_labels = existing_resp.json() or []

    # Build a map of title(lowercase) -> id for quick lookup
    title_to_id = {}
    for lab in existing_labels:
        if not isinstance(lab, dict):
            continue
        known_title = lab.get('title') or lab.get('name')
        if known_title:
            title_to_id[str(known_title).lower()] = lab.get('id')

    final_labels = []
    for item in label_items:
        # If already an id, pass through
        if isinstance(item, dict) and 'id' in item:
            final_labels.append({'id': item['id']})
            continue
        # Determine title string; str() guards against non-string titles.
        if isinstance(item, dict):
            label_title = str(item.get('title') or item.get('name') or item)
        else:
            label_title = str(item)
        existing_id = title_to_id.get(label_title.lower())
        if existing_id:
            final_labels.append({'id': existing_id})
        else:
            final_labels.append(_create_label(label_title, title_to_id))

    final_payload = {'labels': final_labels}
    logger.info("update_task_details: updating labels for task_id=%s labels=%s", task_id, final_payload)
    lab_resp = session.post(f"{VIKUNJA_URL}/api/v1/tasks/{task_id}/labels/bulk", json=final_payload)
    lab_resp.raise_for_status()
    logger.info("update_task_details: labels updated successfully for task_id=%s", task_id)


@mcp.tool()
def update_task_details(
    task_id: int,
    title: str = None,
    description: str = None,
    done: bool = None,
    percent_done: int = None,
    due_date: str = None,
    start_date: str = None,
    end_date: str = None,
    repeat_mode: int = None,
    repeat_after: int = None,
    project_id: int = None,
    bucket_id: int = None,
    priority: int = None,
    position: int = None,
    is_favorite: bool = None,
    hex_color: str = None,
    labels: str = None,
):
    """
    Updates any primary fields on a task. Only fields provided (not None) are changed.

    The existing task is fetched first, the provided fields are overlaid on
    it, and the merged task is posted back. If `labels` is provided as a
    comma-separated string, it is sent to the task's bulk labels endpoint
    after the task update; missing labels are created first. A failure of
    the label step is logged but does not change the return value.
    :param task_id: The ID of the task to update.
    :param labels: Comma-separated label titles (a list or single label spec
        is also tolerated).
    :return: The task-update response from the API, or an error/help string.
    """
    if "Authorization" not in session.headers:
        return "Please run the 'login' command first."

    task_url = f"{VIKUNJA_URL}/api/v1/tasks/{task_id}"
    try:
        # First, fetch the existing task to get all its properties
        response = session.get(task_url)
        response.raise_for_status()
        payload = response.json()
    except requests.exceptions.RequestException as e:
        logger.exception("update_task_details: failed to fetch existing task id=%s", task_id)
        return f"Error fetching task details before update: {e}"
    if not isinstance(payload, dict):
        # Without the full current task a merged update cannot be built safely.
        return "Error fetching task details before update: unexpected response format."

    # Overlay only the values that were explicitly passed (not None).
    requested = {
        "title": title,
        "description": description,
        "done": done,
        "percent_done": percent_done,
        "due_date": due_date,
        "start_date": start_date,
        "end_date": end_date,
        "repeat_mode": repeat_mode,
        "repeat_after": repeat_after,
        "project_id": project_id,
        "bucket_id": bucket_id,
        "priority": priority,
        "position": position,
        "is_favorite": is_favorite,
        "hex_color": hex_color,
    }
    payload.update({key: value for key, value in requested.items() if value is not None})

    # Labels are handled separately through the labels bulk endpoint.
    label_items = None if labels is None else _normalize_labels(labels)

    try:
        logger.info("update_task_details: updating task_id=%s with payload keys=%s", task_id, list(payload.keys()))
        response = session.post(task_url, json=payload)
        response.raise_for_status()
        task_update_resp = response.json()
    except requests.exceptions.RequestException as e:
        logger.exception("update_task_details: request failed for task_id=%s", task_id)
        return f"Error updating task details: {e}"

    if label_items is not None:
        try:
            _sync_task_labels(task_id, label_items)
        except requests.exceptions.RequestException as le:
            # Best-effort: the task update already succeeded, so only log.
            resp = getattr(le, 'response', None)
            logger.exception(
                "update_task_details: labels update request failed for task_id=%s (status=%s body=%s)",
                task_id,
                getattr(resp, 'status_code', None),
                getattr(resp, 'text', None),
            )
    return task_update_resp
if __name__ == "__main__":
    import sys

    # mcp.run() is called without arguments; with FastMCP's stdio transport,
    # stdout carries the protocol stream, so the human-readable banner goes to
    # stderr. NOTE(review): confirm the default transport of the installed
    # FastMCP version.
    print("--- Vikunja MCP Client ---", file=sys.stderr)
    print("Available commands: login, search_tasks, add_task, close_task, lookup_project, help, exit", file=sys.stderr)
    mcp.run()