diff --git a/.gitignore b/.gitignore index 6a7d527..9eb9e20 100644 --- a/.gitignore +++ b/.gitignore @@ -4,7 +4,6 @@ config/ virtualenvs/ *catalog*.json *config*.json -*state*.json target*.json *.sublime-* .python-version @@ -20,3 +19,4 @@ tap_intercom/.vscode/settings.json .DS_Store test_configuration.py tap_target_commands.sh +.env \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index f7c174a..94fc054 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 2.3.0 + * Add `tickets`, `ticket_types`, and `ticket_states` streams + ## 2.2.4 * Fix interrupted bookmarking for conversations [#85](https://github.com/singer-io/tap-intercom/pull/85) diff --git a/README.md b/README.md index 468db8c..9643b74 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ This tap: - [Company Segments](https://developers.intercom.com/intercom-api-reference/reference#list-segments) - [Tags](https://developers.intercom.com/intercom-api-reference/reference#list-tags-for-an-app) - [Teams](https://developers.intercom.com/intercom-api-reference/reference#list-teams) + - [Tickets](https://developers.intercom.com/intercom-api-reference/reference/search-tickets) - [Users](https://developers.intercom.com/intercom-api-reference/reference#list-users) - Outputs the schema for each resource - Incrementally pulls data based on the input state @@ -113,6 +114,15 @@ reference#list-customer-data-attributes) - Replication strategy: FULL_TABLE - Transformations: none +[tickets](https://developers.intercom.com/intercom-api-reference/reference/search-tickets) +- Endpoint: https://api.intercom.io/tickets/search +- Primary key fields: id +- Foreign key fields: contact_ids, admin_assignee_id, team_assignee_id +- Replication strategy: INCREMENTAL (query filtered) + - Sort: updated_at asc + - Bookmark: updated_at (date-time) +- Transformations: none + [users](https://developers.intercom.com/intercom-api-reference/reference#list-users) - Endpoint: https://api.intercom.io/users - Primary key fields: id 
diff --git a/tap_intercom/schemas/ticket_states.json b/tap_intercom/schemas/ticket_states.json new file mode 100644 index 0000000..fb65a56 --- /dev/null +++ b/tap_intercom/schemas/ticket_states.json @@ -0,0 +1,52 @@ +{ + "type": [ + "null", + "object" + ], + "properties": { + "type": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "string" + ] + }, + "category": { + "type": [ + "null", + "string" + ] + }, + "internal_label": { + "type": [ + "null", + "string" + ] + }, + "external_label": { + "type": [ + "null", + "string" + ] + }, + "archived": { + "type": [ + "null", + "boolean" + ] + }, + "ticket_types": { + "type": [ + "null", + "object" + ], + "additionalProperties": true + } + } +} + diff --git a/tap_intercom/schemas/ticket_types.json b/tap_intercom/schemas/ticket_types.json new file mode 100644 index 0000000..321857a --- /dev/null +++ b/tap_intercom/schemas/ticket_types.json @@ -0,0 +1,91 @@ +{ + "type": [ + "null", + "object" + ], + "properties": { + "type": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "string" + ] + }, + "category": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "description": { + "type": [ + "null", + "string" + ] + }, + "icon": { + "type": [ + "null", + "string" + ] + }, + "workspace_id": { + "type": [ + "null", + "string" + ] + }, + "ticket_type_attributes": { + "type": [ + "null", + "object" + ], + "additionalProperties": true + }, + "ticket_states": { + "type": [ + "null", + "object" + ], + "additionalProperties": true + }, + "archived": { + "type": [ + "null", + "boolean" + ] + }, + "created_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "updated_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "is_internal": { + "type": [ + "null", + "boolean" + ] + } + } +} + diff --git a/tap_intercom/schemas/tickets.json b/tap_intercom/schemas/tickets.json new file mode 100644 
index 0000000..07cd10a --- /dev/null +++ b/tap_intercom/schemas/tickets.json @@ -0,0 +1,593 @@ +{ + "type": [ + "null", + "object" + ], + "properties": { + "type": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "string" + ] + }, + "ticket_id": { + "type": [ + "null", + "string" + ] + }, + "created_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "updated_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "title": { + "type": [ + "null", + "string" + ] + }, + "description": { + "type": [ + "null", + "string" + ] + }, + "category": { + "type": [ + "null", + "string" + ] + }, + "ticket_type_id": { + "type": [ + "null", + "string" + ] + }, + "state": { + "type": [ + "null", + "string" + ] + }, + "open": { + "type": [ + "null", + "boolean" + ] + }, + "snoozed_until": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "contact_ids": { + "type": [ + "null", + "array" + ], + "items": { + "type": [ + "null", + "string" + ] + } + }, + "teammate_ids": { + "type": [ + "null", + "array" + ], + "items": { + "type": [ + "null", + "string" + ] + } + }, + "admin_assignee_id": { + "type": [ + "null", + "string" + ] + }, + "team_assignee_id": { + "type": [ + "null", + "string" + ] + }, + "is_shared": { + "type": [ + "null", + "boolean" + ] + }, + "ticket_attributes": { + "type": [ + "null", + "object" + ], + "additionalProperties": true, + "properties": {} + }, + "contacts": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": [ + "null", + "string" + ] + }, + "contacts": { + "type": [ + "null", + "array" + ], + "items": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "string" + ] + }, + "external_id": { + "type": [ + "null", + "string" + ] + } + } + } + } + } + }, + "ticket_state": { + "type": 
[ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "string" + ] + }, + "category": { + "type": [ + "null", + "string" + ] + }, + "internal_label": { + "type": [ + "null", + "string" + ] + }, + "external_label": { + "type": [ + "null", + "string" + ] + }, + "archived": { + "type": [ + "null", + "boolean" + ] + } + } + }, + "ticket_type": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "string" + ] + }, + "category": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "description": { + "type": [ + "null", + "string" + ] + }, + "icon": { + "type": [ + "null", + "string" + ] + }, + "workspace_id": { + "type": [ + "null", + "string" + ] + }, + "ticket_type_attributes": { + "type": [ + "null", + "object" + ], + "additionalProperties": true + }, + "ticket_states": { + "type": [ + "null", + "object" + ], + "additionalProperties": true + }, + "archived": { + "type": [ + "null", + "boolean" + ] + }, + "created_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "updated_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "is_internal": { + "type": [ + "null", + "boolean" + ] + } + } + }, + "linked_objects": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": [ + "null", + "string" + ] + }, + "data": { + "type": [ + "null", + "array" + ], + "items": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "id": { + "type": [ + "null", + "string" + ] + }, + "category": { + "type": [ + "null", + "string" + ] + } + } + } + }, + "total_count": { + "type": [ + "null", + "integer" + ] + }, + "has_more": { + "type": [ + "null", + "boolean" + ] + } + 
} + }, + "ticket_parts": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": [ + "null", + "string" + ] + }, + "ticket_parts": { + "type": [ + "null", + "array" + ], + "items": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "string" + ] + }, + "part_type": { + "type": [ + "null", + "string" + ] + }, + "body": { + "type": [ + "null", + "string" + ] + }, + "created_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "updated_at": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "previous_ticket_state": { + "type": [ + "null", + "string" + ] + }, + "ticket_state": { + "type": [ + "null", + "string" + ] + }, + "assigned_to": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "string" + ] + } + } + }, + "author": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "email": { + "type": [ + "null", + "string" + ] + } + } + }, + "attachments": { + "type": [ + "null", + "array" + ], + "items": { + "type": [ + "null", + "object" + ], + "additionalProperties": false, + "properties": { + "type": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "url": { + "type": [ + "null", + "string" + ] + }, + "content_type": { + "type": [ + "null", + "string" + ] + }, + "filesize": { + "type": [ + "null", + "integer" + ] + }, + "width": { + "type": [ + "null", + "integer" + ] + }, + "height": { + "type": [ + "null", + "integer" + ] + } + } + } + }, + "external_id": { + "type": [ + 
"null", + "string" + ] + }, + "redacted": { + "type": [ + "null", + "boolean" + ] + }, + "app_package_code": { + "type": [ + "null", + "string" + ] + }, + "updated_attribute_data": { + "type": [ + "null", + "object" + ], + "additionalProperties": true + } + } + } + }, + "total_count": { + "type": [ + "null", + "integer" + ] + } + } + } + } +} diff --git a/tap_intercom/streams.py b/tap_intercom/streams.py index cf27e61..a1ddf9a 100644 --- a/tap_intercom/streams.py +++ b/tap_intercom/streams.py @@ -152,6 +152,11 @@ def set_last_sync_started_at(self, state): def skip_records(self, record): return False + def get_start_bookmark(self, state, config): + return singer.get_bookmark( + state, self.tap_stream_id, self.replication_key, config['start_date'] + ) + def write_bookmark(self, state, bookmark_value): return singer.write_bookmark(state, self.tap_stream_id, @@ -192,13 +197,28 @@ def sync(self, child_stream = STREAMS.get(self.child) # Get current stream bookmark - parent_bookmark = singer.get_bookmark(state, self.tap_stream_id, self.replication_key, config['start_date']) + raw_state_bookmark = None + if isinstance(state, dict): + raw_state_bookmark = state.get("bookmarks", {}).get(self.tap_stream_id) + LOGGER.info( + "Stream: %s, raw state bookmark entry before get_bookmark: %s", + self.tap_stream_id, + raw_state_bookmark, + ) + + parent_bookmark = self.get_start_bookmark(state, config) + LOGGER.info( + "Stream: %s, config start_date=%s, parent_bookmark used=%s", + self.tap_stream_id, + config.get("start_date"), + parent_bookmark, + ) parent_bookmark_utc = singer.utils.strptime_to_utc(parent_bookmark) sync_start_date = parent_bookmark_utc self.set_last_processed(state) self.set_last_sync_started_at(state) - is_parent_selected = True + is_parent_selected = self.tap_stream_id in self.selected_streams is_child_selected = False # If the current stream has a child stream, then get the child stream's bookmark @@ -253,14 +273,19 @@ def sync(self, record[self.replication_key]) 
) - # Write the record if the parent is selected - if is_parent_selected and record_datetime >= parent_bookmark_utc: + # Write the record if: + # 1. Parent is selected AND record >= parent_bookmark, OR + # 2. Child is selected AND record >= child_bookmark (need parent record for context) + should_write_parent = (is_parent_selected and record_datetime >= parent_bookmark_utc) or \ + (is_child_selected and has_child and record_datetime >= child_bookmark_utc) + + if should_write_parent: record_counter += 1 transformed_record = transform(record, stream_schema, integer_datetime_fmt=UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING, metadata=stream_metadata) - # Write record if a parent is selected + # Write record if a parent is selected or if child is selected and needs parent context singer.write_record(self.tap_stream_id, transformed_record, time_extracted=singer.utils.now()) counter.increment() max_datetime = max(record_datetime, max_datetime) @@ -548,6 +573,15 @@ class Conversations(IncrementalStream): per_page = MAX_PAGE_SIZE child = 'conversation_parts' + def get_start_bookmark(self, state, config): + in_progress_bookmark = singer.get_bookmark( + state, self.tap_stream_id, "last_sync_started_at" + ) + default_bookmark = in_progress_bookmark or config['start_date'] + return singer.get_bookmark( + state, self.tap_stream_id, self.replication_key, default_bookmark + ) + def set_last_processed(self, state): self.last_processed = singer.get_bookmark( state, self.tap_stream_id, "last_processed") @@ -590,6 +624,10 @@ def write_intermediate_bookmark(self, state, last_processed, bookmark_value): self.tap_stream_id, "last_sync_started_at", self.last_sync_started_at) + state = singer.write_bookmark(state, + self.tap_stream_id, + self.replication_key, + self.last_sync_started_at) singer.write_state(state) def get_records(self, bookmark_datetime=None, is_parent=False, stream_metadata=None) -> Iterator[list]: @@ -901,6 +939,123 @@ def get_records(self, bookmark_datetime=None, 
is_parent=False) -> Iterator[list] yield from response.get(self.data_key, []) +class Tickets(IncrementalStream): + """ + Retrieve tickets + + Docs: https://developers.intercom.com/intercom-api-reference/reference/search-tickets + """ + tap_stream_id = 'tickets' + key_properties = ['id'] + path = 'tickets/search' + replication_key = 'updated_at' + valid_replication_keys = ['updated_at'] + data_key = 'tickets' + per_page = MAX_PAGE_SIZE + to_write_intermediate_bookmark = True + + def get_records(self, bookmark_datetime=None, is_parent=False, stream_metadata=None) -> Iterator[list]: + paging = True + starting_after = None + search_query = { + 'pagination': { + 'per_page': self.per_page + }, + 'query': { + 'operator': 'OR', + 'value': [{ + 'field': self.replication_key, + 'operator': '>', + 'value': self.dt_to_epoch_seconds(bookmark_datetime) + }, + { + 'field': self.replication_key, + 'operator': '=', + 'value': self.dt_to_epoch_seconds(bookmark_datetime) + }] + }, + 'sort': { + 'field': self.replication_key, + 'order': 'ascending' + } + } + LOGGER.info("Syncing: {}".format(self.tap_stream_id)) + + while paging: + response = self.client.post(self.path, json=search_query) + + if 'pages' in response and response.get('pages', {}).get('next'): + starting_after = response.get('pages').get('next').get('starting_after') + search_query['pagination'].update({'starting_after': starting_after}) + else: + paging = False + + records = transform_json(response, self.tap_stream_id, self.data_key) + LOGGER.info("Synced: {} for page: {}, records: {}".format(self.tap_stream_id, response.get('pages', {}).get('page'), len(records))) + + yield from records + + +class TicketTypes(FullTableStream): + """ + Retrieve ticket types + + Docs: https://developers.intercom.com/docs/references/rest-api/api.intercom.io/ticket-types + """ + tap_stream_id = 'ticket_types' + key_properties = ['id'] + path = 'ticket_types' + data_key = 'data' + + def get_records(self, bookmark_datetime=None, 
is_parent=False) -> Iterator[list]: + paging = True + next_page = None + LOGGER.info("Syncing: {}".format(self.tap_stream_id)) + + while paging: + response = self.client.get(self.path, url=next_page, params=self.params) + + LOGGER.info("Synced: {}, records: {}".format(self.tap_stream_id, len(response.get(self.data_key, [])))) + if 'pages' in response and response.get('pages', {}).get('next'): + next_page = response.get('pages', {}).get('next') + self.path = None + LOGGER.info("Syncing next page") + else: + paging = False + + yield from response.get(self.data_key, []) + + +class TicketStates(FullTableStream): + """ + Retrieve ticket states + + Docs: https://developers.intercom.com/docs/references/rest-api/api.intercom.io/ticket-states + """ + tap_stream_id = 'ticket_states' + key_properties = ['id'] + path = 'ticket_states' + data_key = 'data' + + def get_records(self, bookmark_datetime=None, is_parent=False) -> Iterator[list]: + paging = True + next_page = None + LOGGER.info("Syncing: {}".format(self.tap_stream_id)) + + while paging: + response = self.client.get(self.path, url=next_page, params=self.params) + + LOGGER.info("Synced: {}, records: {}".format(self.tap_stream_id, len(response.get(self.data_key, [])))) + if 'pages' in response and response.get('pages', {}).get('next'): + next_page = response.get('pages', {}).get('next') + self.path = None + LOGGER.info("Syncing next page") + else: + paging = False + + yield from response.get(self.data_key, []) + + STREAMS = { "admin_list": AdminList, "admins": Admins, @@ -913,5 +1068,8 @@ def get_records(self, bookmark_datetime=None, is_parent=False) -> Iterator[list] "contacts": Contacts, "segments": Segments, "tags": Tags, - "teams": Teams + "teams": Teams, + "tickets": Tickets, + "ticket_types": TicketTypes, + "ticket_states": TicketStates } diff --git a/tap_intercom/sync.py b/tap_intercom/sync.py index 7df3e60..7c0552d 100644 --- a/tap_intercom/sync.py +++ b/tap_intercom/sync.py @@ -68,13 +68,41 @@ def sync(config, 
state, catalog): access_token = config.get('access_token') client = IntercomClient(access_token, config.get('request_timeout'), config.get('user_agent')) # pass request_timeout parameter from config + # Some orchestrators (or custom state stores) may wrap the actual Singer + # state inside a top-level "singer_state" key, e.g.: + # { + # "singer_state": { + # "currently_syncing": null, + # "bookmarks": { ... } + # } + # } + # The tap, however, expects to receive the inner object as the state. To + # support both representations we unwrap the state if needed. + if isinstance(state, dict) and "singer_state" in state and isinstance(state["singer_state"], dict): + state = state["singer_state"] + + LOGGER.info( + "Raw incoming state. type=%s keys=%s", + type(state), + list(state.keys()) if isinstance(state, dict) else None, + ) + # Translate state to the new format with replication key in the state state = translate_state(state) - selected_stream_names = [] + if isinstance(state, dict): + LOGGER.info( + "Translated state bookmarks keys: %s", + list(state.get("bookmarks", {}).keys()), + ) + + # Determine which streams are selected in the catalog. + # If none are explicitly selected, default to all streams. 
selected_streams = list(catalog.get_selected_streams(state)) - for stream in selected_streams: - selected_stream_names.append(stream.tap_stream_id) + if not selected_streams: + selected_streams = catalog.streams + + selected_stream_names = [stream.tap_stream_id for stream in selected_streams] with Transformer() as transformer: for stream in get_streams_to_sync(catalog, selected_streams, selected_stream_names): diff --git a/tests/base.py b/tests/base.py index f7ac3e0..a0398be 100644 --- a/tests/base.py +++ b/tests/base.py @@ -137,6 +137,22 @@ def expected_metadata(self): self.PRIMARY_KEYS: {'id'}, self.REPLICATION_METHOD: self.FULL_TABLE, self.OBEYS_START_DATE : False + }, + "tickets": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"}, + self.OBEYS_START_DATE: True + }, + "ticket_types": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.OBEYS_START_DATE: False + }, + "ticket_states": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.OBEYS_START_DATE: False } } diff --git a/tests/unittests/test_conversation_part_bookmarks.py b/tests/unittests/test_conversation_part_bookmarks.py index 1271994..cdecc9e 100644 --- a/tests/unittests/test_conversation_part_bookmarks.py +++ b/tests/unittests/test_conversation_part_bookmarks.py @@ -70,3 +70,58 @@ def test_conversation_parts_bookmarking(self, mocked_client_get, mocked_parent_r self.assertEqual(mocked_write_bookmark.mock_calls, expected_write_bookmark) # Verify we get 'conversation_parts' bookmark self.assertIsNotNone(tap_state.get('bookmarks').get('conversation_parts')) + + def test_conversations_resume_from_in_progress_bookmark(self): + client = IntercomClient('dummy_token', None) + conversations = Conversations( + client=client, + catalog=Catalog(['conversations']), + selected_streams=['conversations'], + ) + + state = { + 'bookmarks': { + 'conversations': { + 'last_processed': 
'215473344415580', + 'last_sync_started_at': '2026-03-11T03:00:55.402219Z', + } + } + } + + bookmark = conversations.get_start_bookmark( + state, + {'start_date': '2025-12-01T00:00:00Z'}, + ) + + self.assertEqual(bookmark, '2026-03-11T03:00:55.402219Z') + + @mock.patch("tap_intercom.streams.singer.write_state") + def test_conversations_intermediate_state_sets_updated_at(self, mocked_write_state): + client = IntercomClient('dummy_token', None) + conversations = Conversations( + client=client, + catalog=Catalog(['conversations']), + selected_streams=['conversations'], + ) + conversations.last_sync_started_at = '2026-03-11T03:00:55.402219Z' + + tap_state = {} + conversations.write_intermediate_bookmark( + tap_state, + '215473344415580', + None, + ) + + self.assertEqual( + tap_state['bookmarks']['conversations']['updated_at'], + '2026-03-11T03:00:55.402219Z', + ) + self.assertEqual( + tap_state['bookmarks']['conversations']['last_sync_started_at'], + '2026-03-11T03:00:55.402219Z', + ) + self.assertEqual( + tap_state['bookmarks']['conversations']['last_processed'], + '215473344415580', + ) + mocked_write_state.assert_called_once_with(tap_state)