Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5,529 changes: 2,832 additions & 2,697 deletions binaries/Scs.XML

Large diffs are not rendered by default.

Binary file modified binaries/Scs.dll
Binary file not shown.
33 changes: 33 additions & 0 deletions src/Scs/Communication/Scs/Client/Pipes/NamedPipeClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using Hik.Communication.Scs.Communication.Channels;
using Hik.Communication.Scs.Communication.Channels.Pipes;
using Hik.Communication.Scs.Communication.EndPoints.Pipes;

namespace Hik.Communication.Scs.Client.Pipes
{
/// <summary>
/// Named pipe implementation of <see cref="IScsClient"/>.
/// </summary>
internal class NamedPipeClient : ScsClientBase
{
private readonly NamedPipeEndPoint _endPoint;

/// <summary>
/// Initializes a new instance of the <see cref="NamedPipeClient"/> class.
/// </summary>
/// <param name="endPoint">The end point.</param>
public NamedPipeClient(NamedPipeEndPoint endPoint)
{
_endPoint = endPoint;
}

#region Overrides of ScsClientBase

/// <inheritdoc />
protected override ICommunicationChannel CreateCommunicationChannel()
{
return new NamedPipeCommunicationChannel(_endPoint);
}

#endregion
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using Hik.Communication.Scs.Communication.EndPoints;
using Hik.Communication.Scs.Communication.EndPoints.Pipes;
using Hik.Communication.Scs.Communication.Messages;

namespace Hik.Communication.Scs.Communication.Channels.Pipes
{
/// <summary>
/// Named pipe implementation of <see cref="ICommunicationChannel"/>.
/// </summary>
internal class NamedPipeCommunicationChannel : CommunicationChannelBase
{
#region Constants

private const int CONNECT_ATTEMPT_INTERVAL_MS = 100;

#endregion

#region Private fields

/// <summary>
/// The end point.
/// </summary>
private readonly NamedPipeEndPoint _endPoint;

/// <summary>
/// Size of the buffer that is used to receive bytes from TCP socket.
/// </summary>
private const int ReceiveBufferSize = 4 * 1024; //4KB

/// <summary>
/// This buffer is used to receive bytes
/// </summary>
private readonly byte[] _buffer;

/// <summary>
/// Socket object to send/receive messages.
/// </summary>
private readonly PipeStream _pipeStream;

/// <summary>
/// A flag to control thread's running
/// </summary>
private volatile bool _running;

/// <summary>
/// This object is just used for thread synchronizing (locking).
/// </summary>
private readonly object _syncLock;

#endregion

/// <summary>
/// Initializes a new instance of the <see cref="NamedPipeCommunicationChannel" /> class.
/// </summary>
/// <param name="endPoint">The end point.</param>
/// <param name="pipeStream">An existing pipe stream or <c>null</c> to create client.</param>
public NamedPipeCommunicationChannel(NamedPipeEndPoint endPoint, PipeStream pipeStream = null)
{
_endPoint = endPoint;

if (pipeStream != null) _pipeStream = pipeStream;
else
{
var client = new NamedPipeClientStream(".", endPoint.Name, PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.WriteThrough);

// connect overload with timeout is extremely processor intensive
int elapsed = 0, connectionTimeout = endPoint.ConnectionTimeout * 1000;

CONNECT:
try
{
client.Connect(0);
}
catch (TimeoutException)
{
Thread.Sleep(CONNECT_ATTEMPT_INTERVAL_MS);

if (endPoint.ConnectionTimeout != Timeout.Infinite && (elapsed += CONNECT_ATTEMPT_INTERVAL_MS) > connectionTimeout)
throw new TimeoutException("The host failed to connect. Timeout occurred.");

goto CONNECT;
}

_pipeStream = client;
}

_buffer = new byte[ReceiveBufferSize];
_syncLock = new object();
}

#region Overrides of CommunicationChannelBase

/// <inheritdoc />
public override ScsEndPoint RemoteEndPoint
{
get { return _endPoint; }
}

/// <inheritdoc />
public override void Disconnect()
{
if (CommunicationState != CommunicationStates.Connected)
{
return;
}

_running = false;
try
{
_pipeStream.Close();
_pipeStream.Dispose();
}
catch { }

CommunicationState = CommunicationStates.Disconnected;
OnDisconnected();
}

/// <inheritdoc />
protected override void StartInternal()
{
_running = true;
_pipeStream.BeginRead(_buffer, 0, _buffer.Length, ReceiveCallback, null);
}

/// <inheritdoc />
protected override void SendMessageInternal(IScsMessage message)
{
//Send message
lock (_syncLock)
{
//Create a byte array from message according to current protocol
var messageBytes = WireProtocol.GetBytes(message);

try
{
// send bytes
_pipeStream.Write(messageBytes, 0, messageBytes.Length);
}
catch (IOException exception)
{
throw new CommunicationException("Failed to send message.", exception);
}

LastSentMessageTime = DateTime.Now;
OnMessageSent(message);
}
}

#endregion

#region Private Methods

/// <summary>
/// Receive callback for <see cref="_pipeStream"/>.
/// </summary>
/// <param name="ar">The async result.</param>
private void ReceiveCallback(IAsyncResult ar)
{
if (!_running)
{
return;
}

try
{
//Get received bytes count
var bytesRead = _pipeStream.EndRead(ar);
if (bytesRead > 0)
{
LastReceivedMessageTime = DateTime.Now;

//Copy received bytes to a new byte array
var receivedBytes = new byte[bytesRead];
Array.Copy(_buffer, 0, receivedBytes, 0, bytesRead);

//Read messages according to current wire protocol
var messages = WireProtocol.CreateMessages(receivedBytes);

//Raise MessageReceived event for all received messages
foreach (var message in messages)
{
OnMessageReceived(message);
}
}
else
{
throw new CommunicationException("Named pipe is closed.");
}

//Read more bytes if still running
if (_running)
{
_pipeStream.BeginRead(_buffer, 0, _buffer.Length, ReceiveCallback, null);
}
}
catch
{
Disconnect();
}
}

#endregion
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
using System;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Threading;
using Hik.Communication.Scs.Communication.EndPoints.Pipes;

namespace Hik.Communication.Scs.Communication.Channels.Pipes
{
internal class NamedPipeConnectionListener : ConnectionListenerBase
{
#region Constants

private const int MAX_SERVER_INSTANCES = 16;

#endregion

#region Private fields

private readonly NamedPipeEndPoint _endPoint;
private readonly object _syncObject;

private readonly List<NamedPipeCommunicationChannel> _openChannels;
private NamedPipeServerStream _waitingPipe;

private bool _running;

#endregion

/// <summary>
/// Initializes a new instance of the <see cref="NamedPipeConnectionListener"/> class.
/// </summary>
/// <param name="endPoint">The end point.</param>
/// <exception cref="System.NotImplementedException"></exception>
public NamedPipeConnectionListener(NamedPipeEndPoint endPoint)
{
_syncObject = new object();
_openChannels = new List<NamedPipeCommunicationChannel>();
_endPoint = endPoint;
}

#region Overrides of ConnectionListenerBase

/// <inheritdoc />
public override void Start()
{
_running = true;
OpenPipe();
}

/// <inheritdoc />
public override void Stop()
{
_running = false;

lock (_syncObject)
{
if (_waitingPipe != null)
{
try
{
_waitingPipe.Close();
_waitingPipe.Dispose();
_waitingPipe = null;
}
catch { }
}

foreach (var channel in _openChannels)
channel.Disconnect();
}
}

#endregion

#region Private Methods

private void ChannelDisconnected(object sender, EventArgs eventArgs)
{
if (!_running) return;

// remove disconnected channel from list
lock (_syncObject) _openChannels.Remove((NamedPipeCommunicationChannel)sender);
}

/// <summary>
/// Background listening thread.
/// </summary>
private void OpenPipe()
{
while (_running)
{
lock (_syncObject)
{
try
{
_waitingPipe = new NamedPipeServerStream(_endPoint.Name, PipeDirection.InOut, MAX_SERVER_INSTANCES, PipeTransmissionMode.Byte, PipeOptions.Asynchronous | PipeOptions.WriteThrough);

_waitingPipe.BeginWaitForConnection(ar =>
{
try
{
NamedPipeCommunicationChannel channel;

lock (_syncObject)
{
_waitingPipe.EndWaitForConnection(ar);
channel = new NamedPipeCommunicationChannel(_endPoint, _waitingPipe);
_waitingPipe = null;
}

channel.Disconnected += ChannelDisconnected;
OnCommunicationChannelConnected(channel);
}
catch (ObjectDisposedException)
{ }

// listen for next connection
if (_running) OpenPipe();
}, null);

return;
}
catch
{
if (_waitingPipe != null)
{
try
{
_waitingPipe.Dispose();
}
catch { }
_waitingPipe = null;
}
}
}

if (_running) Thread.Sleep(1000);
}
}

#endregion
}
}
Loading