Merge Region

Before merging regions, PD first aligns the peers of the source region and the target region so that both regions have peers on the same set of TiKV nodes.

Merge Process

Key Points

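  1. How the source region detects and handles changes to the target region before it submits CommitMerge to the target region.
  2. The ordering between the source region's on_catch_up_logs_for_merge and on_ready_prepare_merge calls, which can run in either order.
  3. How the target and source regions synchronize the catch-up status through the atomic logs_up_to_date carried in CatchUpLogs.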

Related RaftCmdRequests

The two raft commands involved in a region merge are PrepareMergeRequest and CommitMergeRequest.

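PrepareMergeRequest is proposed and executed by the source region. When the source region executes PrepareMerge, it sets its PeerState to Merging and saves a MergeState in RegionLocalState. It then sends a CommitMergeRequest to the local target region, that is, the target region's peer on the same store.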

The target region proposes the CommitMergeRequest to its own raft group, and the target region then executes CommitMerge.

PrepareMerge

Source Region: propose PrepareMerge

When the source region leader receives a PrepareMerge request, it proposes a PrepareMerge entry.

Before proposing, the leader runs a number of checks and finally sets the min_index field of PrepareMerge.
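The checks themselves are not listed here. As a minimal sketch, assuming that min_index is the slowest peer's matched index plus one (our reading of TiKV's pre_propose_prepare_merge) and that one of the checks rejects the merge when the log gap is too large, the computation could look like this. MergeConfig and compute_min_index are hypothetical names; TiKV exposes a similar knob as raftstore.merge-max-log-gap.

```rust
/// Hypothetical config holder for this sketch.
pub struct MergeConfig {
    pub merge_max_log_gap: u64,
}

/// Sketch of the min_index computation done before proposing PrepareMerge.
/// `matched` holds each peer's matched log index (leader included) and
/// `last_index` is the leader's last log index.
pub fn compute_min_index(
    matched: &[u64],
    last_index: u64,
    cfg: &MergeConfig,
) -> Result<u64, String> {
    // The slowest peer bounds how far back entries may have to be shipped.
    let min_progress = *matched.iter().min().ok_or("no peers")?;
    if min_progress == 0 {
        return Err("a peer has not matched any entry yet".to_string());
    }
    // Refuse to merge when the slowest peer lags too far behind.
    if last_index.saturating_sub(min_progress) > cfg.merge_max_log_gap {
        return Err(format!(
            "log gap ({}, {}] is too large, skip merge",
            min_progress, last_index
        ));
    }
    // Entries from min_index onward may be missing on some peer.
    Ok(min_progress + 1)
}
```

Bounding the gap keeps the entries that CommitMergeRequest later has to carry reasonably small.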

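When the ApplyFsm executes PrepareMerge, both the version and the conf_ver of the region epoch are incremented by 1. As a result, any log entry proposed after PrepareMerge still carries the old epoch and is skipped at apply time. This is why the source region can no longer serve reads or writes once PrepareMerge has been proposed.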
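A simplified model of why those proposals are skipped. Assumptions: RegionEpoch mirrors the two fields of metapb::RegionEpoch, and the epoch check is reduced to "skip the entry if either field is older than the region's current epoch"; TiKV's real check depends on the command type.

```rust
/// Same two fields as metapb::RegionEpoch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RegionEpoch {
    pub conf_ver: u64,
    pub version: u64,
}

/// A proposal remembers the epoch the leader saw when proposing it.
pub struct Proposal {
    pub epoch: RegionEpoch,
}

/// Apply-side view of the source region.
pub struct RegionState {
    pub epoch: RegionEpoch,
}

impl RegionState {
    /// Epoch change made by exec_prepare_merge.
    pub fn apply_prepare_merge(&mut self) {
        self.epoch.conf_ver += 1;
        self.epoch.version += 1;
    }

    /// Simplified epoch check: skip the entry if either field is older.
    pub fn should_apply(&self, p: &Proposal) -> bool {
        p.epoch.conf_ver >= self.epoch.conf_ver && p.epoch.version >= self.epoch.version
    }
}
```

Because PrepareMerge bumps both fields, a proposal stamped with the pre-merge epoch fails the check whichever field the real check looks at.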

Source Region: ApplyDelegate::exec_prepare_merge

Sets PeerState to Merging and increments both conf_ver and version of the region epoch.

Source Region: PeerFsmDelegate::on_ready_prepare_merge

After receiving ExecResult::PrepareMerge, the source region's raft peer handles it in on_ready_prepare_merge. This first sets pending_merge_state; from then on, the peer rejects every proposal except RollbackMerge with Error::ProposalInMergingMode:


```rust
fn propose_normal<T>(
    &mut self,
    poll_ctx: &mut PollContext<EK, ER, T>,
    mut req: RaftCmdRequest,
) -> Result<Either<u64, u64>> {
    // While a merge is pending, only RollbackMerge may be proposed.
    if self.pending_merge_state.is_some()
        && req.get_admin_request().get_cmd_type() != AdminCmdType::RollbackMerge
    {
        return Err(Error::ProposalInMergingMode(self.region_id));
    }
    // ...
}
```

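It then calls on_check_merge, which, after a series of checks, sends a CommitMergeRequest to the local target region to be proposed there. The CommitMergeRequest carries the log entries that lagging source region peers may need in order to catch up.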

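The most important of these checks is Peer::validate_merge_peer, which compares the target region information stored in the source's MergeState with the current local target region. If the epoch in the MergeState is smaller than the local one, the target region has changed since PrepareMerge and an error is returned.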

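If it is larger, the local target region has not caught up yet: schedule_merge must wait until the local target region's epoch catches up, and the check is repeated on the next check-merge tick.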
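The three outcomes of this comparison can be sketched as below. is_epoch_stale follows the semantics of TiKV's helper of the same name as we understand it (stale if either version or conf_ver is behind); MergeCheck and this validate_merge_peer signature are simplifications for illustration, and the case where the target region does not exist locally is left out.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RegionEpoch {
    pub conf_ver: u64,
    pub version: u64,
}

#[derive(Debug, PartialEq, Eq)]
pub enum MergeCheck {
    /// Local target matches the MergeState: schedule the merge now.
    Ready,
    /// Local target is behind: retry on the next check-merge tick.
    WaitForTarget,
}

/// `a` is stale relative to `b` if either field is behind.
fn is_epoch_stale(a: RegionEpoch, b: RegionEpoch) -> bool {
    a.version < b.version || a.conf_ver < b.conf_ver
}

/// `expected`: target epoch recorded in the source's MergeState.
/// `local`: epoch of the target region peer on this store.
pub fn validate_merge_peer(expected: RegionEpoch, local: RegionEpoch) -> Result<MergeCheck, String> {
    if is_epoch_stale(expected, local) {
        // The target changed after PrepareMerge; this merge cannot proceed.
        return Err(format!("target region changed: {:?} -> {:?}", expected, local));
    }
    if is_epoch_stale(local, expected) {
        return Ok(MergeCheck::WaitForTarget);
    }
    Ok(MergeCheck::Ready)
}
```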

Finally, a RaftCmd of type AdminCmdType::CommitMerge is sent to the local target region:


```rust
// Please note that, here assumes that the unit of network isolation is store rather than
// peer. So a quorum stores of source region should also be the quorum stores of target
// region. Otherwise we need to enable proposal forwarding.
self.ctx
    .router
    .force_send(
        target_id,
        // Callback::None: the source does not wait for a response.
        PeerMsg::RaftCommand(RaftCommand::new(request, Callback::None)),
    )
    .map_err(|_| Error::RegionNotFound(target_id))
```

Schedule errors, namely RegionNotFound and a target region epoch that is larger than the one recorded in the merge state, are handled by rolling back the merge.

RollbackMerge

Once RollbackMerge is applied, pending_merge_state is reset to None. This stops on_check_merge, and propose_normal accepts proposals again.

RollbackMerge increments the version of the region epoch, and the new epoch is reported to the PD server through the PD heartbeat.
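A toy model of the source peer around RollbackMerge, combining the propose_normal gate with the effect of applying RollbackMerge. SourcePeer, Cmd and the string error are hypothetical stand-ins; the PD report is represented only by the returned version.

```rust
/// Simplified MergeState; only its presence matters here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MergeState {
    pub target_region_id: u64,
    pub commit: u64,
}

#[derive(Debug, PartialEq, Eq)]
pub enum Cmd {
    Normal,
    RollbackMerge,
}

pub struct SourcePeer {
    pub version: u64,
    pub pending_merge_state: Option<MergeState>,
}

impl SourcePeer {
    /// Mirrors the propose_normal gate: only RollbackMerge passes while merging.
    pub fn propose(&self, cmd: &Cmd) -> Result<(), String> {
        if self.pending_merge_state.is_some() && *cmd != Cmd::RollbackMerge {
            return Err("ProposalInMergingMode".to_string());
        }
        Ok(())
    }

    /// on_check_merge only has work to do while a merge is pending.
    pub fn should_check_merge(&self) -> bool {
        self.pending_merge_state.is_some()
    }

    /// Applying RollbackMerge: leave merging mode and bump the epoch version.
    /// The returned version is what the leader would report to PD.
    pub fn apply_rollback_merge(&mut self) -> u64 {
        self.pending_merge_state = None;
        self.version += 1;
        self.version
    }
}
```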

CommitMerge

Target Region: ApplyDelegate::exec_commit_merge

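After the source region sends the CommitMerge message to the local target region, the target region proposes it to its raft group like any other command if it is the leader. If the local target region is not the leader, the message is silently dropped.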

When the target region executes CommitMerge, it first sends a CatchUpLogs message to the local source region, asking it to catch up its log. CatchUpLogs carries logs_up_to_date, a shared AtomicU64.

Once the source region has caught up, it sets logs_up_to_date to its own region_id.

ApplyDelegate::wait_merge_state holds a reference to the same logs_up_to_date. Every resume_pending loads logs_up_to_date, and if it has been set, exec_commit_merge is executed again.
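The handshake between exec_commit_merge, wait_merge_state and resume_pending can be sketched single-threaded as follows, with 0 in the flag meaning "not caught up yet". TargetApply, its ready_source field and the (ExecResult, Option<CatchUpLogs>) return shape are assumptions of this sketch, not TiKV's actual types.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Simplified CatchUpLogs; the real message also carries the CommitMergeRequest.
pub struct CatchUpLogs {
    pub target_region_id: u64,
    /// 0 = source not caught up yet; otherwise the source region id.
    pub logs_up_to_date: Arc<AtomicU64>,
}

#[derive(Debug, PartialEq, Eq)]
pub enum ExecResult {
    /// exec_commit_merge paused until the source catches up.
    WaitMergeSource,
    /// Stand-in for ExecResult::CommitMerge.
    CommitMerge { source_region_id: u64 },
}

pub struct TargetApply {
    region_id: u64,
    /// Plays the role of wait_merge_state: the shared flag being waited on.
    wait_merge_state: Option<Arc<AtomicU64>>,
    /// Source id observed through the flag (hypothetical field).
    ready_source: u64,
}

impl TargetApply {
    pub fn new(region_id: u64) -> Self {
        TargetApply { region_id, wait_merge_state: None, ready_source: 0 }
    }

    /// First call: ask the source to catch up and pause.
    /// Later call (after resume_pending saw the flag): finish the merge.
    pub fn exec_commit_merge(&mut self, source_region_id: u64) -> (ExecResult, Option<CatchUpLogs>) {
        if self.ready_source != source_region_id {
            let flag = Arc::new(AtomicU64::new(0));
            self.wait_merge_state = Some(Arc::clone(&flag));
            let msg = CatchUpLogs { target_region_id: self.region_id, logs_up_to_date: flag };
            return (ExecResult::WaitMergeSource, Some(msg));
        }
        self.ready_source = 0;
        // Key range extension and write_peer_state would happen here.
        (ExecResult::CommitMerge { source_region_id }, None)
    }

    /// Returns true when the paused CommitMerge can be executed again.
    pub fn resume_pending(&mut self) -> bool {
        let source = match &self.wait_merge_state {
            Some(flag) => flag.load(Ordering::SeqCst),
            None => return false,
        };
        if source == 0 {
            return false;
        }
        self.ready_source = source;
        self.wait_merge_state = None;
        true
    }
}
```

The source never calls into the target directly; the only shared state is the atomic, which keeps the two apply delegates decoupled.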

When exec_commit_merge runs again after the source region has caught up, it extends the target region's key range, increments the target region's version, calls write_peer_state to persist the target region's state, and finally returns ExecResult::CommitMerge.
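How the key range is extended depends on which side of the target the source sits. A minimal sketch, assuming the two regions are adjacent, that an empty end key means +infinity, and that the new version is max(source version, target version) + 1 (our understanding of TiKV, not stated in this text). Region here is a hypothetical simplification of metapb::Region.

```rust
/// Simplified metapb::Region; an empty end_key means +infinity.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Region {
    pub id: u64,
    pub start_key: Vec<u8>,
    pub end_key: Vec<u8>,
    pub version: u64,
}

/// Extend `target` so that it also covers the adjacent `source` range.
pub fn merge_into_target(target: &mut Region, source: &Region) {
    // Assumed version rule: strictly newer than both inputs.
    target.version = target.version.max(source.version) + 1;
    if !target.end_key.is_empty() && target.end_key == source.start_key {
        // Source sits to the right of the target.
        target.end_key = source.end_key.clone();
    } else {
        // Source sits to the left of the target.
        assert_eq!(source.end_key, target.start_key, "regions must be adjacent");
        target.start_key = source.start_key.clone();
    }
}
```

The is_empty guard matters: without it, a rightmost target (empty end key) and a leftmost source (empty start key) would wrongly compare equal.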

Source Region: PeerFsmDelegate::on_catch_up_logs_for_merge

Using the entries in the CommitMergeRequest, the source region fills in and applies its local raft log, then sends a LogsUpToDate message to its ApplyFsm.

The ApplyFsm sets the atomic CatchUpLogs::logs_up_to_date to source_region_id and then sends a Noop message to the target region, so that the target region resumes processing its wait_merge_state.
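Across threads, the source's ApplyFsm publishes its id in the shared atomic and then wakes the target with Noop. A sketch using std::sync::mpsc as a stand-in for TiKV's router; TargetMsg, source_logs_up_to_date, target_wait and run_handoff are hypothetical names.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Message to the target apply fsm in this sketch.
#[derive(Debug)]
pub enum TargetMsg {
    /// Carries no data; it only makes the target re-check wait_merge_state.
    Noop,
}

/// Source ApplyFsm side of LogsUpToDate: publish our id, then wake the target.
pub fn source_logs_up_to_date(source_region_id: u64, flag: &AtomicU64, target: &mpsc::Sender<TargetMsg>) {
    // Store first so the target sees the id when it handles Noop.
    flag.store(source_region_id, Ordering::SeqCst);
    // A send error means the target is gone; nothing to wake in that case.
    let _ = target.send(TargetMsg::Noop);
}

/// Target side: on every Noop, re-check the flag; give up when the mailbox closes.
pub fn target_wait(flag: &AtomicU64, mailbox: &mpsc::Receiver<TargetMsg>) -> Option<u64> {
    while let Ok(TargetMsg::Noop) = mailbox.recv() {
        let source = flag.load(Ordering::SeqCst);
        if source != 0 {
            return Some(source);
        }
    }
    None
}

/// Runs the source on its own thread and waits on the current one.
pub fn run_handoff(source_region_id: u64) -> Option<u64> {
    let flag = Arc::new(AtomicU64::new(0));
    let (tx, rx) = mpsc::channel();
    let source_flag = Arc::clone(&flag);
    let source = thread::spawn(move || {
        source_logs_up_to_date(source_region_id, &source_flag, &tx);
    });
    let seen = target_wait(&flag, &rx);
    source.join().unwrap();
    seen
}
```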

If pending_merge_state is not None when on_catch_up_logs_for_merge runs, and its commit matches the one in CatchUpLogs, the source region has already applied PrepareMerge, so it sends LogsUpToDate to the ApplyFsm directly:


```rust
fn on_catch_up_logs_for_merge(&mut self, mut catch_up_logs: CatchUpLogs) {
    let region_id = self.fsm.region_id(); // the source region's id
    // ...
    if let Some(ref pending_merge_state) = self.fsm.peer.pending_merge_state {
        if pending_merge_state.get_commit() == catch_up_logs.merge.get_commit() {
            assert_eq!(
                pending_merge_state.get_target().get_id(),
                catch_up_logs.target_region_id
            );
            // Indicate that `on_ready_prepare_merge` has already executed.
            // Mark pending_remove because its apply fsm will be destroyed.
            self.fsm.peer.pending_remove = true;
            // Just for saving memory.
            catch_up_logs.merge.clear_entries();
            // Send CatchUpLogs back to destroy source apply fsm,
            // then it will send `Noop` to trigger target apply fsm.
            self.ctx
                .apply_router
                .schedule_task(region_id, ApplyTask::LogsUpToDate(catch_up_logs));
            return;
        }
    }
    // ...
}
```

Likewise, if Peer.catch_up_logs is not None when on_ready_prepare_merge runs, on_catch_up_logs_for_merge executed first, and the PrepareMerge being handled now is the one contained in the caught-up log entries.

At this point the log has already caught up, so LogsUpToDate can be sent to the ApplyFsm:


```rust
fn on_ready_prepare_merge(&mut self, region: metapb::Region, state: MergeState) {
    // ... (pending_merge_state is set here first)
    if let Some(ref catch_up_logs) = self.fsm.peer.catch_up_logs {
        if state.get_commit() == catch_up_logs.merge.get_commit() {
            assert_eq!(state.get_target().get_id(), catch_up_logs.target_region_id);
            // Indicate that `on_catch_up_logs_for_merge` has already executed.
            // Mark pending_remove because its apply fsm will be destroyed.
            self.fsm.peer.pending_remove = true;
            // Send CatchUpLogs back to destroy source apply fsm,
            // then it will send `Noop` to trigger target apply fsm.
            self.ctx.apply_router.schedule_task(
                self.fsm.region_id(),
                ApplyTask::LogsUpToDate(self.fsm.peer.catch_up_logs.take().unwrap()),
            );
            return;
        }
    }
    // ...
}
```
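The two handlers above resolve the ordering problem by letting whichever one runs second send LogsUpToDate. The toy model below checks both orders; it keeps only the commit index and target id, records sent messages in a Vec instead of scheduling an ApplyTask, and omits applying the carried entries and on_check_merge. All type names here are simplified stand-ins.

```rust
/// Only the fields the ordering logic needs.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CatchUpLogs {
    pub target_region_id: u64,
    pub commit: u64,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MergeState {
    pub target_region_id: u64,
    pub commit: u64,
}

#[derive(Default)]
pub struct SourcePeer {
    pub pending_merge_state: Option<MergeState>,
    pub catch_up_logs: Option<CatchUpLogs>,
    pub pending_remove: bool,
    /// Stand-in for ApplyTask::LogsUpToDate messages sent to the apply fsm.
    pub sent_logs_up_to_date: Vec<CatchUpLogs>,
}

impl SourcePeer {
    pub fn on_catch_up_logs_for_merge(&mut self, cul: CatchUpLogs) {
        if let Some(state) = &self.pending_merge_state {
            if state.commit == cul.commit {
                // PrepareMerge already applied: the log is complete.
                assert_eq!(state.target_region_id, cul.target_region_id);
                self.pending_remove = true;
                self.sent_logs_up_to_date.push(cul);
                return;
            }
        }
        // PrepareMerge not applied yet: remember the request. (TiKV would
        // now apply the entries carried in the CommitMergeRequest.)
        self.catch_up_logs = Some(cul);
    }

    pub fn on_ready_prepare_merge(&mut self, state: MergeState) {
        self.pending_merge_state = Some(state.clone());
        let caught_up = self
            .catch_up_logs
            .as_ref()
            .map_or(false, |cul| cul.commit == state.commit);
        if caught_up {
            // CatchUpLogs arrived first; this PrepareMerge came from the
            // caught-up entries, so the log is complete now.
            let cul = self.catch_up_logs.take().unwrap();
            assert_eq!(state.target_region_id, cul.target_region_id);
            self.pending_remove = true;
            self.sent_logs_up_to_date.push(cul);
        }
        // Otherwise TiKV would continue with on_check_merge.
    }
}
```

In either order exactly one LogsUpToDate is sent, and only after both the PrepareMerge state and the CatchUpLogs request are known.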

Target region: PeerFsmDelegate::on_ready_commit_merge

The target region's PeerFsm updates regions, readers and region_ranges in StoreMeta, removing the source region's entries and updating the target region's.

It then sends a SignificantMsg::MergeResult message to the source region.
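A sketch of the StoreMeta bookkeeping in on_ready_commit_merge, with readers omitted. As we understand it, TiKV keys region_ranges by the region's encoded end key; here a (bool, Vec<u8>) key stands in for that encoding so that an empty (unbounded) end key sorts last. StoreMeta, commit_merge and region_for_key are simplified, hypothetical versions.

```rust
use std::collections::{BTreeMap, HashMap};
use std::ops::Bound::{Excluded, Unbounded};

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Region {
    pub id: u64,
    pub start_key: Vec<u8>,
    pub end_key: Vec<u8>, // empty = unbounded
}

/// Simplified StoreMeta (readers omitted).
#[derive(Default)]
pub struct StoreMeta {
    pub regions: HashMap<u64, Region>,
    /// end key -> region id; used to find the region owning a key.
    pub region_ranges: BTreeMap<(bool, Vec<u8>), u64>,
}

/// Range key for an end key: `true` marks the unbounded end so it sorts last.
fn end_range_key(end_key: &[u8]) -> (bool, Vec<u8>) {
    (end_key.is_empty(), end_key.to_vec())
}

impl StoreMeta {
    pub fn insert(&mut self, r: Region) {
        self.region_ranges.insert(end_range_key(&r.end_key), r.id);
        self.regions.insert(r.id, r);
    }

    /// Drop the source region and replace the target's entries with the merged region.
    pub fn commit_merge(&mut self, source_id: u64, merged_target: Region) {
        if let Some(src) = self.regions.remove(&source_id) {
            self.region_ranges.remove(&end_range_key(&src.end_key));
        }
        if let Some(old) = self.regions.get(&merged_target.id) {
            self.region_ranges.remove(&end_range_key(&old.end_key));
        }
        self.insert(merged_target);
    }

    /// The owning region is the first one whose end key is greater than `key`.
    pub fn region_for_key(&self, key: &[u8]) -> Option<u64> {
        let probe = (false, key.to_vec());
        let (_, &id) = self.region_ranges.range((Excluded(probe), Unbounded)).next()?;
        let r = &self.regions[&id];
        if r.start_key.as_slice() <= key {
            Some(id)
        } else {
            None
        }
    }
}
```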

Source Region: PeerFsmDelegate::on_merge_result

Destroys the source region's PeerFsm and ApplyFsm.

If the ApplyFsm has not been destroyed yet, it first sends ApplyTask::destroy to destroy the ApplyFsm.