diff --git a/apps/organization/apps.py b/apps/organization/apps.py index ff01435..3fd0054 100644 --- a/apps/organization/apps.py +++ b/apps/organization/apps.py @@ -1,6 +1,23 @@ from django.apps import AppConfig +_events_initialized = False + class OrganizationConfig(AppConfig): default_auto_field = "django.db.models.BigAutoField" name = "apps.organization" + + def ready(self): + global _events_initialized + if _events_initialized: + return + + from utils.event_publisher import EventPublisher + + publisher = EventPublisher() + connection, _ = publisher.setup_org_event() + if connection: + connection.close() + + publisher.start_discovery_listener() + _events_initialized = True diff --git a/bootstrap_service/management/commands/init_organization.py b/bootstrap_service/management/commands/init_organization.py index 337bb19..3e029db 100644 --- a/bootstrap_service/management/commands/init_organization.py +++ b/bootstrap_service/management/commands/init_organization.py @@ -78,10 +78,11 @@ def _provision_rabbitmq(self, provisioner, org_id, org_slug): def _publish_org_event(self, org_id, org_slug, org_name, result): """Publish organization created event.""" + now_time = timezone.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" publish_org_event( "org.created", str(uuid.uuid4()), - timezone.now().isoformat(), + now_time, { "id": org_id, "slug": org_slug, @@ -96,8 +97,8 @@ def _publish_org_event(self, org_id, org_slug, org_name, result): "transformed_queue", f"{org_slug}.transformed.data.queue" ), "is_active": True, - "created_at": timezone.now().isoformat(), - "updated_at": timezone.now().isoformat(), + "created_at": now_time, + "updated_at": now_time, }, ) @@ -171,15 +172,16 @@ def _delete_organization(self, provisioner, organization): org_slug = organization.slug_name org_id = organization.id vhost_name = organization.rabbitmq_vhost + now_time = timezone.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" publish_org_event( "org.deleted", str(uuid.uuid4()), - timezone.now().isoformat(), + now_time, { "id": str(org_id), "slug": org_slug, - "deleted_at": timezone.now().isoformat(), + "deleted_at": now_time, }, ) diff --git a/utils/event_publisher.py b/utils/event_publisher.py index c48fbb3..18feeca 100644 --- a/utils/event_publisher.py +++ b/utils/event_publisher.py @@ -1,11 +1,33 @@ +# Copyright 2026 Digital Fortress. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import json import logging +import threading +import time +import uuid import pika from django.conf import settings +from django.db import close_old_connections +from django.utils import timezone logger = logging.getLogger(__name__) +_listener_lock = threading.Lock() +_listener_started = False + class EventPublisher: """Publishes organization lifecycle events to RabbitMQ""" @@ -24,7 +46,7 @@ def __init__(self): self.events_routing_key = settings.ORG_EVENTS_ROUTING_KEY self._listener_thread = None - def setup_org_events(self): + def setup_org_event(self): """ Create events queue for Transformer and Broker Bridge services """ @@ -81,6 +103,124 @@ def setup_org_events(self): logger.error(f"Failed to create event connection: {e}") return None, None + def start_discovery_listener(self): + """Start background listener for discovery requests""" + global _listener_started + + with _listener_lock: + if _listener_started: + return + _listener_started = True + + threading.Thread( + target=self._listen_for_discovery_requests, + name="OrgDiscoveryListener", + daemon=True, + ).start() + + logger.info("Discovery listener started for spacedf organization") + + def _listen_for_discovery_requests(self): + """Listen and respond to discovery requests with org.created events""" + while True: + connection = None + try: + close_old_connections() + connection = pika.BlockingConnection( + pika.URLParameters(self.rabbitmq_url) + ) + channel = connection.channel() + + # Setup discovery queue + channel.exchange_declare( + exchange=self.events_exchange, exchange_type="topic", durable=True + ) + channel.queue_declare( + self.console_queue_name, False, True, False, False, None + ) + channel.queue_bind( + exchange=self.events_exchange, + queue=self.console_queue_name, + routing_key=self.discovery_routing_key, + ) + + channel.basic_qos(prefetch_count=1) + channel.basic_consume( + queue=self.console_queue_name, + on_message_callback=self._handle_discovery_request, + auto_ack=False, + ) + + logger.info("Listening on queue '%s'", self.console_queue_name) + channel.start_consuming() + + except Exception as e: + logger.exception("Discovery listener error: %s", e) + time.sleep(5) + finally: + close_old_connections() + if connection and connection.is_open: + connection.close() + + def _handle_discovery_request(self, channel, method, _properties, body): + """Handle discovery request and publish org.created for spacedf""" + from common.rabitmq.rabbitmq_provisioner import RabbitMQProvisioner + + from apps.organization.models import Organization + + close_old_connections() + + try: + request = json.loads(body.decode("utf-8")) + reply_to = request.get("reply_to") + + org = Organization.objects.filter(slug_name="spacedf").first() + if not org: + channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + return + + # Build and publish org.created + provisioner = RabbitMQProvisioner() + envelope = { + "event_id": str(uuid.uuid4()), + "event_type": "org.created", + "timestamp": timezone.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z", + "payload": { + "id": str(org.id), + "slug": org.slug_name, + "name": org.name, + "vhost": org.rabbitmq_vhost, + "amqp_url": provisioner.build_tenant_amqp_url(org.rabbitmq_vhost), + "exchange": f"{org.slug_name}.exchange", + "transformer_queue": f"{org.slug_name}.transformer.queue", + "transformed_queue": f"{org.slug_name}.transformed.data.queue", + "is_active": org.is_active, + "created_at": org.created_at.isoformat() + if hasattr(org, "created_at") + else None, + "updated_at": org.updated_at.isoformat() + if hasattr(org, "updated_at") + else None, + }, + } + + channel.basic_publish( + exchange="" if reply_to else self.events_exchange, + routing_key=reply_to or "org.created", + body=json.dumps(envelope), + properties=pika.BasicProperties( + content_type="application/json", delivery_mode=2 + ), + ) + + channel.basic_ack(delivery_tag=method.delivery_tag) + + except Exception as e: + logger.exception("Discovery error: %s", e) + channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) + finally: + close_old_connections() + def publish_event( self, event_type: str, event_id: str, timestamp: str, payload: dict ) -> bool: