-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathaci2k8s.py
More file actions
130 lines (111 loc) · 4.53 KB
/
aci2k8s.py
File metadata and controls
130 lines (111 loc) · 4.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python
###################################################################################
# Written by Wei, Hang #
# weihanghank@gmail.com #
###################################################################################
"""
Creates networkpolicies using apiVersion: networking.k8s.io/v1 based on MMS Network Template Instance
This script does not create namespaces. Make sure all namespaces have been created.
"""
from __future__ import print_function
import time
from os import path
import yaml
import json
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from pprint import pprint
from my_py.k8s.data_input import read_data
DRY_RUN = False # if it's False, means all policies will be delivered to cluster
DATA = read_data() # input data from MMS or other policy orchestrator
# Following is the template to render REST API body
BASE_TEMPLATE = '''
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: {mynet}
spec:
podSelector:
matchLabels:
mms_network_tag: {mynet}
policyTypes:
- Ingress
ingress:
- from:
- namespaceSelector: {{}}
podSelector:
matchLabels:
mms_network_tag: {mynet}
'''
FROM_TEMPLATE = '''
- from:
- namespaceSelector: {{}}
podSelector:
matchExpressions:
- {{key: mms_network_tag, operator: In, values: [{yournets}]}}
'''
PORTS_TEMPLATE = '''
- protocol: {protocol}
port: {port}
'''
def get_body():
"""
Create a Network Policy API body for each namespace, return a dictionary
"""
_body = {}
for contract in DATA["contracts"]:
pnets = DATA["contracts"][contract]["provide_networks"]
cnets = DATA["contracts"][contract]["consume_networks"]
nets = list(set(pnets + cnets)) # all nets but eliminate redundancy
for net in nets:
if not _body.get(net):
_body[net] = BASE_TEMPLATE.format(mynet=net)
if net in pnets:
if net in cnets: # for those both in provide and consume
_body[net] += FROM_TEMPLATE.format(yournets=", ".join(nets)) # allow all other net in
else: # 'else' means 'provide only'
_body[net] += FROM_TEMPLATE.format(yournets=", ".join(cnets)) # only allow consume in
if DATA["contracts"][contract].get("ports"):
_body[net] += " ports: "
for port in DATA["contracts"][contract]["ports"]:
_body[net] += PORTS_TEMPLATE.format(protocol=port["protocol"], port=port["port"])
else: # for those consume net only
_body[net] += FROM_TEMPLATE.format(yournets=", ".join(pnets)) # allow only provide net in
if DATA.get("expose"):
for ex in DATA["expose"]:
net = ex["network"]
cidr = ex["cidr"]
except_list = ex["except"]
_body[net] += "\n - from: \n - ipBlock: \n cidr: {}\n".format(cidr)
if except_list:
_body[net] += " except: \n - "
_body[net] += "\n - ".join(except_list)
_body[net] += "\n"
if ex.get("ports"):
_body[net] += " ports: "
for port in ex["ports"]:
_body[net] += PORTS_TEMPLATE.format(protocol=port["protocol"], port=port["port"])
return _body
def main():
# Configs can be set in Configuration class directly or using helper
# utility. If no argument provided, the config will be loaded from
# default location.
bodies = get_body()
for net in bodies:
print(bodies[net])
config.load_kube_config()
try:
api_instance = client.NetworkingV1Api()
for namespace in DATA["namespaces"]:
for net in bodies:
body = yaml.load(bodies[net], Loader=yaml.FullLoader)
if DRY_RUN:
api_response = api_instance.create_namespaced_network_policy(namespace, body, pretty="true",
dry_run="All")
else:
api_response = api_instance.create_namespaced_network_policy(namespace, body, pretty="true")
# pprint(api_response) # used for diagnostics
except ApiException as e:
print("Exception when calling APIs: %s\n" % e)
if __name__ == '__main__':
main()