Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/netstd/Thrift/Protocol/TBinaryProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToke
public override Task ReadMessageEndAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
Transport.ResetConsumedMessageSize();
Transport.ResetMessageSizeAndConsumedBytes();
return Task.CompletedTask;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/netstd/Thrift/Protocol/TCompactProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToke
public override Task ReadMessageEndAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
Transport.ResetConsumedMessageSize();
Transport.ResetMessageSizeAndConsumedBytes();
return Task.CompletedTask;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/netstd/Thrift/Protocol/TJSONProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ public override async Task ReadMessageEndAsync(CancellationToken cancellationTok
{
cancellationToken.ThrowIfCancellationRequested();
await ReadJsonArrayEndAsync(cancellationToken);
Transport.ResetConsumedMessageSize();
Transport.ResetMessageSizeAndConsumedBytes();
}

public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
Expand Down
2 changes: 1 addition & 1 deletion lib/netstd/Thrift/Protocol/TProtocolDecorator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public override async Task ReadMessageEndAsync(CancellationToken cancellationTok
{
cancellationToken.ThrowIfCancellationRequested();
await _wrappedProtocol.ReadMessageEndAsync(cancellationToken);
Transport.ResetConsumedMessageSize();
Transport.ResetMessageSizeAndConsumedBytes();
}

public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
Expand Down
2 changes: 1 addition & 1 deletion lib/netstd/Thrift/Transport/Client/THttpTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public override async Task FlushAsync(CancellationToken cancellationToken)
finally
{
_outputStream = new MemoryStream();
ResetConsumedMessageSize();
ResetMessageSizeAndConsumedBytes();
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void Seek(int delta, SeekOrigin origin)
throw new ArgumentException("Cannot seek outside of the valid range",nameof(origin));
Position = newPos;

ResetConsumedMessageSize();
ResetMessageSizeAndConsumedBytes();
CountConsumedMessageBytes(Position);
}

Expand Down Expand Up @@ -145,7 +145,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
public override Task FlushAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
ResetConsumedMessageSize();
ResetMessageSizeAndConsumedBytes();
return Task.CompletedTask;
}

Expand Down
4 changes: 2 additions & 2 deletions lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public override async Task OpenAsync(CancellationToken cancellationToken)
}

await PipeStream.ConnectAsync( ConnectTimeout, cancellationToken);
ResetConsumedMessageSize();
ResetMessageSizeAndConsumedBytes();
}

public override void Close()
Expand Down Expand Up @@ -112,7 +112,7 @@ public override async Task WriteAsync(byte[] buffer, int offset, int length, Can
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await PipeStream.FlushAsync(cancellationToken);
ResetConsumedMessageSize();
ResetMessageSizeAndConsumedBytes();
}


Expand Down
23 changes: 21 additions & 2 deletions lib/netstd/Thrift/Transport/Client/TStreamTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

using System;
using System.Drawing;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -46,10 +47,28 @@ protected Stream InputStream {
get => _InputStream;
set {
_InputStream = value;
ResetConsumedMessageSize();
ResetMessageSizeAndConsumedBytes(-1); // full reset to configured maximum
UpdateKnownMessageSize(-1); // adjust to real stream size
}
}

public override void UpdateKnownMessageSize(long size)
{
long adjusted = 0;

if (InputStream != null)
{
adjusted = MaxMessageSize;
if (size > 0)
adjusted = Math.Min(adjusted, size);
if( InputStream.CanSeek)
adjusted = Math.Min(adjusted, InputStream.Length);
}

base.UpdateKnownMessageSize(adjusted);
}


public override bool IsOpen => true;

public override Task OpenAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -106,7 +125,7 @@ public override async Task WriteAsync(byte[] buffer, int offset, int length, Can
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await OutputStream.FlushAsync(cancellationToken);
ResetConsumedMessageSize();
ResetMessageSizeAndConsumedBytes();
}


