-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathhancock_agent.py
More file actions
1394 lines (1239 loc) · 63.9 KB
/
hancock_agent.py
File metadata and controls
1394 lines (1239 loc) · 63.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# Copyright (c) 2025 CyberViser. All Rights Reserved.
# Licensed under the CyberViser Proprietary License — see LICENSE for details.
# Unauthorized commercial use, redistribution, or AI training is prohibited.
"""
Hancock Agent — Multi-backend Inference (Ollama → NVIDIA NIM/OpenAI fallback)
CyberViser | hancock_agent.py
Two modes:
python hancock_agent.py → interactive CLI chat
python hancock_agent.py --server → REST API server (port 5000)
CLI mode commands:
/mode pentest — switch to Pentest Specialist persona
/mode soc — switch to SOC Analyst persona
/mode auto — combined persona (default)
/mode code — security code generation (Qwen Coder 32B)
/mode ciso — CISO strategy, compliance & board reporting
/mode sigma — Sigma detection rule authoring
/mode yara — YARA malware detection rule authoring
/mode ioc — IOC threat intelligence enrichment
/mode osint — OSINT geolocation & infrastructure intelligence
/clear — clear conversation history
/history — show history
/model <id> — switch model
/exit — quit
Set your key:
export NVIDIA_API_KEY="nvapi-..."
or pass --api-key "nvapi-..."
"""
from __future__ import annotations
import argparse
import hmac
import json
import logging
import os
import sys
import time
import readline # noqa: F401 — enables arrow-key history in CLI
from hancock_constants import VERSION, require_openai, OPENAI_IMPORT_ERROR_MSG
from monitoring.logging_config import (
get_request_id,
init_flask_logging,
)
logger = logging.getLogger(__name__)
try:
from openai import OpenAI
except ImportError: # allow import without OpenAI; client factories enforce requirement at runtime
OpenAI = None # type: ignore
# ── Hancock identity ──────────────────────────────────────────────────────────
PENTEST_SYSTEM = """You are Hancock, an elite penetration tester and offensive security specialist built by CyberViser.
Your expertise covers:
- Reconnaissance: OSINT, subdomain enumeration, port scanning (nmap, amass, subfinder)
- Web Application Testing: SQLi, XSS, SSRF, auth bypass, IDOR, JWT attacks (Burp Suite, sqlmap)
- Network Exploitation: Metasploit, lateral movement, credential attacks (CrackMapExec, impacket)
- Post-Exploitation: Privilege escalation (LinPEAS, WinPEAS, GTFOBins), persistence, pivoting
- Vulnerability Analysis: CVE research, CVSS scoring, PoC analysis, patch prioritization
- Reporting: PTES methodology, professional finding write-ups, executive summaries
You operate STRICTLY within authorized scope. You always:
1. Confirm authorization before suggesting active techniques
2. Recommend responsible disclosure and remediation
3. Reference real tools, commands, and CVEs with accuracy
4. Provide actionable, technically precise answers
You are Hancock. You are methodical, precise, and professional."""
SOC_SYSTEM = """You are Hancock, an expert SOC Tier-2/3 analyst and incident responder built by CyberViser.
Your expertise covers:
- Alert Triage: Classify and prioritize SIEM/EDR/IDS/IPS alerts using MITRE ATT&CK mapping
- Log Analysis: Windows Event Logs (4624/4625/4688/7045), Syslog, Apache/Nginx, firewall, DNS
- SIEM Queries: Splunk SPL, Elastic KQL, Microsoft Sentinel KQL — writing precise detection queries
- Incident Response: NIST SP 800-61 / PICERL (Prepare, Identify, Contain, Eradicate, Recover, Lessons Learned)
- Threat Hunting: Hypothesis-driven hunting, IOC sweeps, behavioral analytics, UEBA
- IOC Analysis: Hash analysis, domain/IP reputation, WHOIS, passive DNS, file/process/network pivoting
- Detection Engineering: Sigma rules, YARA rules, custom alerts, tuning FP reduction
- Malware Triage: Static (strings, PE headers, imports) + dynamic (sandbox detonation, behavior analysis)
- Threat Intelligence: MISP, OpenCTI, TAXII/STIX, APT group TTPs, attribution
You always:
1. Follow the PICERL framework for incident response
2. Document findings with timestamps, evidence, and chain of custody
3. Write precise detection logic (Sigma, SPL, KQL) with comments
4. Escalate appropriately and communicate clearly to stakeholders
5. Stay calm under pressure — triage by impact and urgency
You are Hancock. You are methodical, calm, and thorough."""
AUTO_SYSTEM = """You are Hancock, an elite cybersecurity specialist built by CyberViser. You operate as both a penetration tester and SOC analyst, depending on context.
**Pentest Mode:** Reconnaissance, exploitation, post-exploitation, CVE analysis, Metasploit, Burp Suite, authorized engagements only.
**SOC Mode:** Alert triage, SIEM queries (Splunk SPL / Elastic KQL / Sentinel KQL), incident response (PICERL), threat hunting, detection engineering, IOC analysis.
You always:
- Operate within authorized scope
- Follow PICERL for IR and PTES for pentesting
- Provide accurate, actionable technical guidance
- Reference real tools, real CVEs, and real detection logic
You are Hancock. Built by CyberViser."""
CODE_SYSTEM = """You are Hancock Code, CyberViser's expert security code assistant powered by Qwen 2.5 Coder 32B.
You write production-quality security tooling code in Python, Bash, PowerShell, and Go.
Your specialties:
- Security automation: scanners, parsers, log analyzers, alert enrichers
- Exploit PoC code (authorized research only — always add warnings)
- SIEM query writing: Splunk SPL, Elastic KQL, Sentinel KQL, Sigma YAML
- Detection scripts: YARA rules, Suricata/Snort rules, custom IDS signatures
- Pentest helpers: recon scripts, payload generators, C2 scaffolding (authorized only)
- Secure code review: identify vulns (OWASP Top 10, CWE), suggest fixes with examples
- CTF solvers: rev, pwn, web, crypto — with explanations
You always:
1. Add authorization/legal warnings to offensive tooling
2. Include error handling, type hints, and docstrings
3. Explain what the code does and any security implications
4. Suggest safer alternatives when relevant
You are Hancock Code. Precision over verbosity. Ship working code."""
CISO_SYSTEM = """You are Hancock CISO, CyberViser's AI-powered Chief Information Security Officer advisor.
Your expertise covers:
- Risk Management: NIST RMF, ISO 27001/27005, FAIR quantitative risk analysis, risk register management
- Compliance & Frameworks: SOC 2, ISO 27001, PCI-DSS, HIPAA, GDPR, DORA, NIST CSF 2.0, CIS Controls v8
- Board & Executive Reporting: security posture summaries, risk-adjusted metrics, budget justification, KRI/KPI dashboards
- Security Program Strategy: maturity assessments (CMMI, C2M2), roadmap planning, control gap analysis
- Vendor & Third-Party Risk: TPRM frameworks, questionnaire assessment, supply chain risk, SLA review
- Incident Communication: breach notification drafting, regulatory reporting, stakeholder messaging
- Security Architecture Review: zero trust, cloud security posture (CSPM), identity governance, data classification
- Budget & ROI: security spend optimization, tool consolidation, make-vs-buy analysis, cyber insurance guidance
You always:
1. Translate technical risk into business impact (financial, reputational, regulatory)
2. Prioritize by likelihood × impact × cost-to-remediate
3. Align recommendations to the organization's risk appetite and industry sector
4. Provide executive-ready language — clear, concise, no jargon unless requested
5. Reference specific control numbers (CIS Control 5.1, NIST CSF ID.AM-1) where relevant
You are Hancock CISO. You speak business and security fluently."""
SIGMA_SYSTEM = """You are Hancock Sigma, CyberViser's expert detection engineer specializing in Sigma rule authoring.
Your expertise covers:
- Sigma rule syntax (title, id, status, description, references, author, date, modified, tags, logsource, detection, falsepositives, level, fields)
- Log sources: Windows Event Logs, Sysmon, Linux auditd, cloud (AWS CloudTrail, Azure Activity, GCP Audit), web proxy, firewall, DNS, EDR
- MITRE ATT&CK tagging: correct attack.tXXXX technique and sub-technique IDs
- Detection logic: selection/filter patterns, keywords, regex, aggregations, near/timeframe conditions
- False positive analysis and tuning recommendations
- Converting IOCs, TTPs, threat intel reports → detection rules
- Sigma rule quality: specificity vs. coverage trade-offs, noise reduction
You always:
1. Output valid, well-formed SIGMA YAML with all required fields
2. Include `falsepositives` and `level` (informational/low/medium/high/critical)
3. Tag correctly with MITRE ATT&CK technique IDs in the `tags` field
4. Add a `filter` condition when the detection is prone to noise
5. After the rule, briefly explain what it detects and any tuning notes
You are Hancock Sigma. Every rule you write is ready to deploy."""
# Registry mapping a mode name to its system prompt. It is read by the CLI
# (`/mode <name>`) and by the REST endpoints to pick a persona. The "ioc" and
# "osint" entries are added further down, right after their prompts are defined.
SYSTEMS = {
    "pentest": PENTEST_SYSTEM,
    "soc": SOC_SYSTEM,
    "auto": AUTO_SYSTEM,
    "code": CODE_SYSTEM,
    "ciso": CISO_SYSTEM,
    "sigma": SIGMA_SYSTEM,
    "yara": None,  # placeholder; overwritten with YARA_SYSTEM once that constant is defined below
}
YARA_SYSTEM = """You are Hancock YARA, CyberViser's expert malware analyst and detection engineer.
Your expertise covers:
- YARA rule syntax: meta, strings ($hex, $ascii, $regex, $wide, $nocase), condition logic
- Malware families: ransomware, RATs, stealers, loaders, botnets, APT tooling
- File-format artefacts: PE headers, macros, packer signatures, shellcode patterns
- Memory scanning: process injection, reflective loading, hollow process indicators
- YARA best practices: performance (all of them at filesize, pe.imports), specificity vs coverage
- YARA modules: pe, elf, math, hash, dotnet, magic, androguard
You always:
1. Output a complete, syntactically valid YARA rule with meta (description, author, date, hash if known)
2. Use multiple string conditions to reduce false positives
3. Add a condition that limits to relevant file type/size when possible
4. After the rule, explain what it detects and list any known false positive sources
You are Hancock YARA. Every rule you write is ready to run with `yara64 -r rule.yar /path`."""
SYSTEMS["yara"] = YARA_SYSTEM
IOC_SYSTEM = """You are Hancock IOC, CyberViser's threat intelligence analyst.
When given an indicator of compromise (IP address, domain, URL, file hash, or email),
you provide a structured enrichment report covering:
- Indicator type and classification
- Threat intelligence context (known malware families, threat actors, campaigns)
- MITRE ATT&CK techniques associated with this indicator
- Risk score (1–10) with justification
- Recommended defensive actions (block, monitor, investigate)
- Relevant CVEs or GHSA advisories if applicable
Format your response as a clear, structured threat intel report."""
SYSTEMS["ioc"] = IOC_SYSTEM
OSINT_SYSTEM = """You are Hancock OSINT, CyberViser's expert geolocation intelligence analyst.
Your expertise covers:
- IP and domain geolocation: multi-source lookups (ip-api.com, ipinfo.io, ipapi.co), ASN/ISP/hosting identification
- Infrastructure mapping: geographic clustering of threat actor infrastructure, ASN hopping patterns, bulletproof hosting detection
- Threat actor tracking: correlating IP/domain indicators to known campaigns, attribution hints, MITRE ATT&CK techniques
- Predictive location analytics: forecasting future threat infrastructure based on historical patterns, country/ASN preferences, rotation intervals
- Risk scoring: assessing IPs/domains using bulletproof ASN lists, country cyber-risk indices, proxy/VPN/Tor flags
- OSINT pivoting: WHOIS analysis, passive DNS, certificate transparency logs, Shodan/Censys correlation
You always:
1. Provide structured, actionable intelligence reports with confidence levels
2. Cite data sources and note when information may be outdated
3. Map findings to MITRE ATT&CK where applicable (T1583, T1584, T1090, etc.)
4. Flag high-risk indicators (Tor exits, bulletproof hosters, known threat actor infrastructure)
5. Recommend defensive actions: block, monitor, sinkhole, or investigate
You are Hancock OSINT. Every analysis you produce is intelligence-grade."""
SYSTEMS["osint"] = OSINT_SYSTEM
# Mode the CLI starts in; also reported as `default_mode` by /v1/agents.
DEFAULT_MODE = "auto"
# Keep backward-compatible alias; chat() uses it when no system prompt is passed.
HANCOCK_SYSTEM = AUTO_SYSTEM
# Base URL of the NVIDIA NIM OpenAI-compatible API.
NIM_BASE_URL = "https://integrate.api.nvidia.com/v1"
# Ollama server root (overridable via OLLAMA_BASE_URL) with "/v1" appended.
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434") + "/v1"
# Default chat model and the model the CLI auto-selects when entering code mode.
DEFAULT_MODEL = os.getenv("OLLAMA_MODEL", "llama3.1:8b")
CODER_MODEL = os.getenv("OLLAMA_CODER_MODEL", "qwen2.5-coder:7b")
# Captured once at import time; /internal/diagnostics reports both values.
# The monotonic reading is what the uptime calculation subtracts from.
PROCESS_STARTED_AT_UNIX = int(time.time())
PROCESS_STARTED_AT_MONOTONIC = time.monotonic()
# ── Available models ──────────────────────────────────────────────────────────
# Short alias -> full model ID. The CLI `/model <alias>` command resolves through
# this table (unknown aliases are used verbatim as a model ID), and the table is
# exposed as-is by /health and /internal/diagnostics.
MODELS = {
    # Ollama models (local)
    "llama3.1": "llama3.1:8b",
    "llama3.2": "llama3.2:3b",
    "mistral": "mistral:7b",
    "qwen-coder": "qwen2.5-coder:7b",
    "gemma3": "gemma3:12b",
    # NVIDIA NIM models (used when HANCOCK_LLM_BACKEND=nvidia)
    "nim-mistral": "mistralai/mistral-7b-instruct-v0.3",
    "nim-qwen": "qwen/qwen2.5-coder-32b-instruct",
    "nim-llama": "meta/llama-3.1-8b-instruct",
}
# ── OpenAI fallback ───────────────────────────────────────────────────────────
# Model used by chat() when it falls back to OpenAI after a primary-backend error.
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
# NOTE(review): not referenced in the visible part of this file — presumably the
# code-mode counterpart of OPENAI_MODEL; confirm against the rest of the module.
OPENAI_CODER_MODEL = os.getenv("OPENAI_CODER_MODEL", "gpt-4o")
BANNER = """
╔══════════════════════════════════════════════════════════╗
║ ██╗ ██╗ █████╗ ███╗ ██╗ ██████╗ ██████╗ ██████╗██╗ ║
║ ██║ ██║██╔══██╗████╗ ██║██╔════╝██╔═══██╗██╔════╝██║ ║
║ ███████║███████║██╔██╗ ██║██║ ██║ ██║██║ ██║ ║
║ ██╔══██║██╔══██║██║╚██╗██║██║ ██║ ██║██║ ██╚╗║
║ ██║ ██║██║ ██║██║ ╚████║╚██████╗╚██████╔╝╚██████╗╚═╝║║
║ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═════╝ ╚═════╝ ╚═════╝ ║
║ CyberViser — Pentest + SOC + CISO + Code ║
║ Llama 3.1 · Qwen 2.5 Coder · Ollama (local) ║
╚══════════════════════════════════════════════════════════╝
Modes : /mode pentest | soc | auto | code | ciso | sigma | yara | ioc | osint
Models: /model llama3.1 | llama3.2 | mistral | qwen-coder | gemma3
Other : /clear /history /exit
"""
def require_openai_or_exit() -> None:
    """Terminate the process with a readable message if the OpenAI SDK is missing.

    The actual check is delegated to ``require_openai``; an ``ImportError``
    raised by it is turned into ``sys.exit(OPENAI_IMPORT_ERROR_MSG)``.
    """
    try:
        require_openai(OpenAI)
    except ImportError:
        # Exit from inside the handler so the ImportError stays attached as context.
        sys.exit(OPENAI_IMPORT_ERROR_MSG)
    else:
        return None
