Skip to content

Commit

Permalink
Merge pull request #130 from philips/add-version-to-join2
Browse files Browse the repository at this point in the history
add versioning to cluster join
  • Loading branch information
xiang90 committed Aug 19, 2013
2 parents 52cbc89 + b430a07 commit 7b28904
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 5 deletions.
61 changes: 61 additions & 0 deletions Documentation/internal-protocol-versioning.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Versioning

Goal: We want to be able to upgrade an individual machine in an etcd cluster to a newer version of etcd.
The process will take the form of individual followers upgrading to the latest version until the entire cluster is on the new version.

Immediate need: etcd is moving too fast to version the internal API right now.
But, we need to keep mixed version clusters from being started by a rollowing upgrade process (e.g. the CoreOS developer alpha).

Longer term need: Having a mixed version cluster where all machines are not be running the exact same version of etcd itself but are able to speak one version of the internal protocol.

Solution: The internal protocol needs to be versioned just as the client protocol is.
Initially during the 0.\*.\* series of etcd releases we won't allow mixed versions at all.

## Join Control

We will add a version field to the join command.
But, who decides whether a newly upgraded follower should be able to join a cluster?

### Leader Controlled

If the leader controls the version of followers joining the cluster then it compares its version to the version number presented by the follower in the JoinCommand and rejects the join if the number is less than the leader's version number.

Advantages

- Leader controls all cluster decisions still

Disadvantages

- Follower knows better what versions of the interal protocol it can talk than the leader


### Follower Controlled

A newly upgraded follower should be able to figure out the leaders internal version from a defined internal backwards compatible API endpoint and figure out if it can join the cluster.
If it cannot join the cluster then it simply exits.

Advantages

- The follower is running newer code and knows better if it can talk older protocols

Disadvantages

- This cluster decision isn't made by the leader

## Recommendation

To solve the immediate need and to plan for the future lets do the following:

- Add Version field to JoinCommand
- Have a joining follower read the Version field of the leader and if its own version doesn't match the leader then sleep for some random interval and retry later to see if the leader has upgraded.

# Research

## Zookeeper versioning

Zookeeper very recently added versioning into the protocol and it doesn't seem to have seen any use yet.
https://issues.apache.org/jira/browse/ZOOKEEPER-1633

## doozerd

doozerd stores the version number of the machine in the datastore for other clients to check, no decisions are made off of this number currently.
6 changes: 4 additions & 2 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,15 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {

// JoinCommand
type JoinCommand struct {
RaftVersion string `json:"raftVersion"`
Name string `json:"name"`
RaftURL string `json:"raftURL"`
EtcdURL string `json:"etcdURL"`
}

func newJoinCommand() *JoinCommand {
return &JoinCommand{
RaftVersion: r.version,
Name: r.name,
RaftURL: r.url,
EtcdURL: e.url,
Expand Down Expand Up @@ -152,14 +154,14 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
return []byte("join fail"), etcdErr.NewError(103, "")
}

addNameToURL(c.Name, c.RaftURL, c.EtcdURL)
addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL)

// add peer in raft
err := raftServer.AddPeer(c.Name, "")

// add machine in etcd storage
key := path.Join("_etcd/machines", c.Name)
value := fmt.Sprintf("raft=%s&etcd=%s", c.RaftURL, c.EtcdURL)
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())

return []byte("join success"), err
Expand Down
49 changes: 49 additions & 0 deletions etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/coreos/go-etcd/etcd"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -54,6 +56,53 @@ func TestSingleNode(t *testing.T) {
}
}

// TestInternalVersionFail will ensure that etcd does not come up if the internal raft
// versions do not match.
func TestInternalVersionFail(t *testing.T) {
checkedVersion := false
testMux := http.NewServeMux()

testMux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "This is not a version number")
checkedVersion = true
})

testMux.HandleFunc("/join", func(w http.ResponseWriter, r *http.Request) {
t.Fatal("should not attempt to join!")
})

