Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions apps/organization/apps.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
from django.apps import AppConfig

_events_initialized = False


class OrganizationConfig(AppConfig):
    """Django app configuration for ``apps.organization``.

    On startup, performs a one-time event bootstrap: creates the
    organization events queue and launches the discovery listener.
    The module-level ``_events_initialized`` flag guards against
    repeated ``ready()`` invocations re-running the bootstrap.
    """

    default_auto_field = "django.db.models.BigAutoField"
    name = "apps.organization"

    def ready(self):
        """Run the one-time organization event bootstrap."""
        global _events_initialized
        if _events_initialized:
            return

        # Imported lazily: module-load time is too early — the app
        # registry and settings are not fully initialized yet.
        from utils.event_publisher import EventPublisher

        publisher = EventPublisher()

        # Declare the events topology; the returned connection is only
        # needed for setup, so release it immediately.
        conn, _ = publisher.setup_org_event()
        if conn:
            conn.close()

        publisher.start_discovery_listener()
        _events_initialized = True
12 changes: 7 additions & 5 deletions bootstrap_service/management/commands/init_organization.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ def _provision_rabbitmq(self, provisioner, org_id, org_slug):

def _publish_org_event(self, org_id, org_slug, org_name, result):
"""Publish organization created event."""
now_time = timezone.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
publish_org_event(
"org.created",
str(uuid.uuid4()),
timezone.now().isoformat(),
now_time,
{
"id": org_id,
"slug": org_slug,
Expand All @@ -96,8 +97,8 @@ def _publish_org_event(self, org_id, org_slug, org_name, result):
"transformed_queue", f"{org_slug}.transformed.data.queue"
),
"is_active": True,
"created_at": timezone.now().isoformat(),
"updated_at": timezone.now().isoformat(),
"created_at": now_time,
"updated_at": now_time,
},
)

Expand Down Expand Up @@ -171,15 +172,16 @@ def _delete_organization(self, provisioner, organization):
org_slug = organization.slug_name
org_id = organization.id
vhost_name = organization.rabbitmq_vhost
now_time = timezone.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

publish_org_event(
"org.deleted",
str(uuid.uuid4()),
timezone.now().isoformat(),
now_time,
{
"id": str(org_id),
"slug": org_slug,
"deleted_at": timezone.now().isoformat(),
"deleted_at": now_time,
},
)

Expand Down
142 changes: 141 additions & 1 deletion utils/event_publisher.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,33 @@
# Copyright 2026 Digital Fortress.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import threading
import time
import uuid

import pika
from django.conf import settings
from django.db import close_old_connections
from django.utils import timezone

logger = logging.getLogger(__name__)

_listener_lock = threading.Lock()
_listener_started = False


class EventPublisher:
"""Publishes organization lifecycle events to RabbitMQ"""
Expand All @@ -24,7 +46,7 @@ def __init__(self):
self.events_routing_key = settings.ORG_EVENTS_ROUTING_KEY
self._listener_thread = None

def setup_org_events(self):
def setup_org_event(self):
"""
Create events queue for Transformer and Broker Bridge services
"""
Expand Down Expand Up @@ -81,6 +103,124 @@ def setup_org_events(self):
logger.error(f"Failed to create event connection: {e}")
return None, None

def start_discovery_listener(self):
    """Start the background discovery-request listener exactly once.

    The module-level lock/flag pair ensures that concurrent callers
    (or repeated app initialization) cannot spawn duplicate listener
    threads.
    """
    global _listener_started

    with _listener_lock:
        if _listener_started:
            return
        _listener_started = True

    # Daemon thread so the listener never blocks interpreter shutdown.
    thread = threading.Thread(
        target=self._listen_for_discovery_requests,
        name="OrgDiscoveryListener",
        daemon=True,
    )
    # Fix: keep the handle — ``_listener_thread`` is initialized to None
    # in __init__ but was never assigned, discarding the thread reference.
    self._listener_thread = thread
    thread.start()

    logger.info("Discovery listener started for spacedf organization")

