forked from shaniacht1/content
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathautomation-CheckDockerImageAvailable.yml
More file actions
151 lines (135 loc) · 5.47 KB
/
automation-CheckDockerImageAvailable.yml
File metadata and controls
151 lines (135 loc) · 5.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
args:
- description: 'Docker image full name with version. For example: demisto/python:2.7.15.155'
name: input
required: true
- auto: PREDEFINED
defaultValue: "yes"
description: Use system proxy settings
name: use_system_proxy
predefined:
- "yes"
- "no"
- auto: PREDEFINED
defaultValue: "no"
description: Trust any certificate (not secure)
name: trust_any_certificate
predefined:
- "yes"
- "no"
comment: Check if a docker image is available for performing docker pull. The script simulates
the docker pull flow but doesn't actually pull the image. Returns an entry with
'ok' if all checks pass; otherwise returns an error.
commonfields:
id: CheckDockerImageAvailable
version: -1
enabled: true
name: CheckDockerImageAvailable
runas: DBotWeakRole
runonce: false
script: |2-
import requests
import re
# Turn off urllib3 warnings for the whole script run -- presumably to avoid
# insecure-request warning noise when main() disables certificate verification
# (trust_any_certificate == 'yes').
requests.packages.urllib3.disable_warnings()
# registry host used when the image name does not specify one
DEFAULT_REGISTRY = "registry-1.docker.io"
# timeout, in seconds, passed to every requests call
TIMEOUT = 10
# media types accepted from the registry: plain JSON, a v2 image manifest
# or a v2 manifest list
ACCEPT_HEADER = dict(
    Accept="application/json, "
           "application/vnd.docker.distribution.manifest.v2+json, "
           "application/vnd.docker.distribution.manifest.list.v2+json"
)
def parse_www_auth(www_auth):
    """Parse realm and service from www-authenticate string of the form:
    Bearer realm="https://auth.docker.io/token",service="registry.docker.io"

    Each key="value" parameter is matched on its own, so extra parameters
    (for example scope="..."), whitespace after the commas or a different
    parameter order do not leak into the extracted values. Parameter names
    are matched case-insensitively.

    :param www_auth: www-authenticate header value
    :type www_auth: string
    :return: (realm, service) tuple, or None when either value is missing
    :rtype: tuple or None
    """
    if not www_auth:
        return None
    params = {}
    for key, value in re.findall(r'(\w+)="([^"]*)"', www_auth):
        # keep the first occurrence of a parameter if it is repeated
        params.setdefault(key.lower(), value)
    realm = params.get('realm')
    service = params.get('service')
    if not realm or not service:
        return None
    return (realm, service)
def docker_auth(image_name, verify_ssl=True, registry=DEFAULT_REGISTRY):
    """
    Authenticate to the docker service. Return an authentication token if authentication is required.

    Probes https://<registry>/v2/. On a 401 response the token endpoint
    (realm) and service are taken from the www-authenticate header, falling
    back to the Docker Hub defaults when the header is missing or cannot be
    parsed, and a pull-scoped token for image_name is requested.

    :param image_name: repository name (without registry and tag)
    :param verify_ssl: passed as ``verify`` to requests
    :param registry: registry host to authenticate against
    :return: the bearer token, or None when the registry does not require
        authentication (or the token response carries no token)
    :raises requests.HTTPError: on an error status from the registry (other
        than the initial 401) or from the token endpoint
    """
    res = requests.get("https://{}/v2/".format(registry), headers=ACCEPT_HEADER,
                       timeout=TIMEOUT, verify=verify_ssl)
    if res.status_code != 401:
        # no authentication challenge; still surface any other HTTP error
        res.raise_for_status()
        return None
    # need to authenticate
    # defaults in case we fail for some reason
    realm = "https://auth.docker.io/token"
    service = "registry.docker.io"
    # Should contain header: Www-Authenticate
    www_auth = res.headers.get('www-authenticate')
    if www_auth:
        parse_auth = parse_www_auth(www_auth)
        if parse_auth:
            realm, service = parse_auth
        else:
            demisto.info('Failed parsing www-authenticate header: {}'.format(www_auth))
    else:
        demisto.info('Failed extracting www-authenticate header from registry: {}, final url: {}'.format(
            registry, res.url))
    res = requests.get(
        "{}?scope=repository:{}:pull&service={}".format(realm, image_name, service),
        headers=ACCEPT_HEADER, timeout=TIMEOUT, verify=verify_ssl)
    res.raise_for_status()
    res_json = res.json()
    # The token response may carry the token as 'token' or, for OAuth 2.0
    # compatibility, as 'access_token'; accept either.
    return res_json.get('token') or res_json.get('access_token')
