Commit dca9949

Merge pull request #1 from GeneralYadoc/develop
Modify for initial release.
2 parents 51fc026 + 7d55d1c commit dca9949

5 files changed

Lines changed: 276 additions & 42 deletions

README.md

Lines changed: 149 additions & 1 deletion
@@ -1 +1,149 @@
# StreamChatAgent
A YouTube chat poller that delivers messages smoothly by buffering them in an internal queue.

## The user of this library can
- receive YouTube chat messages continuously by registering a callback.
- obtain high performance out of the box thanks to the internal queue.

## How to install
- Clone this repository.<br>
```clone
$ git clone https://github.com/GeneralYadoc/StreamChatAgent.git
```
- Change directory to the root of the repository.<br>
```cd
$ cd StreamChatAgent
```
- Install the package into your environment.<br>
```install
$ pip install .
```

## How to use

- Sample code
``` sample.py
import sys
import re
import StreamChatAgent as sca  # Import this.

# Callback for getting YouTube chat items.
# You can implement several processes in it.
# This example prints the datetime, author name, and message of each item.
def get_item_cb(c):
    print(f"{c.datetime} [{c.author.name}]- {c.message}")

# Filter applied before putting items into the internal queue.
# You can edit YouTube chat items before they are put into the internal queue.
# You can keep an item out of the internal queue by returning None.
# This example removes items whose message consists of stamps only.
def pre_filter_cb(c):
    return None if re.match(r'^(:[^:]+:)+$', c.message) else c

# Filter applied after getting items from the internal queue.
# You can edit YouTube chat items after they are popped from the internal queue.
# You can avoid sending an item to get_item_cb by returning None.
# This example removes stamps from the message of each item.
def post_filter_cb(c):
    c.message = re.sub(r':[^:]+:', '', c.message)
    return c

# The video ID is given on the command line in this example.
if len(sys.argv) <= 1:
    exit(0)

# Create a StreamChatAgent instance.
agent = sca.StreamChatAgent( video_id=sys.argv[1],
                             get_item_cb=get_item_cb,
                             pre_filter_cb=pre_filter_cb,
                             post_filter_cb=post_filter_cb )

# Start getting YouTube chat items asynchronously.
# From here on, get_item_cb is called continuously.
agent.start()

# Wait for Enter to be pressed on the keyboard.
input()

# Finish getting items.
# The internal threads will stop soon.
agent.disconnect()

# Wait for the internal threads to terminate.
agent.join()

del agent
```

- Output of the sample
```output
% ./test.py MB57rMXXXXs
2023-05-19 05:21:26 [John]- Hello!
2023-05-19 05:21:27 [Kelly]- Hello everyone!
2023-05-19 05:21:27 [Taro]- Welcome to our stream.
```
## Arguments of Constructor
- A StreamChatAgent object can be configured with the following constructor arguments.

| name | description | default |
|------|------------|---------|
| video_id | String following 'v=' in the URL of the target YouTube live stream | - |
| get_item_cb | Callback that receives chat items | - |
| pre_filter_cb | Filter applied before the internal queue | None |
| post_filter_cb | Filter applied between the internal queue and get_item_cb | None |
| max_queue_size | Maximum number of slots in the internal queue (0 means no limit) | 1000 |
| interval_sec | Polling interval for picking up items from YouTube | 0.01 \[sec\] |

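
The meaning of `max_queue_size` follows Python's `queue.Queue`, which `src/StreamChatAgent.py` in this commit constructs as `queue.Queue(max_queue_size)`. The following is a minimal sketch using only the standard library; the helper name `fill` is hypothetical and is not part of StreamChatAgent.

```python
import queue

def fill(maxsize, count):
    """Put up to `count` integers into Queue(maxsize); return (size, is_full)."""
    q = queue.Queue(maxsize)
    for i in range(count):
        if q.full():
            break  # a bounded queue has no free slot left
        q.put_nowait(i)
    return q.qsize(), q.full()

# maxsize=0 means "no limit": the queue never reports itself as full.
unbounded = fill(0, 5000)

# maxsize=1000 is the default bound listed in the table above.
bounded = fill(1000, 5000)

print(unbounded, bounded)
```

With the default of 1000, a slow `get_item_cb` can fall at most 1000 items behind before older items start being replaced.
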
## Methods
### start()
- Start polling and calling user callbacks asynchronously.
- Takes no arguments and returns nothing.

### join()
- Wait for the internal threads started by start() to terminate.
- Takes no arguments and returns nothing.

### connect()
- Start polling and calling user callbacks synchronously.
- Lines following the call to this method are not executed until the internal threads terminate.
- Takes no arguments and returns nothing.

### disconnect()
- Request termination of polling and of calls to user callbacks.
- The internal threads terminate soon afterwards.
- Takes no arguments and returns nothing.

Other public methods of [threading.Thread](https://docs.python.org/3/library/threading.html) are also available.

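
The difference between the asynchronous `start()`/`join()` pair and the blocking `connect()` can be sketched with a small `threading.Thread` subclass. `DemoAgent` below is a hypothetical stand-in that only mimics the shape of the methods listed above; it does not poll YouTube and is not the library's implementation.

```python
import threading

class DemoAgent(threading.Thread):
    """Hypothetical stand-in with the same method names as StreamChatAgent."""

    def __init__(self):
        super().__init__(daemon=True)
        self._stop_requested = threading.Event()
        self.ticks = 0

    def run(self):
        # Executed in the background thread once start() is called.
        while True:
            self.ticks += 1
            # Wait up to 10 ms; returns True as soon as a stop is requested.
            if self._stop_requested.wait(0.01):
                break

    def connect(self):
        # Blocking variant: start the thread, then wait until it finishes.
        self.start()
        self.join()

    def disconnect(self):
        # Only requests termination; the thread exits shortly afterwards.
        self._stop_requested.set()

agent = DemoAgent()
agent.start()        # returns immediately, run() continues in the background
agent.disconnect()   # ask the background loop to stop
agent.join()         # returns once run() has exited
stopped = not agent.is_alive()
print(stopped, agent.ticks)
```

Calling `connect()` instead of `start()` would block the caller until some other thread (or a callback) calls `disconnect()`.
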
## Callbacks
### get_item_cb
- Callback for getting YouTube chat items.
- You can implement several processes in it.
- A YouTube chat item is passed as the argument.
- No return value is expected.
### pre_filter_cb
- Filter applied before items are put into the internal queue.
- A YouTube chat item is passed as the argument.
- You can edit YouTube chat items before they are put into the internal queue.
- The edited chat item must be returned.
- You can keep an item out of the internal queue by returning None.
### post_filter_cb
- Filter applied after items are popped from the internal queue.
- You can edit YouTube chat items after they are popped from the internal queue.
- The edited chat item must be returned.
- You can avoid sending an item to get_item_cb by returning None.

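
The order in which the three callbacks see an item can be sketched without a network connection. In the sketch below, `deliver` is a hypothetical single-threaded stand-in for the library's putting and getting threads, and the chat items are plain `SimpleNamespace` objects with only a `message` attribute (real pytchat items carry more fields). The two filters reuse the regular expressions from the sample code.

```python
import queue
import re
from types import SimpleNamespace

def pre_filter_cb(c):
    # Drop items whose message consists of stamps only.
    return None if re.match(r'^(:[^:]+:)+$', c.message) else c

def post_filter_cb(c):
    # Strip stamps from the message before delivery.
    c.message = re.sub(r':[^:]+:', '', c.message)
    return c

def deliver(items, pre_filter_cb, post_filter_cb, get_item_cb):
    """Hypothetical stand-in: filter, queue, filter again, then deliver."""
    q = queue.Queue()
    for c in items:                       # "putting" side
        c = pre_filter_cb(c) if pre_filter_cb else c
        if c:                             # None means "do not enqueue"
            q.put(c)
    while not q.empty():                  # "getting" side
        c = q.get()
        c = post_filter_cb(c) if post_filter_cb else c
        if c:                             # None means "do not deliver"
            get_item_cb(c)

received = []
chat = [SimpleNamespace(message=m)
        for m in ("Hello!", ":smile::heart:", "Nice :clap:")]
deliver(chat, pre_filter_cb, post_filter_cb,
        lambda c: received.append(c.message))
print(received)
```

The stamp-only message never reaches the queue, and the remaining stamp is removed just before `get_item_cb` is called.
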
## Type of YouTube Chat item
- Please refer to the [pytchat README](https://github.com/taizan-hokuto/pytchat).

## Concept of design
- The putting thread is separated from the getting thread so that polling is not blocked.<br>
Unexpected sleeps in pytchat may be reduced by this approach.
- If the internal queue is full when the putting thread tries to push a new item, the oldest item is removed from the queue before pushing.
![](ReadMeParts/concept.png)

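
The "remove the oldest item when full" rule can be sketched with the standard `queue.Queue`. The helper name `put_drop_oldest` is hypothetical, and the sketch runs in a single thread with the non-blocking `get_nowait()`/`put_nowait()` calls, whereas the code in this commit uses `full()`, `get()`, and `put()` from its putting thread.

```python
import queue

def put_drop_oldest(q, item):
    """Push `item`; if the queue is full, discard the oldest entry first."""
    if q.full():
        q.get_nowait()   # the oldest entry sits at the head of the FIFO queue
    q.put_nowait(item)

q = queue.Queue(maxsize=3)
for n in range(5):
    put_drop_oldest(q, n)

# Drain the queue to see which items survived.
remaining = [q.get_nowait() for _ in range(q.qsize())]
print(remaining)
```

Only the three newest items remain, so a slow consumer sees recent chat rather than stalling the poller.
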
## Links
StreamChatAgent uses the following libraries internally.

- [pytchat](https://github.com/taizan-hokuto/pytchat) &emsp; Python library for fetching YouTube live chat.

ReadMeParts/concept.png

21.6 KB

samples/sample.py

Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
import sys
import re
import StreamChatAgent as sca  # Import this.

# Callback for getting YouTube chat items.
# You can implement several processes in it.
# This example prints the datetime, author name, and message of each item.
def get_item_cb(c):
    print(f"{c.datetime} [{c.author.name}]- {c.message}")

# Filter applied before putting items into the internal queue.
# You can edit YouTube chat items before they are put into the internal queue.
# You can keep an item out of the internal queue by returning None.
# This example removes items whose message consists of stamps only.
def pre_filter_cb(c):
    return None if re.match(r'^(:[^:]+:)+$', c.message) else c

# Filter applied after getting items from the internal queue.
# You can edit YouTube chat items after they are popped from the internal queue.
# You can avoid sending an item to get_item_cb by returning None.
# This example removes stamps from the message of each item.
def post_filter_cb(c):
    c.message = re.sub(r':[^:]+:', '', c.message)
    return c

# The video ID is given on the command line in this example.
if len(sys.argv) <= 1:
    exit(0)

# Create a StreamChatAgent instance.
agent = sca.StreamChatAgent( video_id=sys.argv[1],
                             get_item_cb=get_item_cb,
                             pre_filter_cb=pre_filter_cb,
                             post_filter_cb=post_filter_cb )

# Start getting YouTube chat items asynchronously.
# From here on, get_item_cb is called continuously.
agent.start()

# Wait for Enter to be pressed on the keyboard.
input()

# Finish getting items.
# The internal threads will stop soon.
agent.disconnect()

# Wait for the internal threads to terminate.
agent.join()

del agent

src/StreamChatAgent.py

Lines changed: 76 additions & 41 deletions
@@ -1,72 +1,107 @@
 import time
 import threading
 import queue
+import math
 import pytchat

-class StreamChatAgent:
+class StreamChatAgent(threading.Thread):
     def __init__( self,
                   video_id,
                   get_item_cb,
                   pre_filter_cb=None,
                   post_filter_cb=None,
-                  max_queue_size=0,
-                  interval_sec=0.001 ):
-        self.get_item_cb = get_item_cb
-        self.pre_filter_cb = pre_filter_cb
-        self.post_filter_cb = post_filter_cb
-        self.item_queue = queue.Queue(max_queue_size)
-        self.interval_sec = interval_sec
-        self.alive = False
+                  max_queue_size=1000,
+                  interval_sec=0.01 ):
+        self.__get_item_cb = get_item_cb
+        self.__pre_filter_cb = pre_filter_cb
+        self.__post_filter_cb = post_filter_cb
+        self.__item_queue = queue.Queue(max_queue_size)
+        self.__interval_sec = interval_sec
+        self.__keeping_connection = False

-        self.chat = pytchat.create(video_id=video_id)
+        self.__chat = pytchat.create(video_id=video_id)

-        self.my_put_thread = threading.Thread(target=self.put_items)
-        self.my_get_thread = threading.Thread(target=self.get_items)
+        self.__my_put_thread = threading.Thread(target=self.__put_items)
+        self.__my_get_thread = threading.Thread(target=self.__get_items)
+
+        super(StreamChatAgent, self).__init__(daemon=True)

     def connect(self):
-        self.alive = True
-        self.my_put_thread.start()
-        self.my_get_thread.start()
-        self.my_get_thread.join()
-        self.my_put_thread.join()
+        self.start()
+        self.join()
+
+    def run(self):
+        self.__keeping_connection = True
+        self.__my_put_thread.start()
+        self.__my_get_thread.start()
+        self.__my_get_thread.join()
+        self.__my_put_thread.join()

     def disconnect(self):
-        self.alive = False
+        self.__keeping_connection = False

-    def is_chat_alive(self, immediate=False, retry_count=5, sleep=1.0):
+    def __is_alive(self, immediate=True, wait_sec=0):
+        if not self.__my_get_thread.is_alive() and not self.__my_put_thread.is_alive():
+            return False
+
+        retry_count = math.floor(wait_sec / 0.01)
         if immediate:
             retry_count=1
+
+        steps = 0 if retry_count == 0 else 1
         i = 0
         while True:
-            if not self.alive:
+            if not self.__keeping_connection:
                 return False
-            if self.chat.is_alive():
+            if self.__chat.is_alive():
                 return True
-            i += 1
-            if retry_count > 0 and i >= retry_count:
+            i += steps
+            if retry_count != 0 and i >= retry_count:
                 return False
-            time.sleep(sleep)
+            time.sleep(0.01)

-    def put_items(self):
-        while self.alive and self.is_chat_alive():
-            for c in self.chat.get().sync_items():
-                if not self.alive:
+    def __put_items(self):
+        start_time = time.time()
+        while self.__is_alive(immediate=False):
+            for c in self.__chat.get().sync_items():
+                if not self.__keeping_connection:
                     break
                 prefiltered_c = c
-                if self.pre_filter_cb:
-                    prefiltered_c = self.pre_filter_cb(c)
+                if self.__pre_filter_cb:
+                    prefiltered_c = self.__pre_filter_cb(c)
                 if prefiltered_c:
-                    if self.item_queue.full():
-                        self.item_queue.get()
-                    self.item_queue.put(prefiltered_c)
+                    if self.__item_queue.full():
+                        self.__item_queue.get()
+                    self.__item_queue.put(prefiltered_c)
+            self.__sleep_from(start_time)
+            start_time = time.time()
+        self.__keeping_connection = False
+

-    def get_items(self):
-        while self.alive and self.is_chat_alive():
-            while self.alive and not self.item_queue.empty():
-                c = self.item_queue.get()
+    def __get_items(self):
+        start_time = time.time()
+        while self.__is_alive(immediate=False):
+            while self.__keeping_connection and not self.__item_queue.empty():
+                c = self.__item_queue.get()
                 postfiltered_c = c
-                if self.post_filter_cb:
-                    postfiltered_c = self.post_filter_cb(c)
+                if self.__post_filter_cb:
+                    postfiltered_c = self.__post_filter_cb(c)
                 if postfiltered_c:
-                    self.get_item_cb(postfiltered_c)
-            time.sleep(self.interval_sec)
+                    self.__get_item_cb(postfiltered_c)
+            self.__sleep_from(start_time, 0.01)
+            start_time = time.time()
+        self.__keeping_connection = False
+
+    def __sleep_from(self, start_time, interval_sec=None):
+        if not interval_sec:
+            interval_sec = self.__interval_sec
+        cur_time = time.time()
+        sleep = interval_sec - (cur_time - start_time)
+        if sleep > 0.:
+            sleep_counter = math.floor(sleep * 10)
+            sleep_frac = sleep - (sleep_counter / 10.)
+            for i in range(sleep_counter):
+                if not self.__keeping_connection:
+                    break
+                time.sleep(0.1)
+            time.sleep(sleep_frac)
File renamed without changes.
