Skip to content

Commit cdd180c

Browse files
authored
feat: add batch updates support (#216)
* feat: add batch updates support * fix: lint fix * fix: fix mypy * fix: fix imports * fix: updated imports * fix: lint fix
1 parent e333b58 commit cdd180c

File tree

8 files changed

+1272
-1
lines changed

8 files changed

+1272
-1
lines changed
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
from typing import TYPE_CHECKING, List
2+
3+
from stream_chat.types.channel_batch import (
4+
ChannelBatchMemberRequest,
5+
ChannelDataUpdate,
6+
ChannelsBatchFilters,
7+
ChannelsBatchOptions,
8+
)
9+
from stream_chat.types.stream_response import StreamResponse
10+
11+
if TYPE_CHECKING:
12+
from stream_chat.async_chat.client import StreamChatAsync
13+
14+
15+
class ChannelBatchUpdater:
16+
"""
17+
Provides convenience methods for batch channel operations (async).
18+
"""
19+
20+
def __init__(self, client: "StreamChatAsync") -> None:
21+
self.client = client
22+
23+
async def add_members(
24+
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
25+
) -> StreamResponse:
26+
"""
27+
Adds members to channels matching the filter.
28+
29+
:param filter: The filter to match channels.
30+
:param members: List of members to add.
31+
:return: StreamResponse containing task_id.
32+
"""
33+
options: ChannelsBatchOptions = {
34+
"operation": "addMembers",
35+
"filter": filter,
36+
"members": members,
37+
}
38+
return await self.client.update_channels_batch(options)
39+
40+
async def remove_members(
41+
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
42+
) -> StreamResponse:
43+
"""
44+
Removes members from channels matching the filter.
45+
46+
:param filter: The filter to match channels.
47+
:param members: List of members to remove.
48+
:return: StreamResponse containing task_id.
49+
"""
50+
options: ChannelsBatchOptions = {
51+
"operation": "removeMembers",
52+
"filter": filter,
53+
"members": members,
54+
}
55+
return await self.client.update_channels_batch(options)
56+
57+
async def invite_members(
58+
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
59+
) -> StreamResponse:
60+
"""
61+
Invites members to channels matching the filter.
62+
63+
:param filter: The filter to match channels.
64+
:param members: List of members to invite.
65+
:return: StreamResponse containing task_id.
66+
"""
67+
options: ChannelsBatchOptions = {
68+
"operation": "inviteMembers",
69+
"filter": filter,
70+
"members": members,
71+
}
72+
return await self.client.update_channels_batch(options)
73+
74+
async def add_moderators(
75+
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
76+
) -> StreamResponse:
77+
"""
78+
Adds moderators to channels matching the filter.
79+
80+
:param filter: The filter to match channels.
81+
:param members: List of members to add as moderators.
82+
:return: StreamResponse containing task_id.
83+
"""
84+
options: ChannelsBatchOptions = {
85+
"operation": "addModerators",
86+
"filter": filter,
87+
"members": members,
88+
}
89+
return await self.client.update_channels_batch(options)
90+
91+
async def demote_moderators(
92+
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
93+
) -> StreamResponse:
94+
"""
95+
Removes moderator role from members in channels matching the filter.
96+
97+
:param filter: The filter to match channels.
98+
:param members: List of members to demote from moderators.
99+
:return: StreamResponse containing task_id.
100+
"""
101+
options: ChannelsBatchOptions = {
102+
"operation": "demoteModerators",
103+
"filter": filter,
104+
"members": members,
105+
}
106+
return await self.client.update_channels_batch(options)
107+
108+
async def assign_roles(
109+
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
110+
) -> StreamResponse:
111+
"""
112+
Assigns roles to members in channels matching the filter.
113+
114+
:param filter: The filter to match channels.
115+
:param members: List of members with roles to assign.
116+
:return: StreamResponse containing task_id.
117+
"""
118+
options: ChannelsBatchOptions = {
119+
"operation": "assignRoles",
120+
"filter": filter,
121+
"members": members,
122+
}
123+
return await self.client.update_channels_batch(options)
124+
125+
async def hide(
126+
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
127+
) -> StreamResponse:
128+
"""
129+
Hides channels matching the filter for the specified members.
130+
131+
:param filter: The filter to match channels.
132+
:param members: List of members for whom to hide channels.
133+
:return: StreamResponse containing task_id.
134+
"""
135+
options: ChannelsBatchOptions = {
136+
"operation": "hide",
137+
"filter": filter,
138+
"members": members,
139+
}
140+
return await self.client.update_channels_batch(options)
141+
142+
async def show(
143+
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
144+
) -> StreamResponse:
145+
"""
146+
Shows channels matching the filter for the specified members.
147+
148+
:param filter: The filter to match channels.
149+
:param members: List of members for whom to show channels.
150+
:return: StreamResponse containing task_id.
151+
"""
152+
options: ChannelsBatchOptions = {
153+
"operation": "show",
154+
"filter": filter,
155+
"members": members,
156+
}
157+
return await self.client.update_channels_batch(options)
158+
159+
async def archive(
160+
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
161+
) -> StreamResponse:
162+
"""
163+
Archives channels matching the filter for the specified members.
164+
165+
:param filter: The filter to match channels.
166+
:param members: List of members for whom to archive channels.
167+
:return: StreamResponse containing task_id.
168+
"""
169+
options: ChannelsBatchOptions = {
170+
"operation": "archive",
171+
"filter": filter,
172+
"members": members,
173+
}
174+
return await self.client.update_channels_batch(options)
175+
176+
async def unarchive(
177+
self, filter: ChannelsBatchFilters, members: List[ChannelBatchMemberRequest]
178+
) -> StreamResponse:
179+
"""
180+
Unarchives channels matching the filter for the specified members.
181+
182+
:param filter: The filter to match channels.
183+
:param members: List of members for whom to unarchive channels.
184+
:return: StreamResponse containing task_id.
185+
"""
186+
options: ChannelsBatchOptions = {
187+
"operation": "unarchive",
188+
"filter": filter,
189+
"members": members,
190+
}
191+
return await self.client.update_channels_batch(options)
192+
193+
async def update_data(
194+
self, filter: ChannelsBatchFilters, data: ChannelDataUpdate
195+
) -> StreamResponse:
196+
"""
197+
Updates data on channels matching the filter.
198+
199+
:param filter: The filter to match channels.
200+
:param data: Channel data to update.
201+
:return: StreamResponse containing task_id.
202+
"""
203+
options: ChannelsBatchOptions = {
204+
"operation": "updateData",
205+
"filter": filter,
206+
"data": data,
207+
}
208+
return await self.client.update_channels_batch(options)
209+
210+
async def add_filter_tags(
211+
self, filter: ChannelsBatchFilters, tags: List[str]
212+
) -> StreamResponse:
213+
"""
214+
Adds filter tags to channels matching the filter.
215+
216+
:param filter: The filter to match channels.
217+
:param tags: List of filter tags to add.
218+
:return: StreamResponse containing task_id.
219+
"""
220+
options: ChannelsBatchOptions = {
221+
"operation": "addFilterTags",
222+
"filter": filter,
223+
"filter_tags_update": tags,
224+
}
225+
return await self.client.update_channels_batch(options)
226+
227+
async def remove_filter_tags(
228+
self, filter: ChannelsBatchFilters, tags: List[str]
229+
) -> StreamResponse:
230+
"""
231+
Removes filter tags from channels matching the filter.
232+
233+
:param filter: The filter to match channels.
234+
:param tags: List of filter tags to remove.
235+
:return: StreamResponse containing task_id.
236+
"""
237+
options: ChannelsBatchOptions = {
238+
"operation": "removeFilterTags",
239+
"filter": filter,
240+
"filter_tags_update": tags,
241+
}
242+
return await self.client.update_channels_batch(options)

stream_chat/async_chat/client.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import warnings
55
from types import TracebackType
66
from typing import (
7+
TYPE_CHECKING,
78
Any,
89
AsyncContextManager,
910
Callable,
@@ -15,12 +16,17 @@
1516
Union,
1617
cast,
1718
)
19+
20+
if TYPE_CHECKING:
21+
from stream_chat.async_chat.channel_batch_updater import ChannelBatchUpdater
22+
1823
from urllib.parse import urlparse
1924

2025
from stream_chat.async_chat.campaign import Campaign
2126
from stream_chat.async_chat.segment import Segment
2227
from stream_chat.types.base import SortParam
2328
from stream_chat.types.campaign import CampaignData, QueryCampaignsOptions
29+
from stream_chat.types.channel_batch import ChannelsBatchOptions
2430
from stream_chat.types.draft import QueryDraftsFilter, QueryDraftsOptions
2531
from stream_chat.types.segment import (
2632
QuerySegmentsOptions,
@@ -1028,6 +1034,30 @@ async def mark_delivered_simple(
10281034

10291035
return await self.mark_delivered(data)
10301036

1037+
async def update_channels_batch(
1038+
self, options: ChannelsBatchOptions
1039+
) -> StreamResponse:
1040+
"""
1041+
Updates channels in batch based on the provided options.
1042+
1043+
:param options: ChannelsBatchOptions containing operation, filter, and operation-specific data.
1044+
:return: StreamResponse containing task_id.
1045+
"""
1046+
if options is None:
1047+
raise ValueError("options must not be None")
1048+
1049+
return await self.put("channels/batch", data=options)
1050+
1051+
def channel_batch_updater(self) -> "ChannelBatchUpdater":
1052+
"""
1053+
Returns a ChannelBatchUpdater instance for batch channel operations.
1054+
1055+
:return: ChannelBatchUpdater instance.
1056+
"""
1057+
from stream_chat.async_chat.channel_batch_updater import ChannelBatchUpdater
1058+
1059+
return ChannelBatchUpdater(self)
1060+
10311061
async def close(self) -> None:
10321062
await self.session.close()
10331063

stream_chat/base/client.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1302,6 +1302,27 @@ def get_task(
13021302
"""
13031303
pass
13041304

1305+
@abc.abstractmethod
1306+
def update_channels_batch(
1307+
self, options: Any
1308+
) -> Union[StreamResponse, Awaitable[StreamResponse]]:
1309+
"""
1310+
Updates channels in batch based on the provided options.
1311+
1312+
:param options: ChannelsBatchOptions containing operation, filter, and operation-specific data.
1313+
:return: StreamResponse containing task_id.
1314+
"""
1315+
pass
1316+
1317+
@abc.abstractmethod
1318+
def channel_batch_updater(self) -> Any:
1319+
"""
1320+
Returns a ChannelBatchUpdater instance for batch channel operations.
1321+
1322+
:return: ChannelBatchUpdater instance.
1323+
"""
1324+
pass
1325+
13051326
@abc.abstractmethod
13061327
def send_user_custom_event(
13071328
self, user_id: str, event: Dict

0 commit comments

Comments
 (0)