This repository was archived by the owner on Nov 22, 2024. It is now read-only.

Disabling checkpointing#971

Merged
franciscolopezsancho merged 8 commits into
masterfrom
disabling-checkpointing
Jan 28, 2021

Conversation

@franciscolopezsancho

Contributor

What changes were proposed in this pull request?

Allows disabling, through configuration, the default checkpointing that Cloudflow sets when no Flink checkpointing configuration is passed.

Why are the changes needed?

This was requested in a ticket.

Does this PR introduce any user-facing change?

Yes, a new configuration setting.

How was this patch tested?

Unit tests and runLocal.

@debasishg

Contributor

LGTM ..

}

- if (!env.getCheckpointConfig.isCheckpointingEnabled()) {
+ if (!env.getCheckpointConfig.isCheckpointingEnabled() && isDefaultCheckpointingEnabled(config, streamlet)) {

Contributor

What will it look like, from the user's perspective, to explicitly disable isCheckpointingEnabled?

Contributor Author

I'll add documentation

@RayRoestenburg RayRoestenburg Jan 28, 2021

Contributor

We have found (I guess) that we could not specifically disable checkpointing through the normal Flink config (you cannot disable isCheckpointingEnabled, if that is what you are asking). Setting a number lower than 10 (where -1 means off, looking at the code) was not possible, so users can't really disable this directly through Flink. (I'm not sure I am representing your comment correctly, @andreaTP, but that is how I read it.)

Contributor

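What is happening here is that if they have not set any value for the checkpointing interval, isCheckpointingEnabled is false. In that case Cloudflow applies its own default checkpointing, which is what the new setting lets users switch off.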
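The changed condition in the diff can be read as a small decision over two inputs. The sketch below is only an illustration: the object name, the method name, and both boolean parameters are hypothetical stand-ins for `env.getCheckpointConfig.isCheckpointingEnabled()` and `isDefaultCheckpointingEnabled(config, streamlet)`, not Cloudflow or Flink APIs.

```scala
object DefaultCheckpointingDecision {
  // flinkEnabled   stands in for env.getCheckpointConfig.isCheckpointingEnabled()
  // defaultAllowed stands in for isDefaultCheckpointingEnabled(config, streamlet)
  // Both parameters are assumptions made for this illustration.
  def shouldApplyCloudflowDefault(flinkEnabled: Boolean, defaultAllowed: Boolean): Boolean =
    !flinkEnabled && defaultAllowed
}
```

Under this reading, the Cloudflow default is applied only when the user has configured no Flink checkpointing and has not switched the default off.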

Contributor

Got it now! Thanks for the explanation! 👍
Should we mention this configuration in the relevant doc?

Contributor

Always good to add detail, and we likely need to document this on the Flink side. That said, I'm not sure how many users will want to disable checkpointing: stateful processing is basically what you use Flink for, IMHO.

val runtimePath = "cloudflow.runtimes.flink.config.cloudflow.checkpointing.default"
val streamletPath = s"cloudflow.streamlet.${streamlet}.config.cloudflow.checkpointing.default"
if (config.hasPath(runtimePath) && config.getBoolean(runtimePath) == false) return false
if (config.hasPath(streamletPath) && config.getBoolean(streamletPath) == false) return false

@andreaTP andreaTP Jan 28, 2021

Contributor

I think this condition doesn't exactly cover the use case. For example, with:

  • "cloudflow.runtimes.flink.config.cloudflow.checkpointing.default" = false
  • "cloudflow.streamlet.my-streamlet.config.cloudflow.checkpointing.default" = true

it will return false for the streamlet my-streamlet instead of true (i.e. the streamlet configuration has to be "more specific" and override the runtime config).
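To make the case above concrete, here is a minimal sketch that mirrors the two early-return checks under review. It is not the PR's code: a plain `Map[String, Boolean]` stands in for the Typesafe `Config` object, the names `EarlyReturnPrecedence`, `runtimeFirst`, and `settings` are hypothetical, and the final `true` fall-through is an assumption, since the diff excerpt does not show it.

```scala
object EarlyReturnPrecedence {
  val runtimePath = "cloudflow.runtimes.flink.config.cloudflow.checkpointing.default"

  def streamletPath(streamlet: String): String =
    s"cloudflow.streamlet.$streamlet.config.cloudflow.checkpointing.default"

  // Mirrors the reviewed logic: the runtime setting is checked first, so a
  // runtime-level `false` wins before the streamlet setting is consulted.
  // The trailing `true` is assumed, it is not visible in the excerpt.
  def runtimeFirst(settings: Map[String, Boolean], streamlet: String): Boolean = {
    if (settings.get(runtimePath).contains(false)) return false
    if (settings.get(streamletPath(streamlet)).contains(false)) return false
    true
  }

  // The configuration from the review comment: off for the runtime,
  // explicitly on for my-streamlet.
  val settings: Map[String, Boolean] = Map(
    runtimePath -> false,
    streamletPath("my-streamlet") -> true
  )
}
```

With these settings, `EarlyReturnPrecedence.runtimeFirst(EarlyReturnPrecedence.settings, "my-streamlet")` evaluates to `false` even though the streamlet-level value is `true`, which is the mismatch the comment describes.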

Contributor Author

great catch! I'll fix it.

Contributor

Minor suggestion: since you are at it, could you use a single if - else if - else structure (or even a match case) instead of early returns? It will make the code easier to reason about and more "scala-idiomatic" 🙂

Contributor Author

Added a test: "find checkpointing is disabled for streamlet overriding runtimes being enabled"

}

"find checkpointing is disabled by runtime" in {
val config = ConfigFactory.parseString("cloudflow.runtimes.flink.config.cloudflow.checkpointing.default = false")

Contributor

HOCON is nice enough to also support "off" instead of false :-)

Contributor Author

Nice, I'd rather use that.

franciscolopezsancho and others added 4 commits January 28, 2021 14:20
…kStreamletConfigSpec.scala

Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
…kStreamletConfigSpec.scala

Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
…kStreamletConfigSpec.scala

Co-authored-by: Raymond Roestenburg <raymond.roestenburg@gmail.com>
def isDefaultCheckpointingEnabled(config: Config, streamlet: String): Boolean = {
val runtimePath = "cloudflow.runtimes.flink.config.cloudflow.checkpointing.default"
val streamletPath = s"cloudflow.streamlet.${streamlet}.config.cloudflow.checkpointing.default"
if (config.hasPath(streamletPath)) return config.getBoolean(streamletPath)

@andreaTP andreaTP Jan 28, 2021

Contributor

Please 🙏 🙂

if (config.hasPath(streamletPath)) {
  config.getBoolean(streamletPath)
} else if (config.hasPath(runtimePath)) {
  config.getBoolean(runtimePath)
} else {
  true
}
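The "match case" alternative mentioned earlier in the review could look roughly like the sketch below. It is an illustration under stated assumptions, not the merged code: the two `Option[Boolean]` parameters are hypothetical and represent "value at the streamlet path, if present" and "value at the runtime path, if present", which the real method would obtain through `config.hasPath` and `config.getBoolean`.

```scala
object StreamletFirstLookup {
  // streamletSetting / runtimeSetting are hypothetical inputs: Some(value)
  // when the corresponding path is set, None when it is absent.
  def isDefaultCheckpointingEnabled(
      streamletSetting: Option[Boolean],
      runtimeSetting: Option[Boolean]
  ): Boolean =
    (streamletSetting, runtimeSetting) match {
      case (Some(value), _)    => value // the more specific streamlet setting wins
      case (None, Some(value)) => value // otherwise fall back to the runtime setting
      case (None, None)        => true  // nothing configured: keep the default on
    }
}
```

The three cases correspond one to one to the branches of the if - else if - else suggestion, so the choice between the two forms is mostly a matter of taste.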

Contributor Author

Of course! Sorry, I didn't read it the first time.

Contributor Author

done now

Contributor

thanks!

@franciscolopezsancho franciscolopezsancho merged commit 8c98409 into master Jan 28, 2021
@franciscolopezsancho franciscolopezsancho deleted the disabling-checkpointing branch January 28, 2021 17:02

4 participants