diff --git a/daemon/main.py b/daemon/main.py index c0a8573..80b1e53 100644 --- a/daemon/main.py +++ b/daemon/main.py @@ -6,7 +6,7 @@ from requests import get from werkzeug import Response -from detector.config import KUBE_PROXY_IP, KUBE_PROXY_PORT, AGENT_PORT, DAEMON_PORT +from detector.config import KUBE_PROXY_IP, AGENT_PORT, DAEMON_PORT, REQUESTS_TOKEN, KUBE_PROXY_PORT app = Flask(__name__) CORS(app) @@ -17,25 +17,47 @@ def get_nodes(): global nodes nodes = list() - response = requests.get("http://" + KUBE_PROXY_IP + ":" + str(KUBE_PROXY_PORT) + "/api/v1/nodes").json()["items"] - for node in response: - try: - if requests.get( - "http://" + node["status"]["addresses"][0]["address"] + ":" + str(AGENT_PORT), timeout=2).status_code == 200: + if REQUESTS_TOKEN is None: + response = requests.get("http://" + KUBE_PROXY_IP + ":" + str(KUBE_PROXY_PORT) + "/api/v1/nodes").json()["items"] + for node in response: + try: + if requests.get( + "http://" + node["status"]["addresses"][0]["address"] + ":" + str(AGENT_PORT), timeout=2).status_code == 200: + nodes.append({ + "name": node["metadata"]["name"], + "ip_address": node["status"]["addresses"][0]["address"], + "agent": "true" + }) + else: + raise Exception + except Exception: nodes.append({ "name": node["metadata"]["name"], "ip_address": node["status"]["addresses"][0]["address"], - "agent": "true" + "agent": "false" + }) + else: + response = requests.get("https://" + KUBE_PROXY_IP + "/api/v1/pods", + headers={'Authorization': 'Bearer ' + REQUESTS_TOKEN}).json()["items"] + for pod in response: + ip = pod["status"]["podIP"] + name = pod["spec"]["nodeName"] + try: + if pod["metadata"]["name"].startswith("detector-agent-"): + if requests.get(f"http://{ip}:{AGENT_PORT}", timeout=2).status_code == 200: + nodes.append({ + "name": name, + "ip_address": ip, + "agent": "true" + }) + else: + raise Exception + except Exception: + nodes.append({ + "name": name, + "ip_address": ip, 
+ "agent": "false" }) - else: - raise Exception - except Exception: - nodes.append({ - "name": node["metadata"]["name"], - "ip_address": node["status"]["addresses"][0]["address"], - "agent": "false" - }) - @app.route('/', methods=["GET"]) @app.route('/status', methods=["GET"]) @@ -52,7 +74,10 @@ def agents(): @app.route('/proxy/') def proxy(path): - response = Response(get(f'{path}').content) + if REQUESTS_TOKEN is None: + response = Response(get(f'{path}').content) + else: + response = Response(get(f'{path}', headers={'Authorization': 'Bearer ' + REQUESTS_TOKEN}).content) response.headers["Access-Control-Allow-Origin"] = "*" return response diff --git a/deploy/Dockerfile b/deploy/Dockerfile new file mode 100644 index 0000000..dc4ef54 --- /dev/null +++ b/deploy/Dockerfile @@ -0,0 +1,10 @@ +FROM docker.io/sysdig/sysdig +RUN yum install python3.8 -y +ADD requirements.txt /detector/requirements.txt +WORKDIR /detector +RUN pip3 install -r requirements.txt +ADD setup.py /detector/setup.py +ADD agent /detector/agent +ADD daemon /detector/daemon +ADD detector /detector/detector +RUN pip3 install -e . diff --git a/deploy/README.md b/deploy/README.md new file mode 100644 index 0000000..7b68593 --- /dev/null +++ b/deploy/README.md @@ -0,0 +1,19 @@ +# Deploy instructions + +- To deploy the container: + ```shell + docker build . -f deploy/Dockerfile -t detector + ``` + +- To deploy in a Kubernetes cluster (one agent per node, one daemon, one detector, one database) + - Deploy the container in all nodes; + - Apply the manifest: + ```shell + kubectl apply -f deploy/detector.yml -n detector + ``` + - Wait a while until the setup completes; + - The dashboard is available through NodePort 31000 (i.e., `:31000`). 
+ - To destroy the cluster: + ```shell + kubectl delete -f deploy/detector.yml -n detector + ``` diff --git a/deploy/detector.yml b/deploy/detector.yml new file mode 100644 index 0000000..885cc01 --- /dev/null +++ b/deploy/detector.yml @@ -0,0 +1,257 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: detector + labels: + name: detector + +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: detector-role + namespace: detector +rules: +- apiGroups: ["apps", ""] + resources: ["namespaces", "nodes", "pods", "services", "deployments", "deployments.apps", "apps.deployments"] + verbs: ["get", "list", "watch"] + +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: detector-rbac +subjects: +- kind: ServiceAccount + name: default + namespace: detector +roleRef: + kind: ClusterRole + name: detector-role + apiGroup: rbac.authorization.k8s.io + + +--- + + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: detector-daemon +spec: + selector: + matchLabels: + app: detector-daemon + template: + metadata: + labels: + app: detector-daemon + spec: + containers: + - name: detector-daemon + image: detector + env: + - name: KUBE_PROXY_IP + value: kubernetes.default.svc + - name: DAEMON_PORT + value: "9001" + - name: AGENT_PORT + value: "9002" + - name: REQUESTS_CA_BUNDLE + value: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt + - name: REQUESTS_TOKEN_FILE + value: /var/run/secrets/kubernetes.io/serviceaccount/token + imagePullPolicy: Never + ports: + - containerPort: 9001 + command: + - "python3" + - "daemon/main.py" + +--- + +apiVersion: v1 +kind: Service +metadata: + name: detector-daemon +spec: + selector: + app: detector-daemon + type: ClusterIP + ports: + - port: 9001 + targetPort: 9001 + +--- + +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: detector-agent +spec: + selector: + matchLabels: + app: detector-agent + template: + metadata: + labels: + app: detector-agent + spec: + 
containers: + - name: detector-agent + image: detector + env: + - name: AGENT_PORT + value: "9002" + - name: WS_PORT + value: "9003" + imagePullPolicy: Never + ports: + - containerPort: 9002 + command: ["/bin/sh", "-c"] + args: ["/docker-entrypoint.sh; python3 agent/main.py;"] + securityContext: + privileged: true + volumeMounts: + - name: docker-sock + mountPath: /host/var/run/docker.sock + - name: dev + mountPath: /host/dev + - name: proc + mountPath: /host/proc + readOnly: true + - name: boot + mountPath: /host/boot + readOnly: true + - name: modules + mountPath: /host/lib/modules + readOnly: true + - name: usr + mountPath: /host/usr + readOnly: true + - name: etc + mountPath: /host/etc + readOnly: true + volumes: + - name: docker-sock + hostPath: + path: /var/run/docker.sock + type: Socket + - name: dev + hostPath: + path: /dev + type: Directory + - name: proc + hostPath: + path: /proc + type: Directory + - name: boot + hostPath: + path: /boot + type: Directory + - name: modules + hostPath: + path: /lib/modules + type: Directory + - name: usr + hostPath: + path: /usr + type: Directory + - name: etc + hostPath: + path: /etc + type: Directory + +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: detector-db +spec: + selector: + matchLabels: + app: detector-db + template: + metadata: + labels: + app: detector-db + spec: + containers: + - name: detector-db + image: redis:7.0.11 + imagePullPolicy: Always + ports: + - containerPort: 6379 + +--- + +apiVersion: v1 +kind: Service +metadata: + name: detector-db +spec: + selector: + app: detector-db + type: ClusterIP + ports: + - port: 6379 + targetPort: 6379 + +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: detector-detector +spec: + selector: + matchLabels: + app: detector-detector + template: + metadata: + labels: + app: detector-detector + spec: + containers: + - name: detector-detector + image: detector + env: + - name: KUBE_PROXY_IP + value: kubernetes.default.svc + - name: REDIS_IP + 
value: detector-db + - name: REDIS_PORT + value: "6379" + - name: DETECTOR_PORT + value: "5001" + - name: DAEMON_IP + value: detector-daemon + - name: DAEMON_PORT + value: "9001" + - name: WS_PORT + value: "9003" + - name: REQUESTS_TOKEN_FILE + value: /var/run/secrets/kubernetes.io/serviceaccount/token + imagePullPolicy: Never + ports: + - containerPort: 5001 + command: + - "python3" + - "detector/main.py" + +--- + +apiVersion: v1 +kind: Service +metadata: + name: detector-detector +spec: + selector: + app: detector-detector + type: NodePort + ports: + - port: 5001 + nodePort: 31000 diff --git a/detector/api.py b/detector/api.py index 86026d7..e2ff69e 100644 --- a/detector/api.py +++ b/detector/api.py @@ -5,8 +5,7 @@ import requests from flask import Blueprint, make_response, request, jsonify -from detector.config import DAEMON_IP, DAEMON_PORT, WS_PORT, WS_MAX_SIZE, ALGORITHMS_FILE, KUBE_PROXY_PORT, \ - KUBE_PROXY_IP +from detector.config import DAEMON_IP, DAEMON_PORT, WS_PORT, WS_MAX_SIZE, ALGORITHMS_FILE, KUBE_PROXY_IP, KUBE_PROXY_PORT, REQUESTS_TOKEN from detector.utils import start_monitoring, stop_monitoring, redis_connection, start_inspecting, stop_inspecting api = Blueprint("api", __name__, template_folder="templates") @@ -112,9 +111,14 @@ def api_resources_pods(): if request.args.get("namespace"): namespace = request.args.get("namespace") try: - pods += \ - requests.get("http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/http://" + KUBE_PROXY_IP + ":" + str( - KUBE_PROXY_PORT) + "/api/v1/namespaces/" + namespace + "/pods", + if REQUESTS_TOKEN is None: + pods += \ + requests.get("http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/http://" + KUBE_PROXY_IP + ":" + str( + KUBE_PROXY_PORT) + "/api/v1/namespaces/" + namespace + "/pods", + timeout=5).json()["items"] + else: + pods += \ + requests.get("http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/https://" + KUBE_PROXY_IP + "/api/v1/namespaces/" + namespace + "/pods", timeout=5).json()["items"] 
except requests.exceptions.ConnectionError as e: print(e) @@ -128,11 +132,17 @@ def api_resources_services(): if request.args.get("namespace"): namespace = request.args.get("namespace") try: - services += \ - requests.get( - "http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/http://" + KUBE_PROXY_IP + ":" + str( - KUBE_PROXY_PORT) + "/api/v1/namespaces/" + namespace + "/services", - timeout=5).json()["items"] + if REQUESTS_TOKEN is None: + services += \ + requests.get( + "http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/http://" + KUBE_PROXY_IP + ":" + str( + KUBE_PROXY_PORT) + "/api/v1/namespaces/" + namespace + "/services", + timeout=5).json()["items"] + else: + services += \ + requests.get( + "http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/https://" + KUBE_PROXY_IP + "/api/v1/namespaces/" + namespace + "/services", + timeout=5).json()["items"] except requests.exceptions.ConnectionError as e: print(e) return jsonify(services) @@ -145,11 +155,17 @@ def api_resources_deployments(): if request.args.get("namespace"): namespace = request.args.get("namespace") try: - deployments += \ - requests.get( - "http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/http://" + KUBE_PROXY_IP + ":" + str( - KUBE_PROXY_PORT) + "/apis/apps/v1/namespaces/" + namespace + "/deployments", - timeout=5).json()["items"] + if REQUESTS_TOKEN is None: + deployments += \ + requests.get( + "http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/http://" + KUBE_PROXY_IP + ":" + str( + KUBE_PROXY_PORT) + "/apis/apps/v1/namespaces/" + namespace + "/deployments", + timeout=5).json()["items"] + else: + deployments += \ + requests.get( + "http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/https://" + KUBE_PROXY_IP + "/apis/apps/v1/namespaces/" + namespace + "/deployments", + timeout=5).json()["items"] except requests.exceptions.ConnectionError as e: print(e) return jsonify(deployments) @@ -159,9 +175,13 @@ def api_resources_deployments(): def api_resources_namespaces(): 
namespaces = list() try: - namespaces = \ - requests.get("http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/http://" + KUBE_PROXY_IP + ":" + str( - KUBE_PROXY_PORT) + "/api/v1/namespaces", timeout=5).json()["items"] + if REQUESTS_TOKEN is None: + namespaces = \ + requests.get("http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/http://" + KUBE_PROXY_IP + ":" + str( + KUBE_PROXY_PORT) + "/api/v1/namespaces", timeout=5).json()["items"] + else: + namespaces = \ + requests.get("http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/https://" + KUBE_PROXY_IP + "/api/v1/namespaces", timeout=5).json()["items"] except requests.exceptions.ConnectionError as e: print(e) return jsonify(namespaces) @@ -171,9 +191,13 @@ def api_resources_nodes(): nodes = list() try: - nodes = \ - requests.get("http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/http://" + KUBE_PROXY_IP + ":" + str( - KUBE_PROXY_PORT) + "/api/v1/nodes", timeout=5).json()["items"] + if REQUESTS_TOKEN is None: + nodes = \ + requests.get("http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/http://" + KUBE_PROXY_IP + ":" + str( + KUBE_PROXY_PORT) + "/api/v1/nodes", timeout=5).json()["items"] + else: + nodes = \ + requests.get("http://" + DAEMON_IP + ":" + str(DAEMON_PORT) + "/proxy/https://" + KUBE_PROXY_IP + "/api/v1/nodes", timeout=5).json()["items"] except requests.exceptions.ConnectionError as e: print(e) return jsonify(nodes) diff --git a/detector/config.py b/detector/config.py index d707a00..7a1ea00 100644 --- a/detector/config.py +++ b/detector/config.py @@ -1,21 +1,26 @@ -DETECTOR_IP = "0.0.0.0" -DETECTOR_PORT = 5001 +import os +from pathlib import Path -DAEMON_IP = "10.3.1.45" -DAEMON_PORT = 9001 +DETECTOR_IP = os.environ.get('DETECTOR_IP', "0.0.0.0") +DETECTOR_PORT = int(os.environ.get('DETECTOR_PORT', 5001)) -AGENT_PORT = 9002 +DAEMON_IP = os.environ.get('DAEMON_IP', "127.0.0.1") +DAEMON_PORT = int(os.environ.get('DAEMON_PORT', 9001)) -KUBE_PROXY_IP = "127.0.0.1" 
-KUBE_PROXY_PORT = 8001 +AGENT_PORT = int(os.environ.get('AGENT_PORT', 9002)) -REDIS_IP = "127.0.0.1" -REDIS_PORT = 6379 -REDIS_MAX_BATCH = 10 +KUBE_PROXY_IP = os.environ.get('KUBE_PROXY_IP', "127.0.0.1") +KUBE_PROXY_PORT = int(os.environ.get('KUBE_PROXY_PORT', 8001)) -WS_PORT = 9003 -WS_MAX_SIZE = 1073741824 +REDIS_IP = os.environ.get('REDIS_IP', "127.0.0.1") +REDIS_PORT = int(os.environ.get('REDIS_PORT', 6379)) +REDIS_MAX_BATCH = int(os.environ.get('REDIS_MAX_BATCH', 10)) + +WS_PORT = int(os.environ.get('WS_PORT', 9003)) +WS_MAX_SIZE = int(os.environ.get('WS_MAX_SIZE', 1073741824)) # Algorithms -ALGORITHMS_FILE = "detector/algorithms/algorithms.json" +ALGORITHMS_FILE = os.environ.get('ALGORITHMS_FILE', "detector/algorithms/algorithms.json") +REQUESTS_TOKEN_FILE = os.environ.get('REQUESTS_TOKEN_FILE', None) +REQUESTS_TOKEN = Path(REQUESTS_TOKEN_FILE).read_text().strip() if REQUESTS_TOKEN_FILE is not None else None diff --git a/detector/utils.py b/detector/utils.py index bdd60ae..f5a7096 100644 --- a/detector/utils.py +++ b/detector/utils.py @@ -92,39 +92,43 @@ def get_batch(host_ip): async def syscall_transfer(agent): - async with websockets.connect("ws://" + agent.ip_address + ":" + str(agent.ws_port), - max_size=agent.ws_max_size, ping_interval=None) as websocket: - try: - print("Receiving system calls from node \"" + agent.name + "\" (" + agent.ip_address + ")") - async for data in websocket: - # Handle the system calls received - if agent.syscalls_compression == "true": - decompressed_pickle = blosc2.decompress(data) - batch = pickle.loads(decompressed_pickle) - else: - batch = pickle.loads(data) - - batch_length = len(batch) - - # Store batch in redis (push to queue) - redis_connection.rpush("batch-" + agent.ip_address, decompressed_pickle) - - # Limit the number of batches to REDIS_MAX_BATCH - redis_connection.ltrim("batch-" + agent.ip_address, - int(REDIS_MAX_BATCH), -1) - - print("Received batch from " + agent.ip_address + " containing " + str( - batch_length) + " system calls with a size of " 
+ str( - sys.getsizeof(data)) + " bytes") - - # Store stats in redis - redis_connection.incrby("monitoring_total_syscalls", batch_length) - redis_connection.incrby("monitoring_total_size", sys.getsizeof(data)) - redis_connection.incrby("monitoring_total_batches", 1) - redis_connection.set("monitoring_batch_length", batch_length) - except (KeyboardInterrupt, Exception): - # traceback.print_exc() - stop_monitoring() - os._exit(1) + while True: + try: # retry in case the agent is not ready + async with websockets.connect("ws://" + agent.ip_address + ":" + str(agent.ws_port), + max_size=agent.ws_max_size, ping_interval=None) as websocket: + try: + print("Receiving system calls from node \"" + agent.name + "\" (" + agent.ip_address + ")") + async for data in websocket: + # Handle the system calls received + if agent.syscalls_compression == "true": + decompressed_pickle = blosc2.decompress(data) + batch = pickle.loads(decompressed_pickle) + else: + batch = pickle.loads(data) + + batch_length = len(batch) + + # Store batch in redis (push to queue) + redis_connection.rpush("batch-" + agent.ip_address, decompressed_pickle if agent.syscalls_compression == "true" else data) + + # Limit the number of batches to REDIS_MAX_BATCH + redis_connection.ltrim("batch-" + agent.ip_address, - int(REDIS_MAX_BATCH), -1) + + print("Received batch from " + agent.ip_address + " containing " + str( + batch_length) + " system calls with a size of " + str( + sys.getsizeof(data)) + " bytes") + + # Store stats in redis + redis_connection.incrby("monitoring_total_syscalls", batch_length) + redis_connection.incrby("monitoring_total_size", sys.getsizeof(data)) + redis_connection.incrby("monitoring_total_batches", 1) + redis_connection.set("monitoring_batch_length", batch_length) + except (KeyboardInterrupt, Exception): + # traceback.print_exc() + stop_monitoring() + os._exit(1) + except Exception: + pass def ws_client(agent):