ts := httptest.NewServer(testMux)
defer ts.Close()

fakeURL, _ := url.Parse(ts.URL)

procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/node1", "-vv", "-C="+fakeURL.Host}

process, err := os.StartProcess("etcd", args, procAttr)
if err != nil {
t.Fatal("start process failed:" + err.Error())
return
}
defer process.Kill()

time.Sleep(time.Second)

_, err = http.Get("http://127.0.0.1:4001")

if err == nil {
t.Fatal("etcd node should not be up")
return
}

if checkedVersion == false {
t.Fatal("etcd did not check the version")
return
}
}


// This test creates a single node and then set a value to it.
// Then this test kills the node and restart it and tries to get the value again.
func TestSingleNodeRecovery(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion name_url_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

// we map node name to url
type nodeInfo struct {
raftVersion string
raftURL string
etcdURL string
}
Expand Down Expand Up @@ -39,8 +40,9 @@ func nameToRaftURL(name string) (string, bool) {
}

// addNameToURL add a name that maps to raftURL and etcdURL
func addNameToURL(name string, raftURL string, etcdURL string) {
func addNameToURL(name string, version string, raftURL string, etcdURL string) {
namesMap[name] = &nodeInfo{
raftVersion: raftVersion,
raftURL: raftURL,
etcdURL: etcdURL,
}
Expand Down
7 changes: 7 additions & 0 deletions raft_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,10 @@ func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(r.name))
}

// Response to the name request
func RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) {
debugf("[recv] Get %s/version/ ", r.url)
w.WriteHeader(http.StatusOK)
w.Write([]byte(r.version))
}
37 changes: 35 additions & 2 deletions raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"crypto/tls"
"encoding/json"
"io/ioutil"
"fmt"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/go-raft"
Expand All @@ -14,6 +15,7 @@ import (

type raftServer struct {
*raft.Server
version string
name string
url string
tlsConf *TLSConfig
Expand All @@ -34,6 +36,7 @@ func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo

return &raftServer{
Server: server,
version: raftVersion,
name: name,
url: url,
tlsConf: tlsConf,
Expand Down Expand Up @@ -144,6 +147,7 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {

// internal commands
raftMux.HandleFunc("/name", NameHttpHandler)
raftMux.HandleFunc("/version", RaftVersionHttpHandler)
raftMux.Handle("/join", errorHandler(JoinHttpHandler))
raftMux.HandleFunc("/vote", VoteHttpHandler)
raftMux.HandleFunc("/log", GetLogHttpHandler)
Expand All @@ -160,15 +164,44 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {

}

// getVersion fetches the raft version of a peer. This works for now but we
// will need to do something more sophisticated later when we allow mixed
// version clusters.
func getVersion(t transporter, versionURL url.URL) (string, error) {
resp, err := t.Get(versionURL.String())

if err != nil {
return "", err
}

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)

return string(body), nil
}

// Send join requests to the leader.
func joinCluster(s *raft.Server, raftURL string, scheme string) error {
var b bytes.Buffer

json.NewEncoder(&b).Encode(newJoinCommand())

// t must be ok
t, _ := r.Transporter().(transporter)

// Our version must match the leaders version
versionURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/version"}
version, err := getVersion(t, versionURL)
if err != nil {
return fmt.Errorf("Unable to join: %v", err)
}

// TODO: versioning of the internal protocol. See:
// Documentation/internatl-protocol-versioning.md
if version != r.version {
return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd")
}

json.NewEncoder(&b).Encode(newJoinCommand())

joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"}

debugf("Send Join Request to %s", raftURL)
Expand Down
5 changes: 5 additions & 0 deletions version.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
package main

const version = "v1"

// TODO: The release version (generated from the git tag) will be the raft
// protocol version for now. When things settle down we will fix it like the
// client API above.
const raftVersion = releaseVersion

0 comments on commit 7b28904

Please sign in to comment.