RaftClient

trait Transport


#![allow(unused)]
fn main() {
/// Transports messages between different Raft peers.
pub trait Transport: Send + Clone {
    fn send(&mut self, msg: RaftMessage) -> Result<()>;

    fn need_flush(&self) -> bool;

    fn flush(&mut self);
}
}

raft client使用方式如下,先send 将消息放入队列中,最后flush,才真正的发送消息。


#![allow(unused)]
fn main() {
/// A raft client that can manages connections correctly.
///
/// A correct usage of raft client is:
///
/// ```text
/// for m in msgs {
///     if !raft_client.send(m) {
///         // handle error.   
///     }
/// }
/// raft_client.flush();
/// ```
}

ServerTransport

connection pool

connection builder

RaftClient的创建

主要函数调用流程

send

先从LRUCache 中获取(store_id, conn_id)对应的Queue,如果成功, 则向 Queue中push raftMessage, 如果push消息时返回Full错误,就调用notify, 通知RaftCall 去pop Queue消息, 将消息发送出去。

如果LRUCache中没有,则向Connection Pool中获取,如果获取还失败的话,则创建一个。

最后在future pool中执行start,

load_stream

start

start会异步的调用PdStoreAddrResolver去resolve store_id的addr, 然后创建连接。

调用batch_call 新建一个RaftCall. RaftCall被poll时会不断的去Queue中pop 消息, 并通过grpc stream将消息发出去。

由于包含snap的Message太大,会有send_snapshot_sock专门处理

resolve: store addr解析

TiKVServer::init时,store addr resolve worker,会在background yatp 线程池中执行。 调用者使用PdStoreAddrResolver来向add resolver线程 发消息。它创建流程如下:

Resolve流程如下:addr-resolver worker收到消息后,先本地cache中看查看有没有store 的addr,如果没有或者 已经过期了,就调用PdClient的get_store方法,获取store的addr地址。

成功后回调task_cb函数,在该回调函数中会触发oneshot_channel, StreamBackEnd::resolve 接着执行 await resolve后边代码。

snapshot 发送和接收

send_snap

包含snap的RaftMessage消息体比较大,将由snap-handler worker来发送.

snap-handler的worker创建和启动流程如下:

send_snapshot_sock 使用scheduler的tx,向snap-handler worker 发送SnapTask::Send Task, snap-handler worker 调用send_snap创建 发送snap的异步任务,然后在上面创建的Tokio 线程池Runtime中执行。

send_snap会去snap manager获取snapshot 构造一个SnapChunk 然后创建和peer所在store addr的grpc connection channel,使用snapshotgrpc调用 将SnapChunk数据发送给peer.

SnapChunk实现了Stream trait, 在poll_next中调用read_exact一块块的将snap数据发出去。

recv_snap

broadcast_unreachable

store_id消息失败, 向自己所有region广播store unreachable消息

参考

  1. Snapshot 的发送和接收

draft