RaftKV
- RaftKV对上层提供了
async_snapshot和async_write异步读写接口- RaftKV使用RaftStoreRouter将propose(读写请求)发送给region的peer来处理请求。
- RaftKV中和raft相关部分代码封装在PeerStorage中。
- RaftKV存储Engine分两种,一个负责存储key,value,一个负责raft log存储.
Engines
RaftKV 的存储分两种,一个为负责存储state machine的key, value, 对应于模板参数EK,
其实现为RocksEngine,
另一个负责存储raft log, 对应于模板参数ER,其实现为RocksEngine或者RaftLogEngine.
RaftLogEngine是一个单独的repo,对raft log存储做了优化。
A WAL-is-data engine that used to store multi-raft log
在初始化调用run_tikv函数时,会根据配置config.raft_engine.enable来决定
是否采用RaftLogEngine来存储raft log日志
#![allow(unused)] fn main() { pub fn run_tikv(config: TiKvConfig) { //other code... if !config.raft_engine.enable { run_impl!(RocksEngine) } else { run_impl!(RaftLogEngine) } } }
关键数据结构关系如下:
RaftRouter
根据region_id将RaftCmdRequest消息发送到对应的PeerFSM, 由RaftPoller线程池来
批量的处理消息,处理消息时候,先将写操作写入write batch,在这一批处理完毕后
再将整个write batch写入底层的RaftLogEngine或者RocksEngine, 这样降低了IO频率
, 提高了性能。
Normals Hashmap的初始化和batchSystem的机制,详见后面的BatchSystem相关代码分析。
PeerStorage
PeerStorage 使用raftlog和kv engine, 实现了Raft-rs中的Storage trait接口。
#![allow(unused)] fn main() { pub trait Storage { fn initial_state(&self) -> Result<RaftState>; fn entries(&self, low: u64, high: u64, max_size: impl Into<Option<u64>>) -> Result<Vec<Entry>>; fn term(&self, idx: u64) -> Result<u64>; fn first_index(&self) -> Result<u64>; fn last_index(&self) -> Result<u64>; fn snapshot(&self, request_index: u64) -> Result<Snapshot>; } }
Raft的log entries,raft state, apply state写入流程如下:
- 先调用PeerFsmDelegate
handle_msgs,将RaftCmdRequest 发给raft_group - collect ready调用
raft_group.ready,获取需要保存的log entries PeerStorage::handle_raft_ready将log entries, raft state, apply state等信息写到write batch中RaftPoller::end将write batch写入磁盘中,然后PeerStorage::post_ready更改raft_state,apply_state等状态
读写队列
每个raft region的异步读写队列,存放在Peer中。
调用Peer::propose 处理RaftCmdRequest时,会同时传入一个callback.
Peer会将根据request类型,将request,callback打包在一起放入等待队列中。
对于读请求,会放在ReadIndexQueue,写请求则放入ProposalQueue
#![allow(unused)] fn main() { pub struct Peer<EK, ER> where EK: KvEngine, ER: RaftEngine, { /// The Raft state machine of this Peer. pub raft_group: RawNode<PeerStorage<EK, ER>>, pending_reads: ReadIndexQueue<EK::Snapshot>, proposals: ProposalQueue<EK::Snapshot>, //... } }
ReadIndexQueue
ReadIndex 大致流程如下:
- 将ReadIndex request和callback放入ReadIndexQueue中,request会生成一个uuid::u64作为Id, 来标识这个request.
- 带上生成的uuid, 调用
raft_group的read_index方法 apply_reads处理raft_group.ready()返回的ready.read_states- 根据uuid从队列中找到对应的callback, 调用callback.(TODO: 这块逻辑好像不是这样的)
ProposalQueue
在向Raft group propose之后,会调用Callback的invoke_proposed,
Raft ready 之后log entries commited 之后,会回调Callback的invoke_committed
然后将cb 包在Apply中,发送apply task给ApplyFsm.
ApplyFsm在修改写入底层kv engine后,会回调callback的invoke_all