Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions lib/nsq/client_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def discover_repeatedly(opts = {})
# We can't connect to any nsqlookupds. That's okay, we'll just
# leave our current nsqd connections alone and try again later.
warn 'Could not connect to any nsqlookupd instances in discovery loop'
rescue StandardError => e

end
sleep opts[:interval]
end
Expand All @@ -63,24 +65,34 @@ def nsqds_from_lookupd(topic = nil)


# Reconcile the open nsqd connections with the list reported by lookupd.
#
# Three passes are made over @connections:
#   1. connections that report themselves as no longer connected are dropped
#      (they become eligible for re-adding in pass 3 if lookupd still lists them),
#   2. connections whose key is absent from +nsqds+ are dropped,
#   3. entries of +nsqds+ without a connection are opened.
#
# +nsqds+ - list of nsqd identifiers, compared against the keys of @connections.
#
# A failure to open one nsqd is logged and does not abort the remaining ones.
# connections_changed is only invoked when something was dropped or added.
def drop_and_add_connections(nsqds)
  has_connections_changed = false

  # Work on a snapshot: drop_connection removes entries from @connections,
  # which we must not do while iterating over it.
  disconnected = @connections.reject { |_key, connection| connection.connected? }
  disconnected.each do |key, connection|
    drop_connection(key, connection)
    has_connections_changed = true
  end

  # drop nsqd connections that are no longer in lookupd
  (@connections.keys - nsqds).each do |nsqd|
    drop_connection(nsqd)
    has_connections_changed = true
  end

  # add new ones
  (nsqds - @connections.keys).each do |nsqd|
    begin
      add_connection(nsqd, connected_through_lookupd: true)
      has_connections_changed = true
    rescue StandardError => ex
      error "Failed to connect to nsqd @ #{nsqd}: #{ex}"
    end
  end

  # balance RDY state amongst the connections
  connections_changed if has_connections_changed
end


Expand All @@ -98,10 +110,19 @@ def add_connection(nsqd, options = {})
end


# Close and forget the connection registered under +nsqd+.
#
# +nsqd+     - key of the connection in @connections.
# +instance+ - optional connection object the caller already holds. It is
#              closed first; the registered connection is then closed as well
#              unless it is that same object, so nothing is closed twice.
#
# Always finishes by calling connections_changed.
def drop_connection(nsqd, instance = nil)
  info "- Dropping connection #{nsqd}"

  instance.close if instance
  registered = @connections.delete(nsqd)
  registered.close if registered && registered != instance

  connections_changed
end

Expand Down
79 changes: 60 additions & 19 deletions lib/nsq/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ def initialize(opts = {})
@max_in_flight = opts[:max_in_flight] || 1
@tls_options = opts[:tls_options]
@max_attempts = opts[:max_attempts]
@connected_through_lookupd = false
@last_heartbeat = nil
if opts[:ssl_context]
if @tls_options
warn 'ssl_context and tls_options both set. Using tls_options. Ignoring ssl_context.'
Expand All @@ -55,6 +57,10 @@ def initialize(opts = {})
raise ArgumentError, 'msg_timeout cannot be less than 1000. it\'s in milliseconds.'
end

if opts[:connected_through_lookupd]
@connected_through_lookupd = opts[:connected_through_lookupd]
end

# for outgoing communication
@write_queue = SizedQueue.new(10000)

Expand All @@ -70,12 +76,18 @@ def initialize(opts = {})
start_monitoring_connection
end


# True only while the connection is flagged as connected AND a heartbeat
# timestamp was recorded within the last 40 seconds.
def connected?
  return false unless @connected
  return false if @last_heartbeat.nil?

  @last_heartbeat > Time.now - 40
end


# close the connection and don't try to re-open it
def close
stop_monitoring_connection
Expand Down Expand Up @@ -156,7 +168,16 @@ def write(raw)

