Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions .env-example
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
API_KEY=xxxxxxx
API_URL=http://localhost:3001
NODE_ENV="development"
GOOGLE_API_KEY=your_google_api_key
INSTANCE_DOMAIN=api.topia.io
INSTANCE_PROTOCOL=https
INTERACTIVE_KEY=xxxxxxx
INTERACTIVE_SECRET=xxxxxxx
NODE_ENV="development"
INTERACTIVE_KEY=your_interactive_key
INTERACTIVE_SECRET=your_interactive_secret
20 changes: 10 additions & 10 deletions .github/workflows/aws_auto_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ jobs:

# Read CODEOWNERS file if it exists
if [[ -f ".github/CODEOWNERS" ]]; then
echo "📋 Reading CODEOWNERS file..."
echo "[INFO] Reading CODEOWNERS file..."
# Extract usernames from CODEOWNERS (remove @ prefix)
codeowners=$(grep -v '^#' .github/CODEOWNERS | grep -o '@[a-zA-Z0-9_-]*' | sed 's/@//' | sort -u)
for user in $codeowners; do
authorized_users+=("$user")
echo " - CODEOWNER: $user"
done
else
echo "⚠️ No CODEOWNERS file found"
echo "[WARN] No CODEOWNERS file found"
fi

# Get repository collaborators with admin/maintain permissions using GitHub API
echo "🔍 Checking repository permissions..."
echo "[CHECK] Checking repository permissions..."