def make_ollama_client() -> OpenAI:
    """Build an OpenAI-compatible client that targets the local Ollama server.

    Exits the process through ``require_openai_or_exit`` when the SDK is not
    installed. The API key sent is the literal placeholder ``"ollama"``.
    """
    require_openai_or_exit()
    ollama_settings = {"base_url": OLLAMA_BASE_URL, "api_key": "ollama"}
    return OpenAI(**ollama_settings)
def make_client(api_key: str) -> OpenAI:
    """Build an OpenAI-compatible client that targets NVIDIA NIM (legacy path).

    Args:
        api_key: Key forwarded unchanged to the client constructor.

    Exits the process through ``require_openai_or_exit`` when the SDK is not
    installed.
    """
    require_openai_or_exit()
    nim_settings = {"base_url": NIM_BASE_URL, "api_key": api_key}
    return OpenAI(**nim_settings)
def make_openai_client() -> OpenAI | None:
    """Return an OpenAI client when usable credentials exist, otherwise ``None``.

    This is the best-effort fallback path, so nothing here exits the process:
    a missing SDK, an empty ``OPENAI_API_KEY`` or a key that still starts with
    the ``sk-your`` placeholder prefix all simply yield ``None``.
    """
    api_key = os.getenv("OPENAI_API_KEY", "")
    sdk_available = OpenAI is not None
    key_usable = bool(api_key) and not api_key.startswith("sk-your")
    if not (sdk_available and key_usable):
        return None
    # An empty OPENAI_ORG_ID is normalised to None before being passed on.
    organization = os.getenv("OPENAI_ORG_ID") or None
    return OpenAI(api_key=api_key, organization=organization)
