forked from Zie619/n8n-workflows
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathimport_workflows.py
More file actions
204 lines (163 loc) · 7.12 KB
/
import_workflows.py
File metadata and controls
204 lines (163 loc) · 7.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#!/usr/bin/env python3
"""
N8N Workflow Importer
Python replacement for import-workflows.sh with better error handling and progress tracking.
"""
import json
import subprocess
import sys
from pathlib import Path
from typing import List, Dict, Any
from create_categories import categorize_by_filename
def load_categories(path: str = 'context/search_categories.json') -> list:
    """Load the search categories file.

    Args:
        path: Location of the JSON file to read. Defaults to
            ``context/search_categories.json`` relative to the current
            working directory.

    Returns:
        The parsed list of workflow entries. An empty list is returned when
        the file does not exist, is not valid JSON, or its top-level value is
        not a JSON array.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return []
    # Callers iterate over the result and append to it, so a top-level value
    # of any other type is handled the same way as a corrupt file.
    return data if isinstance(data, list) else []
def save_categories(data, path: str = 'context/search_categories.json'):
    """Save the search categories file.

    Args:
        data: JSON-serialisable object to store (the caller in this file
            passes the list of workflow entries).
        path: Destination file. Defaults to
            ``context/search_categories.json`` relative to the current
            working directory. Missing parent directories are created.

    Raises:
        TypeError: If ``data`` cannot be serialised to JSON. The destination
            file is not touched in that case.
        OSError: If the directory or file cannot be created or written.
    """
    # Serialise first so a failure here cannot leave a truncated file behind.
    payload = json.dumps(data, indent=2, ensure_ascii=False)
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, 'w', encoding='utf-8') as f:
        f.write(payload)
class WorkflowImporter:
    """Import n8n workflows with progress tracking and error handling.

    Each workflow file is validated, passed to ``npx n8n import:workflow``
    and, on success, recorded in the search categories file.

    Attributes:
        workflows_dir: Directory that is scanned for ``*.json`` files.
        imported_count: Number of files imported successfully so far.
        failed_count: Number of files that failed validation or import.
        errors: One message per problem encountered, in order of occurrence.
    """

    def __init__(self, workflows_dir: str = "workflows"):
        """Create an importer for the given workflows directory."""
        self.workflows_dir = Path(workflows_dir)
        self.imported_count = 0
        self.failed_count = 0
        self.errors: List[str] = []

    def validate_workflow(self, file_path: Path) -> bool:
        """Validate workflow JSON before import.

        Returns:
            True when the file parses as a JSON object containing both the
            ``nodes`` and ``connections`` keys; False otherwise, including
            when the file cannot be opened, read or decoded.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
            # OSError covers missing, unreadable and non-file paths.
            return False

        # Basic validation
        if not isinstance(data, dict):
            return False

        # Check required fields
        required_fields = ('nodes', 'connections')
        return all(field in data for field in required_fields)

    def _record_category(self, file_path: Path) -> None:
        """Store the suggested category for an imported workflow (best effort).

        Runs only after n8n has accepted the workflow, so a failure here is
        recorded in ``self.errors`` and printed as a warning rather than
        raised: the import itself still counts as successful.
        """
        try:
            suggested_category = categorize_by_filename(file_path.name)
            all_workflows_data = load_categories()

            for workflow_entry in all_workflows_data:
                if workflow_entry.get('filename') == file_path.name:
                    workflow_entry['category'] = suggested_category
                    break
            else:
                # Add new workflow entry if not found (e.g., first import)
                all_workflows_data.append({
                    "filename": file_path.name,
                    "category": suggested_category,
                    "name": file_path.stem,  # Assuming workflow name is filename without extension
                    "description": "",  # Placeholder, can be updated manually
                    "nodes": []  # Placeholder, can be updated manually
                })

            save_categories(all_workflows_data)
        except Exception as e:
            # Deliberately broad: this is optional bookkeeping and must not
            # turn an already-completed import into a reported failure.
            self.errors.append(f"Categorization failed for {file_path.name}: {e}")
            print(f"⚠️ Could not categorize: {file_path.name} - {e}")
            return

        print(f" Categorized '{file_path.name}' as '{suggested_category or 'Uncategorized'}'")

    def import_workflow(self, file_path: Path) -> bool:
        """Import a single workflow file.

        Validates the file, then runs ``npx n8n import:workflow`` with a
        30-second timeout. Failures are appended to ``self.errors``.

        Returns:
            True when the n8n command exits with status 0, False otherwise.
        """
        try:
            # Validate first
            if not self.validate_workflow(file_path):
                self.errors.append(f"Invalid JSON: {file_path.name}")
                return False

            # Run n8n import command
            result = subprocess.run(
                ['npx', 'n8n', 'import:workflow', f'--input={file_path}'],
                capture_output=True, text=True, timeout=30,
            )
        except subprocess.TimeoutExpired:
            self.errors.append(f"Timeout importing {file_path.name}")
            print(f"⏰ Timeout: {file_path.name}")
            return False
        except Exception as e:
            self.errors.append(f"Error importing {file_path.name}: {str(e)}")
            print(f"❌ Error: {file_path.name} - {str(e)}")
            return False

        if result.returncode != 0:
            # Prefer stderr; fall back to stdout when stderr is empty.
            error_msg = result.stderr.strip() or result.stdout.strip()
            self.errors.append(f"Import failed for {file_path.name}: {error_msg}")
            print(f"❌ Failed: {file_path.name}")
            return False

        print(f"✅ Imported: {file_path.name}")
        # Categorize the workflow and update search_categories.json
        self._record_category(file_path)
        return True

    def get_workflow_files(self) -> List[Path]:
        """Get all workflow JSON files.

        Returns:
            The ``*.json`` files directly inside ``workflows_dir``, sorted.
            An empty list (after printing a message) when the directory does
            not exist or contains no JSON files.
        """
        if not self.workflows_dir.exists():
            print(f"❌ Workflows directory not found: {self.workflows_dir}")
            return []

        json_files = sorted(self.workflows_dir.glob("*.json"))
        if not json_files:
            print(f"❌ No JSON files found in: {self.workflows_dir}")
            return []

        return json_files

    def import_all(self) -> Dict[str, Any]:
        """Import all workflow files.

        Prints per-file progress and a final summary showing at most the
        first ten recorded messages. The counters and ``errors`` are not
        reset here, so they accumulate if this method is called repeatedly.

        Returns:
            ``{"success": False, "message": ...}`` when no files are found;
            otherwise a dict with ``success`` (True when nothing failed),
            ``imported``, ``failed``, ``total`` and ``errors``.
        """
        workflow_files = self.get_workflow_files()
        total_files = len(workflow_files)

        if total_files == 0:
            return {"success": False, "message": "No workflow files found"}

        print(f"🚀 Starting import of {total_files} workflows...")
        print("-" * 50)

        for i, file_path in enumerate(workflow_files, 1):
            print(f"[{i}/{total_files}] Processing {file_path.name}...")

            if self.import_workflow(file_path):
                self.imported_count += 1
            else:
                self.failed_count += 1

        # Summary
        print("\n" + "=" * 50)
        print("📊 Import Summary:")
        print(f"✅ Successfully imported: {self.imported_count}")
        print(f"❌ Failed imports: {self.failed_count}")
        print(f"📁 Total files: {total_files}")

        if self.errors:
            print("\n❌ Errors encountered:")
            for error in self.errors[:10]:  # Show first 10 errors
                print(f" • {error}")
            if len(self.errors) > 10:
                print(f" ... and {len(self.errors) - 10} more errors")

        return {
            "success": self.failed_count == 0,
            "imported": self.imported_count,
            "failed": self.failed_count,
            "total": total_files,
            "errors": self.errors
        }
