Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packets/log_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ def appname(self, appname: str) -> None:
def packet_type(self) -> PacketType:
return PacketType.LOG_HEADER

@property
def values(self):
return self._values

def add_value(self, key: str, value: str) -> None:
if not isinstance(key, str):
raise TypeError("key must be an str")
Expand Down
12 changes: 9 additions & 3 deletions protocols/cloud/cloud_protocol.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re
import collections
import logging
import re
import threading
import uuid
from datetime import datetime
Expand All @@ -11,10 +11,10 @@
from packets.log_header import LogHeader
from packets.packet import Packet
from packets.packet_type import PacketType
from protocols.cloud.chunk import Chunk
from protocols.cloud.exceptions import *
from protocols.cloud.scheduled_executor import ScheduledExecutor
from protocols.tcp_protocol import TcpProtocol
from protocols.cloud.exceptions import *
from protocols.cloud.chunk import Chunk

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -317,7 +317,13 @@ def _do_rotate_virtual_file_id(self) -> None:
self._virtual_file_id = uuid.uuid4()
self._virtual_file_size = 0

logger.debug("Composing new log header at virtual file rotation")
log_header = self._compose_log_header_packet()
logger.debug(
"New log header id {} NOT stored as protocol's reconnect log header and is written (into the queue)".format(
log_header.values.get("virtualfileid")))

# self._reconnect_log_header = log_header
super().write_packet(log_header)

def connect(self) -> None:
Expand Down
71 changes: 59 additions & 12 deletions protocols/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import logging
import threading
import time
import typing
from typing import Optional

from common.events.error_event import ErrorEvent
from common.exceptions import ProtocolError, SmartInspectError
Expand All @@ -18,6 +20,7 @@
from packets.log_header import LogHeader
from packets.packet import Packet
from packets.packet_queue import PacketQueue
from packets.packet_type import PacketType
from scheduler.scheduler import Scheduler
from scheduler.scheduler_action import SchedulerAction
from scheduler.scheduler_command import SchedulerCommand
Expand All @@ -44,6 +47,7 @@ def __init__(self):
self.__initialized = False
self.__failed = False
self.__backlog_enabled = False
self._reconnect_log_header = None

def __create_options(self, options: str) -> None:
try:
Expand Down Expand Up @@ -141,15 +145,25 @@ def _compose_log_header_packet(self) -> LogHeader:

return log_header

def _internal_write_log_header(self) -> None:
log_header = self._compose_log_header_packet()
logger.debug("Writing LogHeader with values %s", log_header._values.items())
self._internal_write_packet(log_header)
# def _internal_write_log_header(self, connect_log_header: typing.Optional[LogHeader] = None) -> None:
# if connect_log_header is None and self._reconnect_log_header is None:
# log_header = self._compose_log_header_packet()
# elif connect_log_header:
# log_header = connect_log_header
#
# self._internal_write_packet(log_header)

def _internal_write_connect_log_header(self, connect_log_header: typing.Optional[LogHeader] = None) -> None:
if connect_log_header is None:
logger.debug("Connection log header is None, composing a new one")
connect_log_header = self._compose_log_header_packet()
logger.debug("Writing log header with vf_id {}".format(connect_log_header.values.get("virtualfileid")))
self._internal_write_packet(connect_log_header)

def _internal_write_packet(self, packet: Packet):
pass

def _internal_connect(self):
def _internal_connect(self, connect_log_header: Optional[LogHeader] = None):
pass

@property
Expand Down Expand Up @@ -197,15 +211,28 @@ def disconnect(self) -> None:
else:
self._impl_disconnect()

def _impl_connect(self):
def _impl_connect(self, connect_log_header: Optional[LogHeader] = None):
if self.__keep_open and not self._connected:
try:
try:
self._internal_connect()
self._internal_connect(connect_log_header)
logger.debug(f"{self.__class__.__name__} connected succesfully.")

logger.debug(
"Storing successful log header with vf_id {} as reconnect log header.".format(
None if connect_log_header is None else connect_log_header.values.get("virtualfileid"))
)
self._reconnect_log_header = connect_log_header
self._connected = True
self.__failed = False
logger.debug(f"{self.__class__.__name__} connected succesfully")

