Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Local configuration files
macroliquidity_local.ini

.DS_Store
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down Expand Up @@ -143,4 +143,4 @@ cython_debug/
.idea
HARK/netlogo*
Hark/NetLogo
.vscode
.vscode
115 changes: 115 additions & 0 deletions cloud/AzureMessageBus_MarketClass/AZReceiverClass2.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
using System.Threading.Tasks;
using System.Text;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure;
using Azure.Identity;
using Azure.Security.KeyVault.Secrets;
using System;

namespace AZRPCReceiver
{
public class Program
{
//const string CONNECTION_STR = "Endpoint=sb://sharkfinmq.servicebus.windows.net/;SharedAccessKeyName=brokerDev;SharedAccessKey=ethbk8ZhqarR89v455j9psqsBPPa/CcjdtxZY7+3vVw=";
static public string CONNECTION_STR = Get_RPC_ConnectionString("sharkfinkv");
//const string SIMULATION_ID = "chum4000";
public ServiceBusClient? client;
public ServiceBusProcessor? processor;

//int RequestCounter = 0;
public static async Task Main(string[] args)
{
//string finalFlag = "";
var options = new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1
};
string SIMULATION_ID = args[0];
string clientQueueName = ($"{SIMULATION_ID}_requests");
string responseQueueName = ($"{SIMULATION_ID}_responses");
Console.WriteLine("The current time is " + DateTime.Now);
Console.WriteLine("Configuring AMMPS RPC Listenter...");
Console.WriteLine($"The current simulation_ID is: {SIMULATION_ID}");
Console.WriteLine($"The Request queue is: {clientQueueName} and the response queue name is {responseQueueName}");
Console.WriteLine($"Attempting to conect AMMPS to the Azure Service Bus Endpoint:{CONNECTION_STR}");
await using ServiceBusClient? client = new ServiceBusClient(CONNECTION_STR);
await using ServiceBusProcessor processor = client.CreateProcessor(clientQueueName, options);
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;
await processor.StartProcessingAsync();
//listener will process until it receives the final message
//await processor.StopProcessingAsync();
Console.ReadLine();
}

public static async Task MessageHandler(ProcessMessageEventArgs args)
{
string body = args.Message.Body.ToString();
string mesgID = args.Message.MessageId.ToString();
string replyTo = args.Message.ReplyTo;
var finalMessage = args.Message.ApplicationProperties["finalMessage"].ToString();
string coorelationID = mesgID;
Console.WriteLine($"Connected to Azure Service Bus....Listening for requests....received message ID: {mesgID}");
Console.WriteLine($"Message body contents: {body}");
Console.WriteLine($"Has simulation completed? {finalMessage}");
// there is where we add logic to process request and send response.
Console.WriteLine($"Processing response using coorolationID: {coorelationID}");
Console.WriteLine($"Attempting to send response to the following replyTo address: {replyTo}");
//need to add and 'if' statement to determin when Simulations is complete and exit the program
await ReplyRPCMessage(body,replyTo, mesgID);
//code to complete message.
await args.CompleteMessageAsync(args.Message);
//else exit program

if (finalMessage == "yes")
{
Console.WriteLine("Processed final Message. Closing Listener");
}

}

public static Task ErrorHandler(ProcessErrorEventArgs args)
{
Console.WriteLine("-----------------------------");
// the fully qualified namespace is available
Console.WriteLine(args.FullyQualifiedNamespace);
// the error source tells me at what point in the processing an error occurred
Console.WriteLine(args.ErrorSource);
//entity path
Console.WriteLine(args.EntityPath);
//Console.WriteLine(args.Exception.ToString());
Console.WriteLine("-----------------------------");
Console.WriteLine("-----------------------------");
//processor.StopProcessingAsync();
return Task.CompletedTask;
}
public static async Task ReplyRPCMessage(string mesgBody, string responseQueueName, string mesgID)
{
await using ServiceBusClient? client = new ServiceBusClient(CONNECTION_STR);
ServiceBusSender sender = client.CreateSender(responseQueueName);
ServiceBusMessage message = new ServiceBusMessage(mesgBody);
message.CorrelationId=mesgID;
string resposeMessageID= message.MessageId;
Console.WriteLine($"Connected to Azure Service Bus....Sending AMMPS response message with ID {resposeMessageID} with coorolationID {message.CorrelationId}");
await sender.SendMessageAsync(message);
}


public static string Get_RPC_ConnectionString(string keyVaultName)
{
string keyVaultUri = ($"https://{keyVaultName}.vault.azure.net");
Console.WriteLine($"Retrieving Service Bus Endpoint from Azure KeyVault: {keyVaultUri}");
//var credential = new InteractiveBrowserCredential();
DefaultAzureCredential? credential = new DefaultAzureCredential();
var client = new SecretClient(new Uri(keyVaultUri), credential);
var retreived_ConnectionString_Secret = client.GetSecret(name:"sharkfinMQconnectionstring");
var sharkfinMQconnectionstringValue = retreived_ConnectionString_Secret.Value;
string cstring = "";
cstring = sharkfinMQconnectionstringValue.Value;
return cstring;
}

}

}
215 changes: 215 additions & 0 deletions cloud/AzureMessageBus_MarketClass/AZ_RPC.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "5deef6c9",
"metadata": {},
"outputs": [],
"source": [
"#from sharkfin.utilities import *\n",
"#from sharkfin.markets import AbstractMarket\n",
"import numpy as np\n",
"import json\n",
"import uuid\n",
"import time\n",
"import os\n",
"import uuid\n",
"from azure.servicebus import ServiceBusClient, ServiceBusMessage\n",
"from azure.servicebus.management import ServiceBusAdministrationClient\n",
"from azure.keyvault.secrets import SecretClient\n",
"from azure.identity import DefaultAzureCredential\n",
"\n",
"CONNECTION_STR = 'Endpoint=sb://sharkfinmq.servicebus.windows.net/;SharedAccessKeyName=brokerDev;SharedAccessKey=ethbk8ZhqarR89v455j9psqsBPPa/CcjdtxZY7+3vVw='\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "0214518d",
"metadata": {},
"outputs": [],
"source": [
"class ClientRPCMarket():\n",
" def __init__(self, seed_limit='', simulation_id='', host=CONNECTION_STR):\n",
" self.simulation_price_scale = 1\n",
" self.default_sim_price = 100\n",
" seed_limit = None\n",
" self.sample = 0\n",
" self.seeds = []\n",
" self.seed_limit = seed_limit\n",
" self.latest_price = None\n",
" self.prices = [self.default_sim_price]\n",
" self.rpc_simulation_id = simulation_id\n",
" self.rpc_send_queue_name = simulation_id +'sq'\n",
" self.rpc_response_queue_name = simulation_id +'rq'\n",
" self.rpc_host_name = host\n",
" self.connection_str = host\n",
" self.init_az_rpc()\n",
" self.create_queues()\n",
" \n",
" \n",
" #method to initialize Azure Service Bus Clients for messageing and managment\n",
" def init_az_rpc(self):\n",
" self.service_bus_mgmt_client = ServiceBusAdministrationClient.from_connection_string(self.connection_str)\n",
" self.service_bus_message_client = ServiceBusClient.from_connection_string(conn_str=self.connection_str, logging_enable=True)\n",
" \n",
"\n",
" #method to create the required sending and response queues for RPC pattern\n",
" def create_queues(self):\n",
" #create queue for sharkfin to send daily values\n",
" self.service_bus_mgmt_client.create_queue(self.rpc_send_queue_name, max_delivery_count=10, dead_lettering_on_message_expiration=True)\n",
" #create queue for amps response - \n",
" self.service_bus_mgmt_client.create_queue(self.rpc_response_queue_name, max_delivery_count=10, dead_lettering_on_message_expiration=True)\n",
" \n",
" #method to instanciate a well-formed service bus message. requires passing json object as msg_body parameter\n",
" def new_rpc_message(self,msg_body):\n",
" msgdata = json.dumps(msg_body)\n",
" self.service_bus_message = ServiceBusMessage(\n",
" msgdata,\n",
" session_id = self.rpc_session_id,\n",
" reply_to = self.rpc_response_queue_name,\n",
" reply_to_session_id = self.rpc_session_id,\n",
" application_properties = {'placeholdermetadata': 'custom_data_example_if_needed'})\n",
"\n",
" #method to send a service bus message\n",
" def send_rpc_message(self):\n",
" sender = self.service_bus_message_client.get_queue_sender(queue_name=self.rpc_send_queue_name)\n",
" result = sender.send_messages(self.service_bus_message)\n",
" #self.coorelation_id = result.correlation_id\n",
" #self.message_id = result.message_id\n",
" #print (f\"Sent RPC message with ID: {self.message_id} to consumer queue {self.rpc_send_queue_name} await reply into response queue: {self.rpc_response_queue_name}...\")\n",
" print (f\"Sent RPC message to consumer queue {self.rpc_send_queue_name} await reply into response queue: {self.rpc_response_queue_name}...\")\n",
" return result\n",
" \n",
" def get_rpc_response(self, service_bus_client, qname, coorelation_id):\n",
" receiver = service_bus_client.get_queue_receiver(queue_name=self.rpc_response_queue_name, max_wait_time=5)\n",
" for msg in receiver:\n",
" if self.coorelation_id == msg.correlation_id:\n",
" self.response = body\n",
" print(\"Received: \" + str(msg))\n",
" receiver.complete_message(msg)\n",
" "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "830cfa43",
"metadata": {},
"outputs": [],
"source": [
"testMarket1 = ClientRPCMarket(4,'chum3004')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fa145d73",
"metadata": {},
"outputs": [],
"source": [
"testMarket1 = ClientRPCMarket(4,'chum3004')\n",
"testMarket1.__dict__\n",
"msgdata = {'seed': 2034, 'bl': 120, 'sl': 110, 'end_simulation': False}\n",
"msgdata = json.dumps(msgdata)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e2bebd32",
"metadata": {},
"outputs": [],
"source": [
"msgdata = {'seed': 2034, 'bl': 120, 'sl': 110, 'end_simulation': False}\n",
"msgdata = json.dumps(msgdata)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "82642196",
"metadata": {},
"outputs": [],
"source": [
"type(msgdata)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b167300b",
"metadata": {},
"outputs": [],
"source": [
"testMarket1.new_rpc_message(msgdata)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2bc4e1a5",
"metadata": {},
"outputs": [],
"source": [
"testMarket1.send_rpc_message()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6406f0ba",
"metadata": {},
"outputs": [],
"source": [
"result"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b0e32a13",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"id": "aa8af920",
"metadata": {},
"source": [
" Using the Azure Service Bus to perform a basic request/reply pattern is describe below:\n",
" A publisher sends a message into a queue and expects a reply from the message consumer.\n",
" To receive the reply, the publisher owns a queue into which it expects replies to be delivered. \n",
" The address of that queue is expressed in the ReplyTo property of the outbound message. \n",
" When the consumer responds, it copies the MessageId of the handled message into \n",
" the CorrelationId property of the reply message and delivers the message to the destination\n",
" indicated by the ReplyTo property. One message can yield multiple replies, \n",
" depending on the application context.\n",
" https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messages-payloads#message-routing-and-correlation"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Loading