16 changes: 16 additions & 0 deletions chuck_data/compute_providers/__init__.py
@@ -0,0 +1,16 @@
"""Compute Provider Abstractions.

Compute providers define where Stitch jobs execute (Databricks clusters, EMR clusters).
This is independent of where the data comes from (DataProvider).
"""

from chuck_data.compute_providers.provider import ComputeProvider
from chuck_data.compute_providers.databricks import DatabricksComputeProvider
from chuck_data.compute_providers.emr import EMRComputeProvider


__all__ = [
"ComputeProvider",
"DatabricksComputeProvider",
"EMRComputeProvider",
]
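
For orientation, here is a minimal sketch of how these exports are meant to be consumed. The workspace URL, token, region, and cluster ID are placeholder values, and both concrete classes still raise NotImplementedError from their job methods until PR 3 and PR 4 land.

from chuck_data.compute_providers import (
    ComputeProvider,
    DatabricksComputeProvider,
    EMRComputeProvider,
)

def pick_compute(platform: str) -> ComputeProvider:
    # Both concrete classes satisfy the ComputeProvider protocol structurally,
    # so either can be returned behind the shared type.
    if platform == "databricks":
        return DatabricksComputeProvider(
            workspace_url="https://example.cloud.databricks.com",  # placeholder
            token="dapi-example",  # placeholder
        )
    return EMRComputeProvider(region="us-west-2", cluster_id="j-EXAMPLE")  # placeholders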
107 changes: 107 additions & 0 deletions chuck_data/compute_providers/databricks.py
@@ -0,0 +1,107 @@
"""Databricks Compute Provider.

Runs Stitch jobs on Databricks clusters.
"""

from typing import Dict, Any


class DatabricksComputeProvider:
"""Run Stitch jobs on Databricks clusters.

This compute provider can process data from:
- Databricks Unity Catalog (direct access)
- AWS Redshift (via Spark-Redshift connector)

Note: This is a stub implementation for PR 1.
Full implementation will come in PR 3.
"""

def __init__(self, workspace_url: str, token: str, **kwargs):
"""Initialize Databricks compute provider.

Args:
workspace_url: Databricks workspace URL
token: Authentication token
**kwargs: Additional configuration options
"""
self.workspace_url = workspace_url
self.token = token
self.config = kwargs

def prepare_stitch_job(
self,
manifest: Dict[str, Any],
data_provider: Any,
config: Dict[str, Any],
) -> Dict[str, Any]:
"""Prepare job artifacts for Stitch execution.

Uploads manifests and init scripts via data_provider methods:
- Databricks data → data_provider.upload_manifest() to /Volumes
- Redshift data → data_provider.upload_manifest() to S3

Args:
manifest: Stitch configuration
data_provider: Data source provider (handles uploads to appropriate storage)
config: Job configuration

Returns:
Preparation results

Raises:
NotImplementedError: Stub implementation
"""
raise NotImplementedError(
"DatabricksComputeProvider.prepare_stitch_job() "
"will be implemented in PR 3"
)

def launch_stitch_job(self, preparation: Dict[str, Any]) -> Dict[str, Any]:
"""Launch the Stitch job on Databricks.

Args:
preparation: Results from prepare_stitch_job()

Returns:
Job execution results

Raises:
NotImplementedError: Stub implementation
"""
raise NotImplementedError(
"DatabricksComputeProvider.launch_stitch_job() "
"will be implemented in PR 3"
)

def get_job_status(self, job_id: str) -> Dict[str, Any]:
"""Get job status.

Args:
job_id: Job identifier

Returns:
Job status information

Raises:
NotImplementedError: Stub implementation
"""
raise NotImplementedError(
"DatabricksComputeProvider.get_job_status() " "will be implemented in PR 3"
)

def cancel_job(self, job_id: str) -> bool:
"""Cancel a running job.

Args:
job_id: Job identifier

Returns:
True if cancellation succeeded

Raises:
NotImplementedError: Stub implementation
"""
raise NotImplementedError(
"DatabricksComputeProvider.cancel_job() " "will be implemented in PR 3"
)
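
To make the intended call shape concrete, here is a hedged sketch of how a caller might drive this provider once PR 3 fills in the stubs. The manifest and config dictionaries are placeholder shapes, and `my_data_provider` stands in for a DataProvider adapter whose interface is defined outside this file; today both calls raise NotImplementedError.

provider = DatabricksComputeProvider(
    workspace_url="https://example.cloud.databricks.com",  # placeholder
    token="dapi-example",  # placeholder
    node_type_id="i3.xlarge",  # extra kwargs are captured in provider.config
)
preparation = provider.prepare_stitch_job(
    manifest={"tables": [], "semantic_tags": {}},  # placeholder manifest shape
    data_provider=my_data_provider,  # hypothetical DataProvider instance
    config={"cluster_size": "small"},  # placeholder job config
)
result = provider.launch_stitch_job(preparation)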
125 changes: 125 additions & 0 deletions chuck_data/compute_providers/emr.py
@@ -0,0 +1,125 @@
"""EMR Compute Provider.

Runs Stitch jobs on Amazon EMR clusters.
"""

from typing import Dict, Any, Optional


class EMRComputeProvider:
"""Run Stitch jobs on Amazon EMR clusters.

This compute provider can process data from:
- AWS Redshift (via Spark-Redshift connector)
- Databricks Unity Catalog (via Databricks JDBC connector)

Note: This is a stub implementation for PR 1.
Full implementation or detailed scaffolding will come in PR 4.

Uses boto3's standard credential discovery chain, in resolution order:
- Explicit profile (via the aws_profile parameter)
- Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
- Shared credentials file (~/.aws/credentials)
- IAM role (when running on EC2/ECS/Lambda)
"""

def __init__(
self,
region: str,
cluster_id: Optional[str] = None,
aws_profile: Optional[str] = None,
**kwargs,
):
"""Initialize EMR compute provider.

Args:
region: AWS region (e.g., 'us-west-2')
cluster_id: EMR cluster ID (optional, can create on-demand)
aws_profile: AWS profile name from ~/.aws/credentials (optional)
**kwargs: Additional configuration options (e.g., s3_bucket, iam_role)

Note: AWS credentials are discovered via boto3's standard credential chain:
1. Explicit profile (if aws_profile provided)
2. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
3. AWS credentials file (~/.aws/credentials)
4. IAM role (when running on EC2/ECS/Lambda)
"""
self.region = region
self.cluster_id = cluster_id
self.aws_profile = aws_profile
self.config = kwargs

def prepare_stitch_job(
self,
manifest: Dict[str, Any],
data_provider: Any,
config: Dict[str, Any],
) -> Dict[str, Any]:
"""Prepare job artifacts for Stitch execution.

Uploads manifests and init scripts via data_provider methods:
- Redshift data → data_provider.upload_manifest() to S3
- Databricks data → data_provider.upload_manifest() (if needed)

Args:
manifest: Stitch configuration
data_provider: Data source provider (handles uploads to appropriate storage)
config: Job configuration

Returns:
Preparation results

Raises:
NotImplementedError: Stub implementation
"""
raise NotImplementedError(
"EMRComputeProvider.prepare_stitch_job() " "will be implemented in PR 4"
)

def launch_stitch_job(self, preparation: Dict[str, Any]) -> Dict[str, Any]:
"""Launch the Stitch job on EMR.

Args:
preparation: Results from prepare_stitch_job()

Returns:
Job execution results

Raises:
NotImplementedError: Stub implementation
"""
raise NotImplementedError(
"EMRComputeProvider.launch_stitch_job() " "will be implemented in PR 4"
)

def get_job_status(self, job_id: str) -> Dict[str, Any]:
"""Get job status.

Args:
job_id: Job identifier

Returns:
Job status information

Raises:
NotImplementedError: Stub implementation
"""
raise NotImplementedError(
"EMRComputeProvider.get_job_status() " "will be implemented in PR 4"
)

def cancel_job(self, job_id: str) -> bool:
"""Cancel a running job.

Args:
job_id: Job identifier

Returns:
True if cancellation succeeded

Raises:
NotImplementedError: Stub implementation
"""
raise NotImplementedError(
"EMRComputeProvider.cancel_job() " "will be implemented in PR 4"
)
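
Because the class docstring leans on boto3's credential chain, a likely shape for session resolution in PR 4 is sketched below. boto3.Session(profile_name=..., region_name=...) is real boto3 API, but its use as a private helper on this class is an assumption, not part of this PR.

import boto3

def _make_session(self) -> boto3.session.Session:
    # With no explicit profile, boto3 falls through its standard chain:
    # environment variables, ~/.aws/credentials, then instance-role metadata.
    if self.aws_profile:
        return boto3.Session(profile_name=self.aws_profile, region_name=self.region)
    return boto3.Session(region_name=self.region)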
81 changes: 81 additions & 0 deletions chuck_data/compute_providers/provider.py
@@ -0,0 +1,81 @@
"""Compute Provider Protocol.

Defines the interface that all compute providers must implement.
"""

from typing import Protocol, Dict, Any


class ComputeProvider(Protocol):
"""Protocol for compute providers.

Compute providers define WHERE Stitch jobs execute (Databricks clusters, EMR clusters).
This is independent of where data comes from (DataProvider).

A Stitch job can:
- Read from Databricks → Run on Databricks
- Read from Redshift → Run on Databricks
- Read from Redshift → Run on EMR (future)
- Read from Databricks → Run on EMR (future)
"""

def prepare_stitch_job(
self,
manifest: Dict[str, Any],
data_provider: Any, # DataProvider type
config: Dict[str, Any],
) -> Dict[str, Any]:
"""Prepare job artifacts for Stitch execution.

This method prepares everything needed to run a Stitch job:
- Upload manifest and init scripts via data_provider methods
- Create job definitions (Databricks job, EMR step, etc.)
- Configure data connectors based on data_provider type

The data_provider handles uploads:
- Databricks data → Upload to /Volumes
- Redshift data → Upload to S3

Args:
manifest: Stitch configuration with tables and semantic tags
data_provider: Where the data comes from (Databricks/Redshift)
Also handles uploading artifacts to appropriate storage
config: Job configuration (cluster size, etc.)

Returns:
Dictionary containing preparation results (job_id, paths, etc.)
"""
...

def launch_stitch_job(self, preparation: Dict[str, Any]) -> Dict[str, Any]:
"""Launch the Stitch job on this compute platform.

Args:
preparation: Results from prepare_stitch_job()

Returns:
Dictionary containing job execution results (run_id, status, etc.)
"""
...

def get_job_status(self, job_id: str) -> Dict[str, Any]:
"""Get the status of a running or completed job.

Args:
job_id: Job identifier

Returns:
Dictionary containing job status information
"""
...

def cancel_job(self, job_id: str) -> bool:
"""Cancel a running job.

Args:
job_id: Job identifier

Returns:
True if cancellation succeeded
"""
...
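
A hedged sketch of how a caller might drive this protocol end to end once concrete implementations exist. The "run_id" and "state" keys and the terminal-state names are assumptions for illustration; the protocol itself only promises dictionaries.

import time
from typing import Dict, Any

from chuck_data.compute_providers import ComputeProvider

def run_stitch_job(
    compute: ComputeProvider,
    manifest: Dict[str, Any],
    data_provider: Any,
    config: Dict[str, Any],
    poll_seconds: float = 30.0,
) -> Dict[str, Any]:
    """Prepare, launch, then poll until the job reaches a terminal state."""
    preparation = compute.prepare_stitch_job(manifest, data_provider, config)
    launch = compute.launch_stitch_job(preparation)
    job_id = launch["run_id"]  # assumed key
    while True:
        status = compute.get_job_status(job_id)
        if status.get("state") in ("SUCCESS", "FAILED", "CANCELLED"):  # assumed states
            return status
        time.sleep(poll_seconds)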
15 changes: 15 additions & 0 deletions chuck_data/data_providers/__init__.py
@@ -0,0 +1,15 @@
"""Data Providers for accessing data from different platforms."""

from chuck_data.data_providers.provider import DataProvider
from chuck_data.data_providers.adapters import (
DatabricksProviderAdapter,
RedshiftProviderAdapter,
)
from chuck_data.data_providers.factory import DataProviderFactory

__all__ = [
"DataProvider",
"DatabricksProviderAdapter",
"RedshiftProviderAdapter",
"DataProviderFactory",
]
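
The data axis and the compute axis vary independently, per the compatibility matrix in the ComputeProvider docstring. A pairing sketch follows; the adapter construction arguments are not part of this diff, so they are left as commented placeholders rather than invented signatures.

from chuck_data.compute_providers import DatabricksComputeProvider
from chuck_data.data_providers import RedshiftProviderAdapter

# redshift_data = RedshiftProviderAdapter(...)  # construction args defined in adapters.py
# compute = DatabricksComputeProvider("https://example.cloud.databricks.com", "dapi-example")
# compute.prepare_stitch_job(manifest, redshift_data, config)  # Redshift data, Databricks compute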