Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Secrets and internal config files
**/.secrets/*
.env

# Ignore meltano internal cache and sqlite systemdb

.meltano/
plugins/
output/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/


# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

80 changes: 80 additions & 0 deletions meltano.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
version: 1
default_environment: dev
project_id: tap-mailgun

environments:
- name: dev
- name: staging
- name: prod

plugins:
extractors:
- name: tap-mailgun
namespace: tap_mailgun
pip_url: -e .
executable: tap-mailgun
capabilities:
- catalog
- discover
- state
settings:
- name: api_key
kind: password # To mark as sensitive
label: API Key
description: Your Mailgun API Key.
sensitive: true
- name: base_url
value: https://api.mailgun.net
label: Base URL
description: Mailgun API Base URL (e.g., https://api.mailgun.net or
https://api.eu.mailgun.net). Do not include /v3.
- name: start_date
kind: string # RFC 2822 is strictly needed by the tap and Meltano UI doesn't support it directly
label: Start Date (Analytics)
description: Start date for analytics metrics (RFC 2822 format, e.g.,
'Mon, 01 Jan 2024 00:00:00 +0000'). Used if Analytics Duration is not
set.
- name: end_date
kind: string
label: End Date (Analytics)
description: End date for analytics metrics (RFC 2822 format, e.g., 'Tue,
31 Dec 2024 23:59:59 +0000').

# Analytics Metrics Stream specific settings
- name: analytics_resolution
label: 'Analytics: Resolution'
description: Resolution for analytics metrics ('day', 'hour', or 'month').
- name: analytics_duration
kind: string # Optional
label: 'Analytics: Duration'
description: Duration for analytics metrics (e.g., '1d', '7d', '1m').
Overwrites Start Date if provided.
- name: analytics_dimensions
kind: array
label: 'Analytics: Dimensions'
description: Array of dimensions for analytics metrics (e.g., ["time",
"tag"]). At least 'time' is recommended.
- name: analytics_metrics
kind: array
label: 'Analytics: Metrics'
description: Array of metrics to retrieve (e.g., ["delivered_count",
"opened_count"]).
- name: analytics_filters
kind: array
label: 'Analytics: Filters'
description: 'An array of filter objects. Each object should have "attribute", "comparator", and "value" keys (e.g., [{"attribute": "domain", "comparator": "=", "value": "example.com"}]). Refer to Mailgun API documentation for filter structure.'
value: []
- name: analytics_include_subaccounts
kind: boolean
value: true
label: 'Analytics: Include Subaccounts'
description: Include stats from all subaccounts for analytics.
- name: analytics_include_aggregates
kind: boolean
value: true
label: 'Analytics: Include Aggregates'
description: Include top-level aggregate metrics for analytics.
loaders:
- name: target-jsonl
variant: andyh1203
pip_url: target-jsonl
34 changes: 34 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[build-system]
requires = [
"hatchling>=1,<2",
]
build-backend = "hatchling.build"

[project]
name = "tap-mailgun"
version = "0.1.0"
description = "Singer tap for extracting data from Mailgun API"
authors = [
{ name="Jonathan Perron", email="jonathan.perron@lumapps.com" },
]
readme = "README.md"
requires-python = ">=3.10"
classifiers = [
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"License :: OSI Approved :: Apache Software License",
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
]
dependencies = [
"singer-sdk~=0.46.4",
"requests~=2.32.3",
]

[project.scripts]
tap-mailgun = "tap_mailgun:TapMailgun.cli"

[project.urls]
Homepage = "https://github.com/lumapps/tap-mailgun" # Replace with your actual URL
Repository = "https://github.com/lumapps/tap-mailgun" # Replace with your actual URL
3 changes: 3 additions & 0 deletions tap_mailgun/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .tap import TapMailgun

__all__ = ["TapMailgun"]
7 changes: 7 additions & 0 deletions tap_mailgun/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""TapMailgun entry point."""

from __future__ import annotations

from tap_mailgun.tap import TapMailgun

TapMailgun.cli()
123 changes: 123 additions & 0 deletions tap_mailgun/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""REST client handling, including MailgunStream base class."""

from __future__ import annotations

from importlib import resources
from requests import Response
from typing import Any, Optional, Any, Dict, Iterable

from singer_sdk.authenticators import BasicAuthenticator
from singer_sdk.streams import RESTStream

SCHEMAS_DIR = resources.files(__package__) / "schemas"


class MailgunStream(RESTStream):
"""Mailgun stream class."""

@property
def url_base(self) -> str:
"""Return the API URL root, configurable via tap settings."""
return self.config.get("base_url", "https://api.mailgun.net")


@property
def authenticator(self) -> BasicAuthenticator:
"""Return a basic authenticator."""
return BasicAuthenticator(
stream=self,
username="api",
password=self.config.get("api_key")
)

def prepare_request_payload(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Optional[dict]:
"""Prepare the data payload for the API request.

Args:
context: Stream partition or context dictionary.
next_page_token: Token for the next page of results.

Returns:
A dictionary with the body payload, or None if no data needs to be sent.
"""
payload: dict = {}
payload["start"] = self.config.get("start_date")
payload["end"] = self.config.get("end_date")
payload["resolution"] = self.config.get("analytics_resolution")

if self.config.get("analytics_duration"):
payload["duration"] = self.config.get("analytics_duration")
# Note: If 'duration' is provided, Mailgun API calculates 'start' based on 'end' and 'duration'.
# The tap does not need to pre-calculate 'start' in this case.

payload["dimensions"] = self.config.get("analytics_dimensions")
payload["metrics"] = self.config.get("analytics_metrics")
payload["filter"] = self.config.get("analytics_filters")
payload["include_subaccounts"] = self.config.get("analytics_include_subaccounts")
payload["include_aggregates"] = self.config.get("analytics_include_aggregates")

# Pagination handling
pagination = {
"skip": next_page_token if next_page_token is not None else 0,
"limit": 300, # Default limit, can be adjusted
}
payload["pagination"] = pagination
return payload

def get_next_page_token(
self, response: Response, previous_token: t.Optional[t.Any]
) -> t.Optional[t.Any]:
"""Return a token for identifying next page or None if no more pages."""
data = response.json()
pagination_info = data.get("pagination", {})
current_skip = pagination_info.get("skip", 0)
limit = pagination_info.get("limit", 0) # Use the limit from the response
total = pagination_info.get("total", 0)

if (current_skip + limit) < total:
return current_skip + limit
return None

def parse_response(self, response: Response) -> Iterable[dict]:
"""Parse the response and extract records."""
data = response.json()
yield from data.get("items", [])

def post_process(self, row: dict, context: Optional[dict] = None) -> Optional[dict]:
"""Transform raw data in each record.

Flattens dimensions and metrics into a single record.
Extracts 'time_value' from the 'time' dimension for replication.
"""
processed_record: Dict[str, Any] = {}
time_dim_value: Optional[str] = None

# Extract dimensions
for dim_obj in row.get("dimensions", []):
dim_name = dim_obj.get("dimension")
dim_value = dim_obj.get("value")
if dim_name:
processed_record[str(dim_name)] = dim_value
if dim_name == "time":
time_dim_value = dim_value

if time_dim_value:
processed_record["time_value"] = time_dim_value
else:
# If 'time' dimension is crucial for replication and is missing,
# it might be an issue. For now, we log a warning.
# Ensure 'time' is always requested in 'analytics_dimensions' config.
self.logger.warning(
f"Record missing 'time' dimension or its value, "
f"which is used for replication_key 'time_value'. Record: {row}"
)
# Depending on requirements, could return None to skip record or raise an error.

# Extract metrics
metrics_data = row.get("metrics", {})
for metric_name, metric_value in metrics_data.items():
processed_record[str(metric_name)] = metric_value

return processed_record
Loading