diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..788842e --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,16 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Module", + "type": "python", + "request": "launch", + "module": "consulk8s", + "args": ["write-ingresses", "--default-ip=192.168.6.1", "--consul-sink-url=192.168.6.91:80", "--consul-sink-domain=.bas", "--host-as-name","--skip-checks"], + + } + ] +} \ No newline at end of file diff --git a/consulk8s.py b/consulk8s.py index fa22c76..45aaee6 100644 --- a/consulk8s.py +++ b/consulk8s.py @@ -1,17 +1,26 @@ import sys import json import subprocess +import yaml from collections import OrderedDict import click import kubernetes +from kubernetes.client.rest import ApiException +import requests +import re DEFAULT_CONSUL_URL = 'http://localhost:8500' DEFAULT_INTERVAL = '30s' DEFAULT_CHECK_IP = '127.0.0.1' +DEFAULT_CONSUL_SINK_URL = '127.0.0.1:8500' +DEFAULT_CONSUL_SINK_DOMAIN = '.consul' +DEFAULT_CONSUL_SINK_PATH = '/v1/agent/service/register' DEFAULT_SVC_FILE = '/etc/consul.d/consulk8s_services.json' +DEFAULT_BACKEND_PORT = 80 +yaml.warnings({'YAMLLoadWarning': False}) @click.group() @click.option('--k8s-config', '-k', default=None, metavar='PATH', @@ -29,49 +38,90 @@ def cli(k8s_config, k8s_context): @click.option('--default-ip', '--check-ip', default=DEFAULT_CHECK_IP, metavar='IP', help='Default Ingress IP (default: {})'.format(DEFAULT_CHECK_IP)) +@click.option('--consul-sink-url', '-c', + default=None, metavar='STRING', + help='Consul Sink url to upload services to (default: {})'.format(DEFAULT_CONSUL_SINK_URL)) +@click.option('--consul-sink-domain', '-d', + default=DEFAULT_CONSUL_SINK_DOMAIN, metavar='STRING', + help='Consul Sink domain, used to upload services to (default: {})'.format(DEFAULT_CONSUL_SINK_DOMAIN)) +@click.option('--consul-sink-path', default=DEFAULT_CONSUL_SINK_PATH, metavar='PATH', + help='Path on Consul Sink (default: {})'.format(DEFAULT_CONSUL_SINK_PATH)) +@click.option('--host-as-name', '-h', default=False, is_flag=True, metavar='BOOL', type=click.BOOL, + help='Use the ingress host as service name to help dns query (default: False)') +@click.option('--verbose', '-v', default=False, is_flag=True, metavar='BOOL', type=click.BOOL, + help='Show output (default: False)') +@click.option('--skip-checks', default=False, is_flag=True, metavar='BOOL', type=click.BOOL, + help='Skip checks (default: False)') @click.option('--check-interval', '-i', default='30s', metavar='INTERVAL', help='HTTP check interval (default: {})'.format(DEFAULT_INTERVAL)) @click.option('--code-when-changed', default=0, metavar='NUM', type=click.INT, help='Exit code to return when services file is changed') @click.option('--change-command', '-C', default=None, metavar='CMD', help='Command to run if service file is changed') -def write_ingresses(service_file, default_ip, check_interval, code_when_changed, + +def write_ingresses(service_file, default_ip, consul_sink_url, consul_sink_domain, consul_sink_path, host_as_name, verbose, skip_checks, check_interval, code_when_changed, change_command): + services = [] ingresses = get_k8s_ingresses() - services = k8s_ingresses_as_services(ingresses, default_ip=default_ip, - interval=check_interval) - try: - click.echo('Reading {}'.format(service_file)) - with open(service_file, 'r') as f: - current_json = f.read() - except FileNotFoundError: - current_json = None - data = {'services': services} - json_to_write = json.dumps(data, indent=2) + '\n' - if json_to_write != current_json: - click.echo('Writing {}...'.format(service_file)) - with open(service_file, 'w') as f: - f.write(json_to_write) - click.echo('Done!') - if change_command is not None: - click.echo('Running: {}...'.format(change_command)) - result = subprocess.run(change_command, shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - click.echo(result.stdout, nl=False) - click.echo(result.stderr, err=True, nl=False) - sys.exit(code_when_changed) - else: - click.echo('No changes') - sys.exit(0) + services += k8s_ingresses_as_services(ingresses, default_ip=default_ip, interval=check_interval, host_as_name=host_as_name, consul_sink_domain=consul_sink_domain) + + ingress_routes = get_k8s_ingress_routes() + services += k8s_ingresses_as_services(ingress_routes, default_ip=default_ip, interval=check_interval, host_as_name=host_as_name, consul_sink_domain=consul_sink_domain) + + + if consul_sink_url: + try: + return_status_code = put_services(services, consul_sink_url=consul_sink_url, consul_sink_domain=consul_sink_domain, consul_sink_path=consul_sink_path, code_when_changed=code_when_changed, change_command=change_command, verbose=verbose, skip_checks=skip_checks) + if return_status_code != 200: + click.echo('HTTP Error {}'.format(return_status_code)) + except Exception as error: + click.echo('An exception occurred: {}'.format(error)) + pass + else: + try: + click.echo('Reading {}'.format(service_file)) + with open(service_file, 'r') as f: + current_json = f.read() + except FileNotFoundError: + current_json = None + if skip_checks: + for service in services: + del service['checks'] + data = {'services': services} + json_to_write = json.dumps(data, indent=2) + '\n' + if verbose: + click.echo(json_to_write) + if json_to_write != current_json: + click.echo('Writing {}...'.format(service_file)) + with open(service_file, 'w') as f: + f.write(json_to_write) + click.echo('Done!') + exec_change_command(change_command=change_command, code_when_changed=code_when_changed) + else: + click.echo('No changes') + sys.exit(0) def get_k8s_ingresses(): k8s = kubernetes.client.ExtensionsV1beta1Api() return k8s.list_ingress_for_all_namespaces().items +def get_k8s_ingress_routes(): + crd_name = 'ingressroutes.traefik.containo.us' + crd_group = 'traefik.containo.us' + crd_version = 'v1alpha1' + crd_plural = 'ingressroutes' + ingress_routes = [] + try: + custom_api_instance = kubernetes.client.CustomObjectsApi() + api_response = custom_api_instance.list_cluster_custom_object(group=crd_group, version=crd_version, plural=crd_plural) + ingress_routes = api_response['items'] + except ApiException: + print("No resource %s found\n" % crd_name) + return ingress_routes + -def k8s_ingresses_as_services(ingresses, default_ip, interval): +def k8s_ingresses_as_services(ingresses, default_ip, interval, host_as_name, consul_sink_domain): """ Build a dict of Consul Service definitions based on k8s ingress resources. @@ -90,19 +140,49 @@ def k8s_ingresses_as_services(ingresses, default_ip, interval): """ services = [] for ingress in ingresses: - ingress_name = '{}/{}'.format(ingress.metadata.namespace, - ingress.metadata.name) - ann = ingress.metadata.annotations + useObject = False + if type(ingress) is dict: + useObject = True + if useObject: + ingress_name = '{}/{}'.format(ingress['metadata']['namespace'], ingress['metadata']['name']) + ann = ingress['metadata']['annotations'] + else: + ingress_name = '{}/{}'.format(ingress.metadata.namespace, ingress.metadata.name) + ann = ingress.metadata.annotations name = ann.get('consulk8s/service') if ann is not None else None + if host_as_name: + try: + def rreplace(s, old, new, occurrence=1): + li = s.rsplit(old, occurrence) + return new.join(li) + if useObject: + pattern = "\(\`(.+\.*)\..*\`" + name = re.findall(pattern, ingress['spec']['routes'][0]['match'])[0] + else: + name = rreplace(ingress.spec.rules[0].host, consul_sink_domain, '') + except (KeyError, IndexError): + click.echo('Ingress "{}" has no host!'.format(ingress_name), + err=True) + sys.exit(1) if name is None or not name: - continue - + if ingress_name: + name = ingress_name + else: + continue ip = ann.get('consulk8s/address') if ip is None: - status = ingress.status.load_balancer.ingress[0] - ip = status.ip or default_ip + if not useObject and ingress.status.load_balancer.ingress: + status = ingress.status.load_balancer.ingress[0] + ip = status.ip or default_ip + else: + ip = default_ip - port_ = ann.get('consulk8s/port', 80) + port_ = ann.get('consulk8s/port') + if port_ == None: + if useObject: + port_ = DEFAULT_BACKEND_PORT + else: + port_ = ingress.spec.rules[0].http.paths[0].backend.service_port if hasattr(ingress.spec.rules[0], "http") and type(ingress.spec.rules[0].http.paths[0].backend.service_port) == type(int(1)) else DEFAULT_BACKEND_PORT try: port = int(port_) except ValueError: @@ -113,7 +193,11 @@ def k8s_ingresses_as_services(ingresses, default_ip, interval): check_host = ann.get('consulk8s/check_host') if check_host is None: try: - check_host = ingress.spec.rules[0].host + if useObject: + pattern = "\(\`(.+).*\`\) " + check_host = re.findall(pattern, ingress['spec']['routes'][0]['match'])[0] + else: + check_host = ingress.spec.rules[0].host except (KeyError, IndexError): click.echo('Ingress "{}" has no host!'.format(ingress_name), err=True) @@ -146,6 +230,40 @@ def k8s_ingresses_as_services(ingresses, default_ip, interval): ))) return services +def exec_change_command(change_command, code_when_changed): + if change_command is not None: + click.echo('Running: {}...'.format(change_command)) + result = subprocess.run(change_command, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + click.echo(result.stdout, nl=False) + click.echo(result.stderr, err=True, nl=False) + sys.exit(code_when_changed) + +def put_services(services, consul_sink_url, consul_sink_domain, consul_sink_path, code_when_changed, change_command, verbose, skip_checks): + click.echo('Putting to {}...'.format(consul_sink_url+consul_sink_path)) + port = consul_sink_url.rsplit(':',1) + put_scheme = 'https://' if port == 443 else 'http://' + + for service in services: + del service['id'] + if skip_checks: + del service['checks'] + if service['Tags'] if 'Tags' in service else False: + service['Tags'].extend(['k8s', 'k8s-ingress']) + else: + service['Tags'] = ['k8s', 'k8s-ingress'] + json_to_put = json.dumps(service, indent=4) + '\n' + if verbose: + click.echo(json_to_put) + response = requests.put(put_scheme+consul_sink_url+consul_sink_path, data =json_to_put) + if response.status_code != 200: + break + click.echo('Completed Put') + + exec_change_command(change_command=change_command, code_when_changed=code_when_changed) + return response.status_code + if __name__ == '__main__': cli() diff --git a/setup.py b/setup.py index e5c5714..98b41ac 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,9 @@ }, install_requires=[ 'click>=7,<8', - 'kubernetes>=8,<9' + 'kubernetes>=8,<9', + 'requests>=2,<3', + 'pyyaml==5.4.1' ], entry_points={ 'console_scripts': [