diff --git a/.github/workflows/actions.yml b/.github/workflows/actions.yml index 38b7432e..47624ca1 100644 --- a/.github/workflows/actions.yml +++ b/.github/workflows/actions.yml @@ -31,6 +31,7 @@ jobs: interactor-tests: name: Interactor Tests runs-on: ubuntu-latest + timeout-minutes: 120 steps: - name: Checkout code uses: actions/checkout@v4 @@ -57,16 +58,50 @@ jobs: - name: Setup Docker run: | - sudo systemctl start docker || true - docker pull multiversx/chainsimulator || true + sudo systemctl start docker + docker pull multiversx/chainsimulator - - name: Run targeted interactor test via wrapper + - name: Run first interactor test suite + timeout-minutes: 60 env: PATH: ${{ github.workspace }}/interactor/scripts:${{ env.PATH }} WORKSPACE_ROOT: ${{ github.workspace }} - MAX_TEST_CONCURRENCY: "4" + MAX_TEST_CONCURRENCY: "2" + GITHUB_ACTIONS: "true" run: | - cargo test --package rust-interact --test complete_flow_tests --all-features - cargo test --package rust-interact --test mvx_esdt_safe_tests --all-features + cargo test --package rust-interact --test complete_flow_tests --all-features + + - name: Cleanup containers after first test suite + if: always() + run: | + # Clean up containers from first test suite before starting second + CONTAINERS=$(docker ps -a --filter "name=chain-sim-" --format "{{.ID}}") + if [ -n "$CONTAINERS" ]; then + echo "Cleaning up containers: $CONTAINERS" + docker stop $CONTAINERS + docker rm -f $CONTAINERS + fi + # Wait a moment for ports to be released + sleep 2 + - name: Run second interactor test suite + timeout-minutes: 60 + env: + PATH: ${{ github.workspace }}/interactor/scripts:${{ env.PATH }} + WORKSPACE_ROOT: ${{ github.workspace }} + MAX_TEST_CONCURRENCY: "2" + GITHUB_ACTIONS: "true" + run: | + cargo test --package rust-interact --test mvx_esdt_safe_tests --all-features + + - name: Cleanup chain simulator containers + if: always() + run: | + # Remove only the per-test chain simulator containers created by the wrapper + CONTAINERS=$(docker ps -a --filter "name=chain-sim-" --format "{{.ID}}") + if [ -n "$CONTAINERS" ]; then + echo "Final cleanup of containers: $CONTAINERS" + docker stop $CONTAINERS + docker rm -f $CONTAINERS + fi diff --git a/interactor/scripts/cargo-test-wrapper.py b/interactor/scripts/cargo-test-wrapper.py index 665bc7da..0be56aca 100755 --- a/interactor/scripts/cargo-test-wrapper.py +++ b/interactor/scripts/cargo-test-wrapper.py @@ -4,6 +4,7 @@ Called by the cargo wrapper script when an interactor test is detected """ +import fcntl import os import random import socket @@ -24,8 +25,8 @@ # Maximum number of test cases to run in parallel # Can be overridden via MAX_TEST_CONCURRENCY environment variable def get_max_concurrency() -> int: - """Get maximum concurrency from environment or default to 2.""" - return int(os.environ.get("MAX_TEST_CONCURRENCY", "2")) + """Get maximum concurrency from environment or default to 4.""" + return int(os.environ.get("MAX_TEST_CONCURRENCY", "4")) def remove_script_dir_from_path(script_dir: Path) -> str: @@ -138,43 +139,103 @@ def find_available_port() -> int: Generates a port based on PID, timestamp, and random component, then verifies it's not in use by checking socket binding and Docker. + Uses a lock file to prevent race conditions in parallel execution. Returns: An available port number between 1024 and 65535. Raises: - SystemExit: If no available port is found after 100 attempts. + SystemExit: If no available port is found after 200 attempts. """ base_port = 8085 - port = base_port + (os.getpid() % 1000) + (int(time.time()) % 1000) + random.randint(0, 99) + # Use a wider range to avoid collisions in parallel execution + port = base_port + (os.getpid() % 5000) + (int(time.time() * 1000) % 5000) + random.randint(0, 999) while port < 1024 or port > 65535: - port = base_port + (port % 1000) + random.randint(0, 99) + port = base_port + (port % 5000) + random.randint(0, 999) + + # Use a lock file to prevent race conditions + lock_file_path = os.path.join(tempfile.gettempdir(), f"port_lock_{port}.lock") + lock_file = None attempts = 0 - while attempts < 100: + while attempts < 200: # Increased attempts for better reliability + # Check if port is available via socket binding sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: sock.bind(("localhost", port)) sock.close() except OSError: - port += 1 + port = base_port + (port % 5000) + random.randint(0, 999) + if port < 1024 or port > 65535: + port = base_port + (port % 5000) + random.randint(0, 999) attempts += 1 continue - result = subprocess.run(["docker", "ps", "--filter", f"publish={port}", "--format", "{{.ID}}"], capture_output=True, text=True, check=False) + # Check Docker containers using both publish filter and name pattern + result = subprocess.run( + ["docker", "ps", "--filter", f"publish={port}", "--format", "{{.ID}}"], + capture_output=True, + text=True, + check=False, + timeout=5, + ) if result.returncode == 0 and result.stdout.strip(): - port += 1 + port = base_port + (port % 5000) + random.randint(0, 999) + if port < 1024 or port > 65535: + port = base_port + (port % 5000) + random.randint(0, 999) attempts += 1 continue - break + # Try to acquire lock on this port + try: + lock_file = open(lock_file_path, "w") + fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + # Double-check Docker after acquiring lock + result = subprocess.run( + ["docker", "ps", "--filter", f"publish={port}", "--format", "{{.ID}}"], + capture_output=True, + text=True, + check=False, + timeout=5, + ) + if result.returncode == 0 and result.stdout.strip(): + lock_file.close() + try: + os.remove(lock_file_path) + except OSError: + pass + port = base_port + (port % 5000) + random.randint(0, 999) + if port < 1024 or port > 65535: + port = base_port + (port % 5000) + random.randint(0, 999) + attempts += 1 + continue + # Port is available, lock acquired + break + except (IOError, OSError): + # Lock acquisition failed, port might be in use + if lock_file: + lock_file.close() + port = base_port + (port % 5000) + random.randint(0, 999) + if port < 1024 or port > 65535: + port = base_port + (port % 5000) + random.randint(0, 999) + attempts += 1 + continue - if port > 65535: - print("Failed to find available port after 100 attempts", file=sys.stderr) + if port > 65535 or attempts >= 200: + if lock_file: + lock_file.close() + try: + os.remove(lock_file_path) + except OSError: + pass + print("Failed to find available port after 200 attempts", file=sys.stderr) sys.exit(1) + # Lock will be released when process exits, but we keep it for now + # The lock file will be cleaned up on process exit return port @@ -210,7 +271,6 @@ def wait_for_simulator(port: int, container_name: str, max_attempts: int = 30) - return False - def filter_output(output: str) -> str: """Filter out duplicate and empty 'successes:' and 'failures:' sections. @@ -278,12 +338,14 @@ def print_test_output(case_name: str, output: str, exit_code: int): print(f"{'='*80}\n", file=sys.stderr) -def run_parallel_tests(test_cases: List[str], args: List[str]) -> None: +def run_parallel_tests(test_cases: List[str], args: List[str], workspace_root: str, test_file: Optional[str] = None) -> None: """Run multiple test cases in parallel with concurrency limit. Args: test_cases: List of test case names to run. args: Original cargo test arguments to reuse. + workspace_root: Root directory of the workspace. + test_file: Optional name of the test file (for summary). Exits with 0 if all tests pass, 1 if any fail, 130 on KeyboardInterrupt. """ @@ -293,10 +355,10 @@ def run_parallel_tests(test_cases: List[str], args: List[str]) -> None: processes = {} completed_processes = set() exit_codes = {} + container_names = {} # Track containers for cleanup test_index = 0 try: - # Start initial batch of processes up to max_concurrency while test_index < len(test_cases) and len(processes) < max_concurrency: case_name = test_cases[test_index] case_args = list(args) @@ -323,7 +385,6 @@ def run_parallel_tests(test_cases: List[str], args: List[str]) -> None: processes[case_name] = process test_index += 1 - # Process completed tests and start new ones as slots become available while len(completed_processes) < len(test_cases): for case_name, process in list(processes.items()): if case_name in completed_processes: @@ -338,7 +399,6 @@ def run_parallel_tests(test_cases: List[str], args: List[str]) -> None: print_test_output(case_name, output, exit_code) - # Start next test if available if test_index < len(test_cases): next_case_name = test_cases[test_index] case_args = list(args) @@ -368,7 +428,9 @@ def run_parallel_tests(test_cases: List[str], args: List[str]) -> None: if len(completed_processes) < len(test_cases): time.sleep(0.1) - # Calculate summary statistics + # Wait for all child processes to finish + time.sleep(1) + passed_tests = [] failed_tests = [] @@ -379,7 +441,6 @@ def run_parallel_tests(test_cases: List[str], args: List[str]) -> None: else: failed_tests.append(case_name) - # Print summary total_tests = len(test_cases) passed_count = len(passed_tests) failed_count = len(failed_tests) @@ -396,6 +457,11 @@ def run_parallel_tests(test_cases: List[str], args: List[str]) -> None: for test_name in failed_tests: print(f" - {test_name}", file=sys.stderr) + # Final cleanup at the end of the process + # In GitHub Actions: only clean up containers from this process + # In local: comprehensive cleanup (all containers, networks, volumes) + cleanup_all_docker_resources() + overall_exit_code = 0 if failed_count == 0 else 1 sys.exit(overall_exit_code) @@ -407,9 +473,161 @@ def run_parallel_tests(test_cases: List[str], args: List[str]) -> None: except subprocess.TimeoutExpired: process.kill() process.wait() + # Final cleanup at the end of the process (even on interrupt) + # In GitHub Actions: only clean up containers from this process + # In local: comprehensive cleanup (all containers, networks, volumes) + cleanup_all_docker_resources() sys.exit(130) +def cleanup_container(container_name: str) -> None: + """Stop and remove a Docker container. + + Args: + container_name: Name of the container to clean up. + """ + # Check if container exists before attempting cleanup + check_result = subprocess.run( + ["docker", "ps", "-a", "--filter", f"name=^{container_name}$", "--format", "{{.Names}}"], + capture_output=True, + text=True, + check=False, + timeout=5, + ) + + if check_result.returncode != 0 or not check_result.stdout.strip(): + # Container doesn't exist, nothing to clean up + return + + try: + subprocess.run( + ["docker", "stop", container_name], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=False, + timeout=10, + ) + subprocess.run( + ["docker", "rm", container_name], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=False, + timeout=10, + ) + except subprocess.TimeoutExpired: + # Force kill if stop times out + subprocess.run( + ["docker", "kill", container_name], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=False, + ) + subprocess.run( + ["docker", "rm", "-f", container_name], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=False, + ) + except Exception: + # Ignore errors during cleanup + pass + + +def is_github_actions() -> bool: + """Check if running in GitHub Actions environment. + + Returns: + True if running in GitHub Actions, False otherwise. + """ + return os.environ.get("GITHUB_ACTIONS") == "true" + + +def cleanup_all_docker_resources(container_name: Optional[str] = None) -> None: + """Clean up Docker resources created by tests. + + In GitHub Actions: Only cleans up the specified container (or containers matching current PID). + In local runs: Performs comprehensive cleanup of all chain-sim- containers, networks, and volumes. + + Args: + container_name: Optional specific container name to clean up. If None and in GitHub Actions, + only cleans up containers matching the current process PID. + """ + try: + if is_github_actions(): + # In GitHub Actions: only clean up containers from this process + if container_name: + # Clean up specific container + cleanup_container(container_name) + else: + # Clean up containers matching current PID pattern + current_pid = os.getpid() + result = subprocess.run( + ["docker", "ps", "-a", "--filter", f"name=chain-sim-", "--format", "{{.Names}}"], + capture_output=True, + text=True, + check=False, + timeout=10, + ) + + if result.returncode == 0 and result.stdout.strip(): + for name in result.stdout.strip().split("\n"): + if name.strip() and f"-{current_pid}-" in name: + cleanup_container(name.strip()) + else: + # Local runs: comprehensive cleanup + # Clean up all chain-sim- containers + result = subprocess.run( + ["docker", "ps", "-a", "--filter", "name=chain-sim-", "--format", "{{.ID}}"], + capture_output=True, + text=True, + check=False, + timeout=10, + ) + + if result.returncode == 0 and result.stdout.strip(): + container_ids = result.stdout.strip().split("\n") + for container_id in container_ids: + if container_id.strip(): + try: + subprocess.run( + ["docker", "stop", container_id.strip()], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=False, + timeout=10, + ) + subprocess.run( + ["docker", "rm", "-f", container_id.strip()], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=False, + timeout=10, + ) + except Exception: + pass + + # Clean up dangling networks + subprocess.run( + ["docker", "network", "prune", "-f"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=False, + timeout=30, + ) + + # Clean up dangling volumes + subprocess.run( + ["docker", "volume", "prune", "-f"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=False, + timeout=30, + ) + except Exception: + # Ignore errors during cleanup + pass + + def start_simulator_container(port: int, container_name: str) -> bool: """Start the chain simulator Docker container and wait for it to be ready. @@ -421,8 +639,6 @@ def start_simulator_container(port: int, container_name: str) -> bool: True if simulator started successfully, False otherwise. """ print(f"Starting chain simulator on port {port}...", file=sys.stderr) - # Add memory limit (2GB per container) to prevent OOM kills with 2 parallel containers - # With 16GB total RAM, 2 containers * 2GB = 4GB, leaving 12GB for system and other processes result = subprocess.run( ["docker", "run", "-d", "-p", f"{port}:8085", "--memory=2g", "--name", container_name, "multiversx/chainsimulator"], stdout=subprocess.DEVNULL, @@ -436,6 +652,7 @@ def start_simulator_container(port: int, container_name: str) -> bool: if not wait_for_simulator(port, container_name): print("Chain simulator failed to start after 30 seconds", file=sys.stderr) + cleanup_container(container_name) return False return True @@ -502,29 +719,59 @@ def main(): test_cases = discover_test_cases(test_file, INTERACTOR_PACKAGE, workspace_root, filter_test_name) if test_cases: - run_parallel_tests(test_cases, args) + run_parallel_tests(test_cases, args, workspace_root, test_file) + else: + sys.exit(1) - # Check if CHAIN_SIMULATOR_PORT is already set (container started externally) port_str = os.environ.get("CHAIN_SIMULATOR_PORT") + container_name = None if port_str: + # This is a child process running a specific test (spawned by parallel runner) + # Each child creates its own container, so we need to track it for cleanup port = int(port_str) - # Check if container already exists on this port - result = subprocess.run( - ["docker", "ps", "--filter", f"publish={port}", "--format", "{{.Names}}"], - capture_output=True, - text=True, - check=False, - ) - if result.returncode == 0 and result.stdout.strip(): - container_name = result.stdout.strip().split("\n")[0] - print(f"Using existing chain simulator container '{container_name}' on port {port}", file=sys.stderr) - # Verify it's still running and ready - if not wait_for_simulator(port, container_name, max_attempts=5): - print(f"Warning: Container {container_name} on port {port} may not be ready", file=sys.stderr) - exit_code = run_test(args, script_dir, port, test_file, test_name) - sys.exit(exit_code) + exit_code = 0 - # Otherwise, start a new container as before + try: + # Check if container already exists (shouldn't happen in normal flow) + result = subprocess.run( + ["docker", "ps", "--filter", f"publish={port}", "--format", "{{.Names}}"], + capture_output=True, + text=True, + check=False, + ) + if result.returncode == 0 and result.stdout.strip(): + container_name = result.stdout.strip().split("\n")[0] + print(f"Using existing chain simulator container '{container_name}' on port {port}", file=sys.stderr) + if not wait_for_simulator(port, container_name, max_attempts=5): + print(f"Warning: Container {container_name} on port {port} may not be ready", file=sys.stderr) + else: + # Create a new container for this child process + random_suffix = random.randint(1000, 9999) + container_name = f"chain-sim-{port}-{os.getpid()}-{int(time.time())}-{random_suffix}" + if not start_simulator_container(port, container_name): + exit_code = 1 + container_name = None + + if exit_code == 0 and container_name: + exit_code = run_test(args, script_dir, port, test_file, test_name) + except KeyboardInterrupt: + exit_code = 130 + except Exception as e: + print(f"Unexpected error: {e}", file=sys.stderr) + exit_code = 1 + finally: + # Final cleanup at the end of the child process (always runs) + if container_name: + if is_github_actions(): + cleanup_all_docker_resources(container_name) + else: + cleanup_container(container_name) + # Additional comprehensive cleanup for local runs + cleanup_all_docker_resources() + + sys.exit(exit_code) + + # This is a single test execution (not spawned by parallel runner) port = find_available_port() os.environ["CHAIN_SIMULATOR_PORT"] = str(port) @@ -535,13 +782,22 @@ def main(): try: if not start_simulator_container(port, container_name): exit_code = 1 - exit_code = run_test(args, script_dir, port, test_file, test_name) - + else: + exit_code = run_test(args, script_dir, port, test_file, test_name) except KeyboardInterrupt: exit_code = 130 except Exception as e: print(f"Unexpected error: {e}", file=sys.stderr) exit_code = 1 + finally: + # Final cleanup at the end of the process (always runs) + if container_name: + if is_github_actions(): + cleanup_all_docker_resources(container_name) + else: + cleanup_container(container_name) + # Additional comprehensive cleanup for local runs + cleanup_all_docker_resources() sys.exit(exit_code)