Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions lib/segment/analytics/backoff_policy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
require 'segment/analytics/defaults'

module Segment
class Analytics
class BackoffPolicy
include Segment::Analytics::Defaults::BackoffPolicy

# @param [Hash] opts
# @option opts [Numeric] :min_timeout_ms The minimum backoff timeout
# @option opts [Numeric] :max_timeout_ms The maximum backoff timeout
# @option opts [Numeric] :multiplier The value to multiply the current
# interval with for each retry attempt
# @option opts [Numeric] :randomization_factor The randomization factor
# to use to create a range around the retry interval
def initialize(opts = {})
@min_timeout_ms = opts[:min_timeout_ms] || MIN_TIMEOUT_MS
@max_timeout_ms = opts[:max_timeout_ms] || MAX_TIMEOUT_MS
@multiplier = opts[:multiplier] || MULTIPLIER
@randomization_factor = opts[:randomization_factor] || RANDOMIZATION_FACTOR

@attempts = 0
end

# @return [Numeric] the next backoff interval, in milliseconds.
def next_interval
interval = @min_timeout_ms * (@multiplier**@attempts)
interval = add_jitter(interval, @randomization_factor)

@attempts += 1

[interval, @max_timeout_ms].min
end

private

def add_jitter(base, randomization_factor)
random_number = rand
max_deviation = base * randomization_factor
deviation = random_number * max_deviation

if random_number < 0.5
base - deviation
else
base + deviation
end
end
end
end
end
13 changes: 10 additions & 3 deletions lib/segment/analytics/defaults.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,22 @@ module Request
PORT = 443
PATH = '/v1/import'
SSL = true
HEADERS = { :accept => 'application/json' }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this previously unused?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It was being assigned to a variable, but the actual headers used were hardcoded.

(The diff for this individual commit includes all the relevant files).

RETRIES = 4
BACKOFF = 30.0
HEADERS = { 'Accept' => 'application/json',
'Content-Type' => 'application/json' }
RETRIES = 10
end

module Queue
BATCH_SIZE = 100
MAX_SIZE = 10000
end

module BackoffPolicy
MIN_TIMEOUT_MS = 100
MAX_TIMEOUT_MS = 10000
MULTIPLIER = 1.5
RANDOMIZATION_FACTOR = 0.5
end
end
end
end
23 changes: 11 additions & 12 deletions lib/segment/analytics/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require 'segment/analytics/utils'
require 'segment/analytics/response'
require 'segment/analytics/logging'
require 'segment/analytics/backoff_policy'
require 'net/http'
require 'net/https'
require 'json'
Expand All @@ -19,10 +20,11 @@ def initialize(options = {})
options[:host] ||= HOST
options[:port] ||= PORT
options[:ssl] ||= SSL
options[:headers] ||= HEADERS
@headers = options[:headers] || HEADERS
@path = options[:path] || PATH
@retries = options[:retries] || RETRIES
@backoff = options[:backoff] || BACKOFF
@backoff_policy =
options[:backoff_policy] || Segment::Analytics::BackoffPolicy.new

http = Net::HTTP.new(options[:host], options[:port])
http.use_ssl = options[:ssl]
Expand All @@ -36,7 +38,7 @@ def initialize(options = {})
#
# returns - Response of the status and error if it exists
def post(write_key, batch)
last_response, exception = retry_with_backoff(@retries, @backoff) do
last_response, exception = retry_with_backoff(@retries) do
status_code, body = send_request(write_key, batch)
error = JSON.parse(body)['error']
should_retry = should_retry_request?(status_code, body)
Expand Down Expand Up @@ -71,10 +73,11 @@ def should_retry_request?(status_code, body)
# Takes a block that returns [result, should_retry].
#
# Retries upto `retries_remaining` times, if `should_retry` is false or
# an exception is raised.
# an exception is raised. `@backoff_policy` is used to determine the
# duration to sleep between attempts
#
# Returns [last_result, raised_exception]
def retry_with_backoff(retries_remaining, backoff, &block)
def retry_with_backoff(retries_remaining, &block)
result, caught_exception = nil
should_retry = false

Expand All @@ -87,24 +90,20 @@ def retry_with_backoff(retries_remaining, backoff, &block)
end

if should_retry && (retries_remaining > 1)
sleep(backoff)
retry_with_backoff(retries_remaining - 1, backoff, &block)
sleep(@backoff_policy.next_interval.to_f / 1000)
retry_with_backoff(retries_remaining - 1, &block)
else
[result, caught_exception]
end
end

# Sends a request for the batch, returns [status_code, body]
def send_request(write_key, batch)
headers = {
'Content-Type' => 'application/json',
'accept' => 'application/json'
}
payload = JSON.generate(
:sentAt => datetime_in_iso8601(Time.now),
:batch => batch
)
request = Net::HTTP::Post.new(@path, headers)
request = Net::HTTP::Post.new(@path, @headers)
request.basic_auth(write_key, nil)

if self.class.stub
Expand Down
92 changes: 92 additions & 0 deletions spec/segment/analytics/backoff_policy_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
require 'spec_helper'

module Segment
class Analytics
describe BackoffPolicy do
describe '#initialize' do
context 'no options are given' do
it 'sets default min_timeout_ms' do
actual = subject.instance_variable_get(:@min_timeout_ms)
expect(actual).to eq(described_class::MIN_TIMEOUT_MS)
end