def chat(client: OpenAI, history: list[dict], model: str, stream: bool = True,
         system_prompt: str | None = None) -> str:
    """Send the conversation to the primary backend, falling back to OpenAI on error.

    Args:
        client: Primary OpenAI-compatible client (Ollama or NIM).
        history: Prior user/assistant messages; the system message is prepended here.
        model: Model ID for the primary backend.
        stream: Forwarded to ``_do_chat``.
        system_prompt: Persona prompt; ``HANCOCK_SYSTEM`` is used when falsy.

    Returns:
        The assistant's reply text.

    Raises:
        Exception: The primary backend's error is re-raised when no OpenAI
            fallback client is available.
    """
    system_message = {"role": "system", "content": system_prompt or HANCOCK_SYSTEM}
    messages = [system_message] + history
    try:
        return _do_chat(client, messages, model, stream)
    except Exception as primary_err:
        # Any primary-backend failure (Ollama or NIM) triggers the OpenAI fallback.
        fallback = make_openai_client()
        if not fallback:
            raise
        print(f"\n[Hancock] Backend error ({primary_err}) — falling back to OpenAI {OPENAI_MODEL}...")
        return _do_chat(fallback, messages, OPENAI_MODEL, stream)
def _do_chat(client: OpenAI, messages: list[dict], model: str, stream: bool) -> str:
    """Run one chat completion and return the reply text.

    With ``stream`` set, the reply is echoed to stdout piece by piece behind a
    coloured ``Hancock:`` prefix and the concatenated text is returned.
    Otherwise a single blocking request is made and its content extracted.
    """
    request_args = {
        "model": model,
        "messages": messages,
        "max_tokens": 1024,
        "temperature": 0.7,
        "top_p": 0.95,
    }
    if not stream:
        completion = client.chat.completions.create(**request_args)
        return _extract_content(completion)

    # The prefix is printed before the request is issued, as the reply follows it.
    print("\n\033[1;32mHancock:\033[0m ", end="", flush=True)
    pieces = []
    for chunk in client.chat.completions.create(**request_args, stream=True):
        if not chunk.choices:
            continue
        piece = chunk.choices[0].delta.content
        if not piece:
            continue
        print(piece, end="", flush=True)
        pieces.append(piece)
    print()
    return "".join(pieces)
def _extract_content(resp) -> str:
    """Return the first choice's message text, or ``""`` when there is none.

    An empty or missing ``choices`` list and a ``None`` content both map to
    the empty string.
    """
    choices = resp.choices
    return (choices[0].message.content or "") if choices else ""
