Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
version: 2
version: 2.1
jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4
steps:
- checkout
- run:
Expand All @@ -12,12 +12,28 @@ jobs:
source /usr/local/share/virtualenvs/tap-eloqua/bin/activate
pip install -U 'pip<19.2' 'setuptools<51.0.0'
pip install .[dev]
- add_ssh_keys
- run:
name: 'pylint'
command: |
source /usr/local/share/virtualenvs/tap-eloqua/bin/activate
pylint tap_eloqua -d C,R,W
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use the standard list of disables?

- run:
name: 'Unit Tests'
command: |
source /usr/local/share/virtualenvs/tap-eloqua/bin/activate
pip install nose coverage
nosetests --with-coverage --cover-erase --cover-package=tap_eloqua --cover-html-dir=htmlcov tests/unittests
coverage html
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov
workflows:
version: 2
commit:
jobs:
- build
- build:
context: circleci-user
build_daily:
triggers:
- schedule:
Expand All @@ -27,4 +43,5 @@ workflows:
only:
- master
jobs:
- build
- build:
context: circleci-user
8 changes: 7 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,10 @@ test.*.json
rsa-key
tags
singer-check-tap-data
state.json
state.json

catalog.json
get_catalog.py
persist/
configs/
current_state/
9 changes: 5 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
classifiers=['Programming Language :: Python :: 3 :: Only'],
py_modules=['tap_eloqua'],
install_requires=[
'backoff==1.3.2',
'backoff==1.8.0',
'requests==2.20.1',
'pendulum==2.0.3',
'singer-python==5.2.0'
'singer-python==5.13.0'
],
extras_require={
'dev': [
'ipdb==0.11'
"dev": [
"pylint",
"ipdb",
]
},
entry_points='''
Expand Down
102 changes: 7 additions & 95 deletions tap_eloqua/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@

import sys
import json
import argparse

import singer
from singer import metadata

from tap_eloqua.client import EloquaClient
from tap_eloqua.discover import discover
from tap_eloqua.sync import sync
Expand All @@ -27,101 +23,17 @@ def do_discover(client):
json.dump(catalog.to_dict(), sys.stdout, indent=2)
LOGGER.info('Finished discover')

##### TEMP

from singer.catalog import Catalog

def check_config(config, required_keys):
    """Verify that every key in `required_keys` is present in `config`.

    Returns None when nothing is missing; otherwise raises an Exception
    whose message lists the missing keys in the order they were requested.
    """
    missing_keys = []
    for key in required_keys:
        if key not in config:
            missing_keys.append(key)

    # Nothing missing: the config is usable as-is.
    if not missing_keys:
        return

    raise Exception("Config is missing required keys: {}".format(missing_keys))

def load_json(path):
    """Read the file at `path` and return its parsed JSON content.

    The file is decoded as UTF-8 explicitly instead of relying on the
    platform's default locale encoding, so non-ASCII values load the same
    way on every machine.

    Raises whatever `open` raises for an unreadable path and
    `json.JSONDecodeError` for malformed JSON.
    """
    with open(path, encoding='utf-8') as fil:
        return json.load(fil)

def parse_args(required_config_keys):
    """Parse the standard tap command-line arguments.

    Recognised options:

      -c,--config      Config file (required)
      -s,--state       State file
      -d,--discover    Run in discover mode
      -p,--properties  Properties file: DEPRECATED, use --catalog instead
      --catalog        Catalog file

    For each file option that was supplied, the original path is kept on the
    returned namespace as `<name>_path` and the option itself is replaced by
    the loaded content: parsed JSON for config/state/properties, and a
    `Catalog` object for the catalog. A missing state defaults to `{}`.

    The loaded config is validated against `required_config_keys` before the
    namespace is returned.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config', help='Config file', required=True)
    parser.add_argument('-s', '--state', help='State file')
    parser.add_argument(
        '-p', '--properties',
        help='Property selections: DEPRECATED, Please use --catalog instead')
    parser.add_argument('--catalog', help='Catalog file')
    parser.add_argument('-d', '--discover', action='store_true',
                        help='Do schema discovery')

    args = parser.parse_args()

    if args.config:
        args.config_path = args.config
        args.config = load_json(args.config_path)

    if not args.state:
        # No state file supplied: start from an empty state.
        args.state = {}
    else:
        args.state_path = args.state
        args.state = load_json(args.state_path)

    if args.properties:
        args.properties_path = args.properties
        args.properties = load_json(args.properties_path)

    if args.catalog:
        args.catalog_path = args.catalog
        args.catalog = Catalog.load(args.catalog_path)

    check_config(args.config, required_config_keys)
    return args

#####

@singer.utils.handle_top_exception(LOGGER)
def main():
#parsed_args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS)
parsed_args = parse_args(REQUIRED_CONFIG_KEYS)

with EloquaClient(parsed_args.config_path,
parsed_args.config['client_id'],
parsed_args.config['client_secret'],
parsed_args.config['refresh_token'],
parsed_args.config['redirect_uri'],
parsed_args.config.get('user_agent')) as client:

if parsed_args.discover:
args = singer.parse_args(REQUIRED_CONFIG_KEYS)
if args.dev:
LOGGER.warning("Executing Tap in Dev mode")
with EloquaClient(args.config_path, args.config, args.dev) as client:
if args.discover:
do_discover(client)
elif parsed_args.catalog:
sync(client,
parsed_args.catalog,
parsed_args.state,
parsed_args.config['start_date'],
int(parsed_args.config.get('bulk_page_size', 5000)))
elif args.catalog:
sync(client, args.catalog, args.state, args.config)

if __name__ == "__main__":
main()
71 changes: 37 additions & 34 deletions tap_eloqua/client.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
import json
from datetime import datetime, timedelta
from datetime import timedelta

import backoff
import requests
from requests.exceptions import ConnectionError
from singer import metrics
from singer import metrics,get_logger
from singer.utils import strptime_to_utc,now,strftime
from .utils import write_config
LOGGER = get_logger()


class Server5xxError(Exception):
pass


class EloquaClient(object):
def __init__(self,
config_path,
client_id,
client_secret,
refresh_token,
redirect_uri,
user_agent):
def __init__(self,config_path, config, dev_mode=False):
self.__config_path = config_path
self.__client_id = client_id
self.__client_secret = client_secret
self.__refresh_token = refresh_token
self.__redirect_uri = redirect_uri
self.__user_agent = user_agent
self.config = config
self.__client_id = config["client_id"]
self.__client_secret = config["client_secret"]
self.__refresh_token = config["refresh_token"]
self.__redirect_uri = config["redirect_uri"]
self.__user_agent = config.get("user_agent","")
self.dev_mode = dev_mode
self.__access_token = None
self.__expires = None
self.__session = requests.Session()
Expand All @@ -40,7 +40,16 @@ def __exit__(self, type, value, traceback):
max_tries=5,
factor=2)
def get_access_token(self):
if self.__access_token is not None and self.__expires > datetime.utcnow():
if self.dev_mode:
try:
self.__access_token = self.config['access_token']
self.__expires=strptime_to_utc(self.config['expires_in'])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.__expires=strptime_to_utc(self.config['expires_in'])
self.__expires = strptime_to_utc(self.config['expires_in'])

except KeyError as ex:
raise Exception("Unable to locate key in config") from ex
if not self.__access_token or self.__expires < now():
raise Exception("Access Token in config is expired, unable to authenticate in dev mode")

if self.__access_token and self.__expires > now():
return

headers = {}
Expand All @@ -63,31 +72,25 @@ def get_access_token(self):

if response.status_code != 200:
eloqua_response = response.json()
eloqua_response.update(
{'status': response.status_code})
raise Exception(
'Unable to authenticate (Eloqua response: `{}`)'.format(
eloqua_response))
eloqua_response.update({'status': response.status_code})
raise Exception('Unable to authenticate (Eloqua response: `{}`)'.format(eloqua_response))

data = response.json()

self.__access_token = data['access_token']
self.__refresh_token = data['refresh_token']
expires_in_seconds = data['expires_in'] - 10 # pad by 10 seconds
self.__expires = now() + timedelta(seconds=expires_in_seconds)

## refresh_token rotates on every reauth
with open(self.__config_path) as file:
config = json.load(file)
config['refresh_token'] = data['refresh_token']
with open(self.__config_path, 'w') as file:
json.dump(config, file, indent=2)

expires_seconds = data['expires_in'] - 10 # pad by 10 seconds
self.__expires = datetime.utcnow() + timedelta(seconds=expires_seconds)
if not self.dev_mode:
update_config_keys = {
"refresh_token":self.__refresh_token,
"access_token":self.__access_token,
"expires_in": strftime(self.__expires)
Comment on lines +86 to +88
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"refresh_token":self.__refresh_token,
"access_token":self.__access_token,
"expires_in": strftime(self.__expires)
"refresh_token": self.__refresh_token,
"access_token": self.__access_token,
"expires_in": strftime(self.__expires)

}
self.config = write_config(self.__config_path,update_config_keys)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.config = write_config(self.__config_path,update_config_keys)
self.config = write_config(self.__config_path, update_config_keys)


def get_base_urls(self):
data = self.request('GET',
url='https://login.eloqua.com/id',
endpoint='base_url')
data = self.request('GET',url='https://login.eloqua.com/id',endpoint='base_url')
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
data = self.request('GET',url='https://login.eloqua.com/id',endpoint='base_url')
data = self.request('GET', url='https://login.eloqua.com/id', endpoint='base_url')

self.__base_url = data['urls']['base']

@backoff.on_exception(backoff.expo,
Expand Down
16 changes: 9 additions & 7 deletions tap_eloqua/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ def sync_bulk_obj(client, catalog, state, start_date, stream_name, bulk_page_siz
bulk_page_size,
last_date,
offset=last_offset)
except HTTPError as e:
if e.response.status_code in [404, 410]:
except HTTPError as ex:
if ex.response.status_code in [404, 410]:
LOGGER.info('{} - Previous export expired: {}'.format(stream_name, last_sync_id))
else:
raise
Expand Down Expand Up @@ -377,12 +377,12 @@ def sync_activity_stream(client,
activity_type):
finished = False
sync_start = pendulum.now('UTC')
end_date = sync_start
end_date, last_sync_date = sync_start, None
Copy link
Copy Markdown

@RushiT0122 RushiT0122 Jan 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these changes in sync.py related to dev mode? If not, can we keep them separate from the dev-mode changes?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And if these changes are required, please update the description of this PR to mention them.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RushiT0122
These changes are not related to dev mode. However, if you look at the code from line 380 to line 403:
`last_sync_date` was declared on line 385 inside the `try` statement but is accessed outside its declared scope, in the `except` block; hence it is now introduced here, before the loop.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

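I'm pretty sure this is a problem in other languages, but not in Python: a name assigned inside a `try` block is still visible in the matching `except` block, as long as the assignment ran before the exception was raised. For example: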

>>> for i in range(3):
...   try:
...     print("About to initialize `my_variable`")
...     my_variable = i
...     print(f"my_variable: {my_variable}")
...     raise RuntimeError(f"Problem occured with {my_variable}")
...   except Exception as err:
...     print(f"Caught {type(err)} with message {str(err)}")
...     print(f"Current value of my_variable {my_variable}")
...
About to initialize `my_variable`
my_variable: 0
Caught <class 'RuntimeError'> with message Problem occured with 0
Current value of my_variable 0
About to initialize `my_variable`
my_variable: 1
Caught <class 'RuntimeError'> with message Problem occured with 1
Current value of my_variable 1
About to initialize `my_variable`
my_variable: 2
Caught <class 'RuntimeError'> with message Problem occured with 2
Current value of my_variable 2

while not finished:
try:
# Get latest bookmark to adjust time window from, if needed
last_date_raw = get_bulk_bookmark(state, stream_name).get('datetime', start_date)
last_date = pendulum.parse(last_date_raw)
last_sync_date = pendulum.parse(last_date_raw)

update_current_stream(state, stream_name)
sync_bulk_obj(client,
Expand All @@ -399,12 +399,14 @@ def sync_activity_stream(client,
# If not done, sync again to now()
end_date = sync_start
except ActivityExportTooLarge as ex:
LOGGER.warn(ex)
end_date = last_date.add(seconds=(end_date - last_date).total_seconds() / 2)
LOGGER.warning(ex)
end_date = last_sync_date.add(seconds=(end_date - last_sync_date).total_seconds() / 2)
if end_date > sync_start:
end_date = sync_start

def sync(client, catalog, state, start_date, bulk_page_size):
def sync(client, catalog, state, config):
start_date = config['start_date']
bulk_page_size = int(config.get('bulk_page_size', 5000))
selected_streams = get_selected_streams(catalog)

if not selected_streams:
Expand Down
18 changes: 18 additions & 0 deletions tap_eloqua/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Dict
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unused

import json
from singer import get_logger

LOGGER = get_logger()


def write_config(config_path: str, data: Dict) -> Dict:
    """
    Merge `data` into the JSON config file at `config_path`.

    The existing file is read first and only the keys present in `data` are
    added or overwritten; every other key already in the file is kept.

    Args:
        config_path: Path to an existing JSON config file.
        data: Keys and values to add to or overwrite in the config.

    Returns:
        The merged config dict, exactly as written back to the file.

    Raises:
        TypeError, ValueError: If the merged config cannot be serialized to
            JSON. The file on disk is left untouched in that case.
    """
    with open(config_path, 'r', encoding='utf-8') as tap_config:
        config = json.load(tap_config)
    config.update(data)

    # Serialize BEFORE opening for write: mode 'w' truncates the file as soon
    # as it is opened, so a serialization error raised while the file is open
    # would leave the config empty or half-written.
    serialized = json.dumps(config, indent=2)
    with open(config_path, 'w', encoding='utf-8') as tap_config:
        tap_config.write(serialized)
    return config
Loading