# Check if user has admin or maintain permissions
user_permission=$(curl -s -H "Authorization: token ${{ secrets.PAT }}" \
Expand All @@ -65,15 +65,15 @@ jobs:
for user in "${authorized_users[@]}"; do
if [[ "$user" == "$merged_by" ]]; then
is_authorized=true
echo " User $merged_by is authorized via CODEOWNERS"
echo "[OK] User $merged_by is authorized via CODEOWNERS"
break
fi
done

# Check if user has admin or maintain permissions
if [[ "$user_permission" == "admin" || "$user_permission" == "maintain" ]]; then
is_authorized=true
echo " User $merged_by is authorized via repository permissions ($user_permission)"
echo "[OK] User $merged_by is authorized via repository permissions ($user_permission)"
fi

# Check if user is organization owner (for metaversecloud-com org)
Expand All @@ -94,21 +94,21 @@ jobs:

if [[ "$owner_status" == "admin" ]]; then
is_authorized=true
echo " User $merged_by is authorized as organization owner"
echo "[OK] User $merged_by is authorized as organization owner"
fi
fi

echo "is_authorized=$is_authorized" >> $GITHUB_OUTPUT

if [[ "$is_authorized" == "false" ]]; then
echo " User $merged_by is not authorized to trigger releases"
echo "💡 Authorized users include:"
echo "[ERROR] User $merged_by is not authorized to trigger releases"
echo "[TIP] Authorized users include:"
echo " - CODEOWNERS: ${authorized_users[*]}"
echo " - Repository admins and maintainers"
echo " - Organization owners"
exit 0
else
echo "🎉 User $merged_by is authorized to trigger releases"
echo "[SUCCESS] User $merged_by is authorized to trigger releases"
fi

- name: Check for release labels and determine version bumps
Expand Down Expand Up @@ -221,7 +221,7 @@ jobs:
generate_release_notes: true
make_latest: true
body: |
## 🚀 Release ${{ env.NEW_VERSION }}
## ? Release ${{ env.NEW_VERSION }}

**Version Bumps Applied:**
- Major: ${{ steps.check.outputs.has_major }}
Expand Down
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,25 @@

Jukebox app built on Topia's SDK.

## Environment Variables

Create a `.env` file in the root directory. See `.env-example` for a template.

| Variable | Description | Required |
| ---------------------- | ---------------------------------------------------------------------------------- | -------- |
| `NODE_ENV` | Node environment | No |
| `SKIP_PREFLIGHT_CHECK` | Skip CRA preflight check | No |
| `GOOGLE_API_KEY` | YouTube Data API key for searching videos | Yes |
| `INSTANCE_DOMAIN` | Topia API domain (`api.topia.io` for production, `api-stage.topia.io` for staging) | Yes |
| `INTERACTIVE_KEY` | Topia interactive app key | Yes |
| `INTERACTIVE_SECRET` | Topia interactive app secret | Yes |

## Getting Started

### Built With

#### Client

![React](https://img.shields.io/badge/react-%2320232a.svg?style=for-the-badge&logo=react&logoColor=%2361DAFB)
![Vite](https://img.shields.io/badge/vite-%23646CFF.svg?style=for-the-badge&logo=vite&logoColor=white)
![TypeScript](https://img.shields.io/badge/typescript-%23007ACC.svg?style=for-the-badge&logo=typescript&logoColor=white)
Expand Down
190 changes: 163 additions & 27 deletions server/redis-sse/index.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,169 @@
import { createClient } from "redis";

import * as redis from "redis";
import { Response } from "express";
import { getCredentials } from "../utils/index.js";

const RAPID_RETRY_MAX = 10;
const RAPID_ERROR_THRESHOLD = 5000;

let pubRapidErrorCount = 0;
let pubReconnectionAttempt = 0;
let pubLastReconnectAttemptTime = null;
let pubLastConnectionTime = null;

let subRapidErrorCount = 0;
let subReconnectionAttempt = 0;
let subLastReconnectAttemptTime = null;
let subLastConnectionTime = null;

const getRedisHealth = (name) => {
const currentTime = new Date().getTime();
const lastConnectionTime = name === "pub" ? pubLastConnectionTime : subLastConnectionTime;
const lastReconnectAttemptTime = name === "pub" ? pubLastReconnectAttemptTime : subLastReconnectAttemptTime;
const rapidReconnectCount = name === "pub" ? pubRapidErrorCount : subRapidErrorCount;
const reconnectCount = name === "pub" ? pubReconnectionAttempt : subReconnectionAttempt;
const status = rapidReconnectCount < RAPID_RETRY_MAX ? "OK" : "UNHEALTHY";
const timeSinceLastReconnectAttempt = lastReconnectAttemptTime ? currentTime - lastReconnectAttemptTime : null;

return {
status,
currentTime,
lastConnectionTime,
rapidReconnectCount,
reconnectCount,
timeSinceLastReconnectAttempt,
};
};

const handleRedisConnection = (client, name) => {
const { reconnectCount, currentTime, status } = getRedisHealth(name);
const info = reconnectCount ? `status: ${status}, reconnectCount: ${reconnectCount}` : `status: ${status}`;
console.log(`Redis connected - ${name} server, on process: ${process.pid}`, info);
if (name === "pub") pubLastConnectionTime = currentTime;
if (name === "sub") subLastConnectionTime = currentTime;
client.health = getRedisHealth(name);
};

const handleRedisReconnection = (name) => {
const { currentTime, timeSinceLastReconnectAttempt } = getRedisHealth(name);
if (name === "pub") {
pubLastReconnectAttemptTime = currentTime;
pubReconnectionAttempt++;
if (timeSinceLastReconnectAttempt && timeSinceLastReconnectAttempt < RAPID_ERROR_THRESHOLD) {
pubRapidErrorCount++;
}
}
if (name === "sub") {
subLastReconnectAttemptTime = currentTime;
subReconnectionAttempt++;
if (timeSinceLastReconnectAttempt && timeSinceLastReconnectAttempt < RAPID_ERROR_THRESHOLD) {
subRapidErrorCount++;
}
}
};

const handleRedisError = (name, error) => {
const { reconnectCount, rapidReconnectCount, status, timeSinceLastReconnectAttempt } = getRedisHealth(name);
const info = reconnectCount
? `status: ${status}, reconnectCount: ${reconnectCount}, rapidReconnectCount: ${rapidReconnectCount} timeSinceLastReconnectAttempt: ${timeSinceLastReconnectAttempt}`
: `status: ${status}`;
console.error(`Redis error - ${name} server, on process: ${process.pid}, ${info}`);
console.error(`Redis error details - ${error}`);
};

function getRedisClient(url = process.env.REDIS_URL) {
let isClusterMode = false;
if (typeof process.env.REDIS_CLUSTER_MODE === "undefined") {
console.log("[Redis] Environment variable REDIS_CLUSTER_MODE is not set. Defaulting to false.");
} else {
isClusterMode = process.env.REDIS_CLUSTER_MODE === "true";
}
const safeUrl = url || "";
const parsedUrl = new URL(safeUrl);
const host = parsedUrl.hostname;
const port = parsedUrl.port ? parseInt(parsedUrl.port) : 6379;
const username = parsedUrl.username || "default";
const password = parsedUrl.password || "";
const tls = safeUrl.startsWith("rediss");

if (!isClusterMode) {
return redis.createClient({
socket: {
host,
port,
tls,
},
username,
password,
url: safeUrl,
});
}
return redis.createCluster({
useReplicas: true,
rootNodes: [
{
url: safeUrl,
socket: {
tls,
},
},
],
defaults: {
username,
password,
},
});
}

export const redisClient = getRedisClient();
redisClient.on("connect", () => {
handleRedisConnection(redisClient, "pub");
});
redisClient.on("reconnecting", () => {
handleRedisReconnection("pub");
});
redisClient.on("error", (error) => {
handleRedisError("pub", error);
});

export const redisSubClient = getRedisClient();
redisSubClient.on("connect", () => {
handleRedisConnection(redisSubClient, "sub");
});
redisSubClient.on("reconnecting", () => {
handleRedisReconnection("sub");
});
redisSubClient.on("error", (error) => {
handleRedisError("sub", error);
});

const shouldSendEvent = (
data: { assetId: string; visitorId: string | undefined; interactiveNonce: string | undefined },
assetId: string,
visitorId: string,
interactiveNonce: string,
) => {
return (
data.assetId === assetId &&
(data.visitorId === undefined || data.visitorId !== visitorId) &&
(data.interactiveNonce === undefined || data.interactiveNonce !== interactiveNonce)
);
};

const connectionOpt = {
url: process.env.REDIS_URL,
socket: {
tls: process.env.REDIS_URL!.startsWith("rediss"),
},
return data.assetId === assetId && data.visitorId !== visitorId && data.interactiveNonce !== interactiveNonce;
};

const redisObj = {
publisher: createClient(connectionOpt),
subscriber: createClient(connectionOpt),
publish: function (channel: string, message: any) {
console.log(`Publishing ${message.event} to ${channel}`);
publisher: redisClient,
subscriber: redisSubClient,
connections: [],
publish: function (channel, message) {
if (process.env.NODE_ENV === "development") console.log(`Publishing ${message.event} to ${channel}`);
this.publisher.publish(channel, JSON.stringify(message));
},
subscribe: function (channel: string) {
this.subscriber.subscribe(channel, (message) => {
const data = JSON.parse(message);
console.log(`Event '${data.event}' received on ${channel}`);
if (process.env.NODE_ENV === "development") console.log(`Event '${data.event}' received on ${channel}`);
let dataToSend: { data?: any; kind?: string } = {};
if (data.event === "nowPlaying") {
dataToSend = { data: { videoId: data.videoId, nextUpId: data.nextUpId }, kind: "nowPlaying" };
} else if (data.event === "mediaAction") {
dataToSend = { data: { media: data.videos }, kind: data.kind };
}

this.connections.forEach(({ res: existingConnection }) => {
const { assetId, visitorId, interactiveNonce } = existingConnection.req.query;
if (shouldSendEvent(data, assetId, visitorId, interactiveNonce)) {
Expand All @@ -46,10 +172,8 @@ const redisObj = {
});
});
},
connections: [],
addConn: function (connection) {
const { visitorId, interactiveNonce } = connection.res.req.query;

if (
this.connections.some(
({ res: existingConnection }) =>
Expand All @@ -70,24 +194,36 @@ const redisObj = {
} else {
this.connections.push(connection);
}
console.log(`Connection ${interactiveNonce} added. Length is ${this.connections.length}`);
if (process.env.NODE_ENV === "development") {
console.log(`Connection ${interactiveNonce} added. Length is ${this.connections.length}`);
}
},
deleteConn: function () {
// Remove inactive connections older than 30 minutes
// Remove inactive connections older than 15 minutes
this.connections = this.connections.filter(({ res, lastHeartbeatTime }) => {
const isActive = lastHeartbeatTime > Date.now() - 15 * 60 * 1000;
if (!isActive) {
if (!isActive && process.env.NODE_ENV === "development") {
console.log(`Connection to ${res.req.query.interactiveNonce} deleted`);
}
return isActive;
});
},
};

redisObj.publisher.connect();
redisObj.subscriber.connect();
// Initialize connections and subscription with proper sequencing
async function initRedis() {
try {
await redisObj.publisher.connect();
await redisObj.subscriber.connect();
// Subscribe only after connections are established
redisObj.subscribe(`${process.env.INTERACTIVE_KEY}_JUKEBOX`);
} catch (err) {
console.error("[Redis] Initialization error:", err);
}
}

redisObj.subscribe(`${process.env.INTERACTIVE_KEY}_JUKEBOX`);
// Kick off initialization (top-level)
initRedis();

redisObj.publisher.on("error", (err) => console.error("Publisher Error", err));
redisObj.subscriber.on("error", (err) => console.error("Subscriber Error", err));
Expand Down
Loading