Skip to content
This repository was archived by the owner on Nov 22, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

package cloudflow.streamlets.avro

import cloudflow.streamlets._
import java.util.Optional
import scala.reflect.ClassTag
import org.apache.avro.specific.SpecificRecordBase

import scala.reflect.ClassTag
import cloudflow.streamlets._
import AvroUtil._

case class AvroInlet[T <: SpecificRecordBase: ClassTag](
Expand All @@ -40,6 +41,18 @@ object AvroInlet {
def create[T <: SpecificRecordBase](name: String, clazz: Class[T]): AvroInlet[T] =
AvroInlet[T](name)(ClassTag.apply(clazz))

// Java API
def create[T <: SpecificRecordBase](name: String, clazz: Class[T], hasUniqueGroupId: Boolean): AvroInlet[T] =
AvroInlet[T](name, hasUniqueGroupId)(ClassTag.apply(clazz))

// Java API
def create[T <: SpecificRecordBase](
name: String,
clazz: Class[T],
hasUniqueGroupId: Boolean,
errorHandler: (Array[Byte], Throwable) => Optional[T]): AvroInlet[T] =
AvroInlet[T](name, hasUniqueGroupId, (a, t) => {
val opt = errorHandler(a, t)
if (opt.isPresent) Some(opt.get()) else None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this same as returning the opt itself ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's calling the Scala constructor which needs Option, not Optional

})(ClassTag.apply(clazz))
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

package cloudflow.streamlets.proto.javadsl

import java.util.Optional
import cloudflow.streamlets._
import com.google.protobuf.Descriptors.Descriptor
import com.google.protobuf.{ GeneratedMessageV3, TextFormat }
Expand All @@ -39,10 +39,19 @@ final case class ProtoInlet[T <: GeneratedMessageV3](
}

object ProtoInlet {
// Java API
def create[T <: GeneratedMessageV3](name: String, clazz: Class[T]): ProtoInlet[T] =
ProtoInlet[T](name, clazz)

def create[T <: GeneratedMessageV3](name: String, clazz: Class[T], hasUniqueGroupId: Boolean): ProtoInlet[T] =
ProtoInlet[T](name, clazz, hasUniqueGroupId)

def create[T <: GeneratedMessageV3](
name: String,
clazz: Class[T],
hasUniqueGroupId: Boolean,
errorHandler: (Array[Byte], Throwable) => Optional[T]): ProtoInlet[T] =
ProtoInlet[T](name, clazz, hasUniqueGroupId, (a, t) => {
val opt = errorHandler(a, t)
if (opt.isPresent) Some(opt.get()) else None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this same as returning the opt itself ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh no, wait, actually this is converted to the Scala API. The constructor takes Option, not Optional.

})
}