Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/bender/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ def init_with_context(self, context):
)
self.children.append(
modules.AppList(
'WEBSOCKET',
'STREAMS',
column=1,
collapsible=True,
models=(
'streams.models.DepthOfMarket',
'streams.models.TrainingData',
'streams.models.TaskManagement',
),
)
Expand Down Expand Up @@ -115,6 +117,28 @@ def init_with_context(self, context):
)
)

self.children.append(
modules.LinkList(
'Services',
column=3,
children=[
{
'title': 'Flower',
'url': 'http://localhost:5555/',
'external': True,
'target': '_blank',
},
{
'title': 'Kafka GUI',
'url': 'http://localhost:8080/',
'external': True,
'target': 'blank',
},
],
)
)


# self.children.append(modules.RecentActions(
# 'Recent actions',
# limit=5,
Expand Down
3 changes: 0 additions & 3 deletions src/core/clients/binance/restapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@


class BinanceClient(BinanceBaseRestClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

# Wallet
from core.clients.binance.restapi.wallet import (
get_capital_config_getall,
Expand Down
12 changes: 6 additions & 6 deletions src/core/clients/binance/restapi/base.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import hashlib
import hmac
from core.utils.client_utils import get_timestamp
from typing import Tuple
from urllib.parse import urlencode

import requests
from django.conf import settings

from core.utils.client_utils import get_timestamp
from core.utils.value_utils import clean_none_value


class BinanceBaseRestClient:
timeout = 10

def __init__(self, credentials: dict, *args, **kwargs):
super().__init__(*args, **kwargs)
def __init__(self):
self.session = requests.Session()
self.uri = credentials['uri']
self.api_key = credentials['api_key']
self.secret_key = credentials['secret_key']
self.uri = settings.BINANCE_CLIENT['uri']
self.api_key = settings.BINANCE_CLIENT['api_key']
self.secret_key = settings.BINANCE_CLIENT['secret_key']

@staticmethod
def encoded_string(query):
Expand Down
15 changes: 6 additions & 9 deletions src/core/clients/kafka/kafka_client.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import json
import logging
import socket

from confluent_kafka import Consumer
from confluent_kafka import Producer
from confluent_kafka import Producer, Consumer
from django.conf import settings


class KafkaProducerClient:
def __init__(self, credentials: dict, topic: str = None):
self.bootstrap_servers = credentials['bootstrap.servers']
def __init__(self, topic: str = None):
self.bootstrap_servers = settings.KAFKA_CLIENT['bootstrap.servers']
self.client_id = socket.gethostname(),
self.topic = topic if topic else 'default'

Expand All @@ -21,14 +21,11 @@ def get_config(self):
}

def message_handler(self, _, message):
# json_data = json.loads(message)
# print(json_data)

if not isinstance(message, bytes):
message = json.dumps(message).encode('utf-8')
self.producer.produce(
topic=self.topic,
value=message,
# key=key,
# on_delivery=callback, # def callback(err, event):
)
self.producer.flush()

Expand Down
14 changes: 9 additions & 5 deletions src/core/utils/admin_utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from django.contrib import messages
from django.http.response import HttpResponseRedirect
from django.urls import reverse
from django.utils.safestring import mark_safe


def redirect_to_change_list(request, model, message=None, is_ok=True):
if message:
if is_ok:
messages.success(request, message)
def redirect_to_change_list(request, model, message=None):
msg = message[0]
if isinstance(msg, list):
msg = mark_safe('<br>'.join(msg))
if msg:
if message[1]:
messages.success(request, msg)
else:
messages.warning(request, message)
messages.warning(request, msg)
meta = model._meta
url = reverse(f'admin:{meta.app_label}_{meta.model_name}_changelist')
return HttpResponseRedirect(url)
5 changes: 3 additions & 2 deletions src/market_data/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ def get_urls(self):

def update_exchange_info(self, request, symbol=None, symbols=None, permissions=None):
result, is_ok = self.model.get_update(symbol, symbols, permissions)
message = f'Обновили {result} записей' if is_ok else result
return redirect_to_change_list(request, self.model, message, is_ok)
msg = f'Обновили {result} записей' if is_ok else result
message = msg, is_ok
return redirect_to_change_list(request, self.model, message)

def save_model(self, request, obj, form, change):
symbol = form.cleaned_data['symbol']
Expand Down
2 changes: 1 addition & 1 deletion src/market_data/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Generated by Django 4.2.3 on 2023-08-26 11:38
# Generated by Django 4.2.3 on 2023-09-10 12:54

import django.contrib.postgres.fields
import django.contrib.postgres.fields.hstore
Expand Down
2 changes: 1 addition & 1 deletion src/market_data/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def __str__(self):

@classmethod
def get_update(cls, symbol=None, symbols=None, permissions=None):
client = BinanceClient(settings.BINANCE_CLIENT)
client = BinanceClient()
try:
result, is_ok = client.get_exchange_info(symbol, symbols, permissions) # noqa
except requests.ConnectionError as e:
Expand Down
84 changes: 83 additions & 1 deletion src/streams/admin.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,89 @@
from django.contrib import admin
from .models import TaskManagement
from .models import TaskManagement, DepthOfMarket, TrainingData
from core.utils.admin_utils import redirect_to_change_list
from django.utils.safestring import mark_safe
from streams.handlers.depth_of_market import DepthOfMarketStream, DepthOfMarketStreamError


@admin.register(TaskManagement)
class TaskManagementAdmin(admin.ModelAdmin):
list_display = ('codename', 'is_working', 'updated', 'created')


@admin.register(DepthOfMarket)
class DepthOfMarketAdmin(admin.ModelAdmin):
list_display = ('id', 'symbol', 'is_active', 'depth', 'market_glass', 'updated', 'created')
raw_id_fields = ('symbol',)
actions = ('action_run', 'action_stop')
ordering = ('-is_active', 'id')

def has_change_permission(self, request, obj=None):
return False

@admin.display(description='Market glass')
def market_glass(self, obj):
return mark_safe(f'<a href="" target="_blank">открыть</a>')

@admin.action(description='Запустить "Глубину рынка"')
def action_run(self, request, query=None):
msg = []
status = True
for dom in query:
try:
dom_stream = DepthOfMarketStream(dom.symbol.symbol, dom.depth)
dom_stream.run()
dom.is_active = True
dom.save(update_fields=['is_active'])
except DepthOfMarketStreamError as e:
msg.append(e.msg)
status = False
message = msg, status

return redirect_to_change_list(request, self.model, message)

@admin.action(description='Остановить "Глубину рынка"')
def action_stop(self, request, query=None):
msg = []
status = True
for dom in query:
try:
dom_stream = DepthOfMarketStream(dom.symbol.symbol, dom.depth)
dom_stream.stop()
dom.is_active = False
dom.save(update_fields=['is_active'])
except DepthOfMarketStreamError as e:
msg.append(e.msg)
status = False
message = msg, status

return redirect_to_change_list(request, self.model, message)


@admin.register(TrainingData)
class TrainingDataAdmin(admin.ModelAdmin):
list_display = ('id', 'depth_of_market', 'is_active', 'amount', 'depth', 'market_glass', 'updated', 'created')
actions = ('action_run', 'action_stop')

# def has_add_permission(self, request):
# return False

def has_change_permission(self, request, obj=None):
return False

@admin.display(description='Market glass')
def market_glass(self, obj):
return mark_safe(f'<a href="" target="_blank">открыть</a>')

@admin.action(description='Запустить "Тестовые данные"')
def action_run(self, request, query=None):
# symbols = [item.symbol for item in query]
query.update(is_active=True)
message = f'action_run {query}', True
return redirect_to_change_list(request, self.model, message)

@admin.action(description='Остановить "Тестовые данные"')
def action_stop(self, request, query=None):
# symbols = [item.symbol for item in query]
query.update(is_active=False)
message = f'action_stop {query}', False
return redirect_to_change_list(request, self.model, message)
1 change: 1 addition & 0 deletions src/streams/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
class StreamsConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "streams"
verbose_name = 'Потоки'
Loading