Skip to content

Commit 9f4012b

Browse files
committed
Use read_nonblock instead of readpartial to account for SSL socket buffer
1 parent 32776c0 commit 9f4012b

File tree

3 files changed

+107
-32
lines changed

3 files changed

+107
-32
lines changed

lib/rb/benchmark/client.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ def run
7272
ctx.key = OpenSSL::PKey::RSA.new(File.open(File.join(keys_dir, "client.key")))
7373
end
7474

75-
Thrift::SSLSocket.new(@host, @port, nil, ssl_context)
75+
Thrift::SSLSocket.new(@host, @port, 5, ssl_context)
7676
else
77-
Thrift::Socket.new(@host, @port)
77+
Thrift::Socket.new(@host, @port, 5)
7878
end
7979
protocol = create_protocol(socket)
8080
transport = protocol.trans

lib/rb/lib/thrift/transport/socket.rb

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -68,20 +68,20 @@ def write(str)
6868
if @timeout.nil? or @timeout == 0
6969
@handle.write(str)
7070
else
71+
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
7172
len = 0
72-
start = Time.now
73-
while Time.now - start < @timeout
74-
rd, wr, = IO.select(nil, [@handle], nil, @timeout)
75-
if wr and not wr.empty?
73+
74+
while len < str.length
75+
begin
7676
len += @handle.write_nonblock(str[len..-1])
77-
break if len >= str.length
77+
rescue IO::WaitWritable
78+
wait_for(:write, deadline, str.length)
79+
rescue IO::WaitReadable
80+
wait_for(:read, deadline, str.length)
7881
end
7982
end
80-
if len < str.length
81-
raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out writing #{str.length} bytes to #{@desc}")
82-
else
83-
len
84-
end
83+
84+
len
8585
end
8686
rescue TransportException => e
8787
# pass this on
@@ -100,19 +100,16 @@ def read(sz)
100100
if @timeout.nil? or @timeout == 0
101101
data = @handle.readpartial(sz)
102102
else
103-
# it's possible to interrupt select for something other than the timeout
104-
# so we need to ensure we've waited long enough, but not too long
105-
start = Time.now
106-
timespent = 0
107-
rd = loop do
108-
rd, = IO.select([@handle], nil, nil, @timeout - timespent)
109-
timespent = Time.now - start
110-
break rd if (rd and not rd.empty?) or timespent >= @timeout
111-
end
112-
if rd.nil? or rd.empty?
113-
raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out reading #{sz} bytes from #{@desc}")
114-
else
115-
data = @handle.readpartial(sz)
103+
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
104+
105+
data = loop do
106+
begin
107+
break @handle.read_nonblock(sz)
108+
rescue IO::WaitReadable
109+
wait_for(:read, deadline, sz)
110+
rescue IO::WaitWritable
111+
wait_for(:write, deadline, sz)
112+
end
116113
end
117114
end
118115
rescue TransportException => e
@@ -141,5 +138,33 @@ def to_io
141138
def to_s
142139
"socket(#{@host}:#{@port})"
143140
end
141+
142+
private
143+
144+
def wait_for(operation, deadline, sz)
145+
rd_ary, wr_ary = case operation
146+
when :read
147+
[[@handle], nil]
148+
when :write
149+
[nil, [@handle]]
150+
else
151+
raise ArgumentError, "Unknown IO wait operation: #{operation.inspect}"
152+
end
153+
154+
loop do
155+
remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
156+
if remaining <= 0
157+
case operation
158+
when :read
159+
raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out reading #{sz} bytes from #{@desc}")
160+
when :write
161+
raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out writing #{sz} bytes to #{@desc}")
162+
end
163+
end
164+
165+
rd, wr, = IO.select(rd_ary, wr_ary, nil, remaining)
166+
return if (rd && !rd.empty?) || (wr && !wr.empty?)
167+
end
168+
end
144169
end
145170
end

