Skip to content

Commit d2d791b

Browse files
committed
Connected Device control to event bus
1 parent 3360902 commit d2d791b

16 files changed

Lines changed: 183 additions & 22 deletions

services/common/config/event_bus.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ export const STREAMS = { // streams instead of actions because certain services
2121
DOMAIN_CREATED: "DOMAIN_SERVICE.DOMAIN_CREATED",
2222
DOMAIN_DELETED: "DOMAIN_SERVICE.DOMAIN_DELETED",
2323
DOMAIN_UPDATED: "DOMAIN_SERVICE.DOMAIN_UPDATED",
24+
DOMAIN_USER_ADDED: "DOMAIN_SERVICE.DOMAIN_USER_ADDED", // FOR Roles - sends roles
25+
DOMAIN_USER_ROLE_UPDATED: "DOMAIN_SERVICE.DOMAIN_USER_ROLE_UPDATED", // sends patch
26+
DOMAIN_USER_UPDATED: "DOMAIN_SERVICE.DOMAIN_USER_UPDATED",// sends patch - no need for it yet
27+
DOMAIN_USER_REMOVED: "DOMAIN_SERVICE.DOMAIN_USER_REMOVED", // sends user id
2428
},
2529
DEVICE_SERVICE: {
2630
DEVICE_CREATED: "DEVICE_SERVICE.DEVICE_CREATED",
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
2+
3+
4+
import { BaseWorker } from "@services/eventbus";
5+
import { STREAMS } from "@services/common/config";
6+
import { DomainUserAddedHandler, DomainUserRemovedHandler, DomainUserRoleUpdatedHandler, UserDeletedHandler } from "./handlers";
7+
import logger from "../config/logger";
8+
export class DeviceControlServiceWorker extends BaseWorker {
9+
onCreate() {
10+
this.handler(STREAMS.DOMAIN_SERVICE.DOMAIN_CREATED, DomainUserAddedHandler) // only has to create a user role for domain owner
11+
this.handler(STREAMS.DOMAIN_SERVICE.DOMAIN_USER_ADDED, DomainUserAddedHandler) // should create a user role with the information
12+
this.handler(STREAMS.DOMAIN_SERVICE.DOMAIN_USER_REMOVED, DomainUserRemovedHandler) // should delete the user role
13+
this.handler(STREAMS.DOMAIN_SERVICE.DOMAIN_USER_ROLE_UPDATED, DomainUserRoleUpdatedHandler) // should update the user role with the new information
14+
this.handler(STREAMS.AUTH_SERVICE.USER_DELETED, UserDeletedHandler) // should delete the user role
15+
this.errorHandler((error, payload) => {
16+
logger.error({ error, payload }, "Error in DeviceControlServiceWorker:");
17+
})
18+
}
19+
20+
}
21+
22+
const worker = new DeviceControlServiceWorker();
23+
worker.start(); // this is the only way for the parent process to start the worker and begin communicating
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import type { EventPayload } from "@services/eventbus";
2+
import { UserRoleModelInstance } from "@services/domain/src/models";
3+
import type { Role } from "@services/common/types";
4+
5+
export async function DomainCreatedHandler(message: EventPayload) {
6+
try {
7+
// get role info from payload
8+
const userRole = message?.message;
9+
if (!userRole || typeof userRole === "string" || !userRole.userId || !userRole.domainId || !userRole.role) {
10+
throw new Error("No user role information in payload");
11+
}
12+
await UserRoleModelInstance.create({ userId: userRole.userId, domainId: userRole.domainId, role: userRole.role as Role }) // create the new role in the database
13+
} catch (error) {
14+
throw new Error("Failed to process domain created event", { cause: error });
15+
}
16+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import type { EventPayload } from "@services/eventbus";
2+
import { UserRoleModelInstance } from "@services/domain/src/models";
3+
import type { Role } from "@services/common/types";
4+
5+
export async function DomainUserAddedHandler(message: EventPayload) {
6+
try {
7+
// get role info from payload
8+
const userRole = message?.message;
9+
if (!userRole || typeof userRole === "string" || !userRole.userId || !userRole.domainId || !userRole.role) {
10+
throw new Error("No user role information in payload");
11+
}
12+
await UserRoleModelInstance.create({ userId: userRole.userId, domainId: userRole.domainId, role: userRole.role as Role }) // create the new role in the database
13+
} catch (error) {
14+
throw new Error("Failed to process domain user added event", { cause: error });
15+
}
16+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import type { EventPayload } from "@services/eventbus";
2+
import { UserRoleModelInstance } from "@services/domain/src/models";
3+
export async function DomainUserRemovedHandler(message: EventPayload) {
4+
try {
5+
const userDomainInfo = message?.message;
6+
if (!userDomainInfo || typeof userDomainInfo === "string" || !userDomainInfo.userId || !userDomainInfo.domainId) {
7+
throw new Error("No user domain information in payload");
8+
}
9+
await UserRoleModelInstance.delete(userDomainInfo.userId, userDomainInfo.domainId) // delete the role associated with the deleted user in the database - this is necessary to ensure that if the user is re-added to the domain later, they don't have any lingering permissions from their previous membership
10+
} catch (error) {
11+
throw new Error("Failed to process domain user removed event", { cause: error });
12+
}
13+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import type { EventPayload } from "@services/eventbus";
2+
import { UserRoleModelInstance } from "../../models";
3+
import type { Role } from "@services/common/types";
4+
5+
export async function DomainUserRoleUpdatedHandler(message: EventPayload) {
6+
try {
7+
// get role info from payload
8+
const userRoleUpdate = message?.message;
9+
if (!userRoleUpdate || typeof userRoleUpdate === "string" || !userRoleUpdate.userId || !userRoleUpdate.domainId || !userRoleUpdate.role) {
10+
throw new Error("No user role update information in payload");
11+
}
12+
// find the existing role in the database and update it with the new information
13+
// this is a bit redundant since the domain service already updates the role, but it ensures that the device control service has the most up-to-date information and can react to it if needed (for example, if a user is demoted from admin to user, we might want to immediately revoke their access to certain devices or features)
14+
await UserRoleModelInstance.updateRole(userRoleUpdate.userId, userRoleUpdate.domainId, userRoleUpdate.role.toUpperCase() as Role)
15+
} catch (error) {
16+
throw new Error("Failed to process domain user role updated event", { cause: error });
17+
}
18+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export * from "./domain_user_added_handler";
2+
export * from "./domain_user_role_updated_handler";
3+
export * from "./domain_user_removed_handler";
4+
export * from "./user_deleted_handler";
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { type EventPayload } from "@services/eventbus";
2+
import { UserRoleModelInstance } from "../../models";
3+
4+
export async function UserDeletedHandler(message: EventPayload): Promise<void> {
5+
try {
6+
const { id } = message!.message;
7+
if (!id) {
8+
throw new Error("Missing required fields in message");
9+
}
10+
const [_, err] = await UserRoleModelInstance.deleteByUserId(id);
11+
if (err) {
12+
throw new Error("Error deleting user roles: ", { cause: err });
13+
}
14+
} catch (err) {
15+
throw new Error("Failed to process user deleted event: ", { cause: err });
16+
}
17+
}

services/devicecontrol/src/models/user_roles_model.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,15 @@ export class UserRoleModel extends MongoAssociationModel<IUserRole> {
9191
return [true, null]
9292
}, externalSession)
9393
}
94+
async deleteByUserId(userId: string, externalSession?: ClientSession): Promise<Result<boolean>> {
95+
return await this.transactionWrap(async (session) => {
96+
const result = await this.model.updateMany({ userId, deletedAt: null }, { deletedAt: new Date() }, { session }).exec()
97+
if (result.modifiedCount === 0) {
98+
return [null, new Error("No user roles found for deletion")]
99+
}
100+
return [true, null]
101+
}, externalSession)
102+
}
94103

95104
}
96105

services/domain/src/bus/handlers/user_deleted_handler.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { type EventPayload } from "@services/eventbus";
2-
import { ProfileModelInstance, DomainModelInstance } from "../../models";
2+
import { ProfileModelInstance, DomainModelInstance, UserRoleModelInstance } from "../../models";
33

44
export default async function HandleUserDeleted(message: EventPayload): Promise<void> {
55
try {
@@ -10,6 +10,7 @@ export default async function HandleUserDeleted(message: EventPayload): Promise<
1010
await DomainModelInstance.multiTableTransaction(async (conn) => {
1111
await DomainModelInstance.deleteByOwnerId(id, conn)
1212
await ProfileModelInstance.delete(id, conn)
13+
await UserRoleModelInstance.deleteByUserId(id, conn)
1314
});
1415
} catch (err) {
1516
throw new Error("Error handling USER_DELETED event: ", { cause: err });

0 commit comments

Comments
 (0)