# Write +raw+ to @socket using non-blocking writes.
#
# write_nonblock may accept only part of the buffer, so the unwritten tail is
# re-sent until every byte has been handed to the socket. When the socket is
# not ready for writing we back off briefly and try again for as long as
# connected? holds.
#
# +raw+ - the bytes to send.
#
# Raises RuntimeError ("timeout") if the socket is not writable and the
# connection is no longer considered connected.
def write_to_socket(raw)
  debug ">>> #{raw.inspect}"

  remaining = raw.to_s
  until remaining.empty?
    begin
      written = @socket.write_nonblock(remaining)
      # keep only the bytes the socket did not take yet
      remaining = remaining.byteslice(written..-1)
    rescue Errno::EWOULDBLOCK, OpenSSL::SSL::SSLErrorWaitWritable
      raise "timeout" unless connected?
      sleep 0.01
    end
  end
end


Expand All @@ -179,9 +200,9 @@ def identify
write_to_socket ["IDENTIFY\n", metadata.length, metadata].pack('a*l>a*')

# Now wait for the response!
frame = receive_frame
frame = receive_frame(5) # timeout after 5 seconds to avoid hung servers
server = JSON.parse(frame.data)

@last_heartbeat = Time.now
if @max_in_flight > server['max_rdy_count']
raise "max_in_flight is set to #{@max_in_flight}, server only supports #{server['max_rdy_count']}"
end
Expand All @@ -193,6 +214,7 @@ def identify
def handle_response(frame)
if frame.data == RESPONSE_HEARTBEAT
debug 'Received heartbeat'
@last_heartbeat = Time.now
nop
elsif frame.data == RESPONSE_OK
debug 'Received OK'
Expand All @@ -202,13 +224,33 @@ def handle_response(frame)
end


# Read one frame from @socket using non-blocking reads.
#
# A frame starts with an 8-byte header holding two big-endian 32-bit integers
# (size, type); +size+ counts the 4 type bytes, so the payload is size - 4
# bytes long.
#
# +max_receive_time+ - optional number of seconds to wait for the complete
#                      frame. nil (the default) waits indefinitely.
#
# Returns an instance of the class chosen by frame_class_for_type, built from
# the payload and this connection, or nil if the socket hands back nil.
# Raises Errno::ECONNREFUSED when +max_receive_time+ elapses first.
def receive_frame(max_receive_time = nil)
  deadline = nil
  if max_receive_time
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + max_receive_time
  end

  header = read_bytes_nonblock(8, deadline)
  return nil if header.nil?

  size, type = header.unpack('l>l>')
  size -= 4 # we want the size of the data part and type already took up 4 bytes

  data = read_bytes_nonblock(size, deadline)
  return nil if data.nil?

  frame_class_for_type(type).new(data, self)
end

# Collect exactly +length+ bytes from @socket. read_nonblock may return fewer
# bytes than requested, so keep reading until the buffer is complete; when no
# data is ready, poll every 10ms until +deadline+ (a monotonic clock value, or
# nil for no limit) has passed.
def read_bytes_nonblock(length, deadline)
  buffer = String.new
  while buffer.bytesize < length
    begin
      chunk = @socket.read_nonblock(length - buffer.bytesize)
      return nil if chunk.nil?
      buffer << chunk
    rescue Errno::EWOULDBLOCK, OpenSSL::SSL::SSLErrorWaitReadable
      if deadline && Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
        raise Errno::ECONNREFUSED
      end
      sleep 0.01
    end
  end
  buffer
end

Expand Down Expand Up @@ -260,7 +302,7 @@ def read_loop
raise 'No data from socket'
end
end
rescue Exception => ex
rescue StandardError => ex
die(ex)
end

Expand All @@ -286,7 +328,7 @@ def write_loop
break if data == :stop_write_loop
write_to_socket(data)
end
rescue Exception => ex
rescue StandardError => ex
# requeue PUB and MPUB commands
if data =~ /^M?PUB/
debug "Requeueing to write_queue: #{data.inspect}"
Expand Down Expand Up @@ -404,19 +446,18 @@ def openssl_context
# https://github.com/ooyala/retries/blob/master/lib/retries.rb
def with_retries(&block)
base_sleep_seconds = 0.5
max_sleep_seconds = 300 # 5 minutes
max_sleep_seconds = 30 # 30 seconds

# Let's do this thing
attempts = 0