it 'sets default max_timeout_ms' do
actual = subject.instance_variable_get(:@max_timeout_ms)
expect(actual).to eq(described_class::MAX_TIMEOUT_MS)
end

it 'sets default multiplier' do
actual = subject.instance_variable_get(:@multiplier)
expect(actual).to eq(described_class::MULTIPLIER)
end

it 'sets default randomization factor' do
actual = subject.instance_variable_get(:@randomization_factor)
expect(actual).to eq(described_class::RANDOMIZATION_FACTOR)
end
end

context 'options are given' do
let(:min_timeout_ms) { 1234 }
let(:max_timeout_ms) { 5678 }
let(:multiplier) { 24 }
let(:randomization_factor) { 0.4 }

let(:options) do
{
min_timeout_ms: min_timeout_ms,
max_timeout_ms: max_timeout_ms,
multiplier: multiplier,
randomization_factor: randomization_factor
}
end

subject { described_class.new(options) }

it 'sets passed in min_timeout_ms' do
actual = subject.instance_variable_get(:@min_timeout_ms)
expect(actual).to eq(min_timeout_ms)
end

it 'sets passed in max_timeout_ms' do
actual = subject.instance_variable_get(:@max_timeout_ms)
expect(actual).to eq(max_timeout_ms)
end

it 'sets passed in multiplier' do
actual = subject.instance_variable_get(:@multiplier)
expect(actual).to eq(multiplier)
end

it 'sets passed in randomization_factor' do
actual = subject.instance_variable_get(:@randomization_factor)
expect(actual).to eq(randomization_factor)
end
end
end

describe '#next_interval' do
subject {
described_class.new(
min_timeout_ms: 1000,
max_timeout_ms: 10000,
multiplier: 2,
randomization_factor: 0.5
)
}

it 'returns exponentially increasing durations' do
expect(subject.next_interval).to be_within(500).of(1000)
expect(subject.next_interval).to be_within(1000).of(2000)
expect(subject.next_interval).to be_within(2000).of(4000)
expect(subject.next_interval).to be_within(4000).of(8000)
end

it 'caps maximum duration at max_timeout_secs' do
10.times { subject.next_interval }
expect(subject.next_interval).to eq(10000)
end
end
end
end
end
28 changes: 16 additions & 12 deletions spec/segment/analytics/request_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ class Analytics
expect(retries).to eq(described_class::RETRIES)
end

it 'sets a default backoff' do
backoff = subject.instance_variable_get(:@backoff)
expect(backoff).to eq(described_class::BACKOFF)
it 'sets a default backoff policy' do
backoff_policy = subject.instance_variable_get(:@backoff_policy)
expect(backoff_policy).to be_a(Segment::Analytics::BackoffPolicy)
end

it 'initializes a new Net::HTTP with default host and port' do
Expand All @@ -63,14 +63,14 @@ class Analytics
context 'options are given' do
let(:path) { 'my/cool/path' }
let(:retries) { 1234 }
let(:backoff) { 10 }
let(:backoff_policy) { FakeBackoffPolicy.new([1, 2, 3]) }
let(:host) { 'http://www.example.com' }
let(:port) { 8080 }
let(:options) do
{
path: path,
retries: retries,
backoff: backoff,
backoff_policy: backoff_policy,
host: host,
port: port
}
Expand All @@ -86,8 +86,9 @@ class Analytics
expect(subject.instance_variable_get(:@retries)).to eq(retries)
end

it 'sets passed in backoff' do
expect(subject.instance_variable_get(:@backoff)).to eq(backoff)
it 'sets passed in backoff backoff policy' do
expect(subject.instance_variable_get(:@backoff_policy))
.to eq(backoff_policy)
end

it 'initializes a new Net::HTTP with passed in host and port' do
Expand Down Expand Up @@ -117,7 +118,7 @@ class Analytics
path = subject.instance_variable_get(:@path)
default_headers = {
'Content-Type' => 'application/json',
'accept' => 'application/json'
'Accept' => 'application/json'
}
expect(Net::HTTP::Post).to receive(:new).with(
path, default_headers
Expand Down Expand Up @@ -157,14 +158,17 @@ class Analytics
let(:status_code) { status_code }
let(:body) { body }
let(:retries) { 4 }
let(:backoff) { 1 }
subject { described_class.new(retries: retries, backoff: backoff) }
let(:backoff_policy) { FakeBackoffPolicy.new([1000, 1000, 1000]) }
subject {
described_class.new(retries: retries,
backoff_policy: backoff_policy)
}

it 'retries the request' do
expect(subject)
.to receive(:sleep)
.exactly(retries - 1).times
.with(backoff)
.with(1)
.and_return(nil)
subject.post(write_key, batch)
end
Expand Down Expand Up @@ -219,7 +223,7 @@ class Analytics
context 'request or parsing of response results in an exception' do
let(:response_body) { 'Malformed JSON ---' }

subject { described_class.new(backoff: 0) }
subject { described_class.new(retries: 0) }

it 'returns a -1 for status' do
expect(subject.post(write_key, batch).status).to eq(-1)
Expand Down
12 changes: 12 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ def run
end
end

# A backoff policy that returns a fixed list of values
class FakeBackoffPolicy
def initialize(interval_values)
@interval_values = interval_values
end

def next_interval
raise 'FakeBackoffPolicy has no values left' if @interval_values.empty?
@interval_values.shift
end
end

# usage:
# it "should return a result of 5" do
# eventually(options: {timeout: 1}) { long_running_thing.result.should eq(5) }
Expand Down