Skip to content
6 changes: 5 additions & 1 deletion lib/segment/analytics/defaults.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ module Request
end

# Default values related to the message queue.
module Queue
# NOTE(review): Worker#initialize is also shown falling back to
# Defaults::MessageBatch::MAX_SIZE for its batch size — confirm whether this
# constant is still referenced anywhere.
BATCH_SIZE = 100
# Presumably the maximum number of messages the queue may hold; its use is
# not visible here — verify against the caller.
MAX_SIZE = 10000
end

# Limits that apply to a single message.
module Message
# Largest serialized (JSON) size, in bytes, that a message may have. Anything
# bigger is reported by Message#too_big? and is not added by MessageBatch#<<.
MAX_BYTES = 32768 # 32 KiB
end

# Limits that apply to a batch of messages.
module MessageBatch
# Byte budget for a serialized batch. MessageBatch#size_exhausted? treats the
# batch as full once it is within one maximum-size message of this value.
MAX_BYTES = 512_000 # 500 KiB
# Default cap on the number of messages in a batch; Worker uses it when no
# :batch_size option is supplied.
MAX_SIZE = 100
end

module BackoffPolicy
MIN_TIMEOUT_MS = 100
MAX_TIMEOUT_MS = 10000
Expand Down
6 changes: 5 additions & 1 deletion lib/segment/analytics/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ def initialize(hash)
end

# Reports whether this message is too large to be sent.
#
# @return [Boolean] true when the serialized JSON is larger than
#   Defaults::Message::MAX_BYTES bytes
def too_big?
  # Measure through json_size only. Evaluating `to_json.bytesize > ...` here
  # as well would serialize the message a second time and discard the result.
  json_size > Defaults::Message::MAX_BYTES
end

# Size of this message once serialized.
#
# @return [Integer] number of bytes in the string returned by to_json
def json_size
  serialized = to_json
  serialized.bytesize
end

# Since the hash is expected to not be modified (set at initialization),
Expand Down
34 changes: 32 additions & 2 deletions lib/segment/analytics/message_batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,53 @@ class Analytics
# An ordered collection of messages that keeps a running estimate of its
# serialized size, so callers can tell when it should stop accepting more.
class MessageBatch
  extend Forwardable
  include Segment::Analytics::Logging
  include Segment::Analytics::Defaults::MessageBatch

  # @param max_message_count [Integer] number of messages at which the batch
  #   reports itself as full; defaults to MAX_SIZE so that the batch can
  #   still be built without arguments
  def initialize(max_message_count = MAX_SIZE)
    @messages = []
    @max_message_count = max_message_count
    @json_size = 0
  end

  # Adds a message to the batch. A message for which `too_big?` is true is
  # not added; an error is logged instead.
  #
  # @param message [#too_big?, #json_size] the message to append
  def <<(message)
    if message.too_big?
      logger.error('a message exceeded the maximum allowed size')
    else
      @messages << message
      @json_size += message.json_size + 1 # One byte for the comma
    end
  end

  # @return [Boolean] true once either the message count or the size budget
  #   has been used up
  def full?
    item_count_exhausted? || size_exhausted?
  end

  # Empties the batch and resets the running size estimate.
  def clear
    @messages.clear
    @json_size = 0
  end

  # `clear` is deliberately not delegated: a delegator defined here would
  # replace the method above and leave @json_size stale after a clear, so the
  # batch would keep reporting itself as full.
  def_delegators :@messages, :to_json
  def_delegators :@messages, :empty?
  def_delegators :@messages, :length

  private

  def item_count_exhausted?
    @messages.length >= @max_message_count
  end

  # We consider the max size here as just enough to leave room for one more
  # message of the largest size possible. This is a shortcut that allows us
  # to use a native Ruby `Queue` that doesn't allow peeking. The tradeoff
  # here is that we might fit fewer messages than possible into a batch.
  #
  # The alternative is to use our own `Queue` implementation that allows
  # peeking, and to consider the next message size when calculating whether
  # the message can be accommodated in this batch.
  def size_exhausted?
    @json_size >= (MAX_BYTES - Defaults::Message::MAX_BYTES)
  end
end
end
end
6 changes: 3 additions & 3 deletions lib/segment/analytics/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ def initialize(queue, write_key, options = {})
symbolize_keys! options
@queue = queue
@write_key = write_key
@batch_size = options[:batch_size] || Queue::BATCH_SIZE
@on_error = options[:on_error] || proc { |status, error| }
@batch = MessageBatch.new
batch_size = options[:batch_size] || Defaults::MessageBatch::MAX_SIZE
@batch = MessageBatch.new(batch_size)
@lock = Mutex.new
@request = Request.new
end
Expand All @@ -39,7 +39,7 @@ def run
return if @queue.empty?

