From 53f201799c87ad268ebddd4872eaa2314ea6f927 Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Mon, 6 Apr 2026 15:57:07 -0400 Subject: [PATCH 1/2] Consolidate instance lifecycle subscriptions --- lib/builds/manager_test.go | 4 +- lib/instances/README.md | 4 +- lib/instances/lifecycle_events.go | 136 +++++++++++++++++++++++++ lib/instances/lifecycle_events_test.go | 125 +++++++++++++++++++++++ lib/instances/manager.go | 69 +++++++++---- lib/instances/metrics.go | 62 +++++++++++ lib/instances/metrics_test.go | 101 ++++++++++++++++++ lib/instances/subscribe.go | 83 --------------- lib/instances/subscribe_test.go | 97 ------------------ lib/instances/wait.go | 24 +++-- lib/instances/wait_test.go | 113 +++++++++++++++++--- 11 files changed, 589 insertions(+), 229 deletions(-) create mode 100644 lib/instances/lifecycle_events.go create mode 100644 lib/instances/lifecycle_events_test.go delete mode 100644 lib/instances/subscribe.go delete mode 100644 lib/instances/subscribe_test.go diff --git a/lib/builds/manager_test.go b/lib/builds/manager_test.go index e0709e6f..3b6a3e34 100644 --- a/lib/builds/manager_test.go +++ b/lib/builds/manager_test.go @@ -171,8 +171,8 @@ func (m *mockInstanceManager) GetVsockDialer(ctx context.Context, instanceID str return nil, nil } -func (m *mockInstanceManager) Subscribe(instanceID string) (<-chan instances.StateChange, func()) { - ch := make(chan instances.StateChange, 1) +func (m *mockInstanceManager) SubscribeLifecycleEvents(consumer instances.LifecycleEventConsumer) (<-chan instances.LifecycleEvent, func()) { + ch := make(chan instances.LifecycleEvent, 1) return ch, func() { close(ch) } } diff --git a/lib/instances/README.md b/lib/instances/README.md index 77fdd3ae..b56e276d 100644 --- a/lib/instances/README.md +++ b/lib/instances/README.md @@ -130,9 +130,9 @@ Any State → Stopped - If an instance is deleted, its schedule is retained so retention can continue cleaning existing scheduled snapshots. - Once the deleted instance has no scheduled snapshots left, the scheduler removes that schedule automatically. -## WaitForState (wait.go, subscribe.go) +## WaitForState (wait.go, lifecycle_events.go) -Waits for an instance to reach a target state using channel-based subscriptions with a polling fallback. State-changing manager methods (start, stop, standby, restore, fork, delete) broadcast via `notifyStateChange`, which fans out to subscriber channels. `WaitForState` uses a 3-way select: subscription events, 5s polling fallback (logs at debug level if it detects the state change), and context cancellation/timeout. Returns early on terminal (`Stopped`, `Standby`, `Shutdown`) or error (`Unknown`) states. Used by `GET /instances/{id}/wait`. +Waits for an instance to reach a target state using the shared lifecycle event bus with a polling fallback. State-changing manager methods publish lifecycle events after successful mutations, and `WaitForState` filters them by `instance_id`. A 5s polling fallback guards against missed or dropped events. Returns early on terminal (`Stopped`, `Standby`, `Shutdown`) or error (`Unknown`) states. Used by `GET /instances/{id}/wait`. ## Reference Handling diff --git a/lib/instances/lifecycle_events.go b/lib/instances/lifecycle_events.go new file mode 100644 index 00000000..ad14cb00 --- /dev/null +++ b/lib/instances/lifecycle_events.go @@ -0,0 +1,136 @@ +package instances + +import ( + "context" + "sync" +) + +const lifecycleEventBufferSize = 32 + +// LifecycleEventConsumer identifies the internal consumer of lifecycle events. +// Keep this set bounded for observability label safety. +type LifecycleEventConsumer string + +const ( + LifecycleEventConsumerWaitForState LifecycleEventConsumer = "wait_for_state" + LifecycleEventConsumerAutoStandby LifecycleEventConsumer = "auto_standby" +) + +var allLifecycleEventConsumers = []LifecycleEventConsumer{ + LifecycleEventConsumerWaitForState, + LifecycleEventConsumerAutoStandby, +} + +// LifecycleEventAction identifies which instance lifecycle action occurred. +type LifecycleEventAction string + +const ( + LifecycleEventCreate LifecycleEventAction = "create" + LifecycleEventUpdate LifecycleEventAction = "update" + LifecycleEventStart LifecycleEventAction = "start" + LifecycleEventStop LifecycleEventAction = "stop" + LifecycleEventStandby LifecycleEventAction = "standby" + LifecycleEventRestore LifecycleEventAction = "restore" + LifecycleEventDelete LifecycleEventAction = "delete" + LifecycleEventFork LifecycleEventAction = "fork" +) + +// LifecycleEvent is a global instance change event stream used by internal +// consumers such as wait-for-state and background controllers. +type LifecycleEvent struct { + Action LifecycleEventAction + InstanceID string + Instance *Instance +} + +type lifecycleSubscriber struct { + consumer LifecycleEventConsumer + ch chan LifecycleEvent +} + +type lifecycleConsumerStats struct { + Subscribers int64 + MaxQueueDepth int64 +} + +type lifecycleSubscribers struct { + mu sync.Mutex + subs []lifecycleSubscriber + onDrop func(context.Context, LifecycleEventConsumer) +} + +func newLifecycleSubscribers() *lifecycleSubscribers { + return &lifecycleSubscribers{} +} + +func (s *lifecycleSubscribers) Subscribe(consumer LifecycleEventConsumer) (<-chan LifecycleEvent, func()) { + ch := make(chan LifecycleEvent, lifecycleEventBufferSize) + + s.mu.Lock() + s.subs = append(s.subs, lifecycleSubscriber{ + consumer: consumer, + ch: ch, + }) + s.mu.Unlock() + + return ch, func() { + s.mu.Lock() + defer s.mu.Unlock() + for i, sub := range s.subs { + if sub.ch == ch { + s.subs = append(s.subs[:i], s.subs[i+1:]...) + close(ch) + break + } + } + } +} + +func (s *lifecycleSubscribers) Notify(ctx context.Context, event LifecycleEvent) { + s.mu.Lock() + subs := append([]lifecycleSubscriber(nil), s.subs...) + s.mu.Unlock() + + for _, sub := range subs { + if trySendLifecycleEvent(sub.ch, event) && s.onDrop != nil { + s.onDrop(ctx, sub.consumer) + } + } +} + +func (s *lifecycleSubscribers) Stats() map[LifecycleEventConsumer]lifecycleConsumerStats { + s.mu.Lock() + defer s.mu.Unlock() + + stats := make(map[LifecycleEventConsumer]lifecycleConsumerStats, len(allLifecycleEventConsumers)) + for _, consumer := range allLifecycleEventConsumers { + stats[consumer] = lifecycleConsumerStats{} + } + for _, sub := range s.subs { + stat := stats[sub.consumer] + stat.Subscribers++ + queueDepth := int64(len(sub.ch)) + if queueDepth > stat.MaxQueueDepth { + stat.MaxQueueDepth = queueDepth + } + stats[sub.consumer] = stat + } + return stats +} + +// trySendLifecycleEvent attempts a non-blocking send. +// Returns true when the event was dropped because the channel buffer was full. +func trySendLifecycleEvent(ch chan LifecycleEvent, event LifecycleEvent) (dropped bool) { + defer func() { + if recover() != nil { + dropped = false + } + }() + + select { + case ch <- event: + return false + default: + return true + } +} diff --git a/lib/instances/lifecycle_events_test.go b/lib/instances/lifecycle_events_test.go new file mode 100644 index 00000000..0cf0c1c7 --- /dev/null +++ b/lib/instances/lifecycle_events_test.go @@ -0,0 +1,125 @@ +package instances + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLifecycleSubscribers_NotifyDelivers(t *testing.T) { + s := newLifecycleSubscribers() + ch, unsub := s.Subscribe(LifecycleEventConsumerWaitForState) + defer unsub() + + s.Notify(context.Background(), LifecycleEvent{ + Action: LifecycleEventStart, + InstanceID: "inst-1", + Instance: &Instance{State: StateRunning}, + }) + + select { + case event := <-ch: + assert.Equal(t, LifecycleEventStart, event.Action) + require.NotNil(t, event.Instance) + assert.Equal(t, StateRunning, event.Instance.State) + case <-time.After(time.Second): + t.Fatal("timed out waiting for lifecycle event") + } +} + +func TestLifecycleSubscribers_MultipleSubscribers(t *testing.T) { + s := newLifecycleSubscribers() + ch1, unsub1 := s.Subscribe(LifecycleEventConsumerWaitForState) + defer unsub1() + ch2, unsub2 := s.Subscribe(LifecycleEventConsumerAutoStandby) + defer unsub2() + + s.Notify(context.Background(), LifecycleEvent{ + Action: LifecycleEventStop, + InstanceID: "inst-1", + Instance: &Instance{State: StateStopped}, + }) + + for _, ch := range []<-chan LifecycleEvent{ch1, ch2} { + select { + case event := <-ch: + assert.Equal(t, LifecycleEventStop, event.Action) + require.NotNil(t, event.Instance) + assert.Equal(t, StateStopped, event.Instance.State) + case <-time.After(time.Second): + t.Fatal("timed out waiting for lifecycle event") + } + } +} + +func TestLifecycleSubscribers_UnsubscribeStopsDelivery(t *testing.T) { + s := newLifecycleSubscribers() + ch, unsub := s.Subscribe(LifecycleEventConsumerWaitForState) + unsub() + + _, ok := <-ch + assert.False(t, ok, "channel should be closed after unsubscribe") +} + +func TestLifecycleSubscribers_StatsByConsumer(t *testing.T) { + s := newLifecycleSubscribers() + wait1, unsub1 := s.Subscribe(LifecycleEventConsumerWaitForState) + defer unsub1() + wait2, unsub2 := s.Subscribe(LifecycleEventConsumerWaitForState) + defer unsub2() + auto, unsub3 := s.Subscribe(LifecycleEventConsumerAutoStandby) + defer unsub3() + + for i := 0; i < 3; i++ { + s.Notify(context.Background(), LifecycleEvent{ + Action: LifecycleEventUpdate, + InstanceID: "inst-1", + Instance: &Instance{State: StateRunning}, + }) + } + <-wait1 + <-wait2 + + stats := s.Stats() + assert.Equal(t, int64(2), stats[LifecycleEventConsumerWaitForState].Subscribers) + assert.Equal(t, int64(2), stats[LifecycleEventConsumerWaitForState].MaxQueueDepth) + assert.Equal(t, int64(1), stats[LifecycleEventConsumerAutoStandby].Subscribers) + assert.Equal(t, int64(3), stats[LifecycleEventConsumerAutoStandby].MaxQueueDepth) + assert.Equal(t, 3, len(auto)) +} + +func TestLifecycleSubscribers_DropCallbackOnBackpressure(t *testing.T) { + s := newLifecycleSubscribers() + + drops := make(chan LifecycleEventConsumer, 1) + s.onDrop = func(ctx context.Context, consumer LifecycleEventConsumer) { + drops <- consumer + } + + _, unsub := s.Subscribe(LifecycleEventConsumerWaitForState) + defer unsub() + + for i := 0; i < lifecycleEventBufferSize; i++ { + s.Notify(context.Background(), LifecycleEvent{ + Action: LifecycleEventUpdate, + InstanceID: "inst-1", + Instance: &Instance{State: StateRunning}, + }) + } + + s.Notify(context.Background(), LifecycleEvent{ + Action: LifecycleEventStart, + InstanceID: "inst-1", + Instance: &Instance{State: StateRunning}, + }) + + select { + case consumer := <-drops: + assert.Equal(t, LifecycleEventConsumerWaitForState, consumer) + case <-time.After(time.Second): + t.Fatal("expected drop callback") + } +} diff --git a/lib/instances/manager.go b/lib/instances/manager.go index 4ef8d9bf..accb9741 100644 --- a/lib/instances/manager.go +++ b/lib/instances/manager.go @@ -57,9 +57,8 @@ type Manager interface { SetResourceValidator(v ResourceValidator) // GetVsockDialer returns a VsockDialer for the specified instance. GetVsockDialer(ctx context.Context, instanceID string) (hypervisor.VsockDialer, error) - // Subscribe returns a channel that receives state change events for the - // given instance, plus an unsubscribe function the caller must defer. - Subscribe(instanceID string) (<-chan StateChange, func()) + // SubscribeLifecycleEvents returns the shared internal lifecycle event stream. + SubscribeLifecycleEvents(consumer LifecycleEventConsumer) (<-chan LifecycleEvent, func()) } // ImageUsageRecorder records newly used images before instance metadata is persisted. @@ -115,8 +114,8 @@ type manager struct { nativeCodecPaths map[string]string imageUsageRecorder ImageUsageRecorder - // State change subscriptions for waitForState - stateSubscribers *subscribers + // Shared lifecycle event subscriptions for internal consumers. + lifecycleEvents *lifecycleSubscribers // Hypervisor support vmStarters map[hypervisor.Type]hypervisor.VMStarter @@ -169,7 +168,7 @@ func NewManager(p *paths.Paths, imageManager images.Manager, systemManager syste snapshotDefaults: snapshotDefaults, compressionJobs: make(map[string]*compressionJob), nativeCodecPaths: make(map[string]string), - stateSubscribers: newSubscribers(), + lifecycleEvents: newLifecycleSubscribers(), } m.deleteSnapshotFn = m.deleteSnapshot @@ -180,6 +179,9 @@ func NewManager(p *paths.Paths, imageManager images.Manager, systemManager syste m.metrics = metrics } } + m.lifecycleEvents.onDrop = func(ctx context.Context, consumer LifecycleEventConsumer) { + m.recordLifecycleEventDropped(ctx, consumer, lifecycleEventDropReasonBufferFull) + } return m } @@ -195,15 +197,25 @@ func (m *manager) SetImageUsageRecorder(recorder ImageUsageRecorder) { m.imageUsageRecorder = recorder } -func (m *manager) Subscribe(instanceID string) (<-chan StateChange, func()) { - return m.stateSubscribers.Subscribe(instanceID) +func (m *manager) SubscribeLifecycleEvents(consumer LifecycleEventConsumer) (<-chan LifecycleEvent, func()) { + return m.lifecycleEvents.Subscribe(consumer) +} + +func (m *manager) notifyLifecycleEvent(ctx context.Context, action LifecycleEventAction, inst *Instance) { + if inst == nil { + return + } + m.lifecycleEvents.Notify(ctx, LifecycleEvent{ + Action: action, + InstanceID: inst.Id, + Instance: inst, + }) } -// notifyStateChange broadcasts a state change to all subscribers for the instance. -func (m *manager) notifyStateChange(instanceID string, inst *Instance) { - m.stateSubscribers.Notify(instanceID, StateChange{ - State: inst.State, - StateError: inst.StateError, +func (m *manager) notifyLifecycleDelete(ctx context.Context, instanceID string) { + m.lifecycleEvents.Notify(ctx, LifecycleEvent{ + Action: LifecycleEventDelete, + InstanceID: instanceID, }) } @@ -270,7 +282,11 @@ func (m *manager) CreateInstance(ctx context.Context, req CreateInstanceRequest) // 1. ULID generation is unique // 2. Filesystem mkdir is atomic per instance directory // 3. Concurrent creates of different instances don't conflict - return m.createInstance(ctx, req) + inst, err := m.createInstance(ctx, req) + if err == nil { + m.notifyLifecycleEvent(ctx, LifecycleEventCreate, inst) + } + return inst, err } // DeleteInstance stops and deletes an instance @@ -281,7 +297,7 @@ func (m *manager) DeleteInstance(ctx context.Context, id string) error { err := m.deleteInstance(ctx, id) if err == nil { - m.stateSubscribers.CloseAll(id) + m.notifyLifecycleDelete(ctx, id) // Clean up the lock after successful deletion m.instanceLocks.Delete(id) } @@ -332,11 +348,16 @@ func (m *manager) ForkInstance(ctx context.Context, id string, req ForkInstanceR return nil, fmt.Errorf("wait for fork guest agent readiness: %w", err) } } + m.notifyLifecycleEvent(ctx, LifecycleEventFork, inst) return inst, nil } func (m *manager) ForkSnapshot(ctx context.Context, snapshotID string, req ForkSnapshotRequest) (*Instance, error) { - return m.forkSnapshot(ctx, snapshotID, req) + inst, err := m.forkSnapshot(ctx, snapshotID, req) + if err == nil { + m.notifyLifecycleEvent(ctx, LifecycleEventFork, inst) + } + return inst, err } // StandbyInstance puts an instance in standby (pause, snapshot, delete VMM) @@ -346,7 +367,7 @@ func (m *manager) StandbyInstance(ctx context.Context, id string, req StandbyIns defer lock.Unlock() inst, err := m.standbyInstance(ctx, id, req, false) if err == nil { - m.notifyStateChange(id, inst) + m.notifyLifecycleEvent(ctx, LifecycleEventStandby, inst) } return inst, err } @@ -358,7 +379,7 @@ func (m *manager) RestoreInstance(ctx context.Context, id string) (*Instance, er defer lock.Unlock() inst, err := m.restoreInstance(ctx, id) if err == nil { - m.notifyStateChange(id, inst) + m.notifyLifecycleEvent(ctx, LifecycleEventRestore, inst) } return inst, err } @@ -369,7 +390,7 @@ func (m *manager) RestoreSnapshot(ctx context.Context, id string, snapshotID str defer lock.Unlock() inst, err := m.restoreSnapshot(ctx, id, snapshotID, req) if err == nil { - m.notifyStateChange(id, inst) + m.notifyLifecycleEvent(ctx, LifecycleEventRestore, inst) } return inst, err } @@ -381,7 +402,7 @@ func (m *manager) StopInstance(ctx context.Context, id string) (*Instance, error defer lock.Unlock() inst, err := m.stopInstance(ctx, id) if err == nil { - m.notifyStateChange(id, inst) + m.notifyLifecycleEvent(ctx, LifecycleEventStop, inst) } return inst, err } @@ -393,7 +414,7 @@ func (m *manager) StartInstance(ctx context.Context, id string, req StartInstanc defer lock.Unlock() inst, err := m.startInstance(ctx, id, req) if err == nil { - m.notifyStateChange(id, inst) + m.notifyLifecycleEvent(ctx, LifecycleEventStart, inst) } return inst, err } @@ -403,7 +424,11 @@ func (m *manager) UpdateInstance(ctx context.Context, id string, req UpdateInsta lock := m.getInstanceLock(id) lock.Lock() defer lock.Unlock() - return m.updateInstance(ctx, id, req) + inst, err := m.updateInstance(ctx, id, req) + if err == nil { + m.notifyLifecycleEvent(ctx, LifecycleEventUpdate, inst) + } + return inst, err } // ListInstances returns instances, optionally filtered by the given criteria. diff --git a/lib/instances/metrics.go b/lib/instances/metrics.go index bee79d1a..1b652fca 100644 --- a/lib/instances/metrics.go +++ b/lib/instances/metrics.go @@ -61,6 +61,10 @@ const ( snapshotCodecFallbackReasonNotExecutable snapshotCodecFallbackReason = "not_executable" ) +type lifecycleEventDropReason string + +const lifecycleEventDropReasonBufferFull lifecycleEventDropReason = "buffer_full" + // Metrics holds the metrics instruments for instance operations. type Metrics struct { createDuration metric.Float64Histogram @@ -78,6 +82,7 @@ type Metrics struct { snapshotRestoreMemoryPrepareTotal metric.Int64Counter snapshotRestoreMemoryPrepareDuration metric.Float64Histogram snapshotCompressionPreemptionsTotal metric.Int64Counter + lifecycleEventsDroppedTotal metric.Int64Counter tracer trace.Tracer } @@ -220,6 +225,14 @@ func newInstanceMetrics(meter metric.Meter, tracer trace.Tracer, m *manager) (*M return nil, err } + lifecycleEventsDroppedTotal, err := meter.Int64Counter( + "hypeman_instances_lifecycle_events_dropped_total", + metric.WithDescription("Total number of lifecycle events dropped because subscriber buffers were full"), + ) + if err != nil { + return nil, err + } + // Register observable gauge for instance counts by state instancesTotal, err := meter.Int64ObservableGauge( "hypeman_instances_total", @@ -246,6 +259,22 @@ func newInstanceMetrics(meter metric.Meter, tracer trace.Tracer, m *manager) (*M return nil, err } + lifecycleSubscribersTotal, err := meter.Int64ObservableGauge( + "hypeman_instances_lifecycle_subscribers_total", + metric.WithDescription("Current number of lifecycle event subscribers by consumer"), + ) + if err != nil { + return nil, err + } + + lifecycleSubscriberQueueDepth, err := meter.Int64ObservableGauge( + "hypeman_instances_lifecycle_subscriber_queue_depth", + metric.WithDescription("Maximum buffered lifecycle events across subscribers for each consumer"), + ) + if err != nil { + return nil, err + } + _, err = meter.RegisterCallback( func(ctx context.Context, o metric.Observer) error { instances, err := m.listInstances(ctx) @@ -332,6 +361,27 @@ func newInstanceMetrics(meter metric.Meter, tracer trace.Tracer, m *manager) (*M return nil, err } + _, err = meter.RegisterCallback( + func(ctx context.Context, o metric.Observer) error { + stats := make(map[LifecycleEventConsumer]lifecycleConsumerStats, len(allLifecycleEventConsumers)) + if m.lifecycleEvents != nil { + stats = m.lifecycleEvents.Stats() + } + for _, consumer := range allLifecycleEventConsumers { + stat := stats[consumer] + attrs := metric.WithAttributes(attribute.String("consumer", string(consumer))) + o.ObserveInt64(lifecycleSubscribersTotal, stat.Subscribers, attrs) + o.ObserveInt64(lifecycleSubscriberQueueDepth, stat.MaxQueueDepth, attrs) + } + return nil + }, + lifecycleSubscribersTotal, + lifecycleSubscriberQueueDepth, + ) + if err != nil { + return nil, err + } + return &Metrics{ createDuration: createDuration, restoreDuration: restoreDuration, @@ -348,6 +398,7 @@ func newInstanceMetrics(meter metric.Meter, tracer trace.Tracer, m *manager) (*M snapshotRestoreMemoryPrepareTotal: snapshotRestoreMemoryPrepareTotal, snapshotRestoreMemoryPrepareDuration: snapshotRestoreMemoryPrepareDuration, snapshotCompressionPreemptionsTotal: snapshotCompressionPreemptionsTotal, + lifecycleEventsDroppedTotal: lifecycleEventsDroppedTotal, tracer: tracer, }, nil } @@ -534,3 +585,14 @@ func (m *manager) recordSnapshotCodecFallback(ctx context.Context, algorithm sna attribute.String("reason", string(reason)), )) } + +func (m *manager) recordLifecycleEventDropped(ctx context.Context, consumer LifecycleEventConsumer, reason lifecycleEventDropReason) { + if m.metrics == nil { + return + } + + m.metrics.lifecycleEventsDroppedTotal.Add(ctx, 1, metric.WithAttributes( + attribute.String("consumer", string(consumer)), + attribute.String("reason", string(reason)), + )) +} diff --git a/lib/instances/metrics_test.go b/lib/instances/metrics_test.go index 88def215..b5681fd0 100644 --- a/lib/instances/metrics_test.go +++ b/lib/instances/metrics_test.go @@ -1,6 +1,7 @@ package instances import ( + "context" "os" "path/filepath" "testing" @@ -115,6 +116,106 @@ func TestSnapshotCompressionMetrics_RecordAndObserve(t *testing.T) { assert.Equal(t, "standby", metricLabel(t, active.DataPoints[0].Attributes, "source")) } +func TestLifecycleEventMetrics_ObserveSubscribersQueueDepthAndDrops(t *testing.T) { + t.Parallel() + + reader := otelmetric.NewManualReader() + provider := otelmetric.NewMeterProvider(otelmetric.WithReader(reader)) + + m := &manager{ + paths: paths.New(t.TempDir()), + lifecycleEvents: newLifecycleSubscribers(), + } + + metrics, err := newInstanceMetrics(provider.Meter("test"), nil, m) + require.NoError(t, err) + m.metrics = metrics + m.lifecycleEvents.onDrop = func(ctx context.Context, consumer LifecycleEventConsumer) { + m.recordLifecycleEventDropped(ctx, consumer, lifecycleEventDropReasonBufferFull) + } + + waitCh, waitUnsub := m.SubscribeLifecycleEvents(LifecycleEventConsumerWaitForState) + defer waitUnsub() + autoCh, autoUnsub := m.SubscribeLifecycleEvents(LifecycleEventConsumerAutoStandby) + defer autoUnsub() + + for i := 0; i < 3; i++ { + m.lifecycleEvents.Notify(t.Context(), LifecycleEvent{ + Action: LifecycleEventUpdate, + InstanceID: "inst-1", + Instance: &Instance{State: StateRunning}, + }) + } + <-waitCh + + for i := 0; i < lifecycleEventBufferSize; i++ { + m.lifecycleEvents.Notify(t.Context(), LifecycleEvent{ + Action: LifecycleEventUpdate, + InstanceID: "inst-1", + Instance: &Instance{State: StateRunning}, + }) + } + + var rm metricdata.ResourceMetrics + require.NoError(t, reader.Collect(t.Context(), &rm)) + + assertMetricNames(t, rm, []string{ + "hypeman_instances_lifecycle_subscribers_total", + "hypeman_instances_lifecycle_subscriber_queue_depth", + "hypeman_instances_lifecycle_events_dropped_total", + }) + + subscribersMetric := findMetric(t, rm, "hypeman_instances_lifecycle_subscribers_total") + subscribers, ok := subscribersMetric.Data.(metricdata.Gauge[int64]) + require.True(t, ok) + require.Len(t, subscribers.DataPoints, len(allLifecycleEventConsumers)) + for _, point := range subscribers.DataPoints { + switch metricLabel(t, point.Attributes, "consumer") { + case string(LifecycleEventConsumerWaitForState): + assert.Equal(t, int64(1), point.Value) + case string(LifecycleEventConsumerAutoStandby): + assert.Equal(t, int64(1), point.Value) + default: + t.Fatalf("unexpected consumer label: %s", metricLabel(t, point.Attributes, "consumer")) + } + } + + queueDepthMetric := findMetric(t, rm, "hypeman_instances_lifecycle_subscriber_queue_depth") + queueDepth, ok := queueDepthMetric.Data.(metricdata.Gauge[int64]) + require.True(t, ok) + require.Len(t, queueDepth.DataPoints, len(allLifecycleEventConsumers)) + for _, point := range queueDepth.DataPoints { + switch metricLabel(t, point.Attributes, "consumer") { + case string(LifecycleEventConsumerWaitForState): + assert.Equal(t, int64(lifecycleEventBufferSize), point.Value) + case string(LifecycleEventConsumerAutoStandby): + assert.Equal(t, int64(lifecycleEventBufferSize), point.Value) + default: + t.Fatalf("unexpected consumer label: %s", metricLabel(t, point.Attributes, "consumer")) + } + } + + droppedMetric := findMetric(t, rm, "hypeman_instances_lifecycle_events_dropped_total") + dropped, ok := droppedMetric.Data.(metricdata.Sum[int64]) + require.True(t, ok) + require.NotEmpty(t, dropped.DataPoints) + + var waitDrops int64 + var autoDrops int64 + for _, point := range dropped.DataPoints { + assert.Equal(t, string(lifecycleEventDropReasonBufferFull), metricLabel(t, point.Attributes, "reason")) + switch metricLabel(t, point.Attributes, "consumer") { + case string(LifecycleEventConsumerWaitForState): + waitDrops += point.Value + case string(LifecycleEventConsumerAutoStandby): + autoDrops += point.Value + } + } + assert.Greater(t, waitDrops, int64(0)) + assert.Greater(t, autoDrops, int64(0)) + assert.Equal(t, lifecycleEventBufferSize, len(autoCh)) +} + func TestInstanceOldestInStateMetric_ObserveOldestAgePerState(t *testing.T) { t.Parallel() diff --git a/lib/instances/subscribe.go b/lib/instances/subscribe.go deleted file mode 100644 index f401f89d..00000000 --- a/lib/instances/subscribe.go +++ /dev/null @@ -1,83 +0,0 @@ -package instances - -import "sync" - -// StateChange represents a state transition for an instance. -type StateChange struct { - State State - StateError *string -} - -// subscribers manages per-instance state change subscriptions. -type subscribers struct { - mu sync.Mutex - subs map[string][]chan StateChange -} - -func newSubscribers() *subscribers { - return &subscribers{ - subs: make(map[string][]chan StateChange), - } -} - -// Subscribe returns a channel that receives state changes for the given -// instance and an unsubscribe function. The channel is buffered (16) to -// avoid blocking the notifier on slow consumers. -func (s *subscribers) Subscribe(instanceID string) (<-chan StateChange, func()) { - ch := make(chan StateChange, 16) - s.mu.Lock() - s.subs[instanceID] = append(s.subs[instanceID], ch) - s.mu.Unlock() - - return ch, func() { - s.mu.Lock() - defer s.mu.Unlock() - chans := s.subs[instanceID] - for i, c := range chans { - if c == ch { - s.subs[instanceID] = append(chans[:i], chans[i+1:]...) - close(ch) - break - } - } - if len(s.subs[instanceID]) == 0 { - delete(s.subs, instanceID) - } - } -} - -// Notify fans out a state change to all subscribers for the given instance. -// Non-blocking: drops the event if a subscriber's buffer is full. -// Safe to call concurrently with CloseAll. -func (s *subscribers) Notify(instanceID string, sc StateChange) { - s.mu.Lock() - chans := make([]chan StateChange, len(s.subs[instanceID])) - copy(chans, s.subs[instanceID]) - s.mu.Unlock() - - for _, ch := range chans { - trySend(ch, sc) - } -} - -// trySend attempts a non-blocking send on ch, recovering from a panic if the -// channel was closed by a concurrent CloseAll. -func trySend(ch chan StateChange, sc StateChange) { - defer func() { recover() }() - select { - case ch <- sc: - default: - } -} - -// CloseAll closes and removes all subscriber channels for the given instance. -func (s *subscribers) CloseAll(instanceID string) { - s.mu.Lock() - chans := s.subs[instanceID] - delete(s.subs, instanceID) - s.mu.Unlock() - - for _, ch := range chans { - close(ch) - } -} diff --git a/lib/instances/subscribe_test.go b/lib/instances/subscribe_test.go deleted file mode 100644 index 667e09ab..00000000 --- a/lib/instances/subscribe_test.go +++ /dev/null @@ -1,97 +0,0 @@ -package instances - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestSubscribers_NotifyDelivers(t *testing.T) { - s := newSubscribers() - ch, unsub := s.Subscribe("inst-1") - defer unsub() - - s.Notify("inst-1", StateChange{State: StateRunning}) - - select { - case sc := <-ch: - assert.Equal(t, StateRunning, sc.State) - case <-time.After(time.Second): - t.Fatal("timed out waiting for state change") - } -} - -func TestSubscribers_MultipleSubscribers(t *testing.T) { - s := newSubscribers() - ch1, unsub1 := s.Subscribe("inst-1") - defer unsub1() - ch2, unsub2 := s.Subscribe("inst-1") - defer unsub2() - - s.Notify("inst-1", StateChange{State: StateStopped}) - - for _, ch := range []<-chan StateChange{ch1, ch2} { - select { - case sc := <-ch: - assert.Equal(t, StateStopped, sc.State) - case <-time.After(time.Second): - t.Fatal("timed out waiting for state change") - } - } -} - -func TestSubscribers_UnsubscribeStopsDelivery(t *testing.T) { - s := newSubscribers() - ch, unsub := s.Subscribe("inst-1") - unsub() - - // Channel should be closed after unsubscribe. - _, ok := <-ch - assert.False(t, ok, "channel should be closed after unsubscribe") -} - -func TestSubscribers_DifferentInstancesIsolated(t *testing.T) { - s := newSubscribers() - ch1, unsub1 := s.Subscribe("inst-1") - defer unsub1() - ch2, unsub2 := s.Subscribe("inst-2") - defer unsub2() - - s.Notify("inst-1", StateChange{State: StateRunning}) - - select { - case sc := <-ch1: - assert.Equal(t, StateRunning, sc.State) - case <-time.After(time.Second): - t.Fatal("inst-1 subscriber should have received event") - } - - select { - case <-ch2: - t.Fatal("inst-2 subscriber should not have received event") - case <-time.After(50 * time.Millisecond): - // expected - } -} - -func TestSubscribers_CloseAll(t *testing.T) { - s := newSubscribers() - ch1, _ := s.Subscribe("inst-1") - ch2, _ := s.Subscribe("inst-1") - - s.CloseAll("inst-1") - - _, ok1 := <-ch1 - _, ok2 := <-ch2 - assert.False(t, ok1, "ch1 should be closed") - assert.False(t, ok2, "ch2 should be closed") -} - -func TestSubscribers_NotifyNoSubscribersNoPanic(t *testing.T) { - s := newSubscribers() - require.NotPanics(t, func() { - s.Notify("no-such-instance", StateChange{State: StateRunning}) - }) -} diff --git a/lib/instances/wait.go b/lib/instances/wait.go index ccb52c06..23ebeb44 100644 --- a/lib/instances/wait.go +++ b/lib/instances/wait.go @@ -22,13 +22,13 @@ type WaitForStateResult struct { TimedOut bool } -// WaitForState subscribes to state change events for the instance and waits -// until it reaches targetState, a terminal/error state is detected, the timeout +// WaitForState subscribes to lifecycle events for the instance and waits until +// it reaches targetState, a terminal/error state is detected, the timeout // expires, or the context is cancelled. A polling fallback (every 5s) guards -// against missed subscription events. +// against missed or dropped events. func WaitForState(ctx context.Context, mgr Manager, inst *Instance, targetState State, timeout time.Duration) (*WaitForStateResult, error) { // Subscribe first to avoid missing events between initial check and loop. - ch, unsub := mgr.Subscribe(inst.Id) + ch, unsub := mgr.SubscribeLifecycleEvents(LifecycleEventConsumerWaitForState) defer unsub() // Already in target state — return immediately. @@ -80,14 +80,20 @@ func WaitForState(ctx context.Context, mgr Manager, inst *Instance, targetState TimedOut: latest.State != targetState, }, nil - case sc, ok := <-ch: + case event, ok := <-ch: if !ok { - // Channel closed — instance was deleted. return nil, ErrNotFound } - latest = &Instance{} - latest.State = sc.State - latest.StateError = sc.StateError + if event.InstanceID != id { + continue + } + if event.Action == LifecycleEventDelete { + return nil, ErrNotFound + } + if event.Instance == nil { + continue + } + latest = event.Instance if latest.State == targetState { return &WaitForStateResult{ diff --git a/lib/instances/wait_test.go b/lib/instances/wait_test.go index 8fe3291b..bab6f06d 100644 --- a/lib/instances/wait_test.go +++ b/lib/instances/wait_test.go @@ -13,12 +13,12 @@ import ( // stubManager is a minimal Manager implementation for unit-testing WaitForState. type stubManager struct { - subs *subscribers + subs *lifecycleSubscribers getInstance func(ctx context.Context, id string) (*Instance, error) } -func (s *stubManager) Subscribe(instanceID string) (<-chan StateChange, func()) { - return s.subs.Subscribe(instanceID) +func (s *stubManager) SubscribeLifecycleEvents(consumer LifecycleEventConsumer) (<-chan LifecycleEvent, func()) { + return s.subs.Subscribe(consumer) } func (s *stubManager) GetInstance(ctx context.Context, id string) (*Instance, error) { @@ -28,7 +28,7 @@ func (s *stubManager) GetInstance(ctx context.Context, id string) (*Instance, er return nil, ErrNotFound } -// Unused interface methods — only GetInstance and Subscribe are needed. +// Unused interface methods — only GetInstance and SubscribeLifecycleEvents are needed. func (s *stubManager) ListInstances(context.Context, *ListInstancesFilter) ([]Instance, error) { return nil, nil } @@ -87,7 +87,7 @@ func (s *stubManager) GetVsockDialer(context.Context, string) (hypervisor.VsockD func TestWaitForState_SubscriptionDelivers(t *testing.T) { t.Parallel() - subs := newSubscribers() + subs := newLifecycleSubscribers() mgr := &stubManager{subs: subs} inst := &Instance{} @@ -97,7 +97,11 @@ func TestWaitForState_SubscriptionDelivers(t *testing.T) { // Simulate a state change via subscription after 100ms. go func() { time.Sleep(100 * time.Millisecond) - subs.Notify("test-sub", StateChange{State: StateRunning}) + subs.Notify(context.Background(), LifecycleEvent{ + Action: LifecycleEventStart, + InstanceID: "test-sub", + Instance: &Instance{State: StateRunning}, + }) }() start := time.Now() @@ -113,7 +117,7 @@ func TestWaitForState_SubscriptionDelivers(t *testing.T) { func TestWaitForState_ChannelClosedOnDelete(t *testing.T) { t.Parallel() - subs := newSubscribers() + subs := newLifecycleSubscribers() mgr := &stubManager{subs: subs} inst := &Instance{} @@ -123,7 +127,10 @@ func TestWaitForState_ChannelClosedOnDelete(t *testing.T) { // Simulate instance deletion (close all subscriber channels). go func() { time.Sleep(100 * time.Millisecond) - subs.CloseAll("test-close") + subs.Notify(context.Background(), LifecycleEvent{ + Action: LifecycleEventDelete, + InstanceID: "test-close", + }) }() start := time.Now() @@ -137,7 +144,7 @@ func TestWaitForState_ChannelClosedOnDelete(t *testing.T) { func TestWaitForState_TerminalViaSubscription(t *testing.T) { t.Parallel() - subs := newSubscribers() + subs := newLifecycleSubscribers() mgr := &stubManager{subs: subs} inst := &Instance{} @@ -146,7 +153,11 @@ func TestWaitForState_TerminalViaSubscription(t *testing.T) { go func() { time.Sleep(100 * time.Millisecond) - subs.Notify("test-terminal-sub", StateChange{State: StateStopped}) + subs.Notify(context.Background(), LifecycleEvent{ + Action: LifecycleEventStop, + InstanceID: "test-terminal-sub", + Instance: &Instance{State: StateStopped}, + }) }() result, err := WaitForState(context.Background(), mgr, inst, StateRunning, 30*time.Second) @@ -159,7 +170,7 @@ func TestWaitForState_TerminalViaSubscription(t *testing.T) { func TestWaitForState_ShutdownIsTerminal(t *testing.T) { t.Parallel() - subs := newSubscribers() + subs := newLifecycleSubscribers() mgr := &stubManager{subs: subs} inst := &Instance{} @@ -168,7 +179,11 @@ func TestWaitForState_ShutdownIsTerminal(t *testing.T) { go func() { time.Sleep(100 * time.Millisecond) - subs.Notify("test-shutdown", StateChange{State: StateShutdown}) + subs.Notify(context.Background(), LifecycleEvent{ + Action: LifecycleEventStop, + InstanceID: "test-shutdown", + Instance: &Instance{State: StateShutdown}, + }) }() start := time.Now() @@ -184,7 +199,7 @@ func TestWaitForState_ShutdownIsTerminal(t *testing.T) { func TestWaitForState_PausedIsTerminal(t *testing.T) { t.Parallel() - subs := newSubscribers() + subs := newLifecycleSubscribers() mgr := &stubManager{subs: subs} inst := &Instance{} @@ -193,7 +208,11 @@ func TestWaitForState_PausedIsTerminal(t *testing.T) { go func() { time.Sleep(100 * time.Millisecond) - subs.Notify("test-paused", StateChange{State: StatePaused}) + subs.Notify(context.Background(), LifecycleEvent{ + Action: LifecycleEventStandby, + InstanceID: "test-paused", + Instance: &Instance{State: StatePaused}, + }) }() start := time.Now() @@ -206,3 +225,69 @@ func TestWaitForState_PausedIsTerminal(t *testing.T) { assert.False(t, result.TimedOut) assert.Less(t, elapsed, 2*time.Second, "paused should be detected as terminal immediately") } + +func TestWaitForState_IgnoresEventsForOtherInstances(t *testing.T) { + t.Parallel() + subs := newLifecycleSubscribers() + mgr := &stubManager{subs: subs} + + inst := &Instance{} + inst.Id = "target-instance" + inst.State = StateInitializing + + go func() { + time.Sleep(50 * time.Millisecond) + subs.Notify(context.Background(), LifecycleEvent{ + Action: LifecycleEventStart, + InstanceID: "other-instance", + Instance: &Instance{State: StateRunning}, + }) + time.Sleep(50 * time.Millisecond) + subs.Notify(context.Background(), LifecycleEvent{ + Action: LifecycleEventStart, + InstanceID: "target-instance", + Instance: &Instance{State: StateRunning}, + }) + }() + + result, err := WaitForState(context.Background(), mgr, inst, StateRunning, 30*time.Second) + + require.NoError(t, err) + require.NotNil(t, result) + assert.Equal(t, StateRunning, result.State) +} + +func TestWaitForState_IgnoresNilInstancePayloadAndUsesPollingFallback(t *testing.T) { + t.Parallel() + subs := newLifecycleSubscribers() + mgr := &stubManager{ + subs: subs, + getInstance: func(ctx context.Context, id string) (*Instance, error) { + return &Instance{ + StoredMetadata: StoredMetadata{Id: id}, + State: StateRunning, + }, nil + }, + } + + inst := &Instance{} + inst.Id = "test-nil-event" + inst.State = StateInitializing + + go func() { + time.Sleep(100 * time.Millisecond) + subs.Notify(context.Background(), LifecycleEvent{ + Action: LifecycleEventStart, + InstanceID: "test-nil-event", + }) + }() + + start := time.Now() + result, err := WaitForState(context.Background(), mgr, inst, StateRunning, 6*time.Second) + elapsed := time.Since(start) + + require.NoError(t, err) + require.NotNil(t, result) + assert.Equal(t, StateRunning, result.State) + assert.GreaterOrEqual(t, elapsed, WaitForStatePollInterval) +} From 134628d677cbb38284330c93bb7f9dc5c741c368 Mon Sep 17 00:00:00 2001 From: Steven Miller Date: Mon, 6 Apr 2026 16:13:54 -0400 Subject: [PATCH 2/2] Add config for lifecycle event buffer size --- cmd/api/config/config.go | 13 ++++++++++ cmd/api/config/config_test.go | 34 ++++++++++++++++++++++++++ lib/instances/lifecycle_events.go | 24 +++++++++++++----- lib/instances/lifecycle_events_test.go | 2 +- lib/instances/manager.go | 21 +++++++++++++++- lib/instances/metrics_test.go | 8 +++--- lib/providers/providers.go | 5 +++- 7 files changed, 94 insertions(+), 13 deletions(-) diff --git a/cmd/api/config/config.go b/cmd/api/config/config.go index ce7c90a0..55079fd6 100644 --- a/cmd/api/config/config.go +++ b/cmd/api/config/config.go @@ -144,6 +144,11 @@ type BuildConfig struct { DockerSocket string `koanf:"docker_socket"` } +// InstancesConfig holds instance-manager internal settings. +type InstancesConfig struct { + LifecycleEventBufferSize int `koanf:"lifecycle_event_buffer_size"` +} + // RegistryConfig holds OCI registry settings. type RegistryConfig struct { URL string `koanf:"url"` @@ -240,6 +245,7 @@ type Config struct { Logging LoggingConfig `koanf:"logging"` Images ImagesConfig `koanf:"images"` Build BuildConfig `koanf:"build"` + Instances InstancesConfig `koanf:"instances"` Registry RegistryConfig `koanf:"registry"` Limits LimitsConfig `koanf:"limits"` Oversubscription OversubscriptionConfig `koanf:"oversubscription"` @@ -348,6 +354,10 @@ func defaultConfig() *Config { DockerSocket: "/var/run/docker.sock", }, + Instances: InstancesConfig{ + LifecycleEventBufferSize: 256, + }, + Registry: RegistryConfig{ URL: "localhost:8080", Insecure: false, @@ -532,6 +542,9 @@ func (c *Config) Validate() error { if c.Build.Timeout <= 0 { return fmt.Errorf("build.timeout must be positive, got %d", c.Build.Timeout) } + if c.Instances.LifecycleEventBufferSize <= 0 { + return fmt.Errorf("instances.lifecycle_event_buffer_size must be positive, got %d", c.Instances.LifecycleEventBufferSize) + } if err := validateDuration("images.auto_delete.unused_for", c.Images.AutoDelete.UnusedFor); err != nil { return err } diff --git a/cmd/api/config/config_test.go b/cmd/api/config/config_test.go index bb451957..21703eb5 100644 --- a/cmd/api/config/config_test.go +++ b/cmd/api/config/config_test.go @@ -40,6 +40,9 @@ func TestDefaultConfigIncludesMetricsSettings(t *testing.T) { if len(cfg.Images.AutoDelete.Allowed) != 0 { t.Fatalf("expected default images.auto_delete.allowed to be empty, got %v", cfg.Images.AutoDelete.Allowed) } + if cfg.Instances.LifecycleEventBufferSize != 256 { + t.Fatalf("expected default instances.lifecycle_event_buffer_size to be 256, got %d", cfg.Instances.LifecycleEventBufferSize) + } } func TestLoadEnvOverridesMetricsAndOtelInterval(t *testing.T) { @@ -49,6 +52,7 @@ func TestLoadEnvOverridesMetricsAndOtelInterval(t *testing.T) { t.Setenv("METRICS__RESOURCE_REFRESH_INTERVAL", "30s") t.Setenv("OTEL__METRIC_EXPORT_INTERVAL", "15s") t.Setenv("OTEL__SUCCESSFUL_GET_SAMPLE_RATIO", "0.25") + t.Setenv("INSTANCES__LIFECYCLE_EVENT_BUFFER_SIZE", "512") tmp := t.TempDir() cfgPath := filepath.Join(tmp, "config.yaml") @@ -79,6 +83,9 @@ func TestLoadEnvOverridesMetricsAndOtelInterval(t *testing.T) { if cfg.Otel.SuccessfulGetSampleRatio != 0.25 { t.Fatalf("expected otel.successful_get_sample_ratio override, got %v", cfg.Otel.SuccessfulGetSampleRatio) } + if cfg.Instances.LifecycleEventBufferSize != 512 { + t.Fatalf("expected instances.lifecycle_event_buffer_size override, got %d", cfg.Instances.LifecycleEventBufferSize) + } } func TestValidateRejectsInvalidMetricsPort(t *testing.T) { @@ -147,6 +154,33 @@ func TestValidateRejectsInvalidResourceRefreshInterval(t *testing.T) { } } +func TestLoadUsesConfiguredLifecycleEventBufferSize(t *testing.T) { + tmp := t.TempDir() + cfgPath := filepath.Join(tmp, "config.yaml") + if err := os.WriteFile(cfgPath, []byte("instances:\n lifecycle_event_buffer_size: 384\n"), 0600); err != nil { + t.Fatalf("write temp config: %v", err) + } + + cfg, err := Load(cfgPath) + if err != nil { + t.Fatalf("load config: %v", err) + } + + if cfg.Instances.LifecycleEventBufferSize != 384 { + t.Fatalf("expected instances.lifecycle_event_buffer_size from config file, got %d", cfg.Instances.LifecycleEventBufferSize) + } +} + +func TestValidateRejectsInvalidLifecycleEventBufferSize(t *testing.T) { + cfg := defaultConfig() + cfg.Instances.LifecycleEventBufferSize = 0 + + err := cfg.Validate() + if err == nil { + t.Fatalf("expected validation error for invalid lifecycle event buffer size") + } +} + func TestLoadUsesDefaultImageAutoDeleteRetentionWindow(t *testing.T) { tmp := t.TempDir() cfgPath := filepath.Join(tmp, "config.yaml") diff --git a/lib/instances/lifecycle_events.go b/lib/instances/lifecycle_events.go index ad14cb00..5ae42024 100644 --- a/lib/instances/lifecycle_events.go +++ b/lib/instances/lifecycle_events.go @@ -5,7 +5,7 @@ import ( "sync" ) -const lifecycleEventBufferSize = 32 +const defaultLifecycleEventBufferSize = 256 // LifecycleEventConsumer identifies the internal consumer of lifecycle events. // Keep this set bounded for observability label safety. @@ -54,17 +54,29 @@ type lifecycleConsumerStats struct { } type lifecycleSubscribers struct { - mu sync.Mutex - subs []lifecycleSubscriber - onDrop func(context.Context, LifecycleEventConsumer) + mu sync.Mutex + subs []lifecycleSubscriber + onDrop func(context.Context, LifecycleEventConsumer) + bufferSize int } func newLifecycleSubscribers() *lifecycleSubscribers { - return &lifecycleSubscribers{} + return &lifecycleSubscribers{ + bufferSize: defaultLifecycleEventBufferSize, + } +} + +func newLifecycleSubscribersWithBufferSize(bufferSize int) *lifecycleSubscribers { + if bufferSize <= 0 { + bufferSize = defaultLifecycleEventBufferSize + } + return &lifecycleSubscribers{ + bufferSize: bufferSize, + } } func (s *lifecycleSubscribers) Subscribe(consumer LifecycleEventConsumer) (<-chan LifecycleEvent, func()) { - ch := make(chan LifecycleEvent, lifecycleEventBufferSize) + ch := make(chan LifecycleEvent, s.bufferSize) s.mu.Lock() s.subs = append(s.subs, lifecycleSubscriber{ diff --git a/lib/instances/lifecycle_events_test.go b/lib/instances/lifecycle_events_test.go index 0cf0c1c7..a46366e7 100644 --- a/lib/instances/lifecycle_events_test.go +++ b/lib/instances/lifecycle_events_test.go @@ -102,7 +102,7 @@ func TestLifecycleSubscribers_DropCallbackOnBackpressure(t *testing.T) { _, unsub := s.Subscribe(LifecycleEventConsumerWaitForState) defer unsub() - for i := 0; i < lifecycleEventBufferSize; i++ { + for i := 0; i < s.bufferSize; i++ { s.Notify(context.Background(), LifecycleEvent{ Action: LifecycleEventUpdate, InstanceID: "inst-1", diff --git a/lib/instances/manager.go b/lib/instances/manager.go index accb9741..9f6492ac 100644 --- a/lib/instances/manager.go +++ b/lib/instances/manager.go @@ -78,6 +78,19 @@ type ResourceLimits struct { MaxMemoryPerInstance int64 // Maximum memory in bytes per instance (0 = unlimited) } +// ManagerConfig holds non-resource manager behavior settings. +type ManagerConfig struct { + LifecycleEventBufferSize int +} + +// Normalize applies defaults to manager config values. +func (c ManagerConfig) Normalize() ManagerConfig { + if c.LifecycleEventBufferSize <= 0 { + c.LifecycleEventBufferSize = defaultLifecycleEventBufferSize + } + return c +} + // ResourceValidator validates if resources can be allocated type ResourceValidator interface { // ValidateAllocation checks if the requested resources are available. @@ -130,6 +143,11 @@ var platformStarters = make(map[hypervisor.Type]hypervisor.VMStarter) // If meter is nil, metrics are disabled. // defaultHypervisor specifies which hypervisor to use when not specified in requests. func NewManager(p *paths.Paths, imageManager images.Manager, systemManager system.Manager, networkManager network.Manager, deviceManager devices.Manager, volumeManager volumes.Manager, limits ResourceLimits, defaultHypervisor hypervisor.Type, snapshotDefaults SnapshotPolicy, meter metric.Meter, tracer trace.Tracer, memoryPolicy ...guestmemory.Policy) Manager { + return NewManagerWithConfig(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, defaultHypervisor, snapshotDefaults, ManagerConfig{}, meter, tracer, memoryPolicy...) +} + +// NewManagerWithConfig creates a new instances manager with additional manager settings. +func NewManagerWithConfig(p *paths.Paths, imageManager images.Manager, systemManager system.Manager, networkManager network.Manager, deviceManager devices.Manager, volumeManager volumes.Manager, limits ResourceLimits, defaultHypervisor hypervisor.Type, snapshotDefaults SnapshotPolicy, managerConfig ManagerConfig, meter metric.Meter, tracer trace.Tracer, memoryPolicy ...guestmemory.Policy) Manager { // Validate and default the hypervisor type if defaultHypervisor == "" { defaultHypervisor = hypervisor.TypeCloudHypervisor @@ -140,6 +158,7 @@ func NewManager(p *paths.Paths, imageManager images.Manager, systemManager syste policy = memoryPolicy[0] } policy = policy.Normalize() + managerConfig = managerConfig.Normalize() // Initialize VM starters from platform-specific init functions vmStarters := make(map[hypervisor.Type]hypervisor.VMStarter, len(platformStarters)) @@ -168,7 +187,7 @@ func NewManager(p *paths.Paths, imageManager images.Manager, systemManager syste snapshotDefaults: snapshotDefaults, compressionJobs: make(map[string]*compressionJob), nativeCodecPaths: make(map[string]string), - lifecycleEvents: newLifecycleSubscribers(), + lifecycleEvents: newLifecycleSubscribersWithBufferSize(managerConfig.LifecycleEventBufferSize), } m.deleteSnapshotFn = m.deleteSnapshot diff --git a/lib/instances/metrics_test.go b/lib/instances/metrics_test.go index b5681fd0..37e251dd 100644 --- a/lib/instances/metrics_test.go +++ b/lib/instances/metrics_test.go @@ -148,7 +148,7 @@ func TestLifecycleEventMetrics_ObserveSubscribersQueueDepthAndDrops(t *testing.T } <-waitCh - for i := 0; i < lifecycleEventBufferSize; i++ { + for i := 0; i < m.lifecycleEvents.bufferSize; i++ { m.lifecycleEvents.Notify(t.Context(), LifecycleEvent{ Action: LifecycleEventUpdate, InstanceID: "inst-1", @@ -187,9 +187,9 @@ func TestLifecycleEventMetrics_ObserveSubscribersQueueDepthAndDrops(t *testing.T for _, point := range queueDepth.DataPoints { switch metricLabel(t, point.Attributes, "consumer") { case string(LifecycleEventConsumerWaitForState): - assert.Equal(t, int64(lifecycleEventBufferSize), point.Value) + assert.Equal(t, int64(m.lifecycleEvents.bufferSize), point.Value) case string(LifecycleEventConsumerAutoStandby): - assert.Equal(t, int64(lifecycleEventBufferSize), point.Value) + assert.Equal(t, int64(m.lifecycleEvents.bufferSize), point.Value) default: t.Fatalf("unexpected consumer label: %s", metricLabel(t, point.Attributes, "consumer")) } @@ -213,7 +213,7 @@ func TestLifecycleEventMetrics_ObserveSubscribersQueueDepthAndDrops(t *testing.T } assert.Greater(t, waitDrops, int64(0)) assert.Greater(t, autoDrops, int64(0)) - assert.Equal(t, lifecycleEventBufferSize, len(autoCh)) + assert.Equal(t, m.lifecycleEvents.bufferSize, len(autoCh)) } func TestInstanceOldestInStateMetric_ObserveOldestAgePerState(t *testing.T) { diff --git a/lib/providers/providers.go b/lib/providers/providers.go index 4c112915..205176d8 100644 --- a/lib/providers/providers.go +++ b/lib/providers/providers.go @@ -134,7 +134,10 @@ func ProvideInstanceManager(p *paths.Paths, cfg *config.Config, imageManager ima ReclaimEnabled: cfg.Hypervisor.Memory.ReclaimEnabled, VZBalloonRequired: cfg.Hypervisor.Memory.VZBalloonRequired, } - return instances.NewManager(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, defaultHypervisor, snapshotDefaults, meter, tracer, memoryPolicy), nil + managerConfig := instances.ManagerConfig{ + LifecycleEventBufferSize: cfg.Instances.LifecycleEventBufferSize, + } + return instances.NewManagerWithConfig(p, imageManager, systemManager, networkManager, deviceManager, volumeManager, limits, defaultHypervisor, snapshotDefaults, managerConfig, meter, tracer, memoryPolicy), nil } func snapshotDefaultsFromConfig(cfg *config.Config) instances.SnapshotPolicy {