Practical Golang: Building a simple, distributed one-value database with Hashicorp Serf

Introduction

With the advent of distributed applications, we see new storage solutions emerging constantly.
They include, but are not limited to, Cassandra, Redis, CockroachDB, Consul and RethinkDB.
Most of you probably use one or more of them.

They seem to be really complex systems, because they actually are; that can’t be denied.
But it’s pretty easy to write a simple, one-value database featuring high availability.
You probably wouldn’t use anything like this in production, but it should be a fruitful learning experience for you nevertheless.
If you’re interested, read on!

Dependencies

You’ll need to

go get github.com/hashicorp/serf/serf

as a key dependency.

We’ll also use those for convenience’s sake:

"github.com/gorilla/mux"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
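
You can fetch those the same way:

go get github.com/gorilla/mux github.com/pkg/errors golang.org/x/sync/errgroup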

Small overview

What will we build? A one-value clustered database, which means numerous instances of our application will be able to work together.
You’ll be able to set or get the value using a REST interface. The value will then quickly spread across the cluster using a gossip protocol.
That means every node tells a part of the cluster about the current state of the variable at set intervals, and because each of those nodes in turn tells another part of the cluster, the whole cluster ends up being informed shortly after.

We’ll use Serf for easy cluster membership; it uses SWIM under the hood. SWIM is a more advanced gossip-style protocol, which you can read more about here.

Now let’s get to the implementation…

Getting started

First, we’ll of course have to put in all our imports:

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "os"
    "strconv"
    "sync"
    "time"

    "github.com/gorilla/mux"
    "github.com/hashicorp/serf/serf"
    "github.com/pkg/errors"
    "golang.org/x/sync/errgroup"
)

Following this, it’s time to write a simple, thread-safe, one-value store.
Importantly, the database will also hold the generation of the variable. This way, when one instance gets notified about a new value, it can check whether the incoming notification actually has a higher generation count, and only then change the current local value.
So our database structure will hold exactly this: the number, the generation and a mutex.

type oneAndOnlyNumber struct {
    num        int
    generation int
    numMutex   sync.RWMutex
}

func InitTheNumber(val int) *oneAndOnlyNumber {
    return &oneAndOnlyNumber{
        num: val,
    }
}

We’ll also need a way to set and get the value.
Setting the value will also advance the generation count, so when we notify the rest of the cluster, we will overwrite their values and generation counts.

func (n *oneAndOnlyNumber) setValue(newVal int) {
    n.numMutex.Lock()
    defer n.numMutex.Unlock()
    n.num = newVal
    n.generation = n.generation + 1
}

func (n *oneAndOnlyNumber) getValue() (int, int) {
    n.numMutex.RLock()
    defer n.numMutex.RUnlock()
    return n.num, n.generation
}

Finally, we will need a way to notify the database of changes that happened elsewhere, if they have a higher generation count.
For that we’ll have a small notify method, which will return true if anything has been changed:

func (n *oneAndOnlyNumber) notifyValue(curVal int, curGeneration int) bool {
    n.numMutex.Lock()
    defer n.numMutex.Unlock()
    // Only accept the incoming value if it carries a newer generation than ours.
    if curGeneration > n.generation {
        n.generation = curGeneration
        n.num = curVal
        return true
    }
    return false
}
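
A quick illustrative snippet (not part of the application) showing how the generation count settles who wins:

n := InitTheNumber(42) // num=42, generation=0
n.setValue(7)          // num=7,  generation=1

n.notifyValue(99, 1)   // returns false: generation 1 is not newer than ours
n.notifyValue(99, 3)   // returns true:  the value becomes 99, generation 3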

We’ll also create a const describing how many nodes we will notify about the new value every time.

const MembersToNotify = 2

Now let’s get to the actual functioning of the application. First we’ll have to start an instance of Serf, using two environment variables: the address of our instance in the network and the (optional) address of the cluster to join.

func main() {
    cluster, err := setupCluster(
        os.Getenv("ADVERTISE_ADDR"),
        os.Getenv("CLUSTER_ADDR"))
    if err != nil {
        log.Fatal(err)
    }
    defer cluster.Leave()

How does the setupCluster function work, you may ask? Here it is:

func setupCluster(advertiseAddr string, clusterAddr string) (*serf.Serf, error) {
    conf := serf.DefaultConfig()
    conf.Init()
    conf.MemberlistConfig.AdvertiseAddr = advertiseAddr

    cluster, err := serf.Create(conf)
    if err != nil {
        return nil, errors.Wrap(err, "Couldn't create cluster")
    }

    _, err = cluster.Join([]string{clusterAddr}, true)
    if err != nil {
        log.Printf("Couldn't join cluster, starting own: %v\n", err)
    }

    return cluster, nil
}

As we can see, we create the cluster from the default config, changing only the advertise address.

If the creation fails, we of course return the error.
If the join fails though, it means that we either didn’t get a cluster address,
or the cluster doesn’t exist (barring network failures), so we can safely just log it; this node simply starts its own cluster.
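
To make the join behaviour concrete, here’s a hypothetical way of running two instances by hand on two machines on the same network (the addresses and the distApp binary name are placeholders; the Docker-based setup at the end of the article is what we’ll actually use):

# the first node gets no CLUSTER_ADDR, so the join fails and it starts its own cluster
ADVERTISE_ADDR=192.168.0.10 ./distApp

# the second node joins the cluster through the first one
ADVERTISE_ADDR=192.168.0.11 CLUSTER_ADDR=192.168.0.10 ./distApp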

Next, we initialize the database and the REST API:
(I’ve really chosen the number at random… really!)

    theOneAndOnlyNumber := InitTheNumber(42)
    launchHTTPAPI(theOneAndOnlyNumber)

And this is what the API creation looks like:

func launchHTTPAPI(db *oneAndOnlyNumber) {
    go func() {
        m := mux.NewRouter()

The whole API runs asynchronously in its own goroutine. Inside it, we first declare our getter:

        m.HandleFunc("/get", func(w http.ResponseWriter, r *http.Request) {
            val, _ := db.getValue()
            fmt.Fprintf(w, "%v", val)
        })

our setter:

m.HandleFunc("/set/{newVal}", func(w http.ResponseWriter, r *http.Request) {
            vars := mux.Vars(r)
            newVal, err := strconv.Atoi(vars["newVal"])
            if err != nil {
                w.WriteHeader(http.StatusBadRequest)
                fmt.Fprintf(w, "%v", err)
                return
            }

            db.setValue(newVal)

            fmt.Fprintf(w, "%v", newVal)
        })

and finally the API endpoint which allows other nodes to notify this instance of changes:

        m.HandleFunc("/notify/{curVal}/{curGeneration}", func(w http.ResponseWriter, r *http.Request) {
            vars := mux.Vars(r)
            curVal, err := strconv.Atoi(vars["curVal"])
            if err != nil {
                w.WriteHeader(http.StatusBadRequest)
                fmt.Fprintf(w, "%v", err)
                return
            }
            curGeneration, err := strconv.Atoi(vars["curGeneration"])
            if err != nil {
                w.WriteHeader(http.StatusBadRequest)
                fmt.Fprintf(w, "%v", err)
                return
            }

            if changed := db.notifyValue(curVal, curGeneration); changed {
                log.Printf(
                    "NewVal: %v Gen: %v Notifier: %v",
                    curVal,
                    curGeneration,
                    r.URL.Query().Get("notifier"))
            }
            w.WriteHeader(http.StatusOK)
        })
        log.Fatal(http.ListenAndServe(":8080", m))
    }()
}

It’s also here, at the end of the goroutine, that we actually start the HTTP server. The notify handler additionally prints some debug info whenever another member of the cluster informs us of a new value.
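
Once an instance is running and listening on localhost:8080, you can poke at the API by hand, for example:

curl localhost:8080/set/7
curl localhost:8080/get
curl "localhost:8080/notify/7/3?notifier=manual-test"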


Great, we’ve got a way to talk to our service now. Time to make it actually spread all the information.
We’ll also be printing debug info regularly.

To begin with, let’s initialize our context (that’s always a good idea in the main function).
We’ll also put a value into it: the name of our host, just for the debug logs.
It’s a good fit for the context, as it isn’t crucial to the functioning of our program,
and the context will get passed along anyway.

    ctx := context.Background()
    if name, err := os.Hostname(); err == nil {
        ctx = context.WithValue(ctx, "name", name)
    }

Having done this, we can set up our main loop, including the intervals at which we’ll be sending state updates to peers and printing debug info.

    debugDataPrinterTicker := time.Tick(time.Second * 5)
    numberBroadcastTicker := time.Tick(time.Second * 2)
    for {
        select {
        case <-numberBroadcastTicker:
            // Notification code goes here...
        case <-debugDataPrinterTicker:
            log.Printf("Members: %v\n", cluster.Members())

            curVal, curGen := theOneAndOnlyNumber.getValue()
            log.Printf("State: Val: %v Gen: %v\n", curVal, curGen)
        }
    }

Ok, that seems to be it.

Just kidding. Time to finish up our service with the notification code.
We’ll now get a list of the other members in the cluster, set a timeout, and asynchronously notify a few of them.

        case <-numberBroadcastTicker:
            members := getOtherMembers(cluster)

            // Give each broadcast round its own two-second timeout, and call
            // cancel afterwards so the timer's resources get released.
            notifyCtx, cancel := context.WithTimeout(ctx, time.Second*2)
            go func() {
                defer cancel()
                notifyOthers(notifyCtx, members, theOneAndOnlyNumber)
            }()

Now, let’s look at the getOtherMembers function. It’s really just a function that scans through the member list, removing ourselves and any nodes that aren’t alive at the moment.

func getOtherMembers(cluster *serf.Serf) []serf.Member {
    members := cluster.Members()
    for i := 0; i < len(members); {
        // Cut out ourselves and any member that isn't currently alive.
        if members[i].Name == cluster.LocalMember().Name || members[i].Status != serf.StatusAlive {
            if i < len(members)-1 {
                members = append(members[:i], members[i+1:]...)
            } else {
                members = members[:i]
            }
        } else {
            i++
        }
    }
    return members
}

There’s not much to it, I suppose. It uses slicing to cut out (or, for the last element, cut off) members that don’t match our predicate.
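
If the re-slicing trick is new to you, here’s a tiny standalone illustration of removing element i from a slice:

s := []int{10, 20, 30, 40}
i := 1
// Shift everything after index i one slot to the left, dropping element i.
s = append(s[:i], s[i+1:]...) // s is now [10 30 40]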

Finally the function we use to notify others:

func notifyOthers(ctx context.Context, otherMembers []serf.Member, db *oneAndOnlyNumber) {
    g, ctx := errgroup.WithContext(ctx)

    if len(otherMembers) <= MembersToNotify {
        for _, member := range otherMembers {
            curMember := member
            g.Go(func() error {
                return notifyMember(ctx, curMember.Addr.String(), db)
            })
        }
    } else {
        randIndex := rand.Int() % len(otherMembers)
        for i := 0; i < MembersToNotify; i++ {
            // Pick the member before launching the goroutine, so the closure
            // doesn't capture the loop variable i by reference.
            curMember := otherMembers[(randIndex+i)%len(otherMembers)]
            g.Go(func() error {
                return notifyMember(ctx, curMember.Addr.String(), db)
            })
        }
    }

    err := g.Wait()
    if err != nil {
        log.Printf("Error when notifying other members: %v", err)
    }
}

If there are no more members than we want to notify (two in our case), we send the notifications to all of them; otherwise we pick a random index into the members slice and notify subsequent members from there on.
How does the errgroup work? It’s a nifty library Brian Ketelsen wrote a great article about. It’s basically a wait group which also collects errors: Wait returns the first error that occurred, and the derived context gets cancelled as soon as any of the goroutines fails.

Now to finish our code, the notifyMember function:

func notifyMember(ctx context.Context, addr string, db *oneAndOnlyNumber) error {
    val, gen := db.getValue()
    req, err := http.NewRequest(
        "POST",
        fmt.Sprintf("http://%v:8080/notify/%v/%v?notifier=%v", addr, val, gen, ctx.Value("name")),
        nil)
    if err != nil {
        return errors.Wrap(err, "Couldn't create request")
    }
    req = req.WithContext(ctx)

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return errors.Wrap(err, "Couldn't make request")
    }
    // Close the response body so the underlying connection can be reused.
    resp.Body.Close()
    return nil
}

We craft a URL of the form {nodeAddress}:8080/notify/{curVal}/{curGen}?notifier={selfHostName},
add the context to the request so we get the timeout functionality, make the request, and close the response body.

And that’s actually all there is to the code.

Testing our database

We’ll test our database using Docker. The necessary Dockerfile to put into your project directory looks like this:

FROM alpine
WORKDIR /app
COPY distApp /app/
ENTRYPOINT ["./distApp"]

Now, first build your application (if you’re not on Linux, you have to set the environment variables GOOS=linux and GOARCH=amd64).
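For example (assuming your main package sits in the current directory and the output name matches the distApp used in the Dockerfile above):

GOOS=linux GOARCH=amd64 go build -o distApp
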
Then build the Docker image:

docker build -t distapp .

And finally we can launch it. To supply the necessary environment variables, we’ll need to know what IP addresses the containers will get.
First run:

docker network inspect bridge

Bridge is the default network containers get assigned to. You should get something like this:

[
    {
        "Name": "bridge",
        "Id": "b56a19697ed9d30488f189d5517fd79f04a4df70c8bbc07d8f3c49a491f10433",
        "Created": "2017-01-29T10:48:05.1592086Z",
        "Scope": "local",
        "Driver": "bridge",
        "EnableIPv6": false,
        "IPAM": {
            "Driver": "default",
            "Options": null,
            "Config": [
                {
                    "Subnet": "172.17.0.0/16",
                    "Gateway": "172.17.0.1" <-- this is what we need
                }
            ]
        },
        "Internal": false,
        "Attachable": false,
        "Containers": {},
        "Options": {
            "com.docker.network.bridge.default_bridge": "true",
            "com.docker.network.bridge.enable_icc": "true",
            "com.docker.network.bridge.enable_ip_masquerade": "true",
            "com.docker.network.bridge.host_binding_ipv4": "0.0.0.0",
            "com.docker.network.bridge.name": "docker0",
            "com.docker.network.driver.mtu": "1500"
        },
        "Labels": {}
    }
]

What’s important for us is the gateway. In this case, our containers will be assigned IP addresses starting from 172.17.0.2.

So now we can start a few containers:

docker run -e ADVERTISE_ADDR=172.17.0.2 -p 8080:8080 distapp
docker run -e ADVERTISE_ADDR=172.17.0.3 -e CLUSTER_ADDR=172.17.0.2 -p 8081:8080 distapp
docker run -e ADVERTISE_ADDR=172.17.0.4 -e CLUSTER_ADDR=172.17.0.3 -p 8082:8080 distapp
docker run -e ADVERTISE_ADDR=172.17.0.5 -e CLUSTER_ADDR=172.17.0.4 -p 8083:8080 distapp

You can now test your deployment by stopping and starting containers, and by setting/getting the value at:

localhost:8080/set/5
localhost:8082/get
etc...

Conclusion

Importantly, this is a really basic distributed system, and it may become inconsistent: if you update the value on two different machines at nearly the same time, different nodes may end up holding different values, depending on which machine they heard from.
If you want to learn more, read about CAP, consensus, Paxos, Raft, gossip, and data replication; they are all very interesting topics (at least in my opinion).

Anyway, I hope you had fun creating a small distributed system, and I encourage you to build your own, more advanced one; it’ll be a great learning experience for sure!

The whole code is available on my GitHub.

Web app using Microservices in Go: Part 1 – Design

Introduction

Microservices have recently become a constantly repeated buzzword. You can love ’em or hate ’em, but you really shouldn’t ignore ’em. In this short series we’ll create a web app using a microservice architecture. We’ll try not to use 3rd-party tools and libraries. Remember though that when creating a production web app, it is highly recommended to use 3rd-party libraries (even if only to save you time).

We will create the various components in a basic form. We won’t use advanced caching or a database; instead, we will create a basic key-value store and a simple storage service. We will use the Go language for all of this.

UPDATE: as there are comments regarding overcomplication: this is meant to show a scalable and working skeleton for a microservice architecture. If you only want to add some filters to photos, don’t design it like that. It’s overkill.

On further thought, and after another comment (which you can find on the golang Reddit), do design it this way. Software usually lives much longer than we think it will, and such a design will lead to an easily extendable and scalable web app.

The functionality

First we should decide what our web app will do. The web app we’ll create in this series will get an image from a user and give back a unique ID. The image will get modified using complicated and highly sophisticated algorithms, like swapping the blue and red channels, and the user will be able to use the ID to check whether the work on the image has finished or is still in progress. If it’s finished, they will be able to download the altered image.
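
Just to make the highly sophisticated algorithm concrete, here’s a minimal sketch of what swapping the red and blue channels could look like with the standard library’s image packages (the swapRedBlue name is a placeholder of mine; the actual worker code comes later in the series):

import (
    "image"
    "image/color"
)

// swapRedBlue returns a copy of src with the red and blue channels exchanged.
func swapRedBlue(src image.Image) *image.RGBA {
    bounds := src.Bounds()
    dst := image.NewRGBA(bounds)
    for y := bounds.Min.Y; y < bounds.Max.Y; y++ {
        for x := bounds.Min.X; x < bounds.Max.X; x++ {
            r, g, b, a := src.At(x, y).RGBA()
            // RGBA() returns 16-bit channel values, so shift them down to 8 bits.
            dst.Set(x, y, color.RGBA{
                R: uint8(b >> 8),
                G: uint8(g >> 8),
                B: uint8(r >> 8),
                A: uint8(a >> 8),
            })
        }
    }
    return dst
}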

Designing the architecture

We want the architecture to be microservices, so we should design it like that. We’ll for sure need a service facing the user, the one that provides the interface for communication with our app. This could also handle authentication, and should be used as the service redirecting the workload to the right sub-services (useful if you plan to integrate more functionality into the app).

We will also want a microservice which will handle all our images. It will get the image, generate an ID, store information related to each task, and save the images. To handle high workloads it’s a good idea to use a master-slave system for our image modification service. The image handler will be the master, and we will create slave microservices which will ask the master for images to work on.

We will also need a key-value datastore for various configuration, a storage system, for saving our images, pre- and post-modification, and a database-ish service holding the information about each task.

This should suffice to begin with.

Here I’d like to also state that the architecture could change during the series if needed. And I encourage you to comment if you think that something could be done better.

Communication

We also need to define the method the services communicate by. In this app we will use REST everywhere. You could also use a message bus or Remote Procedure Calls (RPC for short), but I won’t write about them here.

Designing the microservice APIs

Another important thing is to design the APIs of your microservices. We will now design each of them to get an understanding of what they are for.

The key-value store

This one’s mainly for configuration. It will have a simple post-get interface:

  • POST:
    • Arguments:
      • Key
      • Value
    • Response:
      • Success/Failure
  • GET:
    • Arguments:
      • Key
    • Response:
      • Value/Failure
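
To make this post-get interface a bit more concrete, here’s a minimal sketch of such a store using nothing but net/http (the port, paths and form-value arguments are assumptions for illustration, not the implementation we’ll build later in the series):

import (
    "fmt"
    "log"
    "net/http"
    "sync"
)

func main() {
    var mu sync.RWMutex
    store := map[string]string{}

    // POST /set?key=...&value=... -> "Success" or a failure status
    http.HandleFunc("/set", func(w http.ResponseWriter, r *http.Request) {
        key, value := r.FormValue("key"), r.FormValue("value")
        if key == "" {
            http.Error(w, "Failure: missing key", http.StatusBadRequest)
            return
        }
        mu.Lock()
        store[key] = value
        mu.Unlock()
        fmt.Fprint(w, "Success")
    })

    // GET /get?key=... -> the value, or a failure status
    http.HandleFunc("/get", func(w http.ResponseWriter, r *http.Request) {
        mu.RLock()
        value, ok := store[r.FormValue("key")]
        mu.RUnlock()
        if !ok {
            http.Error(w, "Failure: key not found", http.StatusNotFound)
            return
        }
        fmt.Fprint(w, value)
    })

    log.Fatal(http.ListenAndServe(":3000", nil))
}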

The storage

Here we will store the images, again using a key-value interface, with an additional argument stating whether the image is pre- or post-modification. For the sake of simplicity we will just save each image to a folder named finished or inProgress, depending on its state.

  • POST:
    • Arguments:
      • Key
      • State: pre-/post-modification
      • Data
    • Response:
      • Success/Failure
  • GET:
    • Arguments:
      • Key
      • State: pre-/post-modification
    • Response:
      • Data/Failure

Database

This one will save our tasks: their Id, and whether they are waiting to start, in progress or finished.

  • POST:
    • Arguments:
      • TaskId
      • State: not started/ in progress/ finished
    • Response:
      • Success/Failure
  • GET:
    • Arguments:
      • TaskId
    • Response:
      • State/Failure
  • GET:
    • Path:
      • not started/ in progress/ finished
    • Response:
      • list of TaskId’s

The Frontend

The frontend is there mainly to provide a way for the user to communicate with the various services. It can also be used for authentication and authorization.

  • POST:
    • Path:
      • newImage
    • Arguments:
      • Data
    • Response:
      • Id
  • GET:
    • Path:
      • image/isReady
    • Arguments:
      • Id
    • Response:
      • not found/ in progress / finished
  • GET:
    • Path:
      • image/get
    • Arguments:
      • Id
    • Response:
      • Data

Image master microservice

This one will get new images from the frontend/user and send them to the storage service. It will also create a new task in the database, and orchestrate the workers, which can ask for work and notify the master when it’s finished.

  • Frontend interface:
    • POST:
      • Path:
        • newImage
      • Arguments:
        • Data
      • Response:
        • Id
    • GET:
      • Path:
        • isReady
      • Arguments:
        • Id
      • Response:
        • not found/ in progress / finished
    • GET:
      • Path:
        • get
      • Arguments:
        • Id
      • Response:
        • Data/Failure
  • Worker interface:
    • GET:
      • Path:
        • getWork
      • Response:
        • Id/noWorkToDo
    • POST:
      • Path:
        • workFinished
      • Arguments:
        • Id
      • Response:
        • Success/Failure

Image worker microservice

This one doesn’t have any API. It is a client of the master image service, which it finds using the key-value store. It gets the image data to work on from the storage service.
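
Here’s a hedged sketch of what the worker’s polling loop could look like under the interface above (the address, port and plain-text response format are placeholder assumptions of mine; the real implementation comes later in the series):

import (
    "io"
    "log"
    "net/http"
    "time"
)

func main() {
    masterAddr := "http://localhost:3003" // placeholder; in the real app this would come from the key-value store
    for {
        resp, err := http.Get(masterAddr + "/getWork")
        if err != nil {
            log.Println("master unreachable:", err)
            time.Sleep(time.Second)
            continue
        }
        body, _ := io.ReadAll(resp.Body)
        resp.Body.Close()

        id := string(body)
        if id == "noWorkToDo" {
            time.Sleep(time.Second) // nothing to do, back off for a moment
            continue
        }

        log.Println("got task:", id)
        // ...fetch the image from the storage service, modify it, store the result,
        // and finally POST the Id to {master}/workFinished.
    }
}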

Scheme

The microservice architecture design

Conclusion

This is basically everything regarding the design. In the next part we will write part of the microservices. Again, I encourage you to comment expressing what you think about this design!

Next part