diff --git a/airbyte-integrations/connectors/source-orb/acceptance-test-config.yml b/airbyte-integrations/connectors/source-orb/acceptance-test-config.yml index 71a4457cf2d6..76134772a4dc 100644 --- a/airbyte-integrations/connectors/source-orb/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-orb/acceptance-test-config.yml @@ -16,7 +16,8 @@ tests: basic_read: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" - empty_streams: [] + fail_on_extra_columns: false + empty_streams: ["credits_ledger_entries"] incremental: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-orb/bootstrap.md b/airbyte-integrations/connectors/source-orb/bootstrap.md index ea3ad7fe4233..a7164a42fb79 100644 --- a/airbyte-integrations/connectors/source-orb/bootstrap.md +++ b/airbyte-integrations/connectors/source-orb/bootstrap.md @@ -6,10 +6,11 @@ Orb is a REST API. Connector has the following streams, and all of them support * [Plans](https://docs.withorb.com/reference/list-plans) * [Customers](https://docs.withorb.com/reference/list-customers) * [Credits Ledger Entries](https://docs.withorb.com/reference/view-credits-ledger) +* [Invoices](https://docs.withorb.com/docs/orb-docs/api-reference/schemas/invoice) Note that the Credits Ledger Entries must read all Customers for an incremental sync, but will only incrementally return new ledger entries for each customer. -Since the Orb API does not allow querying objects based on `updated_at`, these incremental syncs will capture updates to newly created objects but not resources updated after object creation. +Since the Orb API does not allow querying objects based on `updated_at`, these incremental syncs will capture updates to newly created objects but not resources updated after object creation. Use a full resync in order to capture newly updated entries. ## Pagination diff --git a/airbyte-integrations/connectors/source-orb/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-orb/integration_tests/abnormal_state.json index 713104475acd..d655333ee877 100644 --- a/airbyte-integrations/connectors/source-orb/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-orb/integration_tests/abnormal_state.json @@ -17,5 +17,8 @@ "FDWRvxuBUiFfZech": { "timeframe_start": "2122-01-01T00:00:00Z" } + }, + "invoices": { + "invoice_date": "2122-01-01T00:00:00Z" } } diff --git a/airbyte-integrations/connectors/source-orb/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-orb/integration_tests/configured_catalog.json index 1bbf8cb7db46..ab54424f7682 100644 --- a/airbyte-integrations/connectors/source-orb/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-orb/integration_tests/configured_catalog.json @@ -36,6 +36,18 @@ "sync_mode": "incremental", "destination_sync_mode": "overwrite" }, + { + "stream": { + "name": "invoices", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["invoice_date"], + "source_defined_primary_key": [["id"]] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" + }, { "stream": { "name": "credits_ledger_entries", diff --git a/airbyte-integrations/connectors/source-orb/integration_tests/sample_config.json b/airbyte-integrations/connectors/source-orb/integration_tests/sample_config.json index 08ebf5bb7d0a..87547e9abe9b 100644 --- a/airbyte-integrations/connectors/source-orb/integration_tests/sample_config.json +++ b/airbyte-integrations/connectors/source-orb/integration_tests/sample_config.json @@ -1,4 +1,4 @@ { "api_key": "", - "start_date": "2023-01-25T00:00:00Z" + "start_date": "2022-01-25T00:00:00Z" } diff --git a/airbyte-integrations/connectors/source-orb/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-orb/integration_tests/sample_state.json index f053686d1fc6..f87d36a5bc89 100644 --- a/airbyte-integrations/connectors/source-orb/integration_tests/sample_state.json +++ b/airbyte-integrations/connectors/source-orb/integration_tests/sample_state.json @@ -17,5 +17,8 @@ "someId": { "timeframe_start": "2022-01-01T00:00:00Z" } + }, + "invoices": { + "invoice_date": "2022-01-01T00:00:00Z" } } diff --git a/airbyte-integrations/connectors/source-orb/source_orb/schemas/invoices.json b/airbyte-integrations/connectors/source-orb/source_orb/schemas/invoices.json new file mode 100644 index 000000000000..0417de31822d --- /dev/null +++ b/airbyte-integrations/connectors/source-orb/source_orb/schemas/invoices.json @@ -0,0 +1,108 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": ["null", "object"], + "properties": { + "id": { + "type": "string" + }, + "created_at": { + "type": ["null", "string"], + "format": "date-time" + }, + "invoice_date": { + "type": ["string"], + "format": "date-time" + }, + "due_date": { + "type": ["string"], + "format": "date-time" + }, + "invoice_pdf": { + "type": ["null", "string"] + }, + "subtotal": { + "type": ["string"] + }, + "total": { + "type": ["string"] + }, + "amount_due": { + "type": ["string"] + }, + "status": { + "type": ["string"] + }, + "memo": { + "type": ["null", "string"] + }, + "issue_failed_at": { + "type": ["null", "string"], + "format": "date-time" + }, + "sync_failed_at": { + "type": ["null", "string"], + "format": "date-time" + }, + "payment_failed_at": { + "type": ["null", "string"], + "format": "date-time" + }, + "payment_started_at": { + "type": ["null", "string"], + "format": "date-time" + }, + "voided_at": { + "type": ["null", "string"], + "format": "date-time" + }, + "paid_at": { + "type": ["null", "string"], + "format": "date-time" + }, + "issued_at": { + "type": ["null", "string"], + "format": "date-time" + }, + "hosted_invoice_url": { + "type": ["null", "string"] + }, + "line_items": { + "type": ["array"], + "items": { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "quantity": { + "type": "number" + }, + "amount": { + "type": "string" + }, + "name": { + "type": "string" + }, + "start_date": { + "type": ["null", "string"], + "format": "date-time" + }, + "end_date": { + "type": ["null", "string"], + "format": "date-time" + } + } + } + }, + "subscription": { + "type": ["object", "null"], + "properties": { + "id": { + "type": "string" + } + } + } + }, + "required": ["id", "created_at", "invoice_date", "due_date", "subtotal", "total", "amount_due", "status"] + } + \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-orb/source_orb/source.py b/airbyte-integrations/connectors/source-orb/source_orb/source.py index c3dd7b15fba9..0005a9b70407 100644 --- a/airbyte-integrations/connectors/source-orb/source_orb/source.py +++ b/airbyte-integrations/connectors/source-orb/source_orb/source.py @@ -142,7 +142,7 @@ def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMa if state_based_start_timestamp: # This may (reasonably) override the existing `created_at[gte]` set based on the start_date # of the stream, as configured. - params["created_at[gte]"] = state_based_start_timestamp + params[f"{self.cursor_field}[gte]"] = state_based_start_timestamp return params @@ -448,6 +448,32 @@ def path(self, **kwargs) -> str: return "plans" +class Invoices(IncrementalOrbStream): + """ + Fetches non-draft invoices, including those that are paid, issued, void, or synced. + API Docs: https://docs.withorb.com/docs/orb-docs/api-reference/operations/list-invoices + """ + + @property + def cursor_field(self) -> str: + """ + Invoices created in the past may be newly issued, so we store state on + `invoice_date` instead. + """ + return "invoice_date" + + def path(self, **kwargs) -> str: + return "invoices" + + def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]: + request_params = super().request_params(stream_state, **kwargs) + # Filter to all statuses. Note that if you're currently expecting the status of the invoice + # to update at the sink, you should periodically still expect to re-sync this connector to + # fetch updates. + request_params["status[]"] = ["void", "paid", "issued", "synced"] + return request_params + + class CreditsLedgerEntries(IncrementalOrbStream): page_size = 500 """ @@ -705,6 +731,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: Customers(authenticator=authenticator, lookback_window_days=lookback_window, start_date=start_date), Subscriptions(authenticator=authenticator, lookback_window_days=lookback_window, start_date=start_date), Plans(authenticator=authenticator, lookback_window_days=lookback_window, start_date=start_date), + Invoices(authenticator=authenticator, lookback_window_days=lookback_window), CreditsLedgerEntries( authenticator=authenticator, lookback_window_days=lookback_window, diff --git a/airbyte-integrations/connectors/source-orb/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-orb/unit_tests/test_incremental_streams.py index ce1c2ae5b15b..ea59c15494a7 100644 --- a/airbyte-integrations/connectors/source-orb/unit_tests/test_incremental_streams.py +++ b/airbyte-integrations/connectors/source-orb/unit_tests/test_incremental_streams.py @@ -8,7 +8,16 @@ import responses from airbyte_cdk.models import SyncMode from pytest import fixture -from source_orb.source import CreditsLedgerEntries, Customers, IncrementalOrbStream, OrbStream, Plans, Subscriptions, SubscriptionUsage +from source_orb.source import ( + CreditsLedgerEntries, + Customers, + IncrementalOrbStream, + Invoices, + OrbStream, + Plans, + Subscriptions, + SubscriptionUsage, +) @fixture @@ -89,6 +98,34 @@ def test_request_params(patch_incremental_base_class, mocker, config, current_st assert stream.request_params(**inputs) == expected_params +@pytest.mark.parametrize( + ("config", "current_stream_state", "next_page_token", "expected_params"), + [ + ( + {}, + dict(invoice_date="2022-01-25T12:00:00+00:00"), + {"cursor": "f96594d0-8220-11ec-a8a3-0242ac120002"}, + {"invoice_date[gte]": "2022-01-25T12:00:00+00:00", "cursor": "f96594d0-8220-11ec-a8a3-0242ac120002"}, + ), + ({}, dict(invoice_date="2022-01-25T12:00:00+00:00"), None, {"invoice_date[gte]": "2022-01-25T12:00:00+00:00"}), + # Honors lookback_window_days + ( + dict(lookback_window_days=3), + dict(invoice_date="2022-01-25T12:00:00+00:00"), + None, + {"invoice_date[gte]": "2022-01-22T12:00:00+00:00"}, + ), + ({}, {}, None, None), + ], +) +def test_invoices_request_params(patch_incremental_base_class, mocker, config, current_stream_state, next_page_token, expected_params): + stream = Invoices(**config) + inputs = {"stream_state": current_stream_state, "next_page_token": next_page_token} + expected_params = expected_params or {} + expected_params["limit"] = OrbStream.page_size + expected_params["status[]"] = ['void', 'paid', 'issued', 'synced'] + assert stream.request_params(**inputs) == expected_params + # We have specific unit tests for CreditsLedgerEntries incremental stream # because that employs slicing logic @@ -125,6 +162,29 @@ def test_credits_ledger_entries_get_updated_state(mocker, current_stream_state, assert stream.get_updated_state(**inputs) == expected_state +@pytest.mark.parametrize( + ("current_stream_state", "latest_record", "expected_state"), + [ + # No state + ( + {}, + dict(invoice_date="2022-01-26T12:00:00+00:00"), + dict(invoice_date="2022-01-26T12:00:00+00:00"), + ), + # Existing state + ( + dict(invoice_date="2022-01-26T12:00:00+00:00"), + dict(invoice_date="2023-01-26T12:00:00+00:00"), + dict(invoice_date="2023-01-26T12:00:00+00:00"), + ), + ], +) +def test_invoices_get_updated_state(mocker, current_stream_state, latest_record, expected_state): + stream = Invoices() + inputs = {"current_stream_state": current_stream_state, "latest_record": latest_record} + assert stream.get_updated_state(**inputs) == expected_state + + def test_credits_ledger_entries_stream_slices(mocker): mocker.patch.object( Customers, "read_records", return_value=iter([{"id": "1", "name": "Customer Foo"}, {"id": "18", "name": "Customer Bar"}]) diff --git a/airbyte-integrations/connectors/source-orb/unit_tests/test_source.py b/airbyte-integrations/connectors/source-orb/unit_tests/test_source.py index 500855f20a85..0bf8749056dc 100644 --- a/airbyte-integrations/connectors/source-orb/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-orb/unit_tests/test_source.py @@ -35,5 +35,5 @@ def test_streams(mocker): sample_config = {"api_key": "test-token", "start_date": "2023-01-25T00:00:00Z"} config_mock.get.side_effect = sample_config.get streams = source.streams(config_mock) - expected_streams_number = 5 + expected_streams_number = 6 assert len(streams) == expected_streams_number diff --git a/docs/integrations/sources/orb.md b/docs/integrations/sources/orb.md index 1fcf95e0ba5b..64a6b199c64a 100644 --- a/docs/integrations/sources/orb.md +++ b/docs/integrations/sources/orb.md @@ -54,6 +54,7 @@ an Orb Account and API Key. | Version | Date | Pull Request | Subject | | --- | --- | --- | --- | +| 1.1.0 | 2023-03-26 | | Add Invoices incremental stream | 1.0.0 | 2023-02-02 | [21951](https://github.com/airbytehq/airbyte/pull/21951) | Add SubscriptionUsage stream, and made `start_date` a required field | 0.1.4 | 2022-10-07 | [17761](https://github.com/airbytehq/airbyte/pull/17761) | Fix bug with enriching ledger entries with multiple credit blocks | 0.1.3 | 2022-08-26 | [16017](https://github.com/airbytehq/airbyte/pull/16017) | Add credit block id to ledger entries