# ── CLI mode ──────────────────────────────────────────────────────────────────
def run_cli(client: OpenAI, model: str):
    """Run the interactive terminal chat loop until the user exits.

    Prints the banner and the active backend endpoint, then reads prompts from
    stdin. Slash commands (``/exit``, ``/clear``, ``/history``, ``/model``,
    ``/mode``) are handled locally; any other input is sent to ``chat`` with
    the system prompt of the current mode.

    Args:
        client: OpenAI-compatible client used for every completion.
        model: Initial model ID. It may be replaced by ``/model`` or by the
            automatic switch to ``CODER_MODEL`` when entering code mode.
    """
    print(BANNER)
    print(f" Model : {model}")
    backend = os.getenv("HANCOCK_LLM_BACKEND", "ollama").lower()
    if backend == "ollama":
        print(f" Endpoint: {OLLAMA_BASE_URL}")
    elif backend == "nvidia":
        print(f" Endpoint: {NIM_BASE_URL}")
    print(" Mode : auto (Pentest + SOC)")
    print()

    # One label per registered mode. Lookups below still use .get() so a mode
    # added to SYSTEMS later cannot crash the loop with a KeyError.
    mode_labels = {
        "pentest": "Pentest Specialist 🔴",
        "soc": "SOC Analyst 🔵",
        "auto": "Auto (Pentest+SOC) ⚡",
        "code": "Code Assistant 💻 (Qwen 2.5 Coder 32B)",
        "ciso": "CISO Advisor 👔",
        "sigma": "Sigma Detection Engineer",
        "yara": "YARA Rule Author",
        "ioc": "IOC Enrichment Analyst",
        "osint": "OSINT Geolocation Analyst",
    }
    # Built from the registry so the usage text always lists every valid mode.
    mode_usage = " | ".join(f"/mode {name}" for name in SYSTEMS)

    history: list[dict] = []
    current_mode = DEFAULT_MODE
    while True:
        try:
            user_input = input("\033[1;34m[You]\033[0m ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\n[Hancock] Signing off. Stay in scope.")
            break
        if not user_input:
            continue
        if user_input.lower() in ("/exit", "/quit", "exit", "quit"):
            print("[Hancock] Signing off. Stay in scope.")
            break
        if user_input == "/clear":
            history.clear()
            print("[Hancock] Conversation cleared.")
            continue
        if user_input == "/history":
            for i, m in enumerate(history):
                role = m["role"].upper()
                print(f" [{i}] {role}: {m['content'][:80]}...")
            continue
        # /model must be tested before /mode: "/model ..." also starts with
        # "/mode", so the broader prefix check would otherwise swallow it.
        if user_input == "/model" or user_input.startswith("/model "):
            alias = user_input[len("/model"):].strip()
            if not alias:
                print("[Hancock] Usage: /model <alias-or-model-id>")
                continue
            model = MODELS.get(alias, alias)  # resolve alias or use raw model ID
            print(f"[Hancock] Switched to model: {model}")
            continue
        if user_input.startswith("/mode"):
            parts = user_input.split()
            if len(parts) == 2 and parts[1] in SYSTEMS:
                current_mode = parts[1]
                history.clear()
                # Auto-switch to coder model when entering code mode
                if current_mode == "code" and model == DEFAULT_MODEL:
                    model = CODER_MODEL
                    print(f"[Hancock] Auto-switched to {CODER_MODEL} for code mode.")
                label = mode_labels.get(current_mode, current_mode)
                print(f"[Hancock] Switched to {label} — history cleared.")
            else:
                print(f"[Hancock] Usage: {mode_usage}")
            continue
        history.append({"role": "user", "content": user_input})
        try:
            response = chat(client, history, model, stream=True,
                            system_prompt=SYSTEMS[current_mode])
            history.append({"role": "assistant", "content": response})
        except Exception as e:
            print(f"\033[1;31m[Error]\033[0m {e}")
            history.pop()  # remove failed user message
# ── REST API server mode ──────────────────────────────────────────────────────
def build_app(client, model: str):
"""Build and return the Flask app (used by both run_server and tests)."""
try:
from flask import Flask, request, jsonify, Response, stream_with_context
except ImportError:
sys.exit("Run: .venv/bin/pip install flask")
app = Flask("hancock")
init_flask_logging(app)
backend = os.getenv("HANCOCK_LLM_BACKEND", "ollama").lower()
def _mode_from_request(default_mode: str = "n/a") -> str:
    """Best-effort read of the ``mode`` field from the current JSON request body.

    Returns ``default_mode`` when the request is not JSON, the key is absent,
    or the parsed body is not a dict. Parsing uses ``silent=True``, so a
    malformed body does not raise here.
    """
    if request.is_json:
        payload = request.get_json(silent=True)
    else:
        payload = {}
    if not isinstance(payload, dict):
        return default_mode
    return str(payload.get("mode", default_mode))
def _error_response(message: str, status_code: int, mode: str = "n/a"):
    """Log a structured ``request_error`` warning and build the JSON error reply.

    Returns:
        A ``(response, status_code)`` pair whose JSON body carries the error
        message and the current request ID.
    """
    request_id = get_request_id()
    log_fields = {
        "event": "request_error",
        "endpoint": request.path,
        "mode": mode,
        "backend": backend,
        "status": status_code,
        "request_id": request_id,
        "error": message,
    }
    logger.warning("request_error", extra=log_fields)
    body = jsonify({"error": message, "request_id": request_id})
    return body, status_code
# ── Metrics counters ──────────────────────────────────────────────────────
import threading
# Guards every read and write of _metrics below (see _inc and /metrics).
_metrics_lock = threading.Lock()
# In-process counters; they start at zero each time the app is built.
_metrics: dict = {
    "requests_total": 0,          # incremented by each endpoint once auth/rate checks pass
    "errors_total": 0,            # incremented wherever an endpoint returns an error reply
    "requests_by_endpoint": {},   # endpoint path -> count
    "requests_by_mode": {},       # mode name -> count
}
def _inc(key: str, label: str = ""):
    """Increment a metrics counter while holding the metrics lock.

    Without ``label`` the scalar counter ``_metrics[key]`` is bumped; with a
    label, the per-label count inside the nested dict ``_metrics[key]`` is.
    """
    with _metrics_lock:
        if not label:
            _metrics[key] += 1
            return
        per_label = _metrics[key]
        per_label[label] = per_label.get(label, 0) + 1
# ── Auth + rate limiting ───────────────────────────────────────────────────
# Bearer token clients must present; an empty value disables the auth check.
_HANCOCK_API_KEY = os.getenv("HANCOCK_API_KEY", "")
# Per-client request log for the rate limiter: ip -> [timestamp, ...]
_rate_counts: dict = {}
# Maximum requests allowed per client within one window (requests/min).
# NOTE: int() raises ValueError here if HANCOCK_RATE_LIMIT is not numeric.
_RATE_LIMIT = int(os.getenv("HANCOCK_RATE_LIMIT", "60"))
_RATE_WINDOW = 60  # seconds
# /internal/diagnostics answers 404 unless this flag is set to a truthy
# spelling ("1", "true", "yes", "on"; case-insensitive, surrounding spaces ignored).
_ENABLE_INTERNAL_DIAGNOSTICS = os.getenv(
    "HANCOCK_ENABLE_INTERNAL_DIAGNOSTICS", "false"
).strip().lower() in {"1", "true", "yes", "on"}
def _prune_rate_counts(now: float) -> None:
    """Remove every IP bucket that has no timestamp inside the current window.

    Args:
        now: Reference time, on the same clock as the stored timestamps.
    """
    # Iterate over a snapshot so entries can be deleted while scanning.
    for client_ip, stamps in list(_rate_counts.items()):
        if any(now - stamp < _RATE_WINDOW for stamp in stamps):
            continue
        del _rate_counts[client_ip]
def _check_auth_and_rate() -> "tuple[bool, str, int]":
    """Authenticate the current request and apply the per-IP rate limit.

    Returns:
        ``(ok, error_message, remaining)``. On success ``error_message`` is
        empty and ``remaining`` is the number of requests left in the window.
        On failure ``remaining`` is 0 and the message starts with either
        ``"Unauthorized"`` or ``"Rate limit exceeded"``; callers pick the
        HTTP status from that prefix.

    An empty ``HANCOCK_API_KEY`` disables the auth check.
    """
    # Auth check (skip if key not configured)
    if _HANCOCK_API_KEY:
        auth = request.headers.get("Authorization", "")
        token = auth.removeprefix("Bearer ").strip()
        # Compare as bytes: hmac.compare_digest raises TypeError for str
        # arguments containing non-ASCII characters, which would turn a
        # malformed Authorization header into a 500 instead of a 401.
        if not hmac.compare_digest(
            token.encode("utf-8"), _HANCOCK_API_KEY.encode("utf-8")
        ):
            return False, "Unauthorized: provide Authorization: Bearer <HANCOCK_API_KEY>", 0

    # In-memory rate limiter (per source IP)
    now = time.time()
    ip = request.remote_addr or "unknown"
    timestamps = [t for t in _rate_counts.get(ip, []) if now - t < _RATE_WINDOW]
    if len(timestamps) >= _RATE_LIMIT:
        return False, f"Rate limit exceeded: {_RATE_LIMIT} requests/min", 0
    timestamps.append(now)
    _rate_counts[ip] = timestamps

    # Evict IPs with no recent requests once the table grows large (keep dict bounded)
    if len(_rate_counts) > 10_000:
        _prune_rate_counts(now)
    return True, "", _RATE_LIMIT - len(timestamps)
