From 73458be8a611b5a6147ceaee91941477edec2f3c Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Mon, 26 Apr 2021 08:52:22 +0100 Subject: [PATCH 1/2] Fix pod config stacking order --- .../cloudflow/config/CloudflowConfig.scala | 2 +- .../config/CloudflowConfigSpec.scala | 56 +++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/tools/cloudflow-config/src/main/scala/akka/cloudflow/config/CloudflowConfig.scala b/tools/cloudflow-config/src/main/scala/akka/cloudflow/config/CloudflowConfig.scala index f46348d3d..b24649c29 100644 --- a/tools/cloudflow-config/src/main/scala/akka/cloudflow/config/CloudflowConfig.scala +++ b/tools/cloudflow-config/src/main/scala/akka/cloudflow/config/CloudflowConfig.scala @@ -621,8 +621,8 @@ object CloudflowConfig { ConfigFactory .empty() - .withFallback(streamletConfig) .withFallback(runtimeConfig) + .withFallback(streamletConfig) .withOnlyPath("kubernetes") } diff --git a/tools/cloudflow-config/src/test/scala/akka/cloudflow/config/CloudflowConfigSpec.scala b/tools/cloudflow-config/src/test/scala/akka/cloudflow/config/CloudflowConfigSpec.scala index a97377232..1115f84c4 100644 --- a/tools/cloudflow-config/src/test/scala/akka/cloudflow/config/CloudflowConfigSpec.scala +++ b/tools/cloudflow-config/src/test/scala/akka/cloudflow/config/CloudflowConfigSpec.scala @@ -1007,4 +1007,60 @@ class CloudflowConfigSpec extends AnyFlatSpec with Matchers with OptionValues wi res.isFailure shouldBe true res.failure.exception.getMessage.contains("cloudflow.topics.my-topic.topic.replicas") shouldBe true } + + it should "stack pod config in the correct order" in { + // Arrange + val appConfig = ConfigFactory.parseString(""" + cloudflow { + # if u remove the streamlet specific config then it works again + streamlets.logger { + kubernetes.pods.pod.containers { + cloudflow { + resources { + requests { + memory = "1G" + } + } + } + } + } + runtimes.akka { + kubernetes.pods.pod.containers { + cloudflow { + resources { + limits { + memory = "3G" + } + } + env = [ + { name = "JAVA_OPTS" + value = "-XX:MaxRAMPercentage=40.0" + } + ] + } + } + } + } + """) + + // Act + val cloudflowConfig = CloudflowConfig.loadAndValidate(appConfig).get + + val podConfig = CloudflowConfig.podsConfig("logger", "akka", cloudflowConfig) + + // Assert + podConfig + .getConfigList("kubernetes.pods.pod.containers.cloudflow.env") + .get(0) + .getString("name") shouldBe "JAVA_OPTS" + podConfig + .getConfigList("kubernetes.pods.pod.containers.cloudflow.env") + .get(0) + .getString("value") shouldBe "-XX:MaxRAMPercentage=40.0" + podConfig + .getString("kubernetes.pods.pod.containers.cloudflow.resources.requests.memory") shouldBe "1G" + podConfig + .getString("kubernetes.pods.pod.containers.cloudflow.resources.limits.memory") shouldBe "3G" + } + } From 6e353c474c3b9c273109e2cf9af8794238396e37 Mon Sep 17 00:00:00 2001 From: Andrea Peruffo Date: Mon, 26 Apr 2021 09:09:45 +0100 Subject: [PATCH 2/2] Lists to Optional --- .../cloudflow/config/CloudflowConfig.scala | 6 +-- .../config/CloudflowConfigSpec.scala | 5 ++- .../operator/action/runner/Runner.scala | 44 +++++++++++-------- .../src/main/scala/cli/SamplesGenerator.scala | 4 +- 4 files changed, 34 insertions(+), 25 deletions(-) diff --git a/tools/cloudflow-config/src/main/scala/akka/cloudflow/config/CloudflowConfig.scala b/tools/cloudflow-config/src/main/scala/akka/cloudflow/config/CloudflowConfig.scala index b24649c29..8fa6394db 100644 --- a/tools/cloudflow-config/src/main/scala/akka/cloudflow/config/CloudflowConfig.scala +++ b/tools/cloudflow-config/src/main/scala/akka/cloudflow/config/CloudflowConfig.scala @@ -190,8 +190,8 @@ object CloudflowConfig { // Container final case class Container( - env: List[EnvVar] = List(), - ports: List[ContainerPort] = List(), + env: Option[List[EnvVar]] = None, + ports: Option[List[ContainerPort]] = None, resources: Requirements = Requirements(), volumeMounts: Map[String, VolumeMount] = Map()) @@ -621,8 +621,8 @@ object CloudflowConfig { ConfigFactory .empty() - .withFallback(runtimeConfig) .withFallback(streamletConfig) + .withFallback(runtimeConfig) .withOnlyPath("kubernetes") } diff --git a/tools/cloudflow-config/src/test/scala/akka/cloudflow/config/CloudflowConfigSpec.scala b/tools/cloudflow-config/src/test/scala/akka/cloudflow/config/CloudflowConfigSpec.scala index 1115f84c4..bc4b56c90 100644 --- a/tools/cloudflow-config/src/test/scala/akka/cloudflow/config/CloudflowConfigSpec.scala +++ b/tools/cloudflow-config/src/test/scala/akka/cloudflow/config/CloudflowConfigSpec.scala @@ -459,8 +459,8 @@ class CloudflowConfigSpec extends AnyFlatSpec with Matchers with OptionValues wi executor.annotations(AnnotationKey("akey3")) shouldBe AnnotationValue("avalue3") executor.annotations(AnnotationKey("akey4")) shouldBe AnnotationValue("avalue4") - val driverContainerEnv = driver.containers("container").env.head - val executorContainerEnv = executor.containers("container").env.head + val driverContainerEnv = driver.containers("container").env.head.head + val executorContainerEnv = executor.containers("container").env.head.head driverContainerEnv.name shouldBe "FOO" driverContainerEnv.value shouldBe "BAR" @@ -615,6 +615,7 @@ class CloudflowConfigSpec extends AnyFlatSpec with Matchers with OptionValues wi .pods("pod") .containers("container") .ports + .get ports(0).containerPort shouldBe 9001 ports(0).protocol shouldBe "TCP" diff --git a/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/Runner.scala b/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/Runner.scala index f26a37e77..c25f1cba0 100644 --- a/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/Runner.scala +++ b/tools/cloudflow-operator/src/main/scala/cloudflow/operator/action/runner/Runner.scala @@ -384,12 +384,16 @@ object PodsConfig { */ private def getContainerConfig(container: CloudflowConfig.Container): ContainerConfig = { - val env = container.env.map { env => - new EnvVarBuilder() - .withName(env.name) - .withValue(env.value) - .build() - } + val env = container.env + .map { + _.map { env => + new EnvVarBuilder() + .withName(env.name) + .withValue(env.value) + .build() + } + } + .getOrElse(List()) val resources = { val limits = { @@ -423,18 +427,22 @@ object PodsConfig { .build() }.toList - val ports = container.ports.map { port => - val base = new ContainerPortBuilder() - .withName(port.name.map(_.value).getOrElse(null)) // TODO: check if empty string or null - .withContainerPort(port.containerPort) - .withHostIP(port.hostIP) - .withProtocol(port.protocol) - - (port.hostPort match { - case Some(hp) => base.withHostPort(hp) - case _ => base - }).build() - } + val ports = container.ports + .map { + _.map { port => + val base = new ContainerPortBuilder() + .withName(port.name.map(_.value).getOrElse(null)) // TODO: check if empty string or null + .withContainerPort(port.containerPort) + .withHostIP(port.hostIP) + .withProtocol(port.protocol) + + (port.hostPort match { + case Some(hp) => base.withHostPort(hp) + case _ => base + }).build() + } + } + .getOrElse(List()) ContainerConfig(env = env, resources = resources, volumeMounts = volumeMounts, ports = ports) } diff --git a/tools/tooling/src/main/scala/cli/SamplesGenerator.scala b/tools/tooling/src/main/scala/cli/SamplesGenerator.scala index b62c04613..467f012d8 100644 --- a/tools/tooling/src/main/scala/cli/SamplesGenerator.scala +++ b/tools/tooling/src/main/scala/cli/SamplesGenerator.scala @@ -27,7 +27,7 @@ object SamplesGenerator extends App { "my-pvc" -> PvcVolume(name = "cloudflow-pvc", readOnly = false), "my-secret" -> SecretVolume(name = "secret.conf")), containers = Map("my-container" -> Container( - env = List(EnvVar("ENV_VAR_KEY", "ENV_VAR_VALUE")), + env = Some(List(EnvVar("ENV_VAR_KEY", "ENV_VAR_VALUE"))), resources = Requirements( requests = Requirement(cpu = Some(Quantity("1")), memory = Some(Quantity("1Gb"))), limits = Requirement(cpu = Some(Quantity("2")), memory = Some(Quantity("2Gb")))), @@ -44,7 +44,7 @@ object SamplesGenerator extends App { "my-pvc" -> PvcVolume(name = "cloudflow-pvc", readOnly = false), "my-secret" -> SecretVolume(name = "secret.conf")), containers = Map("my-container" -> Container( - env = List(EnvVar("ENV_VAR_KEY", "ENV_VAR_VALUE")), + env = Some(List(EnvVar("ENV_VAR_KEY", "ENV_VAR_VALUE"))), resources = Requirements( requests = Requirement(cpu = Some(Quantity("1")), memory = Some(Quantity("1Gb"))), limits = Requirement(cpu = Some(Quantity("2")), memory = Some(Quantity("2Gb")))),