Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ jobs:
coverage run -m pytest tests/unittests
coverage html
when: always
- run:
name: "Integration Tests"
command: |
source /usr/local/share/virtualenvs/tap-tester/bin/activate
uv pip install --upgrade awscli
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
source dev_env.sh
unset USE_STITCH_BACKEND
uv pip install -e .
run-test --tap=tap-frontapp tests

workflows:
version: 2
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 3.0.0
* Upgrade `singer-python` to `6.8.0` and `requests` to `2.33.0`
* Schema Update. [#39](https://github.com/singer-io/tap-frontapp/pull/39)

## 2.2.0
* Upgraded dependencies versions and added unit tests [#38](https://github.com/singer-io/tap-frontapp/pull/38)

Expand Down
9 changes: 5 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,26 @@

setup(
name="tap-frontapp",
version="2.2.0",
version="3.0.0",
description="Singer.io tap for extracting data from the FrontApp API",
author="bytcode.io",
url="http://singer.io",
classifiers=["Programming Language :: Python :: 3 :: Only"],
py_modules=["tap_frontapp"],
install_requires=[
"singer-python==6.7.0",
"singer-python==6.8.0",
"pendulum==3.2.0",
"ratelimit==2.2.1",
"backoff==2.2.1",
"requests==2.32.5",
"requests==2.33.0",
],
entry_points="""
[console_scripts]
tap-frontapp=tap_frontapp:main
""",
packages=find_packages(),
package_data = {
"schemas": ["tap_frontapp/schemas/*.json"]
"tap_frontapp": ["schemas/*.json"]
},
include_package_data=True
)
44 changes: 40 additions & 4 deletions tap_frontapp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,41 @@
from singer import utils
from singer.catalog import Catalog
from .context import Context
from .discover import discover, validate_credentials
from .sync import sync
from . import schemas
from .discover import discover as _discover_impl, validate_credentials as _validate_credentials_impl
from .sync import sync as _sync_impl

REQUIRED_CONFIG_KEYS = ["token"]
LOGGER = singer.get_logger()


def discover(*args, **kwargs):
    """Stable public entry point for catalog discovery.

    Delegates every call, unchanged, to the implementation in the
    ``discover`` module. Keeping this thin indirection at package level
    lets tests patch ``tap_frontapp.discover`` and keeps the CLI bound
    to one stable symbol no matter where the implementation moves.
    """
    impl = _discover_impl
    return impl(*args, **kwargs)


def validate_credentials(*args, **kwargs):
    """Stable public credential-validation helper.

    Forwards all arguments unchanged to the implementation in the
    ``discover`` module; re-exported here so tests can patch it at the
    package level.
    """
    impl = _validate_credentials_impl
    return impl(*args, **kwargs)


def sync(*args, **kwargs):
    """Stable public entry point for the sync phase.

    Forwards every call, unchanged, to the implementation in the
    ``sync`` module. Tests patch ``tap_frontapp.sync`` and expect the
    CLI entrypoint to invoke exactly this symbol.
    """
    impl = _sync_impl
    return impl(*args, **kwargs)


def get_abs_path(path):
    """Resolve *path* against this module's directory and return it.

    The base is the real (symlink-resolved) directory containing this
    file, so schema lookups work regardless of the current working
    directory.
    """
    module_dir = os.path.dirname(os.path.realpath(__file__))
    return os.path.join(module_dir, path)
Expand All @@ -39,10 +66,19 @@ def main():
if args.discover:
validate_credentials(args.config["token"])
catalog = discover()
json.dump(catalog.to_dict(), sys.stdout)
json.dump(catalog.to_dict(), sys.stdout, indent=2)
else:
atx = Context(args.config, args.state)
atx.catalog = Catalog.from_dict(args.properties) if args.properties else discover()

catalog_obj = getattr(args, "properties", None)
if catalog_obj is None:
catalog_obj = getattr(args, "catalog", None)
if catalog_obj is None:
catalog_obj = discover()
elif isinstance(catalog_obj, dict):
catalog_obj = Catalog.from_dict(catalog_obj)

atx.catalog = catalog_obj
sync(atx)


Expand Down
21 changes: 12 additions & 9 deletions tap_frontapp/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,19 @@ def get_schemas():

for stream_id in STATIC_SCHEMA_STREAM_IDS:
schema = load_schema(stream_id)
mdata = metadata.new()
# Use Singer's standard metadata
mdata = metadata.get_standard_metadata(
schema=schema,
key_properties=PK_FIELDS[stream_id],
valid_replication_keys=["analytics_date"],
replication_method="INCREMENTAL",
)

# Stream-level metadata
mdata = metadata.write(mdata, (), "inclusion", "available")
mdata = metadata.write(mdata, (), "table-key-properties", PK_FIELDS[stream_id])

# Field-level metadata
for field_name in schema["properties"]:
inclusion = "automatic" if field_name in PK_FIELDS[stream_id] else "available"
mdata = metadata.write(mdata, ("properties", field_name), "inclusion", inclusion)
mdata = metadata.to_map(mdata)
automatic_fields = set(PK_FIELDS[stream_id]) | {"analytics_date"}
for field_name in schema.get("properties", {}).keys():
if field_name in automatic_fields:
mdata = metadata.write(mdata, ("properties", field_name), "inclusion", "automatic")

schemas[stream_id] = schema
metadata_map[stream_id] = mdata
Expand Down
3 changes: 2 additions & 1 deletion tap_frontapp/schemas/accounts_table.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"type": ["null", "string"]
},
"analytics_date": {
"type": ["null", "string"]
"type": ["null", "string"],
"format": "date-time"
Comment on lines +9 to +10
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Such datatype updates cause a column split on the destination side if we do a minor release. IMO, this should be a major release.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated it to the major version 3.0.0.

},
"analytics_range": {
"type": ["null", "string"]
Expand Down
3 changes: 2 additions & 1 deletion tap_frontapp/schemas/channels_table.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"type": ["null", "string"]
},
"analytics_date": {
"type": ["null", "string"]
"type": ["null", "string"],
"format": "date-time"
},
"analytics_range": {
"type": ["null", "string"]
Expand Down
3 changes: 2 additions & 1 deletion tap_frontapp/schemas/inboxes_table.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"type": ["null", "string"]
},
"analytics_date": {
"type": ["null", "string"]
"type": ["null", "string"],
"format": "date-time"
},
"analytics_range": {
"type": ["null", "string"]
Expand Down
3 changes: 2 additions & 1 deletion tap_frontapp/schemas/tags_table.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"type": ["null", "string"]
},
"analytics_date": {
"type": ["null", "string"]
"type": ["null", "string"],
"format": "date-time"
},
"analytics_range": {
"type": ["null", "string"]
Expand Down
3 changes: 2 additions & 1 deletion tap_frontapp/schemas/teammates_table.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"type": ["null", "string"]
},
"analytics_date": {
"type": ["null", "string"]
"type": ["null", "string"],
"format": "date-time"
},
"analytics_range": {
"type": ["null", "string"]
Expand Down
3 changes: 2 additions & 1 deletion tap_frontapp/schemas/teams_table.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"type": ["null", "string"]
},
"analytics_date": {
"type": ["null", "string"]
"type": ["null", "string"],
"format": "date-time"
},
"analytics_range": {
"type": ["null", "string"]
Expand Down
23 changes: 19 additions & 4 deletions tap_frontapp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ def create_report(atx, start_date, end_date, filters):
raise e


def sync_metric(atx, metric_name, start_date, end_date):
def sync_metric(atx, metric_name, start_date, end_date, mdata=None):
if mdata is None:
mdata = {}
for metric in atx.client.list_metrics(path=METRIC_API_PATH[metric_name]):
metric_id = metric['id']
metric_description = metric[METRIC_API_DESCRIPTION_KEY[metric_name]]
Expand All @@ -154,7 +156,7 @@ def sync_metric(atx, metric_name, start_date, end_date):

with singer.metrics.job_timer('daily_aggregated_metric'):
start = time.monotonic()
start_date_formatted = datetime.datetime.fromtimestamp(start_date, tz=datetime.timezone.utc).strftime('%Y-%m-%d')
start_date_formatted = datetime.datetime.fromtimestamp(start_date, tz=datetime.timezone.utc).strftime('%Y-%m-%dT00:00:00Z')
# we've really moved this functionality to the request in the http script
# so we don't expect that this will actually have to run mult times
while True:
Expand Down Expand Up @@ -184,6 +186,9 @@ def sync_metric(atx, metric_name, start_date, end_date):
"metric_description": metric_description,
**{report_metric["id"]: report_metric["value"] for report_metric in report_metrics}
}

if mdata:
record = select_fields(mdata, record)
write_records(metric_name, [record])


Expand All @@ -192,7 +197,17 @@ def write_metrics_state(atx, metric, date_to_resume):
atx.write_state()


def sync_metrics(atx, metric_name):
def sync_metrics(atx, metric_name, mdata=None):
if mdata is None:
mdata = {}
catalog = getattr(atx, 'catalog', None)
streams = getattr(catalog, 'streams', None) if catalog is not None else None
if streams is not None:
for stream in streams:
if getattr(stream, 'tap_stream_id', None) == metric_name:
mdata = singer.metadata.to_map(stream.metadata)
break

bookmark = atx.state.get('bookmarks', {}).get(metric_name, {})
LOGGER.info('metric: {} '.format(metric_name))

Expand Down Expand Up @@ -229,7 +244,7 @@ def sync_metrics(atx, metric_name):
LOGGER.info('ut_current_date: {} '.format(ut_current_date))
ut_next_date = int(next_date.timestamp())
LOGGER.info('ut_next_date: {} '.format(ut_next_date))
sync_metric(atx, metric_name, ut_current_date, ut_next_date)
sync_metric(atx, metric_name, ut_current_date, ut_next_date, mdata)
# if the prior sync is successful it will write the date_to_resume bookmark
write_metrics_state(atx, metric_name, next_date)
current_date = next_date
Expand Down
13 changes: 11 additions & 2 deletions tap_frontapp/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,23 @@ def sync(atx):
catalog = atx.catalog
state = atx.state

streams_to_sync = [s.tap_stream_id for s in catalog.get_selected_streams(state)]
# Use a single source of truth for selected streams. When the
# Context has populated ``selected_stream_ids`` from Singer
# metadata, prefer that set; otherwise fall back to the catalog's
# helper so tests using simple mocks still work.
selected_stream_ids = getattr(atx, "selected_stream_ids", None)
if selected_stream_ids is not None:
streams_to_sync = list(selected_stream_ids)
else:
streams_to_sync = [s.tap_stream_id for s in catalog.get_selected_streams(state)]
LOGGER.info("Selected streams: %s", streams_to_sync)

last_stream = singer.get_currently_syncing(state)
LOGGER.info("Currently syncing: %s", last_stream)

for stream_name in STATIC_SCHEMA_STREAM_IDS:
load_and_write_schema(stream_name)
if stream_name in streams_to_sync:
load_and_write_schema(stream_name)

LOGGER.info("Starting sync of selected streams.")
sync_selected_streams(atx)
Expand Down
Loading
Loading