@app.after_request
def _add_rate_headers(response):
    """Stamp every outgoing response with the ``X-RateLimit-*`` headers.

    Stale IP buckets are pruned first; the remaining allowance is then
    computed from the caller's timestamps that are still inside the window.
    """
    import time

    now = time.time()
    client_ip = request.remote_addr or "unknown"
    _prune_rate_counts(now)
    used = sum(1 for stamp in _rate_counts.get(client_ip, []) if now - stamp < _RATE_WINDOW)
    rate_headers = (
        ("X-RateLimit-Limit", str(_RATE_LIMIT)),
        ("X-RateLimit-Remaining", str(max(0, _RATE_LIMIT - used))),
        ("X-RateLimit-Window", "60s"),
    )
    for header_name, header_value in rate_headers:
        response.headers[header_name] = header_value
    return response
@app.route("/health", methods=["GET"])
def health():
    """Return a static service descriptor: status, model, modes and endpoints.

    This handler performs no auth or rate-limit check of its own.
    """
    supported_modes = ["pentest", "soc", "auto", "code", "ciso", "sigma", "yara", "ioc", "osint"]
    advertised_endpoints = [
        "/v1/chat", "/v1/ask", "/v1/triage",
        "/v1/hunt", "/v1/respond", "/v1/code",
        "/v1/ciso", "/v1/sigma", "/v1/yara", "/v1/ioc",
        "/v1/geolocate", "/v1/predict-locations", "/v1/map-infrastructure",
        "/v1/agents", "/v1/webhook", "/metrics", "/internal/diagnostics",
    ]
    descriptor = {
        "status": "ok",
        "agent": "Hancock",
        "model": model,
        "company": "CyberViser",
        "modes": supported_modes,
        "models_available": MODELS,
        "endpoints": advertised_endpoints,
    }
    return jsonify(descriptor)
@app.route("/metrics", methods=["GET"])
def metrics_endpoint():
    """Render the in-process counters as Prometheus-style plain text.

    The counters are copied while holding the metrics lock; formatting
    happens afterwards on the copies.
    """
    with _metrics_lock:
        total_requests = _metrics["requests_total"]
        total_errors = _metrics["errors_total"]
        per_endpoint = dict(_metrics["requests_by_endpoint"])
        per_mode = dict(_metrics["requests_by_mode"])

    out = [
        "# HELP hancock_requests_total Total API requests",
        "# TYPE hancock_requests_total counter",
        f"hancock_requests_total {total_requests}",
        "# HELP hancock_errors_total Total 4xx/5xx errors",
        "# TYPE hancock_errors_total counter",
        f"hancock_errors_total {total_errors}",
        "# HELP hancock_requests_by_endpoint Requests per endpoint",
        "# TYPE hancock_requests_by_endpoint counter",
    ]
    out.extend(
        f'hancock_requests_by_endpoint{{endpoint="{path}"}} {count}'
        for path, count in per_endpoint.items()
    )
    out.append("# HELP hancock_requests_by_mode Requests per mode")
    out.append("# TYPE hancock_requests_by_mode counter")
    out.extend(
        f'hancock_requests_by_mode{{mode="{mode_name}"}} {count}'
        for mode_name, count in per_mode.items()
    )
    payload = "\n".join(out) + "\n"
    return Response(payload, mimetype=f"text/plain; version={VERSION}")
@app.route("/internal/diagnostics", methods=["GET"])
def internal_diagnostics_endpoint():
    """Report runtime configuration and uptime to authenticated callers.

    Replies 404 when the diagnostics flag is off, 403 when no API key is
    configured on the server, and 401/429 when the shared auth and
    rate-limit check fails.
    """
    if not _ENABLE_INTERNAL_DIAGNOSTICS:
        return _error_response("Not found", 404)
    if not _HANCOCK_API_KEY:
        _inc("errors_total")
        return _error_response(
            "Internal diagnostics requires HANCOCK_API_KEY authentication to be configured",
            403,
        )

    allowed, reason, _ = _check_auth_and_rate()
    if not allowed:
        _inc("errors_total")
        status = 401 if "Unauthorized" in reason else 429
        return _error_response(reason, status)

    _inc("requests_total")
    _inc("requests_by_endpoint", "/internal/diagnostics")

    elapsed = time.monotonic() - PROCESS_STARTED_AT_MONOTONIC
    rate_limit_info = {
        "requests_per_minute": _RATE_LIMIT,
        "window_seconds": _RATE_WINDOW,
        "auth_enabled": bool(_HANCOCK_API_KEY),
    }
    uptime_info = {
        "seconds": max(0, int(elapsed)),
        "started_at_unix": PROCESS_STARTED_AT_UNIX,
    }
    return jsonify({
        "backend_mode": backend,
        "current_model": model,
        "model_aliases": dict(MODELS),
        "rate_limit": rate_limit_info,
        "uptime": uptime_info,
    })
@app.route("/v1/agents", methods=["GET"])
def agents_endpoint():
    """List every registered mode together with its system prompt.

    Modes whose prompt is empty or ``None`` are left out of the listing.
    """
    allowed, reason, _ = _check_auth_and_rate()
    if not allowed:
        _inc("errors_total")
        status = 401 if "Unauthorized" in reason else 429
        return _error_response(reason, status)

    _inc("requests_total")
    _inc("requests_by_endpoint", "/v1/agents")

    available = {}
    for mode_name, prompt_text in SYSTEMS.items():
        if prompt_text:
            available[mode_name] = prompt_text
    return jsonify({
        "agents": available,
        "default_mode": DEFAULT_MODE,
        "model": model,
    })