def check_n8n_available() -> bool:
    """Check if n8n CLI is available.

    Runs ``npx n8n --version`` with a 10-second timeout.

    Returns:
        True when the command exits with status 0. False when it exits with
        any other status, times out, or cannot be started at all.
    """
    try:
        result = subprocess.run(
            ['npx', 'n8n', '--version'],
            capture_output=True, text=True, timeout=10
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError includes FileNotFoundError (npx not on PATH) as well as
        # PermissionError and similar launch failures.
        return False
    return result.returncode == 0
def main():
    """Run the importer as a command-line program.

    Prints a banner, verifies that the n8n CLI can be run, imports every
    workflow from the default directory and terminates the process with
    status 0 when all imports succeeded or 1 otherwise (also 1 when the
    n8n CLI is unavailable).
    """
    # The output below contains emoji, so force UTF-8 on stdout.
    sys.stdout.reconfigure(encoding='utf-8')

    banner_rule = "=" * 40
    print("🔧 N8N Workflow Importer")
    print(banner_rule)

    if check_n8n_available():
        # Import with the default workflows directory and map the outcome
        # to a process exit status.
        outcome = WorkflowImporter().import_all()
        exit_status = 0 if outcome["success"] else 1
        sys.exit(exit_status)

    # No usable n8n CLI: tell the user how to install it and bail out.
    print("❌ n8n CLI not found. Please install n8n first:")
    print(" npm install -g n8n")
    sys.exit(1)
# Run the importer only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()