diff --git a/lib/segment/analytics/backoff_policy.rb b/lib/segment/analytics/backoff_policy.rb new file mode 100644 index 0000000..394a2c1 --- /dev/null +++ b/lib/segment/analytics/backoff_policy.rb @@ -0,0 +1,49 @@ +require 'segment/analytics/defaults' + +module Segment + class Analytics + class BackoffPolicy + include Segment::Analytics::Defaults::BackoffPolicy + + # @param [Hash] opts + # @option opts [Numeric] :min_timeout_ms The minimum backoff timeout + # @option opts [Numeric] :max_timeout_ms The maximum backoff timeout + # @option opts [Numeric] :multiplier The value to multiply the current + # interval with for each retry attempt + # @option opts [Numeric] :randomization_factor The randomization factor + # to use to create a range around the retry interval + def initialize(opts = {}) + @min_timeout_ms = opts[:min_timeout_ms] || MIN_TIMEOUT_MS + @max_timeout_ms = opts[:max_timeout_ms] || MAX_TIMEOUT_MS + @multiplier = opts[:multiplier] || MULTIPLIER + @randomization_factor = opts[:randomization_factor] || RANDOMIZATION_FACTOR + + @attempts = 0 + end + + # @return [Numeric] the next backoff interval, in milliseconds. + def next_interval + interval = @min_timeout_ms * (@multiplier**@attempts) + interval = add_jitter(interval, @randomization_factor) + + @attempts += 1 + + [interval, @max_timeout_ms].min + end + + private + + def add_jitter(base, randomization_factor) + random_number = rand + max_deviation = base * randomization_factor + deviation = random_number * max_deviation + + if random_number < 0.5 + base - deviation + else + base + deviation + end + end + end + end +end diff --git a/lib/segment/analytics/defaults.rb b/lib/segment/analytics/defaults.rb index aeb933e..d641420 100644 --- a/lib/segment/analytics/defaults.rb +++ b/lib/segment/analytics/defaults.rb @@ -6,15 +6,22 @@ module Request PORT = 443 PATH = '/v1/import' SSL = true - HEADERS = { :accept => 'application/json' } - RETRIES = 4 - BACKOFF = 30.0 + HEADERS = { 'Accept' => 'application/json', + 'Content-Type' => 'application/json' } + RETRIES = 10 end module Queue BATCH_SIZE = 100 MAX_SIZE = 10000 end + + module BackoffPolicy + MIN_TIMEOUT_MS = 100 + MAX_TIMEOUT_MS = 10000 + MULTIPLIER = 1.5 + RANDOMIZATION_FACTOR = 0.5 + end end end end diff --git a/lib/segment/analytics/request.rb b/lib/segment/analytics/request.rb index e2d8073..026950a 100644 --- a/lib/segment/analytics/request.rb +++ b/lib/segment/analytics/request.rb @@ -2,6 +2,7 @@ require 'segment/analytics/utils' require 'segment/analytics/response' require 'segment/analytics/logging' +require 'segment/analytics/backoff_policy' require 'net/http' require 'net/https' require 'json' @@ -19,10 +20,11 @@ def initialize(options = {}) options[:host] ||= HOST options[:port] ||= PORT options[:ssl] ||= SSL - options[:headers] ||= HEADERS + @headers = options[:headers] || HEADERS @path = options[:path] || PATH @retries = options[:retries] || RETRIES - @backoff = options[:backoff] || BACKOFF + @backoff_policy = + options[:backoff_policy] || Segment::Analytics::BackoffPolicy.new http = Net::HTTP.new(options[:host], options[:port]) http.use_ssl = options[:ssl] @@ -36,7 +38,7 @@ def initialize(options = {}) # # returns - Response of the status and error if it exists def post(write_key, batch) - last_response, exception = retry_with_backoff(@retries, @backoff) do + last_response, exception = retry_with_backoff(@retries) do status_code, body = send_request(write_key, batch) error = JSON.parse(body)['error'] should_retry = should_retry_request?(status_code, body) @@ -71,10 +73,11 @@ def should_retry_request?(status_code, body) # Takes a block that returns [result, should_retry]. # # Retries upto `retries_remaining` times, if `should_retry` is false or - # an exception is raised. + # an exception is raised. `@backoff_policy` is used to determine the + # duration to sleep between attempts # # Returns [last_result, raised_exception] - def retry_with_backoff(retries_remaining, backoff, &block) + def retry_with_backoff(retries_remaining, &block) result, caught_exception = nil should_retry = false @@ -87,8 +90,8 @@ def retry_with_backoff(retries_remaining, backoff, &block) end if should_retry && (retries_remaining > 1) - sleep(backoff) - retry_with_backoff(retries_remaining - 1, backoff, &block) + sleep(@backoff_policy.next_interval.to_f / 1000) + retry_with_backoff(retries_remaining - 1, &block) else [result, caught_exception] end @@ -96,15 +99,11 @@ def retry_with_backoff(retries_remaining, backoff, &block) # Sends a request for the batch, returns [status_code, body] def send_request(write_key, batch) - headers = { - 'Content-Type' => 'application/json', - 'accept' => 'application/json' - } payload = JSON.generate( :sentAt => datetime_in_iso8601(Time.now), :batch => batch ) - request = Net::HTTP::Post.new(@path, headers) + request = Net::HTTP::Post.new(@path, @headers) request.basic_auth(write_key, nil) if self.class.stub diff --git a/spec/segment/analytics/backoff_policy_spec.rb b/spec/segment/analytics/backoff_policy_spec.rb new file mode 100644 index 0000000..75dc9fc --- /dev/null +++ b/spec/segment/analytics/backoff_policy_spec.rb @@ -0,0 +1,92 @@ +require 'spec_helper' + +module Segment + class Analytics + describe BackoffPolicy do + describe '#initialize' do + context 'no options are given' do + it 'sets default min_timeout_ms' do + actual = subject.instance_variable_get(:@min_timeout_ms) + expect(actual).to eq(described_class::MIN_TIMEOUT_MS) + end + + it 'sets default max_timeout_ms' do + actual = subject.instance_variable_get(:@max_timeout_ms) + expect(actual).to eq(described_class::MAX_TIMEOUT_MS) + end + + it 'sets default multiplier' do + actual = subject.instance_variable_get(:@multiplier) + expect(actual).to eq(described_class::MULTIPLIER) + end + + it 'sets default randomization factor' do + actual = subject.instance_variable_get(:@randomization_factor) + expect(actual).to eq(described_class::RANDOMIZATION_FACTOR) + end + end + + context 'options are given' do + let(:min_timeout_ms) { 1234 } + let(:max_timeout_ms) { 5678 } + let(:multiplier) { 24 } + let(:randomization_factor) { 0.4 } + + let(:options) do + { + min_timeout_ms: min_timeout_ms, + max_timeout_ms: max_timeout_ms, + multiplier: multiplier, + randomization_factor: randomization_factor + } + end + + subject { described_class.new(options) } + + it 'sets passed in min_timeout_ms' do + actual = subject.instance_variable_get(:@min_timeout_ms) + expect(actual).to eq(min_timeout_ms) + end + + it 'sets passed in max_timeout_ms' do + actual = subject.instance_variable_get(:@max_timeout_ms) + expect(actual).to eq(max_timeout_ms) + end + + it 'sets passed in multiplier' do + actual = subject.instance_variable_get(:@multiplier) + expect(actual).to eq(multiplier) + end + + it 'sets passed in randomization_factor' do + actual = subject.instance_variable_get(:@randomization_factor) + expect(actual).to eq(randomization_factor) + end + end + end + + describe '#next_interval' do + subject { + described_class.new( + min_timeout_ms: 1000, + max_timeout_ms: 10000, + multiplier: 2, + randomization_factor: 0.5 + ) + } + + it 'returns exponentially increasing durations' do + expect(subject.next_interval).to be_within(500).of(1000) + expect(subject.next_interval).to be_within(1000).of(2000) + expect(subject.next_interval).to be_within(2000).of(4000) + expect(subject.next_interval).to be_within(4000).of(8000) + end + + it 'caps maximum duration at max_timeout_secs' do + 10.times { subject.next_interval } + expect(subject.next_interval).to eq(10000) + end + end + end + end +end diff --git a/spec/segment/analytics/request_spec.rb b/spec/segment/analytics/request_spec.rb index d556eb9..babc928 100644 --- a/spec/segment/analytics/request_spec.rb +++ b/spec/segment/analytics/request_spec.rb @@ -46,9 +46,9 @@ class Analytics expect(retries).to eq(described_class::RETRIES) end - it 'sets a default backoff' do - backoff = subject.instance_variable_get(:@backoff) - expect(backoff).to eq(described_class::BACKOFF) + it 'sets a default backoff policy' do + backoff_policy = subject.instance_variable_get(:@backoff_policy) + expect(backoff_policy).to be_a(Segment::Analytics::BackoffPolicy) end it 'initializes a new Net::HTTP with default host and port' do @@ -63,14 +63,14 @@ class Analytics context 'options are given' do let(:path) { 'my/cool/path' } let(:retries) { 1234 } - let(:backoff) { 10 } + let(:backoff_policy) { FakeBackoffPolicy.new([1, 2, 3]) } let(:host) { 'http://www.example.com' } let(:port) { 8080 } let(:options) do { path: path, retries: retries, - backoff: backoff, + backoff_policy: backoff_policy, host: host, port: port } @@ -86,8 +86,9 @@ class Analytics expect(subject.instance_variable_get(:@retries)).to eq(retries) end - it 'sets passed in backoff' do - expect(subject.instance_variable_get(:@backoff)).to eq(backoff) + it 'sets passed in backoff backoff policy' do + expect(subject.instance_variable_get(:@backoff_policy)) + .to eq(backoff_policy) end it 'initializes a new Net::HTTP with passed in host and port' do @@ -117,7 +118,7 @@ class Analytics path = subject.instance_variable_get(:@path) default_headers = { 'Content-Type' => 'application/json', - 'accept' => 'application/json' + 'Accept' => 'application/json' } expect(Net::HTTP::Post).to receive(:new).with( path, default_headers @@ -157,14 +158,17 @@ class Analytics let(:status_code) { status_code } let(:body) { body } let(:retries) { 4 } - let(:backoff) { 1 } - subject { described_class.new(retries: retries, backoff: backoff) } + let(:backoff_policy) { FakeBackoffPolicy.new([1000, 1000, 1000]) } + subject { + described_class.new(retries: retries, + backoff_policy: backoff_policy) + } it 'retries the request' do expect(subject) .to receive(:sleep) .exactly(retries - 1).times - .with(backoff) + .with(1) .and_return(nil) subject.post(write_key, batch) end @@ -219,7 +223,7 @@ class Analytics context 'request or parsing of response results in an exception' do let(:response_body) { 'Malformed JSON ---' } - subject { described_class.new(backoff: 0) } + subject { described_class.new(retries: 0) } it 'returns a -1 for status' do expect(subject.post(write_key, batch).status).to eq(-1) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 8d8bdb8..8bc41b0 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -90,6 +90,18 @@ def run end end +# A backoff policy that returns a fixed list of values +class FakeBackoffPolicy + def initialize(interval_values) + @interval_values = interval_values + end + + def next_interval + raise 'FakeBackoffPolicy has no values left' if @interval_values.empty? + @interval_values.shift + end +end + # usage: # it "should return a result of 5" do # eventually(options: {timeout: 1}) { long_running_thing.result.should eq(5) }