26 changes: 26 additions & 0 deletions README.md
@@ -36,3 +36,29 @@ used by boto will also need access to the S3 bucket configured in the above conf
---

Copyright © 2018 Stitch

---

### Run the Tap with Meltano
All of the changes required to run the Tap with [Meltano](https://meltano.com/product/) are in the most recent commit of the `run-with-meltano` branch.

To add the Tap to a Meltano pipeline, add the following block to the `extractors` section of your `meltano.yml` file, filling in the required values:

```yaml
- name: tap-intacct
  namespace: tap_intacct
  pip_url: git+https://github.com/FreshConsulting/tap-intacct.git@run-with-meltano
  executable: tap-intacct
  config:
    start_date: <desired start date> # ex: '2025-01-01T00:00:00Z'
    bucket: <S3 bucket to read from>
    company_id: <Intacct company ID, used as the S3 key prefix>
    role_name: <name of the AWS role to assume>
    account_id: <AWS account ID>
    external_id: <AWS external ID>
```
#### Functionality Limitations
The version of the Tap in the `run-with-meltano` branch does not support selecting or excluding specific streams from the source.

#### Meltano Details
Follow [this tutorial](https://docs.meltano.com/getting-started/) for instructions on how to install and run Meltano.
18 changes: 17 additions & 1 deletion tap_intacct/__init__.py
@@ -7,6 +7,8 @@
from tap_intacct.sync import sync_stream
from tap_intacct import s3

import json  # needed for loading the fix-data file below

from tap_intacct.fix_catalogs import process_catalog

LOGGER = singer.get_logger()

REQUIRED_CONFIG_KEYS = ["start_date", "bucket", "company_id"]
@@ -28,6 +30,14 @@ def stream_is_selected(mdata):
def do_sync(config, catalog, state):
    LOGGER.info('Starting sync.')

    if config.get("fix_catalog", False):
        LOGGER.info('Fixing catalog data types and keys.')

        # fix_data_path must point at a JSON file describing the fixes
        with open(config["fix_data_path"]) as f:
            fix_data = json.load(f)

        catalog['streams'] = process_catalog(catalog['streams'], fix_data)

    for stream in catalog['streams']:
        stream_name = stream['tap_stream_id']
        mdata = metadata.to_map(stream['metadata'])
@@ -57,7 +67,13 @@ def main():
            break
        LOGGER.warning("I have direct access to the bucket without assuming the configured role.")
    except:
-        s3.setup_aws_client(config)
+        # Use the proxy role chain when both proxy settings are configured
+        if 'proxy_account_id' in config and 'proxy_role_name' in config:
+            s3.setup_aws_client_with_proxy(config)
+        else:
+            s3.setup_aws_client(config)

    if args.discover:
        do_discover(args.config)
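The implementation of `setup_aws_client_with_proxy` is not shown in this diff. For context, a two-hop role chain with boto3 generally looks like the sketch below; the helper name, session names, and ARN construction are illustrative assumptions, not the tap's actual code:

```python
import boto3

def assume_role_chain(config):
    """Sketch: assume a proxy role first, then use its credentials
    to assume the target role (hypothetical helper, not from this PR)."""
    sts = boto3.client('sts')

    # Hop 1: assume the proxy role in the intermediate account
    proxy = sts.assume_role(
        RoleArn=f"arn:aws:iam::{config['proxy_account_id']}:role/{config['proxy_role_name']}",
        RoleSessionName='tap-intacct-proxy',
    )['Credentials']

    # Hop 2: use the proxy credentials to assume the target role
    proxy_sts = boto3.client(
        'sts',
        aws_access_key_id=proxy['AccessKeyId'],
        aws_secret_access_key=proxy['SecretAccessKey'],
        aws_session_token=proxy['SessionToken'],
    )
    target = proxy_sts.assume_role(
        RoleArn=f"arn:aws:iam::{config['account_id']}:role/{config['role_name']}",
        RoleSessionName='tap-intacct',
        ExternalId=config['external_id'],
    )['Credentials']

    # Build the S3 client from the final hop's credentials
    return boto3.client(
        's3',
        aws_access_key_id=target['AccessKeyId'],
        aws_secret_access_key=target['SecretAccessKey'],
        aws_session_token=target['SessionToken'],
    )
```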
116 changes: 116 additions & 0 deletions tap_intacct/fix_catalogs.py
@@ -0,0 +1,116 @@
def process_catalog(streams: list, fix_data: dict) -> list:
    """Apply the configured fixes to a catalog's streams: drop dismissed
    tables, rewrite field types, and inject key/selection metadata."""
    fix_lookup = {}
    dismiss_tables = set()
    fix_key_lookup = {}

    # Parse fix configuration
    for table_entry in fix_data.get("tables", []):
        for table_name, table_rules in table_entry.items():

            if table_name == "DISMISS_TABLES":
                dismiss_tables.update(table_rules)

            elif table_name == "FIX_KEY_FIELD":
                for item in table_rules:
                    for tbl, key_field in item.items():
                        fix_key_lookup[tbl] = key_field

            else:
                fix_lookup[table_name] = table_rules

    # Schema cleaning: collapse anyOf unions and over-long type lists
    def clean_schema(obj):
        if isinstance(obj, dict):

            if "anyOf" in obj and isinstance(obj["anyOf"], list) and obj["anyOf"]:
                first_option = obj["anyOf"][0]
                obj.clear()
                obj.update(first_option)

            for key, value in list(obj.items()):
                if key == "type" and isinstance(value, list) and len(value) > 2:
                    obj[key] = value[:2]
                else:
                    clean_schema(value)

        elif isinstance(obj, list):
            for item in obj:
                clean_schema(item)

    # Apply type fixes
    def apply_type_fixes(stream_dict):
        stream_name = stream_dict.get("stream")

        if stream_name not in fix_lookup:
            return

        table_rules = fix_lookup[stream_name]
        properties = stream_dict.get("schema", {}).get("properties", {})

        for field_name, type_changes in table_rules.items():
            if field_name not in properties:
                continue

            field_schema = properties[field_name]
            field_types = field_schema.get("type")

            if not isinstance(field_types, list):
                continue

            # Accumulate replacements on the running list so multiple
            # changes to one field do not overwrite each other
            for old_type, new_type in type_changes.items():
                field_types = [
                    new_type if t == old_type else t
                    for t in field_types
                ]
            field_schema["type"] = field_types

    # Update metadata
    def update_metadata(stream_dict):
        stream_name = stream_dict.get("stream")
        metadata_list = stream_dict.setdefault("metadata", [])

        # Ensure stream and property metadata are selected
        for entry in metadata_list:
            breadcrumb = entry.get("breadcrumb", [])

            if len(breadcrumb) == 2 and breadcrumb[0] == "properties":
                entry.setdefault("metadata", {})["selected"] = True

            if len(breadcrumb) == 0:
                entry.setdefault("metadata", {})["selected"] = True

        # Inject table key if configured
        if stream_name in fix_key_lookup:
            key_field = fix_key_lookup[stream_name]

            root_meta = next(
                (e for e in metadata_list if e.get("breadcrumb", []) == []),
                None
            )

            if not root_meta:
                root_meta = {"breadcrumb": [], "metadata": {}}
                metadata_list.append(root_meta)

            root_meta.setdefault("metadata", {})
            root_meta["metadata"]["table-key-properties"] = [key_field]
            root_meta["metadata"]["selected"] = True

    # Main processing: skip dismissed or unnamed streams, fix the rest
    processed_streams = []

    for stream_dict in streams:
        stream_name = stream_dict.get("stream")

        if not stream_name:
            continue

        if stream_name in dismiss_tables:
            continue

        clean_schema(stream_dict)
        apply_type_fixes(stream_dict)
        update_metadata(stream_dict)

        processed_streams.append(stream_dict)

    return processed_streams
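A minimal sketch of how `process_catalog` consumes a fix-data file; the JSON structure follows from the parsing logic above, and the stream and field names here are hypothetical:

```python
from tap_intacct.fix_catalogs import process_catalog

# Hypothetical fix configuration matching the parser above
fix_data = {
    "tables": [
        {"DISMISS_TABLES": ["GLACCOUNT_AUDIT"]},
        {"FIX_KEY_FIELD": [{"GLACCOUNT": "RECORDNO"}]},
        # Per-table type fixes: {field: {old_type: new_type}}
        {"GLACCOUNT": {"RECORDNO": {"string": "integer"}}},
    ]
}

streams = [{
    "stream": "GLACCOUNT",
    "tap_stream_id": "GLACCOUNT",
    "schema": {"properties": {"RECORDNO": {"type": ["string", "null"]}}},
    "metadata": [{"breadcrumb": [], "metadata": {}}],
}]

fixed = process_catalog(streams, fix_data)
# RECORDNO is now typed ["integer", "null"], the stream is selected,
# and its table-key-properties are set to ["RECORDNO"]
```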
2 changes: 1 addition & 1 deletion tap_intacct/s3.py
@@ -172,7 +172,7 @@ def get_input_files_for_table(config, table_name, modified_since=None):
    prefix = str.join('/', [path, company_id]) if path else company_id
    s3_objects = list_files_in_bucket(bucket, prefix)

-    pattern = "^" + prefix + '/' + table_name + "\.*\.csv"
+    pattern = "^" + prefix + '/' + table_name + "(?:\..*)?\.csv"
    matcher = re.compile(pattern)

    LOGGER.info(
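The old pattern's `\.*\.csv` only allowed literal dots between the table name and the extension, so dated exports like `GLACCOUNT.2025-01-01.csv` were skipped; the new `(?:\..*)?\.csv` accepts an optional suffix. A quick check, with a hypothetical prefix and table name:

```python
import re

# Hypothetical prefix and table name
prefix = "exports/acme-co"
table_name = "GLACCOUNT"

pattern = "^" + prefix + '/' + table_name + r"(?:\..*)?\.csv"
matcher = re.compile(pattern)

assert matcher.match("exports/acme-co/GLACCOUNT.csv")             # plain export
assert matcher.match("exports/acme-co/GLACCOUNT.2025-01-01.csv")  # dated export: old pattern missed this
assert not matcher.match("exports/acme-co/OTHERTABLE.csv")        # different table
```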