Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions configresolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ type Resolved struct {
K8sWorkerTolerationKey string
K8sWorkerTolerationValue string
K8sWorkerExclusiveNode bool
K8sIceboomEnabled bool
K8sIceboomImage string
K8sIceboomPort int
K8sIceboomConfigMap string
K8sIceboomCPURequest string
K8sIceboomCPULimit string
K8sIceboomMemoryRequest string
K8sIceboomMemoryLimit string
K8sIceboomImagePullPolicy string
AWSRegion string
ConfigStoreConn string
ConfigPollInterval time.Duration
Expand Down Expand Up @@ -192,6 +201,11 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
var k8sWorkerCPURequest, k8sWorkerMemoryRequest string
var k8sWorkerNodeSelector, k8sWorkerTolerationKey, k8sWorkerTolerationValue string
var k8sWorkerExclusiveNode bool
var k8sIceboomEnabled bool
var k8sIceboomImage, k8sIceboomConfigMap, k8sIceboomImagePullPolicy string
var k8sIceboomCPURequest, k8sIceboomCPULimit string
var k8sIceboomMemoryRequest, k8sIceboomMemoryLimit string
var k8sIceboomPort int
var awsRegion string
var configStoreConn string
var configPollInterval time.Duration
Expand Down Expand Up @@ -836,6 +850,41 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
k8sWorkerExclusiveNode = b
}
}
if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_ENABLED"); v != "" {
if b, err := strconv.ParseBool(v); err == nil {
k8sIceboomEnabled = b
} else {
warn("Invalid DUCKGRES_K8S_WORKER_ICEBOOM_ENABLED: " + err.Error())
}
}
if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_IMAGE"); v != "" {
k8sIceboomImage = v
}
if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_PORT"); v != "" {
	// Reject values that cannot be a TCP port instead of carrying them
	// forward. 0 is still accepted.
	// NOTE(review): 0 presumably means "unset" (the worker pool replaces a
	// zero port with its default) — confirm this value feeds that path.
	n, err := strconv.Atoi(v)
	switch {
	case err != nil:
		warn("Invalid DUCKGRES_K8S_WORKER_ICEBOOM_PORT: " + err.Error())
	case n < 0 || n > 65535:
		warn("Invalid DUCKGRES_K8S_WORKER_ICEBOOM_PORT: " + strconv.Itoa(n) + " is outside 0-65535")
	default:
		k8sIceboomPort = n
	}
}
if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_CONFIGMAP"); v != "" {
k8sIceboomConfigMap = v
}
if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_CPU_REQUEST"); v != "" {
k8sIceboomCPURequest = v
}
if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_CPU_LIMIT"); v != "" {
k8sIceboomCPULimit = v
}
if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_MEMORY_REQUEST"); v != "" {
k8sIceboomMemoryRequest = v
}
if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_MEMORY_LIMIT"); v != "" {
k8sIceboomMemoryLimit = v
}
if v := getenv("DUCKGRES_K8S_WORKER_ICEBOOM_IMAGE_PULL_POLICY"); v != "" {
k8sIceboomImagePullPolicy = v
}
if v := getenv("DUCKGRES_AWS_REGION"); v != "" {
awsRegion = v
}
Expand Down Expand Up @@ -1164,6 +1213,15 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
K8sWorkerTolerationKey: k8sWorkerTolerationKey,
K8sWorkerTolerationValue: k8sWorkerTolerationValue,
K8sWorkerExclusiveNode: k8sWorkerExclusiveNode,
K8sIceboomEnabled: k8sIceboomEnabled,
K8sIceboomImage: k8sIceboomImage,
K8sIceboomPort: k8sIceboomPort,
K8sIceboomConfigMap: k8sIceboomConfigMap,
K8sIceboomCPURequest: k8sIceboomCPURequest,
K8sIceboomCPULimit: k8sIceboomCPULimit,
K8sIceboomMemoryRequest: k8sIceboomMemoryRequest,
K8sIceboomMemoryLimit: k8sIceboomMemoryLimit,
K8sIceboomImagePullPolicy: k8sIceboomImagePullPolicy,
AWSRegion: awsRegion,
ConfigStoreConn: configStoreConn,
ConfigPollInterval: configPollInterval,
Expand Down
18 changes: 18 additions & 0 deletions controlplane/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,24 @@ type K8sConfig struct {
WorkerTolerationValue string // Taint value for worker pod NoSchedule toleration
WorkerExclusiveNode bool // One worker per node via pod anti-affinity
AWSRegion string // AWS region for STS client
Iceboom IceboomConfig
}

// IceboomConfig configures the optional iceboom sidecar injected into each
// worker pod. iceboom is an Iceberg REST catalog proxy that fronts AWS S3
// Tables; the control plane reconfigures it per tenant at activation time,
// reusing the same STS-broker path used for DuckLake S3 creds. Chart wiring
// lives in PostHog/charts charts/duckgres.
//
// The zero value has Enabled=false, i.e. the sidecar is off by default.
type IceboomConfig struct {
	Enabled bool // Inject the iceboom sidecar into worker pods when true
	Image string // Container image for the sidecar
	Port int // Loopback port iceboom listens on inside the worker pod
	ConfigMap string // ConfigMap mounted at /etc/iceboom (must define config.toml)
	CPURequest string // CPU request (e.g., "200m"). Empty = no resources block.
	CPULimit string // CPU limit; should equal request for Guaranteed QoS.
	MemoryRequest string // Memory request (e.g., "128Mi").
	MemoryLimit string // Memory limit; should equal request for Guaranteed QoS.
	ImagePullPolicy string // Pull policy for the sidecar image. NOTE(review): the worker pool appears to fall back to the worker's pull policy when this is empty — confirm this struct feeds that path.
}

// ControlPlane manages the TCP listener and routes connections to Flight SQL workers.
Expand Down
103 changes: 103 additions & 0 deletions controlplane/k8s_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type K8sWorkerPool struct {
workerTolerationKey string // taint key for NoSchedule toleration
workerTolerationValue string // taint value for NoSchedule toleration
workerExclusiveNode bool // one worker per node via anti-affinity
iceboom IceboomPoolConfig // optional iceboom sidecar (Iceberg REST catalog proxy)
orgID string // org ID for pod labels (multi-tenant mode)
workerIDGenerator func() int // shared ID generator across orgs (nil = internal counter)
resolveOrgConfig func(string) (*configstore.OrgConfig, error) // resolve org config for per-tenant image reaping
Expand Down Expand Up @@ -143,6 +144,20 @@ func newK8sWorkerPool(cfg K8sWorkerPoolConfig, clientset kubernetes.Interface) (
if strings.TrimSpace(cfg.ServiceAccount) == "" {
cfg.ServiceAccount = DefaultK8sWorkerServiceAccount
}
if cfg.Iceboom.Enabled {
	// Validate the sidecar settings up front so a bad value is reported as
	// a construction error rather than surfacing when a pod is spawned.
	if strings.TrimSpace(cfg.Iceboom.Image) == "" {
		return nil, fmt.Errorf("iceboom.image is required when iceboom is enabled")
	}
	if strings.TrimSpace(cfg.Iceboom.ConfigMap) == "" {
		return nil, fmt.Errorf("iceboom.configMap is required when iceboom is enabled")
	}
	// A zero port means "not configured"; fall back to 8181.
	if cfg.Iceboom.Port == 0 {
		cfg.Iceboom.Port = 8181
	}
	// The port is later narrowed to int32 for the container spec, so keep
	// it inside the valid TCP port range.
	if cfg.Iceboom.Port < 1 || cfg.Iceboom.Port > 65535 {
		return nil, fmt.Errorf("iceboom.port must be between 1 and 65535, got %d", cfg.Iceboom.Port)
	}
	// iceboomResources feeds these strings to resource.MustParse, which
	// panics on malformed input; check them here and return an error
	// instead. Empty values are allowed and simply omitted later.
	for _, q := range []struct{ field, value string }{
		{"iceboom.cpuRequest", cfg.Iceboom.CPURequest},
		{"iceboom.cpuLimit", cfg.Iceboom.CPULimit},
		{"iceboom.memoryRequest", cfg.Iceboom.MemoryRequest},
		{"iceboom.memoryLimit", cfg.Iceboom.MemoryLimit},
	} {
		if q.value == "" {
			continue
		}
		if _, err := resource.ParseQuantity(q.value); err != nil {
			return nil, fmt.Errorf("invalid %s %q: %w", q.field, q.value, err)
		}
	}
	// An unset sidecar pull policy inherits the worker's pull policy.
	if cfg.Iceboom.ImagePullPolicy == "" {
		cfg.Iceboom.ImagePullPolicy = cfg.ImagePullPolicy
	}
}

// Limit concurrent K8s API calls to avoid overwhelming the API server.
spawnConcurrency := 3
Expand Down Expand Up @@ -170,6 +185,7 @@ func newK8sWorkerPool(cfg K8sWorkerPoolConfig, clientset kubernetes.Interface) (
workerTolerationKey: cfg.WorkerTolerationKey,
workerTolerationValue: cfg.WorkerTolerationValue,
workerExclusiveNode: cfg.WorkerExclusiveNode,
iceboom: cfg.Iceboom,
orgID: cfg.OrgID,
workerIDGenerator: cfg.WorkerIDGenerator,
resolveOrgConfig: cfg.ResolveOrgConfig,
Expand Down Expand Up @@ -752,6 +768,24 @@ func (p *K8sWorkerPool) spawnWorker(ctx context.Context, id int, image string, p
})
}

// Iceboom sidecar: Iceberg REST catalog proxy on 127.0.0.1:<port>. The
// container starts with a bootable upstream-disabled config; the control
// plane reconfigures it per tenant at activation time via an HTTP RPC
// (iceboom roadmap phase 8). Adding it here keeps the worker pod's QoS
// class consistent — iceboom carries its own request==limit when those
// values are populated.
if p.iceboom.Enabled {
pod.Spec.Containers = append(pod.Spec.Containers, p.buildIceboomContainer())
pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{
Name: "iceboom-config",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{Name: p.iceboom.ConfigMap},
},
},
})
}

