Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion tap_google_analytics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

LOGGER = singer.get_logger()

DEFAULT_PAGE_SIZE = 1000

# TODO: Add an integration test with multiple profiles that asserts state
def clean_state_for_report(config, state, tap_stream_id):
Expand Down Expand Up @@ -57,12 +58,31 @@ def get_end_date(config):
def get_view_ids(config):
    """Return the configured view IDs, preferring the plural `view_ids` key.

    Falls back to a one-element list built from `view_id` when `view_ids`
    is absent or falsy (e.g. missing or an empty list).
    """
    view_ids = config.get('view_ids')
    if view_ids:
        return view_ids
    return [config.get('view_id')]

def get_page_size(config):
    """
    Return the page size from config, falling back to DEFAULT_PAGE_SIZE.

    Accepts ints, floats, and numeric strings (e.g. "500" or "500.0");
    fractional values are truncated toward zero via int(float(...)).
    Any non-numeric or non-positive value logs a warning and returns
    DEFAULT_PAGE_SIZE instead.
    """
    raw_page_size = config.get('page_size', DEFAULT_PAGE_SIZE)
    try:
        # Convert once; original computed int(float(...)) twice.
        page_size = int(float(raw_page_size))
        if page_size > 0:
            return page_size
    except (TypeError, ValueError, OverflowError):
        # float() raises TypeError/ValueError on non-numeric input;
        # int(float('inf')) raises OverflowError. Fall through to the
        # shared invalid-value handling below.
        pass
    LOGGER.warning(f"The entered page size is invalid; it will be set to the default page size of {DEFAULT_PAGE_SIZE}")
    return DEFAULT_PAGE_SIZE

def do_sync(client, config, catalog, state):
"""
Translate metadata into a set of metrics and dimensions and call out
to sync to generate the required reports.
"""
selected_streams = catalog.get_selected_streams(state)
# Get page size
page_size = get_page_size(config)

for stream in selected_streams:
# Transform state for this report to new format before proceeding
state = clean_state_for_report(config, state, stream.tap_stream_id)
Expand Down Expand Up @@ -122,7 +142,7 @@ def do_sync(client, config, catalog, state):

is_historical_sync, start_date = get_start_date(config, report['profile_id'], state, report['id'])

sync_report(client, schema, report, start_date, end_date, state, is_historical_sync)
sync_report(client, schema, report, start_date, end_date, state, page_size, is_historical_sync)
state.pop('currently_syncing_view', None)
singer.write_state(state)
state = singer.set_currently_syncing(state, None)
Expand Down
3 changes: 2 additions & 1 deletion tap_google_analytics/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ def get_custom_dimensions(self, account_id, web_property_id):

# Sync Requests w/ Pagination and token refresh
# Docs for more info: https://developers.google.com/analytics/devguides/reporting/core/v4/rest/v4/reports/batchGet
def get_report(self, name, profile_id, report_date, metrics, dimensions):
def get_report(self, name, profile_id, report_date, metrics, dimensions, page_size):
"""
Parameters:
- name - the tap_stream_id of the report being run
Expand All @@ -454,6 +454,7 @@ def get_report(self, name, profile_id, report_date, metrics, dimensions):
nextPageToken)
body = {"reportRequests":
[{"viewId": profile_id,
"pageSize": page_size, # Pagination parameter
"dateRanges": [{"startDate": report_date_string,
"endDate": report_date_string}],
"metrics": [{"expression": m} for m in metrics],
Expand Down
4 changes: 2 additions & 2 deletions tap_google_analytics/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def transform_datetimes(rec):
rec[field_name] = datetime.strptime(value, DATETIME_FORMATS[field_name]).strftime(singer.utils.DATETIME_FMT)
return rec

def sync_report(client, schema, report, start_date, end_date, state, historically_syncing=False):
def sync_report(client, schema, report, start_date, end_date, state, page_size, historically_syncing=False):
"""
Run a sync, beginning from either the start_date or bookmarked date,
requesting a report per day, until the last full day of data. (e.g.,
Expand All @@ -114,7 +114,7 @@ def sync_report(client, schema, report, start_date, end_date, state, historicall
for report_date in generate_report_dates(start_date, end_date):
for raw_report_response in client.get_report(report['name'], report['profile_id'],
report_date, report['metrics'],
report['dimensions']):
report['dimensions'], page_size):

with singer.metrics.record_counter(report['name']) as counter:
time_extracted = singer.utils.now()
Expand Down
24 changes: 16 additions & 8 deletions tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class GoogleAnalyticsBaseTest(unittest.TestCase):
FULL_TABLE = "FULL_TABLE"
START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z"
REPLICATION_KEY_FORMAT = "%Y-%m-%dT00:00:00.000000Z"
PAGE_SIZE = 1000

start_date = ""

Expand All @@ -50,7 +51,8 @@ def get_properties(self, original: bool = True):
return_value = {
'start_date' : (dt.utcnow() - timedelta(days=30)).strftime(self.START_DATE_FORMAT),
'view_id': os.getenv('TAP_GOOGLE_ANALYTICS_VIEW_ID'),
'report_definitions': [{"id": "a665732c-d18b-445c-89b2-5ca8928a7305", "name": "Test Report 1"}]
'report_definitions': [{"id": "a665732c-d18b-445c-89b2-5ca8928a7305", "name": "Test Report 1"}],
'page_size': self.PAGE_SIZE
}
if original:
return return_value
Expand Down Expand Up @@ -436,16 +438,22 @@ def expected_default_fields():
@staticmethod
def expected_pagination_fields():
return {
"Test Report 1" : set(),
"Test Report 1" : {
'ga:userType', 'ga:sessionCount', 'ga:daysSinceLastSession', 'ga:exitPagePath', 'ga:previousPagePath'
},
"Audience Overview": {
"ga:users", "ga:newUsers", "ga:sessions", "ga:sessionsPerUser", "ga:pageviews",
"ga:pageviewsPerSession", "ga:sessionDuration", "ga:bounceRate", "ga:date",
# "ga:pageviews",
'ga:browser', 'ga:operatingSystem', 'ga:country', 'ga:city', 'ga:language', 'ga:year', 'ga:month', 'ga:hour'
},
"Audience Geo Location": {
'ga:year', 'ga:month', 'ga:hour'
},
"Audience Geo Location": set(),
"Audience Technology": set(),
"Acquisition Overview": set(),
"Behavior Overview": set(),
"Acquisition Overview": {
'ga:acquisitionMedium', 'ga:acquisitionSource', 'ga:acquisitionSourceMedium', 'ga:acquisitionTrafficChannel'
},
"Behavior Overview": {
"ga:year", "ga:month", "ga:hour",
},
"Ecommerce Overview": set(),
}

Expand Down
162 changes: 97 additions & 65 deletions tests/test_google_analytics_pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,93 +2,125 @@
### TODO WAITING ON https://stitchdata.atlassian.net/browse/SRCE-5065
##########################################################################

# import os

# from datetime import timedelta
# from datetime import datetime as dt
from math import ceil
from datetime import timedelta
from datetime import datetime as dt

# from tap_tester import connections, runner
from tap_tester import connections, runner

# from base import GoogleAnalyticsBaseTest
from base import GoogleAnalyticsBaseTest


# class GoogleAnalyticsPaginationTest(GoogleAnalyticsBaseTest):
# """Test that we are paginating for streams when exceeding the API record limit of a single query"""
class GoogleAnalyticsPaginationTest(GoogleAnalyticsBaseTest):
"""Test that we are paginating for streams when exceeding the API record limit of a single query"""

# @staticmethod
# def name():
# return "tap_tester_google_analytics_pagination_test"
# TODO https://stitchdata.atlassian.net/browse/SRCE-5084
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be an old link (old JIRA environment).
Is there a new JIRA associated with it? If not, can we remove this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cosimon @kspeer825 Should we remove such a Jira link from the tap-tester? We are not sure whether it is an old Jira link or not because it is not accessible to us.

SKIP_STREAMS = {'Ecommerce Overview',}

API_LIMIT = 2

@staticmethod
def name():
return "tap_tester_google_analytics_pagination_test"

# def test_run(self):
# """
# Verify that for each report you can get multiple pages of data for a given date.

# PREREQUISITE
# For EACH report, a dimension must be selected in which the number of distinct values is
# greater than the default value of maxResults (page size).
# """
# self.start_date = (dt.utcnow() - timedelta(days=4)).strftime(self.START_DATE_FORMAT) # Needs to be prior to isGolden..
# API_LIMIT = {stream: 1000 for stream in self.expected_sync_streams()}
# expected_streams = {'Audience Overview', 'report 1'} # self.expected_sync_streams()
def test_run(self):
"""
Verify that for each report you can get multiple pages of data for a given date.

# # Create connection but do not use default start date
# conn_id = connections.ensure_connection(self, original_properties=False)
PREREQUISITE
For EACH report, a dimension must be selected in which the number of distinct values is
greater than the default value of maxResults (page size).
"""
self.start_date = (dt.utcnow() - timedelta(days=4)).strftime(self.START_DATE_FORMAT) # Needs to be prior to isGolden..

expected_streams = self.expected_sync_streams() - self.SKIP_STREAMS

# # run check mode
# found_catalogs = self.run_and_verify_check_mode(conn_id)
# Reduce page_size to pass the pagination test case.
self.PAGE_SIZE = self.API_LIMIT

# Create connection but do not use default start date
conn_id = connections.ensure_connection(self, original_properties=False)

# # TODO remove once all streams covered
# found_catalogs = [catalog for catalog in found_catalogs
# if catalog.get('stream_name') in expected_streams]
# run check mode
found_catalogs = self.run_and_verify_check_mode(conn_id)

# # TODO select specific streams
# # table and field selection
# self.perform_and_verify_table_and_field_selection(
# conn_id, found_catalogs, select_all_fields=True,
# #select_default_fields=False, select_pagination_fields=True
# )
# TODO remove once all streams covered
found_catalogs = [catalog for catalog in found_catalogs
if catalog.get('stream_name') in expected_streams]

# TODO select specific streams
# table and field selection
self.perform_and_verify_table_and_field_selection(
conn_id, found_catalogs, #select_all_fields=True,
select_default_fields=False, select_pagination_fields=True
)

# # run initial sync
# record_count_by_stream = self.run_and_verify_sync(conn_id)

# synced_records = runner.get_records_from_target_output()
# import pdb; pdb.set_trace()
# # messages1 = synced_records['Audience Overview']['messages']
# messages2 = synced_records['report 1']['messages']
# run initial sync
record_count_by_stream = self.run_and_verify_sync(conn_id)

# # m1_25 = [m for m in messages1 if m['data']['start_date'] == '2021-02-25T00:00:00.000000Z' ]
# # m1_26 = [m for m in messages1 if m['data']['start_date'] == '2021-02-26T00:00:00.000000Z' ]
# # m1_27 = [m for m in messages1 if m['data']['start_date'] == '2021-02-27T00:00:00.000000Z' ]
# # m1_28 = [m for m in messages1 if m['data']['start_date'] == '2021-02-28T00:00:00.000000Z' ]
synced_records = runner.get_records_from_target_output()

# m2_25 = [m for m in messages2 if m['data']['start_date'] == '2021-02-25T00:00:00.000000Z' ]
# m2_26 = [m for m in messages2 if m['data']['start_date'] == '2021-02-26T00:00:00.000000Z' ]
# m2_27 = [m for m in messages2 if m['data']['start_date'] == '2021-02-27T00:00:00.000000Z' ]
# m2_28 = [m for m in messages2 if m['data']['start_date'] == '2021-02-28T00:00:00.000000Z' ]
# Verify no unexpected streams were replicated
synced_stream_names = set(synced_records.keys())
self.assertSetEqual(expected_streams, synced_stream_names)

# for stream in expected_streams:
# with self.subTest(stream=stream):
# Revert back page size to 1000 for other test cases.
self.PAGE_SIZE = 1000
Comment on lines +70 to +71
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the page size changed here? And could you update this test to cover a larger page size than 1 so that we are validating multiple records per page?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have updated the page size to 2 for the pagination test case. A smaller page size takes a long time to complete the sync, so we use it only for the pagination test case and revert it back to 1000 for the rest of the test cases.


# # TODO Verify we are paginating for testable synced streams
# self.assertGreater(record_count_by_stream.get(stream, -1), API_LIMIT.get(stream),
# msg="We didn't guarantee pagination. The number of records should exceed the api limit.")
for stream in expected_streams:
with self.subTest(stream=stream):

# data = synced_records.get(stream, [])
# actual_records = [row['data'] for row in data['messages']]
# record_messages_keys = [set(row['data'].keys()) for row in data['messages']]
# auto_fields = self.expected_automatic_fields().get(stream)
# expected values
expected_primary_keys = self.expected_primary_keys()[stream]

# for actual_keys in record_messages_keys:
# TODO Verify we are paginating for testable synced streams
self.assertGreater(record_count_by_stream.get(stream, -1), self.API_LIMIT,
msg="We didn't guarantee pagination. The number of records should exceed the api limit.")

# # Verify that the automatic fields are sent to the target for paginated streams
# self.assertTrue(auto_fields.issubset(actual_keys),
# msg="A paginated synced stream has a record that is missing automatic fields.")
data = synced_records.get(stream, [])
record_messages_keys = [set(row['data'].keys()) for row in data['messages']]
auto_fields = self.expected_automatic_fields().get(stream)

# # Verify we have more fields sent to the target than just automatic fields (this is set above)
# self.assertEqual(set(), auto_fields.difference(actual_keys),
# msg="A paginated synced stream has a record that is missing expected fields.")
for actual_keys in record_messages_keys:

# # TODO Verify by pks that the replicated records match our expectations
# # self.assertPKsEqual(stream, expected_records.get(stream), actual_records)
# Verify that the automatic fields are sent to the target for paginated streams
self.assertTrue(auto_fields.issubset(actual_keys),
msg="A paginated synced stream has a record that is missing automatic fields.")

# Verify we have more fields sent to the target than just automatic fields (this is set above)
self.assertEqual(set(), auto_fields.difference(actual_keys),
msg="A paginated synced stream has a record that is missing expected fields.")


messages = synced_records[stream]['messages']

# Retrieve records for the start_date window only because, the tap calls the API on day-wise.
# So, we are verifying pagination for the start_date API call.
start_date_messages = [m for m in messages if m['data']['start_date'] == self.start_date ]
primary_keys_list = [tuple([message.get('data').get(expected_pk) for expected_pk in expected_primary_keys])
for message in start_date_messages
if message.get('action') == 'upsert']

# Chunk the replicated records (just primary keys) into expected pages
pages = []
page_count = ceil(len(primary_keys_list) / self.API_LIMIT)
page_size = self.API_LIMIT
for page_index in range(page_count):
page_start = page_index * page_size
page_end = (page_index + 1) * page_size
pages.append(set(primary_keys_list[page_start:page_end]))

# Verify by primary keys that data is unique for each page
for current_index, current_page in enumerate(pages):
with self.subTest(current_page_primary_keys=current_page):

for other_index, other_page in enumerate(pages):
if current_index == other_index:
continue # don't compare the page to itself

self.assertTrue(
current_page.isdisjoint(other_page), msg=f'other_page_primary_keys={other_page}'
)
Loading