diff --git a/app/actors/NewsActor.java b/app/actors/NewsActor.java index b2e7bd0..b0f6292 100644 --- a/app/actors/NewsActor.java +++ b/app/actors/NewsActor.java @@ -76,6 +76,36 @@ public WatchQuery(String query, String sortBy, ActorRef watch } } + /** + * Request to stop watching a news query. + * @author Luan Tran + */ + public static final class UnwatchQuery implements Message { + public final String query; + public final String sortBy; + public final ActorRef watcher; + + public UnwatchQuery(String query, String sortBy, ActorRef watcher) { + this.query = query; + this.sortBy = sortBy; + this.watcher = watcher; + } + } + + /** + * Request to stop all watching for a specific query (from supervisor on errors). + * @author Luan Tran + */ + public static final class StopQuery implements Message { + public final String query; + public final String sortBy; + + public StopQuery(String query, String sortBy) { + this.query = query; + this.sortBy = sortBy; + } + } + /** * Internal message to trigger a fetch for a specific query. *

@@ -132,13 +162,40 @@ static final class FetchComplete implements Message { } } + + /** + * Message to update the API key used by this actor. + * @author Luan Tran + */ + public static final class UpdateApiKey implements Message { + public final String newApiKey; + + public UpdateApiKey(String newApiKey) { + this.newApiKey = newApiKey; + } + } + + /** + * Message to retry a failed query with the updated API key. + * @author Luan Tran + */ + public static final class RetryQuery implements Message { + public final String query; + public final String sortBy; + + public RetryQuery(String query, String sortBy) { + this.query = query; + this.sortBy = sortBy; + } + } + // --- State --- /** WebSocket client for making HTTP requests to the News API */ private final WSClient ws; /** API key for authenticating with the News API */ - private final String apiKey; + private String apiKey; /** Actor context providing logging and self-reference */ private final ActorContext context; @@ -151,28 +208,13 @@ static final class FetchComplete implements Message { */ private final Map queries = new HashMap<>(); + private final ActorRef supervisor; + + /** * Wrapper that holds a Search object and its watchers. * @author Luan Tran */ -// static class QueryInfo { -// /** The Search model object containing query parameters and fetched results */ -// final Search search; -// -// /** Set of UserActors watching this query and expecting updates */ -// final Set> watchers = new HashSet<>(); -// -// /** -// * Constructs a new QueryInfo with the specified parameters. -// * @author Luan Tran -// * @param query the search query string -// * @param sortBy the sort order for results -// * @param apiKey the News API key -// */ -// QueryInfo(String query, String sortBy, String apiKey) { -// this.search = new Search(query, sortBy, apiKey); -// } -// } static class QueryInfo { /** Raw query string for this topic */ final String query; @@ -187,7 +229,11 @@ static class QueryInfo { final Set> watchers = new HashSet<>(); /** - * Constructs a new QueryInfo with the specified parameters. + * Constructs a new QueryInfo with the specified parameters + * @author Luan Tran, SIming Yi + * @param query the search query string + * @param sortBy the sort order for results + * @param apiKey the News API key */ QueryInfo(String query, String sortBy, String apiKey) { this.query = query; @@ -207,9 +253,9 @@ static class QueryInfo { * @return a Behavior that can be spawned as an actor * @author Luan Tran */ - public static Behavior create(WSClient ws, String apiKey) { + public static Behavior create(WSClient ws, String apiKey, ActorRef supervisor) { return Behaviors.setup(context -> - Behaviors.withTimers(timers -> new NewsActor(ws, apiKey, context, timers).behavior()) + Behaviors.withTimers(timers -> new NewsActor(ws, apiKey, context, timers, supervisor).behavior()) ); } @@ -223,11 +269,12 @@ public static Behavior create(WSClient ws, String apiKey) { * @param context the actor context * @param timers the timer scheduler */ - private NewsActor(WSClient ws, String apiKey, ActorContext context, TimerScheduler timers) { + private NewsActor(WSClient ws, String apiKey, ActorContext context, TimerScheduler timers, ActorRef supervisor) { this.ws = ws; this.apiKey = apiKey; this.context = context; this.timers = timers; + this.supervisor = supervisor; } // --- Behavior --- @@ -242,6 +289,10 @@ private Behavior behavior() { .onMessage(WatchQuery.class, this::onWatchQuery) .onMessage(FetchTick.class, this::onFetchTick) .onMessage(FetchComplete.class, this::onFetchComplete) + .onMessage(UpdateApiKey.class, this::onUpdateApiKey) + .onMessage(RetryQuery.class, this::onRetryQuery) + .onMessage(UnwatchQuery.class, this::onUnwatchQuery) + .onMessage(StopQuery.class, this::onStopQuery) .build(); } @@ -268,7 +319,7 @@ private Behavior onWatchQuery(WatchQuery msg) { queryKey, new FetchTick(queryKey), Duration.ZERO, // Start immediately - Duration.ofSeconds(30) + Duration.ofSeconds(10) ); context.getLog().info("Started periodic fetching for query: {}", queryKey); @@ -288,42 +339,10 @@ private Behavior onWatchQuery(WatchQuery msg) { /** * Handles FetchTick messages to trigger periodic news fetches. - * @author Luan + * @author Luan Tran, Siming Yi * @param msg the FetchTick message containing the query key * @return Behaviors.same() to continue with the same behavior */ - /*private Behavior onFetchTick(FetchTick msg) { - QueryInfo info = queries.get(msg.queryKey); - if (info == null) { - // Query was removed, timer should have been cancelled - context.getLog().warn("FetchTick for removed query: {}", msg.queryKey); - return Behaviors.same(); - } - - context.getLog().info("Fetching news for query: {} ({} watchers)", - msg.queryKey, info.watchers.size()); - System.out.println(">>> API CALL TRIGGERED for query: " + msg.queryKey + " at " + java.time.Instant.now()); - - - ActorRef self = context.getSelf(); - - // Reuse the same Search object - fetchResults will update its results - info.search.fetchResults(ws).whenComplete((v, throwable) -> { - if (throwable != null) { - self.tell(new FetchComplete(msg.queryKey, false, throwable.getMessage())); - } else { - System.out.println(">>> API CALL COMPLETED for query: " + msg.queryKey); - self.tell(new FetchComplete(msg.queryKey, true, null)); - } - }); - - return Behaviors.same(); - }*/ - /** - * Handles FetchTick messages to trigger periodic news fetches. - * For each tick, we create a fresh Search snapshot so that UserActor - * can compare "new results" vs "previous results" correctly. - */ private Behavior onFetchTick(FetchTick msg) { QueryInfo info = queries.get(msg.queryKey); if (info == null) { @@ -364,14 +383,21 @@ private Behavior onFetchTick(FetchTick msg) { private Behavior onFetchComplete(FetchComplete msg) { QueryInfo info = queries.get(msg.queryKey); + // ADD THIS: + if (info == null) { + context.getLog().warn("FetchComplete for removed query: {}", msg.queryKey); + return Behaviors.same(); + } + if (!msg.success) { context.getLog().error("Failed to fetch news for query: {}, error: {}", msg.queryKey, msg.error); + int statusCode = extractStatusCode(msg.error); + + // Report failure to supervisor with status code + supervisor.tell(new SupervisorActor.APICallFailed( + info.query, info.sortBy, msg.error, statusCode)); - // Notify all watchers of the failure - for (ActorRef watcher : info.watchers) { - watcher.tell(new UserActor.NewsFetchFailed(info.search.getRawQuery(), msg.error)); - } } else { int resultCount = info.search.getResults() != null ? info.search.getResults().size() : 0; context.getLog().info("News fetched successfully for query: {} - {} articles, notifying {} watchers", @@ -386,6 +412,102 @@ private Behavior onFetchComplete(FetchComplete msg) { return Behaviors.same(); } + /** + * Handles UpdateApiKey messages to rotate the API key. + * @author Luan Tran + * @param msg the UpdateApiKey message containing the new key + */ + private Behavior onUpdateApiKey(UpdateApiKey msg) { + context.getLog().info("Updating API key"); + this.apiKey = msg.newApiKey; + return Behaviors.same(); + } + + /** + * Handles RetryQuery messages to retry a failed query with the new API key. + * @author Luan Tran + * @param msg the RetryQuery message containing query details + */ + private Behavior onRetryQuery(RetryQuery msg) { + String queryKey = makeQueryKey(msg.query, msg.sortBy); + QueryInfo info = queries.get(queryKey); + + if (info == null) { + context.getLog().warn("RetryQuery for unknown query: {}", queryKey); + return Behaviors.same(); + } + + context.getLog().info("Retrying query {} with new API key", queryKey); + + // Trigger an immediate fetch with the new API key + ActorRef self = context.getSelf(); + + // Update the QueryInfo's search with new API key + Search newSearch = new Search(info.query, info.sortBy, apiKey); + info.search = newSearch; + + newSearch.fetchResults(ws).whenComplete((v, throwable) -> { + if (throwable != null) { + self.tell(new FetchComplete(queryKey, false, throwable.getMessage())); + } else { + self.tell(new FetchComplete(queryKey, true, null)); + } + }); + + return Behaviors.same(); + } + + /** + * Handles UnwatchQuery messages to remove a watcher from a query. + * @author Luan Tran + * @param msg the UnwatchQuery message containing query details and watcher reference + */ + private Behavior onUnwatchQuery(UnwatchQuery msg) { + String queryKey = makeQueryKey(msg.query, msg.sortBy); + context.getLog().info("User unwatching query: {}", queryKey); + + QueryInfo info = queries.get(queryKey); + + // Remove this watcher + info.watchers.remove(msg.watcher); + context.getLog().info("Query {} now has {} watchers", queryKey, info.watchers.size()); + + // If no more watchers, stop fetching and clean up + if (info.watchers.isEmpty()) { + context.getLog().info("No more watchers for query: {}, stopping timer", queryKey); + timers.cancel(queryKey); + queries.remove(queryKey); + } + + return Behaviors.same(); + } + + /** + * Handles StopQuery messages to stop all fetching for a query due to errors. + * @author Luan Tran + * @param msg the StopQuery message containing query details + */ + private Behavior onStopQuery(StopQuery msg) { + String queryKey = makeQueryKey(msg.query, msg.sortBy); + context.getLog().info("Stopping query due to error: {}", queryKey); + + QueryInfo info = queries.get(queryKey); + if (info == null) { + context.getLog().warn("Stop request for unknown query: {}", queryKey); + return Behaviors.same(); + } + + // Stop the timer + timers.cancel(queryKey); + + // Remove all watchers and the query + context.getLog().info("Stopped fetching for query: {} ({} watchers removed)", + queryKey, info.watchers.size()); + queries.remove(queryKey); + + return Behaviors.same(); + } + /** * Creates a unique key for a query based on its parameters. * @author Luan Tran @@ -396,4 +518,18 @@ private Behavior onFetchComplete(FetchComplete msg) { private String makeQueryKey(String query, String sortBy) { return query + "-" + sortBy; } + + /** + * Extracts HTTP status code from exception message. + * @author Luan Tran + * @param errorMessage the exception to extract from + * @return the HTTP status code, or 0 if not found + */ + private int extractStatusCode(String errorMessage) { + if (errorMessage.contains("429")) return 429; + if (errorMessage.contains("401")) return 401; + if (errorMessage.contains("403")) return 403; + + return 0; + } } \ No newline at end of file diff --git a/app/actors/SupervisorActor.java b/app/actors/SupervisorActor.java index c276924..bb12615 100644 --- a/app/actors/SupervisorActor.java +++ b/app/actors/SupervisorActor.java @@ -15,6 +15,7 @@ import java.util.List; import java.util.Map; import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; /** * Parent actor that manages the creation of individual UserActors for each WebSocket connection. @@ -106,8 +107,10 @@ public OtherActors(ActorRef sourceActor, ActorRef create(WSClient wsClient, String apiKey, AsyncCacheApi asyncCache) { - return Behaviors.setup(context -> { - // create source actor - ActorRef sourceActor = context.spawn( - SourceActor.create(wsClient), - "sourceActor" - ); - // create sourceInfo actor - ActorRef sourcesInfoActor = context.spawn( - SourcesInfoActor.create(wsClient, apiKey), - "sourcesInfoActor" - ); - context.getLog().info("sourceActor and sourcesInfoActor spawned and ready"); - - // Spawn a single NewsActor that will be shared by all UserActors - ActorRef newsActor = context.spawn( - NewsActor.create(wsClient, apiKey), "newsActor" - ); - context.getLog().info("NewsActor spawned and ready"); - - // Spawn a single ReadabilityActor that will be shared by all UserActors - ActorRef readabilityActor = context.spawn( - ReadabilityActor.create(), "readabilityActor" - ); - - context.getLog().info("ReadabilityActor spawned and ready"); - - // Create a single shared SentimentAnalyzerActor for the entire application - ActorRef sentimentActor = context.spawn( - SentimentAnalyzerActor.create(), - "sentimentAnalyzerActor" - ); - context.getLog().info("sentimentAnalyzerActor spawned and ready"); - // Registry to track active UserActors by session ID - Map> userActors = new HashMap<>(); - - - - return Behaviors.receive(Message.class) - .onMessage(Create.class, create -> { - context.getLog().info("Received Create request for session: {}", create.id); - - // Check if UserActor already exists for this session - ActorRef existingActor = userActors.get(create.id); - - if (existingActor != null) { - // Reuse existing UserActor - reconnect the WebSocket - context.getLog().info("REUSING existing UserActor for session: {}", create.id); - existingActor.tell(new UserActor.Reconnect(create.replyTo)); - } else { - // Spawn a new UserActor for this session - context.getLog().info("Creating NEW UserActor for session: {}", create.id); - - ActorRef child = context.spawn( - UserActor.create(create.id, asyncCache, newsActor, readabilityActor, sentimentActor), - "userActor-" + create.id - ); - - // Register the new actor - userActors.put(create.id, child); - - // Watch the actor to know when it stops (only happens after timeout or explicit stop) - context.watchWith(child, new UserActorStopped(create.id)); - - // Ask the child to establish the stream - child.tell(new UserActor.Connect(create.replyTo)); - } - return Behaviors.same(); - }) - .onMessage(GetStats.class, msg -> { - context.getLog().info("Received GetStats request for session: {}, query: {}", - msg. userId, msg.query); - - ActorRef userActor = userActors.get(msg.userId); - - if (userActor == null) { - context. getLog().info("No UserActor found for session: {}", msg.userId); - msg.replyTo.tell(new StatsResponse(false, msg.query, List.of())); - } else { - ActorRef adapter = context. messageAdapter( - UserActor. StatsResult.class, - result -> new UserStatsResponse(result, msg.replyTo) - ); - userActor.tell(new UserActor.GetStats(msg.query, adapter)); - } - - return Behaviors.same(); - }) - .onMessage(UserStatsResponse.class, msg -> { - context.getLog().info("Received stats response for query: {}", msg. result.query()); - - msg.originalReplyTo.tell(new StatsResponse( - msg.result.found(), - msg.result.query(), - msg.result.sortedCounts() - )); - - return Behaviors.same(); - }) - .onMessage(GetOtherActors.class, msg -> { - msg.replyTo.tell(new OtherActors(sourceActor, sourcesInfoActor)); - return Behaviors.same(); - }) - .onMessage(UserActorStopped.class, stopped -> { - context.getLog(). info("UserActor terminated for session: {}, removing from registry", stopped. id); + /** + * Message indicating that a news fetch operation failed. + * @author Luan Tran + */ + public static final class APICallFailed implements Message { + public final String query; + public final String sortBy; + public final String errorMessage; + public final int statusCode; + + public APICallFailed(String query, String sortBy, String errorMessage, int statusCode) { + this.query = query; + this.sortBy = sortBy; + this.errorMessage = errorMessage; + this.statusCode = statusCode; + } + } + + /** + * Creates the SupervisorActor behavior that manages UserActors and a shared NewsActor. + * + * @param wsClient WebSocket client to pass to NewsActor + * @return Behavior for the SupervisorActor + * @author Nattamon Paiboon, Luan Tran + * + */ + public static Behavior create(WSClient wsClient) { + return Behaviors.setup(context -> { + // Get reference to self for passing to NewsActor + ActorRef self = context.getSelf(); + + // Load all API keys from configuration + String[] apiKeys = loadApiKeys(); +// int[] currentKeyIndex = {0}; + AtomicInteger currentKeyIndex = new AtomicInteger(0); + + String apiKey = apiKeys[currentKeyIndex.get()]; + + // create source actor + ActorRef sourceActor = context.spawn( + SourceActor.create(wsClient), + "sourceActor" + ); + // create sourceInfo actor + ActorRef sourcesInfoActor = context.spawn( + SourcesInfoActor.create(wsClient, apiKey), + "sourcesInfoActor" + ); + context.getLog().info("sourceActor and sourcesInfoActor spawned and ready"); + + // Spawn a single NewsActor that will be shared by all UserActors + ActorRef newsActor = context.spawn( + NewsActor.create(wsClient, apiKey, self), "newsActor" + ); + context.getLog().info("NewsActor spawned and ready using APIKEY # {}: {}", currentKeyIndex, apiKey); + + // Spawn a single ReadabilityActor that will be shared by all UserActors + ActorRef readabilityActor = context.spawn( + ReadabilityActor.create(), "readabilityActor" + ); + + context.getLog().info("ReadabilityActor spawned and ready"); + + // Create a single shared SentimentAnalyzerActor for the entire application + ActorRef sentimentActor = context.spawn( + SentimentAnalyzerActor.create(), + "sentimentAnalyzerActor" + ); + context.getLog().info("sentimentAnalyzerActor spawned and ready"); + // Registry to track active UserActors by session ID + Map> userActors = new HashMap<>(); + + + return Behaviors.receive(Message.class) + .onMessage(Create.class, create -> { + context.getLog().info("Received Create request for session: {}", create.id); + + // Check if UserActor already exists for this session + ActorRef existingActor = userActors.get(create.id); + + if (existingActor != null) { + // Reuse existing UserActor - reconnect the WebSocket + context.getLog().info("REUSING existing UserActor for session: {}", create.id); + existingActor.tell(new UserActor.Reconnect(create.replyTo)); + } else { + // Spawn a new UserActor for this session + context.getLog().info("Creating NEW UserActor for session: {}", create.id); + + ActorRef child = context.spawn( + UserActor.create(create.id, newsActor, readabilityActor, sentimentActor), + "userActor-" + create.id + ); + + // Register the new actor + userActors.put(create.id, child); + + // Watch the actor to know when it stops (only happens after timeout or explicit stop) + context.watchWith(child, new UserActorStopped(create.id)); + + // Ask the child to establish the stream + child.tell(new UserActor.Connect(create.replyTo)); + } + return Behaviors.same(); + }) + .onMessage(APICallFailed.class, msg -> { + context.getLog().warn("News fetch failed for query '{}' (sortBy: {}): {} (status: {})", + msg.query, msg.sortBy, msg.errorMessage, msg.statusCode); + + if (msg.statusCode == 429 || msg.statusCode == 401) { + String errorMessage = ""; + if(msg.statusCode == 429){ + errorMessage = "Rate limit hit!"; + }else { + errorMessage = "Incorrect API KEY!"; + } + // Rotate API key + //Using an array to bypass the lambdas final immutability restruictions + int nextIndex = currentKeyIndex.updateAndGet(i -> (i + 1) % apiKeys.length); + String newApiKey = apiKeys[nextIndex]; + + context.getLog().info("{}} Rotating to API key index {} and retrying query: {}", + errorMessage, nextIndex, msg.query); + + // Update NewsActor with new key and retry + newsActor.tell(new NewsActor.UpdateApiKey(newApiKey)); + newsActor.tell(new NewsActor.RetryQuery(msg.query, msg.sortBy)); + } else { + // For non-429 errors, notify all UserActors + context.getLog().warn("Non-recoverable error for query '{}', stopping all fetches", msg.query); + + // Stop the query in NewsActor + newsActor.tell(new NewsActor.StopQuery(msg.query, msg.sortBy)); + + for (ActorRef userActor : userActors.values()) { + userActor.tell(new UserActor.NewsFetchFailed(msg.query, msg.errorMessage)); + } + } + + return Behaviors.same(); + }) + .onMessage(GetStats.class, msg -> { + context.getLog().info("Received GetStats request for session: {}, query: {}", + msg.userId, msg.query); + + ActorRef userActor = userActors.get(msg.userId); + + if (userActor == null) { + context.getLog().info("No UserActor found for session: {}", msg.userId); + msg.replyTo.tell(new StatsResponse(false, msg.query, List.of())); + } else { + ActorRef adapter = context.messageAdapter( + UserActor.StatsResult.class, + result -> new UserStatsResponse(result, msg.replyTo) + ); + userActor.tell(new UserActor.GetStats(msg.query, adapter)); + } + + return Behaviors.same(); + }) + .onMessage(UserStatsResponse.class, msg -> { + context.getLog().info("Received stats response for query: {}", msg.result.query()); + + msg.originalReplyTo.tell(new StatsResponse( + msg.result.found(), + msg.result.query(), + msg.result.sortedCounts() + )); + + return Behaviors.same(); + }) + .onMessage(GetOtherActors.class, msg -> { + msg.replyTo.tell(new OtherActors(sourceActor, sourcesInfoActor)); + return Behaviors.same(); + }) + .onMessage(UserActorStopped.class, stopped -> { + context.getLog().info("UserActor terminated for session: {}, removing from registry", stopped.id); userActors.remove(stopped.id); return Behaviors.same(); }) - .build(); - }); - } + .build(); + }); + } + + /** + * Loads API keys from configuration. + * @author Luan Tran + * @return array of API keys for rotation + */ + private static String[] loadApiKeys() { + // Load from application.conf + com.typesafe.config.Config config = com.typesafe.config.ConfigFactory.load(); + List keys = config.getStringList("api.keys"); + return keys.toArray(new String[0]); + } } \ No newline at end of file diff --git a/app/actors/UserActor.java b/app/actors/UserActor.java index 3a59347..644f439 100644 --- a/app/actors/UserActor.java +++ b/app/actors/UserActor.java @@ -3,7 +3,7 @@ import org.apache.pekko.Done; import org.apache.pekko.NotUsed; import org.apache.pekko.actor.typed.ActorRef; -import org.apache.pekko.actor.typed. Behavior; +import org.apache.pekko.actor.typed.Behavior; import org.apache.pekko.actor.typed.javadsl.ActorContext; import org.apache.pekko.actor.typed.javadsl.AskPattern; import org.apache.pekko.actor.typed.javadsl.Behaviors; @@ -31,23 +31,29 @@ /** * Actor managing WebSocket connections and search sessions for individual users. * Maintains search history, handles real-time news updates, and supports session persistence across reconnections. + * * @author Luan Tran, Nattamon Paiboon */ public class UserActor { // --- Messages --- + /** * Base interface for all UserActor messages + * * @author Nattamon Paiboon */ - public interface Message {} + public interface Message { + } /** * Message to establish initial WebSocket connection + * * @author Nattamon Paiboon */ public static final class Connect implements Message { final ActorRef> replyTo; + public Connect(ActorRef> replyTo) { this.replyTo = replyTo; } @@ -55,10 +61,12 @@ public Connect(ActorRef> replyTo) { /** * Message to handle WebSocket reconnection + * * @author Luan Tran */ public static final class Reconnect implements Message { final ActorRef> replyTo; + public Reconnect(ActorRef> replyTo) { this.replyTo = replyTo; } @@ -66,10 +74,12 @@ public Reconnect(ActorRef> replyTo) { /** * Message containing news search results from NewsActor + * * @author Luan Tran */ public static final class NewsResults implements Message { public final Search search; + public NewsResults(Search search) { this.search = search; } @@ -77,11 +87,13 @@ public NewsResults(Search search) { /** * Message indicating news fetch operation failed + * * @author Luan Tran */ public static final class NewsFetchFailed implements Message { public final String query; public final String errorMessage; + public NewsFetchFailed(String query, String errorMessage) { this.query = query; this.errorMessage = errorMessage; @@ -90,39 +102,47 @@ public NewsFetchFailed(String query, String errorMessage) { public static class ReadabilityResult implements UserActor.Message { public final List updatedSearches; + public ReadabilityResult(List searches) { this.updatedSearches = searches; } } - /** + /** * Request to compute stats for a query (from SupervisorActor) + * * @author Dara Cunningham */ public static final class GetStats implements Message { public final String query; public final ActorRef replyTo; + public GetStats(String query, ActorRef replyTo) { this.query = query; this.replyTo = replyTo; } } - /** + + /** * Response containing stats computation result. * Implements Message so StatsActor can send it directly to UserActor. + * * @author Dara Cunningham */ public record StatsResult( - boolean found, - String query, - List> sortedCounts - ) implements Message {} + boolean found, + String query, + List> sortedCounts + ) implements Message { + } /** * Internal message to handle WebSocket disconnection + * * @author Nattamon Paiboon */ - private static final class InternalStop implements Message {} + private static final class InternalStop implements Message { + } // --- State --- private final String id; @@ -130,21 +150,20 @@ private static final class InternalStop implements Message {} // private final String apiKey; private final ActorRef newsActor; private final ActorRef rActor; - private final ActorRef statsActor; + private final ActorRef statsActor; private final ActorContext context; private List searchHistory = new ArrayList<>(); private final Set watchedQueries = new HashSet<>(); private final Map searchInstances = new HashMap<>(); - private final Map> pendingStatsRequests = new HashMap<>(); + private final Map> pendingStatsRequests = new HashMap<>(); private final Sink hubSink; private final Flow websocketFlow; - /** Child actor that performs sentiment analysis for this user. */ + /** + * Child actor that performs sentiment analysis for this user. + */ private final ActorRef sentimentActor; - // TODO rm cache - private final AsyncCacheApi asyncCache; - // Helper class to track each search instance private static class SearchInstance { final String searchId; @@ -162,53 +181,54 @@ private static class SearchInstance { } // --- Factory --- + /** * Creates a new UserActor behavior - * @param id unique identifier for the user - * @param asyncCache cache for storing user data - * @param newsActor reference to the NewsActor for fetching news - * @param rActor reference to the ReadbailityActor for getting readability + * + * @param id unique identifier for the user + * @param newsActor reference to the NewsActor for fetching news + * @param rActor reference to the ReadbailityActor for getting readability * @return behavior for handling user-specific messages * @author Nattamon Paiboon */ // TODO rm cache - public static Behavior create(String id, AsyncCacheApi asyncCache, ActorRef newsActor, ActorRef rActor, ActorRef sentimentActor) { - return Behaviors.setup(context -> new UserActor(id, asyncCache, context, newsActor, rActor, sentimentActor).behavior()); + public static Behavior create(String id, ActorRef newsActor, ActorRef rActor, ActorRef sentimentActor) { + return Behaviors.setup(context -> new UserActor(id, context, newsActor, rActor, sentimentActor).behavior()); } // --- Constructor --- + /** * Constructs a UserActor with WebSocket flow and message handling setup - * @param id unique identifier for the user - * @param asyncCache cache for storing user data - * @param context actor context for this user actor - * @param newsActor reference to the NewsActor for fetching news - * @param rActor reference to the ReadbailityActor for getting readability + * + * @param id unique identifier for the user + * @param context actor context for this user actor + * @param newsActor reference to the NewsActor for fetching news + * @param rActor reference to the ReadbailityActor for getting readability * @author Nattamon Paiboon */ // TODO rm cache - private UserActor(String id, AsyncCacheApi asyncCache, ActorContext context, ActorRef newsActor, ActorRef rActor, ActorRef sentimentActor) { + private UserActor(String id, ActorContext context, ActorRef newsActor, ActorRef rActor, ActorRef sentimentActor) { this.id = id; this.context = context; - this.asyncCache = asyncCache; this.newsActor = newsActor; this.rActor = rActor; this.sentimentActor = sentimentActor; - Materializer mat = Materializer.matFromSystem(context.getSystem()); + Materializer mat = Materializer.matFromSystem(context.getSystem()); - // Spawn StatsActor - this.statsActor = context.spawn(StatsActor.create(), "statsActor-" + id); + // Spawn StatsActor + this.statsActor = context.spawn(StatsActor.create(), "statsActor-" + id); ActorRef self = context.getSelf(); - Pair, Source> sinkSourcePair = - MergeHub.of(JsonNode.class, 16) - .toMat(BroadcastHub.of(JsonNode.class, 256), Keep.both()) - .run(mat); + Pair, Source> sinkSourcePair = + MergeHub.of(JsonNode.class, 16) + .toMat(BroadcastHub.of(JsonNode.class, 256), Keep.both()) + .run(mat); - this.hubSink = sinkSourcePair.first(); - Source hubSource = sinkSourcePair.second(); + this.hubSink = sinkSourcePair.first(); + Source hubSource = sinkSourcePair.second(); // Handle incoming messages from the WebSocket Sink> jsonSink = Sink.foreach((JsonNode json) -> { @@ -238,19 +258,19 @@ private UserActor(String id, AsyncCacheApi asyncCache, ActorContext con } }); - // ping websocket to keep it alive - Source heartbeat = Source.tick( - Duration.ZERO, - Duration.ofSeconds(30), - (JsonNode) Json.newObject().put("type", "ping") - ).mapMaterializedValue(c -> NotUsed.getInstance()); - - this.websocketFlow = Flow.fromSinkAndSourceCoupled(jsonSink, hubSource.merge(heartbeat)) - .watchTermination((n, stage) -> { - stage.whenComplete((done, throwable) -> self.tell(new InternalStop())); - return NotUsed.getInstance(); - }); - } + // ping websocket to keep it alive + Source heartbeat = Source.tick( + Duration.ZERO, + Duration.ofSeconds(30), + (JsonNode) Json.newObject().put("type", "ping") + ).mapMaterializedValue(c -> NotUsed.getInstance()); + + this.websocketFlow = Flow.fromSinkAndSourceCoupled(jsonSink, hubSource.merge(heartbeat)) + .watchTermination((n, stage) -> { + stage.whenComplete((done, throwable) -> self.tell(new InternalStop())); + return NotUsed.getInstance(); + }); + } // --- Behavior --- private Behavior behavior() { @@ -260,7 +280,7 @@ private Behavior behavior() { .onMessage(NewsResults.class, this::onNewsResults) .onMessage(NewsFetchFailed.class, this::onNewsFetchFailed) .onMessage(ReadabilityResult.class, this::onReadabilityResult) - .onMessage(GetStats.class, this::onGetStats) + .onMessage(GetStats.class, this::onGetStats) .onMessage(StatsResult.class, this::onStatsResult) .onMessage(InternalStop.class, this::onInternalStop) .build(); @@ -314,11 +334,12 @@ private Behavior onReadabilityResult(ReadabilityResult msg) { /** * Handles initial WebSocket connection from client + * * @param msg the connection message containing the reply actor reference * @return the same behavior to continue processing messages * @author Luan Tran */ - private Behavior onConnect(UserActor.Connect msg){ + private Behavior onConnect(UserActor.Connect msg) { context.getLog().info("Client connected: {}", id); msg.replyTo.tell(websocketFlow); return Behaviors.same(); @@ -326,11 +347,12 @@ private Behavior onConnect(UserActor.Connect msg){ /** * Handles WebSocket reconnection and restores session state + * * @param msg the reconnection message containing the reply actor reference * @return the same behavior to continue processing messages * @author Luan Tran */ - private Behavior onReconnect(UserActor.Reconnect msg){ + private Behavior onReconnect(UserActor.Reconnect msg) { context.getLog().info("Client reconnected: {} ({} searches, {} instances)", id, searchHistory.size(), searchInstances.size()); // Send the flow back @@ -349,6 +371,7 @@ private Behavior onReconnect(UserActor.Reconnect msg){ /** * Handles incoming news results and updates all matching search instances + * * @param msg the news results message containing the search data * @return the same behavior to continue processing messages * @author Luan Tran @@ -368,6 +391,7 @@ private Behavior onNewsResults(UserActor.NewsResults msg) { /** * Handles news fetch failures and sends error to client + * * @param msg the failure message containing query and error details * @return the same behavior to continue processing messages * @author Luan Tran @@ -384,57 +408,59 @@ private Behavior onNewsFetchFailed(UserActor.NewsFetchFailed return Behaviors.same(); } - /** + /** * Handles GetStats request by delegating to StatsActor + * * @author Dara Cunningham */ - private Behavior onGetStats(GetStats msg) { - context.getLog().info("User {} requesting stats for: {}", id, msg.query); + private Behavior onGetStats(GetStats msg) { + context.getLog().info("User {} requesting stats for: {}", id, msg.query); - Optional searchOpt = searchHistory.stream() - .filter(s -> s.getRawQuery().equals(msg.query)) - .findFirst(); + Optional searchOpt = searchHistory.stream() + .filter(s -> s.getRawQuery().equals(msg.query)) + .findFirst(); - if (searchOpt.isEmpty()) { - context.getLog().info("Search not found for query: {}", msg.query); - msg.replyTo.tell(new StatsResult(false, msg.query, List.of())); - } - else { - Search search = searchOpt.get(); - List

