From 27ed162b089f26b31c353c8ca17c30991c33f2ff Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Wed, 4 Feb 2026 15:41:29 +0530 Subject: [PATCH 01/13] Replace docker-compose with official Docker SDK --- .semaphore/semaphore.yml | 2 +- confluent/docker_utils/__init__.py | 371 +++++++++++++++-------------- confluent/docker_utils/compose.py | 321 +++++++++++++++++++++++++ requirements.txt | 9 +- tox.ini | 6 +- 5 files changed, 528 insertions(+), 181 deletions(-) create mode 100644 confluent/docker_utils/compose.py diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 2d361f4..b38845f 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -9,7 +9,7 @@ global_job_config: prologue: commands: - checkout - - sem-version python 3.9 + - sem-version python 3.14 - pip install tox - COMMIT_MESSAGE_PREFIX="[ci skip] Publish version" blocks: diff --git a/confluent/docker_utils/__init__.py b/confluent/docker_utils/__init__.py index 1176be0..0f7583d 100644 --- a/confluent/docker_utils/__init__.py +++ b/confluent/docker_utils/__init__.py @@ -1,6 +1,23 @@ +"""Confluent Docker Utilities.""" + import base64 import os import subprocess +from typing import Dict, Optional + +import docker + +from .compose import ( + ComposeConfig, ComposeContainer, ComposeProject, ComposeService, create_docker_client, + STATE_KEY, EXIT_CODE_KEY, STATUS_RUNNING, STATUS_EXITED +) + +# Host config keys (for backward compatibility) +HOST_CONFIG_NETWORK_MODE = "NetworkMode" +HOST_CONFIG_BINDS = "Binds" + +# Testing label +TESTING_LABEL = "io.confluent.docker.testing" try: import boto3 @@ -8,207 +25,215 @@ except ImportError: HAS_BOTO3 = False -import docker -from compose.config.config import ConfigDetails, ConfigFile, load -from compose.container import Container -from compose.project import Project -from compose.service import ImageType -from compose.cli.docker_client import docker_client -from compose.config.environment import Environment - -def api_client(): - return docker.from_env().api +def api_client() -> docker.DockerClient: + """Get Docker client.""" + return docker.from_env() def ecr_login(): + """Login to AWS ECR.""" if not HAS_BOTO3: - raise ImportError( - "boto3 is required for ECR login. " - "Install with: pip install boto3" - ) - # see docker/docker-py#1677 + raise ImportError("boto3 required for ECR login") + ecr = boto3.client('ecr') - login = ecr.get_authorization_token() - b64token = login['authorizationData'][0]['authorizationToken'].encode('utf-8') - username, password = base64.b64decode(b64token).decode('utf-8').split(':') - registry = login['authorizationData'][0]['proxyEndpoint'] - client = docker.from_env() - client.login(username, password, registry=registry) - - -def build_image(image_name, dockerfile_dir): - print("Building image %s from %s" % (image_name, dockerfile_dir)) - client = api_client() - output = client.build(dockerfile_dir, rm=True, tag=image_name) - response = "".join([" %s" % (line,) for line in output]) - print(response) - - -def image_exists(image_name): - client = api_client() - tags = [t for image in client.images() for t in image['RepoTags'] or []] - return image_name in tags - - -def pull_image(image_name): - client = api_client() + auth = ecr.get_authorization_token()['authorizationData'][0] + token = base64.b64decode(auth['authorizationToken'].encode()).decode() + user, pwd = token.split(':') + docker.from_env().login(user, pwd, registry=auth['proxyEndpoint']) + + +def build_image(image_name: str, dockerfile_dir: str): + """Build Docker image.""" + print(f"Building image {image_name} from {dockerfile_dir}") + _, logs = api_client().images.build(path=dockerfile_dir, rm=True, tag=image_name, decode=True) + for line in logs: + if isinstance(line, dict) and 'stream' in line: + print(f" {line['stream']}", end='') + elif isinstance(line, (bytes, str)): + text = line.decode(errors="ignore") if isinstance(line, bytes) else line + print(f" {text}", end='') + + +def image_exists(image_name: str) -> bool: + """Check if image exists locally.""" + try: + api_client().images.get(image_name) + return True + except docker.errors.ImageNotFound: + return False + + +def pull_image(image_name: str): + """Pull image if not exists.""" if not image_exists(image_name): - client.pull(image_name) + api_client().images.pull(image_name) -def run_docker_command(timeout=None, **kwargs): +def run_docker_command(timeout=None, **kwargs) -> bytes: + """Run command in temporary container.""" pull_image(kwargs["image"]) - client = api_client() - kwargs["labels"] = {"io.confluent.docker.testing": "true"} - container = TestContainer.create(client, **kwargs) - container.start() - container.wait(timeout) - logs = container.logs() - print("Running command %s: %s" % (kwargs["command"], logs)) - container.shutdown() - return logs - - -def path_exists_in_image(image, path): - print("Checking for %s in %s" % (path, image)) - cmd = "bash -c '[ ! -e %s ] || echo success' " % (path,) - output = run_docker_command(image=image, command=cmd) + + cfg = { + 'image': kwargs['image'], + 'command': kwargs.get('command'), + 'labels': {TESTING_LABEL: "true"}, + 'detach': True, + } + + host_cfg = kwargs.get('host_config', {}) + if HOST_CONFIG_NETWORK_MODE in host_cfg: + cfg['network_mode'] = host_cfg[HOST_CONFIG_NETWORK_MODE] + if HOST_CONFIG_BINDS in host_cfg: + cfg['volumes'] = {b.split(':')[0]: {'bind': b.split(':')[1], 'mode': 'rw'} + for b in host_cfg[HOST_CONFIG_BINDS]} + + container = api_client().containers.create(**cfg) + try: + container.start() + container.wait(timeout=timeout) + logs = container.logs() + print(f"Running command {kwargs.get('command')}: {logs}") + return logs + finally: + try: + container.stop() + container.remove() + except Exception: + pass + + +def path_exists_in_image(image: str, path: str) -> bool: + """Check if path exists in image.""" + print(f"Checking for {path} in {image}") + output = run_docker_command(image=image, command=f"bash -c '[ ! -e {path} ] || echo success'") return b"success" in output -def executable_exists_in_image(image, path): - print("Checking for %s in %s" % (path, image)) - cmd = "bash -c '[ ! -x %s ] || echo success' " % (path,) - output = run_docker_command(image=image, command=cmd) +def executable_exists_in_image(image: str, path: str) -> bool: + """Check if executable exists in image.""" + print(f"Checking for {path} in {image}") + output = run_docker_command(image=image, command=f"bash -c '[ ! -x {path} ] || echo success'") return b"success" in output -def run_command_on_host(command): - logs = run_docker_command( - image="busybox", - command=command, - host_config={'NetworkMode': 'host', 'Binds': ['/tmp:/tmp']}) - print("Running command %s: %s" % (command, logs)) - return logs - - -def run_cmd(command): - if command.startswith('"'): - cmd = "bash -c %s" % command - else: - cmd = command - - output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT) - - return output - - -def add_registry_and_tag(image, scope=""): - """ - Fully qualify an image name. `scope` may be an empty - string, "UPSTREAM" for upstream dependencies, or "TEST" - for test dependencies. The injected values correspond to - DOCKER_(${scope}_)REGISTRY and DOCKER_(${scope}_)TAG environment - variables, which are set up by the Maven build. - - :param str image: Image name, without registry prefix and tag postfix. - :param str scope: - """ - - if scope: - scope += "_" - - return "{0}{1}:{2}".format(os.environ.get("DOCKER_{0}REGISTRY".format(scope), ""), - image, - os.environ.get("DOCKER_{0}TAG".format(scope), "latest") - ) - - -class TestContainer(Container): - - def state(self): - return self.inspect_container["State"] - - def status(self): - return self.state()["Status"] - +def run_command_on_host(command: str) -> bytes: + """Run command on host via busybox.""" + return run_docker_command( + image="busybox", command=command, + host_config={HOST_CONFIG_NETWORK_MODE: 'host', HOST_CONFIG_BINDS: ['/tmp:/tmp']} + ) + + +def run_cmd(command: str) -> bytes: + """Run shell command locally.""" + cmd = f"bash -c {command}" if command.startswith('"') else command + return subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT) + + +def add_registry_and_tag(image: str, scope: str = "") -> str: + """Qualify image name with registry and tag from env vars.""" + prefix = f"{scope}_" if scope else "" + registry = os.environ.get(f"DOCKER_{prefix}REGISTRY", "") + tag = os.environ.get(f"DOCKER_{prefix}TAG", "latest") + return f"{registry}{image}:{tag}" + + +class TestContainer(ComposeContainer): + """Container for testing with lifecycle methods.""" + + @classmethod + def create(cls, client, **kwargs) -> 'TestContainer': + cfg = { + 'image': kwargs.get('image'), + 'command': kwargs.get('command'), + 'labels': kwargs.get('labels', {}), + 'detach': True, + } + host_cfg = kwargs.get('host_config', {}) + if HOST_CONFIG_NETWORK_MODE in host_cfg: + cfg['network_mode'] = host_cfg[HOST_CONFIG_NETWORK_MODE] + if HOST_CONFIG_BINDS in host_cfg: + cfg['volumes'] = {b.split(':')[0]: {'bind': b.split(':')[1], 'mode': 'rw'} + for b in host_cfg[HOST_CONFIG_BINDS]} + return cls(client.containers.create(**cfg)) + + def state(self) -> Dict: + self.container.reload() + return self.container.attrs[STATE_KEY] + + def status(self) -> str: + return self.state()['Status'] + def shutdown(self): self.stop() self.remove() - - def execute(self, command): - eid = self.create_exec(command) - return self.start_exec(eid) - - def wait(self, timeout): - return self.client.wait(self.id, timeout) + + def execute(self, command: str) -> bytes: + return self.exec_run(command) -class TestCluster(): - - def __init__(self, name, working_dir, config_file): - config_file_path = os.path.join(working_dir, config_file) - cfg_file = ConfigFile.from_filename(config_file_path) - c = ConfigDetails(working_dir, [cfg_file],) - self.cd = load(c) +class TestCluster: + """Multi-container test cluster.""" + + def __init__(self, name: str, working_dir: str, config_file: str): self.name = name - - def get_project(self): - # Dont reuse the client to fix this bug : https://github.com/docker/compose/issues/1275 - client = docker_client(Environment()) - project = Project.from_config(self.name, self.cd, client) - return project - + self.config = ComposeConfig(working_dir, config_file) + + def get_project(self) -> ComposeProject: + return ComposeProject(self.name, self.config, create_docker_client()) + def start(self): self.shutdown() self.get_project().up() - - def is_running(self): - state = [container.is_running for container in self.get_project().containers()] - return all(state) and len(state) > 0 - - def is_service_running(self, service_name): - return self.get_container(service_name).is_running - + def shutdown(self): - project = self.get_project() - project.down(ImageType.none, True, True) - project.remove_stopped() - - def get_container(self, service_name, stopped=False): + p = self.get_project() + p.down(remove_volumes=True) + p.remove_stopped() + + def is_running(self) -> bool: + containers = self.get_project().containers() + return bool(containers) and all(c.is_running for c in containers) + + def is_service_running(self, service_name: str) -> bool: + try: + return self.get_container(service_name).is_running + except RuntimeError: + return False + + def get_container(self, service_name: str, stopped: bool = False) -> ComposeContainer: + if stopped: + containers = self.get_project().containers([service_name], stopped=True) + if containers: + return containers[0] + raise RuntimeError(f"No container for '{service_name}'") return self.get_project().get_service(service_name).get_container() - - def exit_code(self, service_name): + + def exit_code(self, service_name: str) -> Optional[int]: containers = self.get_project().containers([service_name], stopped=True) - return containers[0].exit_code - - def wait(self, service_name, timeout): - container = self.get_project().containers([service_name], stopped=True) - if container[0].is_running: - return self.get_project().client.wait(container[0].id, timeout) - - def run_command_on_service(self, service_name, command): - return self.run_command(command, self.get_container(service_name)) - - def service_logs(self, service_name, stopped=False): + return containers[0].exit_code if containers else None + + def wait(self, service_name: str, timeout=None): + containers = self.get_project().containers([service_name], stopped=True) + if containers and containers[0].is_running: + return containers[0].wait(timeout) + + def service_logs(self, service_name: str, stopped: bool = False) -> bytes: if stopped: containers = self.get_project().containers([service_name], stopped=True) - print(containers[0].logs()) - return containers[0].logs() - else: - return self.get_container(service_name).logs() - - def run_command(self, command, container): - print("Running %s on %s :" % (command, container)) - eid = container.create_exec(command) - output = container.start_exec(eid) - print("\n%s " % output) + return containers[0].logs() if containers else b'' + return self.get_container(service_name).logs() + + def run_command_on_service(self, service_name: str, command: str) -> bytes: + return self.run_command(command, self.get_container(service_name)) + + def run_command(self, command: str, container: ComposeContainer) -> bytes: + print(f"Running {command} on {container.name}:") + output = container.exec_run(command) + print(f"\n{output.decode('utf-8', errors='ignore') if isinstance(output, bytes) else output}") return output - - def run_command_on_all(self, command): - results = {} - for container in self.get_project().containers(): - results[container.name_without_project] = self.run_command(command, container) - - return results + + def run_command_on_all(self, command: str) -> Dict[str, bytes]: + return {c.name_without_project: self.run_command(command, c) + for c in self.get_project().containers()} diff --git a/confluent/docker_utils/compose.py b/confluent/docker_utils/compose.py new file mode 100644 index 0000000..df05b5f --- /dev/null +++ b/confluent/docker_utils/compose.py @@ -0,0 +1,321 @@ +""" +Docker Compose replacement using official Docker SDK. + +Drop-in replacement for the deprecated docker-compose Python library. +""" + +import os +import re +from typing import Dict, List, Optional, Any, Union + +import yaml +import docker +import docker.errors + + +def expand_env_vars(value: Any) -> Any: + """Expand environment variables in compose config values. + + Supports ${VAR}, ${VAR:-default}, ${VAR-default}, $VAR formats. + """ + if isinstance(value, str): + # Pattern for ${VAR}, ${VAR:-default}, ${VAR-default} + def replace_var(match): + var_expr = match.group(1) + # Handle ${VAR:-default} or ${VAR-default} + if ':-' in var_expr: + var_name, default = var_expr.split(':-', 1) + return os.environ.get(var_name, default) + elif '-' in var_expr and not var_expr.startswith('-'): + var_name, default = var_expr.split('-', 1) + return os.environ.get(var_name) if os.environ.get(var_name) is not None else default + else: + return os.environ.get(var_expr, '') + + # Replace ${VAR} patterns + result = re.sub(r'\$\{([^}]+)\}', replace_var, value) + # Replace $VAR patterns (word boundary) + result = re.sub(r'\$([A-Za-z_][A-Za-z0-9_]*)', lambda m: os.environ.get(m.group(1), ''), result) + return result + elif isinstance(value, dict): + return {k: expand_env_vars(v) for k, v in value.items()} + elif isinstance(value, list): + return [expand_env_vars(v) for v in value] + return value + +# Labels for compose container identification +LABEL_PROJECT = "com.docker.compose.project" +LABEL_SERVICE = "com.docker.compose.service" + +# Container status +STATUS_RUNNING = "running" +STATUS_EXITED = "exited" + +# Container state keys +STATE_KEY = "State" +EXIT_CODE_KEY = "ExitCode" + + +def create_docker_client() -> docker.DockerClient: + """Create Docker client from environment.""" + return docker.from_env() + + +class ComposeConfig: + """Parses and manages docker-compose.yml configuration.""" + + def __init__(self, working_dir: str, config_file: str): + self.working_dir = working_dir + self.config_file_path = os.path.join(working_dir, config_file) + self.config = self._load() + + def _load(self) -> Dict[str, Any]: + with open(self.config_file_path) as f: + config = yaml.safe_load(f) + if not config or 'services' not in config: + raise ValueError("Invalid compose file: missing 'services'") + # Expand environment variables in all config values + return expand_env_vars(config) + + @property + def services(self) -> Dict[str, Dict]: + return self.config.get('services', {}) + + def get_service(self, name: str) -> Dict[str, Any]: + if name not in self.services: + raise ValueError(f"Service '{name}' not found") + return self.services[name] + + +class ComposeContainer: + """Wrapper around Docker SDK container with compose-like interface.""" + + def __init__(self, container: docker.models.containers.Container): + self.container = container + + @property + def id(self) -> str: + return self.container.id + + @property + def name(self) -> str: + return self.container.name + + @property + def name_without_project(self) -> str: + """Service name from container.""" + label_service = self.container.labels.get(LABEL_SERVICE) + if label_service: + return label_service + if '_' in self.name: + return self.name.rsplit('_', 1)[-1] + return self.name + + @property + def is_running(self) -> bool: + self.container.reload() + return self.container.status == STATUS_RUNNING + + @property + def exit_code(self) -> Optional[int]: + self.container.reload() + return self.container.attrs[STATE_KEY][EXIT_CODE_KEY] if self.container.status == STATUS_EXITED else None + + @property + def client(self): + """For backward compatibility.""" + return self.container.client.api + + @property + def inspect_container(self) -> Dict: + """For backward compatibility.""" + self.container.reload() + return self.container.attrs + + def start(self): + self.container.start() + + def stop(self, timeout: int = 10): + try: + self.container.stop(timeout=timeout) + except docker.errors.APIError: + pass + + def remove(self, force: bool = False, v: bool = False): + try: + self.container.remove(force=force, v=v) + except (docker.errors.NotFound, docker.errors.APIError): + pass + + def wait(self, timeout: Optional[int] = None) -> Dict: + return self.container.wait(timeout=timeout) + + def logs(self) -> bytes: + return self.container.logs() + + def create_exec(self, command: str) -> str: + """For backward compatibility.""" + return self.container.client.api.exec_create(self.container.id, command)['Id'] + + def start_exec(self, exec_id: str) -> bytes: + """For backward compatibility.""" + return self.container.client.api.exec_start(exec_id) + + def exec_run(self, command: str) -> bytes: + return self.container.exec_run(command).output + + +class ComposeService: + """Represents a service in the compose project.""" + + def __init__(self, name: str, project: 'ComposeProject'): + self.name = name + self.project = project + + def get_container(self) -> ComposeContainer: + containers = self.project.containers([self.name]) + if not containers: + raise RuntimeError(f"No running container for service '{self.name}'") + return containers[0] + + +class ComposeProject: + """Manages multi-container compose project using Docker SDK.""" + + def __init__(self, name: str, config: ComposeConfig, client: docker.DockerClient): + self.name = name + self.config = config + self.client = client + + def up(self, services: Optional[List[str]] = None): + """Start services.""" + for svc in (services or list(self.config.services.keys())): + self._start_service(svc) + + def down(self, remove_images=None, remove_volumes: bool = False, remove_orphans: bool = False): + """Stop and remove containers.""" + for c in self.containers(stopped=True): + try: + c.stop() + c.remove(force=True, v=remove_volumes) + except (docker.errors.NotFound, docker.errors.APIError): + pass + + def remove_stopped(self): + """Remove stopped containers.""" + for c in self.containers(stopped=True): + if not c.is_running: + c.remove(force=True) + + def containers(self, service_names: Optional[List[str]] = None, stopped: bool = False) -> List[ComposeContainer]: + """Get project containers.""" + filters = {'label': f'{LABEL_PROJECT}={self.name}'} + if not stopped: + filters['status'] = STATUS_RUNNING + + result = [ComposeContainer(c) for c in self.client.containers.list(all=stopped, filters=filters)] + + if service_names: + result = [c for c in result if c.container.labels.get(LABEL_SERVICE) in service_names] + return result + + def get_service(self, name: str) -> ComposeService: + return ComposeService(name, self) + + def _start_service(self, service_name: str) -> ComposeContainer: + """Start a single service.""" + container_name = f"{self.name}_{service_name}_1" + + # Check if exists + try: + existing = self.client.containers.get(container_name) + if existing.status != STATUS_RUNNING: + existing.start() + return ComposeContainer(existing) + except docker.errors.NotFound: + pass + + # Create new + svc_config = self.config.get_service(service_name) + run_config = self._build_config(svc_config) + + # Validate image is set + if 'image' not in run_config or not run_config['image']: + raise ValueError(f"Service '{service_name}' has no valid image specified") + + container = self.client.containers.run( + name=container_name, + detach=True, + labels={LABEL_PROJECT: self.name, LABEL_SERVICE: service_name}, + **run_config + ) + return ComposeContainer(container) + + def _build_config(self, svc: Dict) -> Dict: + """Convert compose service config to Docker SDK format.""" + cfg = {} + + if 'image' in svc: + cfg['image'] = svc['image'] + + if 'command' in svc: + cfg['command'] = svc['command'] + + if 'environment' in svc: + env = svc['environment'] + if isinstance(env, list): + env_dict: Dict[str, str] = {} + for item in env: + if not isinstance(item, str): + continue + if '=' in item: + key, value = item.split('=', 1) + else: + key = item + value = os.environ.get(key, "") + env_dict[key] = value + cfg['environment'] = env_dict + elif isinstance(env, dict): + resolved_env: Dict[str, Any] = {} + for key, value in env.items(): + if value is None: + resolved_env[key] = os.environ.get(key, "") + else: + resolved_env[key] = value + cfg['environment'] = resolved_env + else: + cfg['environment'] = env + + if 'ports' in svc: + ports: Dict[str, Any] = {} + for port_spec in svc['ports']: + port_str = str(port_spec) + parts = port_str.split(':') + if len(parts) == 1: + # Just container port (e.g., "80" or "80/tcp") + ports[parts[0]] = None + elif len(parts) == 2: + # HOST:CONTAINER (e.g., "8080:80") + host_port, container_port = parts + ports[container_port] = int(host_port) if host_port.isdigit() else host_port + elif len(parts) == 3: + # IP:HOST:CONTAINER (e.g., "127.0.0.1:8080:80") + ip, host_port, container_port = parts + ports[container_port] = (ip, int(host_port) if host_port else None) + cfg['ports'] = ports + + if 'volumes' in svc: + cfg['volumes'] = {} + for v in svc['volumes']: + if ':' in v: + parts = v.split(':') + host = parts[0] + if host.startswith('./'): + host = os.path.join(self.config.working_dir, host[2:]) + cfg['volumes'][host] = {'bind': parts[1], 'mode': parts[2] if len(parts) > 2 else 'rw'} + + for key in ['network_mode', 'working_dir', 'hostname', 'entrypoint']: + if key in svc: + cfg[key] = svc[key] + + return cfg diff --git a/requirements.txt b/requirements.txt index b95e24d..14d150a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ -docker==7.1.0 -docker-compose==1.29.2 -Jinja2==3.1.6 -requests==2.32.5 +boto3>=1.36.0 +docker>=7.1.0 +Jinja2>=3.1.0 +PyYAML>=6.0.0 +requests>=2.32.0 diff --git a/tox.ini b/tox.ini index 3df3ec5..8897c71 100644 --- a/tox.ini +++ b/tox.ini @@ -1,9 +1,9 @@ [tox] -envlist = py3 +envlist = py314 toxworkdir = {env:HOME:.}/.virtualenvs/docker_utils -[testenv:py3] -basepython = python3 +[testenv:py314] +basepython = python3.14 [testenv] install_command = pip install -U {packages} From 59da7646846b312bbd22d72bc21a80668d765205 Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Wed, 11 Feb 2026 10:58:33 +0530 Subject: [PATCH 02/13] Update docker util --- confluent/docker_utils/compose.py | 34 +++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/confluent/docker_utils/compose.py b/confluent/docker_utils/compose.py index df05b5f..feda91b 100644 --- a/confluent/docker_utils/compose.py +++ b/confluent/docker_utils/compose.py @@ -186,9 +186,31 @@ def __init__(self, name: str, config: ComposeConfig, client: docker.DockerClient self.name = name self.config = config self.client = client + self._network = None + + @property + def network_name(self) -> str: + """Default network name for the project.""" + return f"{self.name}_default" + + def _ensure_network(self): + """Create project network if it doesn't exist.""" + if self._network: + return self._network + + try: + self._network = self.client.networks.get(self.network_name) + except docker.errors.NotFound: + self._network = self.client.networks.create( + self.network_name, + driver="bridge", + labels={LABEL_PROJECT: self.name} + ) + return self._network def up(self, services: Optional[List[str]] = None): """Start services.""" + self._ensure_network() for svc in (services or list(self.config.services.keys())): self._start_service(svc) @@ -200,6 +222,14 @@ def down(self, remove_images=None, remove_volumes: bool = False, remove_orphans: c.remove(force=True, v=remove_volumes) except (docker.errors.NotFound, docker.errors.APIError): pass + + # Remove project network + try: + network = self.client.networks.get(self.network_name) + network.remove() + except (docker.errors.NotFound, docker.errors.APIError): + pass + self._network = None def remove_stopped(self): """Remove stopped containers.""" @@ -243,9 +273,13 @@ def _start_service(self, service_name: str) -> ComposeContainer: if 'image' not in run_config or not run_config['image']: raise ValueError(f"Service '{service_name}' has no valid image specified") + # Use project network for inter-service communication + network = self._ensure_network() + container = self.client.containers.run( name=container_name, detach=True, + network=network.name, labels={LABEL_PROJECT: self.name, LABEL_SERVICE: service_name}, **run_config ) From f85f1d42ad0f1b03c08201986650253b7314e105 Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Wed, 11 Feb 2026 12:34:48 +0530 Subject: [PATCH 03/13] Fix container startup: add error handling and hostname support --- confluent/docker_utils/compose.py | 35 ++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/confluent/docker_utils/compose.py b/confluent/docker_utils/compose.py index feda91b..53d3579 100644 --- a/confluent/docker_utils/compose.py +++ b/confluent/docker_utils/compose.py @@ -276,13 +276,32 @@ def _start_service(self, service_name: str) -> ComposeContainer: # Use project network for inter-service communication network = self._ensure_network() - container = self.client.containers.run( - name=container_name, - detach=True, - network=network.name, - labels={LABEL_PROJECT: self.name, LABEL_SERVICE: service_name}, - **run_config - ) + # Don't pass network if network_mode is set (they conflict) + if 'network_mode' in run_config: + network_kwargs = {} + else: + network_kwargs = {'network': network.name} + # Set hostname if not already in config (for DNS resolution) + if 'hostname' not in run_config: + network_kwargs['hostname'] = service_name + + try: + container = self.client.containers.run( + name=container_name, + detach=True, + labels={LABEL_PROJECT: self.name, LABEL_SERVICE: service_name}, + **network_kwargs, + **run_config + ) + except docker.errors.APIError as e: + raise RuntimeError(f"Failed to start service '{service_name}': {e}") + + # Verify container is running + container.reload() + if container.status != STATUS_RUNNING: + logs = container.logs().decode('utf-8', errors='ignore')[-500:] + raise RuntimeError(f"Service '{service_name}' container exited immediately. Logs:\n{logs}") + return ComposeContainer(container) def _build_config(self, svc: Dict) -> Dict: @@ -348,7 +367,7 @@ def _build_config(self, svc: Dict) -> Dict: host = os.path.join(self.config.working_dir, host[2:]) cfg['volumes'][host] = {'bind': parts[1], 'mode': parts[2] if len(parts) > 2 else 'rw'} - for key in ['network_mode', 'working_dir', 'hostname', 'entrypoint']: + for key in ['network_mode', 'working_dir', 'hostname', 'entrypoint', 'user', 'tty', 'stdin_open']: if key in svc: cfg[key] = svc[key] From 21ad89ec2de877b0e3df83a90eaa0ba71adab367 Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Wed, 11 Feb 2026 15:46:44 +0530 Subject: [PATCH 04/13] Refactor: Apply PEP guidelines and code review feedback --- confluent/docker_utils/__init__.py | 236 ++++++++++------- confluent/docker_utils/compose.py | 405 +++++++++++++++-------------- 2 files changed, 360 insertions(+), 281 deletions(-) diff --git a/confluent/docker_utils/__init__.py b/confluent/docker_utils/__init__.py index 0f7583d..4a8ccb4 100644 --- a/confluent/docker_utils/__init__.py +++ b/confluent/docker_utils/__init__.py @@ -8,55 +8,65 @@ import docker from .compose import ( - ComposeConfig, ComposeContainer, ComposeProject, ComposeService, create_docker_client, - STATE_KEY, EXIT_CODE_KEY, STATUS_RUNNING, STATUS_EXITED + ComposeConfig, + ComposeContainer, + ComposeProject, + ComposeService, + create_docker_client, + STATE_KEY, + STATUS_RUNNING, ) -# Host config keys (for backward compatibility) +# Host config keys for backward compatibility HOST_CONFIG_NETWORK_MODE = "NetworkMode" HOST_CONFIG_BINDS = "Binds" # Testing label TESTING_LABEL = "io.confluent.docker.testing" +# Volume configuration +VOLUME_MODE_RW = "rw" + try: import boto3 - HAS_BOTO3 = True + _HAS_BOTO3 = True except ImportError: - HAS_BOTO3 = False + _HAS_BOTO3 = False def api_client() -> docker.DockerClient: - """Get Docker client.""" + """Return Docker client from environment.""" return docker.from_env() -def ecr_login(): - """Login to AWS ECR.""" - if not HAS_BOTO3: +def ecr_login() -> None: + """Authenticate with AWS ECR.""" + if not _HAS_BOTO3: raise ImportError("boto3 required for ECR login") ecr = boto3.client('ecr') - auth = ecr.get_authorization_token()['authorizationData'][0] - token = base64.b64decode(auth['authorizationToken'].encode()).decode() - user, pwd = token.split(':') - docker.from_env().login(user, pwd, registry=auth['proxyEndpoint']) + auth_data = ecr.get_authorization_token()['authorizationData'][0] + token = base64.b64decode(auth_data['authorizationToken'].encode()).decode() + username, password = token.split(':') + docker.from_env().login(username, password, registry=auth_data['proxyEndpoint']) -def build_image(image_name: str, dockerfile_dir: str): - """Build Docker image.""" +def build_image(image_name: str, dockerfile_dir: str) -> None: + """Build Docker image from Dockerfile directory.""" print(f"Building image {image_name} from {dockerfile_dir}") - _, logs = api_client().images.build(path=dockerfile_dir, rm=True, tag=image_name, decode=True) - for line in logs: - if isinstance(line, dict) and 'stream' in line: - print(f" {line['stream']}", end='') - elif isinstance(line, (bytes, str)): - text = line.decode(errors="ignore") if isinstance(line, bytes) else line + _, build_logs = api_client().images.build( + path=dockerfile_dir, rm=True, tag=image_name, decode=True + ) + for log_line in build_logs: + if isinstance(log_line, dict) and 'stream' in log_line: + print(f" {log_line['stream']}", end='') + elif isinstance(log_line, (bytes, str)): + text = log_line.decode(errors='ignore') if isinstance(log_line, bytes) else log_line print(f" {text}", end='') def image_exists(image_name: str) -> bool: - """Check if image exists locally.""" + """Check if Docker image exists locally.""" try: api_client().images.get(image_name) return True @@ -64,99 +74,123 @@ def image_exists(image_name: str) -> bool: return False -def pull_image(image_name: str): - """Pull image if not exists.""" +def pull_image(image_name: str) -> None: + """Pull Docker image if not present locally.""" if not image_exists(image_name): api_client().images.pull(image_name) -def run_docker_command(timeout=None, **kwargs) -> bytes: - """Run command in temporary container.""" - pull_image(kwargs["image"]) +def run_docker_command(timeout: Optional[int] = None, **kwargs) -> bytes: + """Run command in temporary container and return output.""" + pull_image(kwargs['image']) - cfg = { + container_config = { 'image': kwargs['image'], 'command': kwargs.get('command'), - 'labels': {TESTING_LABEL: "true"}, + 'labels': {TESTING_LABEL: 'true'}, 'detach': True, } - host_cfg = kwargs.get('host_config', {}) - if HOST_CONFIG_NETWORK_MODE in host_cfg: - cfg['network_mode'] = host_cfg[HOST_CONFIG_NETWORK_MODE] - if HOST_CONFIG_BINDS in host_cfg: - cfg['volumes'] = {b.split(':')[0]: {'bind': b.split(':')[1], 'mode': 'rw'} - for b in host_cfg[HOST_CONFIG_BINDS]} + host_config = kwargs.get('host_config', {}) + if HOST_CONFIG_NETWORK_MODE in host_config: + container_config['network_mode'] = host_config[HOST_CONFIG_NETWORK_MODE] + if HOST_CONFIG_BINDS in host_config: + container_config['volumes'] = _parse_binds(host_config[HOST_CONFIG_BINDS]) - container = api_client().containers.create(**cfg) + container = api_client().containers.create(**container_config) try: container.start() container.wait(timeout=timeout) - logs = container.logs() - print(f"Running command {kwargs.get('command')}: {logs}") - return logs + output = container.logs() + print(f"Running command {kwargs.get('command')}: {output}") + return output finally: - try: - container.stop() - container.remove() - except Exception: - pass + _cleanup_container(container) + + +def _parse_binds(binds: list) -> Dict[str, Dict[str, str]]: + """Parse bind mount specifications.""" + return { + bind.split(':')[0]: {'bind': bind.split(':')[1], 'mode': VOLUME_MODE_RW} + for bind in binds + } + + +def _cleanup_container(container) -> None: + """Stop and remove container, ignoring errors.""" + try: + container.stop() + container.remove() + except Exception: + pass def path_exists_in_image(image: str, path: str) -> bool: - """Check if path exists in image.""" + """Check if path exists in Docker image.""" print(f"Checking for {path} in {image}") - output = run_docker_command(image=image, command=f"bash -c '[ ! -e {path} ] || echo success'") - return b"success" in output + output = run_docker_command( + image=image, + command=f"bash -c '[ ! -e {path} ] || echo success'" + ) + return b'success' in output def executable_exists_in_image(image: str, path: str) -> bool: - """Check if executable exists in image.""" + """Check if executable exists in Docker image.""" print(f"Checking for {path} in {image}") - output = run_docker_command(image=image, command=f"bash -c '[ ! -x {path} ] || echo success'") - return b"success" in output + output = run_docker_command( + image=image, + command=f"bash -c '[ ! -x {path} ] || echo success'" + ) + return b'success' in output def run_command_on_host(command: str) -> bytes: - """Run command on host via busybox.""" + """Run command on host via busybox container.""" return run_docker_command( - image="busybox", command=command, - host_config={HOST_CONFIG_NETWORK_MODE: 'host', HOST_CONFIG_BINDS: ['/tmp:/tmp']} + image='busybox', + command=command, + host_config={ + HOST_CONFIG_NETWORK_MODE: 'host', + HOST_CONFIG_BINDS: ['/tmp:/tmp'] + } ) def run_cmd(command: str) -> bytes: """Run shell command locally.""" - cmd = f"bash -c {command}" if command.startswith('"') else command - return subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT) + if command.startswith('"'): + command = f'bash -c {command}' + return subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT) -def add_registry_and_tag(image: str, scope: str = "") -> str: - """Qualify image name with registry and tag from env vars.""" - prefix = f"{scope}_" if scope else "" - registry = os.environ.get(f"DOCKER_{prefix}REGISTRY", "") - tag = os.environ.get(f"DOCKER_{prefix}TAG", "latest") - return f"{registry}{image}:{tag}" +def add_registry_and_tag(image: str, scope: str = '') -> str: + """Qualify image name with registry and tag from environment variables.""" + prefix = f'{scope}_' if scope else '' + registry = os.environ.get(f'DOCKER_{prefix}REGISTRY', '') + tag = os.environ.get(f'DOCKER_{prefix}TAG', 'latest') + return f'{registry}{image}:{tag}' class TestContainer(ComposeContainer): - """Container for testing with lifecycle methods.""" + """Container wrapper for testing with lifecycle methods.""" @classmethod - def create(cls, client, **kwargs) -> 'TestContainer': - cfg = { + def create(cls, client: docker.DockerClient, **kwargs) -> 'TestContainer': + container_config = { 'image': kwargs.get('image'), 'command': kwargs.get('command'), 'labels': kwargs.get('labels', {}), 'detach': True, } - host_cfg = kwargs.get('host_config', {}) - if HOST_CONFIG_NETWORK_MODE in host_cfg: - cfg['network_mode'] = host_cfg[HOST_CONFIG_NETWORK_MODE] - if HOST_CONFIG_BINDS in host_cfg: - cfg['volumes'] = {b.split(':')[0]: {'bind': b.split(':')[1], 'mode': 'rw'} - for b in host_cfg[HOST_CONFIG_BINDS]} - return cls(client.containers.create(**cfg)) + + host_config = kwargs.get('host_config', {}) + if HOST_CONFIG_NETWORK_MODE in host_config: + container_config['network_mode'] = host_config[HOST_CONFIG_NETWORK_MODE] + if HOST_CONFIG_BINDS in host_config: + container_config['volumes'] = _parse_binds(host_config[HOST_CONFIG_BINDS]) + + return cls(client.containers.create(**container_config)) def state(self) -> Dict: self.container.reload() @@ -165,7 +199,7 @@ def state(self) -> Dict: def status(self) -> str: return self.state()['Status'] - def shutdown(self): + def shutdown(self) -> None: self.stop() self.remove() @@ -174,26 +208,26 @@ def execute(self, command: str) -> bytes: class TestCluster: - """Multi-container test cluster.""" + """Multi-container test cluster manager.""" def __init__(self, name: str, working_dir: str, config_file: str): self.name = name - self.config = ComposeConfig(working_dir, config_file) + self._config = ComposeConfig(working_dir, config_file) - def get_project(self) -> ComposeProject: - return ComposeProject(self.name, self.config, create_docker_client()) + def _get_project(self) -> ComposeProject: + return ComposeProject(self.name, self._config, create_docker_client()) - def start(self): + def start(self) -> None: self.shutdown() - self.get_project().up() + self._get_project().up() - def shutdown(self): - p = self.get_project() - p.down(remove_volumes=True) - p.remove_stopped() + def shutdown(self) -> None: + project = self._get_project() + project.down(remove_volumes=True) + project.remove_stopped() def is_running(self) -> bool: - containers = self.get_project().containers() + containers = self._get_project().containers() return bool(containers) and all(c.is_running for c in containers) def is_service_running(self, service_name: str) -> bool: @@ -204,24 +238,33 @@ def is_service_running(self, service_name: str) -> bool: def get_container(self, service_name: str, stopped: bool = False) -> ComposeContainer: if stopped: - containers = self.get_project().containers([service_name], stopped=True) + containers = self._get_project().containers( + service_names=[service_name], stopped=True + ) if containers: return containers[0] raise RuntimeError(f"No container for '{service_name}'") - return self.get_project().get_service(service_name).get_container() + return self._get_project().get_service(service_name).get_container() def exit_code(self, service_name: str) -> Optional[int]: - containers = self.get_project().containers([service_name], stopped=True) + containers = self._get_project().containers( + service_names=[service_name], stopped=True + ) return containers[0].exit_code if containers else None - def wait(self, service_name: str, timeout=None): - containers = self.get_project().containers([service_name], stopped=True) + def wait(self, service_name: str, timeout: Optional[int] = None) -> Optional[Dict]: + containers = self._get_project().containers( + service_names=[service_name], stopped=True + ) if containers and containers[0].is_running: return containers[0].wait(timeout) + return None def service_logs(self, service_name: str, stopped: bool = False) -> bytes: if stopped: - containers = self.get_project().containers([service_name], stopped=True) + containers = self._get_project().containers( + service_names=[service_name], stopped=True + ) return containers[0].logs() if containers else b'' return self.get_container(service_name).logs() @@ -231,9 +274,18 @@ def run_command_on_service(self, service_name: str, command: str) -> bytes: def run_command(self, command: str, container: ComposeContainer) -> bytes: print(f"Running {command} on {container.name}:") output = container.exec_run(command) - print(f"\n{output.decode('utf-8', errors='ignore') if isinstance(output, bytes) else output}") + decoded = output.decode('utf-8', errors='ignore') if isinstance(output, bytes) else output + print(f"\n{decoded}") return output def run_command_on_all(self, command: str) -> Dict[str, bytes]: - return {c.name_without_project: self.run_command(command, c) - for c in self.get_project().containers()} + return { + container.name_without_project: self.run_command(command, container) + for container in self._get_project().containers() + } + + +# Backward compatibility aliases +def get_project(cluster: TestCluster) -> ComposeProject: + """Backward compatibility wrapper.""" + return cluster._get_project() diff --git a/confluent/docker_utils/compose.py b/confluent/docker_utils/compose.py index 53d3579..39bed77 100644 --- a/confluent/docker_utils/compose.py +++ b/confluent/docker_utils/compose.py @@ -1,60 +1,65 @@ -""" -Docker Compose replacement using official Docker SDK. - -Drop-in replacement for the deprecated docker-compose Python library. -""" +"""Docker Compose replacement using official Docker SDK.""" import os import re -from typing import Dict, List, Optional, Any, Union +from typing import Any, Dict, List, Optional -import yaml import docker import docker.errors +import yaml - -def expand_env_vars(value: Any) -> Any: - """Expand environment variables in compose config values. - - Supports ${VAR}, ${VAR:-default}, ${VAR-default}, $VAR formats. - """ - if isinstance(value, str): - # Pattern for ${VAR}, ${VAR:-default}, ${VAR-default} - def replace_var(match): - var_expr = match.group(1) - # Handle ${VAR:-default} or ${VAR-default} - if ':-' in var_expr: - var_name, default = var_expr.split(':-', 1) - return os.environ.get(var_name, default) - elif '-' in var_expr and not var_expr.startswith('-'): - var_name, default = var_expr.split('-', 1) - return os.environ.get(var_name) if os.environ.get(var_name) is not None else default - else: - return os.environ.get(var_expr, '') - - # Replace ${VAR} patterns - result = re.sub(r'\$\{([^}]+)\}', replace_var, value) - # Replace $VAR patterns (word boundary) - result = re.sub(r'\$([A-Za-z_][A-Za-z0-9_]*)', lambda m: os.environ.get(m.group(1), ''), result) - return result - elif isinstance(value, dict): - return {k: expand_env_vars(v) for k, v in value.items()} - elif isinstance(value, list): - return [expand_env_vars(v) for v in value] - return value - -# Labels for compose container identification +# Docker Compose labels LABEL_PROJECT = "com.docker.compose.project" LABEL_SERVICE = "com.docker.compose.service" -# Container status +# Container states STATUS_RUNNING = "running" STATUS_EXITED = "exited" -# Container state keys +# Container attribute keys STATE_KEY = "State" EXIT_CODE_KEY = "ExitCode" +# Network driver +NETWORK_DRIVER_BRIDGE = "bridge" + +# Volume mode +VOLUME_MODE_RW = "rw" + +# Environment variable patterns +ENV_VAR_BRACED_PATTERN = re.compile(r'\$\{([^}]+)\}') +ENV_VAR_SIMPLE_PATTERN = re.compile(r'\$([A-Za-z_][A-Za-z0-9_]*)') + + +def _expand_env_vars(value: Any) -> Any: + """Recursively expand environment variables in config values. + + Supports: ${VAR}, ${VAR:-default}, ${VAR-default}, $VAR + """ + if isinstance(value, str): + def _replace_braced(match: re.Match) -> str: + expr = match.group(1) + if ':-' in expr: + name, default = expr.split(':-', 1) + return os.environ.get(name, default) + if '-' in expr and not expr.startswith('-'): + name, default = expr.split('-', 1) + env_val = os.environ.get(name) + return env_val if env_val is not None else default + return os.environ.get(expr, '') + + result = ENV_VAR_BRACED_PATTERN.sub(_replace_braced, value) + result = ENV_VAR_SIMPLE_PATTERN.sub(lambda m: os.environ.get(m.group(1), ''), result) + return result + + if isinstance(value, dict): + return {k: _expand_env_vars(v) for k, v in value.items()} + + if isinstance(value, list): + return [_expand_env_vars(item) for item in value] + + return value + def create_docker_client() -> docker.DockerClient: """Create Docker client from environment.""" @@ -62,33 +67,34 @@ def create_docker_client() -> docker.DockerClient: class ComposeConfig: - """Parses and manages docker-compose.yml configuration.""" + """Parses docker-compose.yml configuration.""" def __init__(self, working_dir: str, config_file: str): self.working_dir = working_dir self.config_file_path = os.path.join(working_dir, config_file) - self.config = self._load() + self._config = self._load() def _load(self) -> Dict[str, Any]: - with open(self.config_file_path) as f: - config = yaml.safe_load(f) - if not config or 'services' not in config: - raise ValueError("Invalid compose file: missing 'services'") - # Expand environment variables in all config values - return expand_env_vars(config) + with open(self.config_file_path, encoding='utf-8') as f: + raw_config = yaml.safe_load(f) + + if not raw_config or 'services' not in raw_config: + raise ValueError(f"Invalid compose file: missing 'services' in {self.config_file_path}") + + return _expand_env_vars(raw_config) @property - def services(self) -> Dict[str, Dict]: - return self.config.get('services', {}) + def services(self) -> Dict[str, Dict[str, Any]]: + return self._config.get('services', {}) def get_service(self, name: str) -> Dict[str, Any]: if name not in self.services: - raise ValueError(f"Service '{name}' not found") + raise ValueError(f"Service '{name}' not found in compose file") return self.services[name] class ComposeContainer: - """Wrapper around Docker SDK container with compose-like interface.""" + """Wrapper around Docker SDK container.""" def __init__(self, container: docker.models.containers.Container): self.container = container @@ -103,10 +109,9 @@ def name(self) -> str: @property def name_without_project(self) -> str: - """Service name from container.""" - label_service = self.container.labels.get(LABEL_SERVICE) - if label_service: - return label_service + service_label = self.container.labels.get(LABEL_SERVICE) + if service_label: + return service_label if '_' in self.name: return self.name.rsplit('_', 1)[-1] return self.name @@ -119,46 +124,48 @@ def is_running(self) -> bool: @property def exit_code(self) -> Optional[int]: self.container.reload() - return self.container.attrs[STATE_KEY][EXIT_CODE_KEY] if self.container.status == STATUS_EXITED else None + if self.container.status == STATUS_EXITED: + return self.container.attrs[STATE_KEY][EXIT_CODE_KEY] + return None @property def client(self): - """For backward compatibility.""" + """Low-level API client for backward compatibility.""" return self.container.client.api @property - def inspect_container(self) -> Dict: - """For backward compatibility.""" + def inspect_container(self) -> Dict[str, Any]: + """Container attributes for backward compatibility.""" self.container.reload() return self.container.attrs - def start(self): + def start(self) -> None: self.container.start() - def stop(self, timeout: int = 10): + def stop(self, timeout: int = 10) -> None: try: self.container.stop(timeout=timeout) except docker.errors.APIError: pass - def remove(self, force: bool = False, v: bool = False): + def remove(self, force: bool = False, v: bool = False) -> None: try: self.container.remove(force=force, v=v) except (docker.errors.NotFound, docker.errors.APIError): pass - def wait(self, timeout: Optional[int] = None) -> Dict: + def wait(self, timeout: Optional[int] = None) -> Dict[str, Any]: return self.container.wait(timeout=timeout) def logs(self) -> bytes: return self.container.logs() def create_exec(self, command: str) -> str: - """For backward compatibility.""" + """Create exec instance for backward compatibility.""" return self.container.client.api.exec_create(self.container.id, command)['Id'] def start_exec(self, exec_id: str) -> bytes: - """For backward compatibility.""" + """Start exec instance for backward compatibility.""" return self.container.client.api.exec_start(exec_id) def exec_run(self, command: str) -> bytes: @@ -173,14 +180,16 @@ def __init__(self, name: str, project: 'ComposeProject'): self.project = project def get_container(self) -> ComposeContainer: - containers = self.project.containers([self.name]) + containers = self.project.containers(service_names=[self.name]) if not containers: raise RuntimeError(f"No running container for service '{self.name}'") return containers[0] class ComposeProject: - """Manages multi-container compose project using Docker SDK.""" + """Manages multi-container compose project.""" + + _PASSTHROUGH_KEYS = ('network_mode', 'working_dir', 'hostname', 'entrypoint', 'user', 'tty', 'stdin_open') def __init__(self, name: str, config: ComposeConfig, client: docker.DockerClient): self.name = name @@ -190,11 +199,9 @@ def __init__(self, name: str, config: ComposeConfig, client: docker.DockerClient @property def network_name(self) -> str: - """Default network name for the project.""" return f"{self.name}_default" - def _ensure_network(self): - """Create project network if it doesn't exist.""" + def _get_or_create_network(self) -> docker.models.networks.Network: if self._network: return self._network @@ -203,27 +210,29 @@ def _ensure_network(self): except docker.errors.NotFound: self._network = self.client.networks.create( self.network_name, - driver="bridge", + driver=NETWORK_DRIVER_BRIDGE, labels={LABEL_PROJECT: self.name} ) return self._network - def up(self, services: Optional[List[str]] = None): - """Start services.""" - self._ensure_network() - for svc in (services or list(self.config.services.keys())): - self._start_service(svc) + def up(self, services: Optional[List[str]] = None) -> None: + self._get_or_create_network() + service_list = services or list(self.config.services.keys()) + for service_name in service_list: + self._start_service(service_name) - def down(self, remove_images=None, remove_volumes: bool = False, remove_orphans: bool = False): - """Stop and remove containers.""" - for c in self.containers(stopped=True): + def down(self, remove_images: Optional[str] = None, remove_volumes: bool = False, + remove_orphans: bool = False) -> None: + for container in self.containers(stopped=True): try: - c.stop() - c.remove(force=True, v=remove_volumes) + container.stop() + container.remove(force=True, v=remove_volumes) except (docker.errors.NotFound, docker.errors.APIError): pass - # Remove project network + self._remove_network() + + def _remove_network(self) -> None: try: network = self.client.networks.get(self.network_name) network.remove() @@ -231,144 +240,162 @@ def down(self, remove_images=None, remove_volumes: bool = False, remove_orphans: pass self._network = None - def remove_stopped(self): - """Remove stopped containers.""" - for c in self.containers(stopped=True): - if not c.is_running: - c.remove(force=True) + def remove_stopped(self) -> None: + for container in self.containers(stopped=True): + if not container.is_running: + container.remove(force=True) - def containers(self, service_names: Optional[List[str]] = None, stopped: bool = False) -> List[ComposeContainer]: - """Get project containers.""" + def containers(self, service_names: Optional[List[str]] = None, + stopped: bool = False) -> List[ComposeContainer]: filters = {'label': f'{LABEL_PROJECT}={self.name}'} if not stopped: filters['status'] = STATUS_RUNNING - result = [ComposeContainer(c) for c in self.client.containers.list(all=stopped, filters=filters)] + all_containers = self.client.containers.list(all=stopped, filters=filters) + result = [ComposeContainer(c) for c in all_containers] if service_names: result = [c for c in result if c.container.labels.get(LABEL_SERVICE) in service_names] + return result def get_service(self, name: str) -> ComposeService: return ComposeService(name, self) def _start_service(self, service_name: str) -> ComposeContainer: - """Start a single service.""" container_name = f"{self.name}_{service_name}_1" - # Check if exists + existing = self._get_existing_container(container_name) + if existing: + return existing + + service_config = self.config.get_service(service_name) + run_kwargs = self._build_run_kwargs(service_name, service_config) + + try: + container = self.client.containers.run(**run_kwargs) + except docker.errors.APIError as err: + raise RuntimeError(f"Failed to start service '{service_name}': {err}") from err + + self._verify_container_running(container, service_name) + return ComposeContainer(container) + + def _get_existing_container(self, container_name: str) -> Optional[ComposeContainer]: try: existing = self.client.containers.get(container_name) if existing.status != STATUS_RUNNING: existing.start() return ComposeContainer(existing) except docker.errors.NotFound: - pass + return None + + def _build_run_kwargs(self, service_name: str, service_config: Dict[str, Any]) -> Dict[str, Any]: + container_config = self._parse_service_config(service_config) - # Create new - svc_config = self.config.get_service(service_name) - run_config = self._build_config(svc_config) + if 'image' not in container_config or not container_config['image']: + raise ValueError(f"Service '{service_name}' has no valid image") - # Validate image is set - if 'image' not in run_config or not run_config['image']: - raise ValueError(f"Service '{service_name}' has no valid image specified") + kwargs = { + 'name': f"{self.name}_{service_name}_1", + 'detach': True, + 'labels': {LABEL_PROJECT: self.name, LABEL_SERVICE: service_name}, + **container_config + } - # Use project network for inter-service communication - network = self._ensure_network() + if 'network_mode' not in container_config: + network = self._get_or_create_network() + kwargs['network'] = network.name + if 'hostname' not in container_config: + kwargs['hostname'] = service_name - # Don't pass network if network_mode is set (they conflict) - if 'network_mode' in run_config: - network_kwargs = {} - else: - network_kwargs = {'network': network.name} - # Set hostname if not already in config (for DNS resolution) - if 'hostname' not in run_config: - network_kwargs['hostname'] = service_name + return kwargs + + def _parse_service_config(self, service_config: Dict[str, Any]) -> Dict[str, Any]: + config = {} - try: - container = self.client.containers.run( - name=container_name, - detach=True, - labels={LABEL_PROJECT: self.name, LABEL_SERVICE: service_name}, - **network_kwargs, - **run_config - ) - except docker.errors.APIError as e: - raise RuntimeError(f"Failed to start service '{service_name}': {e}") + if 'image' in service_config: + config['image'] = service_config['image'] - # Verify container is running - container.reload() - if container.status != STATUS_RUNNING: - logs = container.logs().decode('utf-8', errors='ignore')[-500:] - raise RuntimeError(f"Service '{service_name}' container exited immediately. Logs:\n{logs}") + if 'command' in service_config: + config['command'] = service_config['command'] - return ComposeContainer(container) - - def _build_config(self, svc: Dict) -> Dict: - """Convert compose service config to Docker SDK format.""" - cfg = {} + if 'environment' in service_config: + config['environment'] = self._parse_environment(service_config['environment']) - if 'image' in svc: - cfg['image'] = svc['image'] + if 'ports' in service_config: + config['ports'] = self._parse_ports(service_config['ports']) - if 'command' in svc: - cfg['command'] = svc['command'] + if 'volumes' in service_config: + config['volumes'] = self._parse_volumes(service_config['volumes']) - if 'environment' in svc: - env = svc['environment'] - if isinstance(env, list): - env_dict: Dict[str, str] = {} - for item in env: - if not isinstance(item, str): - continue - if '=' in item: - key, value = item.split('=', 1) - else: - key = item - value = os.environ.get(key, "") - env_dict[key] = value - cfg['environment'] = env_dict - elif isinstance(env, dict): - resolved_env: Dict[str, Any] = {} - for key, value in env.items(): - if value is None: - resolved_env[key] = os.environ.get(key, "") - else: - resolved_env[key] = value - cfg['environment'] = resolved_env - else: - cfg['environment'] = env + for key in self._PASSTHROUGH_KEYS: + if key in service_config: + config[key] = service_config[key] - if 'ports' in svc: - ports: Dict[str, Any] = {} - for port_spec in svc['ports']: - port_str = str(port_spec) - parts = port_str.split(':') - if len(parts) == 1: - # Just container port (e.g., "80" or "80/tcp") - ports[parts[0]] = None - elif len(parts) == 2: - # HOST:CONTAINER (e.g., "8080:80") - host_port, container_port = parts - ports[container_port] = int(host_port) if host_port.isdigit() else host_port - elif len(parts) == 3: - # IP:HOST:CONTAINER (e.g., "127.0.0.1:8080:80") - ip, host_port, container_port = parts - ports[container_port] = (ip, int(host_port) if host_port else None) - cfg['ports'] = ports + return config + + def _parse_environment(self, env: Any) -> Dict[str, str]: + if isinstance(env, list): + result = {} + for item in env: + if not isinstance(item, str): + continue + if '=' in item: + key, value = item.split('=', 1) + else: + key, value = item, os.environ.get(item, '') + result[key] = value + return result - if 'volumes' in svc: - cfg['volumes'] = {} - for v in svc['volumes']: - if ':' in v: - parts = v.split(':') - host = parts[0] - if host.startswith('./'): - host = os.path.join(self.config.working_dir, host[2:]) - cfg['volumes'][host] = {'bind': parts[1], 'mode': parts[2] if len(parts) > 2 else 'rw'} + if isinstance(env, dict): + return { + key: os.environ.get(key, '') if value is None else str(value) + for key, value in env.items() + } - for key in ['network_mode', 'working_dir', 'hostname', 'entrypoint', 'user', 'tty', 'stdin_open']: - if key in svc: - cfg[key] = svc[key] + return env + + def _parse_ports(self, ports: List[Any]) -> Dict[str, Any]: + result = {} + for port_spec in ports: + port_str = str(port_spec) + parts = port_str.split(':') + + if len(parts) == 1: + result[parts[0]] = None + elif len(parts) == 2: + host_port, container_port = parts + result[container_port] = int(host_port) if host_port.isdigit() else host_port + elif len(parts) == 3: + ip_addr, host_port, container_port = parts + host_binding = int(host_port) if host_port else None + result[container_port] = (ip_addr, host_binding) + + return result + + def _parse_volumes(self, volumes: List[str]) -> Dict[str, Dict[str, str]]: + result = {} + for volume_spec in volumes: + if ':' not in volume_spec: + continue + + parts = volume_spec.split(':') + host_path = parts[0] + container_path = parts[1] + mode = parts[2] if len(parts) > 2 else VOLUME_MODE_RW + + if host_path.startswith('./'): + host_path = os.path.join(self.config.working_dir, host_path[2:]) + + result[host_path] = {'bind': container_path, 'mode': mode} - return cfg + return result + + def _verify_container_running(self, container: docker.models.containers.Container, + service_name: str) -> None: + container.reload() + if container.status != STATUS_RUNNING: + logs = container.logs().decode('utf-8', errors='ignore')[-500:] + raise RuntimeError( + f"Service '{service_name}' exited immediately.\nLogs:\n{logs}" + ) From 62754333f76ef7c60e82d3250558024920d0cb7b Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Fri, 13 Feb 2026 12:25:09 +0530 Subject: [PATCH 05/13] Fix: name_without_project logic, add __all__, remove duplication --- .python-version | 1 + confluent/docker_utils/__init__.py | 38 +++++++++++++++++++++++------- confluent/docker_utils/compose.py | 24 +++++++++++++++++-- 3 files changed, 52 insertions(+), 11 deletions(-) create mode 100644 .python-version diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..f3fe474 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.12.9 diff --git a/confluent/docker_utils/__init__.py b/confluent/docker_utils/__init__.py index 4a8ccb4..3bbbb6a 100644 --- a/confluent/docker_utils/__init__.py +++ b/confluent/docker_utils/__init__.py @@ -15,8 +15,33 @@ create_docker_client, STATE_KEY, STATUS_RUNNING, + VOLUME_MODE_RW, ) +__all__ = [ + # Compose classes + 'ComposeConfig', + 'ComposeContainer', + 'ComposeProject', + 'ComposeService', + 'create_docker_client', + # Test utilities + 'TestCluster', + 'TestContainer', + # Functions + 'api_client', + 'build_image', + 'image_exists', + 'pull_image', + 'run_docker_command', + 'run_command_on_host', + 'run_cmd', + 'path_exists_in_image', + 'executable_exists_in_image', + 'add_registry_and_tag', + 'ecr_login', +] + # Host config keys for backward compatibility HOST_CONFIG_NETWORK_MODE = "NetworkMode" HOST_CONFIG_BINDS = "Binds" @@ -24,9 +49,6 @@ # Testing label TESTING_LABEL = "io.confluent.docker.testing" -# Volume configuration -VOLUME_MODE_RW = "rw" - try: import boto3 _HAS_BOTO3 = True @@ -283,9 +305,7 @@ def run_command_on_all(self, command: str) -> Dict[str, bytes]: container.name_without_project: self.run_command(command, container) for container in self._get_project().containers() } - - -# Backward compatibility aliases -def get_project(cluster: TestCluster) -> ComposeProject: - """Backward compatibility wrapper.""" - return cluster._get_project() + + def get_project(self) -> ComposeProject: + """Get the compose project instance.""" + return self._get_project() diff --git a/confluent/docker_utils/compose.py b/confluent/docker_utils/compose.py index 39bed77..ff9afc4 100644 --- a/confluent/docker_utils/compose.py +++ b/confluent/docker_utils/compose.py @@ -8,6 +8,21 @@ import docker.errors import yaml +__all__ = [ + 'ComposeConfig', + 'ComposeContainer', + 'ComposeProject', + 'ComposeService', + 'create_docker_client', + 'LABEL_PROJECT', + 'LABEL_SERVICE', + 'STATUS_RUNNING', + 'STATUS_EXITED', + 'STATE_KEY', + 'EXIT_CODE_KEY', + 'VOLUME_MODE_RW', +] + # Docker Compose labels LABEL_PROJECT = "com.docker.compose.project" LABEL_SERVICE = "com.docker.compose.service" @@ -112,8 +127,13 @@ def name_without_project(self) -> str: service_label = self.container.labels.get(LABEL_SERVICE) if service_label: return service_label - if '_' in self.name: - return self.name.rsplit('_', 1)[-1] + # Container name format: {project}_{service}_{instance} + # Extract service name (middle part) + parts = self.name.split('_') + if len(parts) >= 3: + return '_'.join(parts[1:-1]) + if len(parts) == 2: + return parts[1] return self.name @property From c4724fe84fa8f719c715f15b0f8f38cae7899736 Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Fri, 13 Feb 2026 12:36:23 +0530 Subject: [PATCH 06/13] Remove all docstrings --- confluent/docker_utils/__init__.py | 26 -------------------------- confluent/docker_utils/compose.py | 26 -------------------------- 2 files changed, 52 deletions(-) diff --git a/confluent/docker_utils/__init__.py b/confluent/docker_utils/__init__.py index 3bbbb6a..2e478f9 100644 --- a/confluent/docker_utils/__init__.py +++ b/confluent/docker_utils/__init__.py @@ -1,5 +1,3 @@ -"""Confluent Docker Utilities.""" - import base64 import os import subprocess @@ -19,16 +17,13 @@ ) __all__ = [ - # Compose classes 'ComposeConfig', 'ComposeContainer', 'ComposeProject', 'ComposeService', 'create_docker_client', - # Test utilities 'TestCluster', 'TestContainer', - # Functions 'api_client', 'build_image', 'image_exists', @@ -42,11 +37,8 @@ 'ecr_login', ] -# Host config keys for backward compatibility HOST_CONFIG_NETWORK_MODE = "NetworkMode" HOST_CONFIG_BINDS = "Binds" - -# Testing label TESTING_LABEL = "io.confluent.docker.testing" try: @@ -57,12 +49,10 @@ def api_client() -> docker.DockerClient: - """Return Docker client from environment.""" return docker.from_env() def ecr_login() -> None: - """Authenticate with AWS ECR.""" if not _HAS_BOTO3: raise ImportError("boto3 required for ECR login") @@ -74,7 +64,6 @@ def ecr_login() -> None: def build_image(image_name: str, dockerfile_dir: str) -> None: - """Build Docker image from Dockerfile directory.""" print(f"Building image {image_name} from {dockerfile_dir}") _, build_logs = api_client().images.build( path=dockerfile_dir, rm=True, tag=image_name, decode=True @@ -88,7 +77,6 @@ def build_image(image_name: str, dockerfile_dir: str) -> None: def image_exists(image_name: str) -> bool: - """Check if Docker image exists locally.""" try: api_client().images.get(image_name) return True @@ -97,13 +85,11 @@ def image_exists(image_name: str) -> bool: def pull_image(image_name: str) -> None: - """Pull Docker image if not present locally.""" if not image_exists(image_name): api_client().images.pull(image_name) def run_docker_command(timeout: Optional[int] = None, **kwargs) -> bytes: - """Run command in temporary container and return output.""" pull_image(kwargs['image']) container_config = { @@ -131,7 +117,6 @@ def run_docker_command(timeout: Optional[int] = None, **kwargs) -> bytes: def _parse_binds(binds: list) -> Dict[str, Dict[str, str]]: - """Parse bind mount specifications.""" return { bind.split(':')[0]: {'bind': bind.split(':')[1], 'mode': VOLUME_MODE_RW} for bind in binds @@ -139,7 +124,6 @@ def _parse_binds(binds: list) -> Dict[str, Dict[str, str]]: def _cleanup_container(container) -> None: - """Stop and remove container, ignoring errors.""" try: container.stop() container.remove() @@ -148,7 +132,6 @@ def _cleanup_container(container) -> None: def path_exists_in_image(image: str, path: str) -> bool: - """Check if path exists in Docker image.""" print(f"Checking for {path} in {image}") output = run_docker_command( image=image, @@ -158,7 +141,6 @@ def path_exists_in_image(image: str, path: str) -> bool: def executable_exists_in_image(image: str, path: str) -> bool: - """Check if executable exists in Docker image.""" print(f"Checking for {path} in {image}") output = run_docker_command( image=image, @@ -168,7 +150,6 @@ def executable_exists_in_image(image: str, path: str) -> bool: def run_command_on_host(command: str) -> bytes: - """Run command on host via busybox container.""" return run_docker_command( image='busybox', command=command, @@ -180,14 +161,12 @@ def run_command_on_host(command: str) -> bytes: def run_cmd(command: str) -> bytes: - """Run shell command locally.""" if command.startswith('"'): command = f'bash -c {command}' return subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT) def add_registry_and_tag(image: str, scope: str = '') -> str: - """Qualify image name with registry and tag from environment variables.""" prefix = f'{scope}_' if scope else '' registry = os.environ.get(f'DOCKER_{prefix}REGISTRY', '') tag = os.environ.get(f'DOCKER_{prefix}TAG', 'latest') @@ -195,8 +174,6 @@ def add_registry_and_tag(image: str, scope: str = '') -> str: class TestContainer(ComposeContainer): - """Container wrapper for testing with lifecycle methods.""" - @classmethod def create(cls, client: docker.DockerClient, **kwargs) -> 'TestContainer': container_config = { @@ -230,8 +207,6 @@ def execute(self, command: str) -> bytes: class TestCluster: - """Multi-container test cluster manager.""" - def __init__(self, name: str, working_dir: str, config_file: str): self.name = name self._config = ComposeConfig(working_dir, config_file) @@ -307,5 +282,4 @@ def run_command_on_all(self, command: str) -> Dict[str, bytes]: } def get_project(self) -> ComposeProject: - """Get the compose project instance.""" return self._get_project() diff --git a/confluent/docker_utils/compose.py b/confluent/docker_utils/compose.py index ff9afc4..6e59fe2 100644 --- a/confluent/docker_utils/compose.py +++ b/confluent/docker_utils/compose.py @@ -1,5 +1,3 @@ -"""Docker Compose replacement using official Docker SDK.""" - import os import re from typing import Any, Dict, List, Optional @@ -23,34 +21,24 @@ 'VOLUME_MODE_RW', ] -# Docker Compose labels LABEL_PROJECT = "com.docker.compose.project" LABEL_SERVICE = "com.docker.compose.service" -# Container states STATUS_RUNNING = "running" STATUS_EXITED = "exited" -# Container attribute keys STATE_KEY = "State" EXIT_CODE_KEY = "ExitCode" -# Network driver NETWORK_DRIVER_BRIDGE = "bridge" -# Volume mode VOLUME_MODE_RW = "rw" -# Environment variable patterns ENV_VAR_BRACED_PATTERN = re.compile(r'\$\{([^}]+)\}') ENV_VAR_SIMPLE_PATTERN = re.compile(r'\$([A-Za-z_][A-Za-z0-9_]*)') def _expand_env_vars(value: Any) -> Any: - """Recursively expand environment variables in config values. - - Supports: ${VAR}, ${VAR:-default}, ${VAR-default}, $VAR - """ if isinstance(value, str): def _replace_braced(match: re.Match) -> str: expr = match.group(1) @@ -77,13 +65,10 @@ def _replace_braced(match: re.Match) -> str: def create_docker_client() -> docker.DockerClient: - """Create Docker client from environment.""" return docker.from_env() class ComposeConfig: - """Parses docker-compose.yml configuration.""" - def __init__(self, working_dir: str, config_file: str): self.working_dir = working_dir self.config_file_path = os.path.join(working_dir, config_file) @@ -109,8 +94,6 @@ def get_service(self, name: str) -> Dict[str, Any]: class ComposeContainer: - """Wrapper around Docker SDK container.""" - def __init__(self, container: docker.models.containers.Container): self.container = container @@ -128,7 +111,6 @@ def name_without_project(self) -> str: if service_label: return service_label # Container name format: {project}_{service}_{instance} - # Extract service name (middle part) parts = self.name.split('_') if len(parts) >= 3: return '_'.join(parts[1:-1]) @@ -150,12 +132,10 @@ def exit_code(self) -> Optional[int]: @property def client(self): - """Low-level API client for backward compatibility.""" return self.container.client.api @property def inspect_container(self) -> Dict[str, Any]: - """Container attributes for backward compatibility.""" self.container.reload() return self.container.attrs @@ -181,11 +161,9 @@ def logs(self) -> bytes: return self.container.logs() def create_exec(self, command: str) -> str: - """Create exec instance for backward compatibility.""" return self.container.client.api.exec_create(self.container.id, command)['Id'] def start_exec(self, exec_id: str) -> bytes: - """Start exec instance for backward compatibility.""" return self.container.client.api.exec_start(exec_id) def exec_run(self, command: str) -> bytes: @@ -193,8 +171,6 @@ def exec_run(self, command: str) -> bytes: class ComposeService: - """Represents a service in the compose project.""" - def __init__(self, name: str, project: 'ComposeProject'): self.name = name self.project = project @@ -207,8 +183,6 @@ def get_container(self) -> ComposeContainer: class ComposeProject: - """Manages multi-container compose project.""" - _PASSTHROUGH_KEYS = ('network_mode', 'working_dir', 'hostname', 'entrypoint', 'user', 'tty', 'stdin_open') def __init__(self, name: str, config: ComposeConfig, client: docker.DockerClient): From 8f56737287e4a1bcbba53fc1252677dd9ef5e2c8 Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Fri, 13 Feb 2026 12:47:41 +0530 Subject: [PATCH 07/13] Improve error handling and code modularity --- confluent/docker_utils/__init__.py | 51 ++++++++++++++++++++---------- confluent/docker_utils/compose.py | 38 ++++++++++++++-------- 2 files changed, 58 insertions(+), 31 deletions(-) diff --git a/confluent/docker_utils/__init__.py b/confluent/docker_utils/__init__.py index 2e478f9..23b81ef 100644 --- a/confluent/docker_utils/__init__.py +++ b/confluent/docker_utils/__init__.py @@ -1,7 +1,7 @@ import base64 import os import subprocess -from typing import Dict, Optional +from typing import Dict, Optional, Tuple import docker @@ -41,11 +41,11 @@ HOST_CONFIG_BINDS = "Binds" TESTING_LABEL = "io.confluent.docker.testing" +boto3 = None try: import boto3 - _HAS_BOTO3 = True except ImportError: - _HAS_BOTO3 = False + pass def api_client() -> docker.DockerClient: @@ -53,8 +53,8 @@ def api_client() -> docker.DockerClient: def ecr_login() -> None: - if not _HAS_BOTO3: - raise ImportError("boto3 required for ECR login") + if boto3 is None: + raise ImportError("boto3 required for ECR login: pip install boto3") ecr = boto3.client('ecr') auth_data = ecr.get_authorization_token()['authorizationData'][0] @@ -85,8 +85,30 @@ def image_exists(image_name: str) -> bool: def pull_image(image_name: str) -> None: - if not image_exists(image_name): + if image_exists(image_name): + return + try: api_client().images.pull(image_name) + except docker.errors.APIError as err: + raise RuntimeError(f"Failed to pull image '{image_name}': {err}") from err + + +def parse_bind_mount(bind_spec: str) -> Tuple[str, Dict[str, str]]: + parts = bind_spec.split(':') + if len(parts) < 2: + raise ValueError(f"Invalid bind mount format: {bind_spec}") + host_path = parts[0] + container_path = parts[1] + mode = parts[2] if len(parts) > 2 else VOLUME_MODE_RW + return host_path, {'bind': container_path, 'mode': mode} + + +def _parse_binds(binds: list) -> Dict[str, Dict[str, str]]: + result = {} + for bind_spec in binds: + host_path, mount_config = parse_bind_mount(bind_spec) + result[host_path] = mount_config + return result def run_docker_command(timeout: Optional[int] = None, **kwargs) -> bytes: @@ -116,18 +138,11 @@ def run_docker_command(timeout: Optional[int] = None, **kwargs) -> bytes: _cleanup_container(container) -def _parse_binds(binds: list) -> Dict[str, Dict[str, str]]: - return { - bind.split(':')[0]: {'bind': bind.split(':')[1], 'mode': VOLUME_MODE_RW} - for bind in binds - } - - def _cleanup_container(container) -> None: try: container.stop() container.remove() - except Exception: + except docker.errors.APIError: pass @@ -161,9 +176,11 @@ def run_command_on_host(command: str) -> bytes: def run_cmd(command: str) -> bytes: - if command.startswith('"'): - command = f'bash -c {command}' - return subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT) + shell_command = f'bash -c {command}' if command.startswith('"') else command + try: + return subprocess.check_output(shell_command, shell=True, stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as err: + raise RuntimeError(f"Command failed with exit code {err.returncode}: {err.output}") from err def add_registry_and_tag(image: str, scope: str = '') -> str: diff --git a/confluent/docker_utils/compose.py b/confluent/docker_utils/compose.py index 6e59fe2..6da3fd0 100644 --- a/confluent/docker_utils/compose.py +++ b/confluent/docker_utils/compose.py @@ -38,22 +38,32 @@ ENV_VAR_SIMPLE_PATTERN = re.compile(r'\$([A-Za-z_][A-Za-z0-9_]*)') +def _resolve_braced_env_var(expr: str) -> str: + if ':-' in expr: + name, default = expr.split(':-', 1) + return os.environ.get(name, default) + + if '-' in expr and not expr.startswith('-'): + name, default = expr.split('-', 1) + env_val = os.environ.get(name) + return env_val if env_val is not None else default + + return os.environ.get(expr, '') + + +def _expand_env_string(value: str) -> str: + result = ENV_VAR_BRACED_PATTERN.sub( + lambda m: _resolve_braced_env_var(m.group(1)), value + ) + result = ENV_VAR_SIMPLE_PATTERN.sub( + lambda m: os.environ.get(m.group(1), ''), result + ) + return result + + def _expand_env_vars(value: Any) -> Any: if isinstance(value, str): - def _replace_braced(match: re.Match) -> str: - expr = match.group(1) - if ':-' in expr: - name, default = expr.split(':-', 1) - return os.environ.get(name, default) - if '-' in expr and not expr.startswith('-'): - name, default = expr.split('-', 1) - env_val = os.environ.get(name) - return env_val if env_val is not None else default - return os.environ.get(expr, '') - - result = ENV_VAR_BRACED_PATTERN.sub(_replace_braced, value) - result = ENV_VAR_SIMPLE_PATTERN.sub(lambda m: os.environ.get(m.group(1), ''), result) - return result + return _expand_env_string(value) if isinstance(value, dict): return {k: _expand_env_vars(v) for k, v in value.items()} From c928e715756215877a769eb063c42f44108506ca Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Tue, 17 Feb 2026 12:32:07 +0530 Subject: [PATCH 08/13] Remove .python-version file --- .python-version | 1 - 1 file changed, 1 deletion(-) delete mode 100644 .python-version diff --git a/.python-version b/.python-version deleted file mode 100644 index f3fe474..0000000 --- a/.python-version +++ /dev/null @@ -1 +0,0 @@ -3.12.9 From 77ffd8d84ce55578f177ddf0d1cd52b01a93804d Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Tue, 17 Feb 2026 18:24:02 +0530 Subject: [PATCH 09/13] fix tox file to fix warning --- tox.ini | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tox.ini b/tox.ini index 8897c71..0654613 100644 --- a/tox.ini +++ b/tox.ini @@ -35,3 +35,5 @@ max-line-length = 160 [pytest] addopts = -n 4 +markers = + integration: marks tests as integration tests From e9bd12c524bf64eef849f60f0f17a3fd70be0908 Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Wed, 18 Feb 2026 12:04:18 +0530 Subject: [PATCH 10/13] Address PR review comments --- confluent/docker_utils/__init__.py | 98 ++++++++++++------------------ confluent/docker_utils/compose.py | 62 +++++++++---------- 2 files changed, 67 insertions(+), 93 deletions(-) diff --git a/confluent/docker_utils/__init__.py b/confluent/docker_utils/__init__.py index 23b81ef..457d672 100644 --- a/confluent/docker_utils/__init__.py +++ b/confluent/docker_utils/__init__.py @@ -16,27 +16,6 @@ VOLUME_MODE_RW, ) -__all__ = [ - 'ComposeConfig', - 'ComposeContainer', - 'ComposeProject', - 'ComposeService', - 'create_docker_client', - 'TestCluster', - 'TestContainer', - 'api_client', - 'build_image', - 'image_exists', - 'pull_image', - 'run_docker_command', - 'run_command_on_host', - 'run_cmd', - 'path_exists_in_image', - 'executable_exists_in_image', - 'add_registry_and_tag', - 'ecr_login', -] - HOST_CONFIG_NETWORK_MODE = "NetworkMode" HOST_CONFIG_BINDS = "Binds" TESTING_LABEL = "io.confluent.docker.testing" @@ -111,21 +90,32 @@ def _parse_binds(binds: list) -> Dict[str, Dict[str, str]]: return result -def run_docker_command(timeout: Optional[int] = None, **kwargs) -> bytes: - pull_image(kwargs['image']) - - container_config = { - 'image': kwargs['image'], - 'command': kwargs.get('command'), - 'labels': {TESTING_LABEL: 'true'}, +def _build_container_config(image: str, command: Optional[str], labels: Dict, + host_config: Dict) -> Dict: + config = { + 'image': image, + 'command': command, + 'labels': labels, 'detach': True, } - host_config = kwargs.get('host_config', {}) if HOST_CONFIG_NETWORK_MODE in host_config: - container_config['network_mode'] = host_config[HOST_CONFIG_NETWORK_MODE] + config['network_mode'] = host_config[HOST_CONFIG_NETWORK_MODE] if HOST_CONFIG_BINDS in host_config: - container_config['volumes'] = _parse_binds(host_config[HOST_CONFIG_BINDS]) + config['volumes'] = _parse_binds(host_config[HOST_CONFIG_BINDS]) + + return config + + +def run_docker_command(timeout: Optional[int] = None, **kwargs) -> bytes: + pull_image(kwargs['image']) + + container_config = _build_container_config( + image=kwargs['image'], + command=kwargs.get('command'), + labels={TESTING_LABEL: 'true'}, + host_config=kwargs.get('host_config', {}) + ) container = api_client().containers.create(**container_config) try: @@ -142,7 +132,7 @@ def _cleanup_container(container) -> None: try: container.stop() container.remove() - except docker.errors.APIError: + except docker.errors.NotFound: pass @@ -193,19 +183,12 @@ def add_registry_and_tag(image: str, scope: str = '') -> str: class TestContainer(ComposeContainer): @classmethod def create(cls, client: docker.DockerClient, **kwargs) -> 'TestContainer': - container_config = { - 'image': kwargs.get('image'), - 'command': kwargs.get('command'), - 'labels': kwargs.get('labels', {}), - 'detach': True, - } - - host_config = kwargs.get('host_config', {}) - if HOST_CONFIG_NETWORK_MODE in host_config: - container_config['network_mode'] = host_config[HOST_CONFIG_NETWORK_MODE] - if HOST_CONFIG_BINDS in host_config: - container_config['volumes'] = _parse_binds(host_config[HOST_CONFIG_BINDS]) - + container_config = _build_container_config( + image=kwargs.get('image'), + command=kwargs.get('command'), + labels=kwargs.get('labels', {}), + host_config=kwargs.get('host_config', {}) + ) return cls(client.containers.create(**container_config)) def state(self) -> Dict: @@ -228,20 +211,18 @@ def __init__(self, name: str, working_dir: str, config_file: str): self.name = name self._config = ComposeConfig(working_dir, config_file) - def _get_project(self) -> ComposeProject: + def get_project(self) -> ComposeProject: return ComposeProject(self.name, self._config, create_docker_client()) def start(self) -> None: self.shutdown() - self._get_project().up() + self.get_project().up() def shutdown(self) -> None: - project = self._get_project() - project.down(remove_volumes=True) - project.remove_stopped() + self.get_project().down(remove_volumes=True) def is_running(self) -> bool: - containers = self._get_project().containers() + containers = self.get_project().containers() return bool(containers) and all(c.is_running for c in containers) def is_service_running(self, service_name: str) -> bool: @@ -252,22 +233,22 @@ def is_service_running(self, service_name: str) -> bool: def get_container(self, service_name: str, stopped: bool = False) -> ComposeContainer: if stopped: - containers = self._get_project().containers( + containers = self.get_project().containers( service_names=[service_name], stopped=True ) if containers: return containers[0] raise RuntimeError(f"No container for '{service_name}'") - return self._get_project().get_service(service_name).get_container() + return self.get_project().get_service(service_name).get_container() def exit_code(self, service_name: str) -> Optional[int]: - containers = self._get_project().containers( + containers = self.get_project().containers( service_names=[service_name], stopped=True ) return containers[0].exit_code if containers else None def wait(self, service_name: str, timeout: Optional[int] = None) -> Optional[Dict]: - containers = self._get_project().containers( + containers = self.get_project().containers( service_names=[service_name], stopped=True ) if containers and containers[0].is_running: @@ -276,7 +257,7 @@ def wait(self, service_name: str, timeout: Optional[int] = None) -> Optional[Dic def service_logs(self, service_name: str, stopped: bool = False) -> bytes: if stopped: - containers = self._get_project().containers( + containers = self.get_project().containers( service_names=[service_name], stopped=True ) return containers[0].logs() if containers else b'' @@ -295,8 +276,5 @@ def run_command(self, command: str, container: ComposeContainer) -> bytes: def run_command_on_all(self, command: str) -> Dict[str, bytes]: return { container.name_without_project: self.run_command(command, container) - for container in self._get_project().containers() + for container in self.get_project().containers() } - - def get_project(self) -> ComposeProject: - return self._get_project() diff --git a/confluent/docker_utils/compose.py b/confluent/docker_utils/compose.py index 6da3fd0..3570390 100644 --- a/confluent/docker_utils/compose.py +++ b/confluent/docker_utils/compose.py @@ -6,21 +6,6 @@ import docker.errors import yaml -__all__ = [ - 'ComposeConfig', - 'ComposeContainer', - 'ComposeProject', - 'ComposeService', - 'create_docker_client', - 'LABEL_PROJECT', - 'LABEL_SERVICE', - 'STATUS_RUNNING', - 'STATUS_EXITED', - 'STATE_KEY', - 'EXIT_CODE_KEY', - 'VOLUME_MODE_RW', -] - LABEL_PROJECT = "com.docker.compose.project" LABEL_SERVICE = "com.docker.compose.service" @@ -36,15 +21,18 @@ ENV_VAR_BRACED_PATTERN = re.compile(r'\$\{([^}]+)\}') ENV_VAR_SIMPLE_PATTERN = re.compile(r'\$([A-Za-z_][A-Za-z0-9_]*)') +ENV_VAR_DEFAULT_UNSET_PATTERN = re.compile(r'^([A-Za-z_][A-Za-z0-9_]*)-(.+)$') def _resolve_braced_env_var(expr: str) -> str: if ':-' in expr: name, default = expr.split(':-', 1) - return os.environ.get(name, default) + value = os.environ.get(name, '') + return default if value == '' else value - if '-' in expr and not expr.startswith('-'): - name, default = expr.split('-', 1) + match = ENV_VAR_DEFAULT_UNSET_PATTERN.match(expr) + if match: + name, default = match.groups() env_val = os.environ.get(name) return env_val if env_val is not None else default @@ -120,7 +108,6 @@ def name_without_project(self) -> str: service_label = self.container.labels.get(LABEL_SERVICE) if service_label: return service_label - # Container name format: {project}_{service}_{instance} parts = self.name.split('_') if len(parts) >= 3: return '_'.join(parts[1:-1]) @@ -155,13 +142,13 @@ def start(self) -> None: def stop(self, timeout: int = 10) -> None: try: self.container.stop(timeout=timeout) - except docker.errors.APIError: + except docker.errors.NotFound: pass def remove(self, force: bool = False, v: bool = False) -> None: try: self.container.remove(force=force, v=v) - except (docker.errors.NotFound, docker.errors.APIError): + except docker.errors.NotFound: pass def wait(self, timeout: Optional[int] = None) -> Dict[str, Any]: @@ -222,8 +209,12 @@ def _get_or_create_network(self) -> docker.models.networks.Network: def up(self, services: Optional[List[str]] = None) -> None: self._get_or_create_network() service_list = services or list(self.config.services.keys()) - for service_name in service_list: - self._start_service(service_name) + try: + for service_name in service_list: + self._start_service(service_name) + except Exception: + self.down() + raise def down(self, remove_images: Optional[str] = None, remove_volumes: bool = False, remove_orphans: bool = False) -> None: @@ -231,7 +222,7 @@ def down(self, remove_images: Optional[str] = None, remove_volumes: bool = False try: container.stop() container.remove(force=True, v=remove_volumes) - except (docker.errors.NotFound, docker.errors.APIError): + except docker.errors.NotFound: pass self._remove_network() @@ -240,7 +231,7 @@ def _remove_network(self) -> None: try: network = self.client.networks.get(self.network_name) network.remove() - except (docker.errors.NotFound, docker.errors.APIError): + except docker.errors.NotFound: pass self._network = None @@ -276,11 +267,7 @@ def _start_service(self, service_name: str) -> ComposeContainer: service_config = self.config.get_service(service_name) run_kwargs = self._build_run_kwargs(service_name, service_config) - try: - container = self.client.containers.run(**run_kwargs) - except docker.errors.APIError as err: - raise RuntimeError(f"Failed to start service '{service_name}': {err}") from err - + container = self.client.containers.run(**run_kwargs) self._verify_container_running(container, service_name) return ComposeContainer(container) @@ -357,7 +344,7 @@ def _parse_environment(self, env: Any) -> Dict[str, str]: for key, value in env.items() } - return env + raise ValueError(f"environment must be list or dict, got {type(env).__name__}") def _parse_ports(self, ports: List[Any]) -> Dict[str, Any]: result = {} @@ -366,17 +353,26 @@ def _parse_ports(self, ports: List[Any]) -> Dict[str, Any]: parts = port_str.split(':') if len(parts) == 1: - result[parts[0]] = None + container_port = self._normalize_port(parts[0]) + result[container_port] = None elif len(parts) == 2: host_port, container_port = parts + container_port = self._normalize_port(container_port) result[container_port] = int(host_port) if host_port.isdigit() else host_port elif len(parts) == 3: ip_addr, host_port, container_port = parts - host_binding = int(host_port) if host_port else None + container_port = self._normalize_port(container_port) + host_binding = int(host_port) if host_port.isdigit() else None result[container_port] = (ip_addr, host_binding) return result + def _normalize_port(self, port: str) -> str: + port = port.strip() + if '/' not in port: + return f"{port}/tcp" + return port + def _parse_volumes(self, volumes: List[str]) -> Dict[str, Dict[str, str]]: result = {} for volume_spec in volumes: From ef7fe35d30769ce07f471594662bf7e6ceac2ea7 Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Wed, 18 Feb 2026 15:39:04 +0530 Subject: [PATCH 11/13] Rename api_client() to docker_client() for clarity --- confluent/docker_utils/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/confluent/docker_utils/__init__.py b/confluent/docker_utils/__init__.py index 457d672..1b4de54 100644 --- a/confluent/docker_utils/__init__.py +++ b/confluent/docker_utils/__init__.py @@ -27,7 +27,7 @@ pass -def api_client() -> docker.DockerClient: +def docker_client() -> docker.DockerClient: return docker.from_env() @@ -44,7 +44,7 @@ def ecr_login() -> None: def build_image(image_name: str, dockerfile_dir: str) -> None: print(f"Building image {image_name} from {dockerfile_dir}") - _, build_logs = api_client().images.build( + _, build_logs = docker_client().images.build( path=dockerfile_dir, rm=True, tag=image_name, decode=True ) for log_line in build_logs: @@ -57,7 +57,7 @@ def build_image(image_name: str, dockerfile_dir: str) -> None: def image_exists(image_name: str) -> bool: try: - api_client().images.get(image_name) + docker_client().images.get(image_name) return True except docker.errors.ImageNotFound: return False @@ -67,7 +67,7 @@ def pull_image(image_name: str) -> None: if image_exists(image_name): return try: - api_client().images.pull(image_name) + docker_client().images.pull(image_name) except docker.errors.APIError as err: raise RuntimeError(f"Failed to pull image '{image_name}': {err}") from err @@ -117,7 +117,7 @@ def run_docker_command(timeout: Optional[int] = None, **kwargs) -> bytes: host_config=kwargs.get('host_config', {}) ) - container = api_client().containers.create(**container_config) + container = docker_client().containers.create(**container_config) try: container.start() container.wait(timeout=timeout) From 44efcb2a91f053ca42e1afcc10075fa6e0fb0062 Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Wed, 18 Feb 2026 18:03:06 +0530 Subject: [PATCH 12/13] fixing comments --- confluent/docker_utils/__init__.py | 7 ++----- confluent/docker_utils/compose.py | 2 +- requirements.txt | 10 +++++----- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/confluent/docker_utils/__init__.py b/confluent/docker_utils/__init__.py index 1b4de54..3197e4c 100644 --- a/confluent/docker_utils/__init__.py +++ b/confluent/docker_utils/__init__.py @@ -133,7 +133,7 @@ def _cleanup_container(container) -> None: container.stop() container.remove() except docker.errors.NotFound: - pass + print(f"Container {container.id} already removed") def path_exists_in_image(image: str, path: str) -> bool: @@ -167,10 +167,7 @@ def run_command_on_host(command: str) -> bytes: def run_cmd(command: str) -> bytes: shell_command = f'bash -c {command}' if command.startswith('"') else command - try: - return subprocess.check_output(shell_command, shell=True, stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as err: - raise RuntimeError(f"Command failed with exit code {err.returncode}: {err.output}") from err + return subprocess.check_output(shell_command, shell=True, stderr=subprocess.STDOUT) def add_registry_and_tag(image: str, scope: str = '') -> str: diff --git a/confluent/docker_utils/compose.py b/confluent/docker_utils/compose.py index 3570390..7e64e54 100644 --- a/confluent/docker_utils/compose.py +++ b/confluent/docker_utils/compose.py @@ -212,7 +212,7 @@ def up(self, services: Optional[List[str]] = None) -> None: try: for service_name in service_list: self._start_service(service_name) - except Exception: + except (docker.errors.APIError, RuntimeError, ValueError): self.down() raise diff --git a/requirements.txt b/requirements.txt index 14d150a..c8d6a8f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ -boto3>=1.36.0 -docker>=7.1.0 -Jinja2>=3.1.0 -PyYAML>=6.0.0 -requests>=2.32.0 +boto3~=1.36.0 +docker~=7.1.0 +Jinja2~=3.1.0 +PyYAML~=6.0.0 +requests~=2.32.0 From 09182eb7a5365011859ecec30609be59892d9b6b Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Thu, 19 Feb 2026 12:50:16 +0530 Subject: [PATCH 13/13] Address PR review comments: simplify boto3 import, use compatible release versions, add specific type hints --- confluent/docker_utils/__init__.py | 16 ++++------------ requirements.txt | 2 +- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/confluent/docker_utils/__init__.py b/confluent/docker_utils/__init__.py index 3197e4c..0e0f438 100644 --- a/confluent/docker_utils/__init__.py +++ b/confluent/docker_utils/__init__.py @@ -1,8 +1,9 @@ import base64 import os import subprocess -from typing import Dict, Optional, Tuple +from typing import Any, Dict, Optional, Tuple +import boto3 import docker from .compose import ( @@ -20,21 +21,12 @@ HOST_CONFIG_BINDS = "Binds" TESTING_LABEL = "io.confluent.docker.testing" -boto3 = None -try: - import boto3 -except ImportError: - pass - def docker_client() -> docker.DockerClient: return docker.from_env() def ecr_login() -> None: - if boto3 is None: - raise ImportError("boto3 required for ECR login: pip install boto3") - ecr = boto3.client('ecr') auth_data = ecr.get_authorization_token()['authorizationData'][0] token = base64.b64decode(auth_data['authorizationToken'].encode()).decode() @@ -90,8 +82,8 @@ def _parse_binds(binds: list) -> Dict[str, Dict[str, str]]: return result -def _build_container_config(image: str, command: Optional[str], labels: Dict, - host_config: Dict) -> Dict: +def _build_container_config(image: str, command: Optional[str], labels: Dict[str, str], + host_config: Dict[str, Any]) -> Dict[str, Any]: config = { 'image': image, 'command': command, diff --git a/requirements.txt b/requirements.txt index c8d6a8f..56afb23 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -boto3~=1.36.0 +boto3~=1.42.49 docker~=7.1.0 Jinja2~=3.1.0 PyYAML~=6.0.0