Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 197 additions & 61 deletions app/actors/NewsActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,36 @@ public WatchQuery(String query, String sortBy, ActorRef<UserActor.Message> watch
}
}

/**
* Request to stop watching a news query.
* @author Luan Tran
*/
public static final class UnwatchQuery implements Message {
public final String query;
public final String sortBy;
public final ActorRef<UserActor.Message> watcher;

public UnwatchQuery(String query, String sortBy, ActorRef<UserActor.Message> watcher) {
this.query = query;
this.sortBy = sortBy;
this.watcher = watcher;
}
}

/**
* Request to stop all watching for a specific query (from supervisor on errors).
* @author Luan Tran
*/
public static final class StopQuery implements Message {
public final String query;
public final String sortBy;

public StopQuery(String query, String sortBy) {
this.query = query;
this.sortBy = sortBy;
}
}

/**
* Internal message to trigger a fetch for a specific query.
* <p>
Expand Down Expand Up @@ -132,13 +162,40 @@ static final class FetchComplete implements Message {
}
}


/**
* Message to update the API key used by this actor.
* @author Luan Tran
*/
public static final class UpdateApiKey implements Message {
public final String newApiKey;

public UpdateApiKey(String newApiKey) {
this.newApiKey = newApiKey;
}
}

/**
* Message to retry a failed query with the updated API key.
* @author Luan Tran
*/
public static final class RetryQuery implements Message {
public final String query;
public final String sortBy;

public RetryQuery(String query, String sortBy) {
this.query = query;
this.sortBy = sortBy;
}
}

// --- State ---

/** WebSocket client for making HTTP requests to the News API */
private final WSClient ws;

/** API key for authenticating with the News API */
private final String apiKey;
private String apiKey;

