diff --git a/src/bender/dashboard.py b/src/bender/dashboard.py index 2940ae8..bf40e48 100644 --- a/src/bender/dashboard.py +++ b/src/bender/dashboard.py @@ -35,10 +35,12 @@ def init_with_context(self, context): ) self.children.append( modules.AppList( - 'WEBSOCKET', + 'STREAMS', column=1, collapsible=True, models=( + 'streams.models.DepthOfMarket', + 'streams.models.TrainingData', 'streams.models.TaskManagement', ), ) @@ -115,6 +117,28 @@ def init_with_context(self, context): ) ) + self.children.append( + modules.LinkList( + 'Services', + column=3, + children=[ + { + 'title': 'Flower', + 'url': 'http://localhost:5555/', + 'external': True, + 'target': '_blank', + }, + { + 'title': 'Kafka GUI', + 'url': 'http://localhost:8080/', + 'external': True, + 'target': 'blank', + }, + ], + ) + ) + + # self.children.append(modules.RecentActions( # 'Recent actions', # limit=5, diff --git a/src/core/clients/binance/restapi/__init__.py b/src/core/clients/binance/restapi/__init__.py index d889346..8325224 100644 --- a/src/core/clients/binance/restapi/__init__.py +++ b/src/core/clients/binance/restapi/__init__.py @@ -2,9 +2,6 @@ class BinanceClient(BinanceBaseRestClient): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - # Wallet from core.clients.binance.restapi.wallet import ( get_capital_config_getall, diff --git a/src/core/clients/binance/restapi/base.py b/src/core/clients/binance/restapi/base.py index 844f492..c85c9cc 100644 --- a/src/core/clients/binance/restapi/base.py +++ b/src/core/clients/binance/restapi/base.py @@ -1,23 +1,23 @@ import hashlib import hmac -from core.utils.client_utils import get_timestamp from typing import Tuple from urllib.parse import urlencode import requests +from django.conf import settings +from core.utils.client_utils import get_timestamp from core.utils.value_utils import clean_none_value class BinanceBaseRestClient: timeout = 10 - def __init__(self, credentials: dict, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self): self.session = requests.Session() - self.uri = credentials['uri'] - self.api_key = credentials['api_key'] - self.secret_key = credentials['secret_key'] + self.uri = settings.BINANCE_CLIENT['uri'] + self.api_key = settings.BINANCE_CLIENT['api_key'] + self.secret_key = settings.BINANCE_CLIENT['secret_key'] @staticmethod def encoded_string(query): diff --git a/src/core/clients/kafka/kafka_client.py b/src/core/clients/kafka/kafka_client.py index 5e61b4b..a453796 100644 --- a/src/core/clients/kafka/kafka_client.py +++ b/src/core/clients/kafka/kafka_client.py @@ -1,14 +1,14 @@ +import json import logging import socket -from confluent_kafka import Consumer -from confluent_kafka import Producer +from confluent_kafka import Producer, Consumer from django.conf import settings class KafkaProducerClient: - def __init__(self, credentials: dict, topic: str = None): - self.bootstrap_servers = credentials['bootstrap.servers'] + def __init__(self, topic: str = None): + self.bootstrap_servers = settings.KAFKA_CLIENT['bootstrap.servers'] self.client_id = socket.gethostname(), self.topic = topic if topic else 'default' @@ -21,14 +21,11 @@ def get_config(self): } def message_handler(self, _, message): - # json_data = json.loads(message) - # print(json_data) - + if not isinstance(message, bytes): + message = json.dumps(message).encode('utf-8') self.producer.produce( topic=self.topic, value=message, - # key=key, - # on_delivery=callback, # def callback(err, event): ) self.producer.flush() diff --git a/src/core/utils/admin_utils.py b/src/core/utils/admin_utils.py index 214b7e5..fa9d26f 100644 --- a/src/core/utils/admin_utils.py +++ b/src/core/utils/admin_utils.py @@ -1,14 +1,18 @@ from django.contrib import messages from django.http.response import HttpResponseRedirect from django.urls import reverse +from django.utils.safestring import mark_safe -def redirect_to_change_list(request, model, message=None, is_ok=True): - if message: - if is_ok: - messages.success(request, message) +def redirect_to_change_list(request, model, message=None): + msg = message[0] + if isinstance(msg, list): + msg = mark_safe('
'.join(msg)) + if msg: + if message[1]: + messages.success(request, msg) else: - messages.warning(request, message) + messages.warning(request, msg) meta = model._meta url = reverse(f'admin:{meta.app_label}_{meta.model_name}_changelist') return HttpResponseRedirect(url) diff --git a/src/market_data/admin.py b/src/market_data/admin.py index 6c86580..01b9bed 100644 --- a/src/market_data/admin.py +++ b/src/market_data/admin.py @@ -40,8 +40,9 @@ def get_urls(self): def update_exchange_info(self, request, symbol=None, symbols=None, permissions=None): result, is_ok = self.model.get_update(symbol, symbols, permissions) - message = f'Обновили {result} записей' if is_ok else result - return redirect_to_change_list(request, self.model, message, is_ok) + msg = f'Обновили {result} записей' if is_ok else result + message = msg, is_ok + return redirect_to_change_list(request, self.model, message) def save_model(self, request, obj, form, change): symbol = form.cleaned_data['symbol'] diff --git a/src/market_data/migrations/0001_initial.py b/src/market_data/migrations/0001_initial.py index 462104f..75005fa 100644 --- a/src/market_data/migrations/0001_initial.py +++ b/src/market_data/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 4.2.3 on 2023-08-26 11:38 +# Generated by Django 4.2.3 on 2023-09-10 12:54 import django.contrib.postgres.fields import django.contrib.postgres.fields.hstore diff --git a/src/market_data/models.py b/src/market_data/models.py index 3b5b7db..c4bbf3e 100644 --- a/src/market_data/models.py +++ b/src/market_data/models.py @@ -127,7 +127,7 @@ def __str__(self): @classmethod def get_update(cls, symbol=None, symbols=None, permissions=None): - client = BinanceClient(settings.BINANCE_CLIENT) + client = BinanceClient() try: result, is_ok = client.get_exchange_info(symbol, symbols, permissions) # noqa except requests.ConnectionError as e: diff --git a/src/streams/admin.py b/src/streams/admin.py index 88c4ba6..d8b0e77 100644 --- a/src/streams/admin.py +++ b/src/streams/admin.py @@ -1,7 +1,89 @@ from django.contrib import admin -from .models import TaskManagement +from .models import TaskManagement, DepthOfMarket, TrainingData +from core.utils.admin_utils import redirect_to_change_list +from django.utils.safestring import mark_safe +from streams.handlers.depth_of_market import DepthOfMarketStream, DepthOfMarketStreamError @admin.register(TaskManagement) class TaskManagementAdmin(admin.ModelAdmin): list_display = ('codename', 'is_working', 'updated', 'created') + + +@admin.register(DepthOfMarket) +class DepthOfMarketAdmin(admin.ModelAdmin): + list_display = ('id', 'symbol', 'is_active', 'depth', 'market_glass', 'updated', 'created') + raw_id_fields = ('symbol',) + actions = ('action_run', 'action_stop') + ordering = ('-is_active', 'id') + + def has_change_permission(self, request, obj=None): + return False + + @admin.display(description='Market glass') + def market_glass(self, obj): + return mark_safe(f'открыть') + + @admin.action(description='Запустить "Глубину рынка"') + def action_run(self, request, query=None): + msg = [] + status = True + for dom in query: + try: + dom_stream = DepthOfMarketStream(dom.symbol.symbol, dom.depth) + dom_stream.run() + dom.is_active = True + dom.save(update_fields=['is_active']) + except DepthOfMarketStreamError as e: + msg.append(e.msg) + status = False + message = msg, status + + return redirect_to_change_list(request, self.model, message) + + @admin.action(description='Остановить "Глубину рынка"') + def action_stop(self, request, query=None): + msg = [] + status = True + for dom in query: + try: + dom_stream = DepthOfMarketStream(dom.symbol.symbol, dom.depth) + dom_stream.stop() + dom.is_active = False + dom.save(update_fields=['is_active']) + except DepthOfMarketStreamError as e: + msg.append(e.msg) + status = False + message = msg, status + + return redirect_to_change_list(request, self.model, message) + + +@admin.register(TrainingData) +class TrainingDataAdmin(admin.ModelAdmin): + list_display = ('id', 'depth_of_market', 'is_active', 'amount', 'depth', 'market_glass', 'updated', 'created') + actions = ('action_run', 'action_stop') + + # def has_add_permission(self, request): + # return False + + def has_change_permission(self, request, obj=None): + return False + + @admin.display(description='Market glass') + def market_glass(self, obj): + return mark_safe(f'открыть') + + @admin.action(description='Запустить "Тестовые данные"') + def action_run(self, request, query=None): + # symbols = [item.symbol for item in query] + query.update(is_active=True) + message = f'action_run {query}', True + return redirect_to_change_list(request, self.model, message) + + @admin.action(description='Остановить "Тестовые данные"') + def action_stop(self, request, query=None): + # symbols = [item.symbol for item in query] + query.update(is_active=False) + message = f'action_stop {query}', False + return redirect_to_change_list(request, self.model, message) \ No newline at end of file diff --git a/src/streams/apps.py b/src/streams/apps.py index 718fd78..307c27b 100644 --- a/src/streams/apps.py +++ b/src/streams/apps.py @@ -4,3 +4,4 @@ class StreamsConfig(AppConfig): default_auto_field = "django.db.models.BigAutoField" name = "streams" + verbose_name = 'Потоки' diff --git a/src/streams/handlers/depth_of_market.py b/src/streams/handlers/depth_of_market.py index 922373f..c9ecd9a 100644 --- a/src/streams/handlers/depth_of_market.py +++ b/src/streams/handlers/depth_of_market.py @@ -1,84 +1,95 @@ +import json +import logging import time -from streams.models import TaskManagement -from streams.tasks import task_diff_book_depth -from core.clients.redis_client import RedisClient -from core.clients.binance.restapi import BinanceClient -from django.conf import settings import requests -import logging -import json -from core.clients.kafka.kafka_client import KafkaConsumerClient +from django.conf import settings + +from core.clients.binance.restapi import BinanceClient +from core.clients.kafka.kafka_client import ( + KafkaProducerClient, + KafkaConsumerClient, +) +from core.clients.redis_client import RedisClient +from streams.models import TaskManagement +from streams.tasks import task_websoket_management -class WebSoketError(Exception): - # Сбой стакана - pass +class DepthOfMarketStreamError(Exception): + def __init__(self, msg): + self.msg = msg + + +class DepthOfMarketStream: + def __init__(self, symbol: str, depth: int, logger: logging = None): + self.symbol = symbol + self.depth = depth + + self.codename_websocket_task = f'diff_book_depth_{symbol}_{depth}' -class DepthOfMarket: - def __init__(self, logger: logging = None): self.is_start_to_redis = False # флаг, устанавливается когда начинается запись стакана в Redis self.redis_conn = RedisClient() self.redis_conn.flushall() - self.binance_client = BinanceClient(settings.BINANCE_CLIENT) - self.kafka_client = KafkaConsumerClient() + self.binance_client = BinanceClient() + self.kafka_consumer_client = KafkaConsumerClient() + self.kafka_producer_client = KafkaProducerClient(topic=self.codename_websocket_task) if not logger: self.logger = logging.getLogger(__name__) - def run(self, symbol: str, depth: int = 100): - self.logger.info( - 'Depth Of Market runned: symbol = %s, depth = %s', symbol, depth - ) - - if self._websocket_start(symbol): - self.logger.info('Websocket started..') - time.sleep(5) + def run(self): + self._websocket_start() + time.sleep(1) - if last_update_id := self._get_snapshot(symbol, depth): - self.logger.info('Snapshot received..') + if last_update_id := self._get_snapshot(): + print(last_update_id) + # self._process(last_update_id, symbol) - if self._process(last_update_id, symbol): - self.logger.info('Process started.') + def stop(self): + self._websocket_stop() - def stop(self, symbol: str): - if self._websocket_stop(symbol): - self.logger.info( - 'Websocket stopped: symbol = %s', symbol - ) - - def _websocket_start(self, symbol: str): - codename = f'diff_book_depth_{symbol}'.lower() - task_management, _ = TaskManagement.objects.get_or_create(codename=codename) + def _websocket_start(self): + task_management, _ = TaskManagement.objects.get_or_create(codename=self.codename_websocket_task) if task_management.is_working: - # TODO LOG уже запущена - return + raise DepthOfMarketStreamError(f'Websocket уже запущен, codename = {self.codename_websocket_task}') task_management.is_working = True task_management.save(update_fields=['is_working']) - task_diff_book_depth.delay(symbol) + task_websoket_management.delay('diff_book_depth', self.codename_websocket_task, symbol=self.symbol) + self.logger.info('Websocket is running: codename = %s', self.codename_websocket_task) return True - def _websocket_stop(self, symbol: str): - codename = f'diff_book_depth_{symbol}'.lower() + def _websocket_stop(self): try: - task_management = TaskManagement.objects.get(codename=codename) + task_management = TaskManagement.objects.get(codename=self.codename_websocket_task) task_management.is_working = False task_management.save(update_fields=['is_working']) - return True except TaskManagement.DoesNotExist: - return + raise DepthOfMarketStreamError(f'Websocket не найден, codename = {self.codename_websocket_task}') + + self.logger.info('Websocket stopped: codename = %s', self.codename_websocket_task) + return True - def _get_snapshot(self, symbol, depth) -> int | requests.ConnectionError: + def _get_snapshot(self) -> int | DepthOfMarketStreamError: try: - result, is_ok = self.binance_client.get_order_book(symbol, depth) + result, is_ok = self.binance_client.get_order_book(self.symbol, self.depth) except requests.ConnectionError as e: - return e + raise DepthOfMarketStreamError(f'Snapshot не получен. Error: {e}') + if is_ok: - self._poll_redis(result, 'asks', 'ask') - self._poll_redis(result, 'bids', 'bid') - return result.get('lastUpdateId') + self.kafka_producer_client.message_handler(None, message=result) + self.logger.info('Snapshot received: codename = %s', self.codename_websocket_task) + return result.get('lastUpdateId') + + raise DepthOfMarketStreamError('Snapshot не получен, is_ok = False') + + + # if is_ok: + # self._poll_redis(result, 'asks', 'ask') + # self._poll_redis(result, 'bids', 'bid') + + def _consumer_message_handler(self, message, prev_message=None, last_update_id=None): # print(message, last_update_id) @@ -109,7 +120,7 @@ def _consumer_message_handler(self, message, prev_message=None, last_update_id=N # raise WebSoketError('Depth Of Market failed!') def _process(self, last_update_id, symbol): - self.kafka_client.get_topic(symbol, self._consumer_message_handler, last_update_id) + self.kafka_consumer_client.get_topic(symbol, self._consumer_message_handler, last_update_id) def _poll_redis(self, data: dict, lookup_field: str, redis_key: str): print(data.get(lookup_field), lookup_field, redis_key) diff --git a/src/streams/migrations/0001_initial.py b/src/streams/migrations/0001_initial.py index 6e5c487..41fe9c4 100644 --- a/src/streams/migrations/0001_initial.py +++ b/src/streams/migrations/0001_initial.py @@ -1,16 +1,19 @@ -# Generated by Django 4.2.3 on 2023-08-28 16:36 +# Generated by Django 4.2.3 on 2023-09-10 12:54 from django.db import migrations, models +import django.db.models.deletion class Migration(migrations.Migration): initial = True - dependencies = [] + dependencies = [ + ("market_data", "0001_initial"), + ] operations = [ migrations.CreateModel( - name="StreamTest", + name="DepthOfMarket", fields=[ ( "id", @@ -37,11 +40,118 @@ class Migration(migrations.Migration): auto_now_add=True, verbose_name="Дата создания" ), ), - ("result", models.TextField(verbose_name="result")), + ( + "is_active", + models.BooleanField(default=False, verbose_name="Status"), + ), + ( + "depth", + models.PositiveSmallIntegerField(default=100, verbose_name="Depth"), + ), + ( + "symbol", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to="market_data.exchangeinfo", + verbose_name="Symbol", + ), + ), + ], + options={ + "verbose_name": "Глубина рынка", + "verbose_name_plural": "Глубина рынка", + }, + ), + migrations.CreateModel( + name="TaskManagement", + fields=[ + ( + "server_time", + models.BigIntegerField( + blank=True, default=None, null=True, verbose_name="serverTime" + ), + ), + ( + "updated", + models.DateTimeField(auto_now=True, verbose_name="Дата обновления"), + ), + ( + "created", + models.DateTimeField( + auto_now_add=True, verbose_name="Дата создания" + ), + ), + ( + "codename", + models.CharField( + max_length=150, + primary_key=True, + serialize=False, + verbose_name="codename", + ), + ), + ( + "is_working", + models.BooleanField(default=False, verbose_name="is_working"), + ), + ], + options={ + "verbose_name": "Task management", + "verbose_name_plural": "Tasks management", + }, + ), + migrations.CreateModel( + name="TrainingData", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "server_time", + models.BigIntegerField( + blank=True, default=None, null=True, verbose_name="serverTime" + ), + ), + ( + "updated", + models.DateTimeField(auto_now=True, verbose_name="Дата обновления"), + ), + ( + "created", + models.DateTimeField( + auto_now_add=True, verbose_name="Дата создания" + ), + ), + ( + "is_active", + models.BooleanField(default=False, verbose_name="Status"), + ), + ( + "amount", + models.PositiveIntegerField(default=0, verbose_name="Amount"), + ), + ( + "depth", + models.PositiveSmallIntegerField(default=100, verbose_name="Depth"), + ), + ( + "depth_of_market", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to="streams.depthofmarket", + verbose_name="Depth of market", + ), + ), ], options={ - "verbose_name": "StreamTest", - "verbose_name_plural": "StreamTest", + "verbose_name": "Тестовые данные", + "verbose_name_plural": "Тестовые данные", }, ), ] diff --git a/src/streams/migrations/0002_alter_trainingdata_index_together.py b/src/streams/migrations/0002_alter_trainingdata_index_together.py new file mode 100644 index 0000000..9240967 --- /dev/null +++ b/src/streams/migrations/0002_alter_trainingdata_index_together.py @@ -0,0 +1,16 @@ +# Generated by Django 4.2.3 on 2023-09-10 21:59 + +from django.db import migrations + + +class Migration(migrations.Migration): + dependencies = [ + ("streams", "0001_initial"), + ] + + operations = [ + migrations.AlterIndexTogether( + name="trainingdata", + index_together={("depth_of_market", "depth")}, + ), + ] diff --git a/src/streams/migrations/0002_commonvar.py b/src/streams/migrations/0002_commonvar.py deleted file mode 100644 index bc705be..0000000 --- a/src/streams/migrations/0002_commonvar.py +++ /dev/null @@ -1,35 +0,0 @@ -# Generated by Django 4.2.3 on 2023-09-02 12:02 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - dependencies = [ - ("streams", "0001_initial"), - ] - - operations = [ - migrations.CreateModel( - name="CommonVar", - fields=[ - ( - "id", - models.BigAutoField( - auto_created=True, - primary_key=True, - serialize=False, - verbose_name="ID", - ), - ), - ("codename", models.CharField(max_length=100, verbose_name="codename")), - ( - "value", - models.CharField(blank=True, max_length=100, verbose_name="value"), - ), - ], - options={ - "verbose_name": "Базовая настройка", - "verbose_name_plural": "Базовые настройки", - }, - ), - ] diff --git a/src/streams/migrations/0003_alter_trainingdata_index_together_and_more.py b/src/streams/migrations/0003_alter_trainingdata_index_together_and_more.py new file mode 100644 index 0000000..d66757e --- /dev/null +++ b/src/streams/migrations/0003_alter_trainingdata_index_together_and_more.py @@ -0,0 +1,20 @@ +# Generated by Django 4.2.3 on 2023-09-10 22:00 + +from django.db import migrations + + +class Migration(migrations.Migration): + dependencies = [ + ("streams", "0002_alter_trainingdata_index_together"), + ] + + operations = [ + migrations.AlterIndexTogether( + name="trainingdata", + index_together=set(), + ), + migrations.AlterUniqueTogether( + name="trainingdata", + unique_together={("depth_of_market", "depth")}, + ), + ] diff --git a/src/streams/migrations/0003_taskmanagement_delete_commonvar_delete_streamtest.py b/src/streams/migrations/0003_taskmanagement_delete_commonvar_delete_streamtest.py deleted file mode 100644 index 95b29f7..0000000 --- a/src/streams/migrations/0003_taskmanagement_delete_commonvar_delete_streamtest.py +++ /dev/null @@ -1,60 +0,0 @@ -# Generated by Django 4.2.3 on 2023-09-02 15:54 - -from django.db import migrations, models -import uuid - - -class Migration(migrations.Migration): - dependencies = [ - ("streams", "0002_commonvar"), - ] - - operations = [ - migrations.CreateModel( - name="TaskManagement", - fields=[ - ( - "server_time", - models.BigIntegerField( - blank=True, default=None, null=True, verbose_name="serverTime" - ), - ), - ( - "updated", - models.DateTimeField(auto_now=True, verbose_name="Дата обновления"), - ), - ( - "created", - models.DateTimeField( - auto_now_add=True, verbose_name="Дата создания" - ), - ), - ( - "id", - models.UUIDField( - default=uuid.uuid4, - editable=False, - primary_key=True, - serialize=False, - ), - ), - ( - "codename", - models.CharField( - db_index=True, max_length=150, verbose_name="codename" - ), - ), - ("is_work", models.BooleanField(default=True, verbose_name="is_work")), - ], - options={ - "verbose_name": "Управление задачей", - "verbose_name_plural": "Управление задачами", - }, - ), - migrations.DeleteModel( - name="CommonVar", - ), - migrations.DeleteModel( - name="StreamTest", - ), - ] diff --git a/src/streams/migrations/0004_remove_taskmanagement_id_and_more.py b/src/streams/migrations/0004_remove_taskmanagement_id_and_more.py deleted file mode 100644 index eef84d7..0000000 --- a/src/streams/migrations/0004_remove_taskmanagement_id_and_more.py +++ /dev/null @@ -1,26 +0,0 @@ -# Generated by Django 4.2.3 on 2023-09-02 16:33 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - dependencies = [ - ("streams", "0003_taskmanagement_delete_commonvar_delete_streamtest"), - ] - - operations = [ - migrations.RemoveField( - model_name="taskmanagement", - name="id", - ), - migrations.AlterField( - model_name="taskmanagement", - name="codename", - field=models.CharField( - max_length=150, - primary_key=True, - serialize=False, - verbose_name="codename", - ), - ), - ] diff --git a/src/streams/migrations/0005_remove_taskmanagement_is_work_and_more.py b/src/streams/migrations/0005_remove_taskmanagement_is_work_and_more.py deleted file mode 100644 index 90de278..0000000 --- a/src/streams/migrations/0005_remove_taskmanagement_is_work_and_more.py +++ /dev/null @@ -1,21 +0,0 @@ -# Generated by Django 4.2.3 on 2023-09-02 16:39 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - dependencies = [ - ("streams", "0004_remove_taskmanagement_id_and_more"), - ] - - operations = [ - migrations.RemoveField( - model_name="taskmanagement", - name="is_work", - ), - migrations.AddField( - model_name="taskmanagement", - name="is_working", - field=models.BooleanField(default=False, verbose_name="is_working"), - ), - ] diff --git a/src/streams/models.py b/src/streams/models.py index a8ef002..831f30c 100644 --- a/src/streams/models.py +++ b/src/streams/models.py @@ -1,6 +1,7 @@ from django.db import models from core.utils.db_utils import BaseModel import uuid +from market_data.models import ExchangeInfo class TaskManagement(BaseModel): @@ -15,8 +16,58 @@ class TaskManagement(BaseModel): ) class Meta: - verbose_name = 'Управление задачей' - verbose_name_plural = 'Управление задачами' + verbose_name = 'Task management' + verbose_name_plural = 'Tasks management' def __str__(self): return self.codename + + +class DepthOfMarket(BaseModel): + symbol = models.ForeignKey( + ExchangeInfo, on_delete=models.CASCADE, + verbose_name='Symbol', + ) + is_active = models.BooleanField( + verbose_name='Status', + default=False, + ) + depth = models.PositiveSmallIntegerField( + verbose_name='Depth', + default=100, + ) + + class Meta: + verbose_name = 'Глубина рынка' + verbose_name_plural = 'Глубина рынка' + + def __str__(self): + return self.symbol.symbol + + +class TrainingData(BaseModel): + depth_of_market = models.ForeignKey( + DepthOfMarket, on_delete=models.CASCADE, + verbose_name='Depth of market', + ) + is_active = models.BooleanField( + verbose_name='Status', + default=False, + ) + amount = models.PositiveIntegerField( + verbose_name='Amount', + default=0, + ) + depth = models.PositiveSmallIntegerField( + verbose_name='Depth', + default=100, + ) + class Meta: + verbose_name = 'Тестовые данные' + verbose_name_plural = 'Тестовые данные' + unique_together = [ + ('depth_of_market', 'depth'), + ] + + def __str__(self): + return self.depth_of_market.symbol.symbol diff --git a/src/streams/tasks.py b/src/streams/tasks.py index 27dae51..8b5e953 100644 --- a/src/streams/tasks.py +++ b/src/streams/tasks.py @@ -1,7 +1,5 @@ import time -from django.conf import settings - from bender.celery_entry import app from core.clients.binance.websocket.spot.websocket_stream import SpotWebsocketStreamClient from core.clients.kafka.kafka_client import KafkaProducerClient @@ -9,14 +7,13 @@ @app.task(bind=True) -def task_diff_book_depth(self, symbol: str): - kafka_client = KafkaProducerClient(settings.KAFKA_CLIENT, topic=symbol) +def task_websoket_management(self, method: str, codename: str, *args, **kwargs): + kafka_client = KafkaProducerClient(topic=codename) my_client = SpotWebsocketStreamClient(on_message=kafka_client.message_handler) - my_client.diff_book_depth(symbol=symbol) + getattr(my_client, method)(*args, **kwargs) is_working = True while is_working: - codename = f'diff_book_depth_{symbol}'.lower() is_working = TaskManagement.objects.only('is_working').get(codename=codename).is_working time.sleep(1) diff --git a/src/streams/views.py b/src/streams/views.py index b706039..3862c59 100644 --- a/src/streams/views.py +++ b/src/streams/views.py @@ -5,7 +5,7 @@ from django.views.generic.base import View from core.clients.redis_client import RedisClient -from streams.handlers.depth_of_market import DepthOfMarket +from streams.handlers.depth_of_market import DepthOfMarketStream class DepthOfMarketView(View): @@ -15,7 +15,7 @@ def get(self, request, *args, **kwargs): if not (symbol := request.GET.get('symbol')): symbol = 'BTCUSDT' - depth_of_market = DepthOfMarket() + depth_of_market = DepthOfMarketStream() if action == 'start': depth_of_market.run(symbol) elif action == 'stop': diff --git a/src/wallet/admin.py b/src/wallet/admin.py index 103b94e..cf23120 100644 --- a/src/wallet/admin.py +++ b/src/wallet/admin.py @@ -48,8 +48,9 @@ def get_urls(self): def update_capital_config_getall(self, request): result, is_ok = self.model.get_update() - message = f'Обновили {result} записей' if is_ok else result - return redirect_to_change_list(request, self.model, message, is_ok) + msg = f'Обновили {result} записей' if is_ok else result + message = msg, is_ok + return redirect_to_change_list(request, self.model, message) # from binance.spot import Spot # client = Spot( @@ -102,13 +103,14 @@ def get_urls(self): def update_trade_fee(self, request, symbol=None): result, is_ok = self.model.get_update(symbol) - message = f'Обновили {result} записей' if is_ok else result - return redirect_to_change_list(request, self.model, message, is_ok) + msg = f'Обновили {result} записей' if is_ok else result + message = msg, is_ok + return redirect_to_change_list(request, self.model, message) @admin.action(description='Обновить') def action_update_trade_fee(self, request, symbol_query=None): if symbol_query.count() > 1: - message = 'Выборочное обновление только по 1 строке' - return redirect_to_change_list(request, self.model, message, False) + message = 'Выборочное обновление только по 1 строке', False + return redirect_to_change_list(request, self.model, message) self.update_trade_fee(request, symbol_query[0].symbol) diff --git a/src/wallet/migrations/0001_initial.py b/src/wallet/migrations/0001_initial.py index a4d2518..9fffe16 100644 --- a/src/wallet/migrations/0001_initial.py +++ b/src/wallet/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 4.2.3 on 2023-08-26 11:38 +# Generated by Django 4.2.3 on 2023-09-10 12:54 from django.db import migrations, models diff --git a/src/wallet/models.py b/src/wallet/models.py index d0b8d70..c033fc1 100644 --- a/src/wallet/models.py +++ b/src/wallet/models.py @@ -73,7 +73,7 @@ def __str__(self): @classmethod def get_update(cls): - client = BinanceClient(settings.BINANCE_CLIENT) + client = BinanceClient() try: result, is_ok = client.get_capital_config_getall() # noqa except requests.ConnectionError as e: @@ -130,7 +130,7 @@ def __str__(self): @classmethod def get_update(cls, symbol=None): - client = BinanceClient(settings.BINANCE_CLIENT) + client = BinanceClient() try: result, is_ok = client.get_trade_fee(symbol) # noqa except requests.ConnectionError as e: