Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cmd/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ type BuildConfig struct {
DockerSocket string `koanf:"docker_socket"`
}

// InstancesConfig holds instance-manager internal settings.
type InstancesConfig struct {
	// LifecycleEventBufferSize is read from the
	// "instances.lifecycle_event_buffer_size" key. The built-in default is 256
	// and Validate rejects values that are zero or negative.
	// NOTE(review): presumably the per-subscriber channel capacity of the
	// instance lifecycle event bus; the wiring is not visible here, confirm.
	LifecycleEventBufferSize int `koanf:"lifecycle_event_buffer_size"`
}

// RegistryConfig holds OCI registry settings.
type RegistryConfig struct {
URL string `koanf:"url"`
Expand Down Expand Up @@ -240,6 +245,7 @@ type Config struct {
Logging LoggingConfig `koanf:"logging"`
Images ImagesConfig `koanf:"images"`
Build BuildConfig `koanf:"build"`
Instances InstancesConfig `koanf:"instances"`
Registry RegistryConfig `koanf:"registry"`
Limits LimitsConfig `koanf:"limits"`
Oversubscription OversubscriptionConfig `koanf:"oversubscription"`
Expand Down Expand Up @@ -348,6 +354,10 @@ func defaultConfig() *Config {
DockerSocket: "/var/run/docker.sock",
},

Instances: InstancesConfig{
LifecycleEventBufferSize: 256,
},

Registry: RegistryConfig{
URL: "localhost:8080",
Insecure: false,
Expand Down Expand Up @@ -532,6 +542,9 @@ func (c *Config) Validate() error {
if c.Build.Timeout <= 0 {
return fmt.Errorf("build.timeout must be positive, got %d", c.Build.Timeout)
}
if c.Instances.LifecycleEventBufferSize <= 0 {
return fmt.Errorf("instances.lifecycle_event_buffer_size must be positive, got %d", c.Instances.LifecycleEventBufferSize)
}
if err := validateDuration("images.auto_delete.unused_for", c.Images.AutoDelete.UnusedFor); err != nil {
return err
}
Expand Down
34 changes: 34 additions & 0 deletions cmd/api/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func TestDefaultConfigIncludesMetricsSettings(t *testing.T) {
if len(cfg.Images.AutoDelete.Allowed) != 0 {
t.Fatalf("expected default images.auto_delete.allowed to be empty, got %v", cfg.Images.AutoDelete.Allowed)
}
if cfg.Instances.LifecycleEventBufferSize != 256 {
t.Fatalf("expected default instances.lifecycle_event_buffer_size to be 256, got %d", cfg.Instances.LifecycleEventBufferSize)
}
}

func TestLoadEnvOverridesMetricsAndOtelInterval(t *testing.T) {
Expand All @@ -49,6 +52,7 @@ func TestLoadEnvOverridesMetricsAndOtelInterval(t *testing.T) {
t.Setenv("METRICS__RESOURCE_REFRESH_INTERVAL", "30s")
t.Setenv("OTEL__METRIC_EXPORT_INTERVAL", "15s")
t.Setenv("OTEL__SUCCESSFUL_GET_SAMPLE_RATIO", "0.25")
t.Setenv("INSTANCES__LIFECYCLE_EVENT_BUFFER_SIZE", "512")

tmp := t.TempDir()
cfgPath := filepath.Join(tmp, "config.yaml")
Expand Down Expand Up @@ -79,6 +83,9 @@ func TestLoadEnvOverridesMetricsAndOtelInterval(t *testing.T) {
if cfg.Otel.SuccessfulGetSampleRatio != 0.25 {
t.Fatalf("expected otel.successful_get_sample_ratio override, got %v", cfg.Otel.SuccessfulGetSampleRatio)
}
if cfg.Instances.LifecycleEventBufferSize != 512 {
t.Fatalf("expected instances.lifecycle_event_buffer_size override, got %d", cfg.Instances.LifecycleEventBufferSize)
}
}

func TestValidateRejectsInvalidMetricsPort(t *testing.T) {
Expand Down Expand Up @@ -147,6 +154,33 @@ func TestValidateRejectsInvalidResourceRefreshInterval(t *testing.T) {
}
}

// TestLoadUsesConfiguredLifecycleEventBufferSize checks that a value written
// under instances.lifecycle_event_buffer_size in a config file is the value
// Load reports back.
func TestLoadUsesConfiguredLifecycleEventBufferSize(t *testing.T) {
	path := filepath.Join(t.TempDir(), "config.yaml")
	contents := []byte("instances:\n lifecycle_event_buffer_size: 384\n")
	if writeErr := os.WriteFile(path, contents, 0600); writeErr != nil {
		t.Fatalf("write temp config: %v", writeErr)
	}

	loaded, loadErr := Load(path)
	if loadErr != nil {
		t.Fatalf("load config: %v", loadErr)
	}

	if got := loaded.Instances.LifecycleEventBufferSize; got != 384 {
		t.Fatalf("expected instances.lifecycle_event_buffer_size from config file, got %d", got)
	}
}

// TestValidateRejectsInvalidLifecycleEventBufferSize checks that Validate
// returns an error when the lifecycle event buffer size is zero.
func TestValidateRejectsInvalidLifecycleEventBufferSize(t *testing.T) {
	invalid := defaultConfig()
	invalid.Instances.LifecycleEventBufferSize = 0

	if validateErr := invalid.Validate(); validateErr == nil {
		t.Fatalf("expected validation error for invalid lifecycle event buffer size")
	}
}

func TestLoadUsesDefaultImageAutoDeleteRetentionWindow(t *testing.T) {
tmp := t.TempDir()
cfgPath := filepath.Join(tmp, "config.yaml")
Expand Down
4 changes: 2 additions & 2 deletions lib/builds/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ func (m *mockInstanceManager) GetVsockDialer(ctx context.Context, instanceID str
return nil, nil
}

func (m *mockInstanceManager) Subscribe(instanceID string) (<-chan instances.StateChange, func()) {
ch := make(chan instances.StateChange, 1)
// SubscribeLifecycleEvents is a test stub: it ignores consumer and returns a
// fresh channel with capacity 1 that the mock never writes to, together with
// a cancel function that closes that channel.
func (m *mockInstanceManager) SubscribeLifecycleEvents(consumer instances.LifecycleEventConsumer) (<-chan instances.LifecycleEvent, func()) {
	ch := make(chan instances.LifecycleEvent, 1)
	// The cancel function closes the channel unconditionally, so calling it a
	// second time would panic.
	return ch, func() { close(ch) }
}

Expand Down
4 changes: 2 additions & 2 deletions lib/instances/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ Any State → Stopped
- If an instance is deleted, its schedule is retained so retention can continue cleaning existing scheduled snapshots.
- Once the deleted instance has no scheduled snapshots left, the scheduler removes that schedule automatically.

## WaitForState (wait.go, subscribe.go)
## WaitForState (wait.go, lifecycle_events.go)

Waits for an instance to reach a target state using channel-based subscriptions with a polling fallback. State-changing manager methods (start, stop, standby, restore, fork, delete) broadcast via `notifyStateChange`, which fans out to subscriber channels. `WaitForState` uses a 3-way select: subscription events, 5s polling fallback (logs at debug level if it detects the state change), and context cancellation/timeout. Returns early on terminal (`Stopped`, `Standby`, `Shutdown`) or error (`Unknown`) states. Used by `GET /instances/{id}/wait`.
Waits for an instance to reach a target state using the shared lifecycle event bus with a polling fallback. State-changing manager methods publish lifecycle events after successful mutations, and `WaitForState` filters them by `instance_id`. A 5s polling fallback guards against missed or dropped events. Returns early on terminal (`Stopped`, `Standby`, `Shutdown`) or error (`Unknown`) states. Used by `GET /instances/{id}/wait`.

## Reference Handling

Expand Down
148 changes: 148 additions & 0 deletions lib/instances/lifecycle_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package instances

import (
"context"
"sync"
)

// defaultLifecycleEventBufferSize is the per-subscriber channel capacity used
// by newLifecycleSubscribers, and by newLifecycleSubscribersWithBufferSize
// when it is given a non-positive size.
const defaultLifecycleEventBufferSize = 256

// LifecycleEventConsumer identifies the internal consumer of lifecycle events.
// Keep this set bounded for observability label safety.
type LifecycleEventConsumer string

// Known lifecycle event consumers. Any value added here should also be added
// to allLifecycleEventConsumers so that Stats reports it.
const (
	LifecycleEventConsumerWaitForState LifecycleEventConsumer = "wait_for_state"
	LifecycleEventConsumerAutoStandby LifecycleEventConsumer = "auto_standby"
)

// allLifecycleEventConsumers lists every known consumer. Stats seeds its
// result with a zero entry for each of them, so a consumer that currently has
// no subscribers still appears in the returned map.
var allLifecycleEventConsumers = []LifecycleEventConsumer{
	LifecycleEventConsumerWaitForState,
	LifecycleEventConsumerAutoStandby,
}

// LifecycleEventAction identifies which instance lifecycle action occurred.
type LifecycleEventAction string

// Lifecycle actions that can be carried in LifecycleEvent.Action.
const (
	LifecycleEventCreate LifecycleEventAction = "create"
	LifecycleEventUpdate LifecycleEventAction = "update"
	LifecycleEventStart LifecycleEventAction = "start"
	LifecycleEventStop LifecycleEventAction = "stop"
	LifecycleEventStandby LifecycleEventAction = "standby"
	LifecycleEventRestore LifecycleEventAction = "restore"
	LifecycleEventDelete LifecycleEventAction = "delete"
	LifecycleEventFork LifecycleEventAction = "fork"
)

// LifecycleEvent is a global instance change event stream used by internal
// consumers such as wait-for-state and background controllers.
type LifecycleEvent struct {
	// Action is the lifecycle action that produced this event.
	Action LifecycleEventAction
	// InstanceID identifies the instance the event refers to.
	InstanceID string
	// Instance carries instance data associated with the event.
	// NOTE(review): presumably the instance as it looked after the action, and
	// possibly nil for some actions; the publishers are not visible here, so
	// confirm before dereferencing.
	Instance *Instance
}

// lifecycleSubscriber is one registered subscription: the channel events are
// delivered on, plus the consumer it is attributed to.
type lifecycleSubscriber struct {
	// consumer labels the subscription; it is passed to the onDrop callback
	// and used as the grouping key in Stats.
	consumer LifecycleEventConsumer
	// ch is the buffered channel that Notify sends events to.
	ch chan LifecycleEvent
}

// lifecycleConsumerStats is the per-consumer summary returned by Stats.
type lifecycleConsumerStats struct {
	// Subscribers is the number of currently registered subscriptions for the
	// consumer.
	Subscribers int64
	// MaxQueueDepth is the largest number of buffered, not yet received events
	// found on any one of the consumer's channels when Stats was called.
	MaxQueueDepth int64
}

// lifecycleSubscribers is the fan-out hub for lifecycle events: it keeps the
// list of registered subscribers and delivers each event to all of them with
// non-blocking sends.
type lifecycleSubscribers struct {
	// mu guards subs.
	mu sync.Mutex
	// subs holds the currently registered subscriptions.
	subs []lifecycleSubscriber
	// onDrop, when non-nil, is called by Notify once for every subscriber
	// whose channel buffer was full and therefore missed the event.
	onDrop func(context.Context, LifecycleEventConsumer)
	// bufferSize is the capacity given to each channel created by Subscribe.
	bufferSize int
}

// newLifecycleSubscribers returns an empty hub whose subscriber channels use
// the default buffer size.
func newLifecycleSubscribers() *lifecycleSubscribers {
	hub := new(lifecycleSubscribers)
	hub.bufferSize = defaultLifecycleEventBufferSize
	return hub
}

// newLifecycleSubscribersWithBufferSize returns an empty hub whose subscriber
// channels are created with the given capacity. A zero or negative bufferSize
// falls back to defaultLifecycleEventBufferSize.
func newLifecycleSubscribersWithBufferSize(bufferSize int) *lifecycleSubscribers {
	size := defaultLifecycleEventBufferSize
	if bufferSize > 0 {
		size = bufferSize
	}
	return &lifecycleSubscribers{bufferSize: size}
}

// Subscribe registers a new subscription attributed to consumer.
//
// It returns a receive-only channel, buffered to the hub's bufferSize, on
// which events passed to Notify are delivered, and an unsubscribe function.
// Calling unsubscribe removes the subscription and closes the channel; calling
// it again is a no-op because the channel is no longer in the list.
func (s *lifecycleSubscribers) Subscribe(consumer LifecycleEventConsumer) (<-chan LifecycleEvent, func()) {
	ch := make(chan LifecycleEvent, s.bufferSize)

	s.mu.Lock()
	s.subs = append(s.subs, lifecycleSubscriber{
		consumer: consumer,
		ch:       ch,
	})
	s.mu.Unlock()

	unsubscribe := func() {
		s.mu.Lock()
		defer s.mu.Unlock()

		for i := range s.subs {
			if s.subs[i].ch != ch {
				continue
			}
			last := len(s.subs) - 1
			copy(s.subs[i:], s.subs[i+1:])
			// Clear the vacated tail slot. Otherwise the backing array keeps a
			// stale reference to a channel (and any events still buffered in
			// it) until a later append happens to overwrite the slot.
			s.subs[last] = lifecycleSubscriber{}
			s.subs = s.subs[:last]
			close(ch)
			return
		}
	}
	return ch, unsubscribe
}

// Notify delivers event to every registered subscriber without blocking.
//
// A subscriber whose channel buffer is full misses the event; for each such
// subscriber the onDrop callback (if set) is called with ctx and the
// subscriber's consumer label.
//
// The sends are performed while holding s.mu. Unsubscribing closes the channel
// under the same mutex, so a send can never overlap with the close of its
// channel. Every send is non-blocking, which keeps the critical section short.
// The onDrop callbacks run only after the mutex has been released so that a
// slow callback cannot stall Subscribe, unsubscribe or Stats.
func (s *lifecycleSubscribers) Notify(ctx context.Context, event LifecycleEvent) {
	var dropped []LifecycleEventConsumer

	s.mu.Lock()
	for _, sub := range s.subs {
		if trySendLifecycleEvent(sub.ch, event) {
			dropped = append(dropped, sub.consumer)
		}
	}
	s.mu.Unlock()

	if s.onDrop == nil {
		return
	}
	for _, consumer := range dropped {
		s.onDrop(ctx, consumer)
	}
}

// Stats returns a per-consumer summary of the current subscriptions.
//
// The result always has an entry for every consumer listed in
// allLifecycleEventConsumers, zero-valued when that consumer has no
// subscribers. For each consumer it reports the number of subscriptions and
// the deepest channel backlog observed among them at the time of the call.
func (s *lifecycleSubscribers) Stats() map[LifecycleEventConsumer]lifecycleConsumerStats {
	s.mu.Lock()
	defer s.mu.Unlock()

	summary := make(map[LifecycleEventConsumer]lifecycleConsumerStats, len(allLifecycleEventConsumers))
	for i := range allLifecycleEventConsumers {
		summary[allLifecycleEventConsumers[i]] = lifecycleConsumerStats{}
	}

	for i := range s.subs {
		key := s.subs[i].consumer
		entry := summary[key]
		entry.Subscribers++
		// len on a buffered channel is the number of events waiting to be
		// received.
		if backlog := int64(len(s.subs[i].ch)); backlog > entry.MaxQueueDepth {
			entry.MaxQueueDepth = backlog
		}
		summary[key] = entry
	}
	return summary
}

// trySendLifecycleEvent performs a non-blocking send of event on ch.
//
// It reports dropped=true only when the send could not proceed immediately
// (the buffer was full). If the send panics, which is what happens on a closed
// channel, the panic is recovered and the event is reported as not dropped.
func trySendLifecycleEvent(ch chan LifecycleEvent, event LifecycleEvent) (dropped bool) {
	defer func() {
		if r := recover(); r != nil {
			// A panicking send is deliberately not counted as a drop.
			dropped = false
		}
	}()

	// Assume the event is dropped until the send is seen to succeed.
	dropped = true
	select {
	case ch <- event:
		dropped = false
	default:
	}
	return dropped
}
Loading
Loading