/** Actor context providing logging and self-reference */
private final ActorContext<Message> context;
Expand All @@ -151,28 +208,13 @@ static final class FetchComplete implements Message {
*/
private final Map<String, QueryInfo> queries = new HashMap<>();

private final ActorRef<SupervisorActor.Message> supervisor;


/**
* Wrapper that holds a Search object and its watchers.
* @author Luan Tran
*/
// static class QueryInfo {
// /** The Search model object containing query parameters and fetched results */
// final Search search;
//
// /** Set of UserActors watching this query and expecting updates */
// final Set<ActorRef<UserActor.Message>> watchers = new HashSet<>();
//
// /**
// * Constructs a new QueryInfo with the specified parameters.
// * @author Luan Tran
// * @param query the search query string
// * @param sortBy the sort order for results
// * @param apiKey the News API key
// */
// QueryInfo(String query, String sortBy, String apiKey) {
// this.search = new Search(query, sortBy, apiKey);
// }
// }
static class QueryInfo {
/** Raw query string for this topic */
final String query;
Expand All @@ -187,7 +229,11 @@ static class QueryInfo {
final Set<ActorRef<UserActor.Message>> watchers = new HashSet<>();

/**
* Constructs a new QueryInfo with the specified parameters.
* Constructs a new QueryInfo with the specified parameters
* @author Luan Tran, SIming Yi
* @param query the search query string
* @param sortBy the sort order for results
* @param apiKey the News API key
*/
QueryInfo(String query, String sortBy, String apiKey) {
this.query = query;
Expand All @@ -207,9 +253,9 @@ static class QueryInfo {
* @return a Behavior that can be spawned as an actor
* @author Luan Tran
*/
public static Behavior<Message> create(WSClient ws, String apiKey) {
public static Behavior<Message> create(WSClient ws, String apiKey, ActorRef<SupervisorActor.Message> supervisor) {
return Behaviors.setup(context ->
Behaviors.withTimers(timers -> new NewsActor(ws, apiKey, context, timers).behavior())
Behaviors.withTimers(timers -> new NewsActor(ws, apiKey, context, timers, supervisor).behavior())
);
}

Expand All @@ -223,11 +269,12 @@ public static Behavior<Message> create(WSClient ws, String apiKey) {
* @param context the actor context
* @param timers the timer scheduler
*/
private NewsActor(WSClient ws, String apiKey, ActorContext<Message> context, TimerScheduler<Message> timers) {
private NewsActor(WSClient ws, String apiKey, ActorContext<Message> context, TimerScheduler<Message> timers, ActorRef<SupervisorActor.Message> supervisor) {
this.ws = ws;
this.apiKey = apiKey;
this.context = context;
this.timers = timers;
this.supervisor = supervisor;
}

// --- Behavior ---
Expand All @@ -242,6 +289,10 @@ private Behavior<Message> behavior() {
.onMessage(WatchQuery.class, this::onWatchQuery)
.onMessage(FetchTick.class, this::onFetchTick)
.onMessage(FetchComplete.class, this::onFetchComplete)
.onMessage(UpdateApiKey.class, this::onUpdateApiKey)
.onMessage(RetryQuery.class, this::onRetryQuery)
.onMessage(UnwatchQuery.class, this::onUnwatchQuery)
.onMessage(StopQuery.class, this::onStopQuery)
.build();
}

Expand All @@ -268,7 +319,7 @@ private Behavior<Message> onWatchQuery(WatchQuery msg) {
queryKey,
new FetchTick(queryKey),
Duration.ZERO, // Start immediately
Duration.ofSeconds(30)
Duration.ofSeconds(10)
);

context.getLog().info("Started periodic fetching for query: {}", queryKey);
Expand All @@ -288,42 +339,10 @@ private Behavior<Message> onWatchQuery(WatchQuery msg) {

/**
* Handles FetchTick messages to trigger periodic news fetches.
* @author Luan
* @author Luan Tran, Siming Yi
* @param msg the FetchTick message containing the query key
* @return Behaviors.same() to continue with the same behavior
*/
/*private Behavior<Message> onFetchTick(FetchTick msg) {
QueryInfo info = queries.get(msg.queryKey);
if (info == null) {
// Query was removed, timer should have been cancelled
context.getLog().warn("FetchTick for removed query: {}", msg.queryKey);
return Behaviors.same();
}

context.getLog().info("Fetching news for query: {} ({} watchers)",
msg.queryKey, info.watchers.size());
System.out.println(">>> API CALL TRIGGERED for query: " + msg.queryKey + " at " + java.time.Instant.now());


ActorRef<Message> self = context.getSelf();

// Reuse the same Search object - fetchResults will update its results
info.search.fetchResults(ws).whenComplete((v, throwable) -> {
if (throwable != null) {
self.tell(new FetchComplete(msg.queryKey, false, throwable.getMessage()));
} else {
System.out.println(">>> API CALL COMPLETED for query: " + msg.queryKey);
self.tell(new FetchComplete(msg.queryKey, true, null));
}
});

return Behaviors.same();
}*/
/**
* Handles FetchTick messages to trigger periodic news fetches.
* For each tick, we create a fresh Search snapshot so that UserActor
* can compare "new results" vs "previous results" correctly.
*/
private Behavior<Message> onFetchTick(FetchTick msg) {
QueryInfo info = queries.get(msg.queryKey);
if (info == null) {
Expand Down Expand Up @@ -364,14 +383,21 @@ private Behavior<Message> onFetchTick(FetchTick msg) {
private Behavior<Message> onFetchComplete(FetchComplete msg) {
QueryInfo info = queries.get(msg.queryKey);

// ADD THIS:
if (info == null) {
context.getLog().warn("FetchComplete for removed query: {}", msg.queryKey);
return Behaviors.same();
}

if (!msg.success) {
context.getLog().error("Failed to fetch news for query: {}, error: {}",
msg.queryKey, msg.error);
int statusCode = extractStatusCode(msg.error);

// Report failure to supervisor with status code
supervisor.tell(new SupervisorActor.APICallFailed(
info.query, info.sortBy, msg.error, statusCode));

// Notify all watchers of the failure
for (ActorRef<UserActor.Message> watcher : info.watchers) {
watcher.tell(new UserActor.NewsFetchFailed(info.search.getRawQuery(), msg.error));
}
} else {
int resultCount = info.search.getResults() != null ? info.search.getResults().size() : 0;
context.getLog().info("News fetched successfully for query: {} - {} articles, notifying {} watchers",
Expand All @@ -386,6 +412,102 @@ private Behavior<Message> onFetchComplete(FetchComplete msg) {
return Behaviors.same();
}

/**
* Handles UpdateApiKey messages to rotate the API key.
* @author Luan Tran
* @param msg the UpdateApiKey message containing the new key
*/
private Behavior<Message> onUpdateApiKey(UpdateApiKey msg) {
context.getLog().info("Updating API key");
this.apiKey = msg.newApiKey;
return Behaviors.same();
}

/**
* Handles RetryQuery messages to retry a failed query with the new API key.
* @author Luan Tran
* @param msg the RetryQuery message containing query details
*/
private Behavior<Message> onRetryQuery(RetryQuery msg) {
String queryKey = makeQueryKey(msg.query, msg.sortBy);
QueryInfo info = queries.get(queryKey);

if (info == null) {
context.getLog().warn("RetryQuery for unknown query: {}", queryKey);
return Behaviors.same();
}

context.getLog().info("Retrying query {} with new API key", queryKey);

// Trigger an immediate fetch with the new API key
ActorRef<Message> self = context.getSelf();

// Update the QueryInfo's search with new API key
Search newSearch = new Search(info.query, info.sortBy, apiKey);
info.search = newSearch;

newSearch.fetchResults(ws).whenComplete((v, throwable) -> {
if (throwable != null) {
self.tell(new FetchComplete(queryKey, false, throwable.getMessage()));
} else {
self.tell(new FetchComplete(queryKey, true, null));
}
});

return Behaviors.same();
}

/**
* Handles UnwatchQuery messages to remove a watcher from a query.
* @author Luan Tran
* @param msg the UnwatchQuery message containing query details and watcher reference
*/
private Behavior<Message> onUnwatchQuery(UnwatchQuery msg) {
String queryKey = makeQueryKey(msg.query, msg.sortBy);
context.getLog().info("User unwatching query: {}", queryKey);

QueryInfo info = queries.get(queryKey);

// Remove this watcher
info.watchers.remove(msg.watcher);
context.getLog().info("Query {} now has {} watchers", queryKey, info.watchers.size());

// If no more watchers, stop fetching and clean up
if (info.watchers.isEmpty()) {
context.getLog().info("No more watchers for query: {}, stopping timer", queryKey);
timers.cancel(queryKey);
queries.remove(queryKey);
}

return Behaviors.same();
}

/**
* Handles StopQuery messages to stop all fetching for a query due to errors.
* @author Luan Tran
* @param msg the StopQuery message containing query details
*/
private Behavior<Message> onStopQuery(StopQuery msg) {
String queryKey = makeQueryKey(msg.query, msg.sortBy);
context.getLog().info("Stopping query due to error: {}", queryKey);

QueryInfo info = queries.get(queryKey);
if (info == null) {
context.getLog().warn("Stop request for unknown query: {}", queryKey);
return Behaviors.same();
}

// Stop the timer
timers.cancel(queryKey);

// Remove all watchers and the query
context.getLog().info("Stopped fetching for query: {} ({} watchers removed)",
queryKey, info.watchers.size());
queries.remove(queryKey);

return Behaviors.same();
}

/**
* Creates a unique key for a query based on its parameters.
* @author Luan Tran
Expand All @@ -396,4 +518,18 @@ private Behavior<Message> onFetchComplete(FetchComplete msg) {
private String makeQueryKey(String query, String sortBy) {
return query + "-" + sortBy;
}

/**
* Extracts HTTP status code from exception message.
* @author Luan Tran
* @param errorMessage the exception to extract from
* @return the HTTP status code, or 0 if not found
*/
private int extractStatusCode(String errorMessage) {
if (errorMessage.contains("429")) return 429;
if (errorMessage.contains("401")) return 401;
if (errorMessage.contains("403")) return 403;

return 0;
}
}
Loading