@app.route("/v1/chat", methods=["POST"])
def chat_endpoint():
    """Multi-turn chat endpoint with optional server-sent-event streaming.

    Request JSON object fields:
        message: Required user message.
        history: Optional list of prior messages (default ``[]``).
        stream: When truthy, reply as ``text/event-stream`` (default False).
        mode: Persona name; must be a key of ``SYSTEMS`` (default ``"auto"``).

    Replies 400 for a body that is not a JSON object, a missing message, an
    unknown mode or a non-list history, and 502 when the model returns an
    empty reply in the non-streaming path.
    """
    ok, err, _ = _check_auth_and_rate()
    if not ok:
        _inc("errors_total")
        return _error_response(err, 401 if "Unauthorized" in err else 429, mode=_mode_from_request("auto"))
    _inc("requests_total")
    _inc("requests_by_endpoint", "/v1/chat")

    data = request.get_json(force=True)
    # A JSON list/number/null parses fine but has no .get(); reject it with a
    # 400 instead of letting an AttributeError surface as a 500.
    if not isinstance(data, dict):
        _inc("errors_total")
        return _error_response("JSON object body required", 400, mode="auto")

    user_msg = data.get("message", "")
    history = data.get("history", [])
    stream = data.get("stream", False)
    mode = data.get("mode", "auto")
    if not user_msg:
        _inc("errors_total")
        return _error_response("message required", 400, mode=mode)
    # The isinstance check keeps unhashable values (lists, dicts) from raising
    # TypeError inside the membership test.
    if not isinstance(mode, str) or mode not in SYSTEMS:
        _inc("errors_total")
        return _error_response(
            f"invalid mode '{mode}'; valid: {list(SYSTEMS.keys())}", 400, mode=mode
        )
    if not isinstance(history, list):
        _inc("errors_total")
        return _error_response("history must be a list", 400, mode=mode)
    _inc("requests_by_mode", mode)

    system = SYSTEMS.get(mode, AUTO_SYSTEM)
    # Build a fresh list rather than appending to the caller-supplied history.
    messages = (
        [{"role": "system", "content": system}]
        + history
        + [{"role": "user", "content": user_msg}]
    )

    if stream:
        def generate():
            """Yield SSE ``data:`` frames: one per delta, then a final ``done`` frame."""
            full = ""
            try:
                stream_resp = client.chat.completions.create(
                    model=model, messages=messages, max_tokens=1024,
                    temperature=0.7, top_p=0.95, stream=True,
                )
                for chunk in stream_resp:
                    if chunk.choices and chunk.choices[0].delta.content:
                        delta = chunk.choices[0].delta.content
                        full += delta
                        yield f"data: {json.dumps({'delta': delta})}\n\n"
            except Exception as exc:
                # Headers are already sent, so the failure is reported in-band.
                logger.error(
                    "streaming_error",
                    extra={
                        "event": "streaming_error",
                        "endpoint": request.path,
                        "mode": mode,
                        "request_id": get_request_id(),
                        "error": str(exc),
                    },
                )
                yield f"data: {json.dumps({'error': str(exc), 'request_id': get_request_id()})}\n\n"
            # Always close the stream with whatever text was accumulated.
            yield f"data: {json.dumps({'done': True, 'response': full})}\n\n"
        return Response(stream_with_context(generate()), mimetype="text/event-stream")

    resp = client.chat.completions.create(
        model=model, messages=messages, max_tokens=1024,
        temperature=0.7, top_p=0.95,
    )
    response_text = _extract_content(resp)
    if not response_text:
        _inc("errors_total")
        return _error_response("model returned empty response", 502, mode=mode)
    return jsonify({"response": response_text, "model": model, "mode": mode})
@app.route("/v1/ask", methods=["POST"])
def ask_endpoint():
    """Simple single-shot endpoint — no history needed.

    Request JSON object fields:
        question: Required question text.
        mode: Persona name (default ``"auto"``); a value that is not a key of
            ``SYSTEMS`` falls back to the auto prompt.

    Replies 400 for a body that is not a JSON object or a missing question,
    and 502 when the model returns an empty reply.
    """
    ok, err, _ = _check_auth_and_rate()
    if not ok:
        _inc("errors_total")
        return _error_response(
            err, 401 if "Unauthorized" in err else 429, mode=_mode_from_request("auto")
        )
    _inc("requests_total")
    _inc("requests_by_endpoint", "/v1/ask")

    data = request.get_json(force=True)
    # A JSON list/number/null parses fine but has no .get(); reject it with a
    # 400 instead of letting an AttributeError surface as a 500.
    if not isinstance(data, dict):
        _inc("errors_total")
        return _error_response("JSON object body required", 400, mode="auto")

    question = data.get("question", "")
    mode = data.get("mode", "auto")
    if not question:
        _inc("errors_total")
        return _error_response("question required", 400, mode=mode)

    # Non-string modes (e.g. a list) are unhashable and would raise TypeError
    # in the dict lookup; treat them like any other unknown mode.
    system = SYSTEMS.get(mode, AUTO_SYSTEM) if isinstance(mode, str) else AUTO_SYSTEM
    messages = [
        {"role": "system", "content": system},
        {"role": "user", "content": question},
    ]
    resp = client.chat.completions.create(
        model=model, messages=messages, max_tokens=1024,
        temperature=0.7, top_p=0.95,
    )
    answer = _extract_content(resp)
    if not answer:
        _inc("errors_total")
        return _error_response("model returned empty response", 502, mode=mode)
    return jsonify({"answer": answer, "model": model, "mode": mode})
@app.route("/v1/triage", methods=["POST"])
def triage_endpoint():
    """SOC alert triage — classify and prioritize a security alert.

    Expects a JSON object with a required ``alert`` field. The alert is
    embedded in a fixed triage prompt and sent with ``SOC_SYSTEM`` as the
    system message.

    Returns:
        JSON ``{"triage", "model"}`` on success; otherwise the result of
        ``_error_response`` with status 401/429 (auth or rate-limit check
        failed), 400 (body is not a JSON object, or ``alert`` is
        missing/empty) or 502 (no content extracted from the model response).
    """
    ok, err, _ = _check_auth_and_rate()
    if not ok:
        _inc("errors_total")
        status = 401 if "Unauthorized" in err else 429
        return _error_response(err, status, mode="soc")

    _inc("requests_total")
    _inc("requests_by_endpoint", "/v1/triage")
    _inc("requests_by_mode", "soc")

    data = request.get_json(force=True)
    if not isinstance(data, dict):
        # Valid JSON that is an array/scalar/null has no .get(); answer with
        # a client error instead of crashing with AttributeError (HTTP 500).
        _inc("errors_total")
        return _error_response("request body must be a JSON object", 400, mode="soc")

    alert = data.get("alert", "")
    if not alert:
        _inc("errors_total")
        return _error_response("alert required", 400, mode="soc")

    prompt = (
        "Triage the following security alert. Classify severity (Critical/High/Medium/Low/Info), "
        "identify the MITRE ATT&CK technique(s), determine if it is a True Positive or likely False "
        f"Positive, list immediate containment actions, and recommend next steps.\n\nAlert:\n{alert}"
    )
    messages = [
        {"role": "system", "content": SOC_SYSTEM},
        {"role": "user", "content": prompt},
    ]
    resp = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=1200,
        temperature=0.4,
        top_p=0.95,
    )
    triage_text = _extract_content(resp)
    if not triage_text:
        _inc("errors_total")
        return _error_response("model returned empty response", 502, mode="soc")
    return jsonify({"triage": triage_text, "model": model})
