Skip to content
This repository was archived by the owner on Nov 22, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ object ApplicationDescriptor {
}

ApplicationDescriptor(sanitizedApplicationId, appVersion, namedStreamletDescriptors.map {
case (_, instance) => instance
case (_, instance) =>
StreamletInstance(instance.name, sanitizeDescriptor(instance.descriptor))
}, deployments, agentPaths, Version, libraryVersion)
}

Expand All @@ -77,6 +78,17 @@ object ApplicationDescriptor {
}.toMap
private def streamletToNamedStreamletDescriptor(streamlet: VerifiedStreamlet) =
(streamlet, StreamletInstance(streamlet.name, streamlet.descriptor))

/**
* Deletes every schema
* StreamletDescriptor.[inlets | outlets].SchemaDescriptor.schema
* to avoid adding the description of each type of the schema in the CR
*/
private def sanitizeDescriptor(descriptor: StreamletDescriptor): StreamletDescriptor = {
val sanitizedInlets = descriptor.inlets.map(each => each.copy(schema = each.schema.copy(schema = "")))
val sanitizedOutlets = descriptor.outlets.map(each => each.copy(schema = each.schema.copy(schema = "")))
descriptor.copy(inlets = sanitizedInlets, outlets = sanitizedOutlets)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,44 @@ class ApplicationDescriptorSpec
processor2Deployment.portMappings.map { case (port, sp) => port -> sp.id } must contain("out" -> "foos2")
}

"from a verified blueprint clean the schema from the CR" in {
Given("a verified blueprint")
val ingress = randomStreamlet().asIngress[Foo].withServerAttribute
val processor = randomStreamlet().asProcessor[Foo, Bar].withRuntime("spark")
val egress = randomStreamlet().asEgress[Bar]

val ingressRef = ingress.ref("ingress")
val processorRef = processor.ref("processor")
val egressRef = egress.ref("egress")

val verifiedBlueprint = Blueprint()
.define(Vector(ingress, processor, egress))
.use(ingressRef)
.use(processorRef)
.use(egressRef)
.connect(BTopic("foos"), ingressRef.out, processorRef.in)
.connect(BTopic("bars1"), processorRef.out, egressRef.in)
.verified
.right
.value

When("I create a deployment descriptor from that blueprint")
val appId = "noisy-nissan-42"
val appVersion = "1-2345678"
val image = "image-1"
val descriptor = ApplicationDescriptor(appId, appVersion, image, verifiedBlueprint, agentPaths, BuildInfo.version)

Then("the descriptor must be valid")
descriptor.deployments.size mustBe 3
Then("the description of the streamlets should not contain the SchemaDescriptor.schema populated")
val schemas = for {
streamlet <- descriptor.streamlets
port <- streamlet.descriptor.inlets ++ streamlet.descriptor.outlets
} yield port.schema.schema
schemas mustBe Vector("", "", "", "")

}

"be built correctly from a verified blueprint (with dual-inlet merging)" in {
Given("a verified blueprint")
val ingress1 = randomStreamlet().asIngress[Foo].withServerAttribute
Expand Down