forked from splunk/security_content
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathgenerate.py
More file actions
362 lines (282 loc) · 15.1 KB
/
generate.py
File metadata and controls
362 lines (282 loc) · 15.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
#!/usr/bin/python
'''
Generates splunk configurations from manifest files under the security-content repo.
'''
import glob
import yaml
import argparse
from os import path
import sys
import datetime
from jinja2 import Environment, FileSystemLoader
# global variables
REPO_PATH = ''
VERBOSE = False
OUTPUT_PATH = ''
def load_objects(file_path):
    """Load every YAML manifest matching *file_path* (a glob relative to REPO_PATH).

    Returns a list with one parsed object per matching file.
    """
    pattern = path.join(path.expanduser(REPO_PATH), file_path)
    return [load_file(manifest) for manifest in glob.glob(pattern)]
def load_file(file_path):
    """Parse and return the first YAML document in *file_path*.

    Exits the program with a readable error message when the file cannot
    be parsed or contains no YAML documents (the latter previously
    crashed with an unhandled IndexError on empty manifest files).
    """
    with open(file_path, 'r') as stream:
        try:
            documents = list(yaml.safe_load_all(stream))
        except yaml.YAMLError as exc:
            print(exc)
            sys.exit("ERROR: reading {0}".format(file_path))
    if not documents:
        # Empty manifest: fail with context instead of a bare IndexError.
        sys.exit("ERROR: no YAML documents found in {0}".format(file_path))
    return documents[0]
def generate_transforms_conf(lookups):
    """Render default/transforms.conf from the lookup manifests, sorted by name.

    Returns the path of the written file.
    """
    ordered_lookups = sorted(lookups, key=lambda lookup: lookup['name'])
    stamp = datetime.datetime.utcnow().replace(microsecond=0).isoformat()
    env = Environment(loader=FileSystemLoader('bin/jinja2_templates'),
                      trim_blocks=True)
    rendered = env.get_template('transforms.j2').render(lookups=ordered_lookups, time=stamp)
    destination = OUTPUT_PATH + "/default/transforms.conf"
    with open(destination, 'w') as handle:
        handle.write(rendered)
    return destination
def generate_savedsearches_conf(detections, investigations, baselines):
    """Render default/savedsearches.conf from detections/investigations/baselines.

    The rendered text is forced to plain ASCII (non-ASCII characters are
    silently dropped) before being written.  Returns the output path.
    """
    stamp = datetime.datetime.utcnow().replace(microsecond=0).isoformat()
    env = Environment(loader=FileSystemLoader('bin/jinja2_templates'),
                      trim_blocks=True)
    rendered = env.get_template('savedsearches.j2').render(
        detections=detections, investigations=investigations,
        baselines=baselines, time=stamp)
    # Strip any non-ASCII characters, matching the original behavior.
    rendered = rendered.encode('ascii', 'ignore').decode('ascii')
    destination = OUTPUT_PATH + "/default/savedsearches.conf"
    with open(destination, 'w') as handle:
        handle.write(rendered)
    return destination
def generate_analytics_story_conf(stories):
    """Render default/analytic_stories.conf from the story objects.

    Returns the path of the written file.
    """
    stamp = datetime.datetime.utcnow().replace(microsecond=0).isoformat()
    env = Environment(loader=FileSystemLoader('bin/jinja2_templates'),
                      trim_blocks=True)
    rendered = env.get_template('analytic_stories.j2').render(stories=stories, time=stamp)
    destination = OUTPUT_PATH + "/default/analytic_stories.conf"
    with open(destination, 'w') as handle:
        handle.write(rendered)
    return destination
def generate_use_case_library_conf(stories, detections, investigations, baselines):
    """Render default/use_case_library.conf from all enriched object types.

    Returns the path of the written file.
    """
    stamp = datetime.datetime.utcnow().replace(microsecond=0).isoformat()
    env = Environment(loader=FileSystemLoader('bin/jinja2_templates'),
                      trim_blocks=True)
    rendered = env.get_template('use_case_library.j2').render(
        stories=stories, detections=detections,
        investigations=investigations, baselines=baselines, time=stamp)
    destination = OUTPUT_PATH + "/default/use_case_library.conf"
    with open(destination, 'w') as handle:
        handle.write(rendered)
    return destination