@app.route("/v1/hunt", methods=["POST"])
def hunt_endpoint():
    """Threat hunting query generator — generate SIEM queries for a given TTP.

    Expects a JSON object with a required ``target`` and an optional
    ``siem`` (defaults to ``"splunk"``). The SIEM name is upper-cased when
    placed in the prompt and echoed back in the response.

    Returns:
        JSON ``{"query", "siem", "model"}`` on success; otherwise the result
        of ``_error_response`` with status 401/429 (auth or rate-limit check
        failed), 400 (body is not a JSON object, or ``target`` is
        missing/empty) or 502 (no content extracted from the model response).
    """
    ok, err, _ = _check_auth_and_rate()
    if not ok:
        _inc("errors_total")
        status = 401 if "Unauthorized" in err else 429
        return _error_response(err, status, mode="soc")

    _inc("requests_total")
    _inc("requests_by_endpoint", "/v1/hunt")
    _inc("requests_by_mode", "soc")

    data = request.get_json(force=True)
    if not isinstance(data, dict):
        # Valid JSON that is an array/scalar/null has no .get(); answer with
        # a client error instead of crashing with AttributeError (HTTP 500).
        _inc("errors_total")
        return _error_response("request body must be a JSON object", 400, mode="soc")

    target = data.get("target", "")
    # "siem": null or "" falls back to the default, and str() keeps the
    # .upper() call below from failing on a non-string value such as a number.
    siem = str(data.get("siem") or "splunk")
    if not target:
        _inc("errors_total")
        return _error_response("target required", 400, mode="soc")

    prompt = (
        f"Generate a {siem.upper()} threat hunting query for: {target}\n"
        "Include: the query, what data sources are needed, expected fields to review, "
        "and MITRE ATT&CK mapping. Add comments to explain the logic."
    )
    messages = [
        {"role": "system", "content": SOC_SYSTEM},
        {"role": "user", "content": prompt},
    ]
    resp = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=1200,
        temperature=0.4,
        top_p=0.95,
    )
    query_text = _extract_content(resp)
    if not query_text:
        _inc("errors_total")
        return _error_response("model returned empty response", 502, mode="soc")
    return jsonify({"query": query_text, "siem": siem, "model": model})
@app.route("/v1/respond", methods=["POST"])
def respond_endpoint():
    """Incident response guidance — PICERL playbook for an incident type.

    Expects a JSON object with a required ``incident`` field naming the
    incident type. It is embedded in a fixed playbook prompt and sent with
    ``SOC_SYSTEM`` as the system message.

    Returns:
        JSON ``{"playbook", "incident", "model"}`` on success; otherwise the
        result of ``_error_response`` with status 401/429 (auth or rate-limit
        check failed), 400 (body is not a JSON object, or ``incident`` is
        missing/empty) or 502 (no content extracted from the model response).
    """
    ok, err, _ = _check_auth_and_rate()
    if not ok:
        _inc("errors_total")
        status = 401 if "Unauthorized" in err else 429
        return _error_response(err, status, mode="soc")

    _inc("requests_total")
    _inc("requests_by_endpoint", "/v1/respond")
    _inc("requests_by_mode", "soc")

    data = request.get_json(force=True)
    if not isinstance(data, dict):
        # Valid JSON that is an array/scalar/null has no .get(); answer with
        # a client error instead of crashing with AttributeError (HTTP 500).
        _inc("errors_total")
        return _error_response("request body must be a JSON object", 400, mode="soc")

    incident_type = data.get("incident", "")
    if not incident_type:
        _inc("errors_total")
        return _error_response("incident required", 400, mode="soc")

    prompt = (
        f"Provide a detailed PICERL incident response playbook for: {incident_type}\n"
        "For each phase (Prepare, Identify, Contain, Eradicate, Recover, Lessons Learned), "
        "provide specific actions, tools to use, evidence to collect, and stakeholder communication steps."
    )
    messages = [
        {"role": "system", "content": SOC_SYSTEM},
        {"role": "user", "content": prompt},
    ]
    resp = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=1500,
        temperature=0.4,
        top_p=0.95,
    )
    playbook_text = _extract_content(resp)
    if not playbook_text:
        _inc("errors_total")
        return _error_response("model returned empty response", 502, mode="soc")
    return jsonify({"playbook": playbook_text, "incident": incident_type, "model": model})
@app.route("/v1/code", methods=["POST"])
def code_endpoint():
    """Security code generation — routed to the dedicated coder model.

    Expects a JSON object with a required ``task`` and an optional
    ``language``. When a language is given, a sentence asking for the
    solution in that language is appended to the task.

    Returns:
        JSON ``{"code", "model", "language", "task"}`` on success
        (``language`` is ``"auto"`` when none was supplied); otherwise the
        result of ``_error_response`` with status 401/429 (auth or rate-limit
        check failed), 400 (body is not a JSON object, or ``task`` is
        missing/empty) or 502 (no content extracted from the model response).
    """
    ok, err, _ = _check_auth_and_rate()
    if not ok:
        _inc("errors_total")
        status = 401 if "Unauthorized" in err else 429
        return _error_response(err, status, mode="code")

    _inc("requests_total")
    _inc("requests_by_endpoint", "/v1/code")
    _inc("requests_by_mode", "code")

    data = request.get_json(force=True)
    if not isinstance(data, dict):
        # Valid JSON that is an array/scalar/null has no .get(); answer with
        # a client error instead of crashing with AttributeError (HTTP 500).
        _inc("errors_total")
        return _error_response("request body must be a JSON object", 400, mode="code")

    task = data.get("task", "")
    language = data.get("language", "")
    if not task:
        _inc("errors_total")
        return _error_response("task required", 400, mode="code")

    lang_hint = f" Write the solution in {language}." if language else ""
    prompt = f"{task}{lang_hint}\nProvide working, production-ready code with comments."
    messages = [
        {"role": "system", "content": CODE_SYSTEM},
        {"role": "user", "content": prompt},
    ]
    # HANCOCK_CODER_MODEL overrides the CODER_MODEL default. The app-wide
    # `model` is deliberately not consulted here, so there is no fallback to it.
    code_model = os.getenv("HANCOCK_CODER_MODEL", CODER_MODEL)
    resp = client.chat.completions.create(
        model=code_model,
        messages=messages,
        max_tokens=2048,
        temperature=0.2,
        top_p=0.7,
    )
    code_text = _extract_content(resp)
    if not code_text:
        _inc("errors_total")
        return _error_response("model returned empty response", 502, mode="code")
    return jsonify({
        "code": code_text,
        "model": code_model,
        "language": language or "auto",
        "task": task,
    })
@app.route("/v1/ciso", methods=["POST"])
def ciso_endpoint():
    """CISO advisor — risk, compliance, board reporting, framework guidance.

    Expects a JSON object. The question is read from ``question``, then
    ``query``, then ``message`` (first non-empty wins). Optional fields:
    ``context`` (appended as organisation context) and ``output`` (one of
    ``advice``, ``report``, ``gap-analysis``, ``board-summary``; defaults to
    ``advice``; unrecognised values add no formatting hint).

    Returns:
        JSON ``{"advice", "output", "model"}`` on success; otherwise the
        result of ``_error_response`` with status 401/429 (auth or rate-limit
        check failed), 400 (body is not a JSON object, or no question given)
        or 502 (no content extracted from the model response).
    """
    ok, err, _ = _check_auth_and_rate()
    if not ok:
        _inc("errors_total")
        status = 401 if "Unauthorized" in err else 429
        return _error_response(err, status, mode="ciso")

    _inc("requests_total")
    _inc("requests_by_endpoint", "/v1/ciso")
    _inc("requests_by_mode", "ciso")

    data = request.get_json(force=True)
    if not isinstance(data, dict):
        # Valid JSON that is an array/scalar/null has no .get(); answer with
        # a client error instead of crashing with AttributeError (HTTP 500).
        _inc("errors_total")
        return _error_response("request body must be a JSON object", 400, mode="ciso")

    question = data.get("question", "") or data.get("query", "") or data.get("message", "")
    context = data.get("context", "")
    output = data.get("output", "advice")
    if not question:
        _inc("errors_total")
        return _error_response("question required", 400, mode="ciso")

    output_hints = {
        "report": "Format your response as a structured risk report with Executive Summary, Findings, Risk Ratings, and Recommendations.",
        "gap-analysis": "Format your response as a gap analysis table: Control | Current State | Target State | Gap | Priority.",
        "board-summary": "Format your response as a concise board-ready executive summary (max 300 words, no jargon, business impact focus).",
        "advice": "",
    }
    # A JSON list/object `output` is unhashable and would make the dict
    # lookup raise TypeError; treat any non-string value as "no hint".
    hint = output_hints.get(output, "") if isinstance(output, str) else ""
    ctx_line = f"\n\nOrganisation context: {context}" if context else ""
    # strip() removes the trailing blank lines left behind by an empty hint.
    prompt = f"{question}{ctx_line}\n\n{hint}".strip()
    messages = [
        {"role": "system", "content": CISO_SYSTEM},
        {"role": "user", "content": prompt},
    ]
    resp = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=2048,
        temperature=0.3,
        top_p=0.95,
    )
    answer = _extract_content(resp)
    if not answer:
        _inc("errors_total")
        return _error_response("model returned empty response", 502, mode="ciso")
    return jsonify({"advice": answer, "output": output, "model": model})
