Skip to content

Instantly share code, notes, and snippets.

@duarten
Last active July 10, 2019 19:25
Show Gist options
  • Select an option

  • Save duarten/a9a6a7f6aaf4f94f9911d35b941293d3 to your computer and use it in GitHub Desktop.

Select an option

Save duarten/a9a6a7f6aaf4f94f9911d35b941293d3 to your computer and use it in GitHub Desktop.

Revisions

  1. duarten revised this gist Jul 10, 2019. 1 changed file with 1 addition and 23 deletions.
    24 changes: 1 addition & 23 deletions raft.cc
    Original file line number Diff line number Diff line change
    @@ -217,18 +217,13 @@ class log {
    } // namespace provider

    // The base type of Raft entries. There can be multiple types of entries managed
    // by the same Raft group, each with different requirements (e.g., one type of
    // entry may not be snapshottable, and exist only in the log).
    // by the same Raft group, each with different requirements.
    class entry {
    using type = unsigned;
    // The type of entry, useful to retrieve associated types (e.g., a deserializer).
    virtual type entry_type() const = 0;
    // The key of the entry. Keys are necessary so related changes can be serialized.
    virtual bytes_view key() const = 0;
    // Serializes the entry to the specified buffer.
    virtual void write(fragmented_temporary_buffer::ostream&) = 0;
    // Whether the entry can be placed in the state machine, or whether it should exist only in the log.
    virtual bool snapshottable() const = 0;
    // Called when the entry has been replicated across the set of replicas of the group.
    // Side-effects should be idempotent, as the callback can be called multiple times (e.g.,
    // when replaying the log).
    @@ -256,21 +251,4 @@ class protocol {
    virtual future<> update_configuration(std::vector<replica>) = 0;
    };

    // Abstract key under which a Raft group is registered.
    // NOTE(review): presumably a virtual node owning a token range — confirm.
    class vnode {
    public:
        // Defaulted instead of pure: a pure virtual destructor still needs a
        // definition, and none is provided, so derived classes could not link.
        // The class stays abstract through operator==.
        virtual ~vnode() = default;
        // Equality between vnodes. Expressed as operator== because
        // std::strong_equality was removed from C++20.
        virtual bool operator==(const vnode&) const = 0;
    };

    // Registers the multiple Raft groups running on the current shard.
    class registry {
    public:
        // Returns the Raft instance registered for the specified vnode.
        lw_shared_ptr<protocol> group_of(const vnode&) const;
        // Registers a new Raft group, responsible for the specified token range.
        // vnode and protocol are abstract, so the vnode is taken by reference and
        // the factory hands back a pointer rather than a protocol by value.
        // NOTE(review): the registry cannot copy an abstract vnode; confirm how the
        // key is meant to be retained past this call.
        void register_group(lw_shared_ptr<group>, const vnode&, std::function<lw_shared_ptr<protocol>()>);
        // Unregisters the specified Raft group.
        void unregister_group(lw_shared_ptr<group>);
    };

    } // namespace raft
  2. duarten created this gist Jul 9, 2019.
    276 changes: 276 additions & 0 deletions raft.cc
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,276 @@
    #pragma once

    #include <seastar/include/seastar/core/future.hh>
    #include <seastar/include/seastar/core/lowres_clock.hh>
    #include <seastar/include/seastar/core/reactor.hh>
    #include <seastar/include/seastar/net/socket_defs.hh>

    #include <compare>
    #include <cstdint>
    #include <memory>
    #include <vector>

    namespace raft {

    // A member of a Raft group.
    struct replica {
    // Socket address used to identify the replica.
    seastar::socket_address id;
    };

    // The leader of a Raft group.
    struct group_leader {
    // The replica acting as leader.
    replica node;
    // NOTE(review): presumably reports whether `node` is the local node; only
    // declared here — confirm against the definition.
    bool current_node() const;
    };

    // A Raft term number, wrapped in a distinct type and ordered via operator<=>.
    class term {
        // Zero-initialized so a default-constructed term never exposes an
        // indeterminate value through value().
        uint32_t _val = 0;
    public:
        term() = default;
        explicit term(uint32_t val) noexcept
            : _val(val) {
        }

        // Returns the raw term number.
        uint32_t value() const noexcept {
            return _val;
        }

        // Three-way comparison of two terms (only declared in this header).
        friend std::strong_ordering operator<=>(const term&, const term&);
    };

    // A position within a log, wrapped in a distinct type and ordered via operator<=>.
    class log_offset {
        // Zero-initialized so a default-constructed offset never exposes an
        // indeterminate value through position().
        uint64_t _pos = 0;
    public:
        log_offset() = default;
        // Takes the full 64-bit width of the stored position; a narrower parameter
        // would silently truncate offsets above 2^32 - 1.
        explicit log_offset(uint64_t pos) noexcept
            : _pos(pos) {
        }

        // Returns the raw position.
        uint64_t position() const noexcept {
            return _pos;
        }

        // Three-way comparison of two offsets (only declared in this header).
        friend std::strong_ordering operator<=>(const log_offset&, const log_offset&);
    };

    struct log_index {
    term term;
    seastar::shard_id leader_shard;
    log_offset offset;
    };

    // Clock whose time points are exposed as an opaque 64-bit representation.
    // NOTE(review): presumably a hybrid logical clock combining the physical and
    // logical components below — confirm against the implementation.
    class hybrid_clock {
    public:
        using logical_component = uint32_t;
        using physical_component = uint32_t;

        // An opaque timestamp produced by the clock.
        class time_point {
            // Zero-initialized so a default-constructed time_point is well-defined.
            uint64_t _rep = 0;
        public:
            // Equality between time points. Expressed as operator== because
            // std::weak_equality was removed from C++20.
            friend bool operator==(const time_point&, const time_point&);
        };

        // Feeds an observed logical component into the clock.
        void update(logical_component);

        // Returns the current time point.
        time_point now();
    };

    // Identifier of a Raft group; supports equality comparison only.
    class group_id {
        uint64_t _id;
    public:
        explicit group_id(uint64_t id) noexcept
            : _id(id) {
        }

        // Equality between group ids. Expressed as operator== because
        // std::strong_equality was removed from C++20.
        friend bool operator==(const group_id&, const group_id&);
    };

    // A Raft group: its identifier and the replicas that belong to it.
    struct group {
    // Identifier of the group.
    group_id id;
    // The replicas that make up the group.
    std::vector<replica> replicas;
    };

    namespace provider {

    // Sends Raft messages between members of a group. The module is not specific to a Raft group.
    class rpc {
    public:
        // Polymorphic interface: the virtual destructor lets implementations be
        // destroyed safely through a pointer to rpc.
        virtual ~rpc() = default;

        // A log entry as carried by append_entries: the serialized data and its log index.
        struct entry {
            fragmented_temporary_buffer data;
            log_index index;
        };

        // append_entries reply: the RPC was processed successfully.
        struct success { };

        // Reply to a stale sender, carrying the current term.
        struct stale_leader {
            term current_term;
        };

        // append_entries reply: the last index the follower has on its log for the
        // current term, sent when it doesn't match the index of the first entry.
        struct log_index_mismatch {
            log_index current_log_index;
        };

        // append_entries reply: the log indexes of all leader shards for the last
        // term for which the follower received entries.
        struct prev_term_indexes {
            std::vector<log_index> indexes;
        };

        // request_vote reply: whether the vote was granted.
        struct got_vote_tag { };
        using got_vote = bool_class<got_vote_tag>;

        // Sends the append entries payload, containing the following:
        // struct append_entries_payload {
        //     term leader_term;
        //     replica leader;
        //     uint32_t shard;
        //     log_index leader_commit_index;
        //     log_index prev_log_index;
        //     term prev_log_term;
        //     entry entries[];
        // };
        // The follower either successfully processes the RPC, or updates a stale leader with
        // the current term, or sends the last index it has on its log for the current term (if
        // it doesn't match the index of the first entry), or returns the log indexes of all
        // leader shards for the last term for which it received entries.
        virtual future<std::variant<success, stale_leader, log_index_mismatch, prev_term_indexes>> append_entries(
                replica destination,
                term leader_term,
                log_index leader_commit_index,
                log_index prev_log_index,
                std::vector<entry>) = 0;

        // Sends the request vote payload, containing the following:
        // struct request_vote_payload {
        //     term candidate_term;
        //     replica candidate;
        //     log_index last_index;
        // };
        // The candidate either receives the vote or not, or it is updated with the current term of the group.
        virtual future<std::variant<got_vote, stale_leader>> request_vote(
                std::vector<replica> peers,
                term candidate_term,
                log_index last_index) = 0;
    };

    // Sends heartbeats on behalf of all groups of this node, and across all shards.
    // Should exist only on one shard.
    class heartbeats {
    public:
        // The callback receives the election timeout to use. That timeout can be biased
        // by how many groups the current node is already a leader of.
        using on_leader_timeout = std::function<future<group_leader>(const group&, seastar::lowres_clock::time_point)>;
    protected:
        on_leader_timeout _on_leader_timeout;
    public:
        // Stores the leader-timeout callback. Explicit so that a callback is never
        // implicitly treated as a heartbeats object.
        explicit heartbeats(on_leader_timeout cb)
            : _on_leader_timeout(std::move(cb)) {
        }
        // Polymorphic interface: the virtual destructor lets implementations be
        // destroyed safely through a pointer to heartbeats.
        virtual ~heartbeats() = default;

        // Registers a new Raft group, for which leader election should be triggered.
        virtual future<group_leader> register_group(const group&) = 0;
        // Stops sending heartbeats for this group, as leadership has been relinquished.
        virtual future<> relinquish_leadership(const group&) = 0;
        // Sets the commit index for a particular group. If the current node is the leader,
        // then that commit index is sent in heartbeat messages.
        virtual future<> set_commit_index(const group&, log_index) = 0;
        // Unregisters a Raft group.
        virtual future<> unregister_group(const group&) = 0;
    };

    // State-machine for a particular Raft group.
    class state_machine {
    public:
        // The operations are public so consumers of the provider can call them, and
        // the virtual destructor lets implementations be destroyed through a
        // pointer to state_machine.
        virtual ~state_machine() = default;

        // Transfers the contents of the state machine to the specified replica.
        virtual future<> transfer_to(replica) = 0;
        // Applies the specified entry to the state machine.
        virtual future<> apply(fragmented_temporary_buffer) = 0;
    };

    // Stores the persistent Raft state belonging to a Raft group.
    class state {
    public:
        // The operations are public so consumers of the provider can call them, and
        // the virtual destructor lets implementations be destroyed through a
        // pointer to state.
        virtual ~state() = default;

        // Registers the replica for which the current node voted, for the specified term.
        // Should replicate that information across shards. When the future resolves, the
        // information should be persisted on disk.
        virtual future<> register_vote(term, replica) = 0;
        // Registers the current commit index for the current leader shard. When the future
        // resolves, the information is not guaranteed to have been persisted.
        virtual future<> register_commit_index_relaxed(log_index) = 0;
        // Registers the previous information of the discarded log for the current leader shard.
        // When the future resolves, the information should be persisted on disk.
        virtual future<> register_log_compaction(log_index, std::vector<replica> config) = 0;
    };

    // Implements the persistent Raft log for a particular Raft group.
    class log {
    public:
        // The operations are public so consumers of the provider can call them, and
        // the virtual destructor lets implementations be destroyed through a
        // pointer to log.
        virtual ~log() = default;

        // Appends the specified entry for the current shard, at the specified term.
        // The entry is produced by the supplied writer callback.
        // NOTE(review): the size_t is presumably the serialized size of the entry — confirm.
        // When the future resolves, the entry should be persisted in stable storage.
        virtual future<log_index> append(
                term,
                size_t,
                std::function<void(fragmented_temporary_buffer::ostream&)>) = 0;
        // The persisted tail of the log for the current shard, used to include in the append_entries RPC.
        virtual log_index tail() const = 0;
        // The tail of the log for all shards of a term, used to bring a follower up to date.
        virtual std::vector<log_index> tail_for_all_shards_of(term) = 0;
        // Discards a prefix of the log, to support log compaction.
        virtual future<> discard_prefix(log_index) = 0;
        // Discards a suffix of the log, to support removing entries from an outdated follower.
        virtual future<> discard_suffix(log_index) = 0;
        // Callback invoked by read() with each entry's buffer.
        using process_entry = std::function<future<>(const fragmented_temporary_buffer&)>;
        // Reads a set of entries in the specified range.
        virtual future<> read(log_index, log_index, process_entry) = 0;
    };

    } // namespace provider

    // The base type of Raft entries. There can be multiple types of entries managed
    // by the same Raft group, each with different requirements (e.g., one type of
    // entry may not be snapshottable, and exist only in the log).
    class entry {
    public:
        // The operations are public so the protocol can call them, and the virtual
        // destructor lets concrete entries be destroyed through a pointer to entry.
        virtual ~entry() = default;

        using type = unsigned;
        // The type of entry, useful to retrieve associated types (e.g., a deserializer).
        virtual type entry_type() const = 0;
        // The key of the entry. Keys are necessary so related changes can be serialized.
        virtual bytes_view key() const = 0;
        // Serializes the entry to the specified buffer.
        virtual void write(fragmented_temporary_buffer::ostream&) = 0;
        // Whether the entry can be placed in the state machine, or whether it should exist only in the log.
        virtual bool snapshottable() const = 0;
        // Called when the entry has been replicated across the set of replicas of the group.
        // Side-effects should be idempotent, as the callback can be called multiple times (e.g.,
        // when replaying the log).
        virtual future<> on_replicated() = 0;
        // Sets the timestamp of the entry.
        virtual void set_timestamp(hybrid_clock::time_point) = 0;
    };

    // Deserializes entries of a particular type.
    class entry_deserializer {
    virtual entry read(const fragmented_temporary_buffer&) = 0;
    };

    // Exposes the Raft protocol to external consumers, for a particular group.
    // Consumes the heartbeat, rpc, state machine, state and log providers.
    class protocol {
    public:
        // Polymorphic interface: the virtual destructor lets implementations be
        // destroyed safely through a pointer to protocol.
        virtual ~protocol() = default;

        // Specifies whether the current node is the leader of this Raft instance.
        virtual bool is_leader() const = 0;
        // Returns the leader, useful to forward requests to it.
        virtual group_leader leader() const = 0;
        // Replicates the specified entry across the Raft group. Returns when the entry has
        // been committed. entry is abstract and cannot be passed by value, so ownership of
        // the concrete entry is handed over through an owning pointer.
        virtual future<> replicate(std::unique_ptr<entry>) = 0;
        // When the group's configuration changes. Calls to this function are ordered w.r.t. calls to replicate().
        virtual future<> update_configuration(std::vector<replica>) = 0;
    };

    // Abstract key under which a Raft group is registered.
    // NOTE(review): presumably a virtual node owning a token range — confirm.
    class vnode {
    public:
        // Defaulted instead of pure: a pure virtual destructor still needs a
        // definition, and none is provided, so derived classes could not link.
        // The class stays abstract through operator==.
        virtual ~vnode() = default;
        // Equality between vnodes. Expressed as operator== because
        // std::strong_equality was removed from C++20.
        virtual bool operator==(const vnode&) const = 0;
    };

    // Registers the multiple Raft groups running on the current shard.
    class registry {
    public:
        // Returns the Raft instance registered for the specified vnode.
        lw_shared_ptr<protocol> group_of(const vnode&) const;
        // Registers a new Raft group, responsible for the specified token range.
        // vnode and protocol are abstract, so the vnode is taken by reference and
        // the factory hands back a pointer rather than a protocol by value.
        // NOTE(review): the registry cannot copy an abstract vnode; confirm how the
        // key is meant to be retained past this call.
        void register_group(lw_shared_ptr<group>, const vnode&, std::function<lw_shared_ptr<protocol>()>);
        // Unregisters the specified Raft group.
        void unregister_group(lw_shared_ptr<group>);
    };

    } // namespace raft