// Delete stale pod with the same name if it exists (from a previous run)
_ = p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, podName, metav1.DeleteOptions{
GracePeriodSeconds: int64Ptr(0),
Expand Down Expand Up @@ -2624,6 +2658,75 @@ func (p *K8sWorkerPool) workerResources() corev1.ResourceRequirements {
return corev1.ResourceRequirements{Requests: requests, Limits: limits}
}

// buildIceboomContainer assembles the iceboom sidecar container for a
// worker pod from the pool's iceboom settings. Per the design, iceboom
// listens on 127.0.0.1:<port> (loopback only — the duckdb-worker container
// in the same pod reaches it via localhost) and reads its bootable config
// from the mounted ConfigMap. The control plane reconfigures the upstream /
// per-tenant warehouse ARN / STS creds at activation time over a separate
// RPC.
//
// The returned container:
//   - is started with "--config /etc/iceboom/config.toml".
//   - declares one TCP container port named "iceberg" on p.iceboom.Port
//     (converted to int32 without a range check here).
//   - runs with allowPrivilegeEscalation=false.
//   - mounts the "iceboom-config" volume read-only at /etc/iceboom; the
//     caller is responsible for adding a volume of that name to the pod.
//   - uses p.iceboom.ImagePullPolicy verbatim (the pool constructor fills
//     in the worker's policy when it is unset).
//   - takes its resource block from iceboomResources.
//   - defines NO liveness, readiness, or startup probe. No readinessProbe
//     is intended: the worker pod's readiness is driven by duckdb-worker,
//     and iceboom's /readyz depends on the upstream, which isn't configured
//     until activation.
//     TODO(review): a livenessProbe was planned but is not wired up yet.
func (p *K8sWorkerPool) buildIceboomContainer() corev1.Container {
	return corev1.Container{
		Name:            "iceboom",
		Image:           p.iceboom.Image,
		ImagePullPolicy: corev1.PullPolicy(p.iceboom.ImagePullPolicy),
		Args:            []string{"--config", "/etc/iceboom/config.toml"},
		Ports: []corev1.ContainerPort{
			{
				Name:          "iceberg",
				ContainerPort: int32(p.iceboom.Port),
				Protocol:      corev1.ProtocolTCP,
			},
		},
		SecurityContext: &corev1.SecurityContext{
			AllowPrivilegeEscalation: boolPtr(false),
		},
		VolumeMounts: []corev1.VolumeMount{{
			Name:      "iceboom-config",
			MountPath: "/etc/iceboom",
			ReadOnly:  true,
		}},
		Resources: p.iceboomResources(),
	}
}

// iceboomResources builds the resource requirements for the iceboom
// sidecar from the four optional quantity strings on p.iceboom. Each
// non-empty string becomes one entry; requests and limits are filled
// independently, so a limit may be set without a matching request and
// vice versa. A list with no entries is left nil, which means an entirely
// unconfigured sidecar gets an empty ResourceRequirements.
//
// Quantities go through resource.MustParse, so a malformed string panics.
func (p *K8sWorkerPool) iceboomResources() corev1.ResourceRequirements {
	var reqs corev1.ResourceRequirements

	// put stores qty under name in *list, allocating the list on first use
	// so that untouched lists stay nil.
	put := func(list *corev1.ResourceList, name corev1.ResourceName, qty string) {
		if qty == "" {
			return
		}
		parsed := resource.MustParse(qty)
		if *list == nil {
			*list = corev1.ResourceList{}
		}
		(*list)[name] = parsed
	}

	put(&reqs.Requests, corev1.ResourceCPU, p.iceboom.CPURequest)
	put(&reqs.Requests, corev1.ResourceMemory, p.iceboom.MemoryRequest)
	put(&reqs.Limits, corev1.ResourceCPU, p.iceboom.CPULimit)
	put(&reqs.Limits, corev1.ResourceMemory, p.iceboom.MemoryLimit)
	return reqs
}

// --- Helpers ---

// allocateWorkerID returns the next worker ID, using the shared generator
Expand Down
134 changes: 134 additions & 0 deletions controlplane/k8s_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2193,6 +2193,140 @@ func assertSpawnedWorkerPod(t *testing.T, pod *corev1.Pod) {
}
}