lib/rb/spec/socket_spec_shared.rb

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,31 +74,81 @@
7474
it "should support the timeout accessor for read" do
7575
@socket.timeout = 3
7676
@socket.open
77-
expect(IO).to receive(:select).with([@handle], nil, nil, 3).and_return([[@handle], [], []])
78-
expect(@handle).to receive(:readpartial).with(17).and_return("test data")
77+
expect(@handle).to receive(:read_nonblock).with(17).and_raise(IO::EAGAINWaitReadable)
78+
expect(IO).to receive(:select) do |rd, wr, err, timeout|
79+
expect(rd).to eq([@handle])
80+
expect(wr).to be_nil
81+
expect(err).to be_nil
82+
expect(timeout).to be > 0
83+
expect(timeout).to be <= 3
84+
[[@handle], [], []]
85+
end
86+
expect(@handle).to receive(:read_nonblock).with(17).and_return("test data")
7987
expect(@socket.read(17)).to eq("test data")
8088
end
8189

8290
it "should support the timeout accessor for write" do
8391
@socket.timeout = 3
8492
@socket.open
85-
expect(IO).to receive(:select).with(nil, [@handle], nil, 3).twice.and_return([[], [@handle], []])
86-
expect(@handle).to receive(:write_nonblock).with("test data").and_return(4)
87-
expect(@handle).to receive(:write_nonblock).with(" data").and_return(5)
93+
write_calls = 0
94+
expect(@handle).to receive(:write_nonblock).exactly(3).times do |chunk|
95+
write_calls += 1
96+
case write_calls
97+
when 1
98+
expect(chunk).to eq("test data")
99+
raise IO::EAGAINWaitWritable
100+
when 2
101+
expect(chunk).to eq("test data")
102+
4
103+
when 3
104+
expect(chunk).to eq(" data")
105+
5
106+
end
107+
end
108+
expect(IO).to receive(:select) do |rd, wr, err, timeout|
109+
expect(rd).to be_nil
110+
expect(wr).to eq([@handle])
111+
expect(err).to be_nil
112+
expect(timeout).to be > 0
113+
expect(timeout).to be <= 3
114+
[[], [@handle], []]
115+
end
88116
expect(@socket.write("test data")).to eq(9)
89117
end
90118

91119
it "should raise an error when read times out" do
92120
@socket.timeout = 0.5
93121
@socket.open
94-
expect(IO).to receive(:select).once {sleep(0.5); nil}
122+
expect(@handle).to receive(:read_nonblock).with(17).and_raise(IO::EAGAINWaitReadable)
123+
expect(IO).to receive(:select).once { sleep(0.6); nil }
95124
expect { @socket.read(17) }.to raise_error(Thrift::TransportException) { |e| expect(e.type).to eq(Thrift::TransportException::TIMED_OUT) }
96125
end
97126

98127
it "should raise an error when write times out" do
99128
@socket.timeout = 0.5
100129
@socket.open
101-
allow(IO).to receive(:select).with(nil, [@handle], nil, 0.5).and_return(nil)
130+
expect(@handle).to receive(:write_nonblock).with("test data").and_raise(IO::EAGAINWaitWritable)
131+
expect(IO).to receive(:select).once { sleep(0.6); nil }
102132
expect { @socket.write("test data") }.to raise_error(Thrift::TransportException) { |e| expect(e.type).to eq(Thrift::TransportException::TIMED_OUT) }
103133
end
134+
135+
it "should read buffered SSL data without waiting on the raw socket again" do
136+
@socket.timeout = 1
137+
@socket.open
138+
139+
expect(@handle).to receive(:read_nonblock).with(4).ordered.and_raise(IO::EAGAINWaitReadable)
140+
expect(IO).to receive(:select).once.ordered do |rd, wr, err, timeout|
141+
expect(rd).to eq([@handle])
142+
expect(wr).to be_nil
143+
expect(err).to be_nil
144+
expect(timeout).to be > 0
145+
expect(timeout).to be <= 1
146+
[[@handle], [], []]
147+
end
148+
expect(@handle).to receive(:read_nonblock).with(4).ordered.and_return("ABCD")
149+
expect(@handle).to receive(:read_nonblock).with(5).ordered.and_return("12345")
150+
151+
expect(@socket.read(4)).to eq("ABCD")
152+
expect(@socket.read(5)).to eq("12345")
153+
end
104154
end

0 commit comments

Comments
 (0)