max_attempts = @connected_through_lookupd ? 10 : 100
begin
attempts += 1
return block.call(attempts)

rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH,
Errno::ENETDOWN, Errno::ENETUNREACH, Errno::ETIMEDOUT, Timeout::Error => ex

raise ex if attempts >= 100
raise ex if attempts >= max_attempts

# The sleep time is an exponentially-increasing function of base_sleep_seconds.
# But, it never exceeds max_sleep_seconds.
Expand Down
34 changes: 22 additions & 12 deletions lib/nsq/discovery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ def gather_nsqds_from_all_lookupds
# If there's an error, return nil
def get_nsqds(lookupd, topic = nil)
uri_scheme = 'http://' unless lookupd.match(%r(https?://))
uri = URI.parse("#{uri_scheme}#{lookupd}")

uri = URI("#{uri_scheme}#{lookupd}")
uri.query = "ts=#{Time.now.to_i}"
if topic
uri.path = '/lookup'
Expand All @@ -76,19 +76,29 @@ def get_nsqds(lookupd, topic = nil)
end

begin
body = Net::HTTP.get(uri)
data = JSON.parse(body)
producers = data['producers'] || # v1.0.0-compat
(data['data'] && data['data']['producers'])

Net::HTTP.start(uri.host, uri.port, :use_ssl => uri.scheme == 'https') do |client|
client.open_timeout = 10
client.read_timeout = 10
response = client.get(uri.path + "?" + uri.query)
if response.code == "200"
body = response.body
data = JSON.parse(body)
producers = data['producers'] || # v1.0.0-compat
(data['data'] && data['data']['producers'])

if producers
producers.map do |producer|
"#{producer['broadcast_address']}:#{producer['tcp_port']}"
if producers
producers.map do |producer|
"#{producer['broadcast_address']}:#{producer['tcp_port']}"
end
else
[]
end
else
[]
end
else
[]
end
rescue Exception => e
rescue StandardError => e
error "Error during discovery for #{lookupd}: #{e}"
nil
end
Expand Down
2 changes: 1 addition & 1 deletion lib/nsq/exceptions.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Nsq
  # Raised when nsqlookupd discovery fails.
  # Derives from StandardError so that a plain `rescue` / `rescue StandardError`
  # catches it.
  class DiscoveryException < StandardError; end
end

48 changes: 48 additions & 0 deletions spec/lib/nsq/consumer_with_pausing_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
require_relative '../../spec_helper'
require 'json'
require 'timeout'

# Exercises consumer behaviour against an nsqd process that has been paused
# via pause_process.
describe Nsq::Consumer do
  before do
    @cluster = NsqCluster.new(nsqd_count: 2, nsqlookupd_count: 1)
  end

  after do
    @cluster.destroy
  end

  describe 'when connecting to nsqd directly' do
    before do
      @nsqd = @cluster.nsqd.first
      @consumer = new_consumer(nsqlookupd: nil, nsqd: "#{@nsqd.host}:#{@nsqd.tcp_port}", max_in_flight: 10)
    end

    after do
      @consumer.terminate
    end

    describe '::new that is paused' do
      it 'should throw an exception when trying to connect to a server that\'s paused' do
        @nsqd.pause_process

        expect {
          new_consumer(nsqlookupd: nil, nsqd: "#{@nsqd.host}:#{@nsqd.tcp_port}")
        }.to raise_error(Errno::ECONNREFUSED)
      end
    end

    describe '::new that is later paused should show as unhealthy due to missing keepalives' do
      it 'should show as unhealthy' do
        puts "\nlong running test"
        consumer = new_consumer(nsqlookupd: nil, nsqd: "#{@nsqd.host}:#{@nsqd.tcp_port}")
        begin
          expect(consumer.connected?).to eq(true)
          @nsqd.pause_process
          # need to be able to configure this through opts to lower this.
          # NOTE(review): presumably has to outlast the 40-second heartbeat
          # window checked by Connection#connected? - confirm.
          sleep 50
          expect(consumer.connected?).to eq(false)
        ensure
          # the consumer created inside this example is not covered by the
          # `after` hook, so shut it down here
          consumer.terminate
        end
      end
    end
  end
end
Loading