@app.route("/v1/sigma", methods=["POST"])
def sigma_endpoint():
    """Sigma detection rule generator — convert a TTP or alert description into a Sigma rule.

    Expects a JSON object. The description is read from ``description``,
    then ``ttp``, then ``query`` (first non-empty wins). Optional fields:
    ``logsource`` (e.g. "windows sysmon", "linux auditd", "aws cloudtrail")
    and ``technique`` (e.g. "T1059.001"), each added to the prompt as a hint.

    Returns:
        JSON ``{"rule", "logsource", "technique", "model"}`` on success
        (``logsource``/``technique`` are ``"auto"`` when not supplied);
        otherwise the result of ``_error_response`` with status 401/429
        (auth or rate-limit check failed), 400 (body is not a JSON object,
        or no description given) or 502 (no content extracted from the
        model response).
    """
    ok, err, _ = _check_auth_and_rate()
    if not ok:
        _inc("errors_total")
        status = 401 if "Unauthorized" in err else 429
        return _error_response(err, status, mode="sigma")

    _inc("requests_total")
    _inc("requests_by_endpoint", "/v1/sigma")
    _inc("requests_by_mode", "sigma")

    data = request.get_json(force=True)
    if not isinstance(data, dict):
        # Valid JSON that is an array/scalar/null has no .get(); answer with
        # a client error instead of crashing with AttributeError (HTTP 500).
        _inc("errors_total")
        return _error_response("request body must be a JSON object", 400, mode="sigma")

    description = data.get("description", "") or data.get("ttp", "") or data.get("query", "")
    logsource = data.get("logsource", "")
    technique = data.get("technique", "")
    if not description:
        _inc("errors_total")
        return _error_response("description required", 400, mode="sigma")

    hints = []
    if logsource:
        hints.append(f"Target log source: {logsource}.")
    if technique:
        hints.append(f"MITRE ATT&CK technique: {technique} — use this in the tags field.")

    # Build the prompt from paragraphs so that a request without hints does
    # not leave an empty paragraph (four consecutive newlines) in the middle.
    sections = [
        "Write a complete, production-ready Sigma rule for the following:",
        f"{description}",
    ]
    if hints:
        sections.append(" ".join(hints))
    sections.append(
        "Output the full YAML rule first, then a brief explanation of what it detects "
        "and any false positive tuning advice."
    )
    prompt = "\n\n".join(sections)

    messages = [
        {"role": "system", "content": SIGMA_SYSTEM},
        {"role": "user", "content": prompt},
    ]
    resp = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=2048,
        temperature=0.2,
        top_p=0.7,
    )
    rule_text = _extract_content(resp)
    if not rule_text:
        _inc("errors_total")
        return _error_response("model returned empty response", 502, mode="sigma")
    return jsonify({
        "rule": rule_text,
        "logsource": logsource or "auto",
        "technique": technique or "auto",
        "model": model,
    })
@app.route("/v1/yara", methods=["POST"])
def yara_endpoint():
    """YARA malware detection rule generator.

    Expects a JSON object. The description is read from ``description``,
    then ``malware``, then ``query`` (first non-empty wins). Optional
    fields: ``file_type`` (e.g. "PE", "Office macro", "PDF", "script") and
    ``hash`` (a sample hash to include in the rule meta), each added to the
    prompt as a hint.

    Returns:
        JSON ``{"rule", "file_type", "model"}`` on success (``file_type`` is
        ``"auto"`` when not supplied); otherwise the result of
        ``_error_response`` with status 401/429 (auth or rate-limit check
        failed), 400 (body is not a JSON object, or no description given)
        or 502 (no content extracted from the model response).
    """
    ok, err, _ = _check_auth_and_rate()
    if not ok:
        _inc("errors_total")
        status = 401 if "Unauthorized" in err else 429
        return _error_response(err, status, mode="yara")

    _inc("requests_total")
    _inc("requests_by_endpoint", "/v1/yara")
    _inc("requests_by_mode", "yara")

    data = request.get_json(force=True)
    if not isinstance(data, dict):
        # Valid JSON that is an array/scalar/null has no .get(); answer with
        # a client error instead of crashing with AttributeError (HTTP 500).
        _inc("errors_total")
        return _error_response("request body must be a JSON object", 400, mode="yara")

    description = data.get("description", "") or data.get("malware", "") or data.get("query", "")
    file_type = data.get("file_type", "")
    sample_hash = data.get("hash", "")
    if not description:
        _inc("errors_total")
        return _error_response("description required", 400, mode="yara")

    hints = []
    if file_type:
        hints.append(f"Target file type: {file_type}.")
    if sample_hash:
        hints.append(f"Known sample hash: {sample_hash} — include in rule meta.")

    # Build the prompt from paragraphs so that a request without hints does
    # not leave an empty paragraph (four consecutive newlines) in the middle.
    sections = [
        "Write a complete, production-ready YARA rule for the following:",
        f"{description}",
    ]
    if hints:
        sections.append(" ".join(hints))
    sections.append(
        "Output the full YARA rule first, then a brief explanation of what it detects "
        "and any known false positive sources."
    )
    prompt = "\n\n".join(sections)

    messages = [
        {"role": "system", "content": YARA_SYSTEM},
        {"role": "user", "content": prompt},
    ]
    resp = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=2048,
        temperature=0.2,
        top_p=0.7,
    )
    rule_text = _extract_content(resp)
    if not rule_text:
        _inc("errors_total")
        return _error_response("model returned empty response", 502, mode="yara")
    return jsonify({
        "rule": rule_text,
        "file_type": file_type or "auto",
        "model": model,
    })
@app.route("/v1/ioc", methods=["POST"])
def ioc_endpoint():
"""IOC enrichment — threat intel report for IP, domain, URL, hash, or email."""
ok, err, _ = _check_auth_and_rate()
if not ok:
_inc("errors_total"); return _error_response(err, 401 if "Unauthorized" in err else 429, mode="ioc")
_inc("requests_total"); _inc("requests_by_endpoint", "/v1/ioc"); _inc("requests_by_mode", "ioc")
data = request.get_json(force=True)
indicator = (data.get("indicator") or data.get("ioc") or data.get("query") or "").strip()
ioc_type = data.get("type", "auto")
context = data.get("context", "")
if not indicator:
_inc("errors_total"); return _error_response("indicator required", 400, mode="ioc")