Raft Source Code Analysis


This article walks through a Raft implementation at the source-code level, based on a simplified version found online: source code.

A Server struct represents one node in the Raft network. A node creates a Server and exposes it to the other nodes through the peer interface.

The transport layer is wrapped over HTTP; peer-to-peer communication uses REST over HTTP.

|http transport| ---> |peers| ---> |server|
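
The project's own transport.go is not reproduced in this article, but conceptually each remote node hides behind the Peer interface shown later. As a rough sketch of what a REST/HTTP peer proxy could look like (the httpPeer name, the endpoint path, and the error handling are my assumptions for illustration, not the project's actual code; only callAppendEntries is shown, and it relies on the appendEntries types from rpc.go plus bytes, encoding/json, and net/http):

// Illustration only -- not part of the original source.
type httpPeer struct {
    serverID uint64
    baseURL  string // e.g. "http://10.0.0.2:8080"
    client   *http.Client
}

func (p *httpPeer) id() uint64 { return p.serverID }

func (p *httpPeer) callAppendEntries(ae appendEntries) appendEntriesResponse {
    body, err := json.Marshal(ae)
    if err != nil {
        return appendEntriesResponse{Success: false, reason: err.Error()}
    }
    // Hypothetical endpoint; the real route lives in transport.go.
    resp, err := p.client.Post(p.baseURL+"/raft/appendentries", "application/json", bytes.NewReader(body))
    if err != nil {
        return appendEntriesResponse{Success: false, reason: err.Error()}
    }
    defer resp.Body.Close()
    var aer appendEntriesResponse
    if err := json.NewDecoder(resp.Body).Decode(&aer); err != nil {
        return appendEntriesResponse{Success: false, reason: err.Error()}
    }
    return aer
}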

Project Overview

Adding and removing nodes

Nodes can be added and removed dynamically, using a simple consensus rule: when the membership changes, more than half of the nodes must accept the configuration update (that is, the new cluster must outvote the old one). A rough sketch of the majority rule follows.
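
The JOINT-CONSENSUS.md and configuration.go files that implement this are not reproduced in this article. As a sketch of the underlying majority rule (the function names and the map-based peer sets below are my own, not the project's API): a change passes when it has a majority of the old peer set and, while a joint configuration is in flight, also a majority of the new one.

// Illustration only -- not part of the original source.
func majority(n int) int {
    if n <= 1 {
        return 1
    }
    return n/2 + 1
}

// passJoint reports whether the votes reach a majority of the old peer set and,
// when a membership change is in progress, also a majority of the new peer set.
func passJoint(oldPeers, newPeers map[uint64]bool, votes map[uint64]bool) bool {
    count := func(peers map[uint64]bool) int {
        n := 0
        for id := range peers {
            if votes[id] {
                n++
            }
        }
        return n
    }
    if count(oldPeers) < majority(len(oldPeers)) {
        return false
    }
    if len(newPeers) > 0 && count(newPeers) < majority(len(newPeers)) {
        return false
    }
    return true
}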

roadmap

  • leader election
  • log replication
  • unit tests
  • HTTP transport layer
  • configuration changes

Beyond that, a few things are still unfinished:

  • net/rpc transport layer, or other kinds of transports
  • log compaction
  • snapshot installation and recovery
  • a complete demo application
  • some of the more complex test cases

For the details, see the code analysis below.

Source Code Analysis

Source Tree Layout

    ├── JOINT-CONSENSUS.md
    ├── LICENSE
    ├── README.md
    ├── configuration.go // configuration handling
    ├── example_test.go // demo / example usage
    ├── log.go // the replicated log
    ├── log_test.go // log tests
    ├── peers.go // peer abstraction
    ├── peers_test.go // peer tests
    ├── rpc.go // RPC request/response types
    ├── server.go // the server (core protocol logic)
    ├── server_internals_test.go // server internal tests
    ├── server_test.go // server tests
    ├── transport.go // HTTP transport layer
    └── transport_test.go // transport tests
    

Main Data Structures

rpc.go

// appendEntries RPC plus its reply channel
type appendEntriesTuple struct {
        // the append request
        Request  appendEntries
        // channel for the response
        Response chan appendEntriesResponse
}
// requestVote RPC plus its reply channel
type requestVoteTuple struct {
        // the vote request
        Request  requestVote
        // channel for the vote response
        Response chan requestVoteResponse
}

// appendEntries represents an appendEntries RPC.
// the log-append request body
type appendEntries struct {
        // leader's term
        Term         uint64     `json:"term"`
        // leader's ID
        LeaderID     uint64     `json:"leader_id"`
        // index of the entry immediately preceding the new ones
        PrevLogIndex uint64     `json:"prev_log_index"`
        // term of that preceding entry
        PrevLogTerm  uint64     `json:"prev_log_term"`
        // the entries to append -- batching is supported
        Entries      []logEntry `json:"entries"`
        // the leader's commit index
        CommitIndex  uint64     `json:"commit_index"`
}

// appendEntriesResponse represents the response to an appendEntries RPC.
// the reply to a log-append request
type appendEntriesResponse struct {
        // the responding node's term
        Term    uint64 `json:"term"`
        // whether the append succeeded
        Success bool   `json:"success"`
        // the reason for failure, if any
        reason  string
}

// requestVote represents a requestVote RPC.
// the vote request body
type requestVote struct {
        // candidate's term
        Term         uint64 `json:"term"`
        // candidate's ID
        CandidateID  uint64 `json:"candidate_id"`
        // index of the candidate's last log entry
        LastLogIndex uint64 `json:"last_log_index"`
        // term of the candidate's last log entry
        LastLogTerm  uint64 `json:"last_log_term"`
}

// requestVoteResponse represents the response to a requestVote RPC.
// the vote reply
type requestVoteResponse struct {
        // responder's term
        Term        uint64 `json:"term"`
        // true means the vote was granted, false means it was denied
        VoteGranted bool   `json:"vote_granted"`
        // the reason for denying the vote
        reason      string
}
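
Because the RPC structs carry json tags, it is easy to see what an appendEntries heartbeat (an append with no entries) looks like on the wire. The snippet below was added for this article and is not part of the source; only the JSON body is shown, since the HTTP framing lives in transport.go (it uses encoding/json and fmt):

// Illustration only -- not part of the original source.
func exampleHeartbeatJSON() {
    hb := appendEntries{
        Term:         7,
        LeaderID:     1,
        PrevLogIndex: 42,
        PrevLogTerm:  7,
        Entries:      []logEntry{}, // heartbeat: nothing to replicate
        CommitIndex:  42,
    }
    b, _ := json.Marshal(hb)
    fmt.Println(string(b))
    // {"term":7,"leader_id":1,"prev_log_index":42,"prev_log_term":7,"entries":[],"commit_index":42}
}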

log.go

var (
        // the term is too small
        errTermTooSmall    = errors.New("term too small")
        // the log index is too small
        errIndexTooSmall   = errors.New("index too small")
        // the log index is too big
        errIndexTooBig     = errors.New("commit index too big")
        // the log entry is corrupted
        errInvalidChecksum = errors.New("invalid checksum")
        // no command in the entry
        errNoCommand       = errors.New("no command")
        // bad log index
        errBadIndex        = errors.New("bad index")
        // bad term
        errBadTerm         = errors.New("bad term")
)
// the replicated log
type raftLog struct {
        // read/write lock protecting the log
        sync.RWMutex
        // persistent storage for the log
        store     io.Writer
        // in-memory copy of the log entries
        entries   []logEntry
        // position (in entries) of the most recently committed entry; -1 when nothing is committed
        commitPos int
        // the "apply" callback. It is important: it applies committed commands to the
        // state machine, so a snapshot only needs to persist the resulting state.
        apply     func(uint64, []byte) []byte
}

func newRaftLog(store io.ReadWriter, apply func(uint64, []byte) []byte) *raftLog {
        l := &raftLog{
            store:     store,
            entries:   []logEntry{},
            commitPos: -1, // no commits to begin with
            apply:     apply,
        }
        l.recover(store)
        return l
}

// recover reads from the log's store, to populate the log with log entries
// from persistent storage. It should be called once, at log instantiation.
// When the service restarts, this rebuilds the log entries. (Normally a rebuild uses a
// snapshot plus the log; snapshots are not implemented here, so we rebuild from the log alone.)
// 1. The entries in the store are already committed, so commitPos advances during the rebuild.
// 2. Rebuilding calls the apply function on every entry, which acts as the "state machine";
//    with snapshots (think Redis RDB) you would install the snapshot first, then replay the
//    log (think Redis AOF).
func (l *raftLog) recover(r io.Reader) error {
        for {
            var entry logEntry
            switch err := entry.decode(r); err {
            case io.EOF:
                return nil // successful completion
            case nil:
                if err := l.appendEntry(entry); err != nil {
                    return err
                }
                l.commitPos++
                l.apply(entry.Index, entry.Command)
            default:
                return err // unsuccessful completion
            }
        }
}

// entriesAfter returns a slice of log entries after (i.e. not including) the
// passed index, and the term of the log entry specified by index, as a
// convenience to the caller. (This function is only used by a leader attempting
// to flush log entries to its followers.)
//
// This function is called to populate an AppendEntries RPC. That implies they
// are destined for a follower, which implies the application of the commands
// should have the response thrown away, which implies we shouldn't pass a
// commandResponse channel (see: commitTo implementation). In the normal case,
// the raftLogEntries we return here will get serialized as they pass thru their
// transport, and lose their commandResponse channel anyway. But in the case of
// a LocalPeer (or equivalent) this doesn't happen. So, we must make sure to
// proactively strip commandResponse channels.
// return the log entries after the given index
func (l *raftLog) entriesAfter(index uint64) ([]logEntry, uint64) {
        l.RLock()
        defer l.RUnlock()

        // 1. Find the term of the entry at index, and the position just past it in entries.
        pos := 0
        lastTerm := uint64(0)
        for ; pos < len(l.entries); pos++ {
            if l.entries[pos].Index > index {
                break
            }
            lastTerm = l.entries[pos].Term
        }

        a := l.entries[pos:]
        if len(a) == 0 {
            return []logEntry{}, lastTerm
        }
        // strip the command response channels
        return stripResponseChannels(a), lastTerm
}

func stripResponseChannels(a []logEntry) []logEntry {
        stripped := make([]logEntry, len(a))
        for i, entry := range a {
            stripped[i] = logEntry{
                Index:           entry.Index,
                Term:            entry.Term,
                Command:         entry.Command,
                commandResponse: nil,
            }
        }
        return stripped
}

// contains returns true if a log entry with the given index and term exists in
// the log.
// report whether the log contains the entry {index, term}
func (l *raftLog) contains(index, term uint64) bool {
        l.RLock()
        defer l.RUnlock()

        // It's not necessarily true that l.entries[i] has index == i.
        for _, entry := range l.entries {
            if entry.Index == index && entry.Term == term {
                return true
            }
            if entry.Index > index || entry.Term > term {
                break
            }
        }
        return false
}

// Check whether {index, term} is the latest entry in the log; if so, truncate everything
// after it. The entry must fall within [commitIndex, lastIndex].
func (l *raftLog) ensureLastIs(index, term uint64) error {
        l.Lock()
        defer l.Unlock()

        // Taken loosely from benbjohnson's impl

        if index < l.getCommitIndexWithLock() {
            return errIndexTooSmall
        }

        if index > l.lastIndexWithLock() {
            return errIndexTooBig
        }

        // It's possible that the passed index is 0. It means the leader has come to
        // decide we need a complete log rebuild. Of course, that's only valid if we
        // haven't committed anything, so this check comes after that one.
        // full rebuild, which is only allowed when nothing has been committed yet
        if index == 0 {
            for pos := 0; pos < len(l.entries); pos++ {
                if l.entries[pos].commandResponse != nil {
                    close(l.entries[pos].commandResponse)
                    l.entries[pos].commandResponse = nil
                }
                if l.entries[pos].committed != nil {
                    l.entries[pos].committed <- false
                    close(l.entries[pos].committed)
                    l.entries[pos].committed = nil
                }
            }
            l.entries = []logEntry{}
            return nil
        }

        // Normal case: find the position of the matching log entry.
        pos := 0
        for ; pos < len(l.entries); pos++ {
            if l.entries[pos].Index < index {
                continue // didn't find it yet
            }
            if l.entries[pos].Index > index {
                return errBadIndex // somehow went past it
            }
            if l.entries[pos].Index != index {
                panic("not <, not >, but somehow !=")
            }
            if l.entries[pos].Term != term {
                return errBadTerm
            }
            break // good
        }

        // Sanity check.
        // ? How could this case ever happen?
        if pos < l.commitPos {
            panic("index >= commitIndex, but pos < commitPos")
        }

        // `pos` is the position of log entry matching index and term.
        // We want to truncate everything after that.
        // {index, term} is now the latest entry, so cut everything after it
        truncateFrom := pos + 1
        if truncateFrom >= len(l.entries) {
            return nil // nothing to truncate
        }

        // If we blow away log entries that haven't yet sent responses to clients,
        // signal the clients to stop waiting, by closing the channel without a
        // response value.
        for pos = truncateFrom; pos < len(l.entries); pos++ {
            if l.entries[pos].commandResponse != nil {
                close(l.entries[pos].commandResponse)
                l.entries[pos].commandResponse = nil
            }
            if l.entries[pos].committed != nil {
                l.entries[pos].committed <- false
                close(l.entries[pos].committed)
                l.entries[pos].committed = nil
            }
        }

        // Truncate the log.
        l.entries = l.entries[:truncateFrom]

        // Done.
        return nil
}

// getCommitIndex returns the commit index of the log. That is, the index of the
// last log entry which can be considered committed.
// return the index of the most recently committed entry
func (l *raftLog) getCommitIndex() uint64 {
        l.RLock()
        defer l.RUnlock()
        return l.getCommitIndexWithLock()
}

// commit index; the caller must already hold the lock
func (l *raftLog) getCommitIndexWithLock() uint64 {
        if l.commitPos < 0 {
            return 0
        }
        if l.commitPos >= len(l.entries) {
            panic(fmt.Sprintf("commitPos %d > len(l.entries) %d; bad bookkeeping in raftLog", l.commitPos, len(l.entries)))
        }
        return l.entries[l.commitPos].Index
}

// lastIndex returns the index of the most recent log entry.
func (l *raftLog) lastIndex() uint64 {
        l.RLock()
        defer l.RUnlock()
        return l.lastIndexWithLock()
}

func (l *raftLog) lastIndexWithLock() uint64 {
        if len(l.entries) <= 0 {
            return 0
        }
        return l.entries[len(l.entries)-1].Index
}

// lastTerm returns the term of the most recent log entry.
func (l *raftLog) lastTerm() uint64 {
        l.RLock()
        defer l.RUnlock()
        return l.lastTermWithLock()
}

func (l *raftLog) lastTermWithLock() uint64 {
        if len(l.entries) <= 0 {
            return 0
        }
        return l.entries[len(l.entries)-1].Term
}

// appendEntry appends the passed log entry to the log. It will return an error
// if the entry's term is smaller than the log's most recent term, or if the
// entry's index is too small relative to the log's most recent entry.
// Append an entry; note the entry is not committed yet at this point.
func (l *raftLog) appendEntry(entry logEntry) error {
        l.Lock()
        defer l.Unlock()
        // require {entry.Term, entry.Index} > {lastTerm, lastIndex}
        if len(l.entries) > 0 {
            lastTerm := l.lastTermWithLock()
            if entry.Term < lastTerm {
                return errTermTooSmall
            }
            lastIndex := l.lastIndexWithLock()
            if entry.Term == lastTerm && entry.Index <= lastIndex {
                return errIndexTooSmall
            }
        }

        l.entries = append(l.entries, entry)
        return nil
}

// commitTo commits all log entries up to and including the passed commitIndex.
// Commit means: synchronize the log entry to persistent storage, and call the
// state machine apply function for the log entry's command.
// Notes:
// 1. Commit runs as a background task; there is no explicit "majority ack" check
//    here (check the Raft paper for how this is supposed to work).
// 2. The apply function is called during commit, not during append.
// 3. apply acts as the state machine; users typically persist its results, which
//    is what a snapshot would capture. For example, to build a key/value store,
//    put the KV logic inside this function.
//
// committed <= commitIndex <= lastIndex
func (l *raftLog) commitTo(commitIndex uint64) error {
        if commitIndex == 0 {
            panic("commitTo(0)")
        }

        l.Lock()
        defer l.Unlock()

        // Reject old commit indexes
        if commitIndex < l.getCommitIndexWithLock() {
            return errIndexTooSmall
        }

        // Reject new commit indexes
        if commitIndex > l.lastIndexWithLock() {
            return errIndexTooBig
        }

        // If we've already committed to the commitIndex, great!
        if commitIndex == l.getCommitIndexWithLock() {
            return nil
        }

        // We should start committing at precisely the last commitPos + 1
        pos := l.commitPos + 1
        if pos < 0 {
            panic("pending commit pos < 0")
        }

        // Commit entries between our existing commit index and the passed index.
        // Remember to include the passed index.
        for {
            // Sanity checks. TODO replace with plain `for` when this is stable.
            if pos >= len(l.entries) {
                panic(fmt.Sprintf("commitTo pos=%d advanced past all log entries (%d)", pos, len(l.entries)))
            }
            if l.entries[pos].Index > commitIndex {
                panic("commitTo advanced past the desired commitIndex")
            }

            // Encode the entry to persistent storage.
            if err := l.entries[pos].encode(l.store); err != nil {
                return err
            }

            // Forward non-configuration commands to the state machine.
            // Send the responses to the waiting client, if applicable.
            // If the entry is not a configuration entry, call the apply function.
            // Configuration entries are handled elsewhere.
            if !l.entries[pos].isConfiguration {
                resp := l.apply(l.entries[pos].Index, l.entries[pos].Command)
                if l.entries[pos].commandResponse != nil {
                    select {
                    case l.entries[pos].commandResponse <- resp:
                        break
                    // why this particular timeout? (it is well below the election interval)
                    case <-time.After(maximumElectionTimeout()): // << ElectionInterval
                        panic("uncooperative command response receiver")
                    }
                    close(l.entries[pos].commandResponse)
                    l.entries[pos].commandResponse = nil
                }
            }

            // Signal the entry has been committed, if applicable.
            if l.entries[pos].committed != nil {
                l.entries[pos].committed <- true
                close(l.entries[pos].committed)
                l.entries[pos].committed = nil
            }

            // Mark our commit position cursor.
            l.commitPos = pos

            // If that was the last one, we're done.
            if l.entries[pos].Index == commitIndex {
                break
            }
            if l.entries[pos].Index > commitIndex {
                panic(fmt.Sprintf(
                    "current entry Index %d is beyond our desired commitIndex %d",
                    l.entries[pos].Index,
                    commitIndex,
                ))
            }

            // Otherwise, advance!
            pos++
        }

        // Done.
        return nil
}
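
As the comment above commitTo suggests, a key/value store can be built by putting the KV logic inside the apply function. Below is a minimal sketch of such a function (the newKVApply name and the "SET key value" / "GET key" command encoding are invented for this example; they are not part of the project). The returned closure matches the apply signature and would be passed to NewServer; it needs the strings package.

// Illustration only -- not part of the original source.
// newKVApply returns an apply function backing a tiny in-memory key/value store.
func newKVApply() func(uint64, []byte) []byte {
    kv := map[string]string{}
    return func(commitIndex uint64, cmd []byte) []byte {
        parts := strings.SplitN(string(cmd), " ", 3)
        switch {
        case len(parts) == 3 && parts[0] == "SET":
            kv[parts[1]] = parts[2]
            return []byte("OK")
        case len(parts) == 2 && parts[0] == "GET":
            return []byte(kv[parts[1]])
        default:
            return []byte("ERR unknown command")
        }
    }
}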

// logEntry is the atomic unit being managed by the distributed log. A log entry
// always has an index (monotonically increasing), a term in which the Raft
// network leader first sees the entry, and a command. The command is what gets
// executed against the node state machine when the log entry is successfully
// replicated.
type logEntry struct {
        // log index
        Index           uint64        `json:"index"`
        // term when the entry was first received by the leader
        Term            uint64        `json:"term"` // when received by leader
        // the command payload
        Command         []byte        `json:"command,omitempty"`
        // channel signalled when the entry is committed
        committed       chan bool     `json:"-"`
        // channel for the command's response
        commandResponse chan<- []byte `json:"-"` // only non-nil on receiver's log
        // marks configuration-change entries
        isConfiguration bool          `json:"-"` // for configuration change entries
}

// encode serializes the log entry to the passed io.Writer.
//
// Entries are serialized in a simple binary format (little-endian, despite the
// unequal column widths below):
//
// -----------------------------------------------
// | uint32 | uint64 | uint64 | uint32 | []byte   |
// -----------------------------------------------
// |  CRC   |  TERM  | INDEX  |  SIZE  | COMMAND  |
// -----------------------------------------------
//
func (e *logEntry) encode(w io.Writer) error {
        if len(e.Command) <= 0 {
            return errNoCommand
        }
        if e.Index <= 0 {
            return errBadIndex
        }
        if e.Term <= 0 {
            return errBadTerm
        }

        commandSize := len(e.Command)
        buf := make([]byte, 24+commandSize)

        binary.LittleEndian.PutUint64(buf[4:12], e.Term)
        binary.LittleEndian.PutUint64(buf[12:20], e.Index)
        binary.LittleEndian.PutUint32(buf[20:24], uint32(commandSize))

        copy(buf[24:], e.Command)

        binary.LittleEndian.PutUint32(
            buf[0:4],
            crc32.ChecksumIEEE(buf[4:]),
        )

        _, err := w.Write(buf)
        return err
}

// decode deserializes one log entry from the passed io.Reader.
func (e *logEntry) decode(r io.Reader) error {
        header := make([]byte, 24)

        if _, err := r.Read(header); err != nil {
            return err
        }

        command := make([]byte, binary.LittleEndian.Uint32(header[20:24]))

        if _, err := r.Read(command); err != nil {
            return err
        }

        crc := binary.LittleEndian.Uint32(header[:4])

        check := crc32.NewIEEE()
        check.Write(header[4:])
        check.Write(command)

        if crc != check.Sum32() {
            return errInvalidChecksum
        }

        e.Term = binary.LittleEndian.Uint64(header[4:12])
        e.Index = binary.LittleEndian.Uint64(header[12:20])
        e.Command = command

        return nil
}
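
To make the binary format concrete, here is a small round trip through a bytes.Buffer (added for this article, not part of the source). decode recomputes the CRC, so a corrupted buffer would come back as errInvalidChecksum:

// Illustration only -- not part of the original source.
func exampleRoundTrip() error {
    var buf bytes.Buffer
    in := logEntry{Index: 1, Term: 1, Command: []byte(`{"op":"set","k":"x","v":"1"}`)}
    if err := in.encode(&buf); err != nil {
        return err
    }
    var out logEntry
    if err := out.decode(&buf); err != nil { // checksum is verified here
        return err
    }
    fmt.Printf("index=%d term=%d command=%s\n", out.Index, out.Term, out.Command)
    return nil
}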

peers.go

var (
    errTimeout = errors.New("timeout")
)
// Peer is the abstraction of a node as seen by the rest of the cluster.
// Pay attention to how peers are serialized.
type Peer interface {
    // returns the server's ID
    id() uint64
    // append-entries RPC
    callAppendEntries(appendEntries) appendEntriesResponse
    // request-vote RPC
    callRequestVote(requestVote) requestVoteResponse
    // command invocation
    callCommand([]byte, chan<- []byte) error
    // cluster configuration change
    callSetConfiguration(...Peer) error
}

// localPeer is the simplest kind of peer, mapped to a server in the
// same process-space. Useful for testing and demonstration; not so
// useful for networks of independent processes.
// (It bypasses the network entirely.)
type localPeer struct {
    server *Server
}

func newLocalPeer(server *Server) *localPeer { return &localPeer{server} }

func (p *localPeer) id() uint64 { return p.server.id }

// append entries on the wrapped server
func (p *localPeer) callAppendEntries(ae appendEntries) appendEntriesResponse {
    return p.server.appendEntries(ae)
}

// request a vote from the wrapped server
func (p *localPeer) callRequestVote(rv requestVote) requestVoteResponse {
    return p.server.requestVote(rv)
}

// forward a command
// (in practice the callee is the leader)
func (p *localPeer) callCommand(cmd []byte, response chan<- []byte) error {
    return p.server.Command(cmd, response)
}

// set the configuration
func (p *localPeer) callSetConfiguration(peers ...Peer) error {
    return p.server.SetConfiguration(peers...)
}

// requestVoteTimeout issues the requestVote to the given peer.
// If no response is received before timeout, an error is returned.
func requestVoteTimeout(p Peer, rv requestVote, timeout time.Duration) (requestVoteResponse, error) {
    c := make(chan requestVoteResponse, 1)
    go func() { c <- p.callRequestVote(rv) }()

    select {
    case resp := <-c:
        return resp, nil
    case <-time.After(timeout):
        return requestVoteResponse{}, errTimeout
    }
}

// peerMap is a collection of Peer interfaces. It provides some convenience
// functions for actions that should apply to multiple Peers.
type peerMap map[uint64]Peer

// makePeerMap constructs a peerMap from a list of peers.
func makePeerMap(peers ...Peer) peerMap {
    pm := peerMap{}
    for _, peer := range peers {
        pm[peer.id()] = peer
    }
    return pm
}

// explodePeerMap converts a peerMap into a slice of peers.
func explodePeerMap(pm peerMap) []Peer {
    a := []Peer{}
    for _, peer := range pm {
        a = append(a, peer)
    }
    return a
}

func (pm peerMap) except(id uint64) peerMap {
    except := peerMap{}
    for id0, peer := range pm {
        if id0 == id {
            continue
        }
        except[id0] = peer
    }
    return except
}

func (pm peerMap) count() int { return len(pm) }

// quorum returns the number of votes needed for a majority
func (pm peerMap) quorum() int {
    switch n := len(pm); n {
    case 0, 1:
        return 1
    default:
        return (n / 2) + 1
    }
}

// requestVotes sends the passed requestVote RPC to every peer in Peers. It
// forwards responses along the returned requestVoteResponse channel. It makes
// the RPCs with a timeout of BroadcastInterval * 2 (chosen arbitrarily). Peers
// that don't respond within the timeout are retried forever. The retry loop
// stops only when all peers have responded, or a Cancel signal is sent via the
// returned canceler.
func (pm peerMap) requestVotes(r requestVote) (chan voteResponseTuple, canceler) {
    // "[A server entering the candidate stage] issues requestVote RPCs in
    // parallel to each of the other servers in the cluster. If the candidate
    // receives no response for an RPC, it reissues the RPC repeatedly until a
    // response arrives or the election concludes."

    // construct the channels we'll return
    abortChan := make(chan struct{})
    tupleChan := make(chan voteResponseTuple)

    go func() {
        // We loop until all Peers have given us a response.
        // Track which Peers have responded.
        respondedAlready := peerMap{} // none yet

        for {
            notYetResponded := disjoint(pm, respondedAlready)
            if len(notYetResponded) <= 0 {
                return // done
            }

            // scatter
            tupleChan0 := make(chan voteResponseTuple, len(notYetResponded))
            for id, peer := range notYetResponded {
                go func(id uint64, peer Peer) {
                    resp, err := requestVoteTimeout(peer, r, 2*maximumElectionTimeout())
                    tupleChan0 <- voteResponseTuple{id, resp, err}
                }(id, peer)
            }

            // gather
            for i := 0; i < cap(tupleChan0); i++ {
                select {
                case t := <-tupleChan0:
                    if t.err != nil {
                        continue // will need to retry
                    }
                    respondedAlready[t.id] = nil // set membership semantics
                    tupleChan <- t

                case <-abortChan:
                    return // give up
                }
            }
        }
    }()

    return tupleChan, cancel(abortChan)
}

// a vote response paired with the responding peer's ID
type voteResponseTuple struct {
    id       uint64
    response requestVoteResponse
    err      error
}

type canceler interface {
    Cancel()
}

type cancel chan struct{}

func (c cancel) Cancel() { close(c) }

// disjoint returns the peers in all that are not in except
func disjoint(all, except peerMap) peerMap {
    d := peerMap{}
    for id, peer := range all {
        if _, ok := except[id]; ok {
            continue
        }
        d[id] = peer
    }
    return d
}
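
As a quick illustration of how localPeer, peerMap, and quorum fit together (this snippet was added for this article; the no-op apply function and the in-memory bytes.Buffer store are stand-ins):

// Illustration only -- not part of the original source.
func examplePeerMap() {
    noop := func(commitIndex uint64, cmd []byte) []byte { return cmd }

    s1 := NewServer(1, &bytes.Buffer{}, noop)
    s2 := NewServer(2, &bytes.Buffer{}, noop)
    s3 := NewServer(3, &bytes.Buffer{}, noop)

    pm := makePeerMap(newLocalPeer(s1), newLocalPeer(s2), newLocalPeer(s3))
    fmt.Println(pm.count())           // 3
    fmt.Println(pm.quorum())          // 2, i.e. (3/2)+1
    fmt.Println(pm.except(1).count()) // 2, everyone but server 1
}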

server.go

This file holds the most important logic, including how configuration changes are handled.

// server roles
const (
    follower  = "Follower"
    candidate = "Candidate"
    leader    = "Leader"
)

const (
    unknownLeader = 0
    noVote        = 0
)

// election timeouts are randomized within [MinimumElectionTimeoutMS, maximumElectionTimeoutMS]
var (
    MinimumElectionTimeoutMS int32 = 250

    maximumElectionTimeoutMS = 2 * MinimumElectionTimeoutMS
)

var (
    errNotLeader             = errors.New("not the leader")
    errUnknownLeader         = errors.New("unknown leader")
    errDeposed               = errors.New("deposed during replication")
    errAppendEntriesRejected = errors.New("appendEntries RPC rejected")
    errReplicationFailed     = errors.New("command replication failed (but will keep retrying)")
    errOutOfSync             = errors.New("out of sync")
    errAlreadyRunning        = errors.New("already running")
)

// resetElectionTimeoutMS sets new election-timeout bounds and returns the old ones
func resetElectionTimeoutMS(newMin, newMax int) (int, int) {
    oldMin := atomic.LoadInt32(&MinimumElectionTimeoutMS)
    oldMax := atomic.LoadInt32(&maximumElectionTimeoutMS)
    atomic.StoreInt32(&MinimumElectionTimeoutMS, int32(newMin))
    atomic.StoreInt32(&maximumElectionTimeoutMS, int32(newMax))
    return int(oldMin), int(oldMax)
}

// minimumElectionTimeout returns the current minimum election timeout.
func minimumElectionTimeout() time.Duration {
    return time.Duration(MinimumElectionTimeoutMS) * time.Millisecond
}

// maximumElectionTimeout returns the current maximum election time.
func maximumElectionTimeout() time.Duration {
    return time.Duration(maximumElectionTimeoutMS) * time.Millisecond
}

// electionTimeout returns a random duration within the election-timeout range
func electionTimeout() time.Duration {
    n := rand.Intn(int(maximumElectionTimeoutMS - MinimumElectionTimeoutMS))
    d := int(MinimumElectionTimeoutMS) + n
    return time.Duration(d) * time.Millisecond
}

// broadcastInterval returns the interval between heartbeats (AppendEntry RPCs)
// broadcast from the leader. It is the minimum election timeout / 10, as
// dictated by the spec: BroadcastInterval << ElectionTimeout << MTBF.
// The heartbeat interval used by the leader. It must be well below the election
// timeout, otherwise non-leader nodes will keep starting elections.
func broadcastInterval() time.Duration {
    d := MinimumElectionTimeoutMS / 10
    return time.Duration(d) * time.Millisecond
}

// protectedString is just a string protected by a mutex.
type protectedString struct {
    sync.RWMutex
    value string
}

func (s *protectedString) Get() string {
    s.RLock()
    defer s.RUnlock()
    return s.value
}

func (s *protectedString) Set(value string) {
    s.Lock()
    defer s.Unlock()
    s.value = value
}

// protectedBool is just a bool protected by a mutex.
type protectedBool struct {
    sync.RWMutex
    value bool
}

func (s *protectedBool) Get() bool {
    s.RLock()
    defer s.RUnlock()
    return s.value
}

func (s *protectedBool) Set(value bool) {
    s.Lock()
    defer s.Unlock()
    s.value = value
}

// Server is the agent that performs all of the Raft protocol logic.
// In a typical application, each running process that wants to be part of
// the distributed state machine will contain a server component.
type Server struct {
    id      uint64 // id of this server
    // node state (Follower/Candidate/Leader)
    state   *protectedString
    // whether the server loop is running
    running *protectedBool
    // ID of the current leader
    leader  uint64
    // this node's current term
    term    uint64 // "current term number, which increases monotonically"
    // 0 means we have not yet voted this term; otherwise the ID of the candidate we voted for
    vote    uint64 // who we voted for this term, if applicable
    log     *raftLog
    config  *configuration

    // channel for appendEntries RPCs
    appendEntriesChan chan appendEntriesTuple
    // channel for requestVote RPCs
    requestVoteChan   chan requestVoteTuple
    // channel for client commands
    commandChan       chan commandTuple
    // channel for configuration changes
    configurationChan chan configurationTuple

    // election timer channel
    electionTick <-chan time.Time
    // quit channel
    quit         chan chan struct{}
}

// The state machine function.
// It must not run concurrently with itself, or the replicated state machine loses
// consistency (and it should finish well within the election timeout).

// Strictly speaking it should only run, and the client should only get a reply, once
// consensus has been reached. To keep things simple, this implementation runs the
// consensus step as a background task: when a client sends a command to the leader,
// the leader replies immediately without waiting for the consensus result.
type ApplyFunc func(commitIndex uint64, cmd []byte) []byte

// Create a node:
// 1. build the log, 2. start in the "follower" role, 3. the leader is "unknown"
func NewServer(id uint64, store io.ReadWriter, a ApplyFunc) *Server {
    if id <= 0 {
        panic("server id must be > 0")
    }

    // 5.2 Leader election: "the latest term this server has seen is persisted,
    // and is initialized to 0 on first boot."
    log := newRaftLog(store, a)
    latestTerm := log.lastTerm()

    s := &Server{
        id:      id,
        state:   &protectedString{value: follower}, // "when servers start up they begin as followers"
        running: &protectedBool{value: false},
        leader:  unknownLeader, // unknown at startup
        log:     log,
        term:    latestTerm,
        config:  newConfiguration(peerMap{}),

        appendEntriesChan: make(chan appendEntriesTuple),
        requestVoteChan:   make(chan requestVoteTuple),
        commandChan:       make(chan commandTuple),
        configurationChan: make(chan configurationTuple),

        electionTick: nil,
        quit:         make(chan chan struct{}),
    }
    s.resetElectionTimeout()
    return s
}

type configurationTuple struct {
    Peers []Peer
    Err   chan error
}

// Set the configuration:
// 1. at startup, before the server is running
// 2. when the cluster membership changes
func (s *Server) SetConfiguration(peers ...Peer) error {
    // the node has not started yet
    if !s.running.Get() {
        s.config.directSet(makePeerMap(peers...))
        return nil
    }

    err := make(chan error)
    // the node is already running
    s.configurationChan <- configurationTuple{peers, err}
    return <-err
}

// Start triggers the server to begin communicating with its peers.
func (s *Server) Start() {
    go s.loop()
}

// Stop terminates the server. Stopped servers should not be restarted.
func (s *Server) Stop() {
    q := make(chan struct{})
    s.quit <- q
    <-q
    s.logGeneric("server stopped")
}
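
Putting the API together, a single-node "cluster" can be exercised in-process roughly as follows. This is only a sketch in the spirit of example_test.go (which is not reproduced here): the sleep is a crude way of waiting for the lone node to elect itself leader, and the real example may differ.

// Illustration only -- not part of the original source.
func exampleSingleNode() {
    apply := func(commitIndex uint64, cmd []byte) []byte {
        return []byte("applied: " + string(cmd))
    }

    s := NewServer(1, &bytes.Buffer{}, apply)
    s.SetConfiguration(newLocalPeer(s)) // configure before Start
    s.Start()
    defer s.Stop()

    // Crude wait: give the node time to elect itself leader.
    time.Sleep(2 * maximumElectionTimeout())

    response := make(chan []byte, 1)
    if err := s.Command([]byte("hello"), response); err != nil {
        fmt.Println("command failed:", err)
        return
    }
    fmt.Printf("%s\n", <-response) // delivered once the entry is committed
}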

// a command plus its reply channels
type commandTuple struct {
    // the command payload
    Command         []byte
    // channel for the command's response
    CommandResponse chan<- []byte
    Err             chan error
}

// client command entry point
func (s *Server) Command(cmd []byte, response chan<- []byte) error {
    err := make(chan error)
    s.commandChan <- commandTuple{cmd, response, err}
    return <-err
}

// appendEntries RPC entry point
func (s *Server) appendEntries(ae appendEntries) appendEntriesResponse {
    t := appendEntriesTuple{
        Request:  ae,
        Response: make(chan appendEntriesResponse),
    }
    s.appendEntriesChan <- t
    return <-t.Response
}

// requestVote RPC entry point
func (s *Server) requestVote(rv requestVote) requestVoteResponse {
    t := requestVoteTuple{
        Request:  rv,
        Response: make(chan requestVoteResponse),
    }
    s.requestVoteChan <- t
    return <-t.Response
}

//                                   times out,
//                                   new election
//                                     .-----.
//                                     |     |
//                                     v     |
// +----------+  times out,        +-----------+  receives votes from   +--------+
// | Follower |  starts election   | Candidate |  majority of servers   | Leader |
// |          |------------------->|           |----------------------->|        |
// +----------+                    +-----------+                        +--------+
//    ^  ^                               |                                   |
//    |  |   discovers current leader    |                                   |
//    |  |   or new term                 |                                   |
//    |  '-------------------------------'                                   |
//    |                                                                      |
//    |              discovers server with higher term                       |
//    '----------------------------------------------------------------------'
//

func (s *Server) loop() {
    s.running.Set(true)
    for s.running.Get() {
        switch state := s.state.Get(); state {
        case follower:
            s.followerSelect()
        case candidate:
            s.candidateSelect()
        case leader:
            s.leaderSelect()
        default:
            panic(fmt.Sprintf("unknown Server State '%s'", state))
        }
    }
}

func (s *Server) resetElectionTimeout() {
    s.electionTick = time.NewTimer(electionTimeout()).C
}

func (s *Server) logGeneric(format string, args ...interface{}) {
    prefix := fmt.Sprintf("id=%d term=%d state=%s: ", s.id, s.term, s.state.Get())
    log.Printf(prefix+format, args...)
}

func (s *Server) logAppendEntriesResponse(req appendEntries, resp appendEntriesResponse, stepDown bool) {
    s.logGeneric(
        "got appendEntries, sz=%d leader=%d prevIndex/Term=%d/%d commitIndex=%d: responded with success=%v (reason='%s') stepDown=%v",
        len(req.Entries),
        req.LeaderID,
        req.PrevLogIndex,
        req.PrevLogTerm,
        req.CommitIndex,
        resp.Success,
        resp.reason,
        stepDown,
    )
}

func (s *Server) logRequestVoteResponse(req requestVote, resp requestVoteResponse, stepDown bool) {
    s.logGeneric(
        "got RequestVote, candidate=%d: responded with granted=%v (reason='%s') stepDown=%v",
        req.CandidateID,
        resp.VoteGranted,
        resp.reason,
        stepDown,
    )
}

func (s *Server) handleQuit(q chan struct{}) {
    s.logGeneric("got quit signal")
    s.running.Set(false)
    close(q)
}

// forwardCommand: if this node is not the leader but a leader is known, it acts
// as a proxy and forwards the command to the leader.
func (s *Server) forwardCommand(t commandTuple) {
    switch s.leader {
    case unknownLeader:
        s.logGeneric("got command, but don't know leader")
        t.Err <- errUnknownLeader

    case s.id: // I am the leader
        panic("impossible state in forwardCommand")

    default:
        leader, ok := s.config.get(s.leader)
        if !ok {
            panic("invalid state in peers")
        }
        s.logGeneric("got command, forwarding to leader (%d)", s.leader)
        // We're blocking our {follower,candidate}Select function in the
        // receive-command branch. If we continue to block while forwarding
        // the command, the leader won't be able to get a response from us!
        go func() { t.Err <- leader.callCommand(t.Command, t.CommandResponse) }()
    }
}

// forwardConfiguration forwards configuration changes using the same rules as commands.
func (s *Server) forwardConfiguration(t configurationTuple) {
    switch s.leader {
    case unknownLeader:
        s.logGeneric("got configuration, but don't know leader")
        t.Err <- errUnknownLeader

    case s.id: // I am the leader
        panic("impossible state in forwardConfiguration")

    default:
        leader, ok := s.config.get(s.leader)
        if !ok {
            panic("invalid state in peers")
        }
        s.logGeneric("got configuration, forwarding to leader (%d)", s.leader)
        go func() { t.Err <- leader.callSetConfiguration(t.Peers...) }()
    }
}

// follower-state logic
func (s *Server) followerSelect() {
    for {
        select {
        case q := <-s.quit:
            s.handleQuit(q)
            return
        // forward commands to the leader
        case t := <-s.commandChan:
            s.forwardCommand(t)
        // forward configuration changes to the leader
        case t := <-s.configurationChan:
            s.forwardConfiguration(t)
        // election timeout
        case <-s.electionTick:
            // 5.2 Leader election: "A follower increments its current term and
            // transitions to candidate state."
            if s.config == nil {
                s.logGeneric("election timeout, but no configuration: ignoring")
                s.resetElectionTimeout()
                continue
            }
            s.logGeneric("election timeout, becoming candidate")
            // increment our term
            s.term++
            // clear our vote
            s.vote = noVote
            // forget the leader
            s.leader = unknownLeader
            // become a candidate
            s.state.Set(candidate)
            // reset the election timer so we don't immediately trigger another election
            s.resetElectionTimeout()
            return
        // append entries (the leader's heartbeats also arrive here, not only client-driven appends)
        case t := <-s.appendEntriesChan:
            if s.leader == unknownLeader {
                s.leader = t.Request.LeaderID
                s.logGeneric("discovered Leader %d", s.leader)
            }
            // handle the append
            resp, stepDown := s.handleAppendEntries(t.Request)
            s.logAppendEntriesResponse(t.Request, resp, stepDown)
            t.Response <- resp
            // if we were following a stale leader, update to the new one
            if stepDown {
                // stepDown as a Follower means just to reset the leader
                if s.leader != unknownLeader {
                    s.logGeneric("abandoning old leader=%d", s.leader)
                }
                s.logGeneric("following new leader=%d", t.Request.LeaderID)
                s.leader = t.Request.LeaderID
            }
        // vote requests
        case t := <-s.requestVoteChan:
            // handle the vote request
            resp, stepDown := s.handleRequestVote(t.Request)
            s.logRequestVoteResponse(t.Request, resp, stepDown)
            t.Response <- resp
            // if we have fallen behind, reset the leader to unknownLeader and switch over once the election settles
            if stepDown {
                // stepDown as a Follower means just to reset the leader
                if s.leader != unknownLeader {
                    s.logGeneric("abandoning old leader=%d", s.leader)
                }
                s.logGeneric("new leader unknown")
                s.leader = unknownLeader
            }
        }
    }
}

// candidate-state logic
func (s *Server) candidateSelect() {
    if s.leader != unknownLeader {
        panic("known leader when entering candidateSelect")
    }
    if s.vote != 0 {
        panic("existing vote when entering candidateSelect")
    }

    // "[A server entering the candidate stage] issues requestVote RPCs in
    // parallel to each of the other servers in the cluster. If the candidate
    // receives no response for an RPC, it reissues the RPC repeatedly until a
    // response arrives or the election concludes."
    // issue the requestVote RPCs
    requestVoteResponses, canceler := s.config.allPeers().except(s.id).requestVotes(requestVote{
        Term:         s.term,
        CandidateID:  s.id,
        LastLogIndex: s.log.lastIndex(),
        LastLogTerm:  s.log.lastTerm(),
    })
    defer canceler.Cancel()

    // Set up vote tallies (plus, vote for myself)
    votes := map[uint64]bool{s.id: true}
    s.vote = s.id
    s.logGeneric("term=%d election started (configuration state %s)", s.term, s.config.state)

    // if we already have a quorum (e.g. a single-node cluster), we win immediately
    if s.config.pass(votes) {
        s.logGeneric("I immediately won the election")
        s.leader = s.id
        s.state.Set(leader)
        s.vote = noVote
        return
    }

    // "A candidate continues in this state until one of three things happens:
    // (a) it wins the election, (b) another server establishes itself as
    // leader, or (c) a period of time goes by with no winner."
    for {
        select {
        case q := <-s.quit:
            s.handleQuit(q)
            return
        // forward commands to the leader
        case t := <-s.commandChan:
            s.forwardCommand(t)
        // forward configuration changes (note how this differs from the leader's handling)
        case t := <-s.configurationChan:
            s.forwardConfiguration(t)
        // received a vote response
        case t := <-requestVoteResponses:
            s.logGeneric("got vote: id=%d term=%d granted=%v", t.id, t.response.Term, t.response.VoteGranted)
            // "A candidate wins the election if it receives votes from a
            // majority of servers in the full cluster for the same term."
            // this node is behind other nodes
            if t.response.Term > s.term {
                s.logGeneric("got vote from future term (%d>%d); abandoning election", t.response.Term, s.term)
                s.leader = unknownLeader
                s.state.Set(follower)
                s.vote = noVote
                return // lose
            }
            // a response from an older term; ignore it
            if t.response.Term < s.term {
                s.logGeneric("got vote from past term (%d<%d); ignoring", t.response.Term, s.term)
                break
            }

            // received a vote in our favor
            if t.response.VoteGranted {
                s.logGeneric("%d voted for me", t.id)
                votes[t.id] = true
            }
            // "Once a candidate wins an election, it becomes leader."
            // quorum reached
            if s.config.pass(votes) {
                s.logGeneric("I won the election")
                s.leader = s.id
                s.state.Set(leader)
                s.vote = noVote
                return // win
            }
        // received appendEntries (heartbeats are also sent as appendEntries)
        case t := <-s.appendEntriesChan:
            // "While waiting for votes, a candidate may receive an
            // appendEntries RPC from another server claiming to be leader.
            // If the leader's term (included in its RPC) is at least as
            // large as the candidate's current term, then the candidate
            // recognizes the leader as legitimate and steps down, meaning
            // that it returns to follower state."
            // handle the append
            resp, stepDown := s.handleAppendEntries(t.Request)
            s.logAppendEntriesResponse(t.Request, resp, stepDown)
            t.Response <- resp
            // this candidate is behind the leader
            if stepDown {
                s.logGeneric("after an appendEntries, stepping down to Follower (leader=%d)", t.Request.LeaderID)
                s.leader = t.Request.LeaderID
                s.state.Set(follower)
                return // lose
            }

        // we are a candidate, but other candidates may exist in the cluster at the same time
        case t := <-s.requestVoteChan:
            // We can also be defeated by a more recent candidate
            resp, stepDown := s.handleRequestVote(t.Request)
            s.logRequestVoteResponse(t.Request, resp, stepDown)
            t.Response <- resp
            if stepDown {
                // this candidate is behind another, more up-to-date candidate: step down to
                // follower (and the vote may well have been granted to that candidate)
                s.logGeneric("after a requestVote, stepping down to Follower (leader unknown)")
                s.leader = unknownLeader
                s.state.Set(follower)
                return // lose
            }

        // election timeout
        case <-s.electionTick:
            // "The third possible outcome is that a candidate neither wins nor
            // loses the election: if many followers become candidates at the
            // same time, votes could be split so that no candidate obtains a
            // majority. When this happens, each candidate will start a new
            // election by incrementing its term and initiating another round of
            // requestVote RPCs."
            s.logGeneric("election ended with no winner; incrementing term and trying again")
            s.resetElectionTimeout()
            s.term++
            s.vote = noVote
            return // draw
        }
    }
}

// For each follower, the leader tracks the latest log index known to be replicated
type nextIndex struct {
    sync.RWMutex
    m map[uint64]uint64 // followerId: nextIndex
}

func newNextIndex(pm peerMap, defaultNextIndex uint64) *nextIndex {
    ni := &nextIndex{
        m: map[uint64]uint64{},
    }
    for id := range pm {
        ni.m[id] = defaultNextIndex
    }
    return ni
}

// bestIndex returns the smallest index that has been synced to every follower
func (ni *nextIndex) bestIndex() uint64 {
    ni.RLock()
    defer ni.RUnlock()

    if len(ni.m) <= 0 {
        return 0
    }

    i := uint64(math.MaxUint64)
    for _, nextIndex := range ni.m {
        if nextIndex < i {
            i = nextIndex
        }
    }
    return i
}

// prevLogIndex returns the latest synced log index for the given peer
func (ni *nextIndex) prevLogIndex(id uint64) uint64 {
    ni.RLock()
    defer ni.RUnlock()
    if _, ok := ni.m[id]; !ok {
        panic(fmt.Sprintf("peer %d not found", id))
    }
    return ni.m[id]
}

// decrement lowers the peer's synced index, used to back off after a failed replication
func (ni *nextIndex) decrement(id uint64, prev uint64) (uint64, error) {
    ni.Lock()
    defer ni.Unlock()

    i, ok := ni.m[id]
    if !ok {
        panic(fmt.Sprintf("peer %d not found", id))
    }

    if i != prev {
        return i, errOutOfSync
    }

    if i > 0 {
        ni.m[id]--
    }
    return ni.m[id], nil
}

// set updates the peer's synced index
func (ni *nextIndex) set(id, index, prev uint64) (uint64, error) {
    ni.Lock()
    defer ni.Unlock()

    i, ok := ni.m[id]
    if !ok {
        panic(fmt.Sprintf("peer %d not found", id))
    }
    if i != prev {
        return i, errOutOfSync
    }

    ni.m[id] = index
    return index, nil
}

// flush is used for both heartbeats and command replication. It is synchronous:
// if the peer is unreachable, it blocks.
func (s *Server) flush(peer Peer, ni *nextIndex) error {
    peerID := peer.id()
    // the leader's term
    currentTerm := s.term
    // the peer's latest synced index
    prevLogIndex := ni.prevLogIndex(peerID)
    // collect the entries the peer is missing relative to the leader
    entries, prevLogTerm := s.log.entriesAfter(prevLogIndex)
    // the leader's latest commit index
    commitIndex := s.log.getCommitIndex()
    s.logGeneric("flush to %d: term=%d leaderId=%d prevLogIndex/Term=%d/%d sz=%d commitIndex=%d", peerID, currentTerm, s.id, prevLogIndex, prevLogTerm, len(entries), commitIndex)

    // the appendEntries RPC
    resp := peer.callAppendEntries(appendEntries{
        Term:         currentTerm,
        LeaderID:     s.id,
        PrevLogIndex: prevLogIndex,
        PrevLogTerm:  prevLogTerm,
        Entries:      entries,
        CommitIndex:  commitIndex,
    })

    if resp.Term > currentTerm {
        // the responder's term is higher than ours: this leader has been deposed
        s.logGeneric("flush to %d: responseTerm=%d > currentTerm=%d: deposed", peerID, resp.Term, currentTerm)
        return errDeposed
    }


    if !resp.Success {
        // the append was rejected (leader RPC timeout, or a network error such as a partition); back off
        newPrevLogIndex, err := ni.decrement(peerID, prevLogIndex)
        if err != nil {
            s.logGeneric("flush to %d: while decrementing prevLogIndex: %s", peerID, err)
            return err
        }
        s.logGeneric("flush to %d: rejected; prevLogIndex(%d) becomes %d", peerID, peerID, newPrevLogIndex)
        return errAppendEntriesRejected
    }

    if len(entries) > 0 {
        // replication succeeded; advance the peer's synced index
        newPrevLogIndex, err := ni.set(peer.id(), entries[len(entries)-1].Index, prevLogIndex)
        if err != nil {
            s.logGeneric("flush to %d: while moving prevLogIndex forward: %s", peerID, err)
            return err
        }
        s.logGeneric("flush to %d: accepted; prevLogIndex(%d) becomes %d", peerID, peerID, newPrevLogIndex)
        return nil
    }

    s.logGeneric("flush to %d: accepted; prevLogIndex(%d) remains %d", peerID, peerID, ni.prevLogIndex(peerID))
    return nil
}

// concurrentFlush replicates to the given peers concurrently
func (s *Server) concurrentFlush(pm peerMap, ni *nextIndex, timeout time.Duration) (int, bool) {
    type tuple struct {
        id  uint64
        err error
    }
    responses := make(chan tuple, len(pm))
    for _, peer := range pm {
        go func(peer Peer) {
            errChan := make(chan error, 1)
            go func() { errChan <- s.flush(peer, ni) }()
            go func() { time.Sleep(timeout); errChan <- errTimeout }()
            responses <- tuple{peer.id(), <-errChan} // first responder wins
        }(peer)
    }

    successes, stepDown := 0, false
    for i := 0; i < cap(responses); i++ {
        switch t := <-responses; t.err {
        case nil:
            s.logGeneric("concurrentFlush: peer %d: OK (prevLogIndex(%d)=%d)", t.id, t.id, ni.prevLogIndex(t.id))
            successes++
        case errDeposed:
            // this leader is behind other nodes
            s.logGeneric("concurrentFlush: peer %d: deposed!", t.id)
            stepDown = true
        default:
            s.logGeneric("concurrentFlush: peer %d: %s (prevLogIndex(%d)=%d)", t.id, t.err, t.id, ni.prevLogIndex(t.id))
            // nothing to do but log and continue
        }
    }
    return successes, stepDown
}

// run as the leader
func (s *Server) leaderSelect() {
    if s.leader != s.id {
        panic(fmt.Sprintf("leader (%d) not me (%d) when entering leaderSelect", s.leader, s.id))
    }
    if s.vote != 0 {
        panic(fmt.Sprintf("vote (%d) not zero when entering leaderSelect", s.leader))
    }

    // 5.3 Log replication: "The leader maintains a nextIndex for each follower,
    // which is the index of the next log entry the leader will send to that
    // follower. When a leader first comes to power it initializes all nextIndex
    // values to the index just after the last one in its log."
    //
    // I changed this from lastIndex+1 to simply lastIndex. Every initial
    // communication from leader to follower was being rejected and we were
    // doing the decrement. This was just annoying, except if you manage to
    // sneak in a command before the first heartbeat. Then, it will never get
    // properly replicated (it seemed).

    // the leader keeps, for every follower, the latest synced log index
    ni := newNextIndex(s.config.allPeers().except(s.id), s.log.lastIndex()) // +1)

    flush := make(chan struct{})
    heartbeat := time.NewTicker(broadcastInterval())
    defer heartbeat.Stop()
    go func() {
        // send heartbeats; besides liveness checks, this keeps followers from starting elections
        for _ = range heartbeat.C {
            flush <- struct{}{}
        }
    }()

    for {
        select {
        case q := <-s.quit:
            s.handleQuit(q)
            return
        // received a client command
        case t := <-s.commandChan:
            // Append the command to our (leader) log
            s.logGeneric("got command, appending")
            currentTerm := s.term
            entry := logEntry{
                Index:           s.log.lastIndex() + 1,
                Term:            currentTerm,
                Command:         t.Command,
                commandResponse: t.CommandResponse,
            }
            // append to the leader's log
            if err := s.log.appendEntry(entry); err != nil {
                t.Err <- err
                continue
            }
            s.logGeneric(
                "after append, commitIndex=%d lastIndex=%d lastTerm=%d",
                s.log.getCommitIndex(),
                s.log.lastIndex(),
                s.log.lastTerm(),
            )

            // Now that the entry is in the log, we can fall back to the
            // normal flushing mechanism to attempt to replicate the entry
            // and advance the commit index. We trigger a manual flush as a
            // convenience, so our caller might get a response a bit sooner.
            // The replication is only queued here before replying to the client; strictly speaking, the reply should wait until consensus is reached.
            go func() { flush <- struct{}{} }()
            t.Err <- nil
        // received a configuration change
        case t := <-s.configurationChan:
            // Attempt to change our local configuration
            if err := s.config.changeTo(makePeerMap(t.Peers...)); err != nil {
                t.Err <- err
                continue
            }

            // Serialize the local (C_old,new) configuration
            encodedConfiguration, err := s.config.encode()
            if err != nil {
                t.Err <- err
                continue
            }

            // We're gonna write+replicate that config via log mechanisms.
            // Prepare the on-commit callback.
            entry := logEntry{
                Index:           s.log.lastIndex() + 1,
                Term:            s.term,
                Command:         encodedConfiguration,
                isConfiguration: true,
                committed:       make(chan bool),
            }
            go func() {
                // committed is signalled once the entry is committed
                committed := <-entry.committed
                if !committed {
                    s.config.changeAborted()
                    return
                }
                // the entry committed, which means the other nodes have applied the new
                // configuration, so update this node's configuration as well
                s.config.changeCommitted()
                if _, ok := s.config.allPeers()[s.id]; !ok {
                    // this node has been removed from the new cluster
                    s.logGeneric("leader expelled; shutting down")
                    q := make(chan struct{})
                    s.quit <- q
                    // the node has shut down
                    <-q
                }
            }()
            // append the configuration entry to the log
            if err := s.log.appendEntry(entry); err != nil {
                t.Err <- err
                continue
            }

        case <-flush:
            // the peers we need to replicate to
            recipients := s.config.allPeers().except(s.id)

            // Special case: network of 1
            if len(recipients) <= 0 {
                ourLastIndex := s.log.lastIndex()
                if ourLastIndex > 0 {
                    if err := s.log.commitTo(ourLastIndex); err != nil {
                        s.logGeneric("commitTo(%d): %s", ourLastIndex, err)
                        continue
                    }
                    s.logGeneric("after commitTo(%d), commitIndex=%d", ourLastIndex, s.log.getCommitIndex())
                }
                continue
            }

            // Normal case: network of at-least-2
            // replicate to the followers concurrently
            successes, stepDown := s.concurrentFlush(recipients, ni, 2*broadcastInterval())
            if stepDown {
                // this node has been deposed
                s.logGeneric("deposed during flush")
                s.state.Set(follower)
                s.leader = unknownLeader
                return
            }

            // Only when we know all followers accepted the flush can we
            // consider incrementing commitIndex and pushing out another
            // round of flushes.
            if successes == len(recipients) {
                // the smallest index replicated to every follower
                peersBestIndex := ni.bestIndex()
                ourLastIndex := s.log.lastIndex()
                ourCommitIndex := s.log.getCommitIndex()
                if peersBestIndex > ourLastIndex {
                    // safety check: we've probably been deposed
                    s.logGeneric("peers' best index %d > our lastIndex %d", peersBestIndex, ourLastIndex)
                    s.logGeneric("this is crazy, I'm gonna become a follower")
                    s.leader = unknownLeader
                    s.vote = noVote
                    s.state.Set(follower)
                    return
                }
                if peersBestIndex > ourCommitIndex {
                    // advance the leader's commit index
                    if err := s.log.commitTo(peersBestIndex); err != nil {
                        s.logGeneric("commitTo(%d): %s", peersBestIndex, err)
                        // e.g. a follower failed while syncing this index
                        continue // oh well, next time?
                    }

                    if s.log.getCommitIndex() > ourCommitIndex {
                        // queue another round of replication
                        s.logGeneric("after commitTo(%d), commitIndex=%d -- queueing another flush", peersBestIndex, s.log.getCommitIndex())
                        go func() { flush <- struct{}{} }()
                    }
                }
            }
        // appendEntries: normally a leader should not receive this; if it does, a newer leader probably exists in the cluster and sent it
        case t := <-s.appendEntriesChan:
            resp, stepDown := s.handleAppendEntries(t.Request)
            s.logAppendEntriesResponse(t.Request, resp, stepDown)
            t.Response <- resp
            if stepDown {
                s.logGeneric("after an appendEntries, deposed to Follower (leader=%d)", t.Request.LeaderID)
                s.leader = t.Request.LeaderID
                s.state.Set(follower)
                return // deposed
            }
        // received a vote request
        case t := <-s.requestVoteChan:
            resp, stepDown := s.handleRequestVote(t.Request)
            s.logRequestVoteResponse(t.Request, resp, stepDown)
            t.Response <- resp
            if stepDown {
                s.logGeneric("after a requestVote, deposed to Follower (leader unknown)")
                s.leader = unknownLeader
                s.state.Set(follower)
                return // deposed
            }
        }
    }
}

// handleRequestVote will modify s.term and s.vote, but nothing else.
// stepDown means you need to: s.leader=unknownLeader, s.state.Set(Follower).
func (s *Server) handleRequestVote(rv requestVote) (requestVoteResponse, bool) {
    // Spec is ambiguous here; basing this (loosely!) on benbjohnson's impl

    // If the request is from an old term, reject
    if rv.Term < s.term {
        return requestVoteResponse{
            Term:        s.term,
            VoteGranted: false,
            reason:      fmt.Sprintf("Term %d < %d", rv.Term, s.term),
        }, false
    }

    // If the request is from a newer term, reset our state
    stepDown := false
    if rv.Term > s.term {
        // this node is behind other nodes in the cluster; update our term
        s.logGeneric("requestVote from newer term (%d): we defer", rv.Term)
        s.term = rv.Term
        s.vote = noVote
        s.leader = unknownLeader
        stepDown = true
    }

    // Special case: if we're the leader, and we haven't been deposed by a more
    // recent term, then we should always deny the vote
    if s.state.Get() == leader && !stepDown {
        // if we are the leader and not behind the requester, deny the vote
        return requestVoteResponse{
            Term:        s.term,
            VoteGranted: false,
            reason:      "already the leader",
        }, stepDown
    }

    // If we've already voted for someone else this term, reject
    // if we have already voted for someone else this term, deny the vote
    if s.vote != 0 && s.vote != rv.CandidateID {
        if stepDown {
            panic("impossible state in handleRequestVote")
        }
        return requestVoteResponse{
            Term:        s.term,
            VoteGranted: false,
            reason:      fmt.Sprintf("already cast vote for %d", s.vote),
        }, stepDown
    }

    // If the candidate log isn't at least as recent as ours, reject
    if s.log.lastIndex() > rv.LastLogIndex || s.log.lastTerm() > rv.LastLogTerm {
        return requestVoteResponse{
            Term:        s.term,
            VoteGranted: false,
            reason: fmt.Sprintf(
                "our index/term %d/%d > %d/%d",
                s.log.lastIndex(),
                s.log.lastTerm(),
                rv.LastLogIndex,
                rv.LastLogTerm,
            ),
        }, stepDown
    }

    // We passed all the tests: cast vote in favor
    s.vote = rv.CandidateID
    s.resetElectionTimeout()
    return requestVoteResponse{
        Term:        s.term,
        VoteGranted: true,
    }, stepDown
}
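
Note that the last-log check above rejects a candidate whenever our last index or our last term is larger. That appears stricter than the rule in the Raft paper (section 5.4.1), which compares terms first and falls back to log length only on a tie; the stricter form can deny votes the paper would grant, which affects liveness rather than safety. For reference, a sketch of the paper's comparison:

// candidateUpToDate implements the paper's "at least as up-to-date" test:
// a higher last term wins; equal terms fall back to comparing log length.
func candidateUpToDate(candLastTerm, candLastIndex, ourLastTerm, ourLastIndex uint64) bool {
    if candLastTerm != ourLastTerm {
        return candLastTerm > ourLastTerm
    }
    return candLastIndex >= ourLastIndex
}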

// handleAppendEntries will modify s.term and s.vote, but nothing else.
// stepDown means you need to: s.leader=r.LeaderID, s.state.Set(Follower).
// Note: the handler's behavior also differs depending on the local node's
// current state (follower, candidate, or leader).
func (s *Server) handleAppendEntries(r appendEntries) (appendEntriesResponse, bool) {
    // Spec is ambiguous here; basing this on benbjohnson's impl

    // Maybe a nicer way to handle this is to define explicit handler functions
    // for each Server state. Then, we won't try to hide too much logic (i.e.
    // too many protocol rules) in one code path.

    // If the request is from an old term, reject
    if r.Term < s.term {
        return appendEntriesResponse{
            Term:    s.term,
            Success: false,
            reason:  fmt.Sprintf("Term %d < %d", r.Term, s.term),
        }, false
    }

    // If the request is from a newer term, reset our state
    stepDown := false
    if r.Term > s.term {
        s.term = r.Term
        s.vote = noVote
        stepDown = true
    }

    // Special case for candidates: "While waiting for votes, a candidate may
    // receive an appendEntries RPC from another server claiming to be leader.
    // If the leader’s term (included in its RPC) is at least as large as the
    // candidate’s current term, then the candidate recognizes the leader as
    // legitimate and steps down, meaning that it returns to follower state."
    if s.state.Get() == candidate && r.LeaderID != s.leader && r.Term >= s.term {
        s.term = r.Term
        s.vote = noVote
        stepDown = true
    }

    // In any case, reset our election timeout
    s.resetElectionTimeout()

    // Reject if log doesn't contain a matching previous entry
    // ensureLastIs truncates the log back to {PrevLogIndex, PrevLogTerm}; if no
    // matching entry exists, the append fails. E.g. (entries as {term, index}):
    // [{1,2},{1,3},{1,4},{1,5},{1,6}] => ensureLastIs(5, 1) => [{1,2},{1,3},{1,4},{1,5}]
    if err := s.log.ensureLastIs(r.PrevLogIndex, r.PrevLogTerm); err != nil {
        return appendEntriesResponse{
            Term:    s.term,
            Success: false,
            reason: fmt.Sprintf(
                "while ensuring last log entry had index=%d term=%d: error: %s",
                r.PrevLogIndex,
                r.PrevLogTerm,
                err,
            ),
        }, stepDown
    }

    // Process the entries
    for i, entry := range r.Entries {
        // Configuration changes require special preprocessing
        var pm peerMap
        // handle a configuration entry: decode the peer map it carries
        if entry.isConfiguration {
            commandBuf := bytes.NewBuffer(entry.Command)
            if err := gob.NewDecoder(commandBuf).Decode(&pm); err != nil {
                panic("gob decode of peers failed")
            }

            if s.state.Get() == leader {
                // TODO should we instead just ignore this entry?
                return appendEntriesResponse{
                    Term:    s.term,
                    Success: false,
                    reason: fmt.Sprintf(
                        "AppendEntry %d/%d failed (configuration): %s",
                        i+1,
                        len(r.Entries),
                        "Leader shouldn't receive configurations via appendEntries",
                    ),
                }, stepDown
            }

            // Expulsion recognition
            if _, ok := pm[s.id]; !ok {
                entry.committed = make(chan bool)
                go func() {
                    if <-entry.committed {
                        s.logGeneric("non-leader expelled; shutting down")
                        q := make(chan struct{})
                        s.quit <- q
                        <-q
                    }
                }()
            }
        }

        // Append entry to the log
        if err := s.log.appendEntry(entry); err != nil {
            return appendEntriesResponse{
                Term:    s.term,
                Success: false,
                reason: fmt.Sprintf(
                    "AppendEntry %d/%d failed: %s",
                    i+1,
                    len(r.Entries),
                    err,
                ),
            }, stepDown
        }

        // "Once a given server adds the new configuration entry to its log, it
        // uses that configuration for all future decisions (it does not wait
        // for the entry to become committed)."
        if entry.isConfiguration {
            if err := s.config.directSet(pm); err != nil {
                return appendEntriesResponse{
                    Term:    s.term,
                    Success: false,
                    reason: fmt.Sprintf(
                        "AppendEntry %d/%d failed (configuration): %s",
                        i+1,
                        len(r.Entries),
                        err,
                    ),
                }, stepDown
            }
        }
    }

    // Commit up to the commit index.
    //
    // < ptrb> ongardie: if the new leader sends a 0-entry appendEntries
    // with lastIndex=5 commitIndex=4, to a follower that has lastIndex=5
    // commitIndex=5 -- in my impl, this fails, because commitIndex is too
    // small. shouldn't be?
    // <@ongardie> ptrb: i don't think that should fail
    // <@ongardie> there are 4 ways an appendEntries request can fail: (1)
    // network drops packet (2) caller has stale term (3) would leave gap in
    // the recipient's log (4) term of entry preceding the new entries doesn't
    // match the term at the same index on the recipient
    //
    // This can happen when the local node hit a problem while committing, or the
    // network failed while it was replying to the leader, and so on. In those
    // cases the local commit index drifts out of sync with what the leader has
    // recorded for this follower.
    if r.CommitIndex > 0 && r.CommitIndex > s.log.getCommitIndex() {
        if err := s.log.commitTo(r.CommitIndex); err != nil {
            return appendEntriesResponse{
                Term:    s.term,
                Success: false,
                reason:  fmt.Sprintf("CommitTo(%d) failed: %s", r.CommitIndex, err),
            }, stepDown
        }
    }

    // all good
    return appendEntriesResponse{
        Term:    s.term,
        Success: true,
    }, stepDown
}
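
The consistency check near the top of handleAppendEntries leans on ensureLastIs from log.go: the follower must hold an entry matching {PrevLogIndex, PrevLogTerm}, discard anything after it, and only then append the new entries. A hypothetical standalone sketch of that truncation, just to make the rule concrete (the real raftLog works against its own entry type and backing store):

type sketchEntry struct{ Index, Term uint64 }

// ensureLastIsSketch keeps the log only up to the entry matching
// {prevIndex, prevTerm}, discarding any conflicting suffix. If no such entry
// exists the append must be rejected, and the leader would retry with an
// earlier prevIndex.
func ensureLastIsSketch(log []sketchEntry, prevIndex, prevTerm uint64) ([]sketchEntry, bool) {
    if prevIndex == 0 {
        return log[:0], true // the empty prefix always matches
    }
    for i, e := range log {
        if e.Index == prevIndex && e.Term == prevTerm {
            return log[:i+1], true
        }
    }
    return log, false
}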

configuration.go

var (
        errConfigurationAlreadyChanging = errors.New("configuration already changing")
)

const (
        cOld    = "C_old"
        cOldNew = "C_old,new"
)

// configuration represents the sets of peers and behaviors required to
// implement joint-consensus.
type configuration struct {
        sync.RWMutex
        state     string
        // the old configuration (C_old)
        cOldPeers peerMap
        // the new configuration (C_new), used during the transition
        cNewPeers peerMap
}

// newConfiguration returns a new configuration in stable (C_old) state based
// on the passed peers.
func newConfiguration(pm peerMap) *configuration {
        return &configuration{
            state:     cOld, // start in a stable state,
            cOldPeers: pm,   // with only C_old
        }
}

// directSet is used when bootstrapping, and when receiving a replicated
// configuration from a leader. It directly sets the configuration to the
// passed peers. It's assumed this is called on a non-leader, and therefore
// requires no consistency dance.
// directly apply a configuration change
func (c *configuration) directSet(pm peerMap) error {
        c.Lock()
        defer c.Unlock()

        c.cOldPeers = pm
        c.cNewPeers = peerMap{}
        c.state = cOld
        return nil
}

func (c *configuration) get(id uint64) (Peer, bool) {
        c.RLock()
        defer c.RUnlock()

        if peer, ok := c.cOldPeers[id]; ok {
            return peer, true
        }
        if peer, ok := c.cNewPeers[id]; ok {
            return peer, true
        }
        return nil, false
}

func (c *configuration) encode() ([]byte, error) {
        buf := &bytes.Buffer{}
        if err := gob.NewEncoder(buf).Encode(c.allPeers()); err != nil {
            return []byte{}, err
        }
        return buf.Bytes(), nil
}

// allPeers returns the union set of all peers in the configuration.
func (c *configuration) allPeers() peerMap {
        c.RLock()
        defer c.RUnlock()

        union := peerMap{}
        for id, peer := range c.cOldPeers {
            union[id] = peer
        }
        for id, peer := range c.cNewPeers {
            union[id] = peer
        }
        return union
}

// pass returns true if the votes represented by the votes map are sufficient
// to constitute a quorum. pass respects C_old,new requirements, which dictate
// that any request must receive a majority from both C_old and C_new to pass.
// quorum check
func (c *configuration) pass(votes map[uint64]bool) bool {
        c.RLock()
        defer c.RUnlock()

        // Count the votes
        cOldHave, cOldRequired := 0, c.cOldPeers.quorum()
        for id := range c.cOldPeers {
            if votes[id] {
                cOldHave++
            }
            if cOldHave >= cOldRequired {
                break
            }
        }

        // If we've already failed, we can stop here
        if cOldHave < cOldRequired {
            return false
        }

        // C_old passes: if we're in C_old, we pass
        if c.state == cOld {
            return true
        }

        // Not in C_old, so make sure we have some peers in C_new
        if len(c.cNewPeers) <= 0 {
            panic(fmt.Sprintf("configuration state '%s', but no C_new peers", c.state))
        }

        // Since we're in C_old,new, we need to also pass C_new to pass overall.
        // It's important that we range through C_new and check our votes map, and
        // not the other way around: if a server casts a vote but doesn't exist in
        // a particular configuration, that vote should not be counted.
        cNewHave, cNewRequired := 0, c.cNewPeers.quorum()
        for id := range c.cNewPeers {
            if votes[id] {
                cNewHave++
            }
            if cNewHave >= cNewRequired {
                break
            }
        }

        return cNewHave >= cNewRequired
}
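
In the stable C_old state, pass is an ordinary majority vote; in C_old,new it demands separate majorities from both peer sets, which is the core of joint consensus. The same rule, rewritten here as a hypothetical helper over plain slices for a quick sanity check:

// jointPass mirrors pass() without the peerMap/quorum plumbing.
func jointPass(votes map[uint64]bool, cOld, cNew []uint64) bool {
    majority := func(ids []uint64) bool {
        have, need := 0, len(ids)/2+1
        for _, id := range ids {
            if votes[id] {
                have++
            }
        }
        return have >= need
    }
    if !majority(cOld) {
        return false
    }
    if len(cNew) == 0 { // stable C_old state
        return true
    }
    return majority(cNew) // C_old,new: both majorities required
}

For example, with cOld = {1,2,3} and cNew = {3,4,5}, votes from {1,3,4} pass (two out of three in each set), while votes from {1,2} fail during the transition because C_new contributes no majority.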

// prepare a configuration change: enter the joint C_old,new state
func (c *configuration) changeTo(pm peerMap) error {
        c.Lock()
        defer c.Unlock()

        if c.state != cOld {
            return errConfigurationAlreadyChanging
        }

        if len(c.cNewPeers) > 0 {
            panic(fmt.Sprintf("configuration ChangeTo in state '%s', but have C_new peers already", c.state))
        }

        c.cNewPeers = pm
        c.state = cOldNew
        return nil
}

// commit the configuration change
func (c *configuration) changeCommitted() {
        c.Lock()
        defer c.Unlock()

        if c.state != cOldNew {
            panic("configuration ChangeCommitted, but not in C_old,new")
        }

        if len(c.cNewPeers) <= 0 {
            panic("configuration ChangeCommitted, but C_new peers are empty")
        }

        c.cOldPeers = c.cNewPeers
        c.cNewPeers = peerMap{}
        c.state = cOld
}

// abort the configuration change
func (c *configuration) changeAborted() {
        c.Lock()
        defer c.Unlock()

        if c.state != cOldNew {
            panic("configuration ChangeAborted, but not in C_old,new")
        }

        c.cNewPeers = peerMap{}
        c.state = cOld
}
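
Taken together, these methods form a small lifecycle; the orchestration that decides when each one is called lives in server.go, so the following is only a hedged usage sketch (oldPeers and newPeers are hypothetical peerMap values):

// sketchConfigLifecycle shows the intended call order for a membership change.
func sketchConfigLifecycle(oldPeers, newPeers peerMap) {
    cfg := newConfiguration(oldPeers) // stable C_old
    if err := cfg.changeTo(newPeers); err != nil {
        return // errConfigurationAlreadyChanging: a change is already in flight
    }
    // While in C_old,new, pass() requires a majority of both peer sets.
    // Once the joint configuration entry is committed:
    cfg.changeCommitted() // C_new becomes the new C_old
    // If the change fails before commit, call cfg.changeAborted() instead
    // to drop C_new and keep C_old.
}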

Demo

package main

import (
        "bytes"
        "fmt"
        "hash/fnv"
        "net/http"
        "net/url"
        "time"

        "github.com/peterbourgon/raft"
)

func main() {
        a := func(idx uint64, cmd []byte) []byte {
            fmt.Printf("%d, apply function: %s\n", idx, cmd)
            return cmd
        }

        mustParseURL := func(rawURL string) *url.URL {
            u, _ := url.Parse(rawURL)
            u.Path = ""
            return u
        }
        mustNewHTTPPeer := func(u *url.URL) raft.Peer {
            p, err := raft.NewHTTPPeer(u)
            if err != nil {
                panic(err)
            }
            return p
        }
        peersAddr := []string{
            "127.0.0.1:7090",
            "127.0.0.1:7091",
            "127.0.0.1:7092",
            "127.0.0.1:7093",
            "127.0.0.1:7094"}
        var ss []*raft.Server
        for _, addr := range peersAddr {
            hash := fnv.New64()
            hash.Write([]byte(addr))
            id := hash.Sum64()
            hash.Reset()
            s := raft.NewServer(id, &bytes.Buffer{}, a)
            mux := http.NewServeMux()
            raft.HTTPTransport(mux, s)
            go func(addr string) {
                if err := http.ListenAndServe(addr, mux); err != nil {
                    panic(err)
                }
            }(addr)
            ss = append(ss, s)
        }
        time.Sleep(time.Second)
        for _, s := range ss {
            s.SetConfiguration(
                mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7090")),
                mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7091")),
                mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7092")),
                mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7093")),
                mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7094")),
            )
            s.Start()
        }

        for {
            cmd := []byte(time.Now().String())
            cmdChan := make(chan []byte)
            go ss[0].Command(cmd, cmdChan)
            <-cmdChan
            time.Sleep(time.Millisecond * 500)
        }

        time.Sleep(time.Hour)
}
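
The demo builds a five-node cluster in a single process: each node's id is the FNV-64 hash of its listen address, each server is wired to its own HTTP transport on a separate port, and SetConfiguration hands the full peer list to every server before Start. The loop then submits the current timestamp as a command through the first server every 500 ms; each committed command is applied on every node, which is why every line appears five times in the output below.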

Run

$ go run raft-server.go 2>/dev/null
1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST
1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST
1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST
1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST
1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST
2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST
2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST
2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST
2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST
2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST
3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST
3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST
3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST
3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST
3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST
4, apply function: 2017-09-11 11:41:15.171741805 +0800 CST
4, apply function: 2017-09-11 11:41:15.171741805 +0800 CST
4, apply function: 2017-09-11 11:41:15.171741805 +0800 CST
4, apply function: 2017-09-11 11:41:15.171741805 +0800 CST
4, apply function: 2017-09-11 11:41:15.171741805 +0800 CST
5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST
5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST
5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST
5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST
5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST
6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST
6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST
6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST
6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST
6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST
7, apply function: 2017-09-11 11:41:16.677758823 +0800 CST

Source: http://laohanlinux.github.io/2017/09/11/raft源碼分析/