Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/segment/analytics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
require 'segment/analytics/field_parser'
require 'segment/analytics/client'
require 'segment/analytics/worker'
require 'segment/analytics/request'
require 'segment/analytics/transport'
require 'segment/analytics/response'
require 'segment/analytics/logging'

Expand All @@ -18,7 +18,7 @@ class Analytics
# @option options [Boolean] :stub (false) If true, requests don't hit the
# server and are stubbed to be successful.
def initialize(options = {})
Request.stub = options[:stub] if options.has_key?(:stub)
Transport.stub = options[:stub] if options.has_key?(:stub)
@client = Segment::Analytics::Client.new options
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@

module Segment
class Analytics
class Request
class Transport
include Segment::Analytics::Defaults::Request
include Segment::Analytics::Utils
include Segment::Analytics::Logging

# public: Creates a new request object to send analytics batch
#
def initialize(options = {})
options[:host] ||= HOST
options[:port] ||= PORT
Expand All @@ -34,10 +32,10 @@ def initialize(options = {})
@http = http
end

# public: Posts the write key and batch of messages to the API.
# Sends a batch of messages to the API
#
# returns - Response of the status and error if it exists
def post(write_key, batch)
# @return [Response] API response
def send(write_key, batch)
logger.debug("Sending request for #{batch.length} items")

last_response, exception = retry_with_backoff(@retries) do
Expand All @@ -59,6 +57,11 @@ def post(write_key, batch)
end
end

# Closes a persistent connection if it exists
def shutdown
@http.finish if @http.started?
end

private

def should_retry_request?(status_code, body)
Expand Down Expand Up @@ -117,6 +120,7 @@ def send_request(write_key, batch)

[200, '{}']
else
@http.start unless @http.started? # Maintain a persistent connection
response = @http.request(request, payload)
[response.code.to_i, response.body]
end
Expand Down
7 changes: 5 additions & 2 deletions lib/segment/analytics/worker.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require 'segment/analytics/defaults'
require 'segment/analytics/message_batch'
require 'segment/analytics/request'
require 'segment/analytics/transport'
require 'segment/analytics/utils'

module Segment
Expand Down Expand Up @@ -29,6 +29,7 @@ def initialize(queue, write_key, options = {})
batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE
@batch = MessageBatch.new(batch_size)
@lock = Mutex.new
@transport = Transport.new
end

# public: Continuously runs the loop to check for new events
Expand All @@ -41,11 +42,13 @@ def run
consume_message_from_queue! until @batch.full? || @queue.empty?
end

res = Request.new.post @write_key, @batch
res = @transport.send @write_key, @batch
@on_error.call(res.status, res.error) unless res.status == 200

@lock.synchronize { @batch.clear }
end
ensure
@transport.shutdown
end

# public: Check whether we have outstanding requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Segment
class Analytics
describe Request do
describe Transport do
before do
# Try and keep debug statements out of tests
allow(subject.logger).to receive(:error)
Expand Down Expand Up @@ -98,7 +98,7 @@ class Analytics
end
end

describe '#post' do
describe '#send' do
let(:response) {
Net::HTTPResponse.new(http_version, status_code, response_body)
}
Expand All @@ -110,6 +110,7 @@ class Analytics

before do
http = subject.instance_variable_get(:@http)
allow(http).to receive(:start)
allow(http).to receive(:request) { response }
allow(response).to receive(:body) { response_body }
end
Expand All @@ -125,14 +126,14 @@ class Analytics
path, default_headers
).and_call_original

subject.post(write_key, batch)
subject.send(write_key, batch)
end

it 'adds basic auth to the Net::HTTP::Post' do
expect_any_instance_of(Net::HTTP::Post).to receive(:basic_auth)
.with(write_key, nil)

subject.post(write_key, batch)
subject.send(write_key, batch)
end

context 'with a stub' do
Expand All @@ -141,16 +142,16 @@ class Analytics
end

it 'returns a 200 response' do
expect(subject.post(write_key, batch).status).to eq(200)
expect(subject.send(write_key, batch).status).to eq(200)
end

