From e9ddf6f8c8152598800f67e68874918a742a661f Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Tue, 15 Dec 2020 09:30:03 +0900 Subject: [PATCH 1/2] Avoid to close @socket of MQTT::Client unexpectedly Since it is locked only when writing, it may be closed by another thread while reading or writing. In practice we often get the following error when we are processing massive messages by fluent-plugin-mqtt-io: IOError,stream closed in another thread This commit ensures that all atomic operations on the socket are protected. Signed-off-by: Takuro Ashie --- lib/mqtt/client.rb | 75 ++++++++++++++++++++++++---------------- spec/mqtt_client_spec.rb | 8 ++--- 2 files changed, 50 insertions(+), 33 deletions(-) diff --git a/lib/mqtt/client.rb b/lib/mqtt/client.rb index def2619..ae971b6 100644 --- a/lib/mqtt/client.rb +++ b/lib/mqtt/client.rb @@ -165,7 +165,7 @@ def initialize(*args) @read_queue = Queue.new @pubacks = {} @read_thread = nil - @write_semaphore = Mutex.new + @socket_semaphore = Mutex.new @pubacks_semaphore = Mutex.new end @@ -227,7 +227,9 @@ def connect(clientid = nil) raise 'No MQTT server host set when attempting to connect' if @host.nil? - unless connected? + @socket_semaphore.synchronize do + break if socket_alive? + # Create network socket tcp_socket = TCPSocket.new(@host, @port) @@ -261,7 +263,7 @@ def connect(clientid = nil) ) # Send packet - send_packet(packet) + send_packet(packet, false) # Receive response receive_connack @@ -290,21 +292,16 @@ def disconnect(send_msg = true) @read_thread.kill if @read_thread && @read_thread.alive? @read_thread = nil - return unless connected? - - # Close the socket if it is open - if send_msg - packet = MQTT::Packet::Disconnect.new - send_packet(packet) + @socket_semaphore.synchronize do + close_socket(send_msg) end - @socket.close unless @socket.nil? - handle_close - @socket = nil end # Checks whether the client is connected to the server. def connected? - !@socket.nil? && !@socket.closed? 
+ @socket_semaphore.synchronize do + socket_alive? + end end # Publish a message on a particular topic to the MQTT server. @@ -454,24 +451,28 @@ def unsubscribe(*topics) private + def socket_alive? + !@socket.nil? && !@socket.closed? + end + # Try to read a packet from the server # Also sends keep-alive ping packets. def receive_packet # Poll socket - is there data waiting? result = IO.select([@socket], [], [], SELECT_TIMEOUT) handle_timeouts - unless result.nil? - # Yes - read in the packet - packet = MQTT::Packet.read(@socket) - handle_packet packet + @socket_semaphore.synchronize do + unless result.nil? + # Yes - read in the packet + packet = MQTT::Packet.read(@socket) + handle_packet packet + end + keep_alive! end - keep_alive! # Pass exceptions up to parent thread rescue Exception => exp - unless @socket.nil? - @socket.close - @socket = nil - handle_close + @socket_semaphore.synchronize do + close_socket(false) end Thread.current[:parent].raise(exp) end @@ -509,6 +510,19 @@ def handle_close end end + def close_socket(send_msg = true) + return unless socket_alive? + + # Close the socket if it is open + if send_msg + packet = MQTT::Packet::Disconnect.new + send_packet(packet, false) + end + @socket.close unless @socket.nil? + handle_close + @socket = nil + end + if Process.const_defined? :CLOCK_MONOTONIC def current_time Process.clock_gettime(Process::CLOCK_MONOTONIC) @@ -521,12 +535,12 @@ def current_time end def keep_alive! - return unless @keep_alive > 0 && connected? + return unless @keep_alive > 0 && socket_alive? 
response_timeout = (@keep_alive * 1.5).ceil if current_time >= @last_ping_request + @keep_alive packet = MQTT::Packet::Pingreq.new - send_packet(packet) + send_packet(packet, false) @last_ping_request = current_time elsif current_time > @last_ping_response + response_timeout raise MQTT::ProtocolException, "No Ping Response received for #{response_timeout} seconds" @@ -556,12 +570,15 @@ def receive_connack end # Send a packet to server - def send_packet(data) - # Raise exception if we aren't connected - raise MQTT::NotConnectedException unless connected? - + def send_packet(data, with_lock = true) # Only allow one thread to write to socket at a time - @write_semaphore.synchronize do + if with_lock + @socket_semaphore.synchronize do + raise MQTT::NotConnectedException unless socket_alive? + @socket.write(data.to_s) + end + else + raise MQTT::NotConnectedException unless socket_alive? @socket.write(data.to_s) end end diff --git a/spec/mqtt_client_spec.rb b/spec/mqtt_client_spec.rb index 5ae92ab..5611ca2 100644 --- a/spec/mqtt_client_spec.rb +++ b/spec/mqtt_client_spec.rb @@ -285,7 +285,7 @@ def now end it "should not create a new TCP Socket if connected" do - allow(client).to receive(:connected?).and_return(true) + allow(client).to receive(:socket_alive?).and_return(true) expect(TCPSocket).to receive(:new).never client.connect('myclient') end @@ -558,19 +558,19 @@ def now end it "should not do anything if the socket is already disconnected" do - allow(client).to receive(:connected?).and_return(false) + allow(client).to receive(:socket_alive?).and_return(false) client.disconnect(true) expect(socket.string).to eq("") end it "should write a valid DISCONNECT packet to the socket if connected and the send_msg=true an" do - allow(client).to receive(:connected?).and_return(true) + allow(client).to receive(:socket_alive?).and_return(true) client.disconnect(true) expect(socket.string).to eq("\xE0\x00") end it "should not write anything to the socket if the send_msg=false" do - 
+ allow(client).to receive(:socket_alive?).and_return(true) client.disconnect(false) expect(socket.string).to be_empty end From 683ce8a30890dd4b1df5e0396659da1b7a11fd7a Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 21 Jan 2021 15:38:21 +0900 Subject: [PATCH 2/2] Make sure to raise an error on exiting @read_thread Although an error should always be raised on exiting @read_thread to break the `get` loop, there was a case where it was not raised. Signed-off-by: Takuro Ashie --- lib/mqtt/client.rb | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/lib/mqtt/client.rb b/lib/mqtt/client.rb index ae971b6..56e7fc7 100644 --- a/lib/mqtt/client.rb +++ b/lib/mqtt/client.rb @@ -271,7 +271,19 @@ def connect(clientid = nil) # Start packet reading thread @read_thread = Thread.new(Thread.current) do |parent| Thread.current[:parent] = parent - receive_packet while connected? + no_error = true + no_error = receive_packet while no_error && connected? + + if no_error + # Should not reach here on normal state since `disconnect` kills + # this thread, but it will occur when `receive_packet` catches no + # error and # `connected?` returns false. An error should be raised + # in this case too to break `get` loop. + @socket_semaphore.synchronize do + close_socket(false) + end + Thread.current[:parent].raise(MQTT::NotConnectedException) + end end end @@ -469,12 +481,14 @@ def receive_packet end keep_alive! end + true # Pass exceptions up to parent thread rescue Exception => exp @socket_semaphore.synchronize do close_socket(false) end Thread.current[:parent].raise(exp) + false end def wait_for_puback(id, queue)