def docker_min_layer(layers):
    """Return the entry of ``layers`` with the smallest 'size' value.

    When several layers share the smallest size the first one is returned.
    """
    return min(layers, key=lambda layer: layer['size'])
def main():
    """Check that a docker image can be pulled, without actually pulling it.

    Reads the script arguments 'input' (full image name), 'use_system_proxy'
    and 'trust_any_certificate'. Authenticates against the registry when
    required, fetches the manifest of the requested tag and downloads the
    first 100 bytes of the smallest layer. Posts 'ok' through demisto.results
    on success; any failure during the check is reported with return_error.
    """
    args = demisto.args()
    if args.get('use_system_proxy') == 'no':
        # pop() with a default rather than `del`: any of these variables may
        # be unset, and a KeyError here would abort the script before the check.
        for proxy_var in ('HTTP_PROXY', 'HTTPS_PROXY', 'http_proxy', 'https_proxy'):
            os.environ.pop(proxy_var, None)
    verify_ssl = args.get('trust_any_certificate') != 'yes'
    docker_full_name = args['input']
    registry = DEFAULT_REGISTRY
    image_name_tag = docker_full_name
    first_part, slash, remainder = docker_full_name.partition('/')
    # The leading component is a registry when the name has more than one '/'
    # or when the component looks like a host (contains '.' or a ':port', or
    # is 'localhost'), e.g. myregistry.local:5000/image:tag.
    looks_like_host = '.' in first_part or ':' in first_part or first_part == 'localhost'
    if slash and ('/' in remainder or looks_like_host):
        registry, image_name_tag = first_part, remainder
    try:
        name_parts = image_name_tag.split(':')
        image_name = name_parts[0]
        # a missing or empty tag ("image" / "image:") means 'latest'
        tag = name_parts[1] if len(name_parts) > 1 and name_parts[1] else 'latest'
        auth_token = docker_auth(image_name, verify_ssl, registry)
        headers = ACCEPT_HEADER.copy()
        if auth_token:
            headers['Authorization'] = "Bearer {}".format(auth_token)
        res = requests.get("https://{}/v2/{}/manifests/{}".format(registry, image_name, tag),
                           headers=headers, timeout=TIMEOUT, verify=verify_ssl)
        res.raise_for_status()
        # NOTE(review): a manifest-list response presumably has no top-level
        # 'layers' key and would be rejected here -- confirm against the registry API.
        layers = res.json().get('layers')
        if not layers:
            raise ValueError("No 'layers' found in json response: {}".format(res.content))
        layer_min = docker_min_layer(layers)
        # request only the first 100 bytes of the smallest layer
        headers['Range'] = "bytes=0-99"
        res = requests.get("https://{}/v2/{}/blobs/{}".format(registry, image_name, layer_min['digest']),
                           headers=headers, timeout=TIMEOUT, verify=verify_ssl)
        res.raise_for_status()
        # a layer smaller than 100 bytes can only return its full size
        expected_len = min(100, layer_min['size'])
        cont_len = len(res.content)
        demisto.info("Docker image check [{}] downloaded layer content of len: {}".format(docker_full_name, cont_len))
        if cont_len < expected_len:
            raise ValueError('Content returned is shorter than expected length: {}. Content: {}'.format(expected_len,
                                                                                                        res.content))
        demisto.results('ok')
    except Exception as ex:
        return_error("Failed verifying: {}. Err: {}".format(docker_full_name, str(ex)))
# run main() when loaded under the builtins module name:
# "__builtin__" on python2, "builtins" on python3
if __name__ in ("__builtin__", "builtins"):
    main()
scripttarget: 0
system: true
tags: []
type: python