except Exception as exception:
logger.debug(f"{self.__class__.__name__} failed to connect.")
logger.debug(
"Storing log header with vf_id {} used at connection as reconnect log header.".format(
None if connect_log_header is None else connect_log_header.values.get("virtualfileid"))
)
self._reconnect_log_header = connect_log_header
self._reset()
raise exception
except Exception as exception:
Expand Down Expand Up @@ -310,6 +337,11 @@ def __stop_scheduler(self):
def __schedule_connect(self) -> None:
command = SchedulerCommand()
command.action = SchedulerAction.CONNECT
log_header = self._compose_log_header_packet()
logger.debug(
"Scheduling connection. with vf_id {} composed, scheduling into SchedulerQueue".format(
log_header.values["virtualfileid"]))
command.state = log_header
self.__scheduler.schedule(command, SchedulerQueueEnd.TAIL)

def get_caption(self) -> str:
Expand Down Expand Up @@ -370,7 +402,10 @@ def remove_listener(self, listener: ProtocolListener):
self.__listeners.remove(listener)

def _internal_reconnect(self) -> bool:
self._internal_connect()
logger.debug(
"Using current reconnect log header with vf_id {} during reconnect".format(
None if self._reconnect_log_header is None else self._reconnect_log_header.values.get("virtualfileid")))
self._internal_connect(self._reconnect_log_header)
return True

def _internal_disconnect(self) -> None:
Expand All @@ -382,10 +417,17 @@ def __flush_queue(self) -> None:
self.__forward_packet(packet, False)
packet = self.__queue.pop()

def __forward_packet(self, packet: Packet, disconnect: bool) -> None:
def __forward_packet(self, packet: [Packet, LogHeader], disconnect: bool) -> None:
if not self._connected:
if not self.__keep_open:
self._internal_connect()
self._internal_connect(self._reconnect_log_header)
if packet.packet_type == PacketType.LOG_HEADER:
self._reconnect_log_header = packet
logger.debug(
"Updating reconnect log header - logheader packet with vf_id {} was forwarded.".format(
packet.values.get("virtualfileid"))
)

self._connected = True
self.__failed = False
else:
Expand All @@ -402,7 +444,7 @@ def __forward_packet(self, packet: Packet, disconnect: bool) -> None:
self._connected = False
self._internal_disconnect()

def __do_reconnect(self) -> None:
def __do_reconnect(self, connect_log_header: typing.Optional[LogHeader] = None) -> None:
if self.__reconnect_interval > 0:
tick_count = time.time() * 1000
if tick_count - self.__reconnect_tick_count < self.__reconnect_interval:
Expand All @@ -412,6 +454,11 @@ def __do_reconnect(self) -> None:
try:
if self._internal_reconnect():
self._connected = True
logger.debug(
"Reconnect successful - storing log header with vf_id {} as current reconnect log header".format(
None if connect_log_header is None else connect_log_header.values.get("virtualfileid"))
)
self._reconnect_log_header = connect_log_header
except Exception:
pass
# Reconnect exceptions are not reported,
Expand Down
6 changes: 4 additions & 2 deletions protocols/tcp_protocol.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# Copyright (C) Code Partners Pty. Ltd. All rights reserved. #
import logging
import socket
import typing

from common.exceptions import SmartInspectError
from connections.builders import ConnectionsBuilder
from formatters.binary_formatter import BinaryFormatter
from packets.log_header import LogHeader
from packets.packet import Packet
from protocols.protocol import Protocol

Expand Down Expand Up @@ -61,7 +63,7 @@ def _send_client_banner(self) -> None:
self.__stream.write(self.__CLIENT_BANNER)
self.__stream.flush()

def _internal_connect(self):
def _internal_connect(self, connect_log_header: typing.Optional[LogHeader] = None):
try:
self.__socket = self._internal_initialize_socket()
except Exception as e:
Expand All @@ -71,7 +73,7 @@ def _internal_connect(self):