it 'has a nil error' do
expect(subject.post(write_key, batch).error).to be_nil
expect(subject.send(write_key, batch).error).to be_nil
end

it 'logs a debug statement' do
expect(subject.logger).to receive(:debug).with(/stubbed request to/)
subject.post(write_key, batch)
subject.send(write_key, batch)
end
end

Expand All @@ -171,7 +172,7 @@ class Analytics
.exactly(retries - 1).times
.with(1)
.and_return(nil)
subject.post(write_key, batch)
subject.send(write_key, batch)
end
end

Expand All @@ -186,18 +187,18 @@ class Analytics
expect(subject)
.to receive(:sleep)
.never
subject.post(write_key, batch)
subject.send(write_key, batch)
end
end

context 'request is successful' do
let(:status_code) { 201 }
it 'returns a response code' do
expect(subject.post(write_key, batch).status).to eq(status_code)
expect(subject.send(write_key, batch).status).to eq(status_code)
end

it 'returns a nil error' do
expect(subject.post(write_key, batch).error).to be_nil
expect(subject.send(write_key, batch).error).to be_nil
end
end

Expand All @@ -206,7 +207,7 @@ class Analytics
let(:response_body) { { error: error }.to_json }

it 'returns the parsed error' do
expect(subject.post(write_key, batch).error).to eq(error)
expect(subject.send(write_key, batch).error).to eq(error)
end
end

Expand All @@ -227,11 +228,11 @@ class Analytics
subject { described_class.new(retries: 0) }

it 'returns a -1 for status' do
expect(subject.post(write_key, batch).status).to eq(-1)
expect(subject.send(write_key, batch).status).to eq(-1)
end

it 'has a connection error' do
error = subject.post(write_key, batch).error
error = subject.send(write_key, batch).error
expect(error).to match(/Malformed JSON/)
end

Expand Down
20 changes: 10 additions & 10 deletions spec/segment/analytics/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Segment
class Analytics
describe Worker do
before do
Segment::Analytics::Request.stub = true
Segment::Analytics::Transport.stub = true
end

describe '#init' do
Expand All @@ -29,9 +29,9 @@ class Analytics

it 'does not error if the request fails' do
expect do
Segment::Analytics::Request
Segment::Analytics::Transport
.any_instance
.stub(:post)
.stub(:send)
.and_return(Segment::Analytics::Response.new(-1, 'Unknown error'))

queue = Queue.new
Expand All @@ -41,14 +41,14 @@ class Analytics

expect(queue).to be_empty

Segment::Analytics::Request.any_instance.unstub(:post)
Segment::Analytics::Transport.any_instance.unstub(:send)
end.to_not raise_error
end

it 'executes the error handler if the request is invalid' do
Segment::Analytics::Request
Segment::Analytics::Transport
.any_instance
.stub(:post)
.stub(:send)
.and_return(Segment::Analytics::Response.new(400, 'Some error'))

status = error = nil
Expand All @@ -67,7 +67,7 @@ class Analytics
sleep 0.1 # First give thread time to spin-up.
sleep 0.01 while worker.is_requesting?

Segment::Analytics::Request.any_instance.unstub(:post)
Segment::Analytics::Transport.any_instance.unstub(:send)

expect(queue).to be_empty
expect(status).to eq(400)
Expand Down Expand Up @@ -124,9 +124,9 @@ def bad_obj.to_json(*_args)
end

it 'returns true if there is a current batch' do
Segment::Analytics::Request
Segment::Analytics::Transport
.any_instance
.stub(:post) {
.stub(:send) {
sleep(0.2)
Segment::Analytics::Response.new(200, 'Success')
}
Expand All @@ -141,7 +141,7 @@ def bad_obj.to_json(*_args)
worker_thread.join
expect(worker.is_requesting?).to eq(false)

Segment::Analytics::Request.any_instance.unstub(:post)
Segment::Analytics::Transport.any_instance.unstub(:send)
end
end
end
Expand Down