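Raft Source Code Analysis
This article looks at how the Raft algorithm is implemented, at the source-code level. It works from a simplified implementation found online (source code).
A Server struct represents one node in the Raft network. A node creates a Server and exposes it to the other nodes through the peer (Peer) interface.
The transport layer is wrapped in HTTP; peer-to-peer communication uses REST over HTTP.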
|http transport| ---> |peers| ---> |server|
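Project overview
Adding and removing nodes
Nodes can be added and removed dynamically, using a simple consensus algorithm: when the configuration is updated, more than half of the nodes must accept the change, that is, the nodes on the new configuration must outnumber those still on the old one.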
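The "more than half" rule can be sketched as follows. This is a minimal illustration, not the project's code: `majorityAccepted` and its parameters are hypothetical names, and the project's own joint-consensus logic (see JOINT-CONSENSUS.md) is more involved.

```go
package main

import "fmt"

// majorityAccepted reports whether strictly more than half of the members
// acknowledged a configuration change. Hypothetical helper for illustration.
func majorityAccepted(members []uint64, acks map[uint64]bool) bool {
	n := 0
	for _, id := range members {
		if acks[id] {
			n++
		}
	}
	return n > len(members)/2
}

func main() {
	members := []uint64{1, 2, 3, 4, 5}
	fmt.Println(majorityAccepted(members, map[uint64]bool{1: true, 2: true}))
	fmt.Println(majorityAccepted(members, map[uint64]bool{1: true, 2: true, 3: true}))
}
```

With five members, two acknowledgements are not enough and three are.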
roadmap
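- Leader election
- Log replication
- Unit tests
- HTTP transport layer
- Configuration changes
Beyond these, some items are not yet done:
- net/rpc transport layer, or other kinds of transport
- Log compaction
- Snapshot installation and recovery
- A complete demo application
- Some more complex test cases
For the details, see the code analysis below.
Source code analysis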
Source directory layout
├── JOINT-CONSENSUS.md
├── LICENSE
├── README.md
├── configuration.go // configuration
├── example_test.go // demo
├── log.go // log
├── log_test.go // log tests
├── peers.go // peers
├── peers_test.go // peer tests
├── rpc.go // RPC objects
├── server.go // server
├── server_internals_test.go // server internals tests
├── server_test.go // server tests
├── transport.go // transport layer
└── transport_test.go // transport layer tests
Main data structures
rpc.go
// Log-append request paired with its reply channel
type appendEntriesTuple struct {
// The log-append request
Request appendEntries
// Channel on which the response is delivered
Response chan appendEntriesResponse
}
// Vote request paired with its reply channel
type requestVoteTuple struct {
// The vote request
Request requestVote
// Channel on which the vote result is delivered
Response chan requestVoteResponse
}
// appendEntries represents an appendEntries RPC.
// Log-append request body
type appendEntries struct {
// Term
Term uint64 `json:"term"`
// Leader ID
LeaderID uint64 `json:"leader_id"`
// Index of the preceding log entry
PrevLogIndex uint64 `json:"prev_log_index"`
// Term of the preceding log entry
PrevLogTerm uint64 `json:"prev_log_term"`
// Entries to append; several can be sent in one batch
Entries []logEntry `json:"entries"`
// Index of the last committed entry
CommitIndex uint64 `json:"commit_index"`
}
// appendEntriesResponse represents the response to an appendEntries RPC.
// Log-append response
type appendEntriesResponse struct {
// Term of the responding node
Term uint64 `json:"term"`
// Whether the append succeeded
Success bool `json:"success"`
// Reason for failure
reason string
}
// requestVote represents a requestVote RPC.
// Vote request body
type requestVote struct {
// Candidate's term
Term uint64 `json:"term"`
// Candidate's ID
CandidateID uint64 `json:"candidate_id"`
// Index of the candidate's last log entry
LastLogIndex uint64 `json:"last_log_index"`
// Term of the candidate's last log entry
LastLogTerm uint64 `json:"last_log_term"`
}
// requestVoteResponse represents the response to a requestVote RPC.
// Vote response
type requestVoteResponse struct {
// Term of the responding node
Term uint64 `json:"term"`
// Result: true grants the vote, false denies it
VoteGranted bool `json:"vote_granted"`
// Reason for denial
reason string
}
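Since the transport is REST over HTTP, the `json` tags above decide what actually crosses the wire. The sketch below uses a trimmed copy of `appendEntriesResponse` and a hypothetical helper `encodeResponse` to show that the unexported `reason` field is not serialized by `encoding/json`, so it is only visible on the node that produced it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed copy of the response type from rpc.go.
type appendEntriesResponse struct {
	Term    uint64 `json:"term"`
	Success bool   `json:"success"`
	reason  string // unexported, so encoding/json skips it
}

// encodeResponse is a hypothetical helper that returns the JSON form of r.
func encodeResponse(r appendEntriesResponse) string {
	b, err := json.Marshal(r)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	r := appendEntriesResponse{Term: 2, Success: false, reason: "term too small"}
	fmt.Println(encodeResponse(r)) // prints {"term":2,"success":false}
}
```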
log.go
var (
// Term is too small
errTermTooSmall = errors.New("term too small")
// Log index is too small
errIndexTooSmall = errors.New("index too small")
// Log index is too large
errIndexTooBig = errors.New("commit index too big")
// Log entry content is corrupted
errInvalidChecksum = errors.New("invalid checksum")
// Missing command
errNoCommand = errors.New("no command")
// Wrong log index
errBadIndex = errors.New("bad index")
// Wrong term
errBadTerm = errors.New("bad term")
)
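// Log structure
type raftLog struct {
// Read-write lock guarding the log
sync.RWMutex
// Storage backend for the log
store io.Writer
// In-memory copy of the log entries
entries []logEntry
// Position in entries of the last committed entry (-1 if none)
commitPos int
// Callback that applies a command. It matters because it builds up the state produced by the applied commands;
// a snapshot then only needs to persist that resulting state to the storage layer
apply func(uint64, []byte) []byte
}
func newRaftLog(store io.ReadWriter, apply func(uint64, []byte) []byte) *raftLog {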
l := &raftLog{
store: store,
entries: []logEntry{},
commitPos: -1, // no commits to begin with
apply: apply,
}
l.recover(store)
return l
}
// recover reads from the log's store, to populate the log with log entries
// from persistent storage. It should be called once, at log instantiation.
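// Log recovery: when the service restarts, the log entries are rebuilt. (Recovery normally uses a snapshot plus the log, but snapshots are not implemented here, so the log alone is replayed.)
// 1. Entries in the store were written at commit time, so commitPos advances as each entry is replayed.
// 2. Replaying calls apply on every entry; apply plays the role of the state machine. With snapshots (comparable to Redis RDB), the snapshot would be installed first and the log replayed after it (comparable to Redis AOF).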
func (l *raftLog)recover(r io.Reader)error {
for {
var entry logEntry
switch err := entry.decode(r); err {
case io.EOF:
return nil // successful completion
case nil:
if err := l.appendEntry(entry); err != nil {
return err
}
l.commitPos++
l.apply(entry.Index, entry.Command)
default:
return err // unsuccessful completion
}
}
}
// entriesAfter returns a slice of log entries after (i.e. not including) the
// passed index, and the term of the log entry specified by index, as a
// convenience to the caller. (This function is only used by a leader attempting
// to flush log entries to its followers.)
//
// This function is called to populate an AppendEntries RPC. That implies they
// are destined for a follower, which implies the application of the commands
// should have the response thrown away, which implies we shouldn't pass a
// commandResponse channel (see: commitTo implementation). In the normal case,
// the raftLogEntries we return here will get serialized as they pass thru their
// transport, and lose their commandResponse channel anyway. But in the case of
// a LocalPeer (or equivalent) this doesn't happen. So, we must make sure to
// proactively strip commandResponse channels.
// Returns the log entries after index
func (l *raftLog) entriesAfter(index uint64) ([]logEntry, uint64) {
l.RLock()
defer l.RUnlock()
// 1. Find the term of the entry at index and its position pos in l.entries
pos := 0
lastTerm := uint64(0)
for ; pos < len(l.entries); pos++ {
if l.entries[pos].Index > index {
break
}
lastTerm = l.entries[pos].Term
}
a := l.entries[pos:]
if len(a) == 0 {
return []logEntry{}, lastTerm
}
// Strip the commandResponse channels
return stripResponseChannels(a), lastTerm
}
func stripResponseChannels(a []logEntry)[]logEntry {
stripped := make([]logEntry, len(a))
for i, entry := range a {
stripped[i] = logEntry{
Index: entry.Index,
Term: entry.Term,
Command: entry.Command,
commandResponse: nil,
}
}
return stripped
}
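The scan in `entriesAfter` can be tried out in isolation. The sketch below is a simplified copy without locking or channel stripping; the `entry` type and the sample log are assumptions made for illustration.

```go
package main

import "fmt"

// entry is a pared-down stand-in for logEntry.
type entry struct {
	Index, Term uint64
}

// entriesAfter returns the entries whose Index is greater than index,
// plus the term of the last entry at or before index (0 if there is none).
func entriesAfter(log []entry, index uint64) ([]entry, uint64) {
	pos := 0
	lastTerm := uint64(0)
	for ; pos < len(log); pos++ {
		if log[pos].Index > index {
			break
		}
		lastTerm = log[pos].Term
	}
	return log[pos:], lastTerm
}

func main() {
	log := []entry{{1, 1}, {2, 1}, {3, 2}, {4, 2}}
	rest, term := entriesAfter(log, 2)
	fmt.Println(len(rest), rest[0].Index, term) // prints 2 3 1
}
```

Passing index 0 returns the whole log with term 0, which is what a leader needs when a follower has nothing yet.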
// contains returns true if a log entry with the given index and term exists in
// the log.
// Reports whether the log contains the entry {term, index}
func (l *raftLog) contains(index, term uint64) bool {
l.RLock()
defer l.RUnlock()
// It's not necessarily true that l.entries[i] has index == i.
for _, entry := range l.entries {
if entry.Index == index && entry.Term == term {
return true
}
if entry.Index > index || entry.Term > term {
break
}
}
return false
}
// Checks that {term, index} is an entry in the log and, if so, makes it the last one by discarding every entry after it.
// The index must lie within [commit_index, last_index].
func (l *raftLog) ensureLastIs(index, term uint64) error {
l.Lock()
defer l.Unlock()
// Taken loosely from benbjohnson's impl
if index < l.getCommitIndexWithLock() {
return errIndexTooSmall
}
if index > l.lastIndexWithLock() {
return errIndexTooBig
}
// It's possible that the passed index is 0. It means the leader has come to
// decide we need a complete log rebuild. Of course, that's only valid if we
// haven't committed anything, so this check comes after that one.
// Full rebuild; only valid if no entry has been committed yet
if index == 0 {
for pos := 0; pos < len(l.entries); pos++ {
if l.entries[pos].commandResponse != nil {
close(l.entries[pos].commandResponse)
l.entries[pos].commandResponse = nil
}
if l.entries[pos].committed != nil {
l.entries[pos].committed <- false
close(l.entries[pos].committed)
l.entries[pos].committed = nil
}
}
l.entries = []logEntry{}
return nil
}
// Normal case: find the position of the matching log entry.
pos := 0
for ; pos < len(l.entries); pos++ {
if l.entries[pos].Index < index {
continue // didn't find it yet
}
if l.entries[pos].Index > index {
return errBadIndex // somehow went past it
}
if l.entries[pos].Index != index {
panic("not <, not >, but somehow !=")
}
if l.entries[pos].Term != term {
return errBadTerm
}
break // good
}
// Sanity check.
// How could this case ever arise?
if pos < l.commitPos {
panic("index >= commitIndex, but pos < commitPos")
}
// `pos` is the position of log entry matching index and term.
// We want to truncate everything after that.
// {term, index} is now the last entry, so cut off every entry after it
truncateFrom := pos + 1
if truncateFrom >= len(l.entries) {
return nil // nothing to truncate
}
// If we blow away log entries that haven't yet sent responses to clients,
// signal the clients to stop waiting, by closing the channel without a
// response value.
for pos = truncateFrom; pos < len(l.entries); pos++ {
if l.entries[pos].commandResponse != nil {
close(l.entries[pos].commandResponse)
l.entries[pos].commandResponse = nil
}
if l.entries[pos].committed != nil {
l.entries[pos].committed <- false
close(l.entries[pos].committed)
l.entries[pos].committed = nil
}
}
// Truncate the log.
l.entries = l.entries[:truncateFrom]
// Done.
return nil
}
// getCommitIndex returns the commit index of the log. That is, the index of the
// last log entry which can be considered committed.
// Returns the index of the latest committed log entry
func (l *raftLog) getCommitIndex() uint64 {
l.RLock()
defer l.RUnlock()
return l.getCommitIndexWithLock()
}
// Same as getCommitIndex, but the caller must already hold the lock
func (l *raftLog) getCommitIndexWithLock() uint64 {
if l.commitPos < 0 {
return 0
}
if l.commitPos >= len(l.entries) {
panic(fmt.Sprintf("commitPos %d > len(l.entries) %d; bad bookkeeping in raftLog", l.commitPos, len(l.entries)))
}
return l.entries[l.commitPos].Index
}
// lastIndex returns the index of the most recent log entry.
func (l *raftLog)lastIndex()uint64 {
l.RLock()
defer l.RUnlock()
return l.lastIndexWithLock()
}
func (l *raftLog)lastIndexWithLock()uint64 {
if len(l.entries) <= 0 {
return 0
}
return l.entries[len(l.entries)-1].Index
}
// lastTerm returns the term of the most recent log entry.
func (l *raftLog)lastTerm()uint64 {
l.RLock()
defer l.RUnlock()
return l.lastTermWithLock()
}
func (l *raftLog)lastTermWithLock()uint64 {
if len(l.entries) <= 0 {
return 0
}
return l.entries[len(l.entries)-1].Term
}
// appendEntry appends the passed log entry to the log. It will return an error
// if the entry's term is smaller than the log's most recent term, or if the
// entry's index is too small relative to the log's most recent entry.
// Appends an entry; note that the entry is not committed at this point
func (l *raftLog) appendEntry(entry logEntry) error {
l.Lock()
defer l.Unlock()
// Require {entry.Term, entry.Index} > {lastTerm, lastIndex}
if len(l.entries) > 0 {
lastTerm := l.lastTermWithLock()
if entry.Term < lastTerm {
return errTermTooSmall
}
lastIndex := l.lastIndexWithLock()
if entry.Term == lastTerm && entry.Index <= lastIndex {
return errIndexTooSmall
}
}
l.entries = append(l.entries, entry)
return nil
}
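The ordering rule enforced by `appendEntry` can be sketched as a pure function. `checkAppend` and the `entry` type are hypothetical names used only for this illustration; the two error values mirror the ones declared in log.go.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errTermTooSmall  = errors.New("term too small")
	errIndexTooSmall = errors.New("index too small")
)

// entry is a pared-down stand-in for logEntry.
type entry struct {
	Index, Term uint64
}

// checkAppend applies the same comparisons as appendEntry to a candidate
// entry next, given the current last entry of a non-empty log.
func checkAppend(last, next entry) error {
	if next.Term < last.Term {
		return errTermTooSmall
	}
	if next.Term == last.Term && next.Index <= last.Index {
		return errIndexTooSmall
	}
	return nil
}

func main() {
	last := entry{Index: 5, Term: 2}
	fmt.Println(checkAppend(last, entry{Index: 6, Term: 2}))
	fmt.Println(checkAppend(last, entry{Index: 5, Term: 2}))
	fmt.Println(checkAppend(last, entry{Index: 6, Term: 1}))
}
```

One detail worth noticing: the index is only compared when the terms are equal, so an entry with a higher term passes regardless of its index.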
// commitTo commits all log entries up to and including the passed commitIndex.
// Commit means: synchronize the log entry to persistent storage, and call the
// state machine apply function for the log entry's command.
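// Notes:
// 1. Commit runs as a background task; there is no majority ("1/2") confirmation inside this function (whether that is really how it should work needs checking against the Raft paper).
// 2. apply is called during commit, not during append.
// 3. apply acts as the state machine function; users typically keep the results it produces so they can be snapshotted.
// For example, to build a KV store, the user only needs to put the KV logic into this function.
// committed <= commitIndex <= last_index
func (l *raftLog) commitTo(commitIndex uint64) error {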
if commitIndex == 0 {
panic("commitTo(0)")
}
l.Lock()
defer l.Unlock()
// Reject old commit indexes
if commitIndex < l.getCommitIndexWithLock() {
return errIndexTooSmall
}
// Reject new commit indexes
if commitIndex > l.lastIndexWithLock() {
return errIndexTooBig
}
// If we've already committed to the commitIndex, great!
if commitIndex == l.getCommitIndexWithLock() {
return nil
}
// We should start committing at precisely the last commitPos + 1
pos := l.commitPos + 1
if pos < 0 {
panic("pending commit pos < 0")
}
// Commit entries between our existing commit index and the passed index.
// Remember to include the passed index.
for {
// Sanity checks. TODO replace with plain `for` when this is stable.
if pos >= len(l.entries) {
panic(fmt.Sprintf("commitTo pos=%d advanced past all log entries (%d)", pos, len(l.entries)))
}
if l.entries[pos].Index > commitIndex {
panic("commitTo advanced past the desired commitIndex")
}
// Encode the entry to persistent storage.
if err := l.entries[pos].encode(l.store); err != nil {
return err
}
// Forward non-configuration commands to the state machine.
// Send the responses to the waiting client, if applicable.
// For entries that are not configuration entries, call the apply function.
// Configuration entries are handled elsewhere.
if !l.entries[pos].isConfiguration {
resp := l.apply(l.entries[pos].Index, l.entries[pos].Command)
if l.entries[pos].commandResponse != nil {
select {
case l.entries[pos].commandResponse <- resp:
break
// Why was this particular timeout chosen?
case <-time.After(maximumElectionTimeout()): // << ElectionInterval
panic("uncooperative command response receiver")
}
close(l.entries[pos].commandResponse)
l.entries[pos].commandResponse = nil
}
}
// Signal the entry has been committed, if applicable.
if l.entries[pos].committed != nil {
l.entries[pos].committed <- true
close(l.entries[pos].committed)
l.entries[pos].committed = nil
}
// Mark our commit position cursor.
l.commitPos = pos
// If that was the last one, we're done.
if l.entries[pos].Index == commitIndex {
break
}
if l.entries[pos].Index > commitIndex {
panic(fmt.Sprintf(
"current entry Index %d is beyond our desired commitIndex %d",
l.entries[pos].Index,
commitIndex,
))
}
// Otherwise, advance!
pos++
}
// Done.
return nil
}
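The notes on `commitTo` mention that a KV store only needs its logic placed in the apply function. A minimal sketch of such a function follows; the `kvStore` type and the text command format ("SET key value", "GET key") are assumptions for illustration and do not come from the project.

```go
package main

import (
	"fmt"
	"strings"
)

// kvStore is a hypothetical state machine backed by a map.
type kvStore struct {
	data map[string]string
}

func newKVStore() *kvStore { return &kvStore{data: map[string]string{}} }

// apply has the same shape as the apply callback: (index, command) -> response.
// The index is ignored here; a real state machine might record it for snapshots.
func (s *kvStore) apply(index uint64, cmd []byte) []byte {
	parts := strings.SplitN(string(cmd), " ", 3)
	switch {
	case len(parts) == 3 && parts[0] == "SET":
		s.data[parts[1]] = parts[2]
		return []byte("OK")
	case len(parts) == 2 && parts[0] == "GET":
		return []byte(s.data[parts[1]])
	default:
		return []byte("ERR")
	}
}

func main() {
	kv := newKVStore()
	fmt.Println(string(kv.apply(1, []byte("SET color blue"))))
	fmt.Println(string(kv.apply(2, []byte("GET color"))))
}
```

Because committed entries are applied in log order, every node that replays the same log ends up with the same map contents.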
// logEntry is the atomic unit being managed by the distributed log. A log entry
// always has an index (monotonically increasing), a term in which the Raft
// network leader first sees the entry, and a command. The command is what gets
// executed against the node state machine when the log entry is successfully
// replicated.
type logEntry struct {
// Log index
Index uint64 `json:"index"`
// Term
Term uint64 `json:"term"` // when received by leader
// Command payload
Command []byte `json:"command,omitempty"`
// Channel signalling whether the entry was committed
committed chan bool `json:"-"`
// Channel for the command response
commandResponse chan<- []byte `json:"-"` // only non-nil on receiver's log
// Marks the entry type
isConfiguration bool `json:"-"` // for configuration change entries
}
// encode serializes the log entry to the passed io.Writer.
//
// Entries are serialized in a simple binary format:
//
// ---------------------------------------------
// | uint32 | uint64 | uint64 | uint32 | []byte |
// ---------------------------------------------
// | CRC | TERM | INDEX | SIZE | COMMAND |
// ---------------------------------------------
//
// Serialization (little-endian, as the binary.LittleEndian calls below show)
func (e *logEntry) encode(w io.Writer) error {
if len(e.Command) <= 0 {
return errNoCommand
}
if e.Index <= 0 {
return errBadIndex
}
if e.Term <= 0 {
return errBadTerm
}
commandSize := len(e.Command)
buf := make([]byte, 24+commandSize)
binary.LittleEndian.PutUint64(buf[4:12], e.Term)
binary.LittleEndian.PutUint64(buf[12:20], e.Index)
binary.LittleEndian.PutUint32(buf[20:24], uint32(commandSize))
copy(buf[24:], e.Command)
binary.LittleEndian.PutUint32(
buf[0:4],
crc32.ChecksumIEEE(buf[4:]),
)
_, err := w.Write(buf)
return err
}
// Deserialization
// decode deserializes one log entry from the passed io.Reader.
func (e *logEntry)decode(r io.Reader)error {
header := make([]byte, 24)
if _, err := r.Read(header); err != nil {
return err
}
command := make([]byte, binary.LittleEndian.Uint32(header[20:24]))
if _, err := r.Read(command); err != nil {
return err
}
crc := binary.LittleEndian.Uint32(header[:4])
check := crc32.NewIEEE()
check.Write(header[4:])
check.Write(command)
if crc != check.Sum32() {
return errInvalidChecksum
}
e.Term = binary.LittleEndian.Uint64(header[4:12])
e.Index = binary.LittleEndian.Uint64(header[12:20])
e.Command = command
return nil
}
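The record layout (CRC, term, index, size, command) can be exercised with a small round trip. This sketch is not the project's code: `encodeRecord`, `decodeRecord` and `decodeBytes` are hypothetical names, and it swaps the plain `r.Read` calls for `io.ReadFull`, on the assumption that a reader may return fewer bytes than requested.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
	"io"
)

var errInvalidChecksum = errors.New("invalid checksum")

// encodeRecord lays out | CRC | TERM | INDEX | SIZE | COMMAND | in little-endian.
func encodeRecord(term, index uint64, cmd []byte) []byte {
	buf := make([]byte, 24+len(cmd))
	binary.LittleEndian.PutUint64(buf[4:12], term)
	binary.LittleEndian.PutUint64(buf[12:20], index)
	binary.LittleEndian.PutUint32(buf[20:24], uint32(len(cmd)))
	copy(buf[24:], cmd)
	binary.LittleEndian.PutUint32(buf[0:4], crc32.ChecksumIEEE(buf[4:]))
	return buf
}

// decodeRecord reads one record and verifies its checksum.
// io.ReadFull keeps reading until the buffer is full or the reader fails.
func decodeRecord(r io.Reader) (term, index uint64, cmd []byte, err error) {
	header := make([]byte, 24)
	if _, err = io.ReadFull(r, header); err != nil {
		return 0, 0, nil, err
	}
	cmd = make([]byte, binary.LittleEndian.Uint32(header[20:24]))
	if _, err = io.ReadFull(r, cmd); err != nil {
		return 0, 0, nil, err
	}
	check := crc32.NewIEEE()
	check.Write(header[4:])
	check.Write(cmd)
	if binary.LittleEndian.Uint32(header[:4]) != check.Sum32() {
		return 0, 0, nil, errInvalidChecksum
	}
	term = binary.LittleEndian.Uint64(header[4:12])
	index = binary.LittleEndian.Uint64(header[12:20])
	return term, index, cmd, nil
}

// decodeBytes is a convenience wrapper for in-memory buffers.
func decodeBytes(b []byte) (uint64, uint64, []byte, error) {
	return decodeRecord(bytes.NewReader(b))
}

func main() {
	buf := encodeRecord(3, 7, []byte("set x"))
	term, index, cmd, err := decodeBytes(buf)
	fmt.Println(term, index, string(cmd), err)
}
```

The checksum covers everything after the first four bytes, so a flipped bit in the term, index, size or command is detected on decode.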
peers.go
var (
errTimeout = errors.New("timeout")
)
// Peer is an abstraction of a node that exposes a few access methods to others.
// One thing to watch is how peers are serialized.
type Peer interface {
// Returns the server ID
id() uint64
// Log-append call
callAppendEntries(appendEntries) appendEntriesResponse
// Vote-request call
callRequestVote(requestVote) requestVoteResponse
// Command call
callCommand([]byte, chan<- []byte) error
// Cluster configuration change call
callSetConfiguration(...Peer) error
}
// localPeer is the simplest kind of peer, mapped to a server in the
// same process-space. Useful for testing and demonstration; not so
// useful for networks of independent processes.
// Local peer, used for testing; calls do not go through the network
type localPeer struct {
server *Server
}
func newLocalPeer(server *Server)*localPeer { return &localPeer{server} }
func (p *localPeer)id()uint64 { return p.server.id }
// Append log entries
func (p *localPeer) callAppendEntries(ae appendEntries) appendEntriesResponse {
return p.server.appendEntries(ae)
}
// Request a vote
func (p *localPeer) callRequestVote(rv requestVote) requestVoteResponse {
return p.server.requestVote(rv)
}
// Command
// The call is ultimately served by the leader
func (p *localPeer) callCommand(cmd []byte, response chan<- []byte) error {
return p.server.Command(cmd, response)
}
// Set the configuration
func (p *localPeer)callSetConfiguration(peers ...Peer)error {
return p.server.SetConfiguration(peers...)
}
// requestVoteTimeout issues the requestVote to the given peer.
// If no response is received before timeout, an error is returned.
// Request a vote, with a timeout
func requestVoteTimeout(p Peer, rv requestVote, timeout time.Duration)(requestVoteResponse, error) {
c := make(chan requestVoteResponse, 1)
go func() { c <- p.callRequestVote(rv) }()
select {
case resp := <-c:
return resp, nil
case <-time.After(timeout):
return requestVoteResponse{}, errTimeout
}
}
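`requestVoteTimeout` is an instance of a common Go pattern: run the blocking call in a goroutine, then `select` between its result and a timer. A generic sketch follows; `callWithTimeout`, `fastCall`, `slowCall` and the two wait constants are hypothetical names chosen for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeout = errors.New("timeout")

const (
	shortWait = 20 * time.Millisecond
	longWait  = 2 * time.Second
)

// callWithTimeout runs call in its own goroutine and waits at most timeout.
// The channel is buffered so a late result can still be sent and the
// goroutine can exit even when nobody is receiving any more.
func callWithTimeout(call func() string, timeout time.Duration) (string, error) {
	c := make(chan string, 1)
	go func() { c <- call() }()
	select {
	case resp := <-c:
		return resp, nil
	case <-time.After(timeout):
		return "", errTimeout
	}
}

func fastCall() string { return "granted" }

func slowCall() string {
	time.Sleep(500 * time.Millisecond)
	return "late"
}

func main() {
	fmt.Println(callWithTimeout(fastCall, longWait))
	fmt.Println(callWithTimeout(slowCall, shortWait))
}
```

The buffer size of 1 is the important detail: with an unbuffered channel, a call that finishes after the timeout would block forever on the send.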
// peerMap is a collection of Peer interfaces. It provides some convenience
// functions for actions that should apply to multiple Peers.
type peerMap map[uint64]Peer
// makePeerMap constructs a peerMap from a list of peers.
func makePeerMap(peers ...Peer)peerMap {
pm := peerMap{}
for _, peer := range peers {
pm[peer.id()] = peer
}
return pm
}
// explodePeerMap converts a peerMap into a slice of peers.
func explodePeerMap(pm peerMap)[]Peer {
a := []Peer{}
for _, peer := range pm {
a = append(a, peer)
}
return a
}
func (pm peerMap) except(id uint64) peerMap {
except := peerMap{}
for id0, peer := range pm {
if id0 == id {
continue
}
except[id0] = peer
}
return except
}
func (pm peerMap)count()int { return len(pm) }
// Quorum size
func (pm peerMap)quorum()int {
switch n := len(pm); n {
case 0, 1:
return 1
default:
return (n / 2) + 1
}
}
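To make the quorum arithmetic concrete, the sketch below copies it into a standalone function taking the cluster size and prints how many failures each size tolerates. The function name and the table are illustrative only.

```go
package main

import "fmt"

// quorum copies the arithmetic of peerMap.quorum for a cluster of n nodes.
func quorum(n int) int {
	if n <= 1 {
		return 1
	}
	return n/2 + 1
}

func main() {
	for _, n := range []int{1, 2, 3, 4, 5} {
		fmt.Printf("nodes=%d quorum=%d tolerated failures=%d\n", n, quorum(n), n-quorum(n))
	}
}
```

Going from 3 to 4 nodes raises the quorum from 2 to 3 without tolerating any more failures, which is why odd cluster sizes are the usual choice.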
// requestVotes sends the passed requestVote RPC to every peer in Peers. It
// forwards responses along the returned requestVoteResponse channel. It makes
// the RPCs with a timeout of BroadcastInterval * 2 (chosen arbitrarily). Peers
// that don't respond within the timeout are retried forever. The retry loop
// stops only when all peers have responded, or a Cancel signal is sent via the
// returned canceler.
func (pm peerMap) requestVotes(r requestVote) (chan voteResponseTuple, canceler) {
// "[A server entering the candidate stage] issues requestVote RPCs in
// parallel to each of the other servers in the cluster. If the candidate
// receives no response for an RPC, it reissues the RPC repeatedly until a
// response arrives or the election concludes."
// construct the channels we'll return
abortChan := make(chan struct{})
tupleChan := make(chan voteResponseTuple)
go func() {
// We loop until all Peers have given us a response.
// Track which Peers have responded.
respondedAlready := peerMap{} // none yet
for {
notYetResponded := disjoint(pm, respondedAlready)
if len(notYetResponded) <= 0 {
return // done
}
// scatter
tupleChan0 := make(chan voteResponseTuple, len(notYetResponded))
for id, peer := range notYetResponded {
go func(id uint64, peer Peer) {
resp, err := requestVoteTimeout(peer, r, 2*maximumElectionTimeout())
tupleChan0 <- voteResponseTuple{id, resp, err}
}(id, peer)
}
// gather
for i := 0; i < cap(tupleChan0); i++ {
select {
case t := <-tupleChan0:
if t.err != nil {
continue // will need to retry
}
respondedAlready[t.id] = nil // set membership semantics
tupleChan <- t
case <-abortChan:
return // give up
}
}
}
}()
return tupleChan, cancel(abortChan)
}
// Vote response
type voteResponseTuple struct {
id uint64
response requestVoteResponse
err error
}
type canceler interface {
Cancel()
}
type cancel chan struct{}
func (c cancel)Cancel() { close(c) }
// Filters peers: returns those in all that are not in except
func disjoint(all, except peerMap) peerMap {
d := peerMap{}
for id, peer := range all {
if _, ok := except[id]; ok {
continue
}
d[id] = peer
}
return d
}
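`disjoint` is a set difference, and `requestVotes` relies on it to decide whom to retry: each round targets only the peers that have not yet answered, and the loop ends when the difference is empty. The sketch below shows that bookkeeping with a simplified `peerSet` type (an assumption; the real code stores `Peer` values in a `peerMap`).

```go
package main

import "fmt"

// peerSet is a simplified stand-in for peerMap that keeps only the IDs.
type peerSet map[uint64]struct{}

// disjoint returns the members of all that are not in except.
func disjoint(all, except peerSet) peerSet {
	d := peerSet{}
	for id := range all {
		if _, ok := except[id]; ok {
			continue
		}
		d[id] = struct{}{}
	}
	return d
}

func main() {
	all := peerSet{1: {}, 2: {}, 3: {}}
	responded := peerSet{}

	responded[2] = struct{}{} // peer 2 answers in the first round
	fmt.Println(len(disjoint(all, responded)))

	responded[1] = struct{}{}
	responded[3] = struct{}{}
	fmt.Println(len(disjoint(all, responded)))
}
```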
server.go
This file holds the most important logic.
A few configuration-related definitions
// Roles
const (
follower = "Follower"
candidate = "Candidate"
leader = "Leader"
)
const (
unknownLeader = 0
noVote = 0
)
// The election timeout is drawn at random from [MinimumElectionTimeoutMS, maximumElectionTimeoutMS)
var (
MinimumElectionTimeoutMS int32 = 250
maximumElectionTimeoutMS = 2 * MinimumElectionTimeoutMS
)
var (
errNotLeader = errors.New("not the leader")
errUnknownLeader = errors.New("unknown leader")
errDeposed = errors.New("deposed during replication")
errAppendEntriesRejected = errors.New("appendEntries RPC rejected")
errReplicationFailed = errors.New("command replication failed (but will keep retrying)")
errOutOfSync = errors.New("out of sync")
errAlreadyRunning = errors.New("already running")
)
// Replaces the election timeout bounds and returns the previous values
func resetElectionTimeoutMS(newMin, newMax int) (int, int) {
oldMin := atomic.LoadInt32(&MinimumElectionTimeoutMS)
oldMax := atomic.LoadInt32(&maximumElectionTimeoutMS)
atomic.StoreInt32(&MinimumElectionTimeoutMS, int32(newMin))
atomic.StoreInt32(&maximumElectionTimeoutMS, int32(newMax))
return int(oldMin), int(oldMax)
}
// minimumElectionTimeout returns the current minimum election timeout.
func minimumElectionTimeout()time.Duration {
return time.Duration(MinimumElectionTimeoutMS) * time.Millisecond
}
// maximumElectionTimeout returns the current maximum election time.
func maximumElectionTimeout()time.Duration {
return time.Duration(maximumElectionTimeoutMS) * time.Millisecond
}
// Picks a random election timeout
func electionTimeout() time.Duration {
n := rand.Intn(int(maximumElectionTimeoutMS - MinimumElectionTimeoutMS))
d := int(MinimumElectionTimeoutMS) + n
return time.Duration(d) * time.Millisecond
}
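The randomization can be checked in isolation. The sketch below takes the bounds as parameters instead of package variables; `randomTimeout` is a hypothetical name. Because `rand.Intn(k)` returns a value in `[0, k)`, the result falls in the half-open range `[min, max)`.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomTimeout picks a duration in [minMS, maxMS) milliseconds.
// rand.Intn panics if its argument is <= 0, so maxMS must exceed minMS.
func randomTimeout(minMS, maxMS int) time.Duration {
	n := rand.Intn(maxMS - minMS)
	return time.Duration(minMS+n) * time.Millisecond
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(randomTimeout(250, 500))
	}
}
```

Spreading the timeouts like this makes it unlikely that several followers time out at the same moment and split the vote.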
// broadcastInterval returns the interval between heartbeats (AppendEntry RPCs)
// broadcast from the leader. It is the minimum election timeout / 10, as
// dictated by the spec: BroadcastInterval << ElectionTimeout << MTBF.
// Broadcast interval for the leader's heartbeats. It must be shorter than the election timeout; otherwise non-leader nodes would start elections.
func broadcastInterval() time.Duration {
d := MinimumElectionTimeoutMS / 10
return time.Duration(d) * time.Millisecond
}
// protectedString is just a string protected by a mutex.
type protectedString struct {
sync.RWMutex
value string
}
func (s *protectedString)Get()string {
s.RLock()
defer s.RUnlock()
return s.value
}
func (s *protectedString) Set(value string) {
s.Lock()
defer s.Unlock()
s.value = value
}
// protectedBool is just a bool protected by a mutex.
type protectedBool struct {
sync.RWMutex
value bool
}
func (s *protectedBool)Get()bool {
s.RLock()
defer s.RUnlock()
return s.value
}
func (s *protectedBool) Set(value bool) {
s.Lock()
defer s.Unlock()
s.value = value
}
// Server is the agent that performs all of the Raft protocol logic.
// In a typical application, each running process that wants to be part of
// the distributed state machine will contain a server component.
type Server struct {
id uint64 // id of this server
// Role of the node (follower, candidate or leader)
state *protectedString
// Whether the node is running
running *protectedBool
// ID of the leader node
leader uint64
// Current term of this node
term uint64 // "current term number, which increases monotonically"
// 0 means this node has not yet voted in this term; a non-zero value is the ID of the node it voted for
vote uint64 // who we voted for this term, if applicable
log *raftLog
config *configuration
// Channel for log-append requests
appendEntriesChan chan appendEntriesTuple
// Channel for vote requests
requestVoteChan chan requestVoteTuple
// Channel for commands
commandChan chan commandTuple
// Channel for configuration changes
configurationChan chan configurationTuple
// Election timer channel
electionTick <-chan time.Time
// Quit channel
quit chan chan struct{}
}
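// State machine function.
// It must not be run concurrently, or the replicated state machines would no longer stay consistent (its running time should also not exceed the election timeout).
// Normally it would be called only once consensus has been reached, and the result then returned to the client.
// Here, to keep the implementation simple, consensus runs as a background task: when a client sends a command to the leader, the leader
// replies to the client right away, without waiting for the outcome of the consensus algorithm.
type ApplyFunc func(commitIndex uint64, cmd []byte) []byte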
// Initializes a node
// 1. Build the log 2. Start in the "follower" role 3. Leader is "unknown"
func NewServer(id uint64, store io.ReadWriter, a ApplyFunc) *Server {
if id <= 0 {
panic("server id must be > 0")
}
// 5.2 Leader election: "the latest term this server has seen is persisted,
// and is initialized to 0 on first boot."
log := newRaftLog(store, a)
latestTerm := log.lastTerm()
s := &Server{
id: id,
state: &protectedString{value: follower}, // "when servers start up they begin as followers"
running: &protectedBool{value: false},
leader: unknownLeader, // unknown at startup
log: log,
term: latestTerm,
config: newConfiguration(peerMap{}),
appendEntriesChan: make(chan appendEntriesTuple),
requestVoteChan: make(chan requestVoteTuple),
commandChan: make(chan commandTuple),
configurationChan: make(chan configurationTuple),
electionTick: nil,
quit: make(chan chan struct{}),
}
s.resetElectionTimeout()
return s
}
type configurationTuple struct {
Peers []Peer
Err chan error
}
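// Sets the configuration
// 1. At service startup, the configuration is set first
// 2. On a cluster change, the configuration is set again
func (s *Server) SetConfiguration(peers ...Peer) error {
// The node has not been started yet
if !s.running.Get() {
s.config.directSet(makePeerMap(peers...))
return nil
}
err := make(chan error)
// The node is already running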
s.configurationChan <- configurationTuple{peers, err}
return <-err
}
// Start triggers the server to begin communicating with its peers.
func (s *Server)Start() {
go s.loop()
}
// Stop terminates the server. Stopped servers should not be restarted.
func (s *Server)Stop() {
q := make(chan struct{})
s.quit <- q
<-q
s.logGeneric("server stopped")
}
// Command tuple
type commandTuple struct {
// Command payload
Command []byte
// Channel for the command response
CommandResponse chan<- []byte
Err chan error
}
// Command interface
func (s *Server) Command(cmd []byte, response chan<- []byte) error {
err := make(chan error)
s.commandChan <- commandTuple{cmd, response, err}
return <-err
}
// Log append
func (s *Server) appendEntries(ae appendEntries) appendEntriesResponse {
t := appendEntriesTuple{
Request: ae,
Response: make(chan appendEntriesResponse),
}
s.appendEntriesChan <- t
return <-t.Response
}
// Vote request
func (s *Server) requestVote(rv requestVote) requestVoteResponse {
t := requestVoteTuple{
Request: rv,
Response: make(chan requestVoteResponse),
}
s.requestVoteChan <- t
return <-t.Response
}
// times out,
// new election
// | .-----.
// | | |
// v times out, | v receives votes from
// +----------+ starts election +-----------+ majority of servers +--------+
// | Follower |------------------>| Candidate |---------------------->| Leader |
// +----------+ +-----------+ +--------+
// ^ ^ | |
// | | discovers current leader | |
// | | or new term | |
// | '------------------------------' |
// | |
// | discovers server with higher term |
// '------------------------------------------------------------------'
//
//
func (s *Server)loop() {
s.running.Set(true)
for s.running.Get() {
switch state := s.state.Get(); state {
case follower:
s.followerSelect()
case candidate:
s.candidateSelect()
case leader:
s.leaderSelect()
default:
panic(fmt.Sprintf("unknown Server State '%s'", state))
}
}
}
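The state diagram above can be condensed into a small transition function. This is only a reading aid: the event names (`evTimeout`, `evMajority`, `evLeaderSeen`, `evHigherTerm`) and the function `next` are hypothetical labels for the arrows in the diagram, not identifiers from server.go.

```go
package main

import "fmt"

const (
	follower  = "Follower"
	candidate = "Candidate"
	leader    = "Leader"
)

// Hypothetical event labels, one per arrow in the diagram.
const (
	evTimeout    = "election timeout"
	evMajority   = "votes from majority"
	evLeaderSeen = "discovers current leader"
	evHigherTerm = "discovers higher term"
)

// next returns the role a node moves to when event happens in state.
// Events that do not apply to a state leave it unchanged.
func next(state, event string) string {
	switch state {
	case follower:
		if event == evTimeout {
			return candidate
		}
	case candidate:
		switch event {
		case evMajority:
			return leader
		case evLeaderSeen, evHigherTerm:
			return follower
		case evTimeout:
			return candidate // start a new election
		}
	case leader:
		if event == evHigherTerm {
			return follower
		}
	}
	return state
}

func main() {
	s := follower
	for _, ev := range []string{evTimeout, evMajority, evHigherTerm} {
		s = next(s, ev)
		fmt.Println(ev, "->", s)
	}
}
```

In the real loop each role has its own select function (followerSelect, candidateSelect, leaderSelect), and a transition is made by setting the state and returning to the dispatcher.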
func (s *Server)resetElectionTimeout() {
s.electionTick = time.NewTimer(electionTimeout()).C
}
func (s *Server) logGeneric(format string, args ...interface{}) {
prefix := fmt.Sprintf("id=%d term=%d state=%s: ", s.id, s.term, s.state.Get())
log.Printf(prefix+format, args...)
}
func (s *Server) logAppendEntriesResponse(req appendEntries, resp appendEntriesResponse, stepDown bool) {
s.logGeneric(
"got appendEntries, sz=%d leader=%d prevIndex/Term=%d/%d commitIndex=%d: responded with success=%v (reason='%s') stepDown=%v",
len(req.Entries),
req.LeaderID,
req.PrevLogIndex,
req.PrevLogTerm,
req.CommitIndex,
resp.Success,
resp.reason,
stepDown,
)
}
func (s *Server) logRequestVoteResponse(req requestVote, resp requestVoteResponse, stepDown bool) {
s.logGeneric(
"got RequestVote, candidate=%d: responded with granted=%v (reason='%s') stepDown=%v",
req.CandidateID,
resp.VoteGranted,
resp.reason,
stepDown,
)
}
func (s *Server) handleQuit(q chan struct{}) {
s.logGeneric("got quit signal")
s.running.Set(false)
close(q)
}
// Command forwarding
// If this node is not the leader and a leader is known, it acts as a proxy and forwards the command to the leader
func (s *Server)forwardCommand(t commandTuple) {
switch s.leader {
case unknownLeader:
s.logGeneric("got command, but don't know leader")
t.Err <- errUnknownLeader
case s.id: // I am the leader
panic("impossible state in forwardCommand")
default:
leader, ok := s.config.get(s.leader)
if !ok {
panic("invalid state in peers")
}
s.logGeneric("got command, forwarding to leader (%d)", s.leader)
// We're blocking our {follower,candidate}Select function in the
// receive-command branch. If we continue to block while forwarding
// the command, the leader won't be able to get a response from us!
go func() { t.Err <- leader.callCommand(t.Command, t.CommandResponse) }()
}
}
// Configuration change
// The forwarding rule is the same as for commands
func (s *Server)forwardConfiguration(t configurationTuple) {
switch s.leader {
case unknownLeader:
s.logGeneric("got configuration, but don't know leader")
t.Err <- errUnknownLeader
case s.id: // I am the leader
panic("impossible state in forwardConfiguration")
default:
leader, ok := s.config.get(s.leader)
if !ok {
panic("invalid state in peers")
}
s.logGeneric("got configuration, forwarding to leader (%d)", s.leader)
go func() { t.Err <- leader.callSetConfiguration(t.Peers...) }()
}
}
// Follower logic
func (s *Server)followerSelect() {
for {
select {
case q := <-s.quit:
s.handleQuit(q)
return
// Forward commands
case t := <-s.commandChan:
s.forwardCommand(t)
// Forward cluster configuration changes
case t := <-s.configurationChan:
s.forwardConfiguration(t)
// Election timeout: start a leader election
case <-s.electionTick:
// 5.2 Leader election: "A follower increments its current term and
// transitions to candidate state."
if s.config == nil {
s.logGeneric("election timeout, but no configuration: ignoring")
s.resetElectionTimeout()
continue
}
s.logGeneric("election timeout, becoming candidate")
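// Increment our term
s.term++
// Clear our vote
s.vote = noVote
// The leader is now unknown
s.leader = unknownLeader
// Switch the node's role to candidate
s.state.Set(candidate)
// Reset the election timer so that another election is not triggered immediately
s.resetElectionTimeout()
return
// Log append (besides client requests, the leader's heartbeats also trigger this)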
case t := <-s.appendEntriesChan:
if s.leader == unknownLeader {
s.leader = t.Request.LeaderID
s.logGeneric("discovered Leader %d", s.leader)
}
// Handle the log-append request
resp, stepDown := s.handleAppendEntries(t.Request)
s.logAppendEntriesResponse(t.Request, resp, stepDown)
t.Response <- resp
// If this node had fallen out of step with the current cluster, update its record of the leader
if stepDown {
// stepDown as a Follower means just to reset the leader
if s.leader != unknownLeader {
s.logGeneric("abandoning old leader=%d", s.leader)
}
s.logGeneric("following new leader=%d", t.Request.LeaderID)
s.leader = t.Request.LeaderID
}
// Vote request
case t := <-s.requestVoteChan:
// Handle the vote request
resp, stepDown := s.handleRequestVote(t.Request)
s.logRequestVoteResponse(t.Request, resp, stepDown)
t.Response <- resp
// If this node is behind the requester, set the leader to unknownLeader and switch once the election has succeeded
if stepDown {
// stepDown as a Follower means just to reset the leader
if s.leader != unknownLeader {
s.logGeneric("abandoning old leader=%d", s.leader)
}
s.logGeneric("new leader unknown")
s.leader = unknownLeader
}
}
}
}
// Candidate logic
func (s *Server) candidateSelect() {
if s.leader != unknownLeader {
panic("known leader when entering candidateSelect")
}
if s.vote != 0 {
panic("existing vote when entering candidateSelect")
}
// "[A server entering the candidate stage] issues requestVote RPCs in
// parallel to each of the other servers in the cluster. If the candidate
// receives no response for an RPC, it reissues the RPC repeatedly until a
// response arrives or the election concludes."
// Issue the requestVote RPCs
requestVoteResponses, canceler := s.config.allPeers().except(s.id).requestVotes(requestVote{
Term: s.term,
CandidateID: s.id,
LastLogIndex: s.log.lastIndex(),
LastLogTerm: s.log.lastTerm(),
})
defer canceler.Cancel()
// Set up vote tallies (plus, vote for myself)
votes := map[uint64]bool{s.id: true}
s.vote = s.id
s.logGeneric("term=%d election started (configuration state %s)", s.term, s.config.state)
// If the votes already satisfy the configuration's majority rule, the election is won
if s.config.pass(votes) {
s.logGeneric("I immediately won the election")
s.leader = s.id
s.state.Set(leader)
s.vote = noVote
return
}
// "A candidate continues in this state until one of three things happens:
// (a) it wins the election, (b) another server establishes itself as
// leader, or (c) a period of time goes by with no winner."
for {
select {
case q := <-s.quit:
s.handleQuit(q)
return
// Forward commands
case t := <-s.commandChan:
s.forwardCommand(t)
// Forward configuration changes (note that this differs from what the leader does)
case t := <-s.configurationChan:
s.forwardConfiguration(t)
// A vote response arrived
case t := <-requestVoteResponses:
s.logGeneric("got vote: id=%d term=%d granted=%v", t.id, t.response.Term, t.response.VoteGranted)
// "A candidate wins the election if it receives votes from a
// majority of servers in the full cluster for the same term."
// This node is behind other nodes
if t.response.Term > s.term {
s.logGeneric("got vote from future term (%d>%d); abandoning election", t.response.Term, s.term)
s.leader = unknownLeader
s.state.Set(follower)
s.vote = noVote
return // lose
}
// The response comes from a node that is behind this one; ignore it
if t.response.Term < s.term {
s.logGeneric("got vote from past term (%d<%d); ignoring", t.response.Term, s.term)
break
}
// Received a vote in favor
if t.response.VoteGranted {
s.logGeneric("%d voted for me", t.id)
votes[t.id] = true
}
// "Once a candidate wins an election, it becomes leader."
// Quorum reached
if s.config.pass(votes) {
s.logGeneric("I won the election")
s.leader = s.id
s.state.Set(leader)
s.vote = noVote
return // win
}
// Received an appendEntries request (heartbeats are also sent as appendEntries)
case t := <-s.appendEntriesChan:
// "While waiting for votes, a candidate may receive an
// appendEntries RPC from another server claiming to be leader.
// If the leader's term (included in its RPC) is at least as
// large as the candidate's current term, then the candidate
// recognizes the leader as legitimate and steps down, meaning
// that it returns to follower state."
// Handle the appendEntries request
resp, stepDown := s.handleAppendEntries(t.Request)
s.logAppendEntriesResponse(t.Request, resp, stepDown)
t.Response <- resp
// The candidate is behind the leader
if stepDown {
s.logGeneric("after an appendEntries, stepping down to Follower (leader=%d)", t.Request.LeaderID)
s.leader = t.Request.LeaderID
s.state.Set(follower)
return // lose
}
// Although this node is a candidate, the cluster may contain several candidates at the same time
case t := <-s.requestVoteChan:
// We can also be defeated by a more recent candidate
resp, stepDown := s.handleRequestVote(t.Request)
s.logRequestVoteResponse(t.Request, resp, stepDown)
t.Response <- resp
if stepDown {
// This candidate is behind another candidate in the cluster, so it becomes a follower
// and may also grant its vote
s.logGeneric("after a requestVote, stepping down to Follower (leader unknown)")
s.leader = unknownLeader
s.state.Set(follower)
return // lose
}
// Election timeout fired
case <-s.electionTick:
// "The third possible outcome is that a candidate neither wins nor
// loses the election: if many followers become candidates at the
// same time, votes could be split so that no candidate obtains a
// majority. When this happens, each candidate will start a new
// election by incrementing its term and initiating another round of
// requestVote RPCs."
s.logGeneric("election ended with no winner; incrementing term and trying again")
s.resetElectionTimeout()
s.term++
s.vote = noVote
return // draw
}
}
}
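The order of checks in the `requestVoteResponses` case matters: a newer term ends the election, an older term is dropped, and only a same-term response can add a vote. A minimal sketch of that decision, assuming illustrative names (`outcome`, `classify`) that are not part of the library:

```go
package main

import "fmt"

// outcome names what a candidate does with a single requestVote response.
// These names are illustrative and do not appear in the library.
type outcome string

const (
	stepDown outcome = "step down to follower"
	ignore   outcome = "ignore stale response"
	count    outcome = "count granted vote"
	noop     outcome = "vote denied, keep waiting"
)

// classify mirrors the order of checks in the requestVoteResponses case:
// a newer term wins over everything, an older term is dropped, and only a
// response from the current term can contribute a vote.
func classify(currentTerm, respTerm uint64, granted bool) outcome {
	switch {
	case respTerm > currentTerm:
		return stepDown
	case respTerm < currentTerm:
		return ignore
	case granted:
		return count
	default:
		return noop
	}
}

func main() {
	fmt.Println(classify(3, 4, false))
	fmt.Println(classify(3, 2, true))
	fmt.Println(classify(3, 3, true))
	fmt.Println(classify(3, 3, false))
}
```

Note that a granted flag on a response from a newer term is never counted, because the term check comes first.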
// nextIndex holds, for every follower, the latest log index the leader has synced to it
type nextIndex struct {
sync.RWMutex
m map[uint64]uint64 // followerId: nextIndex
}
func newNextIndex(pm peerMap, defaultNextIndex uint64) *nextIndex {
ni := &nextIndex{
m: map[uint64]uint64{},
}
for id := range pm {
ni.m[id] = defaultNextIndex
}
return ni
}
// bestIndex returns the smallest index that has been synced across the followers
func (ni *nextIndex) bestIndex() uint64 {
ni.RLock()
defer ni.RUnlock()
if len(ni.m) <= 0 {
return 0
}
i := uint64(math.MaxUint64)
for _, nextIndex := range ni.m {
if nextIndex < i {
i = nextIndex
}
}
return i
}
// prevLogIndex returns the latest synced log index for the node (id)
func (ni *nextIndex) prevLogIndex(id uint64) uint64 {
ni.RLock()
defer ni.RUnlock()
if _, ok := ni.m[id]; !ok {
panic(fmt.Sprintf("peer %d not found", id))
}
return ni.m[id]
}
// decrement lowers the latest synced index for the node (id); used to roll back when a sync fails
func (ni *nextIndex) decrement(id uint64, prev uint64) (uint64, error) {
ni.Lock()
defer ni.Unlock()
i, ok := ni.m[id]
if !ok {
panic(fmt.Sprintf("peer %d not found", id))
}
if i != prev {
return i, errOutOfSync
}
if i > 0 {
ni.m[id]--
}
return ni.m[id], nil
}
// set updates the synced index for the node (id)
func (ni *nextIndex) set(id, index, prev uint64) (uint64, error) {
ni.Lock()
defer ni.Unlock()
i, ok := ni.m[id]
if !ok {
panic(fmt.Sprintf("peer %d not found", id))
}
if i != prev {
return i, errOutOfSync
}
ni.m[id] = index
return index, nil
}
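Both `decrement` and `set` take the `prev` value the caller read earlier and refuse to write if the stored value has changed in the meantime, which behaves like a compare-and-set. A minimal sketch of that idea, assuming a hypothetical `indexTable` type; unlike the listing, it returns an error for an unknown peer instead of panicking:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errOutOfSync mirrors the error name used in the listing above.
var errOutOfSync = errors.New("out of sync")

// indexTable is a simplified, hypothetical stand-in for nextIndex.
type indexTable struct {
	mu sync.Mutex
	m  map[uint64]uint64 // followerID -> index
}

// compareAndSet stores next for id only if the stored value still equals
// prev. On a mismatch it returns the current value and errOutOfSync.
func (tbl *indexTable) compareAndSet(id, prev, next uint64) (uint64, error) {
	tbl.mu.Lock()
	defer tbl.mu.Unlock()
	cur, ok := tbl.m[id]
	if !ok {
		return 0, fmt.Errorf("peer %d not found", id)
	}
	if cur != prev {
		return cur, errOutOfSync
	}
	tbl.m[id] = next
	return next, nil
}

func main() {
	tbl := &indexTable{m: map[uint64]uint64{1: 5}}
	fmt.Println(tbl.compareAndSet(1, 5, 6)) // this caller read 5 and wins
	fmt.Println(tbl.compareAndSet(1, 5, 9)) // this caller's view is stale
}
```

This guard is what lets several concurrent flushes to the same peer coexist: a flush that worked from an outdated index fails with `errOutOfSync` rather than overwriting newer progress.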
// flush is used by both heartbeats and command replication. It is synchronous and blocks if the peer is unreachable.
func (s *Server) flush(peer Peer, ni *nextIndex) error {
peerID := peer.id()
// The leader's current term
currentTerm := s.term
// The latest synced index for this peer
prevLogIndex := ni.prevLogIndex(peerID)
// Collect the log entries the peer is missing relative to the leader, then sync them
entries, prevLogTerm := s.log.entriesAfter(prevLogIndex)
// The leader's latest committed index
commitIndex := s.log.getCommitIndex()
s.logGeneric("flush to %d: term=%d leaderId=%d prevLogIndex/Term=%d/%d sz=%d commitIndex=%d", peerID, currentTerm, s.id, prevLogIndex, prevLogTerm, len(entries), commitIndex)
// The appendEntries RPC
resp := peer.callAppendEntries(appendEntries{
Term: currentTerm,
LeaderID: s.id,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Entries: entries,
CommitIndex: commitIndex,
})
if resp.Term > currentTerm {
// The responding node has a higher term than ours, so the current leader is deposed
s.logGeneric("flush to %d: responseTerm=%d > currentTerm=%d: deposed", peerID, resp.Term, currentTerm)
return errDeposed
}
if !resp.Success {
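// The request was not acknowledged as successful: the leader's RPC may have timed out, a network error
// (including a split-brain partition) may have occurred, or the peer's log may not match at prevLogIndex. Roll back by one.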
newPrevLogIndex, err := ni.decrement(peerID, prevLogIndex)
if err != nil {
s.logGeneric("flush to %d: while decrementing prevLogIndex: %s", peerID, err)
return err
}
s.logGeneric("flush to %d: rejected; prevLogIndex(%d) becomes %d", peerID, peerID, newPrevLogIndex)
return errAppendEntriesRejected
}
if len(entries) > 0 {
// Replication succeeded; update the sync state
newPrevLogIndex, err := ni.set(peer.id(), entries[len(entries)-1].Index, prevLogIndex)
if err != nil {
s.logGeneric("flush to %d: while moving prevLogIndex forward: %s", peerID, err)
return err
}
s.logGeneric("flush to %d: accepted; prevLogIndex(%d) becomes %d", peerID, peerID, newPrevLogIndex)
return nil
}
s.logGeneric("flush to %d: accepted; prevLogIndex(%d) remains %d", peerID, peerID, ni.prevLogIndex(peerID))
return nil
}
// concurrentFlush lets the leader sync logs to all peers concurrently
func (s *Server) concurrentFlush(pm peerMap, ni *nextIndex, timeout time.Duration) (int, bool) {
type tuple struct {
id uint64
err error
}
responses := make(chan tuple, len(pm))
for _, peer := range pm {
go func(peer Peer) {
errChan := make(chan error, 1)
go func() { errChan <- s.flush(peer, ni) }()
go func() { time.Sleep(timeout); errChan <- errTimeout }()
responses <- tuple{peer.id(), <-errChan} // first responder wins
}(peer)
}
successes, stepDown := 0, false
for i := 0; i < cap(responses); i++ {
switch t := <-responses; t.err {
case nil:
s.logGeneric("concurrentFlush: peer %d: OK (prevLogIndex(%d)=%d)", t.id, t.id, ni.prevLogIndex(t.id))
successes++
case errDeposed:
// The current leader has fallen behind another node
s.logGeneric("concurrentFlush: peer %d: deposed!", t.id)
stepDown = true
default:
s.logGeneric("concurrentFlush: peer %d: %s (prevLogIndex(%d)=%d)", t.id, t.err, t.id, ni.prevLogIndex(t.id))
// nothing to do but log and continue
}
}
return successes, stepDown
}
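`concurrentFlush` races each `flush` against a timer and takes whichever finishes first. The same effect can be sketched with `select` and `time.After`; the names `callWithTimeout`, `fastCall`, and `slowCall` and the durations are assumptions made for illustration. As in the listing, the slow call is not cancelled, it is only no longer waited for:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeout = errors.New("timeout")

// Durations chosen only for the demonstration.
const (
	shortWait = 10 * time.Millisecond
	longWait  = time.Second
)

// callWithTimeout runs fn in its own goroutine and returns its result, or
// errTimeout if fn has not finished within d. The channel has capacity 1 so
// the goroutine can still deliver its result and exit after a timeout.
func callWithTimeout(fn func() error, d time.Duration) error {
	done := make(chan error, 1)
	go func() { done <- fn() }()
	select {
	case err := <-done:
		return err
	case <-time.After(d):
		return errTimeout
	}
}

// fastCall returns immediately.
func fastCall() error { return nil }

// slowCall takes far longer than shortWait.
func slowCall() error {
	time.Sleep(20 * shortWait)
	return nil
}

func main() {
	fmt.Println(callWithTimeout(fastCall, longWait))
	fmt.Println(callWithTimeout(slowCall, shortWait))
}
```

Because an unreachable peer makes `flush` block, this race is what keeps one slow follower from stalling the whole round.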
// leaderSelect runs the server in the leader role
func (s *Server) leaderSelect() {
if s.leader != s.id {
panic(fmt.Sprintf("leader (%d) not me (%d) when entering leaderSelect", s.leader, s.id))
}
if s.vote != 0 {
panic(fmt.Sprintf("vote (%d) not zero when entering leaderSelect", s.vote))
}
// 5.3 Log replication: "The leader maintains a nextIndex for each follower,
// which is the index of the next log entry the leader will send to that
// follower. When a leader first comes to power it initializes all nextIndex
// values to the index just after the last one in its log."
//
// I changed this from lastIndex+1 to simply lastIndex. Every initial
// communication from leader to follower was being rejected and we were
// doing the decrement. This was just annoying, except if you manage to
// sneak in a command before the first heartbeat. Then, it will never get
// properly replicated (it seemed).
// The leader keeps the latest synced log index for every follower
ni := newNextIndex(s.config.allPeers().except(s.id), s.log.lastIndex()) // +1)
flush := make(chan struct{})
heartbeat := time.NewTicker(broadcastInterval())
defer heartbeat.Stop()
go func() {
// Send heartbeats; besides liveness detection, they also keep followers from starting an election
for range heartbeat.C {
flush <- struct{}{}
}
}()
for {
select {
case q := <-s.quit:
s.handleQuit(q)
return
// Received a command
case t := <-s.commandChan:
// Append the command to our (leader) log
s.logGeneric("got command, appending")
currentTerm := s.term
entry := logEntry{
Index: s.log.lastIndex() + 1,
Term: currentTerm,
Command: t.Command,
commandResponse: t.CommandResponse,
}
// Append to the log
if err := s.log.appendEntry(entry); err != nil {
t.Err <- err
continue
}
s.logGeneric(
"after append, commitIndex=%d lastIndex=%d lastTerm=%d",
s.log.getCommitIndex(),
s.log.lastIndex(),
s.log.lastTerm(),
)
// Now that the entry is in the log, we can fall back to the
// normal flushing mechanism to attempt to replicate the entry
// and advance the commit index. We trigger a manual flush as a
// convenience, so our caller might get a response a bit sooner.
// Note: the flush is only queued here before replying to the client; normally the reply should wait until a quorum is reached
go func() { flush <- struct{}{} }()
t.Err <- nil
// Received a configuration change
case t := <-s.configurationChan:
// Attempt to change our local configuration
if err := s.config.changeTo(makePeerMap(t.Peers...)); err != nil {
t.Err <- err
continue
}
// Serialize the local (C_old,new) configuration
encodedConfiguration, err := s.config.encode()
if err != nil {
t.Err <- err
continue
}
// We're gonna write+replicate that config via log mechanisms.
// Prepare the on-commit callback.
entry := logEntry{
Index: s.log.lastIndex() + 1,
Term: s.term,
Command: encodedConfiguration,
isConfiguration: true,
committed: make(chan bool),
}
go func() {
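// Once the entry is committed, the committed channel is signalled
committed := <-entry.committed
if !committed {
s.config.changeAborted()
return
}
// The entry was committed, which means the other nodes have applied the new configuration, so the local configuration must be updated too
s.config.changeCommitted()
if _, ok := s.config.allPeers()[s.id]; !ok {
// This node has been removed from the new cluster
s.logGeneric("leader expelled; shutting down")
q := make(chan struct{})
s.quit <- q
// The node has exited
<-q
}
}()
// Append to the log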
if err := s.log.appendEntry(entry); err != nil {
t.Err <- err
continue
}
case <-flush:
// Collect the nodes that need to be synced
recipients := s.config.allPeers().except(s.id)
// Special case: network of 1
if len(recipients) <= 0 {
ourLastIndex := s.log.lastIndex()
if ourLastIndex > 0 {
if err := s.log.commitTo(ourLastIndex); err != nil {
s.logGeneric("commitTo(%d): %s", ourLastIndex, err)
continue
}
s.logGeneric("after commitTo(%d), commitIndex=%d", ourLastIndex, s.log.getCommitIndex())
}
continue
}
// Normal case: network of at-least-2
// Sync the log concurrently
successes, stepDown := s.concurrentFlush(recipients, ni, 2*broadcastInterval())
if stepDown {
// This node has been deposed
s.logGeneric("deposed during flush")
s.state.Set(follower)
s.leader = unknownLeader
return
}
// Only when we know all followers accepted the flush can we
// consider incrementing commitIndex and pushing out another
// round of flushes.
if successes == len(recipients) {
// The smallest index synced across the peers
peersBestIndex := ni.bestIndex()
ourLastIndex := s.log.lastIndex()
ourCommitIndex := s.log.getCommitIndex()
if peersBestIndex > ourLastIndex {
// safety check: we've probably been deposed
s.logGeneric("peers' best index %d > our lastIndex %d", peersBestIndex, ourLastIndex)
s.logGeneric("this is crazy, I'm gonna become a follower")
s.leader = unknownLeader
s.vote = noVote
s.state.Set(follower)
return
}
if peersBestIndex > ourCommitIndex {
// Advance the leader's commit index
if err := s.log.commitTo(peersBestIndex); err != nil {
s.logGeneric("commitTo(%d): %s", peersBestIndex, err)
// For example, a follower failed while syncing this index
continue // oh well, next time?
}
if s.log.getCommitIndex() > ourCommitIndex {
// Keep syncing the log
s.logGeneric("after commitTo(%d), commitIndex=%d -- queueing another flush", peersBestIndex, s.log.getCommitIndex())
go func() { flush <- struct{}{} }()
}
}
}
// appendEntries: a leader does not normally receive this request. When it does, the cluster probably has a newer leader, and that leader sent it.
case t := <-s.appendEntriesChan:
resp, stepDown := s.handleAppendEntries(t.Request)
s.logAppendEntriesResponse(t.Request, resp, stepDown)
t.Response <- resp
if stepDown {
s.logGeneric("after an appendEntries, deposed to Follower (leader=%d)", t.Request.LeaderID)
s.leader = t.Request.LeaderID
s.state.Set(follower)
return // deposed
}
// Received a vote request
case t := <-s.requestVoteChan:
resp, stepDown := s.handleRequestVote(t.Request)
s.logRequestVoteResponse(t.Request, resp, stepDown)
t.Response <- resp
if stepDown {
s.logGeneric("after a requestVote, deposed to Follower (leader unknown)")
s.leader = unknownLeader
s.state.Set(follower)
return // deposed
}
}
}
}
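In the `flush` case the leader only advances its commit index when every follower accepted the flush, and it commits up to the smallest index across followers. The Raft paper requires only that a majority of servers hold the entry. The sketch below contrasts the two rules; `lowestIndex` and `majorityIndex` are hypothetical helpers, and the paper's additional condition that the entry belongs to the leader's current term is left out for brevity:

```go
package main

import (
	"fmt"
	"sort"
)

// lowestIndex returns the smallest follower index, or 0 when there are no
// followers. This corresponds to committing only what every follower has.
func lowestIndex(followers []uint64) uint64 {
	if len(followers) == 0 {
		return 0
	}
	low := followers[0]
	for _, v := range followers[1:] {
		if v < low {
			low = v
		}
	}
	return low
}

// majorityIndex returns the highest index that is present on a majority of
// servers, counting the leader itself. After sorting in descending order,
// the element at position n/2 is held by at least n/2+1 servers.
func majorityIndex(leaderLast uint64, followers []uint64) uint64 {
	all := append([]uint64{leaderLast}, followers...)
	sort.Slice(all, func(i, j int) bool { return all[i] > all[j] })
	return all[len(all)/2]
}

func main() {
	followers := []uint64{8, 7, 5, 3}
	fmt.Println(lowestIndex(followers))      // limited by the slowest follower
	fmt.Println(majorityIndex(9, followers)) // limited by the median server
}
```

Waiting for every follower is simpler to reason about, but a single slow or unreachable follower then holds back commits for the whole cluster.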
// handleRequestVote handles an incoming vote request. It will modify s.term
// and s.vote, but nothing else.
// stepDown means you need to: s.leader=unknownLeader, s.state.Set(Follower).
func (s *Server) handleRequestVote(rv requestVote) (requestVoteResponse, bool) {
// Spec is ambiguous here; basing this (loosely!) on benbjohnson's impl
// If the request is from an old term, reject
if rv.Term < s.term {
return requestVoteResponse{
Term: s.term,
VoteGranted: false,
reason: fmt.Sprintf("Term %d < %d", rv.Term, s.term),
}, false
}
// If the request is from a newer term, reset our state
stepDown := false
if rv.Term > s.term {
// The local node is behind the rest of the cluster and must update its own term
s.logGeneric("requestVote from newer term (%d): we defer", rv.Term)
s.term = rv.Term
s.vote = noVote
s.leader = unknownLeader
stepDown = true
}
// Special case: if we're the leader, and we haven't been deposed by a more
// recent term, then we should always deny the vote
if s.state.Get() == leader && !stepDown {
// If the local node is the leader and is not behind the requesting node, deny the vote
return requestVoteResponse{
Term: s.term,
VoteGranted: false,
reason: "already the leader",
}, stepDown
}
// If we've already voted for someone else this term, reject
// A vote has already been cast this term, so deny this one
if s.vote != 0 && s.vote != rv.CandidateID {
if stepDown {
panic("impossible state in handleRequestVote")
}
return requestVoteResponse{
Term: s.term,
VoteGranted: false,
reason: fmt.Sprintf("already cast vote for %d", s.vote),
}, stepDown
}
// If the candidate log isn't at least as recent as ours, reject
if s.log.lastIndex() > rv.LastLogIndex || s.log.lastTerm() > rv.LastLogTerm {
return requestVoteResponse{
Term: s.term,
VoteGranted: false,
reason: fmt.Sprintf(
"our index/term %d/%d > %d/%d",
s.log.lastIndex(),
s.log.lastTerm(),
rv.LastLogIndex,
rv.LastLogTerm,
),
}, stepDown
}
// We passed all the tests: cast vote in favor
s.vote = rv.CandidateID
s.resetElectionTimeout()
return requestVoteResponse{
Term: s.term,
VoteGranted: true,
}, stepDown
}
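The final check in `handleRequestVote` rejects the candidate when either our last index or our last term is larger. The Raft paper compares the last terms first and falls back to log length only when the terms are equal. A small sketch of both rules side by side, with hypothetical function names, shows a case where they disagree:

```go
package main

import "fmt"

// paperUpToDate follows the comparison described in the Raft paper: the log
// with the later last term is more up to date; with equal last terms, the
// longer log is. It reports whether the candidate's log is at least as
// up to date as ours.
func paperUpToDate(ourTerm, ourIndex, candTerm, candIndex uint64) bool {
	if candTerm != ourTerm {
		return candTerm > ourTerm
	}
	return candIndex >= ourIndex
}

// listingUpToDate mirrors the condition in the listing above, which rejects
// the candidate if either our index or our term is larger.
func listingUpToDate(ourTerm, ourIndex, candTerm, candIndex uint64) bool {
	return !(ourIndex > candIndex || ourTerm > candTerm)
}

func main() {
	// Our log ends at index 5 in term 2; the candidate's log ends at
	// index 4 in term 3.
	fmt.Println(paperUpToDate(2, 5, 3, 4))
	fmt.Println(listingUpToDate(2, 5, 3, 4))
}
```

In this case the paper's rule grants the vote because the candidate's last term is newer, while the listing's rule denies it because our log is longer. The listing's own comment notes that this part is only loosely based on another implementation.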
// handleAppendEntries will modify s.term and s.vote, but nothing else.
// stepDown means you need to: s.leader=r.LeaderID, s.state.Set(Follower).
// Note that the behavior differs depending on the local node's current state.
func (s *Server) handleAppendEntries(r appendEntries) (appendEntriesResponse, bool) {
// Spec is ambiguous here; basing this on benbjohnson's impl
// Maybe a nicer way to handle this is to define explicit handler functions
// for each Server state. Then, we won't try to hide too much logic (i.e.
// too many protocol rules) in one code path.
// If the request is from an old term, reject
if r.Term < s.term {
return appendEntriesResponse{
Term: s.term,
Success: false,
reason: fmt.Sprintf("Term %d < %d", r.Term, s.term),
}, false
}
// If the request is from a newer term, reset our state
stepDown := false
if r.Term > s.term {
s.term = r.Term
s.vote = noVote
stepDown = true
}
// Special case for candidates: "While waiting for votes, a candidate may
// receive an appendEntries RPC from another server claiming to be leader.
// If the leader’s term (included in its RPC) is at least as large as the
// candidate’s current term, then the candidate recognizes the leader as
// legitimate and steps down, meaning that it returns to follower state."
if s.state.Get() == candidate && r.LeaderID != s.leader && r.Term >= s.term {
s.term = r.Term
s.vote = noVote
stepDown = true
}
// In any case, reset our election timeout
s.resetElectionTimeout()
// Reject if log doesn't contain a matching previous entry
// Fail if {PrevLogIndex, PrevLogTerm} does not match an entry in the local log; otherwise the entries after it are dropped, e.g.
// [{1, 2},{1, 3}, {1,4},{1,5},{1,6}] => {1,5} => [{1, 2},{1, 3}, {1,4},{1,5}]
if err := s.log.ensureLastIs(r.PrevLogIndex, r.PrevLogTerm); err != nil {
return appendEntriesResponse{
Term: s.term,
Success: false,
reason: fmt.Sprintf(
"while ensuring last log entry had index=%d term=%d: error: %s",
r.PrevLogIndex,
r.PrevLogTerm,
err,
),
}, stepDown
}
// Process the entries
for i, entry := range r.Entries {
// Configuration changes require special preprocessing
var pm peerMap
// Handle a configuration entry
if entry.isConfiguration {
commandBuf := bytes.NewBuffer(entry.Command)
if err := gob.NewDecoder(commandBuf).Decode(&pm); err != nil {
panic("gob decode of peers failed")
}
if s.state.Get() == leader {
// TODO should we instead just ignore this entry?
return appendEntriesResponse{
Term: s.term,
Success: false,
reason: fmt.Sprintf(
"AppendEntry %d/%d failed (configuration): %s",
i+1,
len(r.Entries),
"Leader shouldn't receive configurations via appendEntries",
),
}, stepDown
}
// Expulsion recognition
if _, ok := pm[s.id]; !ok {
entry.committed = make(chan bool)
go func() {
if <-entry.committed {
s.logGeneric("non-leader expelled; shutting down")
q := make(chan struct{})
s.quit <- q
<-q
}
}()
}
}
// Append entry to the log
if err := s.log.appendEntry(entry); err != nil {
return appendEntriesResponse{
Term: s.term,
Success: false,
reason: fmt.Sprintf(
"AppendEntry %d/%d failed: %s",
i+1,
len(r.Entries),
err,
),
}, stepDown
}
// "Once a given server adds the new configuration entry to its log, it
// uses that configuration for all future decisions (it does not wait
// for the entry to become committed)."
if entry.isConfiguration {
if err := s.config.directSet(pm); err != nil {
return appendEntriesResponse{
Term: s.term,
Success: false,
reason: fmt.Sprintf(
"AppendEntry %d/%d failed (configuration): %s",
i+1,
len(r.Entries),
err,
),
}, stepDown
}
}
}
// Commit up to the commit index.
//
// < ptrb> ongardie: if the new leader sends a 0-entry appendEntries
// with lastIndex=5 commitIndex=4, to a follower that has lastIndex=5
// commitIndex=5 -- in my impl, this fails, because commitIndex is too
// small. shouldn't be?
// <@ongardie> ptrb: i don't think that should fail
// <@ongardie> there are 4 ways an appendEntries request can fail: (1)
// network drops packet (2) caller has stale term (3) would leave gap in
// the recipient's log (4) term of entry preceding the new entries doesn't
// match the term at the same index on the recipient
//
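// This can happen when the local node hit a problem while running its commit logic, or when the network failed while it was replying to the leader, and so on.
// All of these leave the data out of sync: the local node's commit index no longer matches what the leader has recorded for this follower.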
if r.CommitIndex > 0 && r.CommitIndex > s.log.getCommitIndex() {
if err := s.log.commitTo(r.CommitIndex); err != nil {
return appendEntriesResponse{
Term: s.term,
Success: false,
reason: fmt.Sprintf("CommitTo(%d) failed: %s", r.CommitIndex, err),
}, stepDown
}
}
// all good
return appendEntriesResponse{
Term: s.term,
Success: true,
}, stepDown
}
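The `ensureLastIs` call is the consistency check at the heart of appendEntries: the follower must hold an entry matching the leader's previous index and term, and anything after that entry is discarded. A simplified sketch, assuming the pairs in the comment above read as {term, index} and using a hypothetical `truncateAfter` helper (it does not model committed entries or an empty log):

```go
package main

import (
	"errors"
	"fmt"
)

// entry is a stripped-down log entry holding only its term and index.
type entry struct {
	Term  uint64
	Index uint64
}

var errNoMatch = errors.New("no entry with that index and term")

// truncateAfter looks for the entry at prevIndex. If it exists and its term
// equals prevTerm, the log is cut so that this entry becomes the last one.
// Otherwise the log is returned unchanged together with errNoMatch.
func truncateAfter(log []entry, prevIndex, prevTerm uint64) ([]entry, error) {
	for i, e := range log {
		if e.Index == prevIndex {
			if e.Term != prevTerm {
				return log, errNoMatch
			}
			return log[:i+1], nil
		}
	}
	return log, errNoMatch
}

func main() {
	log := []entry{{1, 2}, {1, 3}, {1, 4}, {1, 5}, {1, 6}}
	kept, err := truncateAfter(log, 5, 1)
	fmt.Println(kept, err)
}
```

When the check fails, the leader's `flush` decrements the index it keeps for that follower and retries with an earlier entry until the two logs meet.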
configuration.go
var (
errConfigurationAlreadyChanging = errors.New("configuration already changing")
)
const (
cOld = "C_old"
cOldNew = "C_old,new"
)
// configuration represents the sets of peers and behaviors required to
// implement joint-consensus.
type configuration struct {
sync.RWMutex
state string
// Old configuration (C_old)
cOldPeers peerMap
// New configuration (C_new), used during the transition
cNewPeers peerMap
}
// newConfiguration returns a new configuration in stable (C_old) state based
// on the passed peers.
func newConfiguration(pm peerMap) *configuration {
return &configuration{
state: cOld, // start in a stable state,
cOldPeers: pm, // with only C_old
}
}
// directSet is used when bootstrapping, and when receiving a replicated
// configuration from a leader. It directly sets the configuration to the
// passed peers. It's assumed this is called on a non-leader, and therefore
// requires no consistency dance.
// Configuration change
func (c *configuration) directSet(pm peerMap) error {
c.Lock()
defer c.Unlock()
c.cOldPeers = pm
c.cNewPeers = peerMap{}
c.state = cOld
return nil
}
func (c *configuration) get(id uint64) (Peer, bool) {
c.RLock()
defer c.RUnlock()
if peer, ok := c.cOldPeers[id]; ok {
return peer, true
}
if peer, ok := c.cNewPeers[id]; ok {
return peer, true
}
return nil, false
}
func (c *configuration) encode() ([]byte, error) {
buf := &bytes.Buffer{}
if err := gob.NewEncoder(buf).Encode(c.allPeers()); err != nil {
return []byte{}, err
}
return buf.Bytes(), nil
}
// allPeers returns the union set of all peers in the configuration.
func (c *configuration) allPeers() peerMap {
c.RLock()
defer c.RUnlock()
union := peerMap{}
for id, peer := range c.cOldPeers {
union[id] = peer
}
for id, peer := range c.cNewPeers {
union[id] = peer
}
return union
}
// pass returns true if the votes represented by the votes map are sufficient
// to constitute a quorum. pass respects C_old,new requirements, which dictate
// that any request must receive a majority from both C_old and C_new to pass.
// Quorum check
func (c *configuration) pass(votes map[uint64]bool) bool {
c.RLock()
defer c.RUnlock()
// Count the votes
cOldHave, cOldRequired := 0, c.cOldPeers.quorum()
for id := range c.cOldPeers {
if votes[id] {
cOldHave++
}
if cOldHave >= cOldRequired {
break
}
}
// If we've already failed, we can stop here
if cOldHave < cOldRequired {
return false
}
// C_old passes: if we're in C_old, we pass
if c.state == cOld {
return true
}
// Not in C_old, so make sure we have some peers in C_new
if len(c.cNewPeers) <= 0 {
panic(fmt.Sprintf("configuration state '%s', but no C_new peers", c.state))
}
// Since we're in C_old,new, we need to also pass C_new to pass overall.
// It's important that we range through C_new and check our votes map, and
// not the other way around: if a server casts a vote but doesn't exist in
// a particular configuration, that vote should not be counted.
cNewHave, cNewRequired := 0, c.cNewPeers.quorum()
for id := range c.cNewPeers {
if votes[id] {
cNewHave++
}
if cNewHave >= cNewRequired {
break
}
}
return cNewHave >= cNewRequired
}
// changeTo prepares a configuration change (the prepare-change step)
func (c *configuration) changeTo(pm peerMap) error {
c.Lock()
defer c.Unlock()
if c.state != cOld {
return errConfigurationAlreadyChanging
}
if len(c.cNewPeers) > 0 {
panic(fmt.Sprintf("configuration ChangeTo in state '%s', but have C_new peers already", c.state))
}
c.cNewPeers = pm
c.state = cOldNew
return nil
}
// changeCommitted commits the configuration change
func (c *configuration) changeCommitted() {
c.Lock()
defer c.Unlock()
if c.state != cOldNew {
panic("configuration ChangeCommitted, but not in C_old,new")
}
if len(c.cNewPeers) <= 0 {
panic("configuration ChangeCommitted, but C_new peers are empty")
}
c.cOldPeers = c.cNewPeers
c.cNewPeers = peerMap{}
c.state = cOld
}
// changeAborted aborts the configuration change
func (c *configuration) changeAborted() {
c.Lock()
defer c.Unlock()
if c.state != cOldNew {
panic("configuration ChangeAborted, but not in C_old,new")
}
c.cNewPeers = peerMap{}
c.state = cOld
}
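During a change the `pass` method requires a majority in C_old and, while in C_old,new, also a majority in C_new. A minimal sketch of that rule over plain ID slices; `majority` and `jointPass` are illustrative names, the quorum size `n/2+1` is an assumption since `quorum()` is not shown here, and an empty C_new stands in for the stable C_old state:

```go
package main

import "fmt"

// majority reports whether the granted votes cover more than half of peers.
// It ranges over peers, not over votes, so a vote from a server outside the
// configuration is never counted.
func majority(peers []uint64, votes map[uint64]bool) bool {
	have := 0
	for _, id := range peers {
		if votes[id] {
			have++
		}
	}
	return have >= len(peers)/2+1
}

// jointPass requires a majority of cOld and, when cNew is non-empty,
// a separate majority of cNew as well.
func jointPass(cOld, cNew []uint64, votes map[uint64]bool) bool {
	if !majority(cOld, votes) {
		return false
	}
	if len(cNew) == 0 {
		return true // stable state: only C_old matters
	}
	return majority(cNew, votes)
}

func main() {
	cOld := []uint64{1, 2, 3}
	cNew := []uint64{3, 4, 5}
	// Majority of C_old only.
	fmt.Println(jointPass(cOld, cNew, map[uint64]bool{1: true, 2: true}))
	// Majority of both C_old and C_new.
	fmt.Println(jointPass(cOld, cNew, map[uint64]bool{1: true, 3: true, 4: true}))
}
```

Requiring both majorities means neither the old nor the new membership can reach a decision on its own while the change is in flight.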
Demo
package main
import (
"bytes"
"fmt"
"hash/fnv"
"net/http"
"net/url"
"time"
"github.com/peterbourgon/raft"
)
func main() {
a := func(idx uint64, cmd []byte) []byte {
fmt.Printf("%d, apply function: %s\n", idx, cmd)
return cmd
}
mustParseURL := func(rawURL string) *url.URL {
u, err := url.Parse(rawURL)
if err != nil {
panic(err)
}
u.Path = ""
return u
}
mustNewHTTPPeer := func(u *url.URL) raft.Peer {
p, err := raft.NewHTTPPeer(u)
if err != nil {
panic(err)
}
return p
}
peersAddr := []string{
"127.0.0.1:7090",
"127.0.0.1:7091",
"127.0.0.1:7092",
"127.0.0.1:7093",
"127.0.0.1:7094"}
var ss []*raft.Server
for _, addr := range peersAddr {
hash := fnv.New64()
hash.Write([]byte(addr))
id := hash.Sum64()
hash.Reset()
s := raft.NewServer(id, &bytes.Buffer{}, a)
mux := http.NewServeMux()
raft.HTTPTransport(mux, s)
go func(addr string) {
if err := http.ListenAndServe(addr, mux); err != nil {
panic(err)
}
}(addr)
ss = append(ss, s)
}
time.Sleep(time.Second)
for _, s := range ss {
s.SetConfiguration(
mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7090")),
mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7091")),
mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7092")),
mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7093")),
mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7094")),
)
s.Start()
}
for {
cmd := []byte(time.Now().String())
cmdChan := make(chan []byte)
go ss[0].Command(cmd, cmdChan)
<-cmdChan
time.Sleep(time.Millisecond * 500)
}
time.Sleep(time.Hour)
}
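The demo derives each server ID by hashing its listen address with FNV-64, so IDs are stable across restarts as long as the address stays the same. A small sketch of that step in isolation, with `idFor` as a hypothetical helper name:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// idFor hashes a listen address into a 64-bit server ID, the same way the
// demo loop does with fnv.New64.
func idFor(addr string) uint64 {
	h := fnv.New64()
	h.Write([]byte(addr)) // Write on a hash.Hash never returns an error
	return h.Sum64()
}

func main() {
	for _, addr := range []string{"127.0.0.1:7090", "127.0.0.1:7091"} {
		fmt.Printf("%s -> %d\n", addr, idFor(addr))
	}
}
```

Creating a fresh hasher per address also makes the `hash.Reset()` call in the demo unnecessary.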
Run
$ go run raft-server.go 2>/dev/null
1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST
1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST
1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST
1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST
1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST
2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST
2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST
2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST
2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST
2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST
3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST
3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST
3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST
3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST
3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST
4, apply function: 2017-09-11 11:41:15.171741805 +0800 CST
4, apply function: 2017-09-11 11:41:15.171741805 +0800 CST
4, apply function: 2017-09-11 11:41:15.171741805 +0800 CST
4, apply function: 2017-09-11 11:41:15.171741805 +0800 CST
4, apply function: 2017-09-11 11:41:15.171741805 +0800 CST
5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST
5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST
5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST
5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST
5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST
6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST
6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST
6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST
6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST
6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST
7, apply function: 2017-09-11 11:41:16.677758823 +0800 CST
Source: http://laohanlinux.github.io/2017/09/11/raft源碼分析/