Expand Down
6 changes: 3 additions & 3 deletions lib/netstd/Thrift/Transport/Layered/TBufferedTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ public override void CheckReadBytesAvailable(long numBytes)
}
}

public override void ResetConsumedMessageSize(long newSize = -1)
public override void ResetMessageSizeAndConsumedBytes(long newSize = -1)
{
base.ResetConsumedMessageSize(newSize);
ReadBuffer.ResetConsumedMessageSize(newSize);
base.ResetMessageSizeAndConsumedBytes(newSize);
ReadBuffer.ResetMessageSizeAndConsumedBytes(newSize);
}

private void CheckNotDisposed()
Expand Down
6 changes: 3 additions & 3 deletions lib/netstd/Thrift/Transport/Layered/TFramedTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,10 @@ protected override void Dispose(bool disposing)
IsDisposed = true;
}

public override void ResetConsumedMessageSize(long newSize = -1)
public override void ResetMessageSizeAndConsumedBytes(long newSize = -1)
{
base.ResetConsumedMessageSize(newSize);
ReadBuffer.ResetConsumedMessageSize(newSize);
base.ResetMessageSizeAndConsumedBytes(newSize);
ReadBuffer.ResetMessageSizeAndConsumedBytes(newSize);
}
}
}
4 changes: 2 additions & 2 deletions lib/netstd/Thrift/Transport/Layered/TLayeredTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public override void CheckReadBytesAvailable(long numBytes)
InnerTransport.CheckReadBytesAvailable(numBytes);
}

public override void ResetConsumedMessageSize(long newSize = -1)
public override void ResetMessageSizeAndConsumedBytes(long newSize = -1)
{
InnerTransport.ResetConsumedMessageSize(newSize);
InnerTransport.ResetMessageSizeAndConsumedBytes(newSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ public override async Task WriteAsync(byte[] buffer, int offset, int length, Can
public override async Task FlushAsync(CancellationToken cancellationToken)
{
await PipeStream.FlushAsync(cancellationToken);
ResetConsumedMessageSize();
ResetMessageSizeAndConsumedBytes();
}

protected override void Dispose(bool disposing)
Expand Down
8 changes: 4 additions & 4 deletions lib/netstd/Thrift/Transport/TEndpointTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ public TEndpointTransport( TConfiguration config)
_configuration = config ?? new TConfiguration();
Debug.Assert(Configuration != null);

ResetConsumedMessageSize();
ResetMessageSizeAndConsumedBytes();
}

/// <summary>
/// Resets RemainingMessageSize to the configured maximum
/// </summary>
public override void ResetConsumedMessageSize(long newSize = -1)
public override void ResetMessageSizeAndConsumedBytes(long newSize = -1)
{
// full reset
if (newSize < 0)
Expand All @@ -70,7 +70,7 @@ public override void ResetConsumedMessageSize(long newSize = -1)
public override void UpdateKnownMessageSize(long size)
{
var consumed = KnownMessageSize - RemainingMessageSize;
ResetConsumedMessageSize(size);
ResetMessageSizeAndConsumedBytes(size);
CountConsumedMessageBytes(consumed);
}

Expand All @@ -80,7 +80,7 @@ public override void UpdateKnownMessageSize(long size)
/// <param name="numBytes"></param>
public override void CheckReadBytesAvailable(long numBytes)
{
if (RemainingMessageSize < numBytes)
if ((RemainingMessageSize < numBytes) || (numBytes < 0))
throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "MaxMessageSize reached");
}

Expand Down
2 changes: 1 addition & 1 deletion lib/netstd/Thrift/Transport/TTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public abstract class TTransport : IDisposable
public abstract TConfiguration Configuration { get; }
public abstract void UpdateKnownMessageSize(long size);
public abstract void CheckReadBytesAvailable(long numBytes);
public abstract void ResetConsumedMessageSize(long newSize = -1);
public abstract void ResetMessageSizeAndConsumedBytes(long newSize = -1);
public void Dispose()
{
Dispose(true);
Expand Down
Loading