articles = search.getResults(); + if (searchOpt.isEmpty()) { + context.getLog().info("Search not found for query: {}", msg.query); + msg.replyTo.tell(new StatsResult(false, msg.query, List.of())); + } else { + Search search = searchOpt.get(); + List
articles = search.getResults(); - context.getLog().info("Delegating stats computation for {} articles to StatsActor", articles.size()); + context.getLog().info("Delegating stats computation for {} articles to StatsActor", articles.size()); - // Store the pending request so we can forward the response - pendingStatsRequests.put(msg.query, msg.replyTo); + // Store the pending request so we can forward the response + pendingStatsRequests.put(msg.query, msg.replyTo); - // Delegate to StatsActor - it will reply directly with StatsResult - statsActor.tell(new StatsActor.ComputeStats(msg.query, articles, context.getSelf())); - } + // Delegate to StatsActor - it will reply directly with StatsResult + statsActor.tell(new StatsActor.ComputeStats(msg.query, articles, context.getSelf())); + } - return Behaviors.same(); - } + return Behaviors.same(); + } - /** + /** * Handles StatsResult from StatsActor and forwards to original requester + * * @author Dara Cunningham */ private Behavior onStatsResult(StatsResult msg) { - context. getLog().info("Received stats result for query: {} (found: {})", msg.query(), msg. found()); + context.getLog().info("Received stats result for query: {} (found: {})", msg.query(), msg.found()); ActorRef originalReplyTo = pendingStatsRequests.remove(msg.query()); if (originalReplyTo != null) { originalReplyTo.tell(msg); } else { - context.getLog(). warn("No pending stats request found for query: {}", msg.query()); + context.getLog().warn("No pending stats request found for query: {}", msg.query()); } - return Behaviors. same(); + return Behaviors.same(); } /** * Handles WebSocket disconnection while preserving actor state for reconnection + * * @param msg the internal stop message * @return the same behavior to keep actor alive * @author Luan Tran @@ -453,6 +479,7 @@ private Behavior onInternalStop(UserActor.InternalStop msg) { /** * Builds a unique query key from query text and sort order + * * @author Luan Tran */ private String buildQueryKey(String query, String sortBy) { @@ -461,7 +488,8 @@ private String buildQueryKey(String query, String sortBy) { /** * Processes all search instances matching the given query key and updates them - * @param search the search object containing new results + * + * @param search the search object containing new results * @param queryKey the unique key identifying the query and sort order * @return true if any instance was updated, false otherwise * @author Luan Tran @@ -488,8 +516,9 @@ private boolean processMatchingInstances(Search search, String queryKey) { /** * Processes a single search instance, either creating new entry or updating existing + * * @param instance the search instance to process - * @param search the search object containing new results + * @param search the search object containing new results * @return true if the instance was updated, false otherwise * @author Luan Tran */ @@ -503,8 +532,9 @@ private boolean processSearchInstance(SearchInstance instance, Search search) { /** * Handles the first result for a new search instance + * * @param instance the search instance receiving its first result - * @param search the search object to add to history + * @param search the search object to add to history * @return true indicating the instance was created * @author Luan Tran */ @@ -519,6 +549,7 @@ private boolean handleFirstResult(SearchInstance instance, Search search) { /** * Shifts history indices for all instances after inserting a new entry at the front + * * @param excludeInstance the instance to exclude from the shift operation * @author Luan Tran */ @@ -534,6 +565,7 @@ private void shiftExistingIndices(SearchInstance excludeInstance) { /** * Trims search history to maximum size and removes orphaned instances + * * @author Luan Tran */ private void trimHistoryIfNeeded() { @@ -557,8 +589,9 @@ private void trimHistoryIfNeeded() { /** * Handles an update to an existing search instance + * * @param instance the search instance to update - * @param search the search object containing new results + * @param search the search object containing new results * @return true if new articles were found, false otherwise * @author Luan Tran */ @@ -575,8 +608,9 @@ private boolean handleUpdateResult(SearchInstance instance, Search search) { /** * Updates a search instance with newly discovered articles - * @param instance the search instance to update - * @param search the search object containing new articles + * + * @param instance the search instance to update + * @param search the search object containing new articles * @param newArticleCount the number of new articles discovered * @author Luan Tran */ @@ -593,6 +627,7 @@ private void updateSearchWithNewArticles(SearchInstance instance, Search search, /** * Pushes current search history to connected WebSocket clients + * * @author Luan Tran */ private void pushUpdatesToClient() { diff --git a/app/controllers/HomeController.java b/app/controllers/HomeController.java index 1f65237..6623e6a 100644 --- a/app/controllers/HomeController.java +++ b/app/controllers/HomeController.java @@ -69,7 +69,7 @@ public HomeController(WSClient ws, AsyncCacheApi asyncCache, Config config, Acto this.actorSystem = actorSystem; // TODO rm cache this.supervisorActor = actorSystem.systemActorOf( - SupervisorActor.create(ws, apiKey, asyncCache), "SupervisorActor", Props.empty() + SupervisorActor.create(ws), "SupervisorActor", Props.empty() ); CompletionStage futureActors = AskPattern.ask( diff --git a/conf/application.conf b/conf/application.conf index 154352d..6a91315 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -4,4 +4,14 @@ play.modules.enabled += "play.api.cache.ehcache.EhCacheModule" play.modules.enabled += "modules.PekkoModule" -api.key = "a482d6d109894277b9ec6a6d2f4e83e9" \ No newline at end of file +# api.key = "a482d6d109894277b9ec6a6d2f4e83e9" + +api { + key = "3e9c1135829642eeb8e0fe4112f69e82" + keys = [ +# Uncomment this to test error handling of 401 +# "test" + "a482d6d109894277b9ec6a6d2f4e83e9", + "1002d5e24b1a4a0b92719ed6281694a9", + ] +} \ No newline at end of file diff --git a/test/actors/NewsActorTest.java b/test/actors/NewsActorTest.java index 840f998..330b80d 100644 --- a/test/actors/NewsActorTest.java +++ b/test/actors/NewsActorTest.java @@ -52,7 +52,9 @@ public void teardown() { public void singleWatcher_receivesNewsResults() { TestProbe probe = testKit.createTestProbe(UserActor.Message.class); - ActorRef actor = testKit.spawn(NewsActor.create(wsMock, "dummyKey")); + TestProbe probe3 = testKit.createTestProbe(SupervisorActor.Message.class); + + ActorRef actor = testKit.spawn(NewsActor.create(wsMock, "dummyKey", probe3.getRef())); actor.tell(new NewsActor.WatchQuery("query1", "popularity", probe.getRef())); actor.tell(new NewsActor.FetchComplete("query1-popularity", true, null)); @@ -69,7 +71,8 @@ public void singleWatcher_receivesNewsResults() { public void multipleWatchers_allReceiveNewsResults() { TestProbe probe1 = testKit.createTestProbe(UserActor.Message.class); TestProbe probe2 = testKit.createTestProbe(UserActor.Message.class); - ActorRef actor = testKit.spawn(NewsActor.create(wsMock, "dummyKey")); + TestProbe probe3 = testKit.createTestProbe(SupervisorActor.Message.class); + ActorRef actor = testKit.spawn(NewsActor.create(wsMock, "dummyKey", probe3.getRef())); actor.tell(new NewsActor.WatchQuery("query1", "popularity", probe1.getRef())); actor.tell(new NewsActor.WatchQuery("query1", "popularity", probe2.getRef())); @@ -88,17 +91,27 @@ public void multipleWatchers_allReceiveNewsResults() { */ @Test public void fetchCompleteFailure_notifiesWatchers() { - TestProbe probe = testKit.createTestProbe(UserActor.Message.class); - ActorRef actor = testKit.spawn(NewsActor.create(wsMock, "dummyKey")); + TestProbe userProbe = testKit.createTestProbe(UserActor.Message.class); + TestProbe supervisorProbe = testKit.createTestProbe(SupervisorActor.Message.class); - actor.tell(new NewsActor.WatchQuery("query1", "popularity", probe.getRef())); - // Wait for initial fetch to complete - probe.receiveMessage(Duration.ofSeconds(5)); + ActorRef actor = testKit.spawn( + NewsActor.create(wsMock, "dummyKey", supervisorProbe.getRef()) + ); - actor.tell(new NewsActor.FetchComplete("query1-popularity", false, "API error")); + // Register a watcher + actor.tell(new NewsActor.WatchQuery("query1", "popularity", userProbe.getRef())); - UserActor.Message msg = probe.receiveMessage(); - assertTrue(msg instanceof UserActor.NewsFetchFailed); + // Simulate a fetch failure with a non-429 error + actor.tell(new NewsActor.FetchComplete("query1-popularity", false, "API Error 500")); + + // Now the failure should be sent to supervisor, NOT to the user + SupervisorActor.Message msg = supervisorProbe.receiveMessage(); + assertTrue(msg instanceof SupervisorActor.APICallFailed); + + SupervisorActor.APICallFailed failed = (SupervisorActor.APICallFailed) msg; + assertEquals("query1", failed.query); + assertEquals("popularity", failed.sortBy); + assertEquals(0, failed.statusCode); } /** @@ -107,7 +120,9 @@ public void fetchCompleteFailure_notifiesWatchers() { */ @Test public void fetchTick_forRemovedQuery_logsWarning() { - ActorRef actor = testKit.spawn(NewsActor.create(wsMock, "dummyKey")); + TestProbe probe3 = testKit.createTestProbe(SupervisorActor.Message.class); + + ActorRef actor = testKit.spawn(NewsActor.create(wsMock, "dummyKey", probe3.getRef())); actor.tell(new NewsActor.FetchTick("missing-query")); } @@ -118,8 +133,9 @@ public void fetchTick_forRemovedQuery_logsWarning() { @Test public void fetchTick_triggersFetchResults() { TestProbe probe = testKit.createTestProbe(UserActor.Message.class); - ActorRef actor = testKit.spawn(NewsActor.create(wsMock, "dummyKey")); + TestProbe probe3 = testKit.createTestProbe(SupervisorActor.Message.class); + ActorRef actor = testKit.spawn(NewsActor.create(wsMock, "dummyKey", probe3.getRef())); actor.tell(new NewsActor.WatchQuery("query1", "popularity", probe.getRef())); actor.tell(new NewsActor.FetchTick("query1-popularity")); } @@ -130,8 +146,9 @@ public void fetchTick_triggersFetchResults() { */ @Test public void watchQuery_withCachedResults_sendsImmediately() throws Exception { - ActorRef actor = testKit.spawn(NewsActor.create(wsMock, "dummyKey")); + TestProbe probe3 = testKit.createTestProbe(SupervisorActor.Message.class); + ActorRef actor = testKit.spawn(NewsActor.create(wsMock, "dummyKey", probe3.getRef())); TestProbe probe1 = testKit.createTestProbe(UserActor.Message.class); actor.tell(new NewsActor.WatchQuery("query1", "popularity", probe1.getRef())); @@ -155,8 +172,9 @@ public void watchQuery_withCachedResults_sendsImmediately() throws Exception { @Test public void watchQuery_multipleWatchersSequential() { TestProbe probe1 = testKit.createTestProbe(UserActor.Message.class); - ActorRef actor = testKit.spawn(NewsActor.create(wsMock, "dummyKey")); + TestProbe probe3 = testKit.createTestProbe(SupervisorActor.Message.class); + ActorRef actor = testKit.spawn(NewsActor.create(wsMock, "dummyKey", probe3.getRef())); actor.tell(new NewsActor.WatchQuery("query1", "popularity", probe1.getRef())); actor.tell(new NewsActor.FetchComplete("query1-popularity", true, null)); probe1.receiveMessage(); @@ -167,4 +185,426 @@ public void watchQuery_multipleWatchersSequential() { UserActor.Message msg = probe2.receiveMessage(Duration.ofSeconds(1)); assertTrue(msg instanceof UserActor.NewsResults); } + + /** + * NewsActor receives fetch failure and sends APICallFailed to Supervisor. + * @author Luan Tran + */ + @Test + public void testNewsActor_fetchFails_sendsMessageToSupervisor() { + // Setup: Create NewsActor with supervisor probe + TestProbe supervisorProbe = + testKit.createTestProbe(SupervisorActor.Message.class); + TestProbe userProbe = + testKit.createTestProbe(UserActor.Message.class); + + ActorRef newsActor = testKit.spawn( + NewsActor.create(wsMock, "testKey", supervisorProbe.getRef()) + ); + + // Setup: Watch a query so there's an active query to fail + newsActor.tell(new NewsActor.WatchQuery("bitcoin", "popularity", userProbe.getRef())); + + // Action: Simulate API call failure with 429 status + newsActor.tell(new NewsActor.FetchComplete("bitcoin-popularity", false, "Rate limit exceeded 429")); + + // Verify: Supervisor receives APICallFailed message + SupervisorActor.Message message = supervisorProbe.receiveMessage(Duration.ofSeconds(3)); + + assertTrue("Expected APICallFailed message", message instanceof SupervisorActor.APICallFailed); + SupervisorActor.APICallFailed failed = (SupervisorActor.APICallFailed) message; + + assertEquals("bitcoin", failed.query); + assertEquals("popularity", failed.sortBy); + assertEquals(429, failed.statusCode); + assertEquals("Rate limit exceeded 429", failed.errorMessage); + } + + /** + * NewsActor sends different status codes to Supervisor correctly. + * @author Luan Tran + */ + @Test + public void testNewsActor_fetchFails_sendsCorrectStatusCodes() { + TestProbe supervisorProbe = + testKit.createTestProbe(SupervisorActor.Message.class); + TestProbe userProbe = + testKit.createTestProbe(UserActor.Message.class); + + ActorRef newsActor = testKit.spawn( + NewsActor.create(wsMock, "testKey", supervisorProbe.getRef()) + ); + + // Watch multiple queries + newsActor.tell(new NewsActor.WatchQuery("query1", "popularity", userProbe.getRef())); + newsActor.tell(new NewsActor.WatchQuery("query2", "popularity", userProbe.getRef())); + newsActor.tell(new NewsActor.WatchQuery("query3", "popularity", userProbe.getRef())); + newsActor.tell(new NewsActor.WatchQuery("query4", "popularity", userProbe.getRef())); + + // Test different error codes + newsActor.tell(new NewsActor.FetchComplete("query1-popularity", false, "Rate limit 429")); + SupervisorActor.APICallFailed msg1 = (SupervisorActor.APICallFailed) supervisorProbe.receiveMessage(); + assertEquals(429, msg1.statusCode); + + newsActor.tell(new NewsActor.FetchComplete("query2-popularity", false, "Server error 500")); + SupervisorActor.APICallFailed msg2 = (SupervisorActor.APICallFailed) supervisorProbe.receiveMessage(); + assertEquals(0, msg2.statusCode); + + newsActor.tell(new NewsActor.FetchComplete("query3-popularity", false, "Unauthorized 401")); + SupervisorActor.APICallFailed msg3 = (SupervisorActor.APICallFailed) supervisorProbe.receiveMessage(); + assertEquals(401, msg3.statusCode); + + newsActor.tell(new NewsActor.FetchComplete("query4-popularity", false, "Forbidden 403")); + SupervisorActor.APICallFailed msg4 = (SupervisorActor.APICallFailed) supervisorProbe.receiveMessage(); + assertEquals(403, msg4.statusCode); + } + + /** + * Complete flow - NewsActor fail → Supervisor → NewsActor retry + * @author Luan Tran + */ + @Test + public void testCompleteFlow_newsActorFailToRetry() { + // Use real supervisor which creates its own NewsActor + ActorRef supervisor = testKit.spawn( + SupervisorActor.create(wsMock) + ); + + // Verify supervisor is operational before test + TestProbe initialProbe = + testKit.createTestProbe(SupervisorActor.OtherActors.class); + supervisor.tell(new SupervisorActor.GetOtherActors(initialProbe.getRef())); + SupervisorActor.OtherActors initialActors = initialProbe.receiveMessage(Duration.ofSeconds(3)); + assertNotNull("Supervisor should be operational initially", initialActors); + + // Simulate a 429 error (as if it came from NewsActor's FetchComplete) + supervisor.tell(new SupervisorActor.APICallFailed( + "bitcoin", + "popularity", + "Rate limit exceeded", + 429 + )); + + // Verify system is stable after this flow + TestProbe verifyProbe = + testKit.createTestProbe(SupervisorActor.OtherActors.class); + supervisor.tell(new SupervisorActor.GetOtherActors(verifyProbe.getRef())); + SupervisorActor.OtherActors actors1 = verifyProbe.receiveMessage(Duration.ofSeconds(3)); + assertNotNull("Supervisor should still be operational after first 429", actors1); + + // Send another 429 to verify key rotation continued + supervisor.tell(new SupervisorActor.APICallFailed( + "ethereum", + "publishedAt", + "Rate limit exceeded", + 429 + )); + + // Verify still operational after second 429 + TestProbe verifyProbe2 = + testKit.createTestProbe(SupervisorActor.OtherActors.class); + supervisor.tell(new SupervisorActor.GetOtherActors(verifyProbe2.getRef())); + SupervisorActor.OtherActors actors2 = verifyProbe2.receiveMessage(Duration.ofSeconds(3)); + assertNotNull("Supervisor should still be operational after second 429", actors2); + + // Verify the actors are the same references (supervisor didn't recreate them) + assertEquals("SourceActor should be the same instance", + initialActors.sourceActor, actors2.sourceActor); + assertEquals("SourcesInfoActor should be the same instance", + initialActors.sourcesInfoActor, actors2.sourcesInfoActor); + } + + /** + * NewsActor receives StopQuery after non-429 error. + * @author Luan Tran + */ + @Test + public void testNewsActor_receivesStopQuery_stopsQuery() { + TestProbe supervisorProbe = + testKit.createTestProbe(SupervisorActor.Message.class); + TestProbe userProbe = + testKit.createTestProbe(UserActor.Message.class); + + ActorRef newsActor = testKit.spawn( + NewsActor.create(wsMock, "testKey", supervisorProbe.getRef()) + ); + + // Setup: Watch a query + newsActor.tell(new NewsActor.WatchQuery("bitcoin", "popularity", userProbe.getRef())); + + // Clear initial NewsResults + UserActor.Message initialResult = userProbe.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Expected initial NewsResults", initialResult instanceof UserActor.NewsResults); + + // Simulate non-429 error + newsActor.tell(new NewsActor.FetchComplete("bitcoin-popularity", false, "Server error 500")); + + // Supervisor receives the error + SupervisorActor.Message errorMsg = supervisorProbe.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Expected APICallFailed", errorMsg instanceof SupervisorActor.APICallFailed); + SupervisorActor.APICallFailed failed = (SupervisorActor.APICallFailed) errorMsg; + assertEquals(0, failed.statusCode); + assertEquals("bitcoin", failed.query); + + // Supervisor would send StopQuery to NewsActor (simulate this) + newsActor.tell(new NewsActor.StopQuery("bitcoin", "popularity")); + + // Verify query is stopped - actor still works with new queries + newsActor.tell(new NewsActor.WatchQuery("test", "popularity", userProbe.getRef())); + UserActor.Message newQueryResult = userProbe.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Actor should accept new queries after stop", + newQueryResult instanceof UserActor.NewsResults); + } + + /** + * Verify UpdateApiKey actually updates the key used in retries. + * @author Luan Tran + */ + @Test + public void testUpdateApiKey_changesKeyForSubsequentRetries() { + TestProbe supervisorProbe = + testKit.createTestProbe(SupervisorActor.Message.class); + TestProbe userProbe = + testKit.createTestProbe(UserActor.Message.class); + + ActorRef newsActor = testKit.spawn( + NewsActor.create(wsMock, "key1", supervisorProbe.getRef()) + ); + + // Watch a query + newsActor.tell(new NewsActor.WatchQuery("bitcoin", "popularity", userProbe.getRef())); + + // Clear initial NewsResults + UserActor.Message initialResult = userProbe.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Expected initial NewsResults", initialResult instanceof UserActor.NewsResults); + + // Test 3 key updates and retries + String[] keys = {"key2", "key3", "key4"}; + for (int i = 0; i < keys.length; i++) { + String key = keys[i]; + + // Update key and retry + newsActor.tell(new NewsActor.UpdateApiKey(key)); + newsActor.tell(new NewsActor.RetryQuery("bitcoin", "popularity")); + + // Verify retry triggered by checking for either supervisor or user message + boolean retryTriggered = false; + try { + supervisorProbe.receiveMessage(Duration.ofSeconds(3)); + retryTriggered = true; + } catch (AssertionError e) { + try { + userProbe.receiveMessage(Duration.ofSeconds(3)); + retryTriggered = true; + } catch (AssertionError e2) { + // Will fail below + } + } + assertTrue("Retry " + (i + 1) + " with key " + key + " should trigger a fetch", retryTriggered); + } + + // Verify actor still operational + newsActor.tell(new NewsActor.WatchQuery("test", "popularity", userProbe.getRef())); + UserActor.Message finalMsg = userProbe.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Actor should still be operational after multiple retries", + finalMsg instanceof UserActor.NewsResults); + } + + /** + * NewsActor receives StopQuery after non-429 error. + * @author Luan Tran + */ + @Test + public void testNewsActor_receivesStopQueryAndUnwatches_MultipleActors() { + TestProbe supervisorProbe = + testKit.createTestProbe(SupervisorActor.Message.class); + TestProbe userProbe1 = + testKit.createTestProbe(UserActor.Message.class); + TestProbe userProbe2 = + testKit.createTestProbe(UserActor.Message.class); + TestProbe userProbe3 = + testKit.createTestProbe(UserActor.Message.class); + + ActorRef newsActor = testKit.spawn( + NewsActor.create(wsMock, "testKey", supervisorProbe.getRef()) + ); + + // Setup: Watch a query with multiple watchers + newsActor.tell(new NewsActor.WatchQuery("bitcoin", "popularity", userProbe1.getRef())); + newsActor.tell(new NewsActor.WatchQuery("bitcoin", "popularity", userProbe2.getRef())); + newsActor.tell(new NewsActor.WatchQuery("bitcoin", "popularity", userProbe3.getRef())); + + // Get initial NewsResults from all three watchers + UserActor.Message result1 = userProbe1.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Expected initial NewsResults for watcher 1", result1 instanceof UserActor.NewsResults); + + UserActor.Message result2 = userProbe2.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Expected initial NewsResults for watcher 2", result2 instanceof UserActor.NewsResults); + + UserActor.Message result3 = userProbe3.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Expected initial NewsResults for watcher 3", result3 instanceof UserActor.NewsResults); + + // Simulate non-429 error + newsActor.tell(new NewsActor.FetchComplete("bitcoin-popularity", false, "Server error 500")); + + // Supervisor receives the error + SupervisorActor.Message errorMsg = supervisorProbe.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Expected APICallFailed", errorMsg instanceof SupervisorActor.APICallFailed); + SupervisorActor.APICallFailed failed = (SupervisorActor.APICallFailed) errorMsg; + assertEquals(0, failed.statusCode); + assertEquals("bitcoin", failed.query); + + // First, test UnwatchQuery - remove one watcher + newsActor.tell(new NewsActor.UnwatchQuery("bitcoin", "popularity", userProbe1.getRef())); + + // Verify watcher was removed by triggering a manual FetchComplete + // Only userProbe2 and userProbe3 should receive results now + newsActor.tell(new NewsActor.FetchComplete("bitcoin-popularity", true, null)); + + // userProbe2 and userProbe3 should receive NewsResults + UserActor.Message updateMsg2 = userProbe2.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Watcher 2 should still receive updates", updateMsg2 instanceof UserActor.NewsResults); + + UserActor.Message updateMsg3 = userProbe3.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Watcher 3 should still receive updates", updateMsg3 instanceof UserActor.NewsResults); + + // userProbe1 should NOT receive anything (it was unwatched) + try { + userProbe1.receiveMessage(Duration.ofMillis(500)); + fail("Watcher 1 should NOT receive updates after being unwatched"); + } catch (AssertionError e) { + // Expected - no message received + } + + // Now test StopQuery - should remove all remaining watchers + newsActor.tell(new NewsActor.StopQuery("bitcoin", "popularity")); + + // Verify query is stopped - actor still works with new queries + newsActor.tell(new NewsActor.WatchQuery("test", "popularity", userProbe1.getRef())); + UserActor.Message newQueryResult = userProbe1.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Actor should accept new queries after stop", + newQueryResult instanceof UserActor.NewsResults); + + // Verify the stopped query doesn't exist anymore by trying to retry it + // This should log a warning but not crash + newsActor.tell(new NewsActor.RetryQuery("bitcoin", "popularity")); + + // Also verify none of the old watchers receive anything + // (since the query was completely stopped and removed) + try { + userProbe2.receiveMessage(Duration.ofMillis(500)); + fail("Watcher 2 should NOT receive updates after StopQuery"); + } catch (AssertionError e) { + // Expected - no message received + } + + try { + userProbe3.receiveMessage(Duration.ofMillis(500)); + fail("Watcher 3 should NOT receive updates after StopQuery"); + } catch (AssertionError e) { + // Expected - no message received + } + } + + /** + * Last watcher removal triggers query cleanup. + * @author Luan Tran + */ + @Test + public void testLastWatcherRemoval_triggersQueryCleanup() { + TestProbe supervisorProbe = + testKit.createTestProbe(SupervisorActor.Message.class); + TestProbe userProbe1 = + testKit.createTestProbe(UserActor.Message.class); + TestProbe userProbe2 = + testKit.createTestProbe(UserActor.Message.class); + TestProbe userProbe3 = + testKit.createTestProbe(UserActor.Message.class); + + ActorRef newsActor = testKit.spawn( + NewsActor.create(wsMock, "testKey", supervisorProbe.getRef()) + ); + + // Setup: Watch a query with 3 watchers + newsActor.tell(new NewsActor.WatchQuery("bitcoin", "popularity", userProbe1.getRef())); + newsActor.tell(new NewsActor.WatchQuery("bitcoin", "popularity", userProbe2.getRef())); + newsActor.tell(new NewsActor.WatchQuery("bitcoin", "popularity", userProbe3.getRef())); + + // Clear initial NewsResults + userProbe1.receiveMessage(Duration.ofSeconds(3)); + userProbe2.receiveMessage(Duration.ofSeconds(3)); + userProbe3.receiveMessage(Duration.ofSeconds(3)); + + // Remove watchers one by one + // First removal - query should still exist (2 watchers remain) + newsActor.tell(new NewsActor.UnwatchQuery("bitcoin", "popularity", userProbe1.getRef())); + + // Verify query still exists by triggering a fetch complete + newsActor.tell(new NewsActor.FetchComplete("bitcoin-popularity", true, null)); + + // Remaining watchers should receive updates + UserActor.Message msg1 = userProbe2.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Watcher 2 should receive updates", msg1 instanceof UserActor.NewsResults); + + UserActor.Message msg2 = userProbe3.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Watcher 3 should receive updates", msg2 instanceof UserActor.NewsResults); + + // Second removal - query should still exist (1 watcher remains) + newsActor.tell(new NewsActor.UnwatchQuery("bitcoin", "popularity", userProbe2.getRef())); + + // Verify query still exists + newsActor.tell(new NewsActor.FetchComplete("bitcoin-popularity", true, null)); + + // Last watcher should receive updates + UserActor.Message msg3 = userProbe3.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Watcher 3 should still receive updates", msg3 instanceof UserActor.NewsResults); + + // Third removal - THIS IS THE LAST WATCHER + // This should trigger the cleanup logic: + // - timers.cancel(queryKey) + // - queries.remove(queryKey) + newsActor.tell(new NewsActor.UnwatchQuery("bitcoin", "popularity", userProbe3.getRef())); + + // Verify query was completely removed by trying to: + // 1. Retry the query (should log warning about non-existent query) + newsActor.tell(new NewsActor.RetryQuery("bitcoin", "popularity")); + + // 2. Send a FetchComplete for it (should be ignored or log warning) + newsActor.tell(new NewsActor.FetchComplete("bitcoin-popularity", true, null)); + + // 3. Verify no watchers receive anything (query is gone) + try { + userProbe1.receiveMessage(Duration.ofSeconds(3)); + fail("No watcher should receive updates after query cleanup"); + } catch (AssertionError e) { + // Expected - no message + } + + try { + userProbe2.receiveMessage(Duration.ofSeconds(3)); + fail("No watcher should receive updates after query cleanup"); + } catch (AssertionError e) { + // Expected - no message + } + + try { + userProbe3.receiveMessage(Duration.ofSeconds(3)); + fail("No watcher should receive updates after query cleanup"); + } catch (AssertionError e) { + // Expected - no message + } +// +// // Verify actor is still operational with a new query +// newsActor.tell(new NewsActor.WatchQuery("ethereum", "publishedAt", userProbe1.getRef())); +// UserActor.Message newQueryMsg = userProbe1.receiveMessage(Duration.ofSeconds(3)); +// assertTrue("Actor should handle new queries after cleanup", +// newQueryMsg instanceof UserActor.NewsResults); +// +// // Final verification: Watch the same query again (should create a fresh query) +// newsActor.tell(new NewsActor.WatchQuery("bitcoin", "popularity", userProbe2.getRef())); +// UserActor.Message restartMsg = userProbe2.receiveMessage(Duration.ofSeconds(3)); +// assertTrue("Should be able to re-watch a cleaned up query", +// restartMsg instanceof UserActor.NewsResults); + } + } diff --git a/test/actors/SupervisorActorTest.java b/test/actors/SupervisorActorTest.java index 1038d86..478bd36 100644 --- a/test/actors/SupervisorActorTest.java +++ b/test/actors/SupervisorActorTest.java @@ -4,6 +4,7 @@ import org.apache.pekko.actor.testkit.typed.javadsl.ActorTestKit; import org.apache.pekko.actor.testkit.typed.javadsl.TestProbe; import org.apache.pekko.actor.typed.ActorRef; +import org.apache.pekko.actor.typed.javadsl.Behaviors; import org.apache.pekko.stream.javadsl.Flow; import com.fasterxml.jackson.databind.JsonNode; import org.junit.AfterClass; @@ -26,8 +27,6 @@ public class SupervisorActorTest { private static ActorTestKit testKit; private WSClient mockWsClient; - private AsyncCacheApi mockCache; - private String apiKey; private ActorRef supervisorActor; @BeforeClass @@ -43,15 +42,8 @@ public static void teardownClass() { @Before public void setup() { mockWsClient = mock(WSClient.class); - mockCache = mock(AsyncCacheApi.class); - when(mockCache.set(anyString(), any(), anyInt())) - .thenReturn(CompletableFuture.completedFuture(null)); - when(mockCache.get(anyString())) - .thenReturn(CompletableFuture.completedFuture(null)); - - apiKey = "test-api-key"; supervisorActor = testKit.spawn( - SupervisorActor.create(mockWsClient, apiKey, mockCache) + SupervisorActor.create(mockWsClient) ); } @@ -360,4 +352,165 @@ public void testStatsRequestsForDifferentSessions() { assertEquals("Session 2 query should match", "query-for-session-2", response2.query()); } + /** + * Supervisor receives 429 error and sends UpdateApiKey + RetryQuery to NewsActor. + * Need to modify Supervisor behavior to test newsActor within it + * @author Luan Tran + */ + @Test + public void testSupervisor_receives429_sendsRetryToNewsActor() { + // Create a probe to monitor messages sent to NewsActor + TestProbe newsActorMonitor = + testKit.createTestProbe(NewsActor.Message.class); + + // Create a test supervisor that mimics the real SupervisorActor's behavior + // but with a monitored NewsActor so we can intercept messages + ActorRef testSupervisor = testKit.spawn( + Behaviors.setup(context -> { + // Create NewsActor with monitoring + ActorRef newsActor = context.spawn( + Behaviors.monitor( + NewsActor.Message.class, + newsActorMonitor.getRef(), + NewsActor.create(mockWsClient, "key1", context.getSelf()) + ), + "newsActor" + ); + + // Simulate supervisor's API key rotation + String[] apiKeys = {"key1", "key2", "key3", "key4"}; + java.util.concurrent.atomic.AtomicInteger currentKeyIndex = + new java.util.concurrent.atomic.AtomicInteger(0); + + return Behaviors.receive(SupervisorActor.Message.class) + .onMessage(SupervisorActor.APICallFailed.class, msg -> { + context.getLog().info("Received APICallFailed with status: {}", msg.statusCode); + + if (msg.statusCode == 429) { + // Rotate API key (same logic as real SupervisorActor) + int nextIndex = currentKeyIndex.updateAndGet(i -> (i + 1) % apiKeys.length); + String newApiKey = apiKeys[nextIndex]; + + context.getLog().info("Rate limit hit! Rotating to key index {} and retrying query: {}", + nextIndex, msg.query); + + // Send UpdateApiKey and RetryQuery to NewsActor + newsActor.tell(new NewsActor.UpdateApiKey(newApiKey)); + newsActor.tell(new NewsActor.RetryQuery(msg.query, msg.sortBy)); + } + return Behaviors.same(); + }) + .build(); + }) + ); + + // Send 429 error to supervisor + testSupervisor.tell(new SupervisorActor.APICallFailed( + "bitcoin", + "popularity", + "Rate limit exceeded", + 429 + )); + + // Verify: NewsActor receives UpdateApiKey + NewsActor.Message msg1 = newsActorMonitor.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Expected UpdateApiKey message", msg1 instanceof NewsActor.UpdateApiKey); + NewsActor.UpdateApiKey updateMsg = (NewsActor.UpdateApiKey) msg1; + assertEquals("key2", updateMsg.newApiKey); // Should rotate from key1 to key2 + + // Verify: NewsActor receives RetryQuery + NewsActor.Message msg2 = newsActorMonitor.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Expected RetryQuery message", msg2 instanceof NewsActor.RetryQuery); + NewsActor.RetryQuery retryMsg = (NewsActor.RetryQuery) msg2; + assertEquals("bitcoin", retryMsg.query); + assertEquals("popularity", retryMsg.sortBy); + + // Test another 429 to verify continued key rotation + testSupervisor.tell(new SupervisorActor.APICallFailed( + "ethereum", + "publishedAt", + "Rate limit exceeded", + 429 + )); + + // Verify rotation to key3 + NewsActor.Message msg3 = newsActorMonitor.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Expected UpdateApiKey message", msg3 instanceof NewsActor.UpdateApiKey); + assertEquals("key3", ((NewsActor.UpdateApiKey) msg3).newApiKey); + + // Verify second retry + NewsActor.Message msg4 = newsActorMonitor.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Expected RetryQuery message", msg4 instanceof NewsActor.RetryQuery); + assertEquals("ethereum", ((NewsActor.RetryQuery) msg4).query); + assertEquals("publishedAt", ((NewsActor.RetryQuery) msg4).sortBy); + } + + /** + * Supervisor receives non-429 error and sends StopQuery to NewsActor. + * @author Luan Tran + */ + @Test + public void testSupervisor_receivesNon429_sendsStopQuery() { + // Create a NewsActor probe to verify it receives StopQuery + TestProbe newsActorProbe = + testKit.createTestProbe(NewsActor.Message.class); + + // Simulate SupervisorActor's behavior on non-429 error: + // 1. Send StopQuery to NewsActor + newsActorProbe.getRef().tell(new NewsActor.StopQuery("bitcoin", "popularity")); + + // Verify: NewsActor receives StopQuery + NewsActor.Message msg = newsActorProbe.receiveMessage(Duration.ofSeconds(3)); + assertTrue("Expected StopQuery message", msg instanceof NewsActor.StopQuery); + NewsActor.StopQuery stopMsg = (NewsActor.StopQuery) msg; + assertEquals("bitcoin", stopMsg.query); + assertEquals("popularity", stopMsg.sortBy); + + // Additional verification: Test with real supervisor + ActorRef supervisor = testKit.spawn( + SupervisorActor.create(mockWsClient) + ); + + // Send 500 error to supervisor - it should send StopQuery to its internal NewsActor + supervisor.tell(new SupervisorActor.APICallFailed( + "ethereum", + "publishedAt", + "Internal server error", + 500 + )); + + // Verify supervisor handled it correctly (system remains stable) + TestProbe verifyProbe = + testKit.createTestProbe(SupervisorActor.OtherActors.class); + supervisor.tell(new SupervisorActor.GetOtherActors(verifyProbe.getRef())); + SupervisorActor.OtherActors actors = verifyProbe.receiveMessage(Duration.ofSeconds(3)); + assertNotNull("Supervisor should still be operational", actors); + } + + /** + * Supervisor correctly routes different errors. + * @author Luan Tran + */ + @Test + public void testSupervisor_routesErrorsCorrectly() { + ActorRef supervisor = testKit.spawn( + SupervisorActor.create(mockWsClient) + ); + + // Send various errors + supervisor.tell(new SupervisorActor.APICallFailed("q1", "popularity", "Rate limit", 429)); + supervisor.tell(new SupervisorActor.APICallFailed("q2", "popularity", "Server error", 500)); + supervisor.tell(new SupervisorActor.APICallFailed("q3", "popularity", "Unauthorized", 401)); + supervisor.tell(new SupervisorActor.APICallFailed("q4", "popularity", "Forbidden", 403)); + supervisor.tell(new SupervisorActor.APICallFailed("q5", "popularity", "Service unavailable", 503)); + + // Verify all handled correctly (system stable) + TestProbe verifyProbe = + testKit.createTestProbe(SupervisorActor.OtherActors.class); + supervisor.tell(new SupervisorActor.GetOtherActors(verifyProbe.getRef())); + verifyProbe.receiveMessage(Duration.ofSeconds(3)); + } + + + } diff --git a/test/actors/UserActorTest.java b/test/actors/UserActorTest.java index a15d72e..9e405ad 100644 --- a/test/actors/UserActorTest.java +++ b/test/actors/UserActorTest.java @@ -79,7 +79,7 @@ public void setup() { userId = "test-user-" + System.currentTimeMillis(); // Create UserActor - userActor = testKit.spawn(UserActor.create(userId, mockCache, newsActorProbe.ref(),readabilityActorProbe.ref(), sentimentActorProbe.ref())); + userActor = testKit.spawn(UserActor.create(userId, newsActorProbe.ref(),readabilityActorProbe.ref(), sentimentActorProbe.ref())); } /**