def _listen_for_discovery_requests(self):
    """Consume discovery requests forever, reconnecting on failure.

    Runs on the daemon listener thread. Each iteration opens a fresh
    RabbitMQ connection, declares the exchange/queue/binding, and blocks
    in ``start_consuming()``. Any error tears the connection down, waits
    five seconds, and retries.
    """
    while True:
        connection = None
        try:
            # The message callback touches the Django ORM; drop stale DB
            # connections before each (re)connect cycle.
            close_old_connections()
            connection = pika.BlockingConnection(
                pika.URLParameters(self.rabbitmq_url)
            )
            channel = connection.channel()

            # Idempotently declare the discovery topology.
            channel.exchange_declare(
                exchange=self.events_exchange,
                exchange_type="topic",
                durable=True,
            )
            # Keyword arguments instead of six opaque positional booleans
            # (was: name, False, True, False, False, None).
            channel.queue_declare(
                queue=self.console_queue_name,
                passive=False,
                durable=True,
                exclusive=False,
                auto_delete=False,
                arguments=None,
            )
            channel.queue_bind(
                exchange=self.events_exchange,
                queue=self.console_queue_name,
                routing_key=self.discovery_routing_key,
            )

            # One unacked message at a time: requests are handled serially.
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(
                queue=self.console_queue_name,
                on_message_callback=self._handle_discovery_request,
                auto_ack=False,
            )

            logger.info("Listening on queue '%s'", self.console_queue_name)
            channel.start_consuming()

        except Exception as e:
            logger.exception("Discovery listener error: %s", e)
            time.sleep(5)
        finally:
            close_old_connections()
            # Fix: close() can itself raise on an already-dead socket;
            # an exception escaping this finally would kill the listener
            # thread despite the surrounding retry loop.
            try:
                if connection and connection.is_open:
                    connection.close()
            except Exception:
                logger.warning(
                    "Failed to close listener connection", exc_info=True
                )

def _handle_discovery_request(self, channel, method, _properties, body):
    """Handle a discovery request by publishing an ``org.created`` event.

    Looks up the "spacedf" organization and replies either directly to the
    request's ``reply_to`` queue (via the default exchange) or broadcasts
    on the events exchange with routing key "org.created". Successful
    messages are acked; malformed payloads are dropped (no requeue);
    transient failures are nacked with requeue for a later retry.
    """
    # NOTE(review): package path is spelled "rabitmq" upstream — keep as-is.
    from common.rabitmq.rabbitmq_provisioner import RabbitMQProvisioner

    from apps.organization.models import Organization

    # Callback runs on the listener thread; refresh DB connections first.
    close_old_connections()

    try:
        try:
            request = json.loads(body.decode("utf-8"))
        except (UnicodeDecodeError, ValueError):
            request = None
        if not isinstance(request, dict):
            # Fix: a poison (undecodable / non-object) message previously
            # fell into the generic handler and was requeued, causing an
            # infinite redelivery loop. Drop it instead.
            logger.warning(
                "Dropping malformed discovery request: %r", body[:256]
            )
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            return

        reply_to = request.get("reply_to")

        org = Organization.objects.filter(slug_name="spacedf").first()
        if not org:
            # Nothing to announce; drop rather than retry.
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            return

        # Build and publish org.created
        provisioner = RabbitMQProvisioner()
        envelope = {
            "event_id": str(uuid.uuid4()),
            "event_type": "org.created",
            # Millisecond-precision timestamp with "Z" suffix, matching the
            # init_organization command (assumes timezone.now() is UTC —
            # TODO confirm USE_TZ in settings).
            "timestamp": timezone.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z",
            "payload": {
                "id": str(org.id),
                "slug": org.slug_name,
                "name": org.name,
                "vhost": org.rabbitmq_vhost,
                "amqp_url": provisioner.build_tenant_amqp_url(org.rabbitmq_vhost),
                "exchange": f"{org.slug_name}.exchange",
                "transformer_queue": f"{org.slug_name}.transformer.queue",
                "transformed_queue": f"{org.slug_name}.transformed.data.queue",
                "is_active": org.is_active,
                "created_at": org.created_at.isoformat()
                if hasattr(org, "created_at")
                else None,
                "updated_at": org.updated_at.isoformat()
                if hasattr(org, "updated_at")
                else None,
            },
        }

        # Direct reply on the default exchange when reply_to is given;
        # otherwise broadcast on the events exchange.
        channel.basic_publish(
            exchange="" if reply_to else self.events_exchange,
            routing_key=reply_to or "org.created",
            body=json.dumps(envelope),
            properties=pika.BasicProperties(
                content_type="application/json",
                delivery_mode=2,  # persistent
            ),
        )

        channel.basic_ack(delivery_tag=method.delivery_tag)

    except Exception as e:
        # Transient failure (DB/broker hiccup): requeue for another attempt.
        logger.exception("Discovery error: %s", e)
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
    finally:
        close_old_connections()

def publish_event(
self, event_type: str, event_id: str, timestamp: str, payload: dict
) -> bool:
Expand Down