From d4d8f4980cccaa9b6a24edd49122a0b25ae84365 Mon Sep 17 00:00:00 2001 From: shaunak01 Date: Mon, 20 Oct 2025 21:01:07 +0000 Subject: [PATCH 1/2] Checkpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-commit checks: All checks passed ✅ --- helpers/slack_utils.py | 352 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 352 insertions(+) create mode 100644 helpers/slack_utils.py diff --git a/helpers/slack_utils.py b/helpers/slack_utils.py new file mode 100644 index 000000000..1ce2b7ee2 --- /dev/null +++ b/helpers/slack_utils.py @@ -0,0 +1,352 @@ +""" +Slack Analytics Utility Extracts channels, groups, and user activity metrics +from Slack workspace. + +Import as: + +import helpers.slack_utils as hslautil +""" + +import collections +import datetime +import logging +import os +from typing import Any, Dict, List, Optional + +import pandas as pd +import slack_sdk + +_LOG = logging.getLogger(__name__) + + +# ############################################################################# +# SlackAnalytics +# ############################################################################# + + +class SlackAnalytics: + + def __init__(self, token: Optional[str] = None): + """ + Initialize Slack client. + + :param token: Slack bot token + """ + self.token = token or os.getenv("SLACK_BOT_TOKEN") + if not self.token: + raise ValueError( + "Slack token required via parameter or SLACK_BOT_TOKEN env var" + ) + self.client = slack_sdk.WebClient(token=self.token) + self._verify_connection() + + def get_all_channels(self) -> List[Dict[str, Any]]: + """ + Get all public channels in the workspace. + + :return: channel dictionaries with metadata + """ + channels = [] + cursor = None + while True: + response = self.client.conversations_list( + types="public_channel", + exclude_archived=False, + limit=200, + cursor=cursor, + ) + for channel in response["channels"]: + channels.append( + { + "id": channel["id"], + "name": channel["name"], + "is_channel": channel["is_channel"], + "is_archived": channel["is_archived"], + "is_general": channel.get("is_general", False), + "num_members": channel.get("num_members", 0), + "created": channel["created"], + "creator": channel.get("creator", ""), + "topic": channel.get("topic", {}).get("value", ""), + "purpose": channel.get("purpose", {}).get("value", ""), + } + ) + cursor = response.get("response_metadata", {}).get("next_cursor") + if not cursor: + break + _LOG.info("Retrieved %d public channels", len(channels)) + return channels + + def get_all_groups(self) -> List[Dict[str, Any]]: + """ + Get all private channels (groups) in the workspace. + + :return: private channel dictionaries with metadata + """ + groups = [] + cursor = None + while True: + response = self.client.conversations_list( + types="private_channel", + exclude_archived=False, + limit=200, + cursor=cursor, + ) + for group in response["channels"]: + groups.append( + { + "id": group["id"], + "name": group["name"], + "is_private": group.get("is_private", True), + "is_archived": group["is_archived"], + "num_members": group.get("num_members", 0), + "created": group["created"], + "creator": group.get("creator", ""), + "topic": group.get("topic", {}).get("value", ""), + "purpose": group.get("purpose", {}).get("value", ""), + } + ) + cursor = response.get("response_metadata", {}).get("next_cursor") + if not cursor: + break + _LOG.info("Retrieved %d private channels", len(groups)) + return groups + + def get_all_users(self) -> List[Dict[str, Any]]: + """ + Get all users in the workspace. + + :return: user dictionaries with metadata + """ + users = [] + cursor = None + while True: + response = self.client.users_list(cursor=cursor, limit=200) + for user in response["members"]: + if not user["deleted"] and not user["is_bot"]: + users.append( + { + "id": user["id"], + "name": user.get("name", ""), + "real_name": user.get("real_name", ""), + "display_name": user.get("profile", {}).get( + "display_name", "" + ), + "email": user.get("profile", {}).get("email", ""), + "is_admin": user.get("is_admin", False), + "is_owner": user.get("is_owner", False), + "is_primary_owner": user.get( + "is_primary_owner", False + ), + "timezone": user.get("tz", ""), + } + ) + cursor = response.get("response_metadata", {}).get("next_cursor") + if not cursor: + break + _LOG.info("Retrieved %d active users", len(users)) + return users + + def get_channel_messages( + self, channel_id: str, days: int = 30 + ) -> List[Dict[str, Any]]: + """ + Get messages from a specific channel. + + :param channel_id: channel ID to fetch messages from + :param days: number of days to look back + :return: message dictionaries + """ + messages = [] + oldest = ( + datetime.datetime.now() - datetime.timedelta(days=days) + ).timestamp() + cursor = None + while True: + response = self.client.conversations_history( + channel=channel_id, oldest=str(oldest), limit=200, cursor=cursor + ) + messages.extend(response["messages"]) + cursor = response.get("response_metadata", {}).get("next_cursor") + if not cursor: + break + return messages + + def calculate_user_activity( + self, days: int = 30 + ) -> Dict[str, Dict[str, Any]]: + """ + Calculate activity metrics per user across all accessible channels. + + :param days: number of days to analyze + :return: dictionary with user_id as key and activity metrics as + value + """ + user_activity = collections.defaultdict( + lambda: { + "message_count": 0, + "channels_active": set(), + "reactions_given": 0, + "files_shared": 0, + "threads_started": 0, + "replies_count": 0, + } + ) + # Get all channels (public and private that bot has access to). + all_channels = self.get_all_channels() + _LOG.info( + "Analyzing activity across %d channels for last %d days", + len(all_channels), + days, + ) + accessible_channels = 0 + skipped_channels = 0 + for channel in all_channels: + channel_id = channel["id"] + channel_name = channel["name"] + try: + messages = self.get_channel_messages(channel_id, days) + except slack_sdk.errors.SlackApiError as e: + if e.response.get("error") == "not_in_channel": + _LOG.debug("Bot not in channel %s, skipping", channel_id) + skipped_channels += 1 + continue + raise + if not messages: + skipped_channels += 1 + continue + accessible_channels += 1 + _LOG.info( + "Processing channel: %s (%d messages)", + channel_name, + len(messages), + ) + for msg in messages: + user_id = msg.get("user") + if not user_id: + continue + # Count messages. + user_activity[user_id]["message_count"] += 1 + user_activity[user_id]["channels_active"].add(channel_id) + # Count reactions given. + if "reactions" in msg: + for reaction in msg["reactions"]: + if user_id in reaction.get("users", []): + user_activity[user_id]["reactions_given"] += 1 + # Count files shared. + if "files" in msg: + user_activity[user_id]["files_shared"] += len(msg["files"]) + # Count threads started. + if "thread_ts" in msg and msg.get("thread_ts") == msg.get("ts"): + user_activity[user_id]["threads_started"] += 1 + # Count replies. + if "thread_ts" in msg and msg.get("thread_ts") != msg.get("ts"): + user_activity[user_id]["replies_count"] += 1 + _LOG.info( + "Analysis complete: %d accessible channels, %d skipped (bot not member)", + accessible_channels, + skipped_channels, + ) + # Convert sets to counts. + result = {} + for user_id, metrics in user_activity.items(): + result[user_id] = { + "user_id": user_id, + "message_count": metrics["message_count"], + "channels_active_count": len(metrics["channels_active"]), + "reactions_given": metrics["reactions_given"], + "files_shared": metrics["files_shared"], + "threads_started": metrics["threads_started"], + "replies_count": metrics["replies_count"], + "total_activity_score": ( + metrics["message_count"] + + metrics["reactions_given"] * 0.5 + + metrics["files_shared"] * 2 + + metrics["threads_started"] * 1.5 + + metrics["replies_count"] + ), + } + _LOG.info("Calculated activity for %d users", len(result)) + return result + + def get_workspace_stats(self, days: int = 30) -> Dict[str, Any]: + """ + Get comprehensive workspace statistics. + + :param days: number of days to analyze for activity metrics + :return: dictionary with workspace statistics + """ + channels = self.get_all_channels() + groups = self.get_all_groups() + users = self.get_all_users() + activity = self.calculate_user_activity(days) + data = { + "timestamp": datetime.datetime.now().isoformat(), + "analysis_period_days": days, + "channels": { + "total": len(channels), + "active": len([c for c in channels if not c["is_archived"]]), + "archived": len([c for c in channels if c["is_archived"]]), + "list": channels, + }, + "groups": { + "total": len(groups), + "active": len([g for g in groups if not g["is_archived"]]), + "archived": len([g for g in groups if g["is_archived"]]), + "list": groups, + }, + "users": { + "total": len(users), + "admins": len([u for u in users if u["is_admin"]]), + "list": users, + }, + "user_activity": activity, + } + return data + + def get_user_activity_dataframe(self, days: int = 30): + """ + Get user activity as a pandas DataFrame with user names. + + :param days: number of days to analyze + :return: pandas DataFrame with user activity and names + """ + # Get users and activity. + users = self.get_all_users() + activity = self.calculate_user_activity(days) + # Create user lookup dictionary. + user_lookup = {user["id"]: user for user in users} + # Build DataFrame rows. + rows = [] + for user_id, metrics in activity.items(): + user_info = user_lookup.get(user_id, {}) + rows.append( + { + "user_id": user_id, + "name": user_info.get("name", "Unknown"), + "real_name": user_info.get("real_name", "Unknown"), + "display_name": user_info.get("display_name", ""), + "email": user_info.get("email", ""), + "is_admin": user_info.get("is_admin", False), + "message_count": metrics["message_count"], + "channels_active_count": metrics["channels_active_count"], + "reactions_given": metrics["reactions_given"], + "files_shared": metrics["files_shared"], + "threads_started": metrics["threads_started"], + "replies_count": metrics["replies_count"], + "total_activity_score": metrics["total_activity_score"], + } + ) + df = pd.DataFrame(rows) + # Sort by activity score. + df = df.sort_values("total_activity_score", ascending=False).reset_index( + drop=True + ) + _LOG.info("Created DataFrame with %d users", len(df)) + return df + + def _verify_connection(self): + """ + Verify Slack API connection. + """ + response = self.client.auth_test() + _LOG.info("Connected to Slack workspace: %s", response["team"]) From eab744d150398619cbabbae9ee9771f49f233785 Mon Sep 17 00:00:00 2001 From: shaunak01 Date: Tue, 21 Oct 2025 17:56:20 +0000 Subject: [PATCH 2/2] Checkpont MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-commit checks: All checks passed ✅ --- helpers/github_utils.py | 180 +++++++++++---------------------------- helpers/hcache_simple.py | 11 ++- 2 files changed, 62 insertions(+), 129 deletions(-) diff --git a/helpers/github_utils.py b/helpers/github_utils.py index 276ff9f8d..3115c0187 100644 --- a/helpers/github_utils.py +++ b/helpers/github_utils.py @@ -6,7 +6,6 @@ import datetime import functools -import hashlib import itertools import json import logging @@ -21,109 +20,63 @@ import tqdm as td from tqdm import tqdm +import helpers.hcache_simple as hcacsimp import helpers.hdbg as hdbg _LOG = logging.getLogger(__name__) -# ############################################################################# -# GitHubCache -# ############################################################################# - - -class GitHubCache: +def github_cached(cache_type: str = "json", write_through: bool = True): """ - Custom cache that excludes the client object from cache keys. - """ - - def __init__(self, cache_dir: str = "."): - self.cache_dir = cache_dir + Cache decorator specifically for GitHub API functions. - def get(self, func_name: str, args: tuple) -> Any: - """ - Get a value from cache. + Automatically excludes the 'client' parameter (first positional arg) + from cache keys since client instances change across sessions. - :param func_name: name of the function - :param args: function arguments - :return: cached value or None if not found - """ - cache_path = self._get_cache_path(func_name) - # Check if cache file exists. - if not os.path.exists(cache_path): - return None - # Load cache file. - with open(cache_path, "r") as f: - cache_data = json.load(f) - # Generate key and look up value. - key = self._make_key(func_name, args) - return cache_data.get(key) - - def set(self, func_name: str, args: tuple, value: Any) -> None: - """ - Set a value in cache. - - :param func_name: name of the function - :param args: function arguments - :param value: value to cache - """ - cache_path = self._get_cache_path(func_name) - # Load existing cache or create new. - if os.path.exists(cache_path): - with open(cache_path, "r") as f: - cache_data = json.load(f) - else: - cache_data = {} - # Add new entry. - key = self._make_key(func_name, args) - cache_data[key] = value - # Write back to file. - with open(cache_path, "w") as f: - json.dump(cache_data, f, indent=2) - - def _make_key( - self, func_name: str, args: tuple, skip_first: bool = True - ) -> str: - """ - Create a cache key from function name and arguments. - - :param func_name: name of the function - :param args: function arguments - :param skip_first: Skip first argument (client) when building - key - :return: cache key string - """ - # Skip the client argument when building the key. - cache_args = args[1:] if skip_first else args - # Convert arguments to a string representation. - key_parts = [func_name] - for arg in cache_args: - if isinstance(arg, (str, int, float, bool)): - key_parts.append(str(arg)) - elif isinstance(arg, datetime.datetime): - key_parts.append(arg.isoformat()) - elif isinstance(arg, tuple): - # Handle period tuples. - key_parts.append(f"{arg[0].isoformat()}_{arg[1].isoformat()}") - else: - # Hash complex objects. - key_parts.append(hashlib.md5(str(arg).encode()).hexdigest()[:8]) - # Create a hash of the key for consistent length. - full_key = "_".join(key_parts) - res = hashlib.md5(full_key.encode()).hexdigest() - return res + :param cache_type: Type of cache ('json' or 'pickle') + :param write_through: If True, write to disk after each cache update + :return: Decorated function with caching + """ - def _get_cache_path(self, func_name: str) -> str: - """ - Get the cache file path for a function. + def decorator(func: Callable) -> Callable: + # Get function name for cache + func_name = func.__name__ + if func_name.endswith("_intrinsic"): + func_name = func_name[: -len("_intrinsic")] + # SET CACHE TYPE PROPERTY (this was missing!) + existing_type = hcacsimp.get_cache_property("system", func_name, "type") + if not existing_type: + hcacsimp.set_cache_property("system", func_name, "type", cache_type) - :param func_name: name of the function - :return: path to the cache file - """ - path = os.path.join(self.cache_dir, f"cache.{func_name}.json") - return path + # Create a cached version that only uses args after client. + @functools.wraps(func) + def wrapper(client, *args, **kwargs): + # Create cache key from everything EXCEPT client. + cache_key = json.dumps( + {"args": args, "kwargs": kwargs}, + sort_keys=True, + default=str, + ) + # Get cache. + cache = hcacsimp.get_cache(func_name) + # Check if we have cached value. + if cache_key in cache: + _LOG.debug("Cache hit for %s", func_name) + return cache[cache_key] + # Cache miss - call the actual function. + _LOG.debug("Cache miss for %s, fetching from API", func_name) + result = func(client, *args, **kwargs) + # Store in cache + cache[cache_key] = result + # Write to disk if enabled. + if write_through: + hcacsimp.flush_cache_to_disk(func_name) + return result + + return wrapper + + return decorator -# Create global cache instance. -_github_cache = GitHubCache() # ############################################################################# # GitHubAPI @@ -601,36 +554,7 @@ def days_between( return days -def cached_github_function(func: Callable) -> Callable: - """ - Decorator to cache GitHub API functions, excluding client from key. - - The decorated function must have 'client' as the first parameter. - :param func: github API function to be cached - :return: wrapped function with caching - """ - - @functools.wraps(func) - def wrapper(*args, **kwargs): - # Default result is None. - result = None - # Try to get from cache first. - cached_value = _github_cache.get(func.__name__, args) - if cached_value is not None: - _LOG.debug(f"Cache hit for {func.__name__}") - result = cached_value - else: - # Call the actual function if not cached. - _LOG.debug(f"Cache miss for {func.__name__}, fetching from API") - result = func(*args, **kwargs) - # Store in cache. - _github_cache.set(func.__name__, args, result) - return result - - return wrapper - - -@cached_github_function +@github_cached(cache_type="json", write_through=True) def get_commit_datetimes_by_repo_period_intrinsic( client, org: str, @@ -692,7 +616,7 @@ def get_commit_datetimes_by_repo_period_intrinsic( return timestamps -@cached_github_function +@github_cached(cache_type="json", write_through=True) def get_pr_datetimes_by_repo_period_intrinsic( client, org: str, @@ -749,7 +673,7 @@ def get_pr_datetimes_by_repo_period_intrinsic( return timestamps -@cached_github_function +@github_cached(cache_type="json", write_through=True) def get_issue_datetimes_by_repo_intrinsic( client, org: str, @@ -814,7 +738,7 @@ def get_issue_datetimes_by_repo_intrinsic( return result_dict -@cached_github_function +@github_cached(cache_type="json", write_through=True) def get_loc_stats_by_repo_period_intrinsic( client, org: str, @@ -890,7 +814,7 @@ def get_loc_stats_by_repo_period_intrinsic( return stats_list -@cached_github_function +@github_cached(cache_type="json", write_through=True) def get_issue_comment_datetimes_by_repo_period_intrinsic( client, org: str, @@ -962,7 +886,7 @@ def get_issue_comment_datetimes_by_repo_period_intrinsic( return timestamps -@cached_github_function +@github_cached(cache_type="json", write_through=True) def get_pr_review_datetimes_by_repo_period_intrinsic( client, org: str, diff --git a/helpers/hcache_simple.py b/helpers/hcache_simple.py index c45645886..1f748311e 100644 --- a/helpers/hcache_simple.py +++ b/helpers/hcache_simple.py @@ -1,3 +1,9 @@ +""" +Import as: + +import helpers.hcache_simple as hcacsimp +""" + import functools import glob import json @@ -519,7 +525,10 @@ def decorator(func: Callable[..., Any]) -> Callable[..., Any]: func_name = getattr(func, "__name__", "unknown_function") if func_name.endswith("_intrinsic"): func_name = func_name[: -len("_intrinsic")] - set_cache_property("system", func_name, "type", cache_type) + # Only set cache type if not already set (preserve existing setting). + existing_type = get_cache_property("system", func_name, "type") + if not existing_type: + set_cache_property("system", func_name, "type", cache_type) @functools.wraps(func) def wrapper(