From 5ca9ec27d33cac62880665de1176ef2b19c1275f Mon Sep 17 00:00:00 2001 From: Sebastiaan Raven Date: Fri, 4 Sep 2020 17:29:07 +0200 Subject: [PATCH 1/5] Now works with the ingress's own name --- consulk8s.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/consulk8s.py b/consulk8s.py index fa22c76..94a3e39 100644 --- a/consulk8s.py +++ b/consulk8s.py @@ -95,12 +95,17 @@ def k8s_ingresses_as_services(ingresses, default_ip, interval): ann = ingress.metadata.annotations name = ann.get('consulk8s/service') if ann is not None else None if name is None or not name: - continue - + if ingress_name: + name = ingress_name + else: + continue ip = ann.get('consulk8s/address') if ip is None: - status = ingress.status.load_balancer.ingress[0] - ip = status.ip or default_ip + if ingress.status.load_balancer.ingress: + status = ingress.status.load_balancer.ingress[0] + ip = status.ip or default_ip + else: + ip = default_ip port_ = ann.get('consulk8s/port', 80) try: From bdc6a41c37c82acde0689f783072c549654199a5 Mon Sep 17 00:00:00 2001 From: Sebastiaan Raven Date: Wed, 9 Sep 2020 07:55:47 +0200 Subject: [PATCH 2/5] Now with working consul sink --- consulk8s.py | 132 ++++++++++++++++++++++++++++++++++++++++----------- setup.py | 3 +- 2 files changed, 105 insertions(+), 30 deletions(-) diff --git a/consulk8s.py b/consulk8s.py index 94a3e39..e71345a 100644 --- a/consulk8s.py +++ b/consulk8s.py @@ -1,17 +1,24 @@ import sys import json import subprocess +import yaml from collections import OrderedDict import click import kubernetes +import requests DEFAULT_CONSUL_URL = 'http://localhost:8500' DEFAULT_INTERVAL = '30s' DEFAULT_CHECK_IP = '127.0.0.1' +DEFAULT_CONSUL_SINK_URL = '127.0.0.1:8500' +DEFAULT_CONSUL_SINK_DOMAIN = '.consul' +DEFAULT_CONSUL_SINK_PATH = '/v1/agent/service/register' DEFAULT_SVC_FILE = '/etc/consul.d/consulk8s_services.json' +DEFAULT_BACKEND_PORT = 80 +yaml.warnings({'YAMLLoadWarning': False}) @click.group() @click.option('--k8s-config', '-k', default=None, metavar='PATH', @@ -29,49 +36,70 @@ def cli(k8s_config, k8s_context): @click.option('--default-ip', '--check-ip', default=DEFAULT_CHECK_IP, metavar='IP', help='Default Ingress IP (default: {})'.format(DEFAULT_CHECK_IP)) +@click.option('--consul-sink-url', '-c', + default=None, metavar='STRING', + help='Consul Sink url to upload services to (default: {})'.format(DEFAULT_CONSUL_SINK_URL)) +@click.option('--consul-sink-domain', '-d', + default=DEFAULT_CONSUL_SINK_DOMAIN, metavar='STRING', + help='Consul Sink domain, used to upload services to (default: {})'.format(DEFAULT_CONSUL_SINK_DOMAIN)) +@click.option('--consul-sink-path', default=DEFAULT_CONSUL_SINK_PATH, metavar='PATH', + help='Path on Consul Sink (default: {})'.format(DEFAULT_CONSUL_SINK_PATH)) +@click.option('--host-as-name', '-h', default=False, is_flag=True, metavar='BOOL', type=click.BOOL, + help='Use the ingress host as service name to help dns query (default: False)') +@click.option('--verbose', '-v', default=False, is_flag=True, metavar='BOOL', type=click.BOOL, + help='Show output (default: False)') +@click.option('--skip-checks', default=False, is_flag=True, metavar='BOOL', type=click.BOOL, + help='Skip checks (default: False)') @click.option('--check-interval', '-i', default='30s', metavar='INTERVAL', help='HTTP check interval (default: {})'.format(DEFAULT_INTERVAL)) @click.option('--code-when-changed', default=0, metavar='NUM', type=click.INT, help='Exit code to return when services file is changed') @click.option('--change-command', '-C', default=None, metavar='CMD', help='Command to run if service file is changed') -def write_ingresses(service_file, default_ip, check_interval, code_when_changed, + +def write_ingresses(service_file, default_ip, consul_sink_url, consul_sink_domain, consul_sink_path, host_as_name, verbose, skip_checks, check_interval, code_when_changed, change_command): ingresses = get_k8s_ingresses() - services = k8s_ingresses_as_services(ingresses, default_ip=default_ip, - interval=check_interval) - try: - click.echo('Reading {}'.format(service_file)) - with open(service_file, 'r') as f: - current_json = f.read() - except FileNotFoundError: - current_json = None - data = {'services': services} - json_to_write = json.dumps(data, indent=2) + '\n' - if json_to_write != current_json: - click.echo('Writing {}...'.format(service_file)) - with open(service_file, 'w') as f: - f.write(json_to_write) - click.echo('Done!') - if change_command is not None: - click.echo('Running: {}...'.format(change_command)) - result = subprocess.run(change_command, shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - click.echo(result.stdout, nl=False) - click.echo(result.stderr, err=True, nl=False) - sys.exit(code_when_changed) - else: - click.echo('No changes') - sys.exit(0) + services = k8s_ingresses_as_services(ingresses, default_ip=default_ip, interval=check_interval, host_as_name=host_as_name, consul_sink_domain=consul_sink_domain) + if consul_sink_url: + try: + return_status_code = put_services(services, consul_sink_url=consul_sink_url, consul_sink_domain=consul_sink_domain, consul_sink_path=consul_sink_path, code_when_changed=code_when_changed, change_command=change_command, verbose=verbose, skip_checks=skip_checks) + if return_status_code != 200: + click.echo('HTTP Error {}'.format(return_status_code)) + except Exception as error: + click.echo('An exception occurred: {}'.format(error)) + pass + else: + try: + click.echo('Reading {}'.format(service_file)) + with open(service_file, 'r') as f: + current_json = f.read() + except FileNotFoundError: + current_json = None + if skip_checks: + for service in services: + del service['checks'] + data = {'services': services} + json_to_write = json.dumps(data, indent=2) + '\n' + if verbose: + click.echo(json_to_write) + if json_to_write != current_json: + click.echo('Writing {}...'.format(service_file)) + with open(service_file, 'w') as f: + f.write(json_to_write) + click.echo('Done!') + exec_change_command(change_command=change_command, code_when_changed=code_when_changed) + else: + click.echo('No changes') + sys.exit(0) def get_k8s_ingresses(): k8s = kubernetes.client.ExtensionsV1beta1Api() return k8s.list_ingress_for_all_namespaces().items -def k8s_ingresses_as_services(ingresses, default_ip, interval): +def k8s_ingresses_as_services(ingresses, default_ip, interval, host_as_name, consul_sink_domain): """ Build a dict of Consul Service definitions based on k8s ingress resources. @@ -94,6 +122,16 @@ def k8s_ingresses_as_services(ingresses, default_ip, interval): ingress.metadata.name) ann = ingress.metadata.annotations name = ann.get('consulk8s/service') if ann is not None else None + if host_as_name: + try: + def rreplace(s, old, new, occurrence=1): + li = s.rsplit(old, occurrence) + return new.join(li) + name = rreplace(ingress.spec.rules[0].host, consul_sink_domain, '') + except (KeyError, IndexError): + click.echo('Ingress "{}" has no host!'.format(ingress_name), + err=True) + sys.exit(1) if name is None or not name: if ingress_name: name = ingress_name @@ -107,7 +145,9 @@ def k8s_ingresses_as_services(ingresses, default_ip, interval): else: ip = default_ip - port_ = ann.get('consulk8s/port', 80) + port_ = ann.get('consulk8s/port') + if port_ == None: + port_ = ingress.spec.rules[0].http.paths[0].backend.service_port if hasattr(ingress.spec.rules[0], "http") and type(ingress.spec.rules[0].http.paths[0].backend.service_port) == type(int(1)) else DEFAULT_BACKEND_PORT try: port = int(port_) except ValueError: @@ -151,6 +191,40 @@ def k8s_ingresses_as_services(ingresses, default_ip, interval): ))) return services +def exec_change_command(change_command, code_when_changed): + if change_command is not None: + click.echo('Running: {}...'.format(change_command)) + result = subprocess.run(change_command, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + click.echo(result.stdout, nl=False) + click.echo(result.stderr, err=True, nl=False) + sys.exit(code_when_changed) + +def put_services(services, consul_sink_url, consul_sink_domain, consul_sink_path, code_when_changed, change_command, verbose, skip_checks): + click.echo('Putting to {}...'.format(consul_sink_url+consul_sink_path)) + port = consul_sink_url.rsplit(':',1) + put_scheme = 'https://' if port == 443 else 'http://' + + for service in services: + del service['id'] + if skip_checks: + del service['checks'] + if service['Tags'] if 'Tags' in service else False: + service['Tags'].extend(['k8s', 'k8s-ingress']) + else: + service['Tags'] = ['k8s', 'k8s-ingress'] + json_to_put = json.dumps(service, indent=4) + '\n' + if verbose: + click.echo(json_to_put) + response = requests.put(put_scheme+consul_sink_url+consul_sink_path, data =json_to_put) + if response.status_code != 200: + break + click.echo('Completed Put') + + exec_change_command(change_command=change_command, code_when_changed=code_when_changed) + return response.status_code + if __name__ == '__main__': cli() diff --git a/setup.py b/setup.py index e5c5714..0631469 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,8 @@ }, install_requires=[ 'click>=7,<8', - 'kubernetes>=8,<9' + 'kubernetes>=8,<9', + 'requests>=2,<3' ], entry_points={ 'console_scripts': [ From ad258773862d12b4d22919c0f0393fff5e9b744b Mon Sep 17 00:00:00 2001 From: Sebastiaan Raven Date: Fri, 19 Mar 2021 21:56:18 +0100 Subject: [PATCH 3/5] Added support for traefik --- consulk8s.py | 55 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/consulk8s.py b/consulk8s.py index e71345a..67f1aae 100644 --- a/consulk8s.py +++ b/consulk8s.py @@ -7,7 +7,9 @@ import click import kubernetes +from kubernetes.client.rest import ApiException import requests +import re DEFAULT_CONSUL_URL = 'http://localhost:8500' DEFAULT_INTERVAL = '30s' @@ -59,8 +61,14 @@ def cli(k8s_config, k8s_context): def write_ingresses(service_file, default_ip, consul_sink_url, consul_sink_domain, consul_sink_path, host_as_name, verbose, skip_checks, check_interval, code_when_changed, change_command): + services = [] ingresses = get_k8s_ingresses() - services = k8s_ingresses_as_services(ingresses, default_ip=default_ip, interval=check_interval, host_as_name=host_as_name, consul_sink_domain=consul_sink_domain) + services += k8s_ingresses_as_services(ingresses, default_ip=default_ip, interval=check_interval, host_as_name=host_as_name, consul_sink_domain=consul_sink_domain) + + ingress_routes = get_k8s_ingress_routes() + services += k8s_ingresses_as_services(ingress_routes, default_ip=default_ip, interval=check_interval, host_as_name=host_as_name, consul_sink_domain=consul_sink_domain) + + if consul_sink_url: try: return_status_code = put_services(services, consul_sink_url=consul_sink_url, consul_sink_domain=consul_sink_domain, consul_sink_path=consul_sink_path, code_when_changed=code_when_changed, change_command=change_command, verbose=verbose, skip_checks=skip_checks) @@ -98,6 +106,20 @@ def get_k8s_ingresses(): k8s = kubernetes.client.ExtensionsV1beta1Api() return k8s.list_ingress_for_all_namespaces().items +def get_k8s_ingress_routes(): + crd_name = 'ingressroutes.traefik.containo.us' + crd_group = 'traefik.containo.us' + crd_version = 'v1alpha1' + crd_plural = 'ingressroutes' + ingress_routes = [] + try: + custom_api_instance = kubernetes.client.CustomObjectsApi() + api_response = custom_api_instance.list_cluster_custom_object(group=crd_group, version=crd_version, plural=crd_plural) + ingress_routes = api_response['items'] + except ApiException: + print("No resource %s found\n" % crd_name) + return ingress_routes + def k8s_ingresses_as_services(ingresses, default_ip, interval, host_as_name, consul_sink_domain): """ @@ -118,16 +140,26 @@ def k8s_ingresses_as_services(ingresses, default_ip, interval, host_as_name, con """ services = [] for ingress in ingresses: - ingress_name = '{}/{}'.format(ingress.metadata.namespace, - ingress.metadata.name) - ann = ingress.metadata.annotations + useObject = False + if type(ingress) is dict: + useObject = True + if useObject: + ingress_name = '{}/{}'.format(ingress['metadata']['namespace'], ingress['metadata']['name']) + ann = ingress['metadata']['annotations'] + else: + ingress_name = '{}/{}'.format(ingress.metadata.namespace, ingress.metadata.name) + ann = ingress.metadata.annotations name = ann.get('consulk8s/service') if ann is not None else None if host_as_name: try: def rreplace(s, old, new, occurrence=1): li = s.rsplit(old, occurrence) return new.join(li) - name = rreplace(ingress.spec.rules[0].host, consul_sink_domain, '') + if useObject: + pattern = "\(\`(.+)\..*\`\) " + name = re.findall(pattern, ingress['spec']['routes'][0]['match'])[0] + else: + name = rreplace(ingress.spec.rules[0].host, consul_sink_domain, '') except (KeyError, IndexError): click.echo('Ingress "{}" has no host!'.format(ingress_name), err=True) @@ -139,7 +171,7 @@ def rreplace(s, old, new, occurrence=1): continue ip = ann.get('consulk8s/address') if ip is None: - if ingress.status.load_balancer.ingress: + if not useObject and ingress.status.load_balancer.ingress: status = ingress.status.load_balancer.ingress[0] ip = status.ip or default_ip else: @@ -147,7 +179,10 @@ def rreplace(s, old, new, occurrence=1): port_ = ann.get('consulk8s/port') if port_ == None: - port_ = ingress.spec.rules[0].http.paths[0].backend.service_port if hasattr(ingress.spec.rules[0], "http") and type(ingress.spec.rules[0].http.paths[0].backend.service_port) == type(int(1)) else DEFAULT_BACKEND_PORT + if useObject: + port_ = DEFAULT_BACKEND_PORT + else: + port_ = ingress.spec.rules[0].http.paths[0].backend.service_port if hasattr(ingress.spec.rules[0], "http") and type(ingress.spec.rules[0].http.paths[0].backend.service_port) == type(int(1)) else DEFAULT_BACKEND_PORT try: port = int(port_) except ValueError: @@ -158,7 +193,11 @@ def rreplace(s, old, new, occurrence=1): check_host = ann.get('consulk8s/check_host') if check_host is None: try: - check_host = ingress.spec.rules[0].host + if useObject: + pattern = "\(\`(.+).*\`\) " + check_host = re.findall(pattern, ingress['spec']['routes'][0]['match'])[0] + else: + check_host = ingress.spec.rules[0].host except (KeyError, IndexError): click.echo('Ingress "{}" has no host!'.format(ingress_name), err=True) From 01f94ce1923256a6296532340660bc85ac4767f6 Mon Sep 17 00:00:00 2001 From: Sebastiaan Raven Date: Sun, 4 Apr 2021 16:08:05 +0200 Subject: [PATCH 4/5] Regex fix --- consulk8s.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consulk8s.py b/consulk8s.py index 67f1aae..45aaee6 100644 --- a/consulk8s.py +++ b/consulk8s.py @@ -156,7 +156,7 @@ def rreplace(s, old, new, occurrence=1): li = s.rsplit(old, occurrence) return new.join(li) if useObject: - pattern = "\(\`(.+)\..*\`\) " + pattern = "\(\`(.+\.*)\..*\`" name = re.findall(pattern, ingress['spec']['routes'][0]['match'])[0] else: name = rreplace(ingress.spec.rules[0].host, consul_sink_domain, '') From 0cf893d029d87ea37dcddf55e1e3f894ca813825 Mon Sep 17 00:00:00 2001 From: Sebastiaan Raven Date: Mon, 22 Nov 2021 21:57:47 +0100 Subject: [PATCH 5/5] - Small version fix to make it all work again --- .vscode/launch.json | 16 ++++++++++++++++ setup.py | 3 ++- 2 files changed, 18 insertions(+), 1 deletion(-) create mode 100644 .vscode/launch.json diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..788842e --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,16 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Module", + "type": "python", + "request": "launch", + "module": "consulk8s", + "args": ["write-ingresses", "--default-ip=192.168.6.1", "--consul-sink-url=192.168.6.91:80", "--consul-sink-domain=.bas", "--host-as-name","--skip-checks"], + + } + ] +} \ No newline at end of file diff --git a/setup.py b/setup.py index 0631469..98b41ac 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,8 @@ install_requires=[ 'click>=7,<8', 'kubernetes>=8,<9', - 'requests>=2,<3' + 'requests>=2,<3', + 'pyyaml==5.4.1' ], entry_points={ 'console_scripts': [