dynamic-singer, Python API, Dynamic source, Dynamic target, N targets, Prometheus exporter, realtime transformation for Singer ETL
This library is an extension of singer.io for easier deployment, metrics, schema auto-detection, realtime transformation, and sinking to multiple targets. Read more about singer.io at https://www.singer.io/.
dynamic-singer can also run in a Jupyter Notebook.
pip install dynamic-singer
If you are familiar with singer, you know that sourcing from a Tap into a Target requires the Bash pipe |, for example,
tap-fixerio --config fixerio-config.json | target-gsheet --config gsheet-config.json
With dynamic-singer, you can run the same thing through a Python interface,
import dynamic_singer as dsinger
source = dsinger.Source('tap-fixerio --config fixerio-config.json')
source.add('target-gsheet --config gsheet-config.json')
source.start()
Check the Google spreadsheet here, link
For the full example, check example/fixerio-gsheet.ipynb.
Now suppose we want to track metrics from the Tap and the Targets. Plain singer cannot do this by default because it relies on the Bash pipe |; to solve that, we would need something like,
tap | prometheus | target | prometheus
But Prometheus would need to understand the pipe, and nobody has time for that. Don't worry: dynamic-singer enables a Prometheus exporter by default. dynamic-singer captures,
- output rates from tap
- data size from tap
- output rates from target
- data size from target
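Conceptually, capturing these metrics means counting every line that passes out of the tap and out of the target, and summing its size; output rates can then be derived from the row counter over time (for example with Prometheus `rate()`). The sketch below does this in one process with plain generators. `Meter`, `meter` and `fake_target` are hypothetical names, and measuring size as UTF-8 bytes / 1000 (KB) is an assumption, not necessarily how dynamic-singer computes it.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class Meter:
    """Hypothetical counters for one side of the pipe (tap or target)."""
    rows: int = 0
    size_kb: float = 0.0


def meter(lines: Iterable[str], stats: Meter) -> Iterator[str]:
    """Yield each line unchanged while counting rows and accumulating size."""
    for line in lines:
        stats.rows += 1
        # Assumption: size = UTF-8 byte length in KB, with 1 KB = 1000 bytes.
        stats.size_kb += len(line.encode("utf-8")) / 1000
        yield line


def fake_target(lines: Iterable[str]) -> Iterator[str]:
    """Stand-in target that answers every input line with a short acknowledgement."""
    for _ in lines:
        yield "ok"


tap_lines = ['{"type": "RECORD", "record": {"a": 1}}', '{"type": "RECORD", "record": {"a": 2}}']
tap_stats, target_stats = Meter(), Meter()
# tap | meter | target | meter, all inside one Python process
for _ in meter(fake_target(meter(tap_lines, tap_stats)), target_stats):
    pass
print(tap_stats, target_stats)
```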
import dynamic_singer as dsinger
source = dsinger.Source('tap-fixerio --config fixerio-config.json')
source.add('target-gsheet --config gsheet-config.json')
source.start()
So if you go to http://localhost:8000, you will see,
# HELP total_tap_fixerio_total total rows tap_fixerio
# TYPE total_tap_fixerio_total counter
total_tap_fixerio_total 4.0
# TYPE total_tap_fixerio_created gauge
total_tap_fixerio_created 1.5887420455044758e+09
# HELP data_size_tap_fixerio summary of data size tap_fixerio (KB)
# TYPE data_size_tap_fixerio summary
data_size_tap_fixerio_count 4.0
data_size_tap_fixerio_sum 0.738
# TYPE data_size_tap_fixerio_created gauge
data_size_tap_fixerio_created 1.588742045504552e+09
total_target_gsheet_total 4.0
# TYPE total_target_gsheet_created gauge
total_target_gsheet_created 1.588742045529744e+09
# HELP data_size_target_gsheet summary of data size target_gsheet (KB)
# TYPE data_size_target_gsheet summary
data_size_target_gsheet_count 4.0
data_size_target_gsheet_sum 0.196
# TYPE data_size_target_gsheet_created gauge
data_size_target_gsheet_created 1.5887420455298738e+09
Metric names are derived from the tap / target names, for example tap-fixerio becomes tap_fixerio.
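To consume these metrics from your own code, a small parser for the unlabelled samples above is enough; for instance, the average row size is the summary's `_sum` divided by its `_count`. This is a sketch: `metric_base` and `parse_metrics` are hypothetical helpers, the `-` to `_` rule is inferred from the output above, and the parser assumes no `{...}` labels, which holds for the sample shown.

```python
from typing import Dict


def metric_base(command_name: str) -> str:
    """Hypothetical helper: 'tap-fixerio' -> 'tap_fixerio', matching the names above."""
    return command_name.replace("-", "_")


def parse_metrics(text: str) -> Dict[str, float]:
    """Parse unlabelled `name value` samples from the Prometheus text format."""
    values: Dict[str, float] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):  # skip blank, HELP and TYPE lines
            continue
        name, value = line.split()[:2]  # ignore an optional trailing timestamp
        values[name] = float(value)
    return values


sample = """\
# HELP total_tap_fixerio_total total rows tap_fixerio
# TYPE total_tap_fixerio_total counter
total_tap_fixerio_total 4.0
data_size_tap_fixerio_count 4.0
data_size_tap_fixerio_sum 0.738
"""
metrics = parse_metrics(sample)
base = metric_base("tap-fixerio")
avg_kb = metrics[f"data_size_{base}_sum"] / metrics[f"data_size_{base}_count"]
print(metrics[f"total_{base}_total"], avg_kb)
```

In practice `sample` would be the body fetched from http://localhost:8000 (for example with `urllib.request.urlopen`).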
Let's say I want to write to more than one target, for example to 2 different spreadsheets at the same time. With plain singer, we need to run the pipe twice,
tap-fixerio --config fixerio-config.json | target-gsheet --config gsheet-config1.json
tap-fixerio --config fixerio-config.json | target-gsheet --config gsheet-config2.json
If we do this, the two sheets will probably get different data, because each pipe runs its own copy of the tap. Oh no!
To add more than one target with dynamic-singer,
import dynamic_singer as dsinger
source = dsinger.Source('tap-fixerio --config fixerio-config.json')
source.add('target-gsheet --config gsheet-config.json')
source.add('target-gsheet --config gsheet-config1.json')
source.start()
Check the first Google spreadsheet here, link
Check the second Google spreadsheet here, link
For the full example, check example/fixerio-gsheet-twice.ipynb.
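The idea behind multiple targets can be sketched with the standard library: run the tap once and copy every line it emits to each target's stdin, so all targets receive identical data. This is not dynamic-singer's code; `fan_out` is a hypothetical name, commands are argument lists as `subprocess` expects, and the demo uses small Python one-liners in place of tap-fixerio and target-gsheet.

```python
import subprocess
import sys
import tempfile
from typing import List


def fan_out(tap_cmd: List[str], target_cmds: List[List[str]]) -> List[str]:
    """Run the tap once and copy every line it emits to every target's stdin."""
    out_files = [tempfile.TemporaryFile(mode="w+") for _ in target_cmds]
    targets = [
        # Each target's stdout goes to a temp file, so a chatty target cannot block on a full pipe.
        subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=f, text=True)
        for cmd, f in zip(target_cmds, out_files)
    ]
    tap = subprocess.Popen(tap_cmd, stdout=subprocess.PIPE, text=True)
    for line in tap.stdout:
        for target in targets:
            target.stdin.write(line)  # the identical line goes to every target
    tap.stdout.close()
    tap.wait()
    outputs = []
    for target, f in zip(targets, out_files):
        target.stdin.close()  # signal end of input
        target.wait()
        f.seek(0)
        outputs.append(f.read())
        f.close()
    return outputs


demo_tap = [sys.executable, "-c", "for i in range(3): print(i)"]
echo_target = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read())"]
print(fan_out(demo_tap, [echo_target, echo_target]))
```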
Now let's say I want to use Python code as a Tap. With plain singer, I need to write it like,
python3 tap.py | target-gsheet --config gsheet-config.json
The good thing about dynamic-singer is that you can transfer data directly from a Python object into Targets.
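For comparison, the tap.py in the plain-singer command has to print Singer messages itself, one JSON object per line on stdout, in the SCHEMA / RECORD shape that appears in the test.txt output later in this README. A minimal sketch using only the standard library (not the singer-python helpers); the stream name `example` and its fields are illustrative.

```python
import json
import sys
from datetime import datetime, timezone


def write_message(message: dict) -> None:
    """Write one Singer message as a single JSON line on stdout."""
    sys.stdout.write(json.dumps(message) + "\n")
    sys.stdout.flush()


def main(size: int = 3) -> None:
    schema = {
        "type": "object",
        "properties": {
            "data": {"type": "integer"},
            "timestamp": {"type": "string", "format": "date-time"},
        },
    }
    # A SCHEMA message must precede the RECORD messages of its stream.
    write_message({"type": "SCHEMA", "stream": "example", "schema": schema, "key_properties": ["timestamp"]})
    for i in range(1, size + 1):
        record = {"data": i, "timestamp": datetime.now(timezone.utc).isoformat()}
        write_message({"type": "RECORD", "stream": "example", "record": record})


if __name__ == "__main__":
    main()
```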
import dynamic_singer as dsinger
from datetime import datetime

class Example:
    def __init__(self, size):
        self.size = size
        self.count = 0

    def emit(self):
        if self.count < self.size:
            self.count += 1
            # every row must contain the tap_key ('timestamp' below)
            return {'data': self.count, 'timestamp': str(datetime.now())}
        # returning None terminates the tap
        return None

example = Example(20)
source = dsinger.Source(example, tap_name='example', tap_key='timestamp')
source.add('target-gsheet --config gsheet-config.json')
source.start()
Check the Google spreadsheet here, link
For the full example, check example/iterator-gsheet.ipynb.
A Python object used as a Tap must follow these rules:
- It must have an `emit` method.
  If not, it will throw an error,
  ValueError: tap must a string or an object with method `emit`
- `emit` must return a dict; to terminate, simply return None.
  If not, it will throw an error,
  ValueError: tap.emit() must returned a dict
- `tap_schema` must be a dict or None. If None, the schema is auto-generated from `tap.emit()`.
- `tap_name` is required; it is the name of the tap.
- `tap_key` is required; it acts as the primary key of the tap.
  If `tap_key` is not in the emitted dict, it will throw an error,
  ValueError: tap key not exist in elements from tap
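The rules above can be read as a consumption loop: call `emit()` until it returns None, checking each row along the way. The sketch below re-creates that loop with the same error messages; `iterate_tap` is a hypothetical name, not dynamic-singer's internal function, and string (shell command) taps are out of its scope. Because it is a generator, the checks run when the rows are iterated.

```python
from datetime import datetime
from typing import Any, Dict, Iterator


def iterate_tap(tap: Any, tap_key: str) -> Iterator[Dict[str, Any]]:
    """Hypothetical re-creation of the Tap rules: yield rows until emit() returns None."""
    if not callable(getattr(tap, "emit", None)):
        raise ValueError("tap must a string or an object with method `emit`")
    while True:
        row = tap.emit()
        if row is None:  # None terminates the tap
            return
        if not isinstance(row, dict):
            raise ValueError("tap.emit() must returned a dict")
        if tap_key not in row:
            raise ValueError("tap key not exist in elements from tap")
        yield row


class Example:
    def __init__(self, size):
        self.size = size
        self.count = 0

    def emit(self):
        if self.count < self.size:
            self.count += 1
            return {"data": self.count, "timestamp": str(datetime.now())}
        return None


print([row["data"] for row in iterate_tap(Example(3), "timestamp")])
```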
Now look at the targets available for singer.io, for example https://github.com/singer-io/target-gsheet or https://github.com/RealSelf/target-bigquery: building a target is complicated, and it must be able to parse values from the terminal pipe.
But with dynamic-singer, creating a target is very simple.
Let's say I want to build a target that saves every row from fixer-io to a text file,
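import dynamic_singer as dsinger

class Target:
    def __init__(self, filename):
        # append mode, so repeated runs keep earlier rows
        self.f = open(filename, 'a')

    def parse(self, row):
        # row is each raw message from the tap (SCHEMA, RECORD or STATE),
        # written as-is without an added newline
        self.f.write(row)
        return row

target = Target('test.txt')
source = dsinger.Source('tap-fixerio --config fixer-config.json')
source.add(target)
source.start()
After that, check test.txt,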
{"type": "SCHEMA", "stream": "exchange_rate", "schema": {"type": "object", "properties": {"date": {"type": "string", "format": "date-time"}}, "additionalProperties": true}, "key_properties": ["date"]}{"type": "RECORD", "stream": "exchange_rate", "record": {"GBP": "0.871002", "JPY": "115.375629", "EUR": "1.0", "date": "2020-05-05T00:00:00Z"}}{"type": "RECORD", "stream": "exchange_rate", "record": {"GBP": "0.872634", "JPY": "114.804452", "EUR": "1.0", "date": "2020-05-06T00:00:00Z"}}{"type": "STATE", "value": {"start_date": "2020-05-06"}}
Singer taps always send schema information, so remember to parse it properly.
For the full example, check example/fixerio-writefile.ipynb.
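One way to "parse it properly" is to dispatch on each message's `type` field. The sketch below assumes, as the test.txt output suggests, that `parse` receives one JSON-encoded Singer message per call; `RecordsToFile` is a hypothetical class that remembers schemas, writes only RECORD payloads (one per line), and ignores STATE.

```python
import json
import os
import tempfile
from typing import Any, Dict


class RecordsToFile:
    """Hypothetical target that keeps only RECORD messages, one JSON object per line."""

    def __init__(self, filename: str):
        self.f = open(filename, "a")
        self.schemas: Dict[str, Any] = {}

    def parse(self, row: str) -> str:
        message = json.loads(row)
        kind = message.get("type")
        if kind == "SCHEMA":
            # Remember the latest schema per stream (e.g. for validation or table creation).
            self.schemas[message["stream"]] = message["schema"]
        elif kind == "RECORD":
            self.f.write(json.dumps(message["record"]) + "\n")
            self.f.flush()
        # STATE messages are ignored in this sketch.
        return row

    def close(self) -> None:
        self.f.close()


messages = [
    '{"type": "SCHEMA", "stream": "exchange_rate", "schema": {"type": "object"}, "key_properties": ["date"]}',
    '{"type": "RECORD", "stream": "exchange_rate", "record": {"EUR": "1.0", "date": "2020-05-05T00:00:00Z"}}',
    '{"type": "STATE", "value": {"start_date": "2020-05-06"}}',
]
path = os.path.join(tempfile.mkdtemp(), "records.txt")
target = RecordsToFile(path)
for m in messages:
    target.parse(m)
target.close()
with open(path) as f:
    print(f.read())
```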
When talking about transformation, we usually want to:
- Add new values to a row.
- Edit existing values in a row.
- Filter rows based on certain conditions.
dynamic-singer supports realtime transformation with a plain Python function. For example, to add a new value to every row,
import dynamic_singer as dsinger

count = 0

def transformation(row):
    # add a new value 'extra' to every row
    global count
    row['extra'] = count
    count += 1
    return row

# Example is the class from the Python-object Tap section above
example = Example(20)
source = dsinger.Source(example, tap_name='example-transformation', tap_key='timestamp')
source.add('target-gsheet --config gsheet-config.json')
source.start(transformation=transformation)
Even though we added a new value to the row, dynamic-singer will auto-generate a new schema.
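This README does not say how the new schema is generated. One plausible approach, shown here only as a sketch, is to map the Python type of each value to a JSON Schema type; `infer_schema` and the type table are assumptions, not dynamic-singer's implementation.

```python
from typing import Any, Dict

# Assumed mapping from Python types to JSON Schema types; bool comes before int
# because bool is a subclass of int.
JSON_TYPES = [(bool, "boolean"), (int, "integer"), (float, "number"),
              (str, "string"), (dict, "object"), (list, "array")]


def infer_schema(row: Dict[str, Any]) -> Dict[str, Any]:
    """Build a flat JSON Schema object describing one row."""
    properties = {}
    for key, value in row.items():
        for py_type, json_type in JSON_TYPES:
            if isinstance(value, py_type):
                properties[key] = {"type": json_type}
                break
        else:
            properties[key] = {}  # unknown type (e.g. None): accept anything
    return {"type": "object", "properties": properties}


row = {"data": 1, "timestamp": "2020-05-05 00:00:00"}
transformed = dict(row, extra=0)  # what the transformation above produces
print(sorted(infer_schema(transformed)["properties"]))
```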
For the full example, check example/iterator-transformation-gsheet.ipynb.
Filtering uses the same hook; here only rows with data greater than 5 are passed on to the target,
import dynamic_singer as dsinger

def transformation(row):
    # keep rows with data > 5; for other rows nothing is returned, so they are dropped
    if row['data'] > 5:
        return row
    return None

example = Example(20)
source = dsinger.Source(example, tap_name='example-transformation', tap_key='timestamp')
source.add('target-gsheet --config gsheet-config.json')
source.start(transformation=transformation)
For the full example, check example/iterator-filter-gsheet.ipynb.
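The three kinds of transformation listed earlier (add, edit, filter) can live in one function, and it helps to test it locally before passing it to `source.start`. A sketch, assuming rows shaped like the Example tap's output; `apply_transformation` is a hypothetical harness that mimics dropping rows for which the function returns None.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional

Row = Dict[str, Any]


def transformation(row: Row) -> Optional[Row]:
    value = row["data"]
    if value <= 5:
        return None                      # filter: drop the row
    new_row = dict(row)                  # copy so the caller's dict is untouched
    new_row["data"] = value * 10         # edit an existing value
    new_row["is_even"] = value % 2 == 0  # add a new value
    return new_row


def apply_transformation(rows: Iterable[Row], fn: Callable[[Row], Optional[Row]]) -> List[Row]:
    """Hypothetical local harness: keep only rows for which fn returns a dict."""
    return [out for out in map(fn, rows) if out is not None]


rows = [{"data": i} for i in range(1, 9)]
print(apply_transformation(rows, transformation))
```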
A Python object used as a Target must follow this rule:
- It must have a `parse` method.
If not, it will throw an error,
ValueError: target must a string or an object with method `parse`
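A sketch of this rule, plus a minimal in-memory target that is handy for testing a pipeline without Google Sheets. `check_target` and `ListTarget` are hypothetical names; treating strings as target commands follows the `source.add('target-gsheet ...')` examples above.

```python
from typing import Any, List


def check_target(target: Any) -> None:
    """Hypothetical check mirroring the rule above; strings are target commands."""
    if isinstance(target, str):
        return
    if not callable(getattr(target, "parse", None)):
        raise ValueError("target must a string or an object with method `parse`")


class ListTarget:
    """Minimal in-memory target that just collects every row it receives."""

    def __init__(self) -> None:
        self.rows: List[Any] = []

    def parse(self, row: Any) -> Any:
        self.rows.append(row)
        return row


check_target(ListTarget())
check_target("target-gsheet --config gsheet-config.json")
```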
Tap from fixerio and target to gsheet.
Tap from fixerio and target to multiple gsheets.
Use a Python object as a Tap and target to gsheet.
Tap from fixerio and save to a file using a Python object as a Target.
Tap from fixerio and save to gsheet, save to a file using a Python object as a Target, and save to bigquery.
Use a Python object as a Tap, transform in realtime and target to gsheet.
Use a Python object as a Tap, filter in realtime and target to gsheet.
from typing import Callable, Dict


class Source:
    def __init__(
        self,
        tap,
        tap_schema: Dict = None,
        tap_name: str = None,
        tap_key: str = None,
        port: int = 8000,
    ):
        """
        Parameters
        ----------
        tap: str / object
            tap source.
        tap_schema: Dict, (default=None)
            data schema if tap is an object. If `tap_schema` is None, the schema is auto-generated.
        tap_name: str, (default=None)
            name for the tap, required if tap is an object; an error is raised if it is not a string in that case.
        tap_key: str, (default=None)
            important non-duplicate key from `tap.emit()`, usually a timestamp.
        port: int, (default=8000)
            prometheus exporter port.
        """

    def add(self, target):
        """
        Parameters
        ----------
        target: str / object
            target source.
        """

    def get_targets(self):
        """
        Returns
        ----------
        result: list of targets
        """

    def delete_target(self, index: int):
        """
        Parameters
        ----------
        index: int
            target index from `get_targets()`.
        """

    def start(
        self,
        transformation: Callable = None,
        asynchronous: bool = False,
        debug: bool = True,
    ):
        """
        Parameters
        ----------
        transformation: Callable, (default=None)
            a callable to transform tap data; a new data schema is auto-generated.
        debug: bool, (default=True)
            If True, print every row emitted and parsed.
        asynchronous: bool, (default=False)
            If True, emit to targets asynchronously; otherwise, loop from the first target to the last.
        """

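The `asynchronous` flag of `start` contrasts two ways of delivering a row: one target after another, or all targets concurrently. A sketch of that difference using a thread pool; `dispatch` and `Tag` are hypothetical, and modelling "async" with threads is an assumption, since dynamic-singer may implement it differently. With concurrent delivery, the time per row is roughly that of the slowest target rather than the sum over all targets.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List


def dispatch(row: Any, targets: List[Any], asynchronous: bool = False) -> List[Any]:
    """Send one row to every target's parse(); results come back in target order."""
    if not asynchronous:
        # Sequential: from the first target to the last, one at a time.
        return [target.parse(row) for target in targets]
    # Assumption: "async" modelled with a thread pool; pool.map keeps target order.
    with ThreadPoolExecutor(max_workers=max(len(targets), 1)) as pool:
        return list(pool.map(lambda target: target.parse(row), targets))


class Tag:
    """Toy target that labels each row with its own name."""

    def __init__(self, name: str) -> None:
        self.name = name

    def parse(self, row: Any) -> str:
        return f"{self.name}:{row}"


targets = [Tag("a"), Tag("b")]
print(dispatch(1, targets), dispatch(1, targets, asynchronous=True))
```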




