Disabling checkpointing #971
LGTM ..
| } | ||
| if (!env.getCheckpointConfig.isCheckpointingEnabled()) { | ||
| if (!env.getCheckpointConfig.isCheckpointingEnabled() && isDefaultCheckpointingEnabled(config, streamlet)) { |
What will it look like, from the user's perspective, to explicitly disable isCheckpointingEnabled?
I'll add documentation
We have found (I guess) that we could not specifically disable checkpointing through the normal Flink config (you cannot disable isCheckpointingEnabled, if that is what you are asking). Setting a number lower than 10 (where -1 means off, looking at the code) was not possible, so users can't really disable this directly through Flink. (I'm not sure I am representing your comment correctly, @andreaTP, but that is how I read it.)
What is happening here is that if they have not set any value for the checkpointing interval, isCheckpointingEnabled is false.
got it now! Thanks for the explanation! 👍
Should we add a mention to this configuration in the relevant doc?
Always good to add detail, and we likely need to document this on the Flink side, though I'm not sure how many will want to disable checkpointing. Stateful processing is basically what you use Flink for, IMHO.
| val runtimePath = "cloudflow.runtimes.flink.config.cloudflow.checkpointing.default" | ||
| val streamletPath = s"cloudflow.streamlet.${streamlet}.config.cloudflow.checkpointing.default" | ||
| if (config.hasPath(runtimePath) && config.getBoolean(runtimePath) == false) return false | ||
| if (config.hasPath(streamletPath) && config.getBoolean(streamletPath) == false) return false |
I think this condition doesn't exactly cover the use case, e.g.:
- "cloudflow.runtimes.flink.config.cloudflow.checkpointing.default" = false
- "cloudflow.streamlet.my-streamlet.config.cloudflow.checkpointing.default" = true
will return false for the streamlet my-streamlet instead of true (i.e. the streamlet configuration has to be "more specific" and override the runtime config)
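To make the precedence problem concrete, here is a minimal sketch. It is not the PR's code: the real function takes a Typesafe Config, while this version uses a plain Map[String, Boolean] as a stand-in so it runs without dependencies. The names CheckpointingPrecedence, firstVersion and streamletFirst are hypothetical, and the fallthrough to true in firstVersion is assumed, since the diff above only shows the two early returns.

```scala
object CheckpointingPrecedence {
  // Stand-in for com.typesafe.config.Config (assumption, for illustration only):
  // a path is "present" when it is a key of the map.
  type FlatConfig = Map[String, Boolean]

  val runtimePath = "cloudflow.runtimes.flink.config.cloudflow.checkpointing.default"

  def streamletPath(streamlet: String): String =
    s"cloudflow.streamlet.$streamlet.config.cloudflow.checkpointing.default"

  // Mirrors the reviewed condition: an explicit false at either level wins.
  // The final `true` is assumed; it is not visible in the diff.
  def firstVersion(config: FlatConfig, streamlet: String): Boolean =
    if (config.get(runtimePath).contains(false)) false
    else if (config.get(streamletPath(streamlet)).contains(false)) false
    else true

  // Streamlet setting overrides the runtime setting; default is enabled.
  def streamletFirst(config: FlatConfig, streamlet: String): Boolean =
    config
      .get(streamletPath(streamlet))
      .orElse(config.get(runtimePath))
      .getOrElse(true)
}
```

With the runtime path set to false and the my-streamlet path set to true, firstVersion yields false while streamletFirst yields true, which is the difference described in the comment.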
great catch! I'll fix it.
minor suggestion here, since you are at it, can I ask if you can use a single if - else if - else structure (or even a match case) instead of using early returns? It will make the code easier to reason about and more "scala-idiomatic" 🙂
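For reference, the match-case variant mentioned here could look roughly like the sketch below. It is only an illustration: a Map[String, Boolean] stands in for the Typesafe Config (an assumption, so the snippet has no dependencies), the object name CheckpointingDefault is hypothetical, and the streamlet setting is taken to override the runtime one with true as the default.

```scala
object CheckpointingDefault {
  // Map[String, Boolean] is a stand-in for Config (assumption for illustration).
  def isDefaultCheckpointingEnabled(config: Map[String, Boolean], streamlet: String): Boolean = {
    val runtimePath   = "cloudflow.runtimes.flink.config.cloudflow.checkpointing.default"
    val streamletPath = s"cloudflow.streamlet.$streamlet.config.cloudflow.checkpointing.default"

    // One expression, no early returns: the most specific setting wins.
    (config.get(streamletPath), config.get(runtimePath)) match {
      case (Some(streamletValue), _)  => streamletValue
      case (None, Some(runtimeValue)) => runtimeValue
      case (None, None)               => true
    }
  }
}
```

Each case names exactly one source of the value, so the precedence order can be read top to bottom.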
added "find checkpointing is disabled for streamlet overriding runtimes being enabled"
| } | ||
| "find checkpointing is disabled by runtime" in { | ||
| val config = ConfigFactory.parseString("cloudflow.runtimes.flink.config.cloudflow.checkpointing.default = false") |
HOCON is nice enough to also support "off" instead of false :-)
nice, I'd rather use that
…kStreamletConfigSpec.scala Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
| def isDefaultCheckpointingEnabled(config: Config, streamlet: String): Boolean = { | ||
| val runtimePath = "cloudflow.runtimes.flink.config.cloudflow.checkpointing.default" | ||
| val streamletPath = s"cloudflow.streamlet.${streamlet}.config.cloudflow.checkpointing.default" | ||
| if (config.hasPath(streamletPath)) return config.getBoolean(streamletPath) |
Please 🙏 🙂
if (config.hasPath(streamletPath)) {
  config.getBoolean(streamletPath)
} else if (config.hasPath(runtimePath)) {
  config.getBoolean(runtimePath)
} else {
  true
}
of course! Sorry didn't read it the first time
done now
What changes were proposed in this pull request?
Allow disabling, through configuration, the default checkpointing that Cloudflow sets when no Flink checkpointing configuration is passed.
Why are the changes needed?
It is a request from a ticket.
Does this PR introduce any user-facing change?
yes, the config
How was this patch tested?
unit tests and runLocal