Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,25 @@ The tap requires a [Slack API token](https://github.com/slackapi/python-slackcli
- `channels:history`
- `users:read`

Create a config file containing the API token and a start date, e.g.:

```json
{
"token":"xxxx",
"start_date":"2020-05-01T00:00:00"
}
```

Optionally, you can also specify whether you want to sync private channels or not by adding the following to the config:

```json
"private_channels":false
```

By default, private channels will be synced, and therefore you will also need these additional scopes:
- `groups:read`
- `groups:history`

## Usage

It is recommended to follow Singer [best practices](https://github.com/singer-io/getting-started/blob/master/docs/RUNNING_AND_DEVELOPING.md#running-and-developing-singer-taps-and-targets) when running taps either [on their own](https://github.com/singer-io/getting-started/blob/master/docs/RUNNING_AND_DEVELOPING.md#running-a-singer-tap) or [with a Singer target](https://github.com/singer-io/getting-started/blob/master/docs/RUNNING_AND_DEVELOPING.md#running-a-singer-tap-with-a-singer-target).
Expand Down
79 changes: 42 additions & 37 deletions tap_slack/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ def write_schema(self):
def write_state(self):
return singer.write_state(self.state)

def channels(self):
types = "public_channel"
enable_private_channels = self.config.get("private_channels", True)
if enable_private_channels:
types = "public_channel,private_channel"

for page in self.webclient.conversations_list(limit=100, exclude_archived='false', types=types):
channels = page.get('channels')
for channel in channels:
yield channel


class ConversationsStream(SlackStream):
name = 'conversations'
Expand All @@ -45,13 +56,11 @@ def sync(self, mdata):

with singer.metrics.job_timer(job_type='list_conversations') as timer:
with singer.metrics.record_counter(endpoint=self.name) as counter:
for page in self.webclient.conversations_list(limit=100, exclude_archived='false', types="public_channel,private_channel"):
channels = page.get('channels')
for channel in channels:
with singer.Transformer(integer_datetime_fmt="unix-seconds-integer-datetime-parsing") as transformer:
transformed_record = transformer.transform(data=channel, schema=schema, metadata=metadata.to_map(mdata))
singer.write_record(stream_name=self.name, time_extracted=singer.utils.now(), record=transformed_record)
counter.increment()
for channel in self.channels():
with singer.Transformer(integer_datetime_fmt="unix-seconds-integer-datetime-parsing") as transformer:
transformed_record = transformer.transform(data=channel, schema=schema, metadata=metadata.to_map(mdata))
singer.write_record(stream_name=self.name, time_extracted=singer.utils.now(), record=transformed_record)
counter.increment()


class ConversationMembersStream(SlackStream):
Expand All @@ -67,20 +76,18 @@ def sync(self, mdata):

with singer.metrics.job_timer(job_type='list_conversation_members') as timer:
with singer.metrics.record_counter(endpoint=self.name) as counter:
for page in self.webclient.conversations_list(limit=100, exclude_archived='false', types="public_channel,private_channel"):
channels = page.get('channels')
for channel in channels:
channel_id = channel.get('id')
for page in self.webclient.conversations_members(channel=channel_id):
members = page.get('members')
for member in members:
data = {}
data['channel_id'] = channel_id
data['user_id'] = member
with singer.Transformer() as transformer:
transformed_record = transformer.transform(data=data, schema=schema, metadata=metadata.to_map(mdata))
singer.write_record(stream_name=self.name, time_extracted=singer.utils.now(), record=transformed_record)
counter.increment()
for channel in self.channels():
channel_id = channel.get('id')
for page in self.webclient.conversations_members(channel=channel_id):
members = page.get('members')
for member in members:
data = {}
data['channel_id'] = channel_id
data['user_id'] = member
with singer.Transformer() as transformer:
transformed_record = transformer.transform(data=data, schema=schema, metadata=metadata.to_map(mdata))
singer.write_record(stream_name=self.name, time_extracted=singer.utils.now(), record=transformed_record)
counter.increment()


class ConversationHistoryStream(SlackStream):
Expand All @@ -96,22 +103,20 @@ def sync(self, mdata):

with singer.metrics.job_timer(job_type='list_conversation_history') as timer:
with singer.metrics.record_counter(endpoint=self.name) as counter:
for page in self.webclient.conversations_list(limit=100, exclude_archived='false', types="public_channel,private_channel"):
channels = page.get('channels')
for channel in channels:
channel_id = channel.get('id')
for page in self.webclient.conversations_history(channel=channel_id):
messages = page.get('messages')
for message in messages:
data = {}
data['channel_id'] = channel_id
data = {**data, **message}
with singer.Transformer(integer_datetime_fmt="unix-seconds-integer-datetime-parsing") as transformer:
transformed_record = transformer.transform(data=data, schema=schema, metadata=metadata.to_map(mdata))
singer.write_record(stream_name=self.name, time_extracted=singer.utils.now(), record=transformed_record)
counter.increment()
#TODO: handle rate limiting better than this.
time.sleep(1)
for channel in self.channels():
channel_id = channel.get('id')
for page in self.webclient.conversations_history(channel=channel_id):
messages = page.get('messages')
for message in messages:
data = {}
data['channel_id'] = channel_id
data = {**data, **message}
with singer.Transformer(integer_datetime_fmt="unix-seconds-integer-datetime-parsing") as transformer:
transformed_record = transformer.transform(data=data, schema=schema, metadata=metadata.to_map(mdata))
singer.write_record(stream_name=self.name, time_extracted=singer.utils.now(), record=transformed_record)
counter.increment()
#TODO: handle rate limiting better than this.
time.sleep(1)


class UsersStream(SlackStream):
Expand Down