@lock.synchronize do
until @batch.length >= @batch_size || @queue.empty?
until @batch.full? || @queue.empty?
@batch << Message.new(@queue.pop)
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/helpers/runscope_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def initialize(api_token)

def requests(bucket_key)
with_retries(3) do
response = @conn.get("/buckets/#{bucket_key}/messages", count: 10)
response = @conn.get("/buckets/#{bucket_key}/messages", count: 20)

raise "Runscope error. #{response.body}" unless response.status == 200

Expand Down
30 changes: 27 additions & 3 deletions spec/segment/analytics/message_batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,47 @@
module Segment
  class Analytics
    # Specs for MessageBatch: appending messages and detecting a full batch.
    describe MessageBatch do
      # Every example works on a fresh batch capped at 100 messages.
      subject { described_class.new(100) }

      describe '#<<' do
        it 'appends messages' do
          subject << Message.new('a' => 'b')
          expect(subject.length).to eq(1)
        end

        it 'rejects messages that exceed the maximum allowed size' do
          max_bytes = Defaults::Message::MAX_BYTES
          # The value alone is max_bytes long, so the serialized message is
          # necessarily larger than the limit.
          hash = { 'a' => 'b' * max_bytes }
          message = Message.new(hash)

          subject << message
          expect(subject.length).to eq(0)
        end
      end

      describe '#full?' do
        it 'returns true once item count is exceeded' do
          99.times { subject << Message.new(a: 'b') }
          expect(subject.full?).to be(false)

          # The 100th message reaches the cap given to the subject.
          subject << Message.new(a: 'b')
          expect(subject.full?).to be(true)
        end

        it 'returns true once max size is almost exceeded' do
          message = Message.new(a: 'b' * (Defaults::Message::MAX_BYTES - 10))

          # Each message is under the individual limit
          expect(message.json_size).to be < Defaults::Message::MAX_BYTES

          # Size of the batch is over the limit
          expect(50 * message.json_size).to be > Defaults::MessageBatch::MAX_BYTES

          expect(subject.full?).to be(false)
          50.times { subject << message }
          expect(subject.full?).to be(true)
        end
      end
    end
  end
end
32 changes: 20 additions & 12 deletions spec/segment/analytics/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ class Analytics
describe '#init' do
  it 'accepts string keys' do
    queue = Queue.new
    worker = Segment::Analytics::Worker.new(queue,
                                            'secret',
                                            'batch_size' => 100)
    # The worker passes its batch size to the MessageBatch it creates, so the
    # option is checked on the batch. Building the worker a second time and
    # also asserting on a worker-level @batch_size only duplicated this.
    batch = worker.instance_variable_get(:@batch)
    expect(batch.instance_variable_get(:@max_message_count)).to eq(100)
  end
end

Expand Down Expand Up @@ -35,8 +38,11 @@ class Analytics
end.to_not raise_error
end

it 'executes the error handler, before the request phase ends, if the request is invalid' do
Segment::Analytics::Request.any_instance.stub(:post).and_return(Segment::Analytics::Response.new(400, 'Some error'))
it 'executes the error handler if the request is invalid' do
Segment::Analytics::Request
.any_instance
.stub(:post)
.and_return(Segment::Analytics::Response.new(400, 'Some error'))

status = error = nil
on_error = proc do |yielded_status, yielded_error|
Expand All @@ -46,9 +52,10 @@ class Analytics

queue = Queue.new
queue << {}
worker = Segment::Analytics::Worker.new queue, 'secret', :on_error => on_error
worker = described_class.new(queue, 'secret', :on_error => on_error)

# This is to ensure that Client#flush doesn't finish before calling the error handler.
# This is to ensure that Client#flush doesn't finish before calling
# the error handler.
Thread.new { worker.run }
sleep 0.1 # First give thread time to spin-up.
sleep 0.01 while worker.is_requesting?
Expand All @@ -69,7 +76,9 @@ class Analytics

queue = Queue.new
queue << Requested::TRACK
worker = Segment::Analytics::Worker.new queue, 'testsecret', :on_error => on_error
worker = described_class.new(queue,
'testsecret',
:on_error => on_error)
worker.run

expect(queue).to be_empty
Expand All @@ -89,12 +98,11 @@ class Analytics
queue << Requested::TRACK
worker = Segment::Analytics::Worker.new(queue, 'testsecret')

Thread.new do
worker.run
expect(worker.is_requesting?).to eq(false)
end

worker_thread = Thread.new { worker.run }
eventually { expect(worker.is_requesting?).to eq(true) }

worker_thread.join
expect(worker.is_requesting?).to eq(false)
end
end
end
Expand Down