diff --git a/lib/nsq/client_base.rb b/lib/nsq/client_base.rb index 09e3e1a..f5de6e5 100644 --- a/lib/nsq/client_base.rb +++ b/lib/nsq/client_base.rb @@ -43,6 +43,8 @@ def discover_repeatedly(opts = {}) # We can't connect to any nsqlookupds. That's okay, we'll just # leave our current nsqd connections alone and try again later. warn 'Could not connect to any nsqlookupd instances in discovery loop' + rescue StandardError => e + end sleep opts[:interval] end @@ -63,24 +65,34 @@ def nsqds_from_lookupd(topic = nil) def drop_and_add_connections(nsqds) + has_connections_changed = false + @connections.each_pair {|k, nsqd| + unless nsqd.connected? + drop_connection(k, nsqd) + has_connections_changed = true + end + + } # drop nsqd connections that are no longer in lookupd missing_nsqds = @connections.keys - nsqds missing_nsqds.each do |nsqd| drop_connection(nsqd) + has_connections_changed = true end # add new ones new_nsqds = nsqds - @connections.keys new_nsqds.each do |nsqd| begin - add_connection(nsqd) - rescue Exception => ex + add_connection(nsqd, connected_through_lookupd: true) + has_connections_changed = true + rescue StandardError => ex error "Failed to connect to nsqd @ #{nsqd}: #{ex}" end end # balance RDY state amongst the connections - connections_changed + connections_changed if has_connections_changed end @@ -98,10 +110,19 @@ def add_connection(nsqd, options = {}) end - def drop_connection(nsqd) + def drop_connection(nsqd, instance = nil) info "- Dropping connection #{nsqd}" - connection = @connections.delete(nsqd) - connection.close if connection + if instance + instance.close + connection = @connections.delete(nsqd) + if connection != instance + connection.close if connection + end + else + connection = @connections.delete(nsqd) + connection.close if connection + end + connections_changed end diff --git a/lib/nsq/connection.rb b/lib/nsq/connection.rb index e142f6c..901f88e 100644 --- a/lib/nsq/connection.rb +++ b/lib/nsq/connection.rb @@ -33,6 +33,8 @@ def initialize(opts = {}) @max_in_flight = opts[:max_in_flight] || 1 @tls_options = opts[:tls_options] @max_attempts = opts[:max_attempts] + @connected_through_lookupd = false + @last_heartbeat = nil if opts[:ssl_context] if @tls_options warn 'ssl_context and tls_options both set. Using tls_options. Ignoring ssl_context.' @@ -55,6 +57,10 @@ def initialize(opts = {}) raise ArgumentError, 'msg_timeout cannot be less than 1000. it\'s in milliseconds.' end + if opts[:connected_through_lookupd] + @connected_through_lookupd = opts[:connected_through_lookupd] + end + # for outgoing communication @write_queue = SizedQueue.new(10000) @@ -70,12 +76,18 @@ def initialize(opts = {}) start_monitoring_connection end - def connected? - @connected + if @connected + if !@last_heartbeat.nil? && @last_heartbeat > Time.now - 40 + true + else + false + end + else + false + end end - # close the connection and don't try to re-open it def close stop_monitoring_connection @@ -156,7 +168,16 @@ def write(raw) def write_to_socket(raw) debug ">>> #{raw.inspect}" - @socket.write(raw) + begin + @socket.write_nonblock(raw) + rescue Errno::EWOULDBLOCK, OpenSSL::SSL::SSLErrorWaitWritable + if connected? + sleep 0.01 + retry + else + raise "timeout" + end + end end @@ -179,9 +200,9 @@ def identify write_to_socket ["IDENTIFY\n", metadata.length, metadata].pack('a*l>a*') # Now wait for the response! - frame = receive_frame + frame = receive_frame(5) # timeout after 5 seconds to avoid hung servers server = JSON.parse(frame.data) - + @last_heartbeat = Time.now if @max_in_flight > server['max_rdy_count'] raise "max_in_flight is set to #{@max_in_flight}, server only supports #{server['max_rdy_count']}" end @@ -193,6 +214,7 @@ def identify def handle_response(frame) if frame.data == RESPONSE_HEARTBEAT debug 'Received heartbeat' + @last_heartbeat = Time.now nop elsif frame.data == RESPONSE_OK debug 'Received OK' @@ -202,13 +224,33 @@ def handle_response(frame) end - def receive_frame - if buffer = @socket.read(8) - size, type = buffer.unpack('l>l>') - size -= 4 # we want the size of the data part and type already took up 4 bytes - data = @socket.read(size) - frame_class = frame_class_for_type(type) - return frame_class.new(data, self) + def receive_frame(max_receive_time = nil) + break_after = nil + break_after = Time.now + max_receive_time if max_receive_time + begin + if buffer = @socket.read_nonblock(8) + size, type = buffer.unpack('l>l>') + size -= 4 # we want the size of the data part and type already took up 4 bytes + begin + data = @socket.read_nonblock(size) + frame_class = frame_class_for_type(type) + return frame_class.new(data, self) + rescue Errno::EWOULDBLOCK, OpenSSL::SSL::SSLErrorWaitReadable + if break_after.nil? || break_after > Time.now + sleep 0.01 + retry + else + raise Errno::ECONNREFUSED + end + end + end + rescue Errno::EWOULDBLOCK, OpenSSL::SSL::SSLErrorWaitReadable + if break_after.nil? || break_after > Time.now + sleep 0.01 + retry + else + raise Errno::ECONNREFUSED + end end end @@ -260,7 +302,7 @@ def read_loop raise 'No data from socket' end end - rescue Exception => ex + rescue StandardError => ex die(ex) end @@ -286,7 +328,7 @@ def write_loop break if data == :stop_write_loop write_to_socket(data) end - rescue Exception => ex + rescue StandardError => ex # requeue PUB and MPUB commands if data =~ /^M?PUB/ debug "Requeueing to write_queue: #{data.inspect}" @@ -404,19 +446,18 @@ def openssl_context # https://github.com/ooyala/retries/blob/master/lib/retries.rb def with_retries(&block) base_sleep_seconds = 0.5 - max_sleep_seconds = 300 # 5 minutes + max_sleep_seconds = 30 # 30 seconds # Let's do this thing attempts = 0 - + max_attempts = @connected_through_lookupd ? 10 : 100 begin attempts += 1 return block.call(attempts) rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH, Errno::ENETDOWN, Errno::ENETUNREACH, Errno::ETIMEDOUT, Timeout::Error => ex - - raise ex if attempts >= 100 + raise ex if attempts >= max_attempts # The sleep time is an exponentially-increasing function of base_sleep_seconds. # But, it never exceeds max_sleep_seconds. diff --git a/lib/nsq/discovery.rb b/lib/nsq/discovery.rb index 2009b58..069c9a2 100644 --- a/lib/nsq/discovery.rb +++ b/lib/nsq/discovery.rb @@ -65,8 +65,8 @@ def gather_nsqds_from_all_lookupds # If there's an error, return nil def get_nsqds(lookupd, topic = nil) uri_scheme = 'http://' unless lookupd.match(%r(https?://)) - uri = URI.parse("#{uri_scheme}#{lookupd}") - + uri = URI("#{uri_scheme}#{lookupd}") + uri.query = "ts=#{Time.now.to_i}" if topic uri.path = '/lookup' @@ -76,19 +76,29 @@ def get_nsqds(lookupd, topic = nil) end begin - body = Net::HTTP.get(uri) - data = JSON.parse(body) - producers = data['producers'] || # v1.0.0-compat - (data['data'] && data['data']['producers']) + + Net::HTTP.start(uri.host, uri.port, :use_ssl => uri.scheme == 'https') do |client| + client.open_timeout = 10 + client.read_timeout = 10 + response = client.get(uri.path + "?" + uri.query) + if response.code == "200" + body = response.body + data = JSON.parse(body) + producers = data['producers'] || # v1.0.0-compat + (data['data'] && data['data']['producers']) - if producers - producers.map do |producer| - "#{producer['broadcast_address']}:#{producer['tcp_port']}" + if producers + producers.map do |producer| + "#{producer['broadcast_address']}:#{producer['tcp_port']}" + end + else + [] + end + else + [] end - else - [] end - rescue Exception => e + rescue StandardError => e error "Error during discovery for #{lookupd}: #{e}" nil end diff --git a/lib/nsq/exceptions.rb b/lib/nsq/exceptions.rb index e641d1c..20f144b 100644 --- a/lib/nsq/exceptions.rb +++ b/lib/nsq/exceptions.rb @@ -1,5 +1,5 @@ module Nsq # Raised when nsqlookupd discovery fails - class DiscoveryException < Exception; end + class DiscoveryException < StandardError; end end diff --git a/spec/lib/nsq/consumer_with_pausing_spec.rb b/spec/lib/nsq/consumer_with_pausing_spec.rb new file mode 100644 index 0000000..bd7c567 --- /dev/null +++ b/spec/lib/nsq/consumer_with_pausing_spec.rb @@ -0,0 +1,48 @@ +require_relative '../../spec_helper' +require 'json' +require 'timeout' + +describe Nsq::Consumer do + before do + @cluster = NsqCluster.new(nsqd_count: 2, nsqlookupd_count: 1) + end + + after do + @cluster.destroy + end + + + describe 'when connecting to nsqd directly' do + before do + @nsqd = @cluster.nsqd.first + @consumer = new_consumer(nsqlookupd: nil, nsqd: "#{@nsqd.host}:#{@nsqd.tcp_port}", max_in_flight: 10) + end + after do + @consumer.terminate + end + + describe '::new that is paused' do + it 'should throw an exception when trying to connect to a server that\'s paused' do + @nsqd.pause_process + + expect{ + new_consumer(nsqlookupd: nil, nsqd: "#{@nsqd.host}:#{@nsqd.tcp_port}") + }.to raise_error(Errno::ECONNREFUSED) + + end + end + + describe '::new that is later paused should show as unhealty due to missing keepalives' do + it 'should show as unhealthy' do + puts "\nlong running test" + consumer = new_consumer(nsqlookupd: nil, nsqd: "#{@nsqd.host}:#{@nsqd.tcp_port}") + expect(consumer.connected?).to eq(true) + @nsqd.pause_process + sleep 50 # need to be able to configure this through opts to lower this. + expect(consumer.connected?).to eq(false) + end + end + + + end +end diff --git a/spec/lib/nsq/discovery_with_pausing_spec.rb b/spec/lib/nsq/discovery_with_pausing_spec.rb new file mode 100644 index 0000000..131c7ba --- /dev/null +++ b/spec/lib/nsq/discovery_with_pausing_spec.rb @@ -0,0 +1,79 @@ +require_relative '../../spec_helper' + +NSQD_COUNT = 5 + +describe Nsq::Discovery do + before do + @cluster = NsqCluster.new(nsqd_count: NSQD_COUNT, nsqlookupd_count: 2) + @topic = 'some-topic' + + # make sure each nsqd has a message for this topic + # leave the last nsqd without this topic for testing + @cluster.nsqd.take(NSQD_COUNT-1).each do |nsqd| + nsqd.pub(@topic, 'some-message') + end + @cluster.nsqd.last.pub('some-other-topic', 'some-message') + + @expected_topic_lookup_nsqds = @cluster.nsqd.take(NSQD_COUNT-1).map{|d|"#{d.host}:#{d.tcp_port}"}.sort + @expected_all_nsqds = @cluster.nsqd.map{|d|"#{d.host}:#{d.tcp_port}"}.sort + end + + after do + @cluster.destroy + end + + + def new_discovery(cluster_lookupds) + lookupds = cluster_lookupds.map do |lookupd| + "#{lookupd.host}:#{lookupd.http_port}" + end + + # one lookupd has scheme and one does not + lookupds.last.prepend 'http://' + + Nsq::Discovery.new(lookupds) + end + + describe 'multiple nsqlookupds, but one is paused' do + before do + @downed_nsqlookupd = @cluster.nsqlookupd.first + @downed_nsqlookupd.pause_process + + @discovery = new_discovery(@cluster.nsqlookupd) + end + + describe '#nsqds_for_topic' do + it 'returns all nsqds' do + puts "\nlong running test" + nsqds = @discovery.nsqds_for_topic(@topic) + expect(nsqds.sort).to eq(@expected_topic_lookup_nsqds) + end + end + end + + + describe 'when all lookupds are paused' do + before do + @cluster.nsqlookupd.each(&:pause_process) + @discovery = new_discovery(@cluster.nsqlookupd) + end + + describe '#nsqds' do + it 'throws an exception' do + puts "\nlong running test" + expect { + @discovery.nsqds + }.to raise_error(Nsq::DiscoveryException) + end + end + + describe '#nsqds_for_topic' do + it 'throws an exception' do + puts "\nlong running test" + expect { + @discovery.nsqds_for_topic(@topic) + }.to raise_error(Nsq::DiscoveryException) + end + end + end +end