Practical Golang: Building a simple, distributed one-value database with Hashicorp Serf

Introduction

With the advent of distributed applications, we see new storage solutions emerging constantly.
They include, but are not limited to, Cassandra, Redis, CockroachDB, Consul, and RethinkDB.
Most of you probably use one or more of them.

They seem to be really complex systems, because they actually are; that can't be denied.
But it's pretty easy to write a simple, highly available, one-value database.
You probably wouldn't use anything like it in production, but it should be a fruitful learning experience nevertheless.
If you’re interested, read on!

Dependencies

You’ll need to

go get github.com/hashicorp/serf/serf

as a key dependency.

We’ll also use those for convenience’s sake:

"github.com/gorilla/mux"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

Small overview

What will we build? A one-value clustered database: numerous instances of our application will be able to work together.
You'll be able to set or get the value through a REST interface. The value will then spread across the cluster shortly afterwards, using a gossip protocol.
That means every node tells a part of the cluster about the current state of the value at set intervals; because each of those nodes in turn tells another part of the cluster, the whole cluster ends up informed after a short while.
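To get a feel for "a short while", here is a back-of-the-envelope sketch (my own, not part of the app) of how quickly fanout-style gossip spreads under ideal, non-overlapping conditions:

// estimateRounds returns roughly how many gossip rounds it takes to inform n nodes
// when every informed node tells `fanout` new nodes per round, assuming no overlap.
func estimateRounds(n, fanout int) int {
    informed, rounds := 1, 0
    for informed < n {
        informed += informed * fanout
        rounds++
    }
    return rounds
}

For example, estimateRounds(1000, 2) returns 7, so with a 2-second broadcast interval an update would reach a 1000-node cluster in roughly 14 seconds under these idealized assumptions.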

It’ll use Serf for easy cluster membership, which uses SWIM under the hood. SWIM is a more advanced Gossip-like algorithm, which you can read on about here.

Now let’s get to the implementation…

Getting started

First, we’ll of course have to put in all our imports:

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "os"
    "strconv"
    "sync"
    "time"

    "github.com/gorilla/mux"
    "github.com/hashicorp/serf/serf"
    "github.com/pkg/errors"
    "golang.org/x/sync/errgroup"
)

Following this, it’s time to write a simple thread-safe, one-value store.
An important detail: the database will also hold the generation of the value. This way, when an instance gets notified about a new value, it can check whether the incoming notification has a higher generation count; only then will it overwrite the current local value.
So our database structure will hold exactly this: the number, the generation, and a mutex.

type oneAndOnlyNumber struct {
    num        int
    generation int
    numMutex   sync.RWMutex
}

func InitTheNumber(val int) *oneAndOnlyNumber {
    return &oneAndOnlyNumber{
        num: val,
    }
}

We’ll also need a way to set and get the value.
Setting the value will also advance the generation count, so when we notify the rest of the cluster, we will overwrite their values and generation counts.

func (n *oneAndOnlyNumber) setValue(newVal int) {
    n.numMutex.Lock()
    defer n.numMutex.Unlock()
    n.num = newVal
    n.generation = n.generation + 1
}

func (n *oneAndOnlyNumber) getValue() (int, int) {
    n.numMutex.RLock()
    defer n.numMutex.RUnlock()
    return n.num, n.generation
}

Finally, we will need a way to notify the database of changes that happened elsewhere, if they have a higher generation count.
For that we’ll have a small notify method, which will return true, if anything has been changed:

func (n *oneAndOnlyNumber) notifyValue(curVal int, curGeneration int) bool {
    // Take the write lock before comparing generations, so the check and the
    // update happen atomically.
    n.numMutex.Lock()
    defer n.numMutex.Unlock()
    if curGeneration > n.generation {
        n.generation = curGeneration
        n.num = curVal
        return true
    }
    return false
}

We’ll also create a const describing how many nodes we will notify about the new value every time.

const MembersToNotify = 2

Now let’s get to the actual functioning of the application. First we’ll have to start an instance of serf, using two variables. The address of our instance in the network and the -optional- address of the cluster to join.

func main() {
    cluster, err := setupCluster(
        os.Getenv("ADVERTISE_ADDR"),
        os.Getenv("CLUSTER_ADDR"))
    if err != nil {
        log.Fatal(err)
    }
    defer cluster.Leave()

How does the setupCluster function work, you may ask? Here it is:

func setupCluster(advertiseAddr string, clusterAddr string) (*serf.Serf, error) {
    conf := serf.DefaultConfig()
    conf.Init()
    conf.MemberlistConfig.AdvertiseAddr = advertiseAddr

    cluster, err := serf.Create(conf)
    if err != nil {
        return nil, errors.Wrap(err, "Couldn't create cluster")
    }

    _, err = cluster.Join([]string{clusterAddr}, true)
    if err != nil {
        log.Printf("Couldn't join cluster, starting own: %v\n", err)
    }

    return cluster, nil
}

As we can see, we create the cluster, changing only the advertise address.

If the creation fails, we of course return the error.
If joining fails, though, it means that we either didn't get a cluster address or that the cluster doesn't exist (omitting network failures), so we can safely ignore it and just log the error.

Next, we initialize the database and the REST API (I've really chosen the number at random… really!):

    theOneAndOnlyNumber := InitTheNumber(42)
    launchHTTPAPI(theOneAndOnlyNumber)

And this is what the API creation looks like:

func launchHTTPAPI(db *oneAndOnlyNumber) {
    go func() {
        m := mux.NewRouter()

The whole API runs asynchronously inside a goroutine; the server itself is started at the end of it. First we declare our getter:

        m.HandleFunc("/get", func(w http.ResponseWriter, r *http.Request) {
            val, _ := db.getValue()
            fmt.Fprintf(w, "%v", val)
        })

our setter:

m.HandleFunc("/set/{newVal}", func(w http.ResponseWriter, r *http.Request) {
            vars := mux.Vars(r)
            newVal, err := strconv.Atoi(vars["newVal"])
            if err != nil {
                w.WriteHeader(http.StatusBadRequest)
                fmt.Fprintf(w, "%v", err)
                return
            }

            db.setValue(newVal)

            fmt.Fprintf(w, "%v", newVal)
        })

and finally the API endpoint which allows other nodes to notify this instance of changes:

        m.HandleFunc("/notify/{curVal}/{curGeneration}", func(w http.ResponseWriter, r *http.Request) {
            vars := mux.Vars(r)
            curVal, err := strconv.Atoi(vars["curVal"])
            if err != nil {
                w.WriteHeader(http.StatusBadRequest)
                fmt.Fprintf(w, "%v", err)
                return
            }
            curGeneration, err := strconv.Atoi(vars["curGeneration"])
            if err != nil {
                w.WriteHeader(http.StatusBadRequest)
                fmt.Fprintf(w, "%v", err)
                return
            }

            if changed := db.notifyValue(curVal, curGeneration); changed {
                log.Printf(
                    "NewVal: %v Gen: %v Notifier: %v",
                    curVal,
                    curGeneration,
                    r.URL.Query().Get("notifier"))
            }
            w.WriteHeader(http.StatusOK)
        })
        log.Fatal(http.ListenAndServe(":8080", m))
    }()
}

It’s also here where we start our server and print some debug info when getting notified of new values by other members of our cluster.


Great, we’ve got a way to talk to our service now. Time to make it actually spread all the information.
We’ll also be printing debug info regularly.

To begin with, let’s initiate our context (that’s always a good idea in the main function).
We’ll also put a value into it, the name of our host, just for the debug logs.
It’s a good thing to put into the context, as it’s not something crucial for the functioning of our program,
and the context will get passed further anyways.

    ctx := context.Background()
    if name, err := os.Hostname(); err == nil {
        ctx = context.WithValue(ctx, "name", name)
    }

Having done this, we can set up our main loop, including the intervals at which we’ll be sending state updates to peers and printing debug info.

    debugDataPrinterTicker := time.Tick(time.Second * 5)
    numberBroadcastTicker := time.Tick(time.Second * 2)
    for {
        select {
        case <-numberBroadcastTicker:
        // Notification code goes here...
        case <-debugDataPrinterTicker:
            log.Printf("Members: %v\n", cluster.Members())

            curVal, curGen := theOneAndOnlyNumber.getValue()
            log.Printf("State: Val: %v Gen: %v\n", curVal, curGen)
        }
    }

Ok, that seems to be it.

Just kidding. Time to finish up our service with the notification code.
We’ll now get a list of other members in the cluster, set a timeout, and asynchronously notify a part of those others.

        case <-numberBroadcastTicker:
            members := getOtherMembers(cluster)

            ctx, cancel := context.WithTimeout(ctx, time.Second*2)
            go func() {
                defer cancel() // release the timeout's resources once the notifications are done
                notifyOthers(ctx, members, theOneAndOnlyNumber)
            }()

Now, let’s look at the getOtherMembers function. It’s actually just a function scanning through the memberlist, deleting ourselves and other nodes that aren’t alive at the moment.

func getOtherMembers(cluster *serf.Serf) []serf.Member {
    members := cluster.Members()
    for i := 0; i < len(members); {
        if members[i].Name == cluster.LocalMember().Name || members[i].Status != serf.StatusAlive {
            if i < len(members)-1 {
                members = append(members[:i], members[i + 1:]...)
            } else {
                members = members[:i]
            }
        } else {
            i++
        }
    }
    return members
}

There’s not much to it I suppose. It’s using slicing to cut out or cut off members not conforming to our predicates.

Finally the function we use to notify others:

func notifyOthers(ctx context.Context, otherMembers []serf.Member, db *oneAndOnlyNumber) {
    g, ctx := errgroup.WithContext(ctx)

    if len(otherMembers) <= MembersToNotify {
        for _, member := range otherMembers {
            curMember := member
            g.Go(func() error {
                return notifyMember(ctx, curMember.Addr.String(), db)
            })
        }
    } else {
        randIndex := rand.Int() % len(otherMembers)
        for i := 0; i < MembersToNotify; i++ {
            // Copy the member for this iteration so the goroutine below doesn't
            // capture the loop variable i.
            member := otherMembers[(randIndex+i)%len(otherMembers)]
            g.Go(func() error {
                return notifyMember(ctx, member.Addr.String(), db)
            })
        }
    }

    err := g.Wait()
    if err != nil {
        log.Printf("Error when notifying other members: %v", err)
    }
}

If there are no more than MembersToNotify members, it notifies all of them; otherwise it picks a random index into the members slice and notifies the subsequent members from there on.
How does the errgroup work? It's a nifty library that Brian Ketelsen wrote a great article about. It's basically a WaitGroup which also collects errors and cancels its context as soon as the first goroutine fails.
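Here's a minimal standalone illustration of that behaviour, independent of the database code: three workers run in parallel, one fails immediately, and the shared context gets cancelled so the others can stop early.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    for i := 0; i < 3; i++ {
        i := i
        g.Go(func() error {
            if i == 1 {
                return errors.New("worker 1 failed") // the first error cancels ctx
            }
            select {
            case <-time.After(time.Second):
                fmt.Printf("worker %d finished\n", i)
                return nil
            case <-ctx.Done():
                fmt.Printf("worker %d cancelled: %v\n", i, ctx.Err())
                return ctx.Err()
            }
        })
    }

    // Wait returns the first non-nil error produced by the group.
    fmt.Println("group result:", g.Wait())
}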

Now to finish our code, the notifyMember function:

func notifyMember(ctx context.Context, addr string, db *oneAndOnlyNumber) error {
    val, gen := db.getValue()
    req, err := http.NewRequest("POST", fmt.Sprintf("http://%v:8080/notify/%v/%v?notifier=%v", addr, val, gen, ctx.Value("name")), nil)
    if err != nil {
        return errors.Wrap(err, "Couldn't create request")
    }
    req = req.WithContext(ctx)

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return errors.Wrap(err, "Couldn't make request")
    }
    // Close the body so the underlying connection can be reused.
    resp.Body.Close()
    return nil
}

We craft a URL following the pattern {nodeAddress}:8080/notify/{curVal}/{curGen}?notifier={selfHostName},
attach the context to the request so we get the timeout functionality, and finally make the request.

And that’s actually all there is to the code.

Testing our database

We’ll test our database using docker. The necessary dockerfile to put into your project directory looks like this:

FROM alpine
WORKDIR /app
COPY distApp /app/
ENTRYPOINT ["./distApp"]

Now, first build your application for Linux (if you're not on Linux, you have to set the environment variables GOOS=linux and GOARCH=amd64; a sample build command is shown below).
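For reference, one build invocation that matches the Dockerfile above could look like this (the distApp binary name is simply what the Dockerfile copies; CGO_ENABLED=0 produces a statically linked binary, which matters because Alpine doesn't ship glibc):

GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o distApp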
Then build the Docker image:

docker build -t distapp .

And finally we can launch it. To supply the necessary environment variables, we’ll need to know what ip address the containers will get.
First run:

docker network inspect bridge

Bridge is the default network containers get assigned to. You should get something like this:

[
    {
        "Name": "bridge",
        "Id": "b56a19697ed9d30488f189d5517fd79f04a4df70c8bbc07d8f3c49a491f10433",
        "Created": "2017-01-29T10:48:05.1592086Z",
        "Scope": "local",
        "Driver": "bridge",
        "EnableIPv6": false,
        "IPAM": {
            "Driver": "default",
            "Options": null,
            "Config": [
                {
                    "Subnet": "172.17.0.0/16",
                    "Gateway": "172.17.0.1" <-- this is what we need
                }
            ]
        },
        "Internal": false,
        "Attachable": false,
        "Containers": {},
        "Options": {
            "com.docker.network.bridge.default_bridge": "true",
            "com.docker.network.bridge.enable_icc": "true",
            "com.docker.network.bridge.enable_ip_masquerade": "true",
            "com.docker.network.bridge.host_binding_ipv4": "0.0.0.0",
            "com.docker.network.bridge.name": "docker0",
            "com.docker.network.driver.mtu": "1500"
        },
        "Labels": {}
    }
]

What’s important for us is the gateway. In this case, our containers would be spawned with IP addresses from 172.17.0.2

So now we can start a few containers:

docker run -e ADVERTISE_ADDR=172.17.0.2 -p 8080:8080 distapp
docker run -e ADVERTISE_ADDR=172.17.0.3 -e CLUSTER_ADDR=172.17.0.2 -p 8081:8080 distapp
docker run -e ADVERTISE_ADDR=172.17.0.4 -e CLUSTER_ADDR=172.17.0.3 -p 8082:8080 distapp
docker run -e ADVERTISE_ADDR=172.17.0.5 -e CLUSTER_ADDR=172.17.0.4 -p 8083:8080 distapp

From then on you can test your deployment by stopping and starting containers, and by setting/getting the value at:

localhost:8080/set/5
localhost:8082/get
etc...

Conclusion

What’s important, this is a really basic distributed system, it may become inconsistent (if you update the value on two different machines simultaneously, the cluster will have two values depending on the machine).
If you want to learn more, read about CAP, consensus, Paxos, RAFT, gossip, and data replication, they are all very interesting topics (at least in my opinion).

Anyway, I hope you had fun creating a small distributed system, and I encourage you to build your own, more advanced one; it'll be a great learning experience for sure!

The whole code is available on my GitHub.

Web app using Microservices in Go: Part 4 – Worker and Frontend

Previous part

Introduction

In this part we will finally finish writing our application. We will implement the last two services:
1. The Worker
2. The Frontend

The Worker

The worker will communicate with the Master to get new Tasks. When it gets one, it will fetch the corresponding data from the storage service and start working on it. When it finishes, it will send the resulting data back to the storage service and, if that succeeds, register the Task as finished with the Master.

That means you can easily scale the workforce by turning on additional machines with the worker service on them. Easy scaling is good!

Implementation

As usual we will start with a basic structure, similar to that of our previous services, although with one big difference: there won't be any API here, as the worker is purely a client. You could, if you wanted, add an API for debugging purposes, for things like reporting processor usage, but that can also be handled by third-party health-checking services.

So here’s our basic structure:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "image"
    "image/color"
    "image/png"
    "io/ioutil"
    "net/http"
    "os"
    "strconv"
    "sync"
    "time"
)

type Task struct {
    Id int `json:"id"`
    State int `json:"state"`
}

var masterLocation string
var storageLocation string
var keyValueStoreAddress string

func main() {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return
    }
    keyValueStoreAddress = os.Args[1]

    response, err := http.Get("http://" + keyValueStoreAddress + "/get?key=masterAddress")
    if err != nil {
        fmt.Println(err)
        return
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get master address.")
        fmt.Println(response.Body)
        return
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    masterLocation = string(data)
    if len(masterLocation) == 0 {
        fmt.Println("Error: can't get master address. Length is zero.")
        return
    }

    response, err = http.Get("http://" + keyValueStoreAddress + "/get?key=storageAddress")
    if err != nil {
        fmt.Println(err)
        return
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get storage address.")
        fmt.Println(response.Body)
        return
    }
    data, err = ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    storageLocation = string(data)
    if len(storageLocation) == 0 {
        fmt.Println("Error: can't get storage address. Length is zero.")
        return
    }
}

func getNewTask(masterAddress string) (Task, error) {
}
func getImageFromStorage(storageAddress string, myTask Task) (image.Image, error) {
}
func doWorkOnImage(myImage image.Image) image.Image {
}
func sendImageToStorage(storageAddress string, myTask Task, myImage image.Image) error {
}
func registerFinishedTask(masterAddress string, myTask Task) error {
}

It may seem like a lot, but there is nothing really new in the densest parts. We define the Task structure first and declare the needed addresses/locations.

Then we check whether enough arguments were passed to the application and, as in the previous parts, we get the addresses of the Master and the Storage. That's basically all.

For the worker we also want a command-line parameter setting the number of concurrent worker goroutines, which is why we check for at least 3 arguments.

Now let’s parse the thread count:

storageLocation = string(data)
if len(storageLocation) == 0 {
    fmt.Println("Error: can't get storage address. Length is zero.")
    return
}

threadCount, err := strconv.Atoi(os.Args[2])
if err != nil {
    fmt.Println("Error: Couldn't parse thread count.")
    return
}

We use strconv.Atoi to parse the string argument into an int.

And now we come to the main for loop which runs on each thread:

myWG := sync.WaitGroup{}
myWG.Add(threadCount)
for i := 0; i < threadCount; i++ {
    go func() {
        for {
            //Do work...
        }
    }()
}
myWG.Wait()

Ok, what are we doing there? We create a WaitGroup and add the thread count to it, because the main function has to wait for the goroutines rather than just finish execution. You could add a way to break out of the endless for loop and call Done() on the WaitGroup afterwards (a sketch of that follows below); we won't, as we just want endless work loops.
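For illustration only, a graceful-shutdown variant could look roughly like this; the quit channel and the surrounding names are my own, not part of the article's code:

quit := make(chan struct{}) // close(quit) to ask all workers to stop

myWG := sync.WaitGroup{}
myWG.Add(threadCount)
for i := 0; i < threadCount; i++ {
    go func() {
        defer myWG.Done() // signal that this worker has exited
        for {
            select {
            case <-quit:
                return
            default:
                // Do work...
            }
        }
    }()
}
// elsewhere (e.g. in a signal handler): close(quit)
myWG.Wait()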

Now we will write down the execution process for each Task.
First we get a new Task:

for {
    myTask, err := getNewTask(masterLocation)
    if err != nil {
        fmt.Println(err)
        fmt.Println("Waiting 2 second timeout...")
        time.Sleep(time.Second * 2)
        continue
    }

If we get an error, we wait 2 seconds so we don't fire a burst of failing requests at the master, which could flood the other services (you could also factor this repeated wait into a tiny helper, sketched below).
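Such a helper could look like this; the name waitAndLog is mine, not the article's:

// waitAndLog prints the error and sleeps for two seconds before the caller retries.
func waitAndLog(err error) {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
}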

If we successfully get the Task, then we get the image to work on from the storage:

myTask, err := getNewTask(masterLocation)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

myImage, err := getImageFromStorage(storageLocation, myTask)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

Ok, now it’s time to do something with the image!!!

myImage, err := getImageFromStorage(storageLocation, myTask)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

myImage = doWorkOnImage(myImage)

We now of course have to save the image back to the storage:

myImage = doWorkOnImage(myImage)

err = sendImageToStorage(storageLocation, myTask, myImage)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

And finally if that succeeds we register our successfully finished Task!

        err = sendImageToStorage(storageLocation, myTask, myImage)
        if err != nil {
            fmt.Println(err)
            fmt.Println("Waiting 2 second timeout...")
            time.Sleep(time.Second * 2)
            continue
        }

        err = registerFinishedTask(masterLocation, myTask)
        if err != nil {
            fmt.Println(err)
            fmt.Println("Waiting 2 second timeout...")
            time.Sleep(time.Second * 2)
            continue
        }
    }
}()

Ok, so now we can continue with implementing these functions. Let’s first implement the getNewTask function:

func getNewTask(masterAddress string) (Task, error) {
    response, err := http.Post("http://" + masterAddress + "/getNewTask", "text/plain", nil)
    if err != nil || response.StatusCode != http.StatusOK {
        return Task{-1, -1}, err
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        return Task{-1, -1}, err
    }

    myTask := Task{}
    err = json.Unmarshal(data, &myTask)
    if err != nil {
        return Task{-1, -1}, err
    }

    return myTask, nil
}

We make the request to the master and check whether it was successful, read the response body into memory, unmarshal it into our Task structure, and finally return it.

Now we can implement the function to get the image from storage!

func getImageFromStorage(storageAddress string, myTask Task) (image.Image, error) {
    response, err := http.Get("http://" + storageAddress + "/getImage?state=working&id=" + strconv.Itoa(myTask.Id))
    if err != nil || response.StatusCode != http.StatusOK {
        return nil, err
    }

    myImage, err := png.Decode(response.Body)
    if err != nil {
        return nil, err
    }

    return myImage, nil
}

We get the response whose body is the raw image, so we just Decode it and return it if we succeed.

Now that we have our image we can finally do some work on it:

func doWorkOnImage(myImage image.Image) image.Image {
    myCanvas := image.NewRGBA(myImage.Bounds())

    for i := 0; i < myCanvas.Rect.Max.X; i++ {
        for j := 0; j < myCanvas.Rect.Max.Y; j++ {
            // RGBA() returns 16-bit channel values, so shift them down to 8 bits
            // before building the color. We swap the red and green channels.
            r, g, b, _ := myImage.At(i, j).RGBA()
            myColor := new(color.RGBA)
            myColor.R = uint8(g >> 8)
            myColor.G = uint8(r >> 8)
            myColor.B = uint8(b >> 8)
            myColor.A = uint8(255)
            myCanvas.Set(i, j, myColor)
        }
    }

    return myCanvas.SubImage(myImage.Bounds())
}

The work itself is largely irrelevant, but I'll explain it anyway. First we create an RGBA, which is something like a canvas for drawing, sized like our image. Then we draw onto the canvas, swapping the red and green channels (note that RGBA() returns 16-bit channel values, hence the shift down to 8 bits). Finally we return a new, modified image created from our canvas with the bounds of the original.
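If you want to convince yourself the swap does what it says, a tiny throwaway check like this (my own snippet, not part of the services) could be dropped into the worker:

// checkSwap builds a 1x1 red-ish pixel, runs it through doWorkOnImage and prints
// the resulting channels; we expect red and green to have traded places.
func checkSwap() {
    src := image.NewRGBA(image.Rect(0, 0, 1, 1))
    src.Set(0, 0, color.RGBA{R: 200, G: 10, B: 30, A: 255})

    out := doWorkOnImage(src)

    r, g, b, _ := out.At(0, 0).RGBA()
    fmt.Printf("r=%d g=%d b=%d\n", r>>8, g>>8, b>>8) // expect r=10 g=200 b=30
}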

After working on the image we have to send it back to the storage system. So let’s implement the sendImageToStorage function:

func sendImageToStorage(storageAddress string, myTask Task, myImage image.Image) error {
    data := []byte{}
    buffer := bytes.NewBuffer(data)
    err := png.Encode(buffer, myImage)
    if err != nil {
        return err
    }
    response, err := http.Post("http://" + storageAddress + "/sendImage?state=finished&id=" + strconv.Itoa(myTask.Id), "image/png", buffer)
    if err != nil || response.StatusCode != http.StatusOK {
        return err
    }

    return nil
}

We create a byte slice and wrap it in a bytes.Buffer, which gives us an io.ReadWriter. We encode our image into it as PNG, and then send the buffer to the server in a POST request. If everything works out, we just return.

When we successfully saved the image, we can register to the Master that we finished the Task.

func registerFinishedTask(masterAddress string, myTask Task) error {
    response, err := http.Post("http://" + masterAddress + "/registerTaskFinished?id=" + strconv.Itoa(myTask.Id), "text/plain", nil)
    if err != nil || response.StatusCode != http.StatusOK {
        return err
    }

    return nil
}

Nothing fancy. Just sending a POST request to notify about finishing the task.

Ok, that’s all regarding the worker. You can turn it on and it will wait for Tasks to get. Which means we need to get the Tasks from the user to our service finally. And that leads us to…

The Frontend

This one will show the user the website and parse the uploaded form, so the backend services only ever see raw image data. It will be the user's only interface to our application.

As usual, we’ll begin with the basic structure of the file:

package main

import (
    "fmt"
    "io"
    "io/ioutil"
    "net/http"
    "net/url"
    "os"
)

const indexPage = "<html><head><title>Upload file</title></head><body><form enctype=\"multipart/form-data\" action=\"submitTask\" method=\"post\"> <input type=\"file\" name=\"uploadfile\" /> <input type=\"submit\" value=\"upload\" /> </form> </body> </html>"

var keyValueStoreAddress string
var masterLocation string

func main() {
    if len(os.Args) < 2 {
        fmt.Println("Error: Too few arguments.")
        return
    }
    keyValueStoreAddress = os.Args[1]

    response, err := http.Get("http://" + keyValueStoreAddress + "/get?key=masterAddress")
    if err != nil {
        fmt.Println(err)
        return
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get master address.")
        fmt.Println(response.Body)
        return
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    masterLocation = string(data)
    if len(masterLocation) == 0 {
        fmt.Println("Error: can't get master address. Length is zero.")
        return
    }

    http.HandleFunc("/", handleIndex)
    http.HandleFunc("/submitTask", handleTask)
    http.HandleFunc("/isReady", handleCheckForReadiness)
    http.HandleFunc("/getImage", serveImage)
    http.ListenAndServe(":80", nil)
}

func handleIndex(w http.ResponseWriter, r *http.Request) {
}

func handleTask(w http.ResponseWriter, r *http.Request) {
}

func handleCheckForReadiness(w http.ResponseWriter, r *http.Request) {
}

func serveImage(w http.ResponseWriter, r *http.Request) {
}

We’ve got the code of our index web page here, and we also have the API declared. After starting the program we check if we have the k/v store address. If we do (we hope so), then we can get the master address from it.

Now we can go on to implementing the functions. We'll start with the simplest, the index handler:

func handleIndex(w http.ResponseWriter, r *http.Request) {
    fmt.Fprint(w, indexPage)
}

After writing this we can go on to writing the more complicated functions. We will start with the handleTask function. This function is responsible for parsing the user form and sending the raw image data to the master.

func handleTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        err := r.ParseMultipartForm(10000000)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }
        file, _, err := r.FormFile("uploadfile")
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

Ok, what do we have here? We check the method, as we always do, and then parse the multipart form. There's a nice big magic number in there: it's the maximum number of bytes of the form kept in RAM, and anything beyond that is stored in temporary files (a named-constant alternative is sketched right after this handler's walkthrough). We then make the request to the master using the file we got:

        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Post("http://" + masterLocation + "/new", "image", file)
        if err != nil || response.StatusCode != http.StatusOK {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        data, err := ioutil.ReadAll(response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        fmt.Fprint(w, string(data))
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }

We send the user back the new Task id we got back from the master.
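One small aside about the handler above: if you'd rather not keep the 10000000 magic number inline, you could name it; the constant name below is my own suggestion, not the article's:

// maxUploadMemory is the number of form bytes ParseMultipartForm keeps in memory;
// anything beyond that spills over into temporary files on disk.
const maxUploadMemory = 10 << 20 // ~10 MB

// ...inside handleTask:
err := r.ParseMultipartForm(maxUploadMemory)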

Now we can implement the function which will handle the request to check if the Task is finished and ready:

func handleCheckForReadiness(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Get("http://" + masterLocation + "/isReady?id=" + values.Get("id") + "&state=finished")
        if err != nil || response.StatusCode != http.StatusOK {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        data, err := ioutil.ReadAll(response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        switch string(data) {
        case "0":
            fmt.Fprint(w, "Your image is not ready yet.")
        case "1":
            fmt.Fprint(w, "Your image is ready.")
        default:
            fmt.Fprint(w, "Internal server error.")
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Most of this is the same as the function in the master. We just check if the id is correct and make a request to the master. The interesting part is this:

switch string(data) {
case "0":
    fmt.Fprint(w, "Your image is not ready yet.")
case "1":
    fmt.Fprint(w, "Your image is ready.")
default:
    fmt.Fprint(w, "Internal server error.")
}

We convert the response body to a string; if it's "0" or "1" we answer accordingly, and if it's anything else we know something went really wrong and send back an error.

Now we can get to the last function, the function to serve the actual images:

func serveImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Get("http://" + masterLocation + "/get?id=" + values.Get("id") + "&state=finished")
        if err != nil || response.StatusCode != http.StatusOK {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(w, response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

It just checks the id, sends the request, and copies the response to answer the user.

Conclusion

So now it’s all finished. You can start all applications and they will work. You can contact the frontend and make requests, they will all start working and you will get your modified images. Six microservices working beautifully together.

This may be the end of this series, so have fun extending this system on your own.

I may add a finishing part about deployment to container infrastructures, or another follow-up series about refactoring the system with third-party libraries, but I'm not sure yet.

All in all, good luck!

UPDATE: Also remember that, when running the worker, it's good to launch it with a goroutine count of about 2-4x the number of your system threads. Goroutines have near-zero switching overhead, and this way you're not unnecessarily blocking while waiting for HTTP responses.
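For example, a sensible default under that rule of thumb could be derived from the CPU count (my suggestion, not from the article; it needs the "runtime" package, which the worker doesn't currently import):

// Roughly 4 worker goroutines per logical CPU, matching the 2-4x rule of thumb.
threadCount := 4 * runtime.NumCPU()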