Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions instrument/goodput_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package instrument

import (
"context"
"net"
"testing"
"time"

sdkotel "go.opentelemetry.io/otel"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"

"github.com/getlantern/geo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// newGoodputInstrument wires a manual-reader meter provider into the global
// otel state and returns a defaultInstrument plus the reader to collect from.
func newGoodputInstrument(t *testing.T) (*defaultInstrument, *sdkmetric.ManualReader) {
t.Helper()
// Restore the global meter provider after the test so the manual-reader
// provider doesn't leak into other tests in the process.
prev := sdkotel.GetMeterProvider()
t.Cleanup(func() { sdkotel.SetMeterProvider(prev) })

reader := sdkmetric.NewManualReader()
provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
sdkotel.SetMeterProvider(provider)

ins, err := NewDefault(geo.NoLookup{}, &mockISPLookup{}, "test-proxy")
require.NoError(t, err)
return ins, reader
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// TestSessionGoodput verifies the per-session download goodput histogram is
// recorded once for a session that moved >= goodputMinBytes, with the value
// ~= received bytes / connection seconds and a receive direction tag.
func TestSessionGoodput(t *testing.T) {
ins, reader := newGoodputInstrument(t)

const recv = 1_100_000 // above the 1MB goodput threshold
ins.SessionGoodput(context.Background(), recv, time.Second, net.ParseIP("1.2.3.4"))

var rm metricdata.ResourceMetrics
require.NoError(t, reader.Collect(context.Background(), &rm))

count, sum, found := histogramCountSum(rm, "proxy.session.goodput")
require.True(t, found, "goodput histogram should be emitted for a >=1MB session")
assert.Equal(t, uint64(1), count, "exactly one goodput sample")
// 1s open duration → goodput ~= received bytes per second.
assert.InDelta(t, float64(recv), sum, float64(recv)*0.01)

attrs := extractHistogramAttrs(rm, "proxy.session.goodput")
assert.Equal(t, "receive", attrs["network.io.direction"])
// The country point attribute must always be present (empty here, since the
// test uses geo.NoLookup) so the metric stays sliceable by country.
_, hasCountry := attrs["geo.country.iso_code"]
assert.True(t, hasCountry, "goodput sample should carry the geo.country.iso_code attribute")
}

// TestSessionGoodputBelowThreshold verifies a sub-threshold session records no
// goodput sample.
func TestSessionGoodputBelowThreshold(t *testing.T) {
ins, reader := newGoodputInstrument(t)

ins.SessionGoodput(context.Background(), 42, time.Second, net.ParseIP("1.2.3.4"))

var rm metricdata.ResourceMetrics
require.NoError(t, reader.Collect(context.Background(), &rm))

_, _, found := histogramCountSum(rm, "proxy.session.goodput")
assert.False(t, found, "no goodput sample below the byte threshold")
}

// TestSessionGoodputZeroDuration verifies a non-positive duration records no
// sample (guards against divide-by-zero).
func TestSessionGoodputZeroDuration(t *testing.T) {
ins, reader := newGoodputInstrument(t)

ins.SessionGoodput(context.Background(), 2_000_000, 0, net.ParseIP("1.2.3.4"))

var rm metricdata.ResourceMetrics
require.NoError(t, reader.Collect(context.Background(), &rm))

_, _, found := histogramCountSum(rm, "proxy.session.goodput")
assert.False(t, found, "no goodput sample for a zero-duration session")
}

func histogramCountSum(rm metricdata.ResourceMetrics, name string) (uint64, float64, bool) {
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
if m.Name != name {
continue
}
if d, ok := m.Data.(metricdata.Histogram[float64]); ok && len(d.DataPoints) > 0 {
return d.DataPoints[0].Count, d.DataPoints[0].Sum, true
}
}
}
return 0, 0, false
}

func extractHistogramAttrs(rm metricdata.ResourceMetrics, name string) map[string]string {
result := make(map[string]string)
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
if m.Name != name {
continue
}
if d, ok := m.Data.(metricdata.Histogram[float64]); ok && len(d.DataPoints) > 0 {
for _, kv := range d.DataPoints[0].Attributes.ToSlice() {
result[string(kv.Key)] = kv.Value.Emit()
}
}
}
}
return result
}
30 changes: 30 additions & 0 deletions instrument/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ var (
originRootRegex = regexp.MustCompile(`([^\.]+\.[^\.]+$)`)
)

// goodputMinBytes is the minimum received bytes a session must have moved
// before its goodput sample is recorded. Filters out idle/tiny connections
// whose bytes/duration is dominated by setup and idle time rather than actual
// transfer speed.
const goodputMinBytes = 1_000_000

// Instrument is the common interface about what can be instrumented.
type Instrument interface {
WrapFilter(prefix string, f filters.Filter) (filters.Filter, error)
Expand All @@ -37,6 +43,7 @@ type Instrument interface {
XBQHeaderSent(ctx context.Context)
SuspectedProbing(ctx context.Context, fromIP net.IP, reason string)
ProxiedBytes(ctx context.Context, sent, recv int, platform, platformVersion, libVersion, appVersion, app, locale, dataCapCohort, probingError string, clientIP net.IP, deviceID, originHost, arch string)
SessionGoodput(ctx context.Context, recvBytes int, duration time.Duration, clientIP net.IP)
Connection(ctx context.Context, clientIP net.IP)
ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider)
ReportProxiedBytes(tp *sdktrace.TracerProvider)
Expand Down Expand Up @@ -73,6 +80,8 @@ func (i NoInstrument) XBQHeaderSent(ctx context.Context)
func (i NoInstrument) SuspectedProbing(ctx context.Context, fromIP net.IP, reason string) {}
func (i NoInstrument) ProxiedBytes(ctx context.Context, sent, recv int, platform, platformVersion, libVersion, appVersion, app, locale, dataCapCohort, probingError string, clientIP net.IP, deviceID, originHost, arch string) {
}
func (i NoInstrument) SessionGoodput(ctx context.Context, recvBytes int, duration time.Duration, clientIP net.IP) {
}
func (i NoInstrument) ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) {
}
func (i NoInstrument) ReportProxiedBytes(tp *sdktrace.TracerProvider) {}
Expand Down Expand Up @@ -305,6 +314,27 @@ func (ins *defaultInstrument) ProxiedBytes(ctx context.Context, sent, recv int,
ins.statsMx.Unlock()
}