self.__stream = self.__socket.makefile("rwb", self.__BUFFER_SIZE)
self._do_handshake()
self._internal_write_log_header()
self._internal_write_connect_log_header(connect_log_header)

def _internal_initialize_socket(self) -> socket.socket:
socket_ = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand Down
33 changes: 20 additions & 13 deletions scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def run(self) -> None:
if not self.run_commands(count):
break

from protocols.cloud.cloud_protocol import CloudProtocol
if isinstance(self.parent.protocol, CloudProtocol):
from protocols.tcp_protocol import TcpProtocol
if isinstance(self.parent.protocol, TcpProtocol):

if self.consecutive_packet_write_fail_count > 0:
try:
Expand Down Expand Up @@ -65,7 +65,12 @@ def __run_command(self, command: SchedulerCommand) -> None:
# noinspection PyBroadException
try:
if action == SchedulerAction.CONNECT:
protocol._impl_connect()
connect_log_header: LogHeader = command.state
logger.debug(
"Received CONNECT command with log header vf_id {}. Using it to connect".format(
connect_log_header.values.get("virtualfileid")))

protocol._impl_connect(connect_log_header)
elif action == SchedulerAction.WRITE_PACKET:
self.__write_packet_action(command)
elif action == SchedulerAction.DISCONNECT:
Expand All @@ -83,12 +88,14 @@ def __write_packet_action(self, command):
protocol = self.parent.protocol

protocol._impl_write_packet(packet)
from protocols.cloud.cloud_protocol import CloudProtocol
if isinstance(protocol, CloudProtocol) and protocol.failed:
from protocols.tcp_protocol import TcpProtocol
if isinstance(protocol, TcpProtocol) and protocol.failed:

if not protocol.is_reconnect_allowed():
logging.debug("Reconnect is disabled, no need to requeue packet we failed to send")
return
from protocols.cloud.cloud_protocol import CloudProtocol
if isinstance(protocol, CloudProtocol) and protocol.failed:
if not protocol.is_reconnect_allowed():
logging.debug("Reconnect is disabled, no need to requeue packet we failed to send")
return

self.consecutive_packet_write_fail_count += 1
logging.debug("Sending packet failed, scheduling again to the head of the queue, "
Expand All @@ -106,20 +113,20 @@ def __write_packet_action(self, command):

class Scheduler:
__BUFFER_SIZE = 0x10
__CLOUD_PROTOCOL_BUFFER_SIZE = 0x1
__TCP_PROTOCOL_BUFFER_SIZE = 0x1

def __init__(self, protocol):
super().__init__()
self.__protocol = protocol
self.__condition = threading.Condition()
self.__queue = SchedulerQueue()

# if protocol is CloudProtocol - respective buffer size is set
from protocols.cloud.cloud_protocol import CloudProtocol
# if protocol is TcpProtocol - respective buffer size is set
from protocols.tcp_protocol import TcpProtocol
self.__buffer: List[Optional[SchedulerCommand]] = [
[None] * self.__BUFFER_SIZE,
[None] * self.__CLOUD_PROTOCOL_BUFFER_SIZE,
][isinstance(self.__protocol, CloudProtocol)]
[None] * self.__TCP_PROTOCOL_BUFFER_SIZE,
][isinstance(self.__protocol, TcpProtocol)]

self.__started: bool = False
self.__stopped: bool = False
Expand Down
5 changes: 3 additions & 2 deletions scheduler/scheduler_command.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Optional, Union

from common.protocol_command import ProtocolCommand
from scheduler.scheduler_action import SchedulerAction
from packets.log_header import LogHeader
from packets.packet import Packet
from scheduler.scheduler_action import SchedulerAction


class SchedulerCommand:
Expand All @@ -20,7 +21,7 @@ def action(self, action: SchedulerAction) -> None:
self.__action = action

@property
def state(self) -> Union[ProtocolCommand, Packet, object]:
def state(self) -> Union[ProtocolCommand, Packet, LogHeader, object]:
return self.__state

@state.setter
Expand Down