def generate_macros_conf(macros):
    """Render default/macros.conf from the macro manifests.

    Returns the path of the written file.
    """
    stamp = datetime.datetime.utcnow().replace(microsecond=0).isoformat()
    env = Environment(loader=FileSystemLoader('bin/jinja2_templates'),
                      trim_blocks=True)
    rendered = env.get_template('macros.j2').render(macros=macros, time=stamp)
    destination = OUTPUT_PATH + "/default/macros.conf"
    with open(destination, 'w') as handle:
        handle.write(rendered)
    return destination
def identify_next_steps(detections, investigations):
    """Attach recommended-action metadata to Splunk correlation-rule detections.

    For each detection carrying a Splunk correlation rule, walks its linked
    investigations and builds a "next steps" JSON string: ESCU investigate
    steps for splunk-type investigations, plus a Phantom playbook step when a
    phantom-type investigation is linked.  Detections with a phantom
    investigation gain a 'recommended_actions' key.  Returns all detections
    (mutated in place) in a new list.

    NOTE(review): next_steps is built but never written onto the detection —
    only 'recommended_actions' is set.  Confirm whether next_steps was meant
    to be attached too.
    NOTE(review): indentation reconstructed from a whitespace-mangled copy;
    verify the nesting of the has_phantom check and the final append against
    the canonical file.
    """
    enriched_detections = []
    for detection in detections:
        if 'splunk' in detection['detect']:
            if 'correlation_rule' in detection['detect']['splunk']:
                investigations_output = ""
                has_phantom = False
                next_steps = ""
                if 'investigations' in detection:
                    for i in detection['investigations']:
                        if i['type'] == 'splunk':
                            investigations_output += "ESCU - {0}\\n".format(i['name'])
                            next_steps = "{\"version\": 1, \"data\": \"Recommended following steps:\\n\\n"
                            # NOTE(review): the continuation below contains "\\ n" (with a space),
                            # which looks like a typo for "\\n" — confirm against rendered output.
                            next_steps += "1.[[action|escu_investigate]]: Based on ESCU investigate \
recommendations:\\ n%s\"}" % investigations_output
                        if i['type'] == 'phantom':
                            has_phantom = True
                            # lets pull the playbook URL out from investigation object
                            playbook_url = ''
                            for inv in investigations:
                                if i['name'] == inv['name']:
                                    playbook_url = inv['investigate']['phantom']['playbook_url']
                            # construct next steps with the playbook info
                            playbook_next_steps_string = "Splunk>Phantom Response Playbook - Monitor enrichment of the \
Splunk>Phantom Playbook called " + str(i['name']) + " and answer any \
analyst prompt in Mission Control with a response decision. \
Link to the playbook " + str(playbook_url)
                            next_steps = "{\"version\": 1, \"data\": \"Recommended following"
                            next_steps += ":\\n\\n1. [[action|runphantomplaybook]]: Phantom playbook "
                            next_steps += "recommendations:\\n%s\\n2. [[action|escu_investigate]]: " % (playbook_next_steps_string)
                            next_steps += "Based on ESCU investigate recommendations:\\n%s\"}" % (investigations_output)
                if has_phantom:
                    detection['recommended_actions'] = 'runphantomplaybook, escu_investigate'
        enriched_detections.append(detection)
    return enriched_detections
def map_investigations_to_detection(detections):
    """Build a mapping of investigation id -> set of detection ids that link to it."""
    inv_det = {}
    for detection in detections:
        for investigation in detection.get('investigations', []):
            inv_det.setdefault(investigation['id'], set()).add(detection['id'])
    return inv_det
def map_baselines_to_detection(detections):
    """Build a mapping of baseline id -> set of detection ids that link to it."""
    bas_det = {}
    for detection in detections:
        for baseline in detection.get('baselines', []):
            bas_det.setdefault(baseline['id'], set()).add(detection['id'])
    return bas_det
def map_detection_to_stories(stories):
    """Build a mapping of detection id -> set of story names that reference it."""
    det_sto = {}
    for story in stories:
        for detection in story['detections']:
            det_sto.setdefault(detection['detection_id'], set()).add(story['name'])
    return det_sto
