From 3489202efe1f0660ba80b75dcde5fe1ea53c897a Mon Sep 17 00:00:00 2001 From: Soulou Date: Thu, 17 Jan 2019 22:50:33 +0100 Subject: [PATCH 01/15] First step, single read and write loop --- lib/nsq/connection.rb | 110 +++++++++++++++--------------------- lib/nsq/selectable_queue.rb | 47 +++++++++++++++ 2 files changed, 94 insertions(+), 63 deletions(-) create mode 100644 lib/nsq/selectable_queue.rb diff --git a/lib/nsq/connection.rb b/lib/nsq/connection.rb index e142f6c..b68e544 100644 --- a/lib/nsq/connection.rb +++ b/lib/nsq/connection.rb @@ -6,6 +6,7 @@ require_relative 'frames/error' require_relative 'frames/message' require_relative 'frames/response' +require_relative 'selectable_queue' require_relative 'logger' module Nsq @@ -56,11 +57,11 @@ def initialize(opts = {}) end # for outgoing communication - @write_queue = SizedQueue.new(10000) + @write_queue = SelectableQueue.new(10000) # For indicating that the connection has died. # We use a Queue so we don't have to poll. Used to communicate across - # threads (from write_loop and read_loop to connect_and_monitor). + # threads (from read_write_loop to connect_and_monitor). @death_queue = Queue.new @connected = false @@ -201,17 +202,21 @@ def handle_response(frame) end end - - def receive_frame - if buffer = @socket.read(8) + def receive_frame(recv_method = :recv) + # __send__ is used as it clashes with #send method of Socket + if buffer = @socket.__send__(recv_method, 8) size, type = buffer.unpack('l>l>') size -= 4 # we want the size of the data part and type already took up 4 bytes - data = @socket.read(size) + data = @socket.recv(size) frame_class = frame_class_for_type(type) return frame_class.new(data, self) end end + def receive_frame_nonblock + receive_frame(:recv_nonblock) + end + FRAME_CLASSES = [Response, Error, Message] def frame_class_for_type(type) @@ -231,71 +236,53 @@ def decrement_in_flight end - def start_read_loop - @read_loop_thread ||= Thread.new{read_loop} + def start_read_write_loop + @read_write_loop_thread ||= Thread.new{read_write_loop} end - def stop_read_loop - @read_loop_thread.kill if @read_loop_thread - @read_loop_thread = nil + def stop_read_write_loop + @read_write_loop_thread.kill if @read_write_loop_thread + @read_write_loop_thread = nil end - - def read_loop + def read_write_loop loop do - frame = receive_frame - if frame.is_a?(Response) - handle_response(frame) - elsif frame.is_a?(Error) - error "Error received: #{frame.data}" - elsif frame.is_a?(Message) - debug "<<< #{frame.body}" - if @max_attempts && frame.attempts > @max_attempts - fin(frame.id) - else - @queue.push(frame) if @queue + begin + ready, _, _ = IO.select([@socket, @write_queue]) + + if ready.include?(@write_queue) + data = @write_queue.pop + if !data.nil? + write_to_socket(data) + end end - else - raise 'No data from socket' - end - end - rescue Exception => ex - die(ex) - end - - - def start_write_loop - @write_loop_thread ||= Thread.new{write_loop} - end - - - def stop_write_loop - if @write_loop_thread - @write_queue.push(:stop_write_loop) - @write_loop_thread.join - end - @write_loop_thread = nil - end - - def write_loop - data = nil - loop do - data = @write_queue.pop - break if data == :stop_write_loop - write_to_socket(data) + frame = receive_frame_nonblock + if !frame.nil? + if frame.is_a?(Response) + handle_response(frame) + elsif frame.is_a?(Error) + error "Error received: #{frame.data}" + elsif frame.is_a?(Message) + debug "<<< #{frame.body}" + if @max_attempts && frame.attempts > @max_attempts + fin(frame.id) + else + @queue.push(frame) if @queue + end + else + raise 'No data from socket' + end + end + rescue IO::WaitReadable + retry + end end rescue Exception => ex - # requeue PUB and MPUB commands - if data =~ /^M?PUB/ - debug "Requeueing to write_queue: #{data.inspect}" - @write_queue.push(data) - end die(ex) end - # Waits for death of connection def start_monitoring_connection @connection_monitor_thread ||= Thread.new{monitor_connection} @@ -308,7 +295,6 @@ def stop_monitoring_connection @connection_monitor = nil end - def monitor_connection loop do # wait for death, hopefully it never comes @@ -344,8 +330,7 @@ def open_connection identify upgrade_to_ssl_socket if @tls_v1 - start_read_loop - start_write_loop + start_read_write_loop @connected = true # we need to re-subscribe if there's a topic specified @@ -360,8 +345,7 @@ def open_connection # closes the connection and stops listening for messages def close_connection cls if connected? - stop_read_loop - stop_write_loop + stop_read_write_loop @socket.close if @socket @socket = nil @connected = false diff --git a/lib/nsq/selectable_queue.rb b/lib/nsq/selectable_queue.rb new file mode 100644 index 0000000..21d7e19 --- /dev/null +++ b/lib/nsq/selectable_queue.rb @@ -0,0 +1,47 @@ +# A queue that you can pass to IO.select. +# +# NOT THREAD SAFE: Only one thread should write; only one thread should read. +# +# Purpose: +# Allow easy integration of data-producing threads into event loops. The +# queue will be readable from select's perspective as long as there are +# objects in the queue. +# +# Implementation: +# The queue maintains a pipe. The pipe contains a number of bytes equal to +# the queue size. +# +# Example use: +# queue = SelectableQueue.new +# readable, _, _ = IO.select([queue, $stdin]) +# print "got #{queue.pop}" if readable.contain?(queue) +# +class SelectableQueue + def initialize(size = 0) + if size == 0 + @queue = Queue.new + else + @queue = SizedQueue.new(size) + end + @read_io, @write_io = IO.pipe + end + + def push(o) + @queue.push(o) + # It doesn't matter what we write into the pipe, as long as it's one byte. + # It's not blocking until full, and has a default limit of 65536 (64KB) + @write_io << '.' + self + end + + def pop(nonblock=false) + o = @queue.pop(nonblock) + @read_io.read(1) + o + end + + def to_io + @read_io + end +end + From 19867db4f769fd36f39637b37894886d3299d733 Mon Sep 17 00:00:00 2001 From: Soulou Date: Thu, 17 Jan 2019 23:55:00 +0100 Subject: [PATCH 02/15] First working PoC to handle exception when sending messages --- lib/nsq/connection.rb | 29 ++++++++++++++++++++++++----- lib/nsq/exceptions.rb | 2 ++ 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/lib/nsq/connection.rb b/lib/nsq/connection.rb index b68e544..7548a8c 100644 --- a/lib/nsq/connection.rb +++ b/lib/nsq/connection.rb @@ -3,6 +3,7 @@ require 'openssl' require 'timeout' +require_relative 'exceptions' require_relative 'frames/error' require_relative 'frames/message' require_relative 'frames/response' @@ -58,6 +59,7 @@ def initialize(opts = {}) # for outgoing communication @write_queue = SelectableQueue.new(10000) + @transactions = [] # For indicating that the connection has died. # We use a Queue so we don't have to poll. Used to communicate across @@ -151,16 +153,22 @@ def nop def write(raw) - @write_queue.push(raw) + result = (raw =~ /^M?PUB/).nil? ? nil : SizedQueue.new(1) + @write_queue.push({ + message: raw, + result: result, + }) + if result + value = result.pop + raise value if value.is_a?(Exception) + end end - def write_to_socket(raw) debug ">>> #{raw.inspect}" @socket.write(raw) end - def identify hostname = Socket.gethostname metadata = { @@ -205,6 +213,7 @@ def handle_response(frame) def receive_frame(recv_method = :recv) # __send__ is used as it clashes with #send method of Socket if buffer = @socket.__send__(recv_method, 8) + raise EOFError if buffer.length == 0 size, type = buffer.unpack('l>l>') size -= 4 # we want the size of the data part and type already took up 4 bytes data = @socket.recv(size) @@ -254,17 +263,27 @@ def read_write_loop if ready.include?(@write_queue) data = @write_queue.pop if !data.nil? - write_to_socket(data) + @transactions.push(data[:result]) + write_to_socket(data[:message]) end end frame = receive_frame_nonblock if !frame.nil? + result = @transactions.pop if frame.is_a?(Response) + # If the producer is expecting a result + # signal everything went fine pushing any value + result.push(nil) if result handle_response(frame) elsif frame.is_a?(Error) - error "Error received: #{frame.data}" + if result + result.push(ErrorFrameException.new(frame.data)) + else + error "Error received: #{frame.data}" + end elsif frame.is_a?(Message) + result.push(nil) if result debug "<<< #{frame.body}" if @max_attempts && frame.attempts > @max_attempts fin(frame.id) diff --git a/lib/nsq/exceptions.rb b/lib/nsq/exceptions.rb index e641d1c..f659c14 100644 --- a/lib/nsq/exceptions.rb +++ b/lib/nsq/exceptions.rb @@ -1,5 +1,7 @@ module Nsq # Raised when nsqlookupd discovery fails class DiscoveryException < Exception; end + + class ErrorFrameException < Exception; end end From 308dfd7efdef517fcf7237c85fa080263a01477c Mon Sep 17 00:00:00 2001 From: Soulou Date: Fri, 18 Jan 2019 14:48:32 +0100 Subject: [PATCH 03/15] Fix retrocompaibility, default behavior is exactly how it was before and specs are good --- lib/nsq/connection.rb | 61 ++++++++++++++++++++++--------------- lib/nsq/producer.rb | 5 +-- lib/nsq/selectable_queue.rb | 4 +++ 3 files changed, 44 insertions(+), 26 deletions(-) diff --git a/lib/nsq/connection.rb b/lib/nsq/connection.rb index 7548a8c..a89fe9a 100644 --- a/lib/nsq/connection.rb +++ b/lib/nsq/connection.rb @@ -28,6 +28,7 @@ class Connection def initialize(opts = {}) @host = opts[:host] || (raise ArgumentError, 'host is required') @port = opts[:port] || (raise ArgumentError, 'port is required') + @synchronous = opts[:synchronous] || false @queue = opts[:queue] @topic = opts[:topic] @channel = opts[:channel] @@ -153,7 +154,7 @@ def nop def write(raw) - result = (raw =~ /^M?PUB/).nil? ? nil : SizedQueue.new(1) + result = @synchronous && (raw =~ /^M?PUB/) ? SizedQueue.new(1) : nil @write_queue.push({ message: raw, result: result, @@ -210,23 +211,17 @@ def handle_response(frame) end end - def receive_frame(recv_method = :recv) - # __send__ is used as it clashes with #send method of Socket - if buffer = @socket.__send__(recv_method, 8) + def receive_frame + if buffer = @socket.read(8) raise EOFError if buffer.length == 0 size, type = buffer.unpack('l>l>') size -= 4 # we want the size of the data part and type already took up 4 bytes - data = @socket.recv(size) + data = @socket.read(size) frame_class = frame_class_for_type(type) return frame_class.new(data, self) end end - def receive_frame_nonblock - receive_frame(:recv_nonblock) - end - - FRAME_CLASSES = [Response, Error, Message] def frame_class_for_type(type) raise "Bad frame type specified: #{type}" if type > FRAME_CLASSES.length - 1 @@ -251,8 +246,15 @@ def start_read_write_loop def stop_read_write_loop - @read_write_loop_thread.kill if @read_write_loop_thread - @read_write_loop_thread = nil + # if the loop has died because of a connection error, the thread is + # already stopped, otherwise we want to terminate the producer connection + # and a custom-made message is sent signaling to the loop to stop + # gracefully + if @read_write_loop_thread + @write_queue.push(message: :stop_loop) if @read_write_loop_thread.alive? + @read_write_loop_thread.join + @read_write_loop_thread = nil + end end def read_write_loop @@ -260,16 +262,8 @@ def read_write_loop begin ready, _, _ = IO.select([@socket, @write_queue]) - if ready.include?(@write_queue) - data = @write_queue.pop - if !data.nil? - @transactions.push(data[:result]) - write_to_socket(data[:message]) - end - end - - frame = receive_frame_nonblock - if !frame.nil? + if ready.include?(@socket) + frame = receive_frame result = @transactions.pop if frame.is_a?(Response) # If the producer is expecting a result @@ -294,6 +288,13 @@ def read_write_loop raise 'No data from socket' end end + + if ready.include?(@write_queue) + data = @write_queue.pop + return if data[:message] == :stop_loop + @transactions.push(data[:result]) + write_to_socket(data[:message]) + end rescue IO::WaitReadable retry end @@ -321,7 +322,7 @@ def monitor_connection warn "Died from: #{cause_of_death}" debug 'Reconnecting...' - reconnect + reconnect(cause_of_death) debug 'Reconnected!' # clear all death messages, since we're now reconnected. @@ -333,13 +334,25 @@ def monitor_connection # close the connection if it's not already closed and try to reconnect # over and over until we succeed! - def reconnect + def reconnect(cause_of_death) close_connection with_retries do + # If a synchronous producer received messages during the reconnection + # period those messages must fail if they expect an acknowledgement + # Between each reconnection attempt, we ensure the `connection.write` + # are not blocked by returning the exception which lead to the initial + # disconnection. + push_error_pending_writes cause_of_death if @synchronous open_connection end end + def push_error_pending_writes cause_of_death + while !@write_queue.empty? + data = @write_queue.pop + data[:result].push(cause_of_death) if data[:result] + end + end def open_connection @socket = TCPSocket.new(@host, @port) diff --git a/lib/nsq/producer.rb b/lib/nsq/producer.rb index fa3e471..dfaaa37 100644 --- a/lib/nsq/producer.rb +++ b/lib/nsq/producer.rb @@ -7,6 +7,7 @@ class Producer < ClientBase def initialize(opts = {}) @connections = {} @topic = opts[:topic] + @synchronous = opts[:synchronous] || false @discovery_interval = opts[:discovery_interval] || 60 @ssl_context = opts[:ssl_context] @tls_options = opts[:tls_options] @@ -22,10 +23,10 @@ def initialize(opts = {}) elsif opts[:nsqd] nsqds = [opts[:nsqd]].flatten - nsqds.each{|d| add_connection(d)} + nsqds.each{|d| add_connection(d, {synchronous: @synchronous})} else - add_connection('127.0.0.1:4150') + add_connection('127.0.0.1:4150', {synchronous: @synchronous}) end end diff --git a/lib/nsq/selectable_queue.rb b/lib/nsq/selectable_queue.rb index 21d7e19..a63ac18 100644 --- a/lib/nsq/selectable_queue.rb +++ b/lib/nsq/selectable_queue.rb @@ -26,6 +26,10 @@ def initialize(size = 0) @read_io, @write_io = IO.pipe end + def empty? + @queue.empty? + end + def push(o) @queue.push(o) # It doesn't matter what we write into the pipe, as long as it's one byte. From f371fea9c369ded8539129e0647d525364e7d2bb Mon Sep 17 00:00:00 2001 From: Soulou Date: Fri, 18 Jan 2019 15:11:08 +0100 Subject: [PATCH 04/15] Add specs for synchronous producer (ensure exception is thrown) --- lib/nsq/connection.rb | 1 - spec/lib/nsq/producer_synchronous_spec.rb | 26 +++++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) create mode 100644 spec/lib/nsq/producer_synchronous_spec.rb diff --git a/lib/nsq/connection.rb b/lib/nsq/connection.rb index a89fe9a..01cc9b2 100644 --- a/lib/nsq/connection.rb +++ b/lib/nsq/connection.rb @@ -213,7 +213,6 @@ def handle_response(frame) def receive_frame if buffer = @socket.read(8) - raise EOFError if buffer.length == 0 size, type = buffer.unpack('l>l>') size -= 4 # we want the size of the data part and type already took up 4 bytes data = @socket.read(size) diff --git a/spec/lib/nsq/producer_synchronous_spec.rb b/spec/lib/nsq/producer_synchronous_spec.rb new file mode 100644 index 0000000..1a0106d --- /dev/null +++ b/spec/lib/nsq/producer_synchronous_spec.rb @@ -0,0 +1,26 @@ +require_relative '../../spec_helper' + +describe Nsq::Producer do + context 'with a synchronous producer' do + before do + @cluster = NsqCluster.new(nsqd_count: 1) + @nsqd = @cluster.nsqd.first + @producer = new_producer(@nsqd, synchronous: true) + end + + after do + @producer.terminate if @producer + @cluster.destroy + end + + describe '#write' do + it 'shouldn\'t raise an error when nsqd is down' do + @nsqd.stop + + expect{ + @producer.write('fail') + }.to raise_error(RuntimeError, "No data from socket") + end + end + end +end From 9df8d0bce6bdc05c2a331a09b8f20e6fc9811737 Mon Sep 17 00:00:00 2001 From: Soulou Date: Sun, 27 Jan 2019 21:42:30 +0100 Subject: [PATCH 05/15] Add default retry behavior when writing to socket, only impact synchronous producers, asynchronous never return exception --- lib/nsq/producer.rb | 32 ++++++++++++++++++----- spec/lib/nsq/producer_synchronous_spec.rb | 27 +++++++++++++++++-- 2 files changed, 51 insertions(+), 8 deletions(-) diff --git a/lib/nsq/producer.rb b/lib/nsq/producer.rb index dfaaa37..a3bcf42 100644 --- a/lib/nsq/producer.rb +++ b/lib/nsq/producer.rb @@ -12,6 +12,7 @@ def initialize(opts = {}) @ssl_context = opts[:ssl_context] @tls_options = opts[:tls_options] @tls_v1 = opts[:tls_v1] + @retry_attempts = opts[:retry_attempts] || 5 nsqlookupds = [] if opts[:nsqlookupd] @@ -57,13 +58,32 @@ def write_to_topic(topic, *raw_messages) # stringify the messages messages = raw_messages.map(&:to_s) - # get a suitable connection to write to - connection = connection_for_write + with_retries @retry_attempts do + # get a suitable connection to write to + connection = connection_for_write - if messages.length > 1 - connection.mpub(topic, messages) - else - connection.pub(topic, messages.first) + if messages.length > 1 + connection.mpub(topic, messages) + else + connection.pub(topic, messages.first) + end + end + end + + def with_retries(attempts) + wait = 1.0 + count = 0 + begin + yield + rescue => e + if count < attempts + error "exception when publishing message, retrying in #{wait} seconds" + sleep(wait) + wait = wait * 2 + count += 1 + retry + end + raise e end end diff --git a/spec/lib/nsq/producer_synchronous_spec.rb b/spec/lib/nsq/producer_synchronous_spec.rb index 1a0106d..c92cb3b 100644 --- a/spec/lib/nsq/producer_synchronous_spec.rb +++ b/spec/lib/nsq/producer_synchronous_spec.rb @@ -1,11 +1,11 @@ require_relative '../../spec_helper' describe Nsq::Producer do - context 'with a synchronous producer' do + context 'with a synchronous producer without retry' do before do @cluster = NsqCluster.new(nsqd_count: 1) @nsqd = @cluster.nsqd.first - @producer = new_producer(@nsqd, synchronous: true) + @producer = new_producer(@nsqd, synchronous: true, retry_attempts: 0) end after do @@ -23,4 +23,27 @@ end end end + + context 'with a synchronous producer with retries (default behavior)' do + before do + @cluster = NsqCluster.new(nsqd_count: 1) + @nsqd = @cluster.nsqd.first + @producer = new_producer(@nsqd, synchronous: true) + end + + after do + @producer.terminate if @producer + @cluster.destroy + end + + describe '#write' do + it 'shouldn\'t raise an error when nsqd is down' do + @nsqd.stop + + Thread.new { sleep 3 ; @nsqd.start } + + expect{ @producer.write('fail') }.not_to raise_error + end + end + end end From fe8f9e997f708ec1b05b500de45d2c2969c044f7 Mon Sep 17 00:00:00 2001 From: Soulou Date: Sun, 27 Jan 2019 23:59:39 +0100 Subject: [PATCH 06/15] synchronous producer: Add message acknowledgement timeout, default to 5 second --- lib/nsq/connection.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/nsq/connection.rb b/lib/nsq/connection.rb index 01cc9b2..9e014c7 100644 --- a/lib/nsq/connection.rb +++ b/lib/nsq/connection.rb @@ -29,6 +29,7 @@ def initialize(opts = {}) @host = opts[:host] || (raise ArgumentError, 'host is required') @port = opts[:port] || (raise ArgumentError, 'port is required') @synchronous = opts[:synchronous] || false + @ok_timeout = opts[:ok_timeout] || 5 @queue = opts[:queue] @topic = opts[:topic] @channel = opts[:channel] @@ -160,7 +161,9 @@ def write(raw) result: result, }) if result - value = result.pop + value = Timeout::timeout @ok_timeout do + result.pop + end raise value if value.is_a?(Exception) end end From 0fa9ccb16ec161941b6131e600d8581879be24dd Mon Sep 17 00:00:00 2001 From: Soulou Date: Mon, 28 Jan 2019 00:33:41 +0100 Subject: [PATCH 07/15] Better logging when there is an exception writing a message --- lib/nsq/producer.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/nsq/producer.rb b/lib/nsq/producer.rb index a3bcf42..ef4dba1 100644 --- a/lib/nsq/producer.rb +++ b/lib/nsq/producer.rb @@ -75,15 +75,15 @@ def with_retries(attempts) count = 0 begin yield - rescue => e + rescue => ex if count < attempts - error "exception when publishing message, retrying in #{wait} seconds" + error "exception when publishing message: #{ex}, retrying in #{wait} seconds…" sleep(wait) wait = wait * 2 count += 1 retry end - raise e + raise ex end end From 198ddd58267ded4ae1ac09d14304448eb9f5a253 Mon Sep 17 00:00:00 2001 From: Soulou Date: Mon, 28 Jan 2019 00:38:32 +0100 Subject: [PATCH 08/15] Improve implementation of producer, timeout/retry This commit has been inspired greatly by what is done in the official golang driver. To follow the recommandations of the NSQ team, the producer is not able to get initialized with nsqlookupd URL, a producer is connecting to only one NSQd instance, not several. - The producer is keeping a list of transactions to wait for the return value of NSQd. - A new class `NsqdsProducer` can be initialized with multiple NSQds addresses and will apply a strategy to write messages: - `Nsq::NsqdsProducer::STRATEGY_FAILOVER` Always send on the same nsqd except if not available, then get to the next one - `Nsq::NsqdsProducer::STRATEGY_ROUND_ROBIN` Apply round robin over the different available nsqds --- .ruby-version | 2 +- lib/nsq.rb | 1 + lib/nsq/connection.rb | 90 +++------------ lib/nsq/exceptions.rb | 18 ++- lib/nsq/nsqds_producer.rb | 75 ++++++++++++ lib/nsq/producer.rb | 133 +++++++++++++--------- lib/nsq/retry.rb | 42 +++++++ spec/lib/nsq/nsqds_producer_spec.rb | 89 +++++++++++++++ spec/lib/nsq/producer_spec.rb | 61 ---------- spec/lib/nsq/producer_synchronous_spec.rb | 4 +- spec/spec_helper.rb | 8 ++ 11 files changed, 328 insertions(+), 195 deletions(-) create mode 100644 lib/nsq/nsqds_producer.rb create mode 100644 lib/nsq/retry.rb create mode 100644 spec/lib/nsq/nsqds_producer_spec.rb diff --git a/.ruby-version b/.ruby-version index bb576db..cc6c9a4 100644 --- a/.ruby-version +++ b/.ruby-version @@ -1 +1 @@ -2.3 +2.3.5 diff --git a/lib/nsq.rb b/lib/nsq.rb index c94b2a9..50523dd 100644 --- a/lib/nsq.rb +++ b/lib/nsq.rb @@ -11,3 +11,4 @@ require_relative 'nsq/consumer' require_relative 'nsq/producer' +require_relative 'nsq/nsqds_producer' diff --git a/lib/nsq/connection.rb b/lib/nsq/connection.rb index 9e014c7..1651028 100644 --- a/lib/nsq/connection.rb +++ b/lib/nsq/connection.rb @@ -3,6 +3,7 @@ require 'openssl' require 'timeout' +require_relative 'retry' require_relative 'exceptions' require_relative 'frames/error' require_relative 'frames/message' @@ -28,8 +29,7 @@ class Connection def initialize(opts = {}) @host = opts[:host] || (raise ArgumentError, 'host is required') @port = opts[:port] || (raise ArgumentError, 'port is required') - @synchronous = opts[:synchronous] || false - @ok_timeout = opts[:ok_timeout] || 5 + @response_queue = opts[:response_queue] @queue = opts[:queue] @topic = opts[:topic] @channel = opts[:channel] @@ -61,7 +61,6 @@ def initialize(opts = {}) # for outgoing communication @write_queue = SelectableQueue.new(10000) - @transactions = [] # For indicating that the connection has died. # We use a Queue so we don't have to poll. Used to communicate across @@ -155,17 +154,7 @@ def nop def write(raw) - result = @synchronous && (raw =~ /^M?PUB/) ? SizedQueue.new(1) : nil - @write_queue.push({ - message: raw, - result: result, - }) - if result - value = Timeout::timeout @ok_timeout do - result.pop - end - raise value if value.is_a?(Exception) - end + @write_queue.push(message: raw) end def write_to_socket(raw) @@ -208,12 +197,21 @@ def handle_response(frame) debug 'Received heartbeat' nop elsif frame.data == RESPONSE_OK + @response_queue.push(frame) if @response_queue debug 'Received OK' else die "Received response we don't know how to handle: #{frame.data}" end end + def handle_error(frame) + if @response_queue + @response_queue.push(frame) + else + error "Error received: #{frame.data}" + end + end + def receive_frame if buffer = @socket.read(8) size, type = buffer.unpack('l>l>') @@ -266,20 +264,11 @@ def read_write_loop if ready.include?(@socket) frame = receive_frame - result = @transactions.pop if frame.is_a?(Response) - # If the producer is expecting a result - # signal everything went fine pushing any value - result.push(nil) if result handle_response(frame) elsif frame.is_a?(Error) - if result - result.push(ErrorFrameException.new(frame.data)) - else - error "Error received: #{frame.data}" - end + handle_error(frame) elsif frame.is_a?(Message) - result.push(nil) if result debug "<<< #{frame.body}" if @max_attempts && frame.attempts > @max_attempts fin(frame.id) @@ -287,14 +276,13 @@ def read_write_loop @queue.push(frame) if @queue end else - raise 'No data from socket' + die(UnexpectedFrameError.new(frame)) end end if ready.include?(@write_queue) data = @write_queue.pop return if data[:message] == :stop_loop - @transactions.push(data[:result]) write_to_socket(data[:message]) end rescue IO::WaitReadable @@ -338,13 +326,13 @@ def monitor_connection # over and over until we succeed! def reconnect(cause_of_death) close_connection - with_retries do + Nsq::with_retries do # If a synchronous producer received messages during the reconnection # period those messages must fail if they expect an acknowledgement # Between each reconnection attempt, we ensure the `connection.write` # are not blocked by returning the exception which lead to the initial # disconnection. - push_error_pending_writes cause_of_death if @synchronous + push_error_pending_writes cause_of_death if @response_queue open_connection end end @@ -352,7 +340,7 @@ def reconnect(cause_of_death) def push_error_pending_writes cause_of_death while !@write_queue.empty? data = @write_queue.pop - data[:result].push(cause_of_death) if data[:result] + @response_queue.push(cause_of_death) if @response_queue end end @@ -415,50 +403,6 @@ def openssl_context context end - - # Retry the supplied block with exponential backoff. - # - # Borrowed liberally from: - # https://github.com/ooyala/retries/blob/master/lib/retries.rb - def with_retries(&block) - base_sleep_seconds = 0.5 - max_sleep_seconds = 300 # 5 minutes - - # Let's do this thing - attempts = 0 - - begin - attempts += 1 - return block.call(attempts) - - rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH, - Errno::ENETDOWN, Errno::ENETUNREACH, Errno::ETIMEDOUT, Timeout::Error => ex - - raise ex if attempts >= 100 - - # The sleep time is an exponentially-increasing function of base_sleep_seconds. - # But, it never exceeds max_sleep_seconds. - sleep_seconds = [base_sleep_seconds * (2 ** (attempts - 1)), max_sleep_seconds].min - # Randomize to a random value in the range sleep_seconds/2 .. sleep_seconds - sleep_seconds = sleep_seconds * (0.5 * (1 + rand())) - # But never sleep less than base_sleep_seconds - sleep_seconds = [base_sleep_seconds, sleep_seconds].max - - warn "Failed to connect: #{ex}. Retrying in #{sleep_seconds.round(1)} seconds." - - snooze(sleep_seconds) - - retry - end - end - - - # Se we can stub for testing and reconnect in a tight loop - def snooze(t) - sleep(t) - end - - def server_needs_rdy_re_ups? # versions less than 0.3.0 need RDY re-ups # see: https://github.com/bitly/nsq/blob/master/ChangeLog.md#030---2014-11-18 diff --git a/lib/nsq/exceptions.rb b/lib/nsq/exceptions.rb index f659c14..2fd15ba 100644 --- a/lib/nsq/exceptions.rb +++ b/lib/nsq/exceptions.rb @@ -1,7 +1,19 @@ module Nsq # Raised when nsqlookupd discovery fails - class DiscoveryException < Exception; end + class DiscoveryException < StandardError; end - class ErrorFrameException < Exception; end -end + class ErrorFrameException < StandardError; end + + class UnexpectedFrameError < StandardError + def initialize(frame) + @frame = frame + end + def message + if @frame + return "unexpected frame value #{frame}" + end + return 'empty frame from socket' + end + end +end diff --git a/lib/nsq/nsqds_producer.rb b/lib/nsq/nsqds_producer.rb new file mode 100644 index 0000000..180e958 --- /dev/null +++ b/lib/nsq/nsqds_producer.rb @@ -0,0 +1,75 @@ +module Nsq + class NsqdsProducer + include Nsq::AttributeLogger + + STRATEGY_FAILOVER = :failover + STRATEGY_ROUNDROBIN = :round_robin + STRATEGIES = [STRATEGY_FAILOVER, STRATEGY_ROUNDROBIN].freeze + + attr_reader :topic + + def initialize(opts = {}) + @nsqds = opts.delete(:nsqds) + @strategy = opts.delete(:strategy) || STRATEGY_FAILOVER + @attempts = opts.delete(:strategy_attempts) + @topic = opts[:topic] + + if !STRATEGIES.include?(@strategy) + raise ArgumentError, "strategy should be one of #{STRATEGIES.join(", ")}" + end + + if @nsqds && !@nsqds.is_a?(Array) + raise ArgumentError, "nsqds should be an array of hosts 'host:port'" + elsif !@nsqds + @nsqds = ['127.0.0.1:4150'] + end + + @index = 0 + @producers = @nsqds.map do |nsqd| + Producer.new(opts.merge(nsqd: nsqd)) + end + end + + def terminate + @producers.each do |producer| + producer.terminate + end + end + + def write(*raw_messages) + each_provider(:write, raw_messages) + end + + def deferred_write(delay, *raw_messages) + each_provider(:deferred_write, delay, raw_messages) + end + + def deferred_write_to_topic(topic, delay, *raw_messages) + each_provider(:deferred_write_to_topic, topic, delay, raw_messages) + end + + def write_to_topic(topic, *raw_messages) + each_provider(:write_to_topic, topic, raw_messages) + end + + protected + + def each_provider(action, *args) + attempt = 0 + begin + @producers[@index].send(action, *args) + inc_index if @strategy == STRATEGY_ROUNDROBIN + rescue => ex + error producer: @producers[@index].nsqd, msg: "fail to #{action} message: #{ex.message}", exception: ex.class + inc_index + attempt += 1 + raise ex if attempt == @attempts + retry + end + end + + def inc_index + @index = (@index + 1 ) % @producers.length + end + end +end diff --git a/lib/nsq/producer.rb b/lib/nsq/producer.rb index ef4dba1..cd1fcda 100644 --- a/lib/nsq/producer.rb +++ b/lib/nsq/producer.rb @@ -1,34 +1,42 @@ +require_relative 'exceptions' +require_relative 'selectable_queue' +require_relative 'retry' require_relative 'client_base' module Nsq class Producer < ClientBase - attr_reader :topic + attr_reader :topic, :nsqd def initialize(opts = {}) @connections = {} + @nsqd = opts[:nsqd] @topic = opts[:topic] @synchronous = opts[:synchronous] || false @discovery_interval = opts[:discovery_interval] || 60 @ssl_context = opts[:ssl_context] @tls_options = opts[:tls_options] @tls_v1 = opts[:tls_v1] - @retry_attempts = opts[:retry_attempts] || 5 + @retry_attempts = opts[:retry_attempts] || 10 - nsqlookupds = [] - if opts[:nsqlookupd] - nsqlookupds = [opts[:nsqlookupd]].flatten - discover_repeatedly( - nsqlookupds: nsqlookupds, - interval: @discovery_interval - ) + @ok_timeout = opts[:ok_timeout] || 5 + @write_queue = SelectableQueue.new(10000) - elsif opts[:nsqd] - nsqds = [opts[:nsqd]].flatten - nsqds.each{|d| add_connection(d, {synchronous: @synchronous})} + @response_queue = SelectableQueue.new(10000) if @synchronous + if @nsqd + raise ArgumentError, "should be a string 'host:port'" if !@nsqd.is_a?(String) + @connection = add_connection(@nsqd, response_queue: @response_queue) else - add_connection('127.0.0.1:4150', {synchronous: @synchronous}) + @connection = add_connection('127.0.0.1:4150', response_queue: @response_queue) end + + @router_thread = Thread.new { start_router() } + end + + def terminate + stop_router + @router_thread.join + super end def write(*raw_messages) @@ -51,6 +59,15 @@ def deferred_write(delay, *raw_messages) deferred_write_to_topic(@topic, delay, *raw_messages) end + def deferred_write_to_topic(topic, delay, *raw_messages) + raise ArgumentError, 'message not provided' if raw_messages.empty? + messages = raw_messages.map(&:to_s) + messages.each do |msg| + msg = {op: :dpub, topic: topic, at: (delay * 1000).to_i, payload: msg} + queue(msg) + end + end + def write_to_topic(topic, *raw_messages) # return error if message(s) not provided raise ArgumentError, 'message not provided' if raw_messages.empty? @@ -58,58 +75,64 @@ def write_to_topic(topic, *raw_messages) # stringify the messages messages = raw_messages.map(&:to_s) - with_retries @retry_attempts do - # get a suitable connection to write to - connection = connection_for_write - - if messages.length > 1 - connection.mpub(topic, messages) - else - connection.pub(topic, messages.first) - end + if messages.length > 1 + msg = { op: :mpub, topic: topic, payload: messages } + else + msg = { op: :pub, topic: topic, payload: messages.first } end + + queue(msg) end - def with_retries(attempts) - wait = 1.0 - count = 0 - begin - yield - rescue => ex - if count < attempts - error "exception when publishing message: #{ex}, retrying in #{wait} seconds…" - sleep(wait) - wait = wait * 2 - count += 1 - retry + private + + def queue(msg) + Nsq::with_retries max_attempts: @retry_attempts do + msg[:result] = SizedQueue.new(1) if @synchronous + @write_queue.push(msg) + if msg[:result] + Timeout::timeout(@ok_timeout) do + value = msg[:result].pop + raise value if value.is_a?(Exception) + end end - raise ex end end - def deferred_write_to_topic(topic, delay, *raw_messages) - raise ArgumentError, 'message not provided' if raw_messages.empty? - messages = raw_messages.map(&:to_s) - connection = connection_for_write - messages.each do |msg| - connection.dpub(topic, (delay * 1000).to_i, msg) + def start_router + transactions = [] + queues = [@write_queue] + queues << @response_queue if @response_queue + loop do + ready, _, _ = IO::select(queues) + if ready.include?(@response_queue) + frame = @response_queue.pop + result = transactions.pop + next if result.nil? + if frame.is_a?(Exception) + result.push(frame) + elsif frame.is_a?(Response) + result.push(nil) + elsif frame.is_a?(Error) + result.push(ErrorFrameException.new(frame.data)) + else + result.push(InvalidFrameException.new(frame.data)) + end + else + data = @write_queue.pop + return if data[:op] == :stop_router + if data[:op] == :dpub + @connection.send(data[:op], data[:topic], data[:at], data[:payload]) + else + @connection.send(data[:op], data[:topic], data[:payload]) + end + transactions.push(data[:result]) + end end end - private - def connection_for_write - # Choose a random Connection that's currently connected - # Or, if there's nothing connected, just take any random one - connections_currently_connected = connections.select{|_,c| c.connected?} - connection = connections_currently_connected.values.sample || connections.values.sample - - # Raise an exception if there's no connection available - unless connection - raise 'No connections available' - end - - connection + def stop_router + @write_queue.push(op: :stop_router) end - end end diff --git a/lib/nsq/retry.rb b/lib/nsq/retry.rb new file mode 100644 index 0000000..7432dee --- /dev/null +++ b/lib/nsq/retry.rb @@ -0,0 +1,42 @@ +require_relative 'exceptions' + +require 'timeout' + +module Nsq + # Retry the supplied block with exponential backoff. + # + # Borrowed liberally from: + # https://github.com/ooyala/retries/blob/master/lib/retries.rb + def self.with_retries(opts = {}, &block) + base_sleep_seconds = 0.5 + max_sleep_seconds = 300 # 5 minutes + + # Let's do this thing + attempts = 0 + max_attempts = opts[:max_attempts] || 100 + + begin + attempts += 1 + return block.call(attempts) + + rescue UnexpectedFrameError, ErrorFrameException, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH, + Errno::ENETDOWN, Errno::ENETUNREACH, Errno::ETIMEDOUT, Timeout::Error => ex + + raise ex if attempts >= max_attempts + + # The sleep time is an exponentially-increasing function of base_sleep_seconds. + # But, it never exceeds max_sleep_seconds. + sleep_seconds = [base_sleep_seconds * (2 ** (attempts - 1)), max_sleep_seconds].min + # Randomize to a random value in the range sleep_seconds/2 .. sleep_seconds + sleep_seconds = sleep_seconds * (0.5 * (1 + rand())) + # But never sleep less than base_sleep_seconds + sleep_seconds = [base_sleep_seconds, sleep_seconds].max + + warn "Failed to connect: #{ex}. Retrying in #{sleep_seconds.round(1)} seconds." + + sleep(sleep_seconds) + + retry + end + end +end diff --git a/spec/lib/nsq/nsqds_producer_spec.rb b/spec/lib/nsq/nsqds_producer_spec.rb new file mode 100644 index 0000000..4c9475e --- /dev/null +++ b/spec/lib/nsq/nsqds_producer_spec.rb @@ -0,0 +1,89 @@ +require_relative '../../spec_helper' +require 'json' + +describe Nsq::Producer do + def message_count(nsqd, topic = @producer.topic) + parsed_body = JSON.parse(nsqd.stats.body) + topics_info = (parsed_body['data'] || parsed_body)['topics'] + topic_info = topics_info.select{|t| t['topic_name'] == topic }.first + if topic_info + topic_info['message_count'] + else + 0 + end + end + + context 'connecting directly to a single nsqd' do + + def new_consumer(topic = TOPIC) + Nsq::Consumer.new( + topic: topic, + channel: CHANNEL, + nsqd: "#{@nsqd.host}:#{@nsqd.tcp_port}", + max_in_flight: 1 + ) + end + + before do + @cluster = NsqCluster.new(nsqd_count: 2) + @producer = new_nsqds_producer(@cluster.nsqd, synchronous: true, retry_attempts: 1, ok_timeout: 1) + end + + after do + @producer.terminate if @producer + @cluster.destroy + end + + describe '::new' do + it 'should throw an exception if one of the nsqds is down' do + @cluster.nsqd.first.stop + + expect{ + new_nsqds_producer(@cluster.nsqd) + }.to raise_error(Errno::ECONNREFUSED) + end + + it 'should throw an exception if the strategy is unkown' do + expect{ + new_nsqds_producer(@cluster.nsqd, strategy: :none) + }.to raise_error(ArgumentError, "strategy should be one of failover, round_robin") + end + end + + context 'failover strategy' do + describe '#write' do + it 'should send a message to the first nsqd' do + @producer.write 'first' + wait_for{message_count(@cluster.nsqd.first)==1} + expect(message_count(@cluster.nsqd.first)).to eq(1) + end + + it 'should send a message to the second nsqd if the first is down' do + @cluster.nsqd[0].stop + @producer.write 'first' + wait_for{message_count(@cluster.nsqd[1])==1} + expect(message_count(@cluster.nsqd[1])).to eq(1) + end + + it 'should send a message to the first nsqd again if the second is down' do + @cluster.nsqd[0].stop + sleep(5) + @producer.write 'first' + wait_for{message_count(@cluster.nsqd[1])==1} + expect(message_count(@cluster.nsqd[1])).to eq(1) + @producer.write 'second' + wait_for{message_count(@cluster.nsqd[1])==2} + expect(message_count(@cluster.nsqd[1])).to eq(2) + + @cluster.nsqd[0].start + @cluster.nsqd[1].stop + + sleep(5) + @producer.write 'third' + wait_for{message_count(@cluster.nsqd[0])==1} + expect(message_count(@cluster.nsqd[0])).to eq(1) + end + end + end + end +end diff --git a/spec/lib/nsq/producer_spec.rb b/spec/lib/nsq/producer_spec.rb index 8924f4c..6ac1102 100644 --- a/spec/lib/nsq/producer_spec.rb +++ b/spec/lib/nsq/producer_spec.rb @@ -220,66 +220,5 @@ def new_consumer(topic = TOPIC) expect(message_count('hello')).to eq(10) end end - end - - context 'connecting via nsqlookupd' do - - before do - @cluster = NsqCluster.new(nsqd_count: 2, nsqlookupd_count: 1) - @producer = new_lookupd_producer - - # wait for it to connect to all nsqds - wait_for{ @producer.connections.length == @cluster.nsqd.length } - end - - after do - @producer.terminate if @producer - @cluster.destroy - end - - - describe '#connections' do - it 'should be connected to all nsqds' do - expect(@producer.connections.length).to eq(@cluster.nsqd.length) - end - - it 'should drop a connection when an nsqd goes offline' do - @cluster.nsqd.first.stop - wait_for{ @producer.connections.length == @cluster.nsqd.length - 1 } - expect(@producer.connections.length).to eq(@cluster.nsqd.length - 1) - end - end - - - describe '#connected?' do - it 'should return true if it\'s connected to at least one nsqd' do - expect(@producer.connected?).to eq(true) - end - - it 'should return false when it\'s not connected to any nsqds' do - @cluster.nsqd.each{|nsqd| nsqd.stop} - wait_for{ !@producer.connected? } - expect(@producer.connected?).to eq(false) - end - end - - - describe '#write' do - it 'writes to a random connection' do - expect_any_instance_of(Nsq::Connection).to receive(:pub) - @producer.write('howdy!') - end - - it 'raises an error if there are no connections to write to' do - @cluster.nsqd.each{|nsqd| nsqd.stop} - wait_for{ @producer.connections.length == 0 } - expect { - @producer.write('die') - }.to raise_error(RuntimeError, /No connections available/) - end - end - - end - end diff --git a/spec/lib/nsq/producer_synchronous_spec.rb b/spec/lib/nsq/producer_synchronous_spec.rb index c92cb3b..46b8a43 100644 --- a/spec/lib/nsq/producer_synchronous_spec.rb +++ b/spec/lib/nsq/producer_synchronous_spec.rb @@ -14,12 +14,12 @@ end describe '#write' do - it 'shouldn\'t raise an error when nsqd is down' do + it 'should raise an error when nsqd is down' do @nsqd.stop expect{ @producer.write('fail') - }.to raise_error(RuntimeError, "No data from socket") + }.to raise_error(Nsq::UnexpectedFrameError) end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 50a17ae..4c76b88 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -71,6 +71,14 @@ def new_producer(nsqd, opts = {}) }.merge(opts)) end +def new_nsqds_producer(nsqds, opts = {}) + Nsq::NsqdsProducer.new({ + topic: TOPIC, + nsqds: nsqds.map{ |n| "#{n.host}:#{n.tcp_port}" }, + discovery_interval: 1 + }.merge(opts)) +end + def new_lookupd_producer(opts = {}) lookupd = @cluster.nsqlookupd.map{|l| "#{l.host}:#{l.http_port}"} Nsq::Producer.new({ From 05b87abba7a1f7cee4da6b72d0a1cc30d676a9d8 Mon Sep 17 00:00:00 2001 From: Soulou Date: Tue, 29 Jan 2019 23:09:35 +0100 Subject: [PATCH 09/15] Add specs for round robin strategy --- spec/lib/nsq/nsqds_producer_spec.rb | 61 +++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 4 deletions(-) diff --git a/spec/lib/nsq/nsqds_producer_spec.rb b/spec/lib/nsq/nsqds_producer_spec.rb index 4c9475e..24be5d9 100644 --- a/spec/lib/nsq/nsqds_producer_spec.rb +++ b/spec/lib/nsq/nsqds_producer_spec.rb @@ -26,11 +26,9 @@ def new_consumer(topic = TOPIC) before do @cluster = NsqCluster.new(nsqd_count: 2) - @producer = new_nsqds_producer(@cluster.nsqd, synchronous: true, retry_attempts: 1, ok_timeout: 1) end after do - @producer.terminate if @producer @cluster.destroy end @@ -51,6 +49,13 @@ def new_consumer(topic = TOPIC) end context 'failover strategy' do + before do + @producer = new_nsqds_producer(@cluster.nsqd, synchronous: true, retry_attempts: 1, ok_timeout: 1) + end + after do + @producer.terminate if @producer + end + describe '#write' do it 'should send a message to the first nsqd' do @producer.write 'first' @@ -67,7 +72,7 @@ def new_consumer(topic = TOPIC) it 'should send a message to the first nsqd again if the second is down' do @cluster.nsqd[0].stop - sleep(5) + @producer.write 'first' wait_for{message_count(@cluster.nsqd[1])==1} expect(message_count(@cluster.nsqd[1])).to eq(1) @@ -78,12 +83,60 @@ def new_consumer(topic = TOPIC) @cluster.nsqd[0].start @cluster.nsqd[1].stop - sleep(5) @producer.write 'third' wait_for{message_count(@cluster.nsqd[0])==1} expect(message_count(@cluster.nsqd[0])).to eq(1) end end end + + context 'round robin strategy' do + before do + @producer = new_nsqds_producer(@cluster.nsqd, synchronous: true, retry_attempts: 1, ok_timeout: 1, strategy: Nsq::NsqdsProducer::STRATEGY_ROUNDROBIN) + end + after do + @producer.terminate if @producer + end + + describe '#write' do + it 'should distributes messages among nsqs' do + @producer.write 'first' + @producer.write 'second' + + wait_for{message_count(@cluster.nsqd.first)==1} + expect(message_count(@cluster.nsqd.first)).to eq(1) + + wait_for{message_count(@cluster.nsqd.last)==1} + expect(message_count(@cluster.nsqd.last)).to eq(1) + end + + it 'should send twice to the same if one node is down' do + @cluster.nsqd.first.stop + + @producer.write 'first' + @producer.write 'second' + + wait_for{message_count(@cluster.nsqd.last)==2} + expect(message_count(@cluster.nsqd.last)).to eq(2) + end + + it 'should start the round robin back once the node is back up' do + @cluster.nsqd.first.stop + @producer.write 'first' + @producer.write 'second' + + @cluster.nsqd.first.start + sleep 0.5 + + @producer.write 'three' + @producer.write 'four' + wait_for{message_count(@cluster.nsqd.last)==3} + expect(message_count(@cluster.nsqd.last)).to eq(3) + wait_for{message_count(@cluster.nsqd.first)==1} + expect(message_count(@cluster.nsqd.first)).to eq(1) + + end + end + end end end From b2cb426707f7b918d8960cd1a39f43b7b50c1f3d Mon Sep 17 00:00:00 2001 From: Soulou Date: Tue, 29 Jan 2019 23:26:54 +0100 Subject: [PATCH 10/15] Add SocketError to the rescued exceptions --- lib/nsq/retry.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/nsq/retry.rb b/lib/nsq/retry.rb index 7432dee..f3b16b8 100644 --- a/lib/nsq/retry.rb +++ b/lib/nsq/retry.rb @@ -20,7 +20,7 @@ def self.with_retries(opts = {}, &block) return block.call(attempts) rescue UnexpectedFrameError, ErrorFrameException, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH, - Errno::ENETDOWN, Errno::ENETUNREACH, Errno::ETIMEDOUT, Timeout::Error => ex + Errno::ENETDOWN, Errno::ENETUNREACH, Errno::ETIMEDOUT, Timeout::Error, SocketError => ex raise ex if attempts >= max_attempts From 3ac25e2c43f7bfd66081458323c95df1e5866214 Mon Sep 17 00:00:00 2001 From: Soulou Date: Tue, 29 Jan 2019 23:31:47 +0100 Subject: [PATCH 11/15] Reduce default retry/timeouts, we don't want to wait so much when a message fails --- lib/nsq/producer.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/nsq/producer.rb b/lib/nsq/producer.rb index cd1fcda..0619397 100644 --- a/lib/nsq/producer.rb +++ b/lib/nsq/producer.rb @@ -16,9 +16,9 @@ def initialize(opts = {}) @ssl_context = opts[:ssl_context] @tls_options = opts[:tls_options] @tls_v1 = opts[:tls_v1] - @retry_attempts = opts[:retry_attempts] || 10 + @retry_attempts = opts[:retry_attempts] || 3 - @ok_timeout = opts[:ok_timeout] || 5 + @ok_timeout = opts[:ok_timeout] || 3 @write_queue = SelectableQueue.new(10000) @response_queue = SelectableQueue.new(10000) if @synchronous From 51ecfd9ae25ebc8d4714a136d4fc7ed270735419 Mon Sep 17 00:00:00 2001 From: Soulou Date: Wed, 30 Jan 2019 15:54:20 +0100 Subject: [PATCH 12/15] Correctly splat messages in NsqdsProducer --- lib/nsq/nsqds_producer.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/nsq/nsqds_producer.rb b/lib/nsq/nsqds_producer.rb index 180e958..2d88ea9 100644 --- a/lib/nsq/nsqds_producer.rb +++ b/lib/nsq/nsqds_producer.rb @@ -37,19 +37,19 @@ def terminate end def write(*raw_messages) - each_provider(:write, raw_messages) + each_provider(:write, *raw_messages) end def deferred_write(delay, *raw_messages) - each_provider(:deferred_write, delay, raw_messages) + each_provider(:deferred_write, delay, *raw_messages) end def deferred_write_to_topic(topic, delay, *raw_messages) - each_provider(:deferred_write_to_topic, topic, delay, raw_messages) + each_provider(:deferred_write_to_topic, topic, delay, *raw_messages) end def write_to_topic(topic, *raw_messages) - each_provider(:write_to_topic, topic, raw_messages) + each_provider(:write_to_topic, topic, *raw_messages) end protected From 65c627d6209a842faff30e1ad2211a12d4320c2c Mon Sep 17 00:00:00 2001 From: Soulou Date: Thu, 31 Jan 2019 11:32:16 +0100 Subject: [PATCH 13/15] Fix when the read/write loop is started and ensure that a response is received by SUB before sending RDY commands --- lib/nsq/connection.rb | 14 ++++++++------ lib/nsq/exceptions.rb | 2 +- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/lib/nsq/connection.rb b/lib/nsq/connection.rb index 1651028..941559f 100644 --- a/lib/nsq/connection.rb +++ b/lib/nsq/connection.rb @@ -87,11 +87,6 @@ def close end - def sub(topic, channel) - write "SUB #{topic} #{channel}\n" - end - - def rdy(count) write "RDY #{count}\n" end @@ -143,6 +138,10 @@ def re_up_ready private + def sub(topic, channel) + write_to_socket "SUB #{topic} #{channel}\n" + end + def cls write "CLS\n" end @@ -352,15 +351,18 @@ def open_connection identify upgrade_to_ssl_socket if @tls_v1 - start_read_write_loop @connected = true # we need to re-subscribe if there's a topic specified if @topic debug "Subscribing to #{@topic}" sub(@topic, @channel) + frame = receive_frame + raise ErrorFrameException(frame.data) if frame.is_a?(Error) re_up_ready end + + start_read_write_loop end diff --git a/lib/nsq/exceptions.rb b/lib/nsq/exceptions.rb index 2fd15ba..1bd5166 100644 --- a/lib/nsq/exceptions.rb +++ b/lib/nsq/exceptions.rb @@ -11,7 +11,7 @@ def initialize(frame) def message if @frame - return "unexpected frame value #{frame}" + return "unexpected frame value #{frame.data}" end return 'empty frame from socket' end From 2f9a5369ba38e3a3c25d0f583edac2b216a18a54 Mon Sep 17 00:00:00 2001 From: Soulou Date: Thu, 31 Jan 2019 12:14:39 +0100 Subject: [PATCH 14/15] Fix flaky spec --- spec/lib/nsq/producer_synchronous_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/lib/nsq/producer_synchronous_spec.rb b/spec/lib/nsq/producer_synchronous_spec.rb index 46b8a43..e06667a 100644 --- a/spec/lib/nsq/producer_synchronous_spec.rb +++ b/spec/lib/nsq/producer_synchronous_spec.rb @@ -40,7 +40,7 @@ it 'shouldn\'t raise an error when nsqd is down' do @nsqd.stop - Thread.new { sleep 3 ; @nsqd.start } + Thread.new { sleep 1 ; @nsqd.start } expect{ @producer.write('fail') }.not_to raise_error end From 591f3bf53d00a4e27fccd0a08bcf9d5177c96668 Mon Sep 17 00:00:00 2001 From: Soulou Date: Thu, 31 Jan 2019 12:31:16 +0100 Subject: [PATCH 15/15] Update readme --- README.md | 69 ++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 43 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 50c32ea..1d36142 100644 --- a/README.md +++ b/README.md @@ -65,13 +65,28 @@ consumer.terminate The Nsq::Producer constructor takes the following options: -| Option | Description | Default | -|---------------|----------------------------------------|--------------------| -| `topic` | Topic to which to publish messages | | -| `nsqd` | Host and port of the nsqd instance | '127.0.0.1:4150' | -| `nsqlookupd` | Use lookupd to auto discover nsqds | | -| `tls_v1` | Flag for tls v1 connections | false | -| `tls_options` | Optional keys+certs for TLS connections| | +| Option | Description | Default | +|----------------|----------------------------------------|--------------------| +| `topic` | Topic to which to publish messages | | +| `nsqd` | Host and port of the nsqd instance | '127.0.0.1:4150' | +| `tls_v1` | Flag for tls v1 connections | false | +| `tls_options` | Optional keys+certs for TLS connections| | +| `synchronous` | Wait for acknowledgement on publish | false | + +Following options are only taken into account if the producer is configured as +synchronous: + +| Option | Description | Default | +|------------------|----------------------------------------|--------------------| +| `ok_timeout` | Time to wait acknowledgement (secs) | 3 | +| `retry_attempts` | Number of attempts to retry publishing | 3 | +| | before throwing an exception | | + + +**Note:** By default, producers are asynchronous, we don't wait for nsqd to +acknowledge our writes. As a result, if the connection to nsqd fails, you can +lose messages. This is acceptable for our use cases, mostly because we are +sending messages to a local nsqd instance and failure is very rare. For example, if you'd like to publish messages to a single nsqd. @@ -82,23 +97,15 @@ producer = Nsq::Producer.new( ) ``` -Alternatively, you can use nsqlookupd to find all nsqd nodes in the cluster. -When you instantiate Nsq::Producer in this way, it will automatically maintain -connections to all nsqd instances. When you publish a message, it will be sent -to a random nsqd instance. - -```Ruby -producer = Nsq::Producer.new( - nsqlookupd: ['1.2.3.4:4161', '6.7.8.9:4161'], - topic: 'topic-of-great-esteem' -) -``` +> A producer should is connecting to one single NSQd instance and can't find +> topic through nsqlookupd. This behavior is the one expected by the NSQ maintainers: +> [https://github.com/nsqio/nsq/issues/159](https://github.com/nsqio/nsq/issues/159) If you need to connect using SSL/TLS Authentication via `tls_options` ```Ruby producer = Nsq::Producer.new( - nsqlookupd: ['1.2.3.4:4161', '6.7.8.9:4161'], + nsqd: '6.7.8.9:4150', topic: 'topic-of-great-esteem', tls_v1: true, tls_options: { @@ -114,7 +121,7 @@ If you need to connect using simple `tls_v1` ```Ruby producer = Nsq::Producer.new( - nsqlookupd: ['1.2.3.4:4161', '6.7.8.9:4161'], + nsqd: '6.7.8.9:4150', topic: 'topic-of-great-esteem', tls_v1: true ) @@ -144,12 +151,6 @@ producing messages faster than we're able to send them to nsqd or nsqd is offline for an extended period and you accumulate 10,000 messages in the queue, calls to `#write` will block until there's room in the queue. -**Note:** We don't wait for nsqd to acknowledge our writes. As a result, if the -connection to nsqd fails, you can lose messages. This is acceptable for our use -cases, mostly because we are sending messages to a local nsqd instance and -failure is very rare. - - ### `#write_to_topic` Publishes one or more messages to nsqd. Like `#write`, but allows you to specify @@ -184,7 +185,23 @@ these messages to be lost. After you write your last message, consider sleeping for a second before you call `#terminate`. +### NsqdsProducer + +This producer aims at producing to multiple nsqd instances. Either to distribute +the messages or to behave as a failover, when an nsqd instance is down. + +Its attributes are based on the `Producer` class except that it doesn't have `:nsqd`, +and it has the additional parameters: + +**Note:** if the producer is not synchronous, the failover `:strategy` won't behave +correctly. As `write` would never fail. + +| Option | Description | Default | +|----------------|----------------------------------------|----------------------| +| `strategy` | Can be `:failover` or `:roundrobin` | `:failover` | +| `nsqds` | Array of host and port of the nsqds | `['127.0.0.1:4150']` | +The methods are simillar to the `Producer` class. ## Consumer