Skip to content
This repository was archived by the owner on Nov 22, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
eb74884
Version is not needed
franciscolopezsancho Oct 22, 2021
dbecc7f
Merge branch 'main' of github.com:lightbend/cloudflow into main
franciscolopezsancho Nov 23, 2021
405d14e
Merge branch 'main' of github.com:lightbend/cloudflow into main
franciscolopezsancho Jan 17, 2022
013a451
Update current documentation to dev
Jan 24, 2022
17f1e14
Removed Spark and Flink docs.
RayRoestenburg Jan 25, 2022
2a2dba3
Removed docs related to Spark and Flink.
RayRoestenburg Jan 25, 2022
2f79112
Fix in example.
RayRoestenburg Jan 25, 2022
fbaac41
Fix for docs.
RayRoestenburg Jan 25, 2022
f9d53ff
Fixes for docs.
RayRoestenburg Jan 25, 2022
248c15a
Fix for docs.
RayRoestenburg Jan 25, 2022
bad512f
Format.
RayRoestenburg Jan 25, 2022
b299ed8
Fix in example.
RayRoestenburg Jan 25, 2022
2a55680
FOSSA policy check.
RayRoestenburg Jan 26, 2022
9729e46
Try connected car test on another port.
RayRoestenburg Jan 26, 2022
e99da13
Bumped junit to 4.13.1.
RayRoestenburg Jan 26, 2022
13bbbff
Runner should not depend on blueprint. (#1125)
RayRoestenburg Jan 26, 2022
7a1b0ac
scalapb to 0.11.8. (#1126)
RayRoestenburg Jan 26, 2022
cd462bf
Removed out of data comment.
RayRoestenburg Jan 26, 2022
1f4ca05
Merge branch 'main' of github.com:lightbend/cloudflow into main
franciscolopezsancho Jan 26, 2022
6971d86
Moved FOSSA checks to push to main.
RayRoestenburg Jan 26, 2022
0ed17ed
Fix push yaml.
RayRoestenburg Jan 26, 2022
94b58d3
Fix.
RayRoestenburg Jan 26, 2022
5c62c05
Fix...
RayRoestenburg Jan 26, 2022
ad33762
Fix.....
RayRoestenburg Jan 26, 2022
a9ef861
Fix yaml.
RayRoestenburg Jan 26, 2022
21b87b1
New FOSSA.
RayRoestenburg Jan 26, 2022
ac53960
Back to previous FOSSA.
RayRoestenburg Jan 26, 2022
15eb766
Bumped fabric8 kubernetesclient to 5.0.2. (#1128)
RayRoestenburg Jan 26, 2022
7212ffc
Merge branch 'main' of github.com:lightbend/cloudflow into main
franciscolopezsancho Jan 27, 2022
c52cf9c
new line to trigger build
sebastian-alfers Feb 1, 2022
798684f
remove new line
sebastian-alfers Feb 1, 2022
ec7768f
Merge branch 'main' of github.com:lightbend/cloudflow into main
franciscolopezsancho Feb 2, 2022
a76810c
removing Flink and Spark referecences
franciscolopezsancho Feb 23, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/release-drafter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ categories:
labels:
- 'streamlets'
- 'akka'
- 'flink'
- 'spark'
- title: ':book: Documentation'
labels:
- 'kind/documentation'
Expand Down
18 changes: 18 additions & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,21 @@ jobs:

- name: test-maven-examples
run: ./scripts/build-mvn-examples.sh test

fossa-checks:
name: "FOSSA checks"
runs-on: ubuntu-20.04
steps:

- uses: actions/checkout@v2
with:
fetch-depth: 0

- name: FOSSA policy check
if: ${{ github.event_name != 'pull_request' }}
run: |-
cd core
curl -H 'Cache-Control: no-cache' https://raw.githubusercontent.com/fossas/spectrometer/master/install.sh | bash
fossa analyze && fossa test
env:
FOSSA_API_KEY: "${{secrets.FOSSA_API_KEY}}"
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</p>
Cloudflow enables users to quickly develop, orchestrate, and operate distributed streaming applications on Kubernetes.
Cloudflow allows you to easily break down your streaming application to smaller composable components and wire them together with schema-based contracts.
Cloudflow integrates with popular streaming engines like Akka, Spark and Flink. It also comes with a powerful CLI tool to easily manage, scale and configure your streaming applications at runtime.
It also comes with a powerful CLI tool to easily manage, scale and configure your streaming applications at runtime.
With its powerful abstractions, Cloudflow allows to define, build and deploy the most complex streaming applications.

- Develop: Focus only on business logic, leave the boilerplate to us.
Expand All @@ -18,7 +18,7 @@ As data pipelines become first-class citizens in microservices architectures, Cl
In a nutshell, Cloudflow is an application development toolkit comprising:

- An API definition for `Streamlet`, the core abstraction in Cloudflow.
- An extensible set of runtime implementations for `Streamlet`(s). Cloudflow provides support for popular streaming runtimes, like Spark's Structured Streaming, Flink, and Akka.
- An extensible set of runtime implementations for `Streamlet`(s).
- A `Streamlet` composition model driven by a `blueprint` definition.
- A sandbox execution mode that accelerates the development and testing of your applications.
- A set of `sbt` plugins that are able to package your application into a deployable container.
Expand Down Expand Up @@ -53,7 +53,7 @@ The underlying data streams are partitioned to allow for parallelism in a distri

The `Streamlet` logic can be written using an extensible choice of streaming runtimes, such as Akka Streams and Spark.
The lightweight API exposes the raw power of the underlying runtime and its libraries while providing a higher-level abstraction for composing `streamlets` and expressing data schemas.
Your code is written in your familiar Structured Streaming, Flink, or Akka Streams native API.
Your code is written in your familiar API.

Applications are deployed as a whole. Cloudflow takes care of deploying the individual `streamlets` and making sure connections get translated into data flowing between them at runtime.

Expand Down
2 changes: 1 addition & 1 deletion core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ lazy val cloudflowAkkaTests =
lazy val cloudflowRunner =
Project(id = "cloudflow-runner", base = file("cloudflow-runner"))
.enablePlugins(BuildInfoPlugin, ScalafmtPlugin)
.dependsOn(cloudflowStreamlets, cloudflowBlueprint)
.dependsOn(cloudflowStreamlets)
.settings(
scalaVersion := Dependencies.Scala212,
crossScalaVersions := Vector(Dependencies.Scala212, Dependencies.Scala213),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ final case class ConfigureExecution(c: Configure, client: KubeClient, logger: Cl
cloudflowConfig,
() => client.getKafkaClusters(namespace = c.operatorNamespace).map(parseValues))

uid <- client.uidCloudflowApp(currentCr.spec.appId, namespace)
uid <- client.uidCloudflowApp(currentCr.getSpec.appId, namespace)
_ <- client.configureCloudflowApp(
currentCr.spec.appId,
currentCr.getSpec.appId,
namespace,
uid,
configStr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
import DeployExecution._

private def applicationDescriptorValidation(crApp: App.Cr): Try[Unit] = {
crApp.spec.version match {
crApp.getSpec.version match {
case None =>
Failure(CliException("Application file parse error: spec.version is missing or empty"))

Expand All @@ -42,7 +42,7 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
case _ => Failure(CliException("Application file parse error: spec.version is invalid"))
}
libraryVersion <- Try {
val libraryVersion = crApp.spec.libraryVersion.get
val libraryVersion = crApp.getSpec.libraryVersion.get
require { !libraryVersion.contains(' ') }
libraryVersion
}.recoverWith {
Expand Down Expand Up @@ -77,7 +77,7 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
}

private def referencedKafkaSecretExists(appCr: App.Cr, kafkaClusters: () => Try[List[String]]): Try[Unit] = {
val expectedClusters = appCr.spec.deployments.flatMap(_.portMappings.values.map(_.cluster)).flatten.distinct
val expectedClusters = appCr.getSpec.deployments.flatMap(_.portMappings.values.map(_.cluster)).flatten.distinct

if (expectedClusters.nonEmpty) {
(for {
Expand All @@ -95,11 +95,11 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
}

private def getImageReference(crApp: App.Cr) = {
if (crApp.spec.deployments.size < 1) {
if (crApp.getSpec.deployments.size < 1) {
Failure(CliException("The application specification doesn't contains deployments"))
} else {
// Get the first available image, all images must be present in the same repository.
val imageRef = crApp.spec.deployments(0).image
val imageRef = crApp.getSpec.deployments(0).image

Image(imageRef)
}
Expand All @@ -115,14 +115,14 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
baseApplicationCr <- loadCrFile(d.crFile)
localApplicationCr = {
d.serviceAccount match {
case Some(sa) => baseApplicationCr.copy(spec = baseApplicationCr.spec.copy(serviceAccount = Some(sa)))
case Some(sa) => baseApplicationCr.copy(_spec = baseApplicationCr.getSpec.copy(serviceAccount = Some(sa)))
case _ => baseApplicationCr
}
}
namespace = d.namespace.getOrElse(localApplicationCr.spec.appId)
namespace = d.namespace.getOrElse(localApplicationCr.getSpec.appId)

// update the replicas
currentAppCr <- client.readCloudflowApp(localApplicationCr.spec.appId, namespace)
currentAppCr <- client.readCloudflowApp(localApplicationCr.getSpec.appId, namespace)
clusterReplicas = getStreamletsReplicas(currentAppCr)
clusterApplicationCr <- updateReplicas(localApplicationCr, clusterReplicas)
applicationCrReplicas <- updateReplicas(clusterApplicationCr, d.scales)
Expand Down Expand Up @@ -154,7 +154,7 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
})

// Operations on the cluster
name = applicationCr.spec.appId
name = applicationCr.getSpec.appId
_ <- client.createNamespace(namespace)
_ <- {
if (d.noRegistryCredentials) Success(())
Expand All @@ -166,7 +166,7 @@ final case class DeployExecution(d: Deploy, client: KubeClient, logger: CliLogge
dockerPassword = d.dockerPassword)
}
}
uid <- client.createCloudflowApp(applicationCr.spec, namespace)
uid <- client.createCloudflowApp(applicationCr.getSpec, namespace)
_ <- client.configureCloudflowApp(name, namespace, uid, configStr, logbackContent, streamletsConfigs)
} yield {
logger.trace("Command Deploy executed successfully")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ trait WithConfiguration {

private def validateConfiguredStreamlets(crApp: App.Cr, cloudflowConfig: CloudflowConfig.CloudflowRoot): Try[Unit] = {
val configStreamlets = cloudflowConfig.cloudflow.streamlets.keys.toSeq.distinct
val crStreamlets = crApp.spec.streamlets.map(_.name).toSeq.distinct
val crStreamlets = crApp.getSpec.streamlets.map(_.name).toSeq.distinct

configStreamlets.diff(crStreamlets) match {
case Nil => Success(())
Expand All @@ -99,7 +99,7 @@ trait WithConfiguration {

private def validateTopicIds(crApp: App.Cr, cloudflowConfig: CloudflowConfig.CloudflowRoot): Try[Unit] = {
val configTopics = cloudflowConfig.cloudflow.topics.keys.toSeq.distinct
val crTopics = crApp.spec.deployments.flatMap(_.portMappings.values.map(_.id)).distinct
val crTopics = crApp.getSpec.deployments.flatMap(_.portMappings.values.map(_.id)).distinct

configTopics.diff(crTopics) match {
case Nil => Success(())
Expand Down Expand Up @@ -161,7 +161,7 @@ trait WithConfiguration {

def validateConfigParameters(crApp: App.Cr, cloudflowConfig: CloudflowConfig.CloudflowRoot): Try[Unit] = {
Try {
crApp.spec.streamlets.foreach { streamlet =>
crApp.getSpec.streamlets.foreach { streamlet =>
cloudflowConfig.cloudflow.streamlets.get(streamlet.name).foreach { s =>
val configParameters = streamlet.descriptor.configParameters
val detectedConfigParameters =
Expand Down Expand Up @@ -212,16 +212,16 @@ trait WithConfiguration {
for {
userConfig <- tryUserConfig
cloudflowConfig <- CloudflowConfig.loadAndValidate(userConfig)
defaultConfig = CloudflowConfig.defaultConfig(appCr.spec)
defaultMounts = CloudflowConfig.defaultMountsConfig(appCr.spec, allowedRuntimes = List("flink", "spark"))
defaultConfig = CloudflowConfig.defaultConfig(appCr.getSpec)
defaultMounts = CloudflowConfig.defaultMountsConfig(appCr.getSpec, allowedRuntimes = List("flink", "spark"))
config = userConfig
.withFallback(CloudflowConfig.writeConfig(defaultConfig))
.withFallback(CloudflowConfig.writeConfig(defaultMounts))
.withFallback {
loggingConfig match {
case Some(s) =>
CloudflowConfig.writeConfig(
CloudflowConfig.loggingMountsConfig(appCr.spec, s"${MurmurHash3.stringHash(s)}"))
CloudflowConfig.loggingMountsConfig(appCr.getSpec, s"${MurmurHash3.stringHash(s)}"))
case None => ConfigFactory.empty()
}
}
Expand Down Expand Up @@ -327,7 +327,7 @@ trait WithConfiguration {

val allReferencedClusters = {
appConfig.cloudflow.topics.flatMap { case (_, topic) => topic.cluster } ++
appCr.spec.deployments.flatMap(_.portMappings.values.flatMap(_.cluster)) ++
appCr.getSpec.deployments.flatMap(_.portMappings.values.flatMap(_.cluster)) ++
Seq(DefaultConfigurationName)
}

Expand All @@ -345,10 +345,10 @@ trait WithConfiguration {
}.toMap
}
res <- Try {
appCr.spec.deployments.map { deployment =>
appCr.getSpec.deployments.map { deployment =>
val streamletConfig = CloudflowConfig.streamletConfig(deployment.streamletName, deployment.runtime, appConfig)
val streamletWithPortMappingsConfig = portMappings(deployment, appConfig, streamletConfig, clustersConfig)
val applicationConfig = applicationRunnerConfig(appCr.name, appCr.spec.appVersion, deployment)
val applicationConfig = applicationRunnerConfig(appCr.name, appCr.getSpec.appVersion, deployment)
deployment -> StreamletConfigs(
streamlet = streamletWithPortMappingsConfig,
runtime = CloudflowConfig.runtimeConfig(deployment.streamletName, deployment.runtime, appConfig),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ trait WithUpdateReplicas {
appCr
.map { app =>
(for {
streamlet <- app.spec.deployments
streamlet <- app.getSpec.deployments
} yield {
streamlet.replicas match {
case Some(r) => Some(streamlet.streamletName -> r)
Expand All @@ -27,18 +27,18 @@ trait WithUpdateReplicas {
}

def updateReplicas(crApp: App.Cr, replicas: Map[String, Int]): Try[App.Cr] = {
val allStreamlets = crApp.spec.deployments.map { streamlet => streamlet.streamletName }.distinct
val allStreamlets = crApp.getSpec.deployments.map { streamlet => streamlet.streamletName }.distinct

(replicas.keys.toList.distinct.diff(allStreamlets)) match {
case Nil =>
val clusterDeployments = crApp.spec.deployments.map { streamlet =>
val clusterDeployments = crApp.getSpec.deployments.map { streamlet =>
streamlet.streamletName match {
case sname if replicas.contains(sname) =>
streamlet.copy(replicas = Some(replicas(sname)))
case _ => streamlet
}
}
val res = crApp.copy(spec = crApp.spec.copy(deployments = clusterDeployments))
val res = crApp.copy(_spec = crApp.getSpec.copy(deployments = clusterDeployments))
Success(res)
case missings =>
Failure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ trait WithUpdateVolumeMounts {
for {
streamletVolumeNameToPvc <- streamletVolumeNameToPvcMap(crApp, volumeMountsArgs, pvcs)
_ <- missingStreamletVolumeMountNames(crApp, streamletVolumeNameToPvc.keys)
} yield crApp.copy(spec = crApp.spec.copy(
} yield crApp.copy(_spec = crApp.getSpec.copy(
deployments = updatedDeployments(crApp, streamletVolumeNameToPvc),
streamlets = updatedStreamlets(crApp, streamletVolumeNameToPvc)))
}
Expand All @@ -44,11 +44,11 @@ trait WithUpdateVolumeMounts {
}
val streamletName = parts(0)
val volumeMountName = parts(1)
if (crApp.spec.deployments.find(_.streamletName == streamletName).isEmpty) {
if (crApp.getSpec.deployments.find(_.streamletName == streamletName).isEmpty) {
throw new CliException(s"Cannot find streamlet '$streamletName' in --volume-mount argument")
}

if (crApp.spec.deployments
if (crApp.getSpec.deployments
.filter(_.streamletName == streamletName)
.flatMap(_.volumeMounts)
.find(_.name == volumeMountName)
Expand Down Expand Up @@ -82,15 +82,15 @@ trait WithUpdateVolumeMounts {
}

private def streamletVolumeMountNamesInCr(crApp: App.Cr): Set[(String, String)] = {
crApp.spec.deployments
crApp.getSpec.deployments
.flatMap(deployment => deployment.volumeMounts.map(vm => deployment.streamletName -> vm.name))
.toSet
}

private def updatedDeployments(
crApp: App.Cr,
streamletVolumeNameToPvc: Map[(String, String), String]): Seq[App.Deployment] = {
crApp.spec.deployments.map { deployment =>
crApp.getSpec.deployments.map { deployment =>
deployment.copy(volumeMounts = deployment.volumeMounts.map { vmd =>
streamletVolumeNameToPvc
.get((deployment.streamletName, vmd.name))
Expand All @@ -103,7 +103,7 @@ trait WithUpdateVolumeMounts {
private def updatedStreamlets(
crApp: App.Cr,
streamletVolumeNameToPvc: Map[(String, String), String]): Seq[App.Streamlet] = {
crApp.spec.streamlets.map { streamlet =>
crApp.getSpec.streamlets.map { streamlet =>
streamlet.copy(descriptor = streamlet.descriptor.copy(volumeMounts = streamlet.descriptor.volumeMounts.map {
vmd =>
streamletVolumeNameToPvc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,17 @@ class KubeClientFabric8(
.find(_.getMetadata.getName == appName)
.getOrElse(throw CliException(s"""Cloudflow application "${appName}" not found"""))

val appStatus: String = Try(app.status.appStatus).toOption.getOrElse("Unknown")
val appStatus: String = Try(app.getStatus.appStatus).toOption.getOrElse("Unknown")

val res = models.ApplicationStatus(
summary = getCRSummary(app),
status = appStatus,
// FIXME, remove in a breaking CRD change, the endpoint statuses are not updated anymore.
endpointsStatuses = Try(app.status.endpointStatuses).toOption
endpointsStatuses = Try(app.getStatus.endpointStatuses).toOption
.filterNot(_ == null)
.map(_.map(getEndpointStatus))
.getOrElse(Seq.empty),
streamletsStatuses = Try(app.status.streamletStatuses).toOption
streamletsStatuses = Try(app.getStatus.streamletStatuses).toOption
.filterNot(_ == null)
.map(_.map(getStreamletStatus))
.getOrElse(Seq.empty))
Expand Down Expand Up @@ -512,7 +512,7 @@ class KubeClientFabric8(
endpointStatuses = Seq(),
streamletStatuses = Seq())

App.Cr(spec = spec, metadata = metadata, status = status)
App.Cr(_spec = spec, _metadata = metadata, _status = status)
}

private def createCFApp(spec: App.Spec, namespace: String): Try[String] =
Expand Down Expand Up @@ -580,7 +580,7 @@ class KubeClientFabric8(
Try {
cloudflowApps
.inNamespace(namespace)
.withName(app.spec.appId)
.withName(app.getSpec.appId)
// NOTE: Patch doesn't work
//.patch(app)
.replace(app)
Expand Down Expand Up @@ -663,7 +663,7 @@ private object ModelConversions {
models.CRSummary(
name = app.name,
namespace = app.namespace,
version = app.spec.appVersion,
version = app.getSpec.appVersion,
creationTime = app.getMetadata.getCreationTimestamp)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class ConfigValidationSpec extends AnyFlatSpec with Matchers with TryValues {

def crWithCPDescriptors(cpDescriptors: Seq[App.ConfigParameterDescriptor]) = {
App.Cr(
metadata = null,
spec = App.Spec(
_metadata = null,
_spec = App.Spec(
appId = "",
appVersion = "",
deployments = Seq(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ class WithUpdateVolumesMountsSpec extends AnyFlatSpec with WithUpdateVolumeMount
val streamletName = "my-streamlet"
def crWithVolumeMounts(volumeMounts: Seq[App.VolumeMountDescriptor]) = {
App.Cr(
metadata = null,
spec = App.Spec(
_metadata = null,
_spec = App.Spec(
appId = "",
appVersion = "",
deployments = Seq(
Expand Down Expand Up @@ -83,13 +83,13 @@ class WithUpdateVolumesMountsSpec extends AnyFlatSpec with WithUpdateVolumeMount

// Assert
val volumeMountsInDescriptor =
updatedCr.success.value.spec.streamlets
updatedCr.success.value.getSpec.streamlets
.find(_.name == streamletName)
.map(_.descriptor.volumeMounts)
.getOrElse(Seq())
volumeMountsInDescriptor should contain(expectedVolumeMount)
val volumeMountsInDeployment =
updatedCr.success.value.spec.deployments
updatedCr.success.value.getSpec.deployments
.find(_.streamletName == streamletName)
.map(_.volumeMounts)
.getOrElse(Seq())
Expand Down
Loading