// When iceboom is enabled the spawned worker pod must carry the iceboom
// sidecar container, an iceboom-config ConfigMap volume, and a read-only
// mount at /etc/iceboom on the sidecar. Pod-level QoS depends on every
// container carrying matching request/limit pairs, so this also asserts
// that every entry of the sidecar resources block (CPU and memory, request
// and limit) and the declared container port are what we configured.
func TestK8sPool_SpawnWorkerInjectsIceboomSidecarWhenEnabled(t *testing.T) {
	pool, cs := newTestK8sPool(t, 5)
	pool.iceboom = IceboomPoolConfig{
		Enabled:         true,
		Image:           "iceboom:test",
		Port:            8181,
		ConfigMap:       "duckgres-iceboom-config",
		CPURequest:      "200m",
		CPULimit:        "200m",
		MemoryRequest:   "128Mi",
		MemoryLimit:     "256Mi",
		ImagePullPolicy: "IfNotPresent",
	}

	// Capture the worker pod as submitted to the fake clientset. The reactor
	// returns handled=false so the create still proceeds normally.
	var createdWorkerPod *corev1.Pod
	cs.PrependReactor("create", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) {
		createAction, ok := action.(k8stesting.CreateAction)
		if !ok {
			return false, nil, nil
		}
		pod, ok := createAction.GetObject().(*corev1.Pod)
		if !ok {
			return false, nil, nil
		}
		if pod.Labels["app"] == "duckgres-worker" {
			createdWorkerPod = pod.DeepCopy()
		}
		return false, nil, nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	// The spawn result is deliberately ignored: only the submitted pod spec
	// is under test. NOTE(review): presumably the spawn cannot complete
	// against the fake clientset — confirm.
	_ = pool.SpawnWorker(ctx, 0, pool.workerImage)

	if createdWorkerPod == nil {
		t.Fatal("expected worker pod create to be attempted")
	}

	if got, want := len(createdWorkerPod.Spec.Containers), 2; got != want {
		t.Fatalf("expected %d containers (worker + iceboom), got %d", want, got)
	}
	var iceboom *corev1.Container
	for i := range createdWorkerPod.Spec.Containers {
		if createdWorkerPod.Spec.Containers[i].Name == "iceboom" {
			iceboom = &createdWorkerPod.Spec.Containers[i]
			break
		}
	}
	if iceboom == nil {
		t.Fatal("iceboom container not found in pod spec")
	}
	if iceboom.Image != "iceboom:test" {
		t.Fatalf("expected iceboom image iceboom:test, got %q", iceboom.Image)
	}
	if string(iceboom.ImagePullPolicy) != "IfNotPresent" {
		t.Fatalf("expected pullPolicy IfNotPresent, got %q", iceboom.ImagePullPolicy)
	}
	if len(iceboom.Args) != 2 || iceboom.Args[0] != "--config" || iceboom.Args[1] != "/etc/iceboom/config.toml" {
		t.Fatalf("expected args [--config /etc/iceboom/config.toml], got %v", iceboom.Args)
	}
	if len(iceboom.Ports) != 1 ||
		iceboom.Ports[0].Name != "iceberg" ||
		iceboom.Ports[0].ContainerPort != 8181 ||
		iceboom.Ports[0].Protocol != corev1.ProtocolTCP {
		t.Fatalf("expected single TCP port iceberg/8181, got %+v", iceboom.Ports)
	}
	if iceboom.SecurityContext == nil || iceboom.SecurityContext.AllowPrivilegeEscalation == nil || *iceboom.SecurityContext.AllowPrivilegeEscalation {
		t.Fatal("expected allowPrivilegeEscalation=false on iceboom container")
	}
	foundMount := false
	for _, vm := range iceboom.VolumeMounts {
		if vm.Name == "iceboom-config" && vm.MountPath == "/etc/iceboom" && vm.ReadOnly {
			foundMount = true
		}
	}
	if !foundMount {
		t.Fatalf("expected read-only iceboom-config mount at /etc/iceboom, got %+v", iceboom.VolumeMounts)
	}

	// Check all four configured quantities, not just one per list, so a
	// request/limit mix-up cannot slip through.
	for _, tc := range []struct{ name, got, want string }{
		{"cpu request", iceboom.Resources.Requests.Cpu().String(), "200m"},
		{"cpu limit", iceboom.Resources.Limits.Cpu().String(), "200m"},
		{"memory request", iceboom.Resources.Requests.Memory().String(), "128Mi"},
		{"memory limit", iceboom.Resources.Limits.Memory().String(), "256Mi"},
	} {
		if tc.got != tc.want {
			t.Fatalf("expected iceboom %s %s, got %s", tc.name, tc.want, tc.got)
		}
	}

	foundVol := false
	for _, v := range createdWorkerPod.Spec.Volumes {
		if v.Name == "iceboom-config" && v.ConfigMap != nil && v.ConfigMap.Name == "duckgres-iceboom-config" {
			foundVol = true
		}
	}
	if !foundVol {
		t.Fatal("expected iceboom-config ConfigMap volume on pod spec")
	}
}

// With iceboom left at its zero value (disabled), spawning a worker must
// produce the unmodified pod spec: a single container and no
// iceboom-config volume.
func TestK8sPool_SpawnWorkerDoesNotInjectIceboomWhenDisabled(t *testing.T) {
	// pool.iceboom is intentionally untouched, so Enabled stays false.
	pool, clientset := newTestK8sPool(t, 5)

	// Record the worker pod submitted for creation without claiming the
	// action (handled=false on every path).
	var captured *corev1.Pod
	clientset.PrependReactor("create", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) {
		if create, isCreate := action.(k8stesting.CreateAction); isCreate {
			if pod, isPod := create.GetObject().(*corev1.Pod); isPod && pod.Labels["app"] == "duckgres-worker" {
				captured = pod.DeepCopy()
			}
		}
		return false, nil, nil
	})

	spawnCtx, stop := context.WithTimeout(context.Background(), 2*time.Second)
	defer stop()
	_ = pool.SpawnWorker(spawnCtx, 0, pool.workerImage)

	if captured == nil {
		t.Fatal("expected worker pod create to be attempted")
	}
	if n := len(captured.Spec.Containers); n != 1 {
		t.Fatalf("expected 1 container when iceboom disabled, got %d", n)
	}
	for i := range captured.Spec.Volumes {
		if captured.Spec.Volumes[i].Name == "iceboom-config" {
			t.Fatal("did not expect iceboom-config volume when iceboom is disabled")
		}
	}
}

func TestK8sPool_RetireWorkerDeletesWorkerRPCSecret(t *testing.T) {
pool, cs := newTestK8sPool(t, 5)
worker := &ManagedWorker{ID: 1, podName: "duckgres-worker-test-cp-1", done: make(chan struct{})}
Expand Down
Loading
Loading