From 533f65c78a5ecac11f08a56fe4bbdebab3f3deae Mon Sep 17 00:00:00 2001 From: Ilya Yakelzon Date: Thu, 25 Jun 2026 17:14:58 +0200 Subject: [PATCH 1/2] metrics: per-session download goodput histogram Adds proxy.session.goodput, a Float64Histogram (unit bytes/s) recorded once at connection close = received bytes / connection seconds, for sessions that moved >= 1 MB. This is the measurement signal for Lantern's automatic bandit experimentation system: to decide whether a challenger track actually delivers a big win, the evaluator needs a real per-session throughput signal it can compare between challenger and incumbent, per region x country. proxy.io (a byte counter) can't give a rate/distribution; this adds one. Tagged with network.io.direction=receive plus geo.country.iso_code (point attr), so it's sliceable by track x dc/region (resource attrs from buildResource) x country -- exactly the strata the evaluator compares. No device_id tag (cardinality). Unlike lantern-box#277, which wraps Conn/PacketConn to accumulate rx bytes and emit at Close(), http-proxy-lantern already measures per-connection bytes and duration via the measured listener: the reporting callback receives cumulative stats and a final=true flag at close. So goodput is recorded there, before the zero-delta early return so a session idle during its last reporting interval is still counted. Caveat (documented in code): duration is connection open time (includes idle), so goodput is a floor on true transfer speed. Both experiment arms are measured identically, so it's a fair relative signal, and the >=1 MB floor filters idle-dominated noise. Co-Authored-By: Claude Opus 4.8 (1M context) --- instrument/goodput_test.go | 110 ++++++++++++++++++++ instrument/instrument.go | 30 ++++++ instrument/otelinstrument/otelinstrument.go | 13 +++ reporting.go | 20 ++-- 4 files changed, 167 insertions(+), 6 deletions(-) create mode 100644 instrument/goodput_test.go diff --git a/instrument/goodput_test.go b/instrument/goodput_test.go new file mode 100644 index 00000000..f9d480b1 --- /dev/null +++ b/instrument/goodput_test.go @@ -0,0 +1,110 @@ +package instrument + +import ( + "context" + "net" + "testing" + "time" + + sdkotel "go.opentelemetry.io/otel" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/getlantern/geo" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// newGoodputInstrument wires a manual-reader meter provider into the global +// otel state and returns a defaultInstrument plus the reader to collect from. +func newGoodputInstrument(t *testing.T) (*defaultInstrument, *sdkmetric.ManualReader) { + t.Helper() + reader := sdkmetric.NewManualReader() + provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + sdkotel.SetMeterProvider(provider) + + ins, err := NewDefault(geo.NoLookup{}, &mockISPLookup{}, "test-proxy") + require.NoError(t, err) + return ins, reader +} + +// TestSessionGoodput verifies the per-session download goodput histogram is +// recorded once for a session that moved >= goodputMinBytes, with the value +// ~= received bytes / connection seconds and a receive direction tag. +func TestSessionGoodput(t *testing.T) { + ins, reader := newGoodputInstrument(t) + + const recv = 1_100_000 // above the 1MB goodput threshold + ins.SessionGoodput(context.Background(), recv, time.Second, net.ParseIP("1.2.3.4")) + + var rm metricdata.ResourceMetrics + require.NoError(t, reader.Collect(context.Background(), &rm)) + + count, sum, found := histogramCountSum(rm, "proxy.session.goodput") + require.True(t, found, "goodput histogram should be emitted for a >=1MB session") + assert.Equal(t, uint64(1), count, "exactly one goodput sample") + // 1s open duration → goodput ~= received bytes per second. + assert.InDelta(t, float64(recv), sum, float64(recv)*0.01) + + attrs := extractHistogramAttrs(rm, "proxy.session.goodput") + assert.Equal(t, "receive", attrs["network.io.direction"]) +} + +// TestSessionGoodputBelowThreshold verifies a sub-threshold session records no +// goodput sample. +func TestSessionGoodputBelowThreshold(t *testing.T) { + ins, reader := newGoodputInstrument(t) + + ins.SessionGoodput(context.Background(), 42, time.Second, net.ParseIP("1.2.3.4")) + + var rm metricdata.ResourceMetrics + require.NoError(t, reader.Collect(context.Background(), &rm)) + + _, _, found := histogramCountSum(rm, "proxy.session.goodput") + assert.False(t, found, "no goodput sample below the byte threshold") +} + +// TestSessionGoodputZeroDuration verifies a non-positive duration records no +// sample (guards against divide-by-zero). +func TestSessionGoodputZeroDuration(t *testing.T) { + ins, reader := newGoodputInstrument(t) + + ins.SessionGoodput(context.Background(), 2_000_000, 0, net.ParseIP("1.2.3.4")) + + var rm metricdata.ResourceMetrics + require.NoError(t, reader.Collect(context.Background(), &rm)) + + _, _, found := histogramCountSum(rm, "proxy.session.goodput") + assert.False(t, found, "no goodput sample for a zero-duration session") +} + +func histogramCountSum(rm metricdata.ResourceMetrics, name string) (uint64, float64, bool) { + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name != name { + continue + } + if d, ok := m.Data.(metricdata.Histogram[float64]); ok && len(d.DataPoints) > 0 { + return d.DataPoints[0].Count, d.DataPoints[0].Sum, true + } + } + } + return 0, 0, false +} + +func extractHistogramAttrs(rm metricdata.ResourceMetrics, name string) map[string]string { + result := make(map[string]string) + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name != name { + continue + } + if d, ok := m.Data.(metricdata.Histogram[float64]); ok && len(d.DataPoints) > 0 { + for _, kv := range d.DataPoints[0].Attributes.ToSlice() { + result[string(kv.Key)] = kv.Value.Emit() + } + } + } + } + return result +} diff --git a/instrument/instrument.go b/instrument/instrument.go index 051d9326..679a3b73 100644 --- a/instrument/instrument.go +++ b/instrument/instrument.go @@ -26,6 +26,12 @@ var ( originRootRegex = regexp.MustCompile(`([^\.]+\.[^\.]+$)`) ) +// goodputMinBytes is the minimum received bytes a session must have moved +// before its goodput sample is recorded. Filters out idle/tiny connections +// whose bytes/duration is dominated by setup and idle time rather than actual +// transfer speed. +const goodputMinBytes = 1_000_000 + // Instrument is the common interface about what can be instrumented. type Instrument interface { WrapFilter(prefix string, f filters.Filter) (filters.Filter, error) @@ -37,6 +43,7 @@ type Instrument interface { XBQHeaderSent(ctx context.Context) SuspectedProbing(ctx context.Context, fromIP net.IP, reason string) ProxiedBytes(ctx context.Context, sent, recv int, platform, platformVersion, libVersion, appVersion, app, locale, dataCapCohort, probingError string, clientIP net.IP, deviceID, originHost, arch string) + SessionGoodput(ctx context.Context, recvBytes int, duration time.Duration, clientIP net.IP) Connection(ctx context.Context, clientIP net.IP) ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) ReportProxiedBytes(tp *sdktrace.TracerProvider) @@ -73,6 +80,8 @@ func (i NoInstrument) XBQHeaderSent(ctx context.Context) func (i NoInstrument) SuspectedProbing(ctx context.Context, fromIP net.IP, reason string) {} func (i NoInstrument) ProxiedBytes(ctx context.Context, sent, recv int, platform, platformVersion, libVersion, appVersion, app, locale, dataCapCohort, probingError string, clientIP net.IP, deviceID, originHost, arch string) { } +func (i NoInstrument) SessionGoodput(ctx context.Context, recvBytes int, duration time.Duration, clientIP net.IP) { +} func (i NoInstrument) ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) { } func (i NoInstrument) ReportProxiedBytes(tp *sdktrace.TracerProvider) {} @@ -305,6 +314,27 @@ func (ins *defaultInstrument) ProxiedBytes(ctx context.Context, sent, recv int, ins.statsMx.Unlock() } +// SessionGoodput records a session's download goodput (received bytes per +// second of connection lifetime) at close, for sessions that moved at least +// goodputMinBytes. duration is the connection's open time; it includes idle +// periods, so this is a floor on true transfer speed — but both arms of a +// bandit experiment are measured identically, so it's a fair relative signal, +// and the byte floor filters the worst idle-dominated noise. No device_id tag +// (cardinality); track and cloud.region come from resource attributes, leaving +// geo.country.iso_code as the only point attribute the evaluator strata need. +func (ins *defaultInstrument) SessionGoodput(ctx context.Context, recvBytes int, duration time.Duration, clientIP net.IP) { + if recvBytes < goodputMinBytes || duration <= 0 { + return + } + goodput := float64(recvBytes) / duration.Seconds() + country := ins.countryLookup.CountryCode(clientIP) + otelinstrument.SessionGoodput.Record(ctx, goodput, + metric.WithAttributes( + semconv.GeoCountryISOCodeKey.String(country), + semconv.NetworkIODirectionKey.String("receive"), + )) +} + // Connection counts the number of incoming connections func (ins *defaultInstrument) Connection(ctx context.Context, clientIP net.IP) { fromCountry := ins.countryLookup.CountryCode(clientIP) diff --git a/instrument/otelinstrument/otelinstrument.go b/instrument/otelinstrument/otelinstrument.go index 020ab67c..a3bde0a8 100644 --- a/instrument/otelinstrument/otelinstrument.go +++ b/instrument/otelinstrument/otelinstrument.go @@ -32,6 +32,7 @@ var ( Throttling metric.Int64Counter SuspectedProbing metric.Int64Counter Connections metric.Int64Counter + SessionGoodput metric.Float64Histogram DistinctClients1m, DistinctClients10m, DistinctClients1h *distinct.SlidingWindowDistinctCount distinctClients metric.Int64ObservableGauge ) @@ -79,6 +80,18 @@ func initialize() error { if Connections, err = meter.Int64Counter("proxy.connections"); err != nil { return err } + // Per-session download goodput (received bytes per second of connection + // lifetime), recorded once at connection close for sessions that moved at + // least goodputMinBytes. Sliceable by track (resource attr) × cloud.region + // (resource attr) × geo.country.iso_code (point attr) so the bandit + // experiment evaluator can compare a challenger track's median goodput + // against the incumbent's. Unit "bytes/s" follows proxy.io's "bytes" + // spelling for consistency within this package's metrics. + if SessionGoodput, err = meter.Float64Histogram("proxy.session.goodput", + metric.WithUnit("bytes/s"), + metric.WithDescription("Per-session download goodput: received bytes per second of connection lifetime")); err != nil { + return err + } DistinctClients1m = distinct.NewSlidingWindowDistinctCount(time.Minute, time.Second) DistinctClients10m = distinct.NewSlidingWindowDistinctCount(10*time.Minute, 10*time.Second) diff --git a/reporting.go b/reporting.go index e4c89922..46bba839 100644 --- a/reporting.go +++ b/reporting.go @@ -29,6 +29,20 @@ type reportingConfig struct { func newReportingConfig(countryLookup geo.CountryLookup, rc *rclient.Client, instrument instrument.Instrument, throttleConfig throttle.Config) *reportingConfig { proxiedBytesReporter := func(ctx map[string]interface{}, stats *measured.Stats, deltaStats *measured.Stats, final bool) { + var client_ip net.IP + _client_ip := ctx[common.ClientIP] + if _client_ip != nil { + client_ip = net.ParseIP(_client_ip.(string)) + } + + if final { + // Record per-session download goodput once at connection close, + // using the connection's cumulative received bytes and open time. + // Done before the zero-delta early return below so a session that + // was idle during its final reporting interval is still counted. + instrument.SessionGoodput(context.Background(), stats.RecvTotal, stats.Duration, client_ip) + } + if deltaStats.SentTotal == 0 && deltaStats.RecvTotal == 0 { // nothing to report return @@ -45,12 +59,6 @@ func newReportingConfig(countryLookup geo.CountryLookup, rc *rclient.Client, ins probingError := fromContext(ctx, common.ProbingError) arch := fromContext(ctx, common.KernelArch) - var client_ip net.IP - _client_ip := ctx[common.ClientIP] - if _client_ip != nil { - client_ip = net.ParseIP(_client_ip.(string)) - } - dataCapCohort := "" throttleSettings, hasThrottleSettings := ctx[common.ThrottleSettings] if hasThrottleSettings { From 23f4f7cfae0cef23f4017c31da976ce61da50175 Mon Sep 17 00:00:00 2001 From: Ilya Yakelzon Date: Thu, 25 Jun 2026 17:22:16 +0200 Subject: [PATCH 2/2] metrics: address review feedback on session goodput - reporting.go: skip client IP parsing on idle, non-final reporting intervals (return on zero-delta before parsing), and use a safe type assertion for ctx[common.ClientIP]. Avoids the per-interval overhead regression introduced by recording goodput at close. - goodput_test.go: restore the global meter provider via t.Cleanup so the manual-reader provider doesn't leak into other tests. - goodput_test.go: also assert the geo.country.iso_code point attribute is present, fully covering the emitted metric contract. Co-Authored-By: Claude Opus 4.8 (1M context) --- instrument/goodput_test.go | 9 +++++++++ reporting.go | 16 +++++++++++----- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/instrument/goodput_test.go b/instrument/goodput_test.go index f9d480b1..87f8bce5 100644 --- a/instrument/goodput_test.go +++ b/instrument/goodput_test.go @@ -19,6 +19,11 @@ import ( // otel state and returns a defaultInstrument plus the reader to collect from. func newGoodputInstrument(t *testing.T) (*defaultInstrument, *sdkmetric.ManualReader) { t.Helper() + // Restore the global meter provider after the test so the manual-reader + // provider doesn't leak into other tests in the process. + prev := sdkotel.GetMeterProvider() + t.Cleanup(func() { sdkotel.SetMeterProvider(prev) }) + reader := sdkmetric.NewManualReader() provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) sdkotel.SetMeterProvider(provider) @@ -48,6 +53,10 @@ func TestSessionGoodput(t *testing.T) { attrs := extractHistogramAttrs(rm, "proxy.session.goodput") assert.Equal(t, "receive", attrs["network.io.direction"]) + // The country point attribute must always be present (empty here, since the + // test uses geo.NoLookup) so the metric stays sliceable by country. + _, hasCountry := attrs["geo.country.iso_code"] + assert.True(t, hasCountry, "goodput sample should carry the geo.country.iso_code attribute") } // TestSessionGoodputBelowThreshold verifies a sub-threshold session records no diff --git a/reporting.go b/reporting.go index 46bba839..bbffafc6 100644 --- a/reporting.go +++ b/reporting.go @@ -29,10 +29,16 @@ type reportingConfig struct { func newReportingConfig(countryLookup geo.CountryLookup, rc *rclient.Client, instrument instrument.Instrument, throttleConfig throttle.Config) *reportingConfig { proxiedBytesReporter := func(ctx map[string]interface{}, stats *measured.Stats, deltaStats *measured.Stats, final bool) { + noDelta := deltaStats.SentTotal == 0 && deltaStats.RecvTotal == 0 + if noDelta && !final { + // nothing to report on an idle, non-final interval; return before + // any client IP parsing to keep periodic reporting cheap. + return + } + var client_ip net.IP - _client_ip := ctx[common.ClientIP] - if _client_ip != nil { - client_ip = net.ParseIP(_client_ip.(string)) + if s, ok := ctx[common.ClientIP].(string); ok { + client_ip = net.ParseIP(s) } if final { @@ -43,8 +49,8 @@ func newReportingConfig(countryLookup geo.CountryLookup, rc *rclient.Client, ins instrument.SessionGoodput(context.Background(), stats.RecvTotal, stats.Duration, client_ip) } - if deltaStats.SentTotal == 0 && deltaStats.RecvTotal == 0 { - // nothing to report + if noDelta { + // nothing more to report (final call with no new bytes this interval) return } // Note - sometimes we're missing the platform and version