From 6205ce3678123717be68e55b140ddcd113de0f89 Mon Sep 17 00:00:00 2001 From: Paul Kuruvilla Date: Fri, 6 Dec 2019 12:28:43 +0530 Subject: [PATCH] Re-use TCP connections across API calls - Rename Analytics::Request -> Analytics::Transport - Rename Transport#post -> Transport#send - Add Tranport#shutdown to close persistent connections - Re-use Transport object in Analytics::Worker --- lib/segment/analytics.rb | 4 +-- .../analytics/{request.rb => transport.rb} | 16 ++++++---- lib/segment/analytics/worker.rb | 7 +++-- .../{request_spec.rb => transport_spec.rb} | 29 ++++++++++--------- spec/segment/analytics/worker_spec.rb | 20 ++++++------- 5 files changed, 42 insertions(+), 34 deletions(-) rename lib/segment/analytics/{request.rb => transport.rb} (91%) rename spec/segment/analytics/{request_spec.rb => transport_spec.rb} (90%) diff --git a/lib/segment/analytics.rb b/lib/segment/analytics.rb index d6f1df8..1e47c62 100644 --- a/lib/segment/analytics.rb +++ b/lib/segment/analytics.rb @@ -4,7 +4,7 @@ require 'segment/analytics/field_parser' require 'segment/analytics/client' require 'segment/analytics/worker' -require 'segment/analytics/request' +require 'segment/analytics/transport' require 'segment/analytics/response' require 'segment/analytics/logging' @@ -18,7 +18,7 @@ class Analytics # @option options [Boolean] :stub (false) If true, requests don't hit the # server and are stubbed to be successful. def initialize(options = {}) - Request.stub = options[:stub] if options.has_key?(:stub) + Transport.stub = options[:stub] if options.has_key?(:stub) @client = Segment::Analytics::Client.new options end diff --git a/lib/segment/analytics/request.rb b/lib/segment/analytics/transport.rb similarity index 91% rename from lib/segment/analytics/request.rb rename to lib/segment/analytics/transport.rb index 344ab75..59697be 100644 --- a/lib/segment/analytics/request.rb +++ b/lib/segment/analytics/transport.rb @@ -9,13 +9,11 @@ module Segment class Analytics - class Request + class Transport include Segment::Analytics::Defaults::Request include Segment::Analytics::Utils include Segment::Analytics::Logging - # public: Creates a new request object to send analytics batch - # def initialize(options = {}) options[:host] ||= HOST options[:port] ||= PORT @@ -34,10 +32,10 @@ def initialize(options = {}) @http = http end - # public: Posts the write key and batch of messages to the API. + # Sends a batch of messages to the API # - # returns - Response of the status and error if it exists - def post(write_key, batch) + # @return [Response] API response + def send(write_key, batch) logger.debug("Sending request for #{batch.length} items") last_response, exception = retry_with_backoff(@retries) do @@ -59,6 +57,11 @@ def post(write_key, batch) end end + # Closes a persistent connection if it exists + def shutdown + @http.finish if @http.started? + end + private def should_retry_request?(status_code, body) @@ -117,6 +120,7 @@ def send_request(write_key, batch) [200, '{}'] else + @http.start unless @http.started? # Maintain a persistent connection response = @http.request(request, payload) [response.code.to_i, response.body] end diff --git a/lib/segment/analytics/worker.rb b/lib/segment/analytics/worker.rb index ced52e9..a313eed 100644 --- a/lib/segment/analytics/worker.rb +++ b/lib/segment/analytics/worker.rb @@ -1,6 +1,6 @@ require 'segment/analytics/defaults' require 'segment/analytics/message_batch' -require 'segment/analytics/request' +require 'segment/analytics/transport' require 'segment/analytics/utils' module Segment @@ -29,6 +29,7 @@ def initialize(queue, write_key, options = {}) batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE @batch = MessageBatch.new(batch_size) @lock = Mutex.new + @transport = Transport.new end # public: Continuously runs the loop to check for new events @@ -41,11 +42,13 @@ def run consume_message_from_queue! until @batch.full? || @queue.empty? end - res = Request.new.post @write_key, @batch + res = @transport.send @write_key, @batch @on_error.call(res.status, res.error) unless res.status == 200 @lock.synchronize { @batch.clear } end + ensure + @transport.shutdown end # public: Check whether we have outstanding requests. diff --git a/spec/segment/analytics/request_spec.rb b/spec/segment/analytics/transport_spec.rb similarity index 90% rename from spec/segment/analytics/request_spec.rb rename to spec/segment/analytics/transport_spec.rb index b4d0f63..56b551f 100644 --- a/spec/segment/analytics/request_spec.rb +++ b/spec/segment/analytics/transport_spec.rb @@ -2,7 +2,7 @@ module Segment class Analytics - describe Request do + describe Transport do before do # Try and keep debug statements out of tests allow(subject.logger).to receive(:error) @@ -98,7 +98,7 @@ class Analytics end end - describe '#post' do + describe '#send' do let(:response) { Net::HTTPResponse.new(http_version, status_code, response_body) } @@ -110,6 +110,7 @@ class Analytics before do http = subject.instance_variable_get(:@http) + allow(http).to receive(:start) allow(http).to receive(:request) { response } allow(response).to receive(:body) { response_body } end @@ -125,14 +126,14 @@ class Analytics path, default_headers ).and_call_original - subject.post(write_key, batch) + subject.send(write_key, batch) end it 'adds basic auth to the Net::HTTP::Post' do expect_any_instance_of(Net::HTTP::Post).to receive(:basic_auth) .with(write_key, nil) - subject.post(write_key, batch) + subject.send(write_key, batch) end context 'with a stub' do @@ -141,16 +142,16 @@ class Analytics end it 'returns a 200 response' do - expect(subject.post(write_key, batch).status).to eq(200) + expect(subject.send(write_key, batch).status).to eq(200) end it 'has a nil error' do - expect(subject.post(write_key, batch).error).to be_nil + expect(subject.send(write_key, batch).error).to be_nil end it 'logs a debug statement' do expect(subject.logger).to receive(:debug).with(/stubbed request to/) - subject.post(write_key, batch) + subject.send(write_key, batch) end end @@ -171,7 +172,7 @@ class Analytics .exactly(retries - 1).times .with(1) .and_return(nil) - subject.post(write_key, batch) + subject.send(write_key, batch) end end @@ -186,18 +187,18 @@ class Analytics expect(subject) .to receive(:sleep) .never - subject.post(write_key, batch) + subject.send(write_key, batch) end end context 'request is successful' do let(:status_code) { 201 } it 'returns a response code' do - expect(subject.post(write_key, batch).status).to eq(status_code) + expect(subject.send(write_key, batch).status).to eq(status_code) end it 'returns a nil error' do - expect(subject.post(write_key, batch).error).to be_nil + expect(subject.send(write_key, batch).error).to be_nil end end @@ -206,7 +207,7 @@ class Analytics let(:response_body) { { error: error }.to_json } it 'returns the parsed error' do - expect(subject.post(write_key, batch).error).to eq(error) + expect(subject.send(write_key, batch).error).to eq(error) end end @@ -227,11 +228,11 @@ class Analytics subject { described_class.new(retries: 0) } it 'returns a -1 for status' do - expect(subject.post(write_key, batch).status).to eq(-1) + expect(subject.send(write_key, batch).status).to eq(-1) end it 'has a connection error' do - error = subject.post(write_key, batch).error + error = subject.send(write_key, batch).error expect(error).to match(/Malformed JSON/) end diff --git a/spec/segment/analytics/worker_spec.rb b/spec/segment/analytics/worker_spec.rb index ca6dcd0..0ca4d49 100644 --- a/spec/segment/analytics/worker_spec.rb +++ b/spec/segment/analytics/worker_spec.rb @@ -4,7 +4,7 @@ module Segment class Analytics describe Worker do before do - Segment::Analytics::Request.stub = true + Segment::Analytics::Transport.stub = true end describe '#init' do @@ -29,9 +29,9 @@ class Analytics it 'does not error if the request fails' do expect do - Segment::Analytics::Request + Segment::Analytics::Transport .any_instance - .stub(:post) + .stub(:send) .and_return(Segment::Analytics::Response.new(-1, 'Unknown error')) queue = Queue.new @@ -41,14 +41,14 @@ class Analytics expect(queue).to be_empty - Segment::Analytics::Request.any_instance.unstub(:post) + Segment::Analytics::Transport.any_instance.unstub(:send) end.to_not raise_error end it 'executes the error handler if the request is invalid' do - Segment::Analytics::Request + Segment::Analytics::Transport .any_instance - .stub(:post) + .stub(:send) .and_return(Segment::Analytics::Response.new(400, 'Some error')) status = error = nil @@ -67,7 +67,7 @@ class Analytics sleep 0.1 # First give thread time to spin-up. sleep 0.01 while worker.is_requesting? - Segment::Analytics::Request.any_instance.unstub(:post) + Segment::Analytics::Transport.any_instance.unstub(:send) expect(queue).to be_empty expect(status).to eq(400) @@ -124,9 +124,9 @@ def bad_obj.to_json(*_args) end it 'returns true if there is a current batch' do - Segment::Analytics::Request + Segment::Analytics::Transport .any_instance - .stub(:post) { + .stub(:send) { sleep(0.2) Segment::Analytics::Response.new(200, 'Success') } @@ -141,7 +141,7 @@ def bad_obj.to_json(*_args) worker_thread.join expect(worker.is_requesting?).to eq(false) - Segment::Analytics::Request.any_instance.unstub(:post) + Segment::Analytics::Transport.any_instance.unstub(:send) end end end