// SessionGoodput records a session's download goodput (received bytes per
// second of connection lifetime) at close, for sessions that moved at least
// goodputMinBytes. duration is the connection's open time; it includes idle
// periods, so this is a floor on true transfer speed — but both arms of a
// bandit experiment are measured identically, so it's a fair relative signal,
// and the byte floor filters the worst idle-dominated noise. No device_id tag
// (cardinality); track and cloud.region come from resource attributes, leaving
// geo.country.iso_code as the only point attribute the evaluator strata need.
func (ins *defaultInstrument) SessionGoodput(ctx context.Context, recvBytes int, duration time.Duration, clientIP net.IP) {
if recvBytes < goodputMinBytes || duration <= 0 {
return
}
goodput := float64(recvBytes) / duration.Seconds()
country := ins.countryLookup.CountryCode(clientIP)
otelinstrument.SessionGoodput.Record(ctx, goodput,
metric.WithAttributes(
semconv.GeoCountryISOCodeKey.String(country),
semconv.NetworkIODirectionKey.String("receive"),
))
}

// Connection counts the number of incoming connections
func (ins *defaultInstrument) Connection(ctx context.Context, clientIP net.IP) {
fromCountry := ins.countryLookup.CountryCode(clientIP)
Expand Down
13 changes: 13 additions & 0 deletions instrument/otelinstrument/otelinstrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
Throttling metric.Int64Counter
SuspectedProbing metric.Int64Counter
Connections metric.Int64Counter
SessionGoodput metric.Float64Histogram
DistinctClients1m, DistinctClients10m, DistinctClients1h *distinct.SlidingWindowDistinctCount
distinctClients metric.Int64ObservableGauge
)
Expand Down Expand Up @@ -79,6 +80,18 @@ func initialize() error {
if Connections, err = meter.Int64Counter("proxy.connections"); err != nil {
return err
}
// Per-session download goodput (received bytes per second of connection
// lifetime), recorded once at connection close for sessions that moved at
// least goodputMinBytes. Sliceable by track (resource attr) × cloud.region
// (resource attr) × geo.country.iso_code (point attr) so the bandit
// experiment evaluator can compare a challenger track's median goodput
// against the incumbent's. Unit "bytes/s" follows proxy.io's "bytes"
// spelling for consistency within this package's metrics.
if SessionGoodput, err = meter.Float64Histogram("proxy.session.goodput",
metric.WithUnit("bytes/s"),
metric.WithDescription("Per-session download goodput: received bytes per second of connection lifetime")); err != nil {
return err
}

DistinctClients1m = distinct.NewSlidingWindowDistinctCount(time.Minute, time.Second)
DistinctClients10m = distinct.NewSlidingWindowDistinctCount(10*time.Minute, 10*time.Second)
Expand Down
30 changes: 22 additions & 8 deletions reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,28 @@ type reportingConfig struct {

func newReportingConfig(countryLookup geo.CountryLookup, rc *rclient.Client, instrument instrument.Instrument, throttleConfig throttle.Config) *reportingConfig {
proxiedBytesReporter := func(ctx map[string]interface{}, stats *measured.Stats, deltaStats *measured.Stats, final bool) {
if deltaStats.SentTotal == 0 && deltaStats.RecvTotal == 0 {
// nothing to report
noDelta := deltaStats.SentTotal == 0 && deltaStats.RecvTotal == 0
if noDelta && !final {
// nothing to report on an idle, non-final interval; return before
// any client IP parsing to keep periodic reporting cheap.
return
}

var client_ip net.IP
if s, ok := ctx[common.ClientIP].(string); ok {
client_ip = net.ParseIP(s)
}
Comment thread
reflog marked this conversation as resolved.

if final {
// Record per-session download goodput once at connection close,
// using the connection's cumulative received bytes and open time.
// Done before the zero-delta early return below so a session that
// was idle during its final reporting interval is still counted.
instrument.SessionGoodput(context.Background(), stats.RecvTotal, stats.Duration, client_ip)
}

if noDelta {
// nothing more to report (final call with no new bytes this interval)
return
}
// Note - sometimes we're missing the platform and version
Expand All @@ -45,12 +65,6 @@ func newReportingConfig(countryLookup geo.CountryLookup, rc *rclient.Client, ins
probingError := fromContext(ctx, common.ProbingError)
arch := fromContext(ctx, common.KernelArch)

var client_ip net.IP
_client_ip := ctx[common.ClientIP]
if _client_ip != nil {
client_ip = net.ParseIP(_client_ip.(string))
}

dataCapCohort := ""
throttleSettings, hasThrottleSettings := ctx[common.ThrottleSettings]
if hasThrottleSettings {
Expand Down
Loading