def enrich_investigations_with_stories(investigations, map_inv_det, map_det_sto):
    """Attach a sorted 'stories' list to each investigation.

    Story names are gathered via the investigation -> detection -> story
    maps.  Investigations are mutated in place and returned in a new list;
    unmapped investigations get an empty 'stories' list.
    """
    enriched = []
    for investigation in investigations:
        linked_stories = set()
        for detection_id in map_inv_det.get(investigation['id'], ()):
            linked_stories.update(map_det_sto.get(detection_id, ()))
        investigation['stories'] = sorted(linked_stories)
        enriched.append(investigation)
    return enriched
def enrich_detections_with_stories(detections, map_det_sto):
    """Attach a sorted 'stories' list to each detection.

    Detections are mutated in place and returned in a new list; detections
    not present in the map get an empty 'stories' list.
    """
    enriched = []
    for detection in detections:
        detection['stories'] = sorted(map_det_sto.get(detection['id'], set()))
        enriched.append(detection)
    return enriched
def enrich_baselines_with_stories(baselines, map_bas_det, map_det_sto):
    """Attach a sorted 'stories' list to each baseline.

    Story names are gathered via the baseline -> detection -> story maps.
    Baselines are mutated in place and returned in a new list; unmapped
    baselines get an empty 'stories' list.
    """
    enriched = []
    for baseline in baselines:
        linked_stories = set()
        # map_bas_det yields the detection ids linked to this baseline.
        for detection_id in map_bas_det.get(baseline['id'], ()):
            linked_stories.update(map_det_sto.get(detection_id, ()))
        baseline['stories'] = sorted(linked_stories)
        enriched.append(baseline)
    return enriched
def enrich_stories(stories, detections, investigations, baselines):
    """Aggregate per-story metadata from linked detections/investigations/baselines.

    For each story, collects providing technologies, data models,
    security-framework mappings, and the display names of its splunk-type
    searches, then writes them back onto the story dict.  Relies on
    enrich_investigations_with_stories / enrich_baselines_with_stories having
    already populated the 'stories' lists on investigations and baselines.
    Returns the (mutated) story objects in a new list.
    """
    enriched_stories = []
    for story in stories:
        providing_technologies = set()
        data_models = set()
        detection_names = []
        # Fixed set of framework-mapping categories accumulated across detections.
        mappings = dict()
        mappings["cis20"] = set()
        mappings["kill_chain_phases"] = set()
        mappings["mitre_attack"] = set()
        mappings["nist"] = set()
        mappings["mitre_technique_id"] = set()
        searches = []
        # Join the story's detection references against detection objects by id.
        for detection in story['detections']:
            for detection_obj in detections:
                if detection['detection_id'] == detection_obj['id']:
                    if 'providing_technologies' in detection_obj['data_metadata']:
                        providing_technologies = providing_technologies | set(detection_obj['data_metadata']['providing_technologies'])
                    if 'data_models' in detection_obj['data_metadata']:
                        data_models = data_models | set(detection_obj['data_metadata']['data_models'])
                    if detection_obj['type'] == 'splunk':
                        detection_names.append("ESCU - " + detection_obj['name'] + " - Rule")
                    # NOTE(review): raises KeyError if a detection declares a mapping
                    # category outside the five initialised above — confirm manifests
                    # are constrained to those keys.
                    for key in detection_obj['mappings']:
                        mappings[key] = mappings[key] | set(detection_obj['mappings'][key])
        # Freeze the accumulated sets into sorted lists for stable template output.
        for key in mappings.keys():
            mappings[key] = sorted(list(mappings[key]))
        story['mappings'] = mappings
        story['detection_names'] = sorted(detection_names)
        searches = sorted(detection_names)
        investigation_names = []
        # Investigations link back to stories via their 'stories' list (set earlier).
        for investigation in investigations:
            for s in investigation['stories']:
                if s == story['name']:
                    if 'providing_technologies' in investigation['data_metadata']:
                        providing_technologies = providing_technologies | set(investigation['data_metadata']['providing_technologies'])
                    if 'data_models' in investigation['data_metadata']:
                        data_models = data_models | set(investigation['data_metadata']['data_models'])
                    if investigation['type'] == 'splunk':
                        investigation_names.append("ESCU - " + investigation['name'])
        story['investigation_names'] = sorted(investigation_names)
        searches = searches + sorted(investigation_names)
        baseline_names = []
        # Baselines likewise carry a 'stories' list populated earlier.
        for baseline in baselines:
            for s in baseline['stories']:
                if s == story['name']:
                    if 'providing_technologies' in baseline['data_metadata']:
                        providing_technologies = providing_technologies | set(baseline['data_metadata']['providing_technologies'])
                    if 'data_models' in baseline['data_metadata']:
                        data_models = data_models | set(baseline['data_metadata']['data_models'])
                    if baseline['type'] == 'splunk':
                        baseline_names.append("ESCU - " + baseline['name'])
        story['baseline_names'] = sorted(baseline_names)
        searches = searches + sorted(baseline_names)
        story['providing_technologies'] = sorted(list(providing_technologies))
        story['data_models'] = sorted(list(data_models))
        # 'searches' lists detections first, then investigations, then baselines.
        story['searches'] = searches
        enriched_stories.append(story)
    return enriched_stories
if __name__ == "__main__":
    # Command-line interface: manifest repo location, output directory, verbosity.
    parser = argparse.ArgumentParser(description="generates splunk conf files out of security-content manifests", epilog="""
    This tool converts manifests to the source files to be used by products like Splunk Enterprise.
    It generates the savesearches.conf, analytics_stories.conf files for ES.""")
    parser.add_argument("-p", "--path", required=True, help="path to security-content repo")
    parser.add_argument("-o", "--output", required=True, help="path to the output directory")
    parser.add_argument("-v", "--verbose", required=False, default=False, action='store_true', help="prints verbose output")
    # parse them
    args = parser.parse_args()
    # Module-level globals read by load_objects() and the generate_* helpers;
    # they must be set before any manifests are loaded or rendered.
    REPO_PATH = args.path
    OUTPUT_PATH = args.output
    VERBOSE = args.verbose
    # Load every manifest type from the repo.
    stories = load_objects("stories/*.yml")
    macros = load_objects("macros/*.yml")
    lookups = load_objects("lookups/*.yml")
    detections = load_objects("detections/*.yml")
    investigations = load_objects("investigations/*.yml")
    baselines = load_objects("baselines/*.yml")
    # Enrichment pipeline — order matters: detections gain next-step data,
    # the cross-reference maps are built from them, and enrich_stories
    # consumes the already-enriched investigations/baselines.
    detections = identify_next_steps(detections, investigations)
    map_inv_det = map_investigations_to_detection(detections)
    map_det_sto = map_detection_to_stories(stories)
    map_bas_det = map_baselines_to_detection(detections)
    detections = enrich_detections_with_stories(detections, map_det_sto)
    investigations = enrich_investigations_with_stories(investigations, map_inv_det, map_det_sto)
    baselines = enrich_baselines_with_stories(baselines, map_bas_det, map_det_sto)
    stories = enrich_stories(stories, detections, investigations, baselines)
    # Render each .conf file; objects are sorted by name first for stable output.
    lookups_path = generate_transforms_conf(lookups)
    detections = sorted(detections, key=lambda d: d['name'])
    investigations = sorted(investigations, key=lambda i: i['name'])
    baselines = sorted(baselines, key=lambda b: b['name'])
    detection_path = generate_savedsearches_conf(detections, investigations, baselines)
    stories = sorted(stories, key=lambda s: s['name'])
    story_path = generate_analytics_story_conf(stories)
    use_case_lib_path = generate_use_case_library_conf(stories, detections, investigations, baselines)
    macros = sorted(macros, key=lambda m: m['name'])
    macros_path = generate_macros_conf(macros)
    if VERBOSE:
        print("{0} stories have been successfully written to {1}".format(len(stories), story_path))
        print("{0} stories have been successfully written to {1}".format(len(stories), use_case_lib_path))
        print("{0} detections have been successfully written to {1}".format(len(detections), detection_path))
        print("{0} investigations have been successfully written to {1}".format(len(investigations), detection_path))
        print("{0} baselines have been successfully written to {1}".format(len(baselines), detection_path))
        print("{0} macros have been successfully written to {1}".format(len(macros), macros_path))
    print("security content generation completed..")