From cb8dc002f31e7bedf3afee5b1a636546194ce548 Mon Sep 17 00:00:00 2001 From: Dylan Sprayberry Date: Mon, 8 Jan 2024 19:34:30 +0000 Subject: [PATCH] bookmark after each page of conversation records --- tap_intercom/streams.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tap_intercom/streams.py b/tap_intercom/streams.py index 5fa16c1..04ce9da 100644 --- a/tap_intercom/streams.py +++ b/tap_intercom/streams.py @@ -228,6 +228,10 @@ def sync(self, counter.increment() max_datetime = max(record_datetime, max_datetime) + # Sync child stream, if the child is selected and if we have records greater than the child stream bookmark + if has_child and is_child_selected and (record[self.replication_key] >= child_bookmark_ts): + state = child_stream_obj.sync_substream(record.get('id'), child_schema, child_metadata, record[self.replication_key], state) + if self.to_write_intermediate_bookmark and record_counter == MAX_PAGE_SIZE: # Write bookmark and state after every page of records state = singer.write_bookmark(state, @@ -238,10 +242,6 @@ def sync(self, # Reset counter record_counter = 0 - # Sync child stream, if the child is selected and if we have records greater than the child stream bookmark - if has_child and is_child_selected and (record[self.replication_key] >= child_bookmark_ts): - state = child_stream_obj.sync_substream(record.get('id'), child_schema, child_metadata, record[self.replication_key], state) - bookmark_date = singer.utils.strftime(max_datetime) LOGGER.info("FINISHED Syncing: {}, total_records: {}.".format(self.tap_stream_id, counter.value)) @@ -507,6 +507,7 @@ class Conversations(IncrementalStream): data_key = 'conversations' per_page = MAX_PAGE_SIZE child = 'conversation_parts' + to_write_intermediate_bookmark = True def get_records(self, bookmark_datetime=None, is_parent=False, stream_metadata=None) -> Iterator[list]: paging = True