Skip to content

[FLINK-39018][checkpoint] Support checkpoint for LocalInputChannel#27861

Open
1996fanrui wants to merge 5 commits intoapache:masterfrom
1996fanrui:39018/support-checkpoint-for-localinputchannel
Open

[FLINK-39018][checkpoint] Support checkpoint for LocalInputChannel#27861
1996fanrui wants to merge 5 commits intoapache:masterfrom
1996fanrui:39018/support-checkpoint-for-localinputchannel

Conversation

@1996fanrui
Copy link
Copy Markdown
Member

@1996fanrui 1996fanrui commented Mar 31, 2026

This PR depends on #27782 and #27783

What is the purpose of the change

[FLINK-39018][checkpoint] Support checkpoint for LocalInputChannel

Brief change log

  • [hotfix][network] Fix LocalInputChannel.getBuffersInUseCount to include toBeConsumedBuffers
  • [FLINK-39018][checkpoint] Support LocalInputChannel checkpoint snapshot for recovered buffers
  • [FLINK-39018][network] Fix LocalInputChannel priority event and buffer availability for recovered buffers
  • [FLINK-39018][checkpoint] Notify PriorityEvent to downstream task even if it is blocked to ensure the checkpoint barrier can be handled by downstream task
  • [FLINK-39018][network] Buffer migration from RecoveredInputChannel to physical channels

Verifying this change

  • Tons of unit tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive):no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector:no

Documentation

  • Does this pull request introduce a new feature? no

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Mar 31, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@1996fanrui 1996fanrui force-pushed the 39018/support-checkpoint-for-localinputchannel branch from cf606db to 4fa25ef Compare March 31, 2026 16:36
@1996fanrui 1996fanrui force-pushed the 39018/support-checkpoint-for-localinputchannel branch from 4fa25ef to b1a7ca7 Compare March 31, 2026 18:37

if (!toBeConsumedBuffers.isEmpty()) {
return getBufferAndAvailability(toBeConsumedBuffers.removeFirst());
// If there is a pending priority event (e.g., unaligned checkpoint barrier), fetch it
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method has grown too much and should be split into smaller ones

Comment on lines +460 to +462
// When blocked, only allow priority buffers (e.g. unaligned checkpoint barriers)
// to be polled. Data buffers remain blocked until resumeConsumption() is called.
if (isBlocked && buffers.getNumPriorityElements() == 0) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this change from this commit? Could you elaborate a bit more in the commit message?

Comment on lines +552 to +558
// Block the subpartition by consuming an aligned checkpoint barrier
blockSubpartitionByCheckpoint(1);
assertThat(availablityListener.getNumPriorityEvents()).isZero();

// While blocked, add an unaligned checkpoint barrier (priority event).
// Even though isBlocked=true, the priority event notification should NOT
// be suppressed — priority events must bypass blocking.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can this happen in reality outside of the unit test? 🤔

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants