Practical Golang: Getting started with NATS and related patterns

Introduction

Microservices… the buzzword that never seems to go away. They promise a lot, but can be slow or complicated if not implemented correctly. One of the main challenges when developing and using a microservice-based architecture is getting the communication right. Many will ask, why not REST? As I did at some point. Many will actually use it. But the truth is that REST over HTTP is synchronous: the caller blocks until the response arrives, and it has to know the address of the service it calls, which leads to tighter coupling. Microservice architectures usually work better with asynchronous communication.

What are we meant to use for communication? Usually we use:
– RPC – Remote Procedure Call
– Message BUS/Broker

In this article I’ll write about one specific Message BUS called NATS and using it in Go.

There are also other message buses/brokers. Some popular ones are Kafka and RabbitMQ.

Why NATS? It’s simple, and astonishingly fast.

Setting up NATS

To use NATS you can do one of the following things:
1. Use the NATS Docker image
2. Get the binaries
3. Use the public NATS server nats://demo.nats.io:4222
4. Build from source

Also, remember to

go get github.com/nats-io/nats

the official Go library.

Getting started

In this article we’ll be using protobufs a lot. So if you want to know more about them, check out my previous article about protobufs.

First, let’s write one of the key use cases of microservices: a frontend that lists information from other microservices, but doesn’t care if one of them is down. It will respond to the user anyway. This makes microservices swappable live, one at a time.

In each of our services we’ll need to connect to NATS:

package main

import (
    "fmt"
    "os"

    "github.com/nats-io/nats"
)

var nc *nats.Conn

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        // Without a connection there is nothing useful left to do.
        fmt.Println(err)
        return
    }
}

Now, let’s write the first provider service. It will receive a user Id and answer with a user name, for which we’ll need a transport structure to send its data over NATS. I wrote this short proto file for that:

syntax = "proto3";
package Transport;

message User {
        string id = 1;
        string name = 2;
}

Now we will create the map containing our user names:

var users map[string]string
var nc *nats.Conn

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }

    users = make(map[string]string)
    users["1"] = "Bob"
    users["2"] = "John"
    users["3"] = "Dan"
    users["4"] = "Kate"
}

and finally the part that’s most interesting to us. Subscribing to the topic:

users["4"] = "Kate"

nc.QueueSubscribe("UserNameById", "userNameByIdProviders", replyWithUserId)

Notice that it’s a QueueSubscribe, which means that if we start 10 instances of this service in the userNameByIdProviders group, only one of them will get each message sent over UserNameById. Another thing to note is that this function call is asynchronous, so we need to block somehow. This select {} will provide an endless block:

nc.QueueSubscribe("UserNameById", "userNameByIdProviders", replyWithUserId)
select {}
}
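
To get a feel for the queue-group semantics without a running NATS server, here is a minimal stdlib-only sketch. It is not NATS code: a plain Go channel stands in for the UserNameById subject, each goroutine stands in for one service instance in the group, and the name deliverToGroup is made up for this illustration.

```go
package main

import (
    "fmt"
    "sync"
)

// deliverToGroup simulates a queue group: every message is handed to
// exactly one of the `members` workers. It returns how many messages
// each member handled. This is an illustration, not the NATS API.
func deliverToGroup(members int, messages []string) []int {
    subject := make(chan string) // stands in for the NATS subject
    handled := make([]int, members)
    var wg sync.WaitGroup

    for id := 0; id < members; id++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for range subject {
                handled[id]++ // each member only touches its own slot
            }
        }(id)
    }

    for _, m := range messages {
        subject <- m
    }
    close(subject)
    wg.Wait()
    return handled
}

func main() {
    counts := deliverToGroup(3, []string{"1", "2", "3", "4", "5", "6"})
    total := 0
    for _, c := range counts {
        total += c
    }
    fmt.Println("messages handled in total:", total) // prints 6
}
```

Which member handles which message is not deterministic, only the total is. That mirrors the point above: with a queue subscription you scale by starting more instances, and each request is still answered once.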

Ok, now to the replyWithUserId function:

func replyWithUserId(m *nats.Msg) {
}

Notice that it takes one argument, a pointer to the message.

We’ll unmarshal the data:

func replyWithUserId(m *nats.Msg) {

    myUser := Transport.User{}
    err := proto.Unmarshal(m.Data, &myUser)
    if err != nil {
        fmt.Println(err)
        return
    }

get the name and marshal back:

myUser.Name = users[myUser.Id]
data, err := proto.Marshal(&myUser)
if err != nil {
    fmt.Println(err)
    return
}

And, as this is a request we’re handling, we respond on the Reply topic, a topic created by the caller exactly for this purpose:

if err != nil {
    fmt.Println(err)
    return
}
fmt.Println("Replying to ", m.Reply)
nc.Publish(m.Reply, data)

}

Ok, now let’s get to the second service. Our time provider service, first the same basic structure:

package main

import (
    "github.com/nats-io/nats"
    "fmt"
    "github.com/cube2222/Blog/NATS/FrontendBackend"
    "github.com/golang/protobuf/proto"
    "os"
    "time"
)

// We use globals because it's a small application demonstrating NATS.

var nc *nats.Conn

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }

    nc.QueueSubscribe("TimeTeller", "TimeTellers", replyWithTime)
    select {} // Block forever
}

This time we’re not getting any data from the caller, so we just marshal our time into this proto structure:

syntax = "proto3";
package Transport;

message Time {
        string time = 1;
}

and send it back:

func replyWithTime(m *nats.Msg) {
    // Keyed fields keep working even if the generated struct gains internal fields.
    curTime := Transport.Time{Time: time.Now().Format(time.RFC3339)}

    data, err := proto.Marshal(&curTime)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("Replying to ", m.Reply)
    nc.Publish(m.Reply, data)

}

We can now get to our frontend, which will use both those services. First the standard basic structure:

package main

import (
    "net/http"
    "github.com/gorilla/mux"
    "github.com/cube2222/Blog/NATS/FrontendBackend"
    "github.com/golang/protobuf/proto"
    "fmt"
    "github.com/nats-io/nats"
    "time"
    "os"
    "sync"
)

var nc *nats.Conn

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }

    m := mux.NewRouter()
    m.HandleFunc("/{id}", handleUserWithTime)

    http.ListenAndServe(":3000", m)
}

That’s a pretty standard web server, now to the interesting bits, the handleUserWithTime function, which will respond with the user name and time:

func handleUserWithTime(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    myUser := Transport.User{Id: vars["id"]}
    curTime := Transport.Time{}
    wg := sync.WaitGroup{}
    wg.Add(2)
}

We’ve parsed the request arguments and started a WaitGroup with the value two, as we will do one asynchronous request for each of our services. First we’ll marshal the user struct:

go func() {
    data, err := proto.Marshal(&myUser)
    if err != nil || len(myUser.Id) == 0 {
        // Release the WaitGroup before bailing out, otherwise wg.Wait() blocks forever.
        wg.Done()
        fmt.Println(err)
        w.WriteHeader(500)
        fmt.Println("Problem with parsing the user Id.")
        return
    }

and then we make a request, sending the user data and waiting at most 100 ms for the response:

fmt.Println("Problem with parsing the user Id.")
return
}

msg, err := nc.Request("UserNameById", data, 100 * time.Millisecond)

Now we can check whether an error happened or the response is empty, and finish this goroutine:

msg, err := nc.Request("UserNameById", data, 100 * time.Millisecond)
if err == nil && msg != nil {
    myUserWithName := Transport.User{}
    err := proto.Unmarshal(msg.Data, &myUserWithName)
    if err == nil {
        myUser = myUserWithName
    }
}
wg.Done()
}()

Next we’ll do the request to the Time Tellers.
We again make a request, but its body is nil, as we don’t need to pass any data:

go func() {
    msg, err := nc.Request("TimeTeller", nil, 100*time.Millisecond)
    if err == nil && msg != nil {
        receivedTime := Transport.Time{}
        err := proto.Unmarshal(msg.Data, &receivedTime)
        if err == nil {
            curTime = receivedTime
        }
    }
    wg.Done()
}()

After both requests finished (or failed) we can just respond to the user:

wg.Wait()

fmt.Fprintln(w, "Hello ", myUser.Name, " with id ", myUser.Id, ", the time is ", curTime.Time, ".")
}

Now if you actually test it, you’ll notice that if one of the provider services isn’t active, the frontend will respond anyway, putting a zero value in place of the unavailable resource. You could also make a template that shows an error in that place.
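
The "respond anyway" behaviour can be tried out without NATS. The following is a rough stdlib-only sketch: request is a hypothetical stand-in for nc.Request (it only imitates the timeout), and gather plays the role of the handler that fans out to both providers. Both names are assumptions made for this example.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// request is a hypothetical stand-in for nc.Request: it runs fetch in the
// background and gives up after timeout, reporting ok=false.
func request(fetch func() string, timeout time.Duration) (string, bool) {
    result := make(chan string, 1) // buffered so a late fetch never blocks
    go func() { result <- fetch() }()
    select {
    case v := <-result:
        return v, true
    case <-time.After(timeout):
        return "", false
    }
}

// gather asks both providers concurrently and keeps the zero value for
// whichever one does not answer in time.
func gather(nameOf, timeOf func() string, timeout time.Duration) (name, now string) {
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        if v, ok := request(nameOf, timeout); ok {
            name = v
        }
    }()
    go func() {
        defer wg.Done()
        if v, ok := request(timeOf, timeout); ok {
            now = v
        }
    }()
    wg.Wait()
    return name, now
}

func main() {
    up := func() string { return "Bob" }
    down := func() string { time.Sleep(time.Second); return "never used" }
    name, now := gather(up, down, 100*time.Millisecond)
    fmt.Printf("name=%q time=%q\n", name, now) // name="Bob" time=""
}
```

Because every goroutine uses defer wg.Done(), the WaitGroup is released on every path, so the slowest provider can delay the response by at most the timeout.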

Ok, that was already an interesting architecture. Now we can implement…

The Master-Slave pattern

This is such a popular pattern, especially in Go, that we really should know how to implement it. The workers will do simple operations on a text file (count the usage amounts of each word in a comma-separated list).

Now you could think that the Master, should send the files to the Workers over NATS. Wrong. This would lead to a huge slowdown of NATS (at least for bigger files). That’s why the Master will send the files to a file server over a REST API, and the Workers will get it from there. We’ll also learn how to do service discovery over NATS.

First, the File Server. I won’t really go through the file handling part, as it’s a simple get/post API. I will, however, go over the service discovery part.

package main

import (
    "net/http"
    "github.com/gorilla/mux"
    "os"
    "io"
    "fmt"
    "github.com/nats-io/nats"
    "github.com/cube2222/Blog/NATS/MasterWorker"
    "github.com/golang/protobuf/proto"
)

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    m := mux.NewRouter()

    m.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        file, err := os.Open("/tmp/" + vars["name"])
        if err != nil {
            w.WriteHeader(404)
            return
        }
        // Only defer Close once we know the file was opened.
        defer file.Close()
        if _, err := io.Copy(w, file); err != nil {
            // Best effort: part of the body may already have been sent.
            w.WriteHeader(500)
        }
    }).Methods("GET")

    m.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        file, err := os.Create("/tmp/" + vars["name"])
        if err != nil {
            w.WriteHeader(500)
            return
        }
        defer file.Close()
        if _, err := io.Copy(file, r.Body); err != nil {
            w.WriteHeader(500)
        }
    }).Methods("POST")

    RunServiceDiscoverable()

    http.ListenAndServe(":3000", m)
}

Now, what does the RunServiceDiscoverable function do? It connects to the NATS server and responds with its own http address to incoming requests.

func RunServiceDiscoverable() {
    nc, err := nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println("Can't connect to NATS. Service is not discoverable.")
        return
    }
    nc.Subscribe("Discovery.FileServer", func(m *nats.Msg) {
        serviceAddressTransport := Transport.DiscoverableServiceTransport{Address: "http://localhost:3000"}
        data, err := proto.Marshal(&serviceAddressTransport)
        if err == nil {
            nc.Publish(m.Reply, data)
        }
    })
}

The proto file looks like this:

syntax = "proto3";
package Transport;

message DiscoverableServiceTransport {
        string Address = 1;
}

We can now go on with the Master.

The protofile for the Task structure is:

syntax = "proto3";
package Transport;

message Task {
        string uuid = 1;
        string finisheduuid = 2;
        int32 state = 3; // 0 - not started, 1 - in progress, 2 - finished
        int32 id = 4;
}

Our Master will hold a list of Tasks, each with its UUID (which is also the name of the file) and id (the position in the Master’s Tasks slice), plus a pointer that holds the position of the oldest unfinished Task and gets updated when a new Task is retrieved. It’s pretty similar to the Task storage in my Microservice Architecture series.

I’m using github.com/satori/go.uuid for UUID generation.

First, as usual, the basic structure:

package main

import (
    "github.com/satori/go.uuid"
    "github.com/cube2222/Blog/NATS/MasterWorker"
    "os"
    "fmt"
    "github.com/nats-io/nats"
    "github.com/golang/protobuf/proto"
    "time"
    "bytes"
    "net/http"
    "sync"
)

var Tasks []Transport.Task
var TaskMutex sync.Mutex
var oldestFinishedTaskPointer int
var nc *nats.Conn


func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }

    Tasks = make([]Transport.Task, 0, 20)
    TaskMutex = sync.Mutex{}
    oldestFinishedTaskPointer = 0

    initTestTasks()

    nc.Subscribe("Work.TaskToDo", func (m *nats.Msg) {
    })

    nc.Subscribe("Work.TaskFinished", func (m *nats.Msg) {
    })

    select {} // Block forever
}

Ok, we’ve also already set up the Subscriptions.

How does the initTestTasks function work? It’s interesting because it gets the file server address over NATS.

So, we want to create 20 test Tasks, so we run the loop 20 times:

func initTestTasks() {
    for i := 0; i < 20; i++ {
    }
}

We create a new Task and ask the File Server for its address:

for i := 0; i < 20; i++ {
    newTask := Transport.Task{Uuid: uuid.NewV4().String(), State: 0}
    fileServerAddressTransport := Transport.DiscoverableServiceTransport{}
    msg, err := nc.Request("Discovery.FileServer", nil, 1000 * time.Millisecond)
    if err == nil && msg != nil {
        err := proto.Unmarshal(msg.Data, &fileServerAddressTransport)
        if err != nil {
            continue
        }
    }
    if err != nil {
        continue
    }

    fileServerAddress := fileServerAddressTransport.Address
}

Next we finally make the post Request to the file server and add the Task to our Tasks list:

        fileServerAddress := fileServerAddressTransport.Address
        data := make([]byte, 0, 1024)
        buf := bytes.NewBuffer(data)
        fmt.Fprint(buf, "get,my,data,my,get,get,have")
        r, err := http.Post(fileServerAddress + "/" + newTask.Uuid, "", buf)
        if err != nil || r.StatusCode != http.StatusOK {
            continue
        }

        newTask.Id = int32(len(Tasks))
        Tasks = append(Tasks, newTask)
    }

How do we dispatch new Tasks to do? Simply like this:

nc.Subscribe("Work.TaskToDo", func (m *nats.Msg) {
    myTaskPointer, ok := getNextTask()
    if ok {
        data, err := proto.Marshal(myTaskPointer)
        if err == nil {
            nc.Publish(m.Reply, data)
        }
    }
})

How do we get the next Task? We just loop over the Tasks to find one that is not started. If the Tasks at our pointer are finished, then we also move the pointer up. Remember the mutex, as this function may be run in parallel:

func getNextTask() (*Transport.Task, bool) {
    TaskMutex.Lock()
    defer TaskMutex.Unlock()
    for i := oldestFinishedTaskPointer; i < len(Tasks); i++ {
        if i == oldestFinishedTaskPointer && Tasks[i].State == 2 {
            oldestFinishedTaskPointer++
        } else {
            if Tasks[i].State == 0 {
                Tasks[i].State = 1
                go resetTaskIfNotFinished(i)
                return &Tasks[i], true
            }
        }
    }
    return nil, false
}

We also called the resetTaskIfNotFinished function. It will reset the Task state if it’s still in progress after 2 minutes:

func resetTaskIfNotFinished(i int) {
    time.Sleep(2 * time.Minute)
    TaskMutex.Lock()
    // Without the Unlock every later getNextTask call would block forever.
    defer TaskMutex.Unlock()
    if Tasks[i].State != 2 {
        Tasks[i].State = 0
    }
}
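
To see the state transitions in isolation, here is a trimmed-down sketch of the same idea using only the standard library. The taskList type and its methods are hypothetical names for this example; it leaves out the protobuf structs and the pointer optimisation, and uses time.AfterFunc with a configurable timeout instead of a sleeping goroutine.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

const (
    notStarted = 0
    inProgress = 1
    finished   = 2
)

// taskList is a hypothetical, trimmed-down version of the Master's state.
type taskList struct {
    mu     sync.Mutex
    states []int
}

// next hands out the first task that has not been started and schedules
// a reset in case no worker reports back within timeout.
func (t *taskList) next(timeout time.Duration) (int, bool) {
    t.mu.Lock()
    defer t.mu.Unlock()
    for i, s := range t.states {
        if s == notStarted {
            t.states[i] = inProgress
            time.AfterFunc(timeout, func() { t.resetIfNotFinished(i) })
            return i, true
        }
    }
    return 0, false
}

func (t *taskList) resetIfNotFinished(i int) {
    t.mu.Lock()
    defer t.mu.Unlock() // always release the lock
    if t.states[i] != finished {
        t.states[i] = notStarted
    }
}

func (t *taskList) finish(i int) {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.states[i] = finished
}

func (t *taskList) state(i int) int {
    t.mu.Lock()
    defer t.mu.Unlock()
    return t.states[i]
}

func main() {
    tasks := &taskList{states: make([]int, 2)}
    a, _ := tasks.next(50 * time.Millisecond)
    b, _ := tasks.next(50 * time.Millisecond)
    tasks.finish(a)                    // task a is reported as done
    time.Sleep(200 * time.Millisecond) // task b is never reported
    fmt.Println(tasks.state(a), tasks.state(b)) // prints "2 0"
}
```

The abandoned task goes back to "not started" and can be handed to another worker, while the finished one is left alone.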

The TaskFinished subscription handler is much simpler: it just marks the Task as finished and sets the finished UUID according to the received protobuf:

nc.Subscribe("Work.TaskFinished", func (m *nats.Msg) {
    myTask := Transport.Task{}
    err := proto.Unmarshal(m.Data, &myTask)
    if err == nil {
        TaskMutex.Lock()
        Tasks[myTask.Id].State = 2
        Tasks[myTask.Id].Finisheduuid = myTask.Finisheduuid
        TaskMutex.Unlock()
    }
})

That’s all in regards to the Master! We can now move on writing the Worker.

The basic structure:

package main

import (
    "os"
    "fmt"
    "github.com/nats-io/nats"
    "time"
    "github.com/cube2222/Blog/NATS/MasterWorker"
    "github.com/golang/protobuf/proto"
    "net/http"
    "bytes"
    "io/ioutil"
    "sort"
    "strings"
    "github.com/satori/go.uuid"
)

var nc *nats.Conn

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }

    for i := 0; i < 8; i++ {
        go doWork()
    }

    select {} // Block forever
}

Now the main function doing something here is the doWork function. I’ll post it all at once with comments everywhere, as it’s a very long function and this will be the most convenient way to read it:

func doWork() {
    for {
        // We ask for a Task with a 1 second Timeout
        msg, err := nc.Request("Work.TaskToDo", nil, 1 * time.Second)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        // We unmarshal the Task
        curTask := Transport.Task{}
        err = proto.Unmarshal(msg.Data, &curTask)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        // We get the FileServer address
        msg, err = nc.Request("Discovery.FileServer", nil, 1000 * time.Millisecond)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        fileServerAddressTransport := Transport.DiscoverableServiceTransport{}
        err = proto.Unmarshal(msg.Data, &fileServerAddressTransport)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        // We get the file
        fileServerAddress := fileServerAddressTransport.Address
        r, err := http.Get(fileServerAddress + "/" + curTask.Uuid)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        data, err := ioutil.ReadAll(r.Body)
        r.Body.Close() // release the connection once the body has been read
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        // We split and count the words
        words := strings.Split(string(data), ",")
        sort.Strings(words)
        wordCounts := make(map[string]int)
        for i := 0; i < len(words); i++ {
            wordCounts[words[i]] = wordCounts[words[i]] + 1
        }

        resultData := make([]byte, 0, 1024)
        buf := bytes.NewBuffer(resultData)

        // We print the results to a buffer
        for key, value := range wordCounts {
            fmt.Fprintln(buf, key, ":", value)
        }

        // We generate a new UUID for the finished file
        curTask.Finisheduuid = uuid.NewV4().String()
        r, err = http.Post(fileServerAddress + "/" + curTask.Finisheduuid, "", buf)
        if err != nil {
            // r is nil when err is set, so we must not read r.StatusCode here
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }
        r.Body.Close()
        if r.StatusCode != http.StatusOK {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", r.StatusCode)
            time.Sleep(2 * time.Second)
            continue
        }

        // We marshal the current Task into a protobuffer
        data, err = proto.Marshal(&curTask)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        // We notify the Master about finishing the Task
        nc.Publish("Work.TaskFinished", data)
    }
}
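
The counting step is easy to try out separately. Below is a small stdlib-only sketch; countWords and formatCounts are names made up for this example. One detail worth knowing: Go randomises map iteration order, so sorting the words before counting does not make the output ordered. Sorting the keys right before printing does.

```go
package main

import (
    "fmt"
    "sort"
    "strings"
)

// countWords counts how often each word occurs in a comma-separated list.
func countWords(data string) map[string]int {
    counts := make(map[string]int)
    for _, word := range strings.Split(data, ",") {
        counts[word]++
    }
    return counts
}

// formatCounts renders the counts one per line, sorted by word, so the
// result file is the same on every run.
func formatCounts(counts map[string]int) string {
    words := make([]string, 0, len(counts))
    for word := range counts {
        words = append(words, word)
    }
    sort.Strings(words)

    var b strings.Builder
    for _, word := range words {
        fmt.Fprintf(&b, "%s : %d\n", word, counts[word])
    }
    return b.String()
}

func main() {
    // Same test data the Master uploads in initTestTasks.
    fmt.Print(formatCounts(countWords("get,my,data,my,get,get,have")))
}
```

For the test data this prints data : 1, get : 3, have : 1 and my : 2, one per line.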

Awesome, our Master-Slave setup is ready, you can test it if you’d like. After you do, we can now check out the last architecture.

The Events pattern

Imagine you have servers which keep connections to clients over websockets. You want these clients to get live news updates. With this pattern you can. We’ll also learn about a few convenient NATS client abstractions, like using an encoded connection, or using channels for sending/receiving.

The basic architecture as usual:

package main

import (
    "os"
    "fmt"
    "github.com/nats-io/nats"
    natsp "github.com/nats-io/nats/encoders/protobuf"
    "github.com/cube2222/Blog/NATS/EventSubs"
    "time"
)

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    nc, err := nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    ec, err := nats.NewEncodedConn(nc, natsp.PROTOBUF_ENCODER)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer ec.Close()
}

Wait… What’s that at the end!? It’s an encoded connection! It will automatically encode our structs into raw data. We’ll use the protobuf one, but there is also a default one, a gob one and a json one.
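
Conceptually, an encoder is just a pair of functions that turn a struct into bytes and back. The sketch below shows that idea with encoding/json from the standard library. The encoder interface here is a simplified, local assumption for illustration and is not the interface defined by the NATS client; textMessage and roundTrip are likewise made up.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// encoder is a local, simplified interface in the spirit of what an
// encoded connection needs. It is an assumption for illustration only.
type encoder interface {
    Encode(v interface{}) ([]byte, error)
    Decode(data []byte, vPtr interface{}) error
}

type jsonEncoder struct{}

func (jsonEncoder) Encode(v interface{}) ([]byte, error) {
    return json.Marshal(v)
}

func (jsonEncoder) Decode(data []byte, vPtr interface{}) error {
    return json.Unmarshal(data, vPtr)
}

// textMessage is a plain struct with an id and a body.
type textMessage struct {
    Id   int32  `json:"id"`
    Body string `json:"body"`
}

// roundTrip is what publish + subscribe boil down to: the sender encodes a
// struct to bytes, the receiver decodes the bytes back into a struct.
func roundTrip(enc encoder, in textMessage) (textMessage, error) {
    raw, err := enc.Encode(&in)
    if err != nil {
        return textMessage{}, err
    }
    var out textMessage
    err = enc.Decode(raw, &out)
    return out, err
}

func main() {
    out, err := roundTrip(jsonEncoder{}, textMessage{Id: 1, Body: "Hello over standard!"})
    fmt.Println(out.Id, out.Body, err) // 1 Hello over standard! <nil>
}
```

Swapping the wire format then only means swapping the encoder value, which is the convenience the encoded connection gives you.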

Here’s the protofile we’ll use:

syntax = "proto3";
package Transport;

message TextMessage {
        int32 id = 1;
        string body = 2;
}

Ok, how can we just publish simple event-structs? Totally intuitive, like that:

defer ec.Close()

for i := 0; i < 5; i++ {
    myMessage := Transport.TextMessage{Id: int32(i), Body: "Hello over standard!"}

    err := ec.Publish("Messaging.Text.Standard", &myMessage)
    if err != nil {
        fmt.Println(err)
    }
}

Requests are a little counterintuitive, as the signature differs. It looks like this:

err := ec.Request(topic, *body, *response, timeout)

So our request sending part will look like this:

for i := 5; i < 10; i++ {
    myMessage := Transport.TextMessage{Id: int32(i), Body: "Hello, please respond!"}

    res := Transport.TextMessage{}
    err := ec.Request("Messaging.Text.Respond", &myMessage, &res, 200 * time.Millisecond)
    if err != nil {
        fmt.Println(err)
    }

    fmt.Println(res.Body, " with id ", res.Id)

}

The last thing we can do is send them via channels, which is arguably the simplest:

sendChannel := make(chan *Transport.TextMessage)
ec.BindSendChan("Messaging.Text.Channel", sendChannel)
for i := 10; i < 15; i++ {
    myMessage := Transport.TextMessage{Id: int32(i), Body: "Hello over channel!"}

    sendChannel <- &myMessage
}

Now we can write the receiving end. First the same structure as before:

package main

import (
    "github.com/nats-io/nats"
    natsp "github.com/nats-io/nats/encoders/protobuf"
    "os"
    "fmt"
    "github.com/cube2222/Blog/NATS/EventSubs"
)

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    nc, err := nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    ec, err := nats.NewEncodedConn(nc, natsp.PROTOBUF_ENCODER)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer ec.Close()
}

Ok, first the standard receive which is totally natural:

defer ec.Close()

ec.Subscribe("Messaging.Text.Standard", func(m *Transport.TextMessage) {
    fmt.Println("Got standard message: \"", m.Body, "\" with the Id ", m.Id, ".")
})

Now the responding, which again has slightly different syntax, as the handler function is:

func (subject, reply string, m *Transport.TextMessage)

So the responding looks like this:

ec.Subscribe("Messaging.Text.Respond", func(subject, reply string, m *Transport.TextMessage) {
    fmt.Println("Got ask for response message: \"", m.Body, "\" with the Id ", m.Id, ".")

    newMessage := Transport.TextMessage{Id: m.Id, Body: "Responding!"}
    ec.Publish(reply, &newMessage)
})

And finally using channels, which hardly differs from the sending side:

receiveChannel := make(chan *Transport.TextMessage)
ec.BindRecvChan("Messaging.Text.Channel", receiveChannel)

for m := range receiveChannel {
    fmt.Println("Got channel'ed message: \"", m.Body, "\" with the Id ", m.Id, ".")
}

Ok, that’s all in the topic of NATS. I hope you liked it and discovered something new! Please comment if you have any opinions, or don’t like something, or just want me to write about something.

Now go and build something great!

Web app using Microservices in Go: Part 4 – Worker and Frontend

Introduction

In this part we will finally finish writing our application. We will implement the last two services:
1. The Worker
2. The Frontend

The Worker

The worker will communicate with the Master to get new Tasks. When it gets a Task it will get the corresponding data from the storage and will start working on the task. When it finishes it will send the finished data to the storage service, and if that succeeds it will register the Task as finished to the Master.

That means that you can easily scale the workforce, by turning on additional machines, with the worker service on them. Easy scaling is good!

Implementation

As usual we will start with a basic structure which is similar to the structure of our previous services. Although there is one big difference. There won’t be any API here as the worker will be a client. You could, if you wanted, add an API for debugging purposes. Things like getting the processor usage. But this can also be implemented using 3rd party health checking services.

So here’s our basic structure:

package main

import (
    "os"
    "fmt"
    "net/http"
    "io/ioutil"
    "encoding/json"
    "time"
    "strconv"
    "image"
    "image/png"
    "image/color"
    "bytes"
    "sync"
)

type Task struct {
    Id int `json:"id"`
    State int `json:"state"`
}

var masterLocation string
var storageLocation string
var keyValueStoreAddress string

func main() {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return
    }
    keyValueStoreAddress = os.Args[1]

    response, err := http.Get("http://" + keyValueStoreAddress + "/get?key=masterAddress")
    if err != nil {
        // response is nil when err is set, so check err first
        fmt.Println(err)
        return
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get master address. Status:", response.Status)
        return
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    masterLocation = string(data)
    if len(masterLocation) == 0 {
        fmt.Println("Error: can't get master address. Length is zero.")
        return
    }

    response, err = http.Get("http://" + keyValueStoreAddress + "/get?key=storageAddress")
    if err != nil {
        fmt.Println(err)
        return
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get storage address. Status:", response.Status)
        return
    }
    data, err = ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    storageLocation = string(data)
    if len(storageLocation) == 0 {
        fmt.Println("Error: can't get storage address. Length is zero.")
        return
    }
}

func getNewTask(masterAddress string) (Task, error) {
}
func getImageFromStorage(storageAddress string, myTask Task) (image.Image, error) {
}
func doWorkOnImage(myImage image.Image) image.Image {
}
func sendImageToStorage(storageAddress string, myTask Task, myImage image.Image) error {
}
func registerFinishedTask(masterAddress string, myTask Task) error {
}

It may seem like a lot, but there is actually nothing new in the densest parts. We define the Task structure first and declare the needed addresses/locations.

Later we check if there are enough arguments passed to the application, and, as we did in the previous parts, we get the addresses of the Master and the Storage. That’s basically all.

For the worker we will want a command line parameter to set the number of concurrent threads. That’s why we check if there are 3 Arguments.

Now let’s parse the thread count:

storageLocation = string(data)
if len(storageLocation) == 0 {
    fmt.Println("Error: can't get storage address. Length is zero.")
    return
}

threadCount, err := strconv.Atoi(os.Args[2])
if err != nil {
    fmt.Println("Error: Couldn't parse thread count.")
    return
}

We use the strconv.Atoi function to parse the string argument to an int.

And now we come to the main for loop which runs on each thread:

myWG := sync.WaitGroup{}
myWG.Add(threadCount)
for i := 0; i < threadCount; i++ {
    go func() {
        for {
            //Do work...
        }
    }()
}
myWG.Wait()

Ok, what are we doing there? We create a waitGroup. The main function has to be waiting for the goroutines and not just finish execution, that’s why we create a waitGroup and add the thread count. You could add a functionality to break the endless for loop and after that use the Done() function on the waitgroup. We won’t be adding this as we just want endless for work loops.
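
If you do want a way out of the endless loop, one possible approach is a stop channel. The sketch below is stdlib-only; runWorkers and its parameters are names invented for this example and the work function is a placeholder.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// runWorkers starts threadCount goroutines that call work in a loop until
// stop is closed. Each goroutine calls Done() on its way out, so Wait()
// returns once every worker has left its loop.
func runWorkers(threadCount int, stop <-chan struct{}, work func()) {
    var wg sync.WaitGroup
    wg.Add(threadCount)
    for i := 0; i < threadCount; i++ {
        go func() {
            defer wg.Done()
            for {
                select {
                case <-stop:
                    return
                default:
                    work()
                }
            }
        }()
    }
    wg.Wait()
}

func main() {
    stop := make(chan struct{})
    var done int64
    var once sync.Once

    runWorkers(4, stop, func() {
        // Hypothetical unit of work; shut down after roughly 100 units.
        if atomic.AddInt64(&done, 1) >= 100 {
            once.Do(func() { close(stop) })
        }
    })
    fmt.Println("all workers stopped, at least 100 units done:", atomic.LoadInt64(&done) >= 100)
}
```

Closing the channel is seen by all workers at once, and sync.Once guards against closing it twice.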

Now we will write down the execution process for each Task.
First we get a new Task:

for {
    myTask, err := getNewTask(masterLocation)
    if err != nil {
        fmt.Println(err)
        fmt.Println("Waiting 2 second timeout...")
        time.Sleep(time.Second * 2)
        continue
    }

If we error, then we wait 2 seconds, so it doesn’t make a lot of errored requests to the master at once. As this could flood the other services.

If we successfully get the Task, then we get the image to work on from the storage:

myTask, err := getNewTask(masterLocation)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

myImage, err := getImageFromStorage(storageLocation, myTask)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

Ok, now it’s time to do something with the image!!!

myImage, err := getImageFromStorage(storageLocation, myTask)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

myImage = doWorkOnImage(myImage)

We now of course have to save the image back to the storage:

myImage = doWorkOnImage(myImage)

err = sendImageToStorage(storageLocation, myTask, myImage)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

And finally if that succeeds we register our successfully finished Task!

        err = sendImageToStorage(storageLocation, myTask, myImage)
        if err != nil {
            fmt.Println(err)
            fmt.Println("Waiting 2 second timeout...")
            time.Sleep(time.Second * 2)
            continue
        }

        err = registerFinishedTask(masterLocation, myTask)
        if err != nil {
            fmt.Println(err)
            fmt.Println("Waiting 2 second timeout...")
            time.Sleep(time.Second * 2)
            continue
        }
    }
}()

Ok, so now we can continue with implementing these functions. Let’s first implement the getNewTask function:

func getNewTask(masterAddress string) (Task, error) {
    response, err := http.Post("http://" + masterAddress + "/getNewTask", "text/plain", nil)
    if err != nil {
        return Task{-1, -1}, err
    }
    defer response.Body.Close()
    if response.StatusCode != http.StatusOK {
        // Return a real error, otherwise the caller would treat Task{-1, -1} as valid.
        return Task{-1, -1}, fmt.Errorf("getNewTask: unexpected status %s", response.Status)
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        return Task{-1, -1}, err
    }

    myTask := Task{}
    err = json.Unmarshal(data, &myTask)
    if err != nil {
        return Task{-1, -1}, err
    }

    return myTask, nil
}

We make the request to the master and check if it was successful. We read the response body to memory and finally Unmarshal the response body to our Task structure. Finally we return it.

Now we can implement the function to get the image from storage!

func getImageFromStorage(storageAddress string, myTask Task) (image.Image, error) {
    response, err := http.Get("http://" + storageAddress + "/getImage?state=working&id=" + strconv.Itoa(myTask.Id))
    if err != nil {
        return nil, err
    }
    defer response.Body.Close()
    if response.StatusCode != http.StatusOK {
        // Without this the caller would get a nil image together with a nil error.
        return nil, fmt.Errorf("getImage: unexpected status %s", response.Status)
    }

    myImage, err := png.Decode(response.Body)
    if err != nil {
        return nil, err
    }

    return myImage, nil
}

We get the response whose body is the raw image, so we just Decode it and return it if we succeed.

Now that we have our image we can finally do some work on it:

func doWorkOnImage(myImage image.Image) image.Image {
    bounds := myImage.Bounds()
    myCanvas := image.NewRGBA(bounds)

    // Iterate over the image's own bounds; they do not have to start at (0, 0).
    for i := bounds.Min.X; i < bounds.Max.X; i++ {
        for j := bounds.Min.Y; j < bounds.Max.Y; j++ {
            // RGBA() returns 16-bit channel values, so shift them down to 8 bits.
            r, g, b, _ := myImage.At(i, j).RGBA()
            myCanvas.Set(i, j, color.RGBA{
                R: uint8(g >> 8), // red and green are swapped on purpose
                G: uint8(r >> 8),
                B: uint8(b >> 8),
                A: 255,
            })
        }
    }

    return myCanvas.SubImage(bounds)
}

The work itself is largely irrelevant, but I’ll explain it anyway. First we create an image.RGBA, which acts as a canvas for drawing, with the same bounds as our image. Then we draw on the canvas, swapping the red and green channels of every pixel. Finally we return a new, modified image, created from our canvas with the size of our original image.

After working on the image we have to send it back to the storage system. So let’s implement the sendImageToStorage function:

func sendImageToStorage(storageAddress string, myTask Task, myImage image.Image) error {
    data := []byte{}
    buffer := bytes.NewBuffer(data)
    err := png.Encode(buffer, myImage)
    if err != nil {
        return err
    }
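    response, err := http.Post("http://" + storageAddress + "/sendImage?state=finished&id=" + strconv.Itoa(myTask.Id), "image/png", buffer)
    if err != nil {
        return err
    }
    defer response.Body.Close()
    // A failed upload must not be reported as success, so turn a bad status into an error.
    if response.StatusCode != http.StatusOK {
        return fmt.Errorf("sendImageToStorage: unexpected status %s", response.Status)
    }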

    return nil
}

We create a byte slice and wrap it in a bytes.Buffer, which implements both io.Reader and io.Writer. We encode our image as PNG into that buffer, and finally send it to the server using a POST request. If everything works out, we just return.

Once we have successfully saved the image, we can tell the master that we finished the Task.

func registerFinishedTask(masterAddress string, myTask Task) error {
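    response, err := http.Post("http://" + masterAddress + "/registerTaskFinished?id=" + strconv.Itoa(myTask.Id), "text/plain", nil)
    if err != nil {
        return err
    }
    defer response.Body.Close()
    if response.StatusCode != http.StatusOK {
        return fmt.Errorf("registerFinishedTask: unexpected status %s", response.Status)
    }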

    return nil
}

Nothing fancy. Just sending a POST request to notify about finishing the task.

Ok, that’s all regarding the worker. You can start it and it will wait for Tasks to process. That means we finally need to get Tasks from the user into our service. And that leads us to…

The Frontend

This one will show the user the website and also parse the user form so the backend services only get the raw image data. It will be the only interface to our application for the user.

As usual, we’ll begin with the basic structure of the file:

package main

import (
    "net/http"
    "fmt"
    "io/ioutil"
    "os"
    "net/url"
    "io"
)

const indexPage = "<html><head><title>Upload file</title></head><body><form enctype=\"multipart/form-data\" action=\"submitTask\" method=\"post\"> <input type=\"file\" name=\"uploadfile\" /> <input type=\"submit\" value=\"upload\" /> </form> </body> </html>"

var keyValueStoreAddress string
var masterLocation string

func main() {
    if len(os.Args) < 2 {
        fmt.Println("Error: Too few arguments.")
        return
    }
    keyValueStoreAddress = os.Args[1]

    response, err := http.Get("http://" + keyValueStoreAddress + "/get?key=masterAddress")
    if err != nil {
        // On a transport error the response is nil, so check err before touching it.
        fmt.Println(err)
        return
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get master address.")
        fmt.Println(response.Status)
        return
    }
    data, err := ioutil.ReadAll(response.Body)
    response.Body.Close()
    if err != nil {
        fmt.Println(err)
        return
    }
    masterLocation = string(data)
    if len(masterLocation) == 0 {
        fmt.Println("Error: can't get master address. Length is zero.")
        return
    }

    http.HandleFunc("/", handleIndex)
    http.HandleFunc("/submitTask", handleTask)
    http.HandleFunc("/isReady", handleCheckForReadiness)
    http.HandleFunc("/getImage", serveImage)
    http.ListenAndServe(":80", nil)
}

func handleIndex(w http.ResponseWriter, r *http.Request) {
}

func handleTask(w http.ResponseWriter, r *http.Request) {
}

func handleCheckForReadiness(w http.ResponseWriter, r *http.Request) {
}

func serveImage(w http.ResponseWriter, r *http.Request) {
}

We’ve got the code of our index web page here, and we also have the API declared. After starting the program we check if we have the k/v store address. If we do (we hope so), then we can get the master address from it.
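
Since several services look up addresses in the k/v store in the same way, the lookup could be pulled into one helper. The following is only a sketch: fetchValue and lookupDemo are hypothetical names, the stand-in store is an httptest server that knows a single key, and the address localhost:3003 is an example value. Note that http.Get returns a nil response when it fails, so the error has to be checked before the status code.

```go
package main

import (
    "fmt"
    "io"
    "net/http"
    "net/http/httptest"
)

// fetchValue (hypothetical helper) GETs url and returns the body as a string.
func fetchValue(client *http.Client, url string) (string, error) {
    response, err := client.Get(url)
    if err != nil {
        // On a transport error the response is nil and must not be touched.
        return "", err
    }
    defer response.Body.Close()

    if response.StatusCode != http.StatusOK {
        return "", fmt.Errorf("unexpected status %s", response.Status)
    }
    data, err := io.ReadAll(response.Body)
    if err != nil {
        return "", err
    }
    if len(data) == 0 {
        return "", fmt.Errorf("empty value")
    }
    return string(data), nil
}

// lookupDemo starts a stand-in k/v store that knows only the key
// "masterAddress" and queries it for the given key.
func lookupDemo(key string) (string, error) {
    store := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if r.URL.Query().Get("key") != "masterAddress" {
            w.WriteHeader(http.StatusNotFound)
            return
        }
        fmt.Fprint(w, "localhost:3003")
    }))
    defer store.Close()

    return fetchValue(store.Client(), store.URL+"/get?key="+key)
}

func main() {
    fmt.Println(lookupDemo("masterAddress"))
    fmt.Println(lookupDemo("unknownKey"))
}
```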

Now we can go on to implementing the functions. We’ll start with the simplest one, the index handler:

func handleIndex(w http.ResponseWriter, r *http.Request) {
    fmt.Fprint(w, indexPage)
}

After writing this we can go on to writing the more complicated functions. We will start with the handleTask function. This function is responsible for parsing the user form and sending the raw image data to the master.

func handleTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        err := r.ParseMultipartForm(10000000)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }
        file, _, err := r.FormFile("uploadfile")
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

Ok, what do we have here? We check the method as we always do, and later parse the multipart form. We’ve got a nice lovely magic number there. The number is responsible for setting the max size of the form held in RAM. The rest will be stored in temporary files. We later do the request using the file we got:

        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }
        defer file.Close()

        response, err := http.Post("http://" + masterLocation + "/new", "image", file)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        defer response.Body.Close()
        // err is nil when the master merely answers with a bad status, so report the status instead.
        if response.StatusCode != http.StatusOK {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: master responded with ", response.Status)
            return
        }

        data, err := ioutil.ReadAll(response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        fmt.Fprint(w, string(data))
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

We send the user back the new Task id we got back from the master.

Now we can implement the function which will handle the request to check if the Task is finished and ready:

func handleCheckForReadiness(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Get("http://" + masterLocation + "/isReady?id=" + values.Get("id") + "&state=finished")
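        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        defer response.Body.Close()
        if response.StatusCode != http.StatusOK {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: master responded with ", response.Status)
            return
        }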

        data, err := ioutil.ReadAll(response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        switch string(data) {
        case "0":
            fmt.Fprint(w, "Your image is not ready yet.")
        case "1":
            fmt.Fprint(w, "Your image is ready.")
        default :
            fmt.Fprint(w, "Internal server error.")
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Most of this is the same as the function in the master. We just check if the id is correct and make a request to the master. The interesting part is this:

switch string(data) {
case "0":
    fmt.Fprint(w, "Your image is not ready yet.")
case "1":
    fmt.Fprint(w, "Your image is ready.")
default :
    fmt.Fprint(w, "Internal server error.")
}

We cast the response body to a string, and if it matches 1 or 0 we answer. If it’s something else then we know something went really wrong, and send back an error.
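
Pulling the switch out into a small pure function makes it easy to test without a running master. The sketch below does that. The name readinessMessage is hypothetical, and trimming surrounding whitespace is an extra assumption on my side; the handler above compares the body exactly.

```go
package main

import (
    "fmt"
    "strings"
)

// readinessMessage (hypothetical helper) maps the master's "0"/"1" reply to
// the text shown to the user. Surrounding whitespace is ignored.
func readinessMessage(body string) string {
    switch strings.TrimSpace(body) {
    case "0":
        return "Your image is not ready yet."
    case "1":
        return "Your image is ready."
    default:
        return "Internal server error."
    }
}

func main() {
    for _, body := range []string{"0", "1\n", "oops"} {
        fmt.Printf("%q -> %s\n", body, readinessMessage(body))
    }
}
```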

Now we can get to the last function, the function to serve the actual images:

func serveImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Get("http://" + masterLocation + "/get?id=" + values.Get("id") + "&state=finished")
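        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        defer response.Body.Close()
        if response.StatusCode != http.StatusOK {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: master responded with ", response.Status)
            return
        }

        // Once io.Copy has started writing the body, the status code can no
        // longer be changed, so a failure here is only logged.
        _, err = io.Copy(w, response.Body)
        if err != nil {
            fmt.Println("Error:", err)
            return
        }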
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

It just checks the id, sends the request, and copies the response to answer the user.

Conclusion

So now it’s all finished. You can start all the applications and they will work. You can send requests to the frontend, the services will all start working, and you will get your modified images back. Six microservices working beautifully together.

This may be the end of this series, so have fun extending this system on your own.

I may add a final part about deployment to container infrastructures, or another series about refactoring the system with 3rd party libraries, in the future, but I’m not sure.

All in all, good luck!

UPDATE: Also remember that when running the worker, it’s good to launch the work loop in several goroutines, about 2-4x the number of your system threads. Goroutines have near-zero switching overhead, and this way you’re not unnecessarily blocked while waiting for HTTP responses.

Web app using Microservices in Go: Part 3 – Storage and Master

Introduction

In this part we will implement the next part of the microservices needed for our web app. We will implement the:
* Storage system
* Master

This way we will have the Master API ready when we write the slaves/workers and the frontend. And we’ll already have the database, k/v store and storage when writing the master. So every time we write something, we’ll already have all its dependencies.

The storage system

Ok, this one will be pretty easy to write. Just handling files. Let’s build the basic structure, which will include a function to register in our k/v store. For a reminder of how that works, check out the previous part. So here’s the basic structure:

package main

import (
    "fmt"
    "net/http"
    "io/ioutil"
    "os"
    "net/url"
    "io"
    "strconv" // needed for the id validation in the handlers below
)

func main() {
    if !registerInKVStore() {
        return
    }
    http.HandleFunc("/sendImage", receiveImage)
    http.HandleFunc("/getImage", serveImage)
    http.ListenAndServe(":3002", nil)
}

func receiveImage(w http.ResponseWriter, r *http.Request) {
}

func serveImage(w http.ResponseWriter, r *http.Request) {
}

func registerInKVStore() bool {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return false
    }
    storageAddress := os.Args[1] // The address of itself
    keyValueStoreAddress := os.Args[2]

    response, err := http.Post("http://" + keyValueStoreAddress + "/set?key=storageAddress&value=" + storageAddress, "", nil)
    if err != nil {
        fmt.Println(err)
        return false
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return false
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: Failure when contacting key-value store: ", string(data))
        return false
    }
    return true
}

So now we’ll have to handle the file serving/uploading. We will use a state url argument to specify if we are using the not yet finished (aka working) directory, or the finished one.

So first let’s write the receiveImage function which is there to get the files from clients:

func receiveImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }
        if values.Get("state") != "working" && values.Get("state") != "finished" {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input state.")
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

Here we check if the request method is POST, if there is an id, and if the state is working or finished.

Next we can create the file and put in the image:

        if values.Get("state") != "working" && values.Get("state") != "finished" {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input state.")
            return
        }

        _, err = strconv.Atoi(values.Get("id"))
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }

        file, err := os.Create("/tmp/" + values.Get("state") + "/" + values.Get("id") + ".png")
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        // Defer Close only after we know the file was created successfully.
        defer file.Close()

        _, err = io.Copy(file, r.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        fmt.Fprint(w, "success")

We create a file named after the id in the /tmp/working or /tmp/finished directory, depending on the state. Both directories have to exist already, as os.Create does not create them. Another thing we do is check that the id really is a valid int: we parse it to an int to see if that succeeds, and if it does, we keep using it as a string.

We use the io.Copy function to put all the data from the request into the file. That means that the body of our request should be a raw image.

Next we can write the function to serve images which is pretty similar:

func serveImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }
        if values.Get("state") != "working" && values.Get("state") != "finished" {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input state.")
            return
        }

        _, err = strconv.Atoi(values.Get("id"))
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }

        file, err := os.Open("/tmp/" + values.Get("state") + "/" + values.Get("id") + ".png")
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        // Defer Close only after we know the file was opened successfully.
        defer file.Close()

        _, err = io.Copy(w, file)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Instead of creating the file, we open it. Instead of copying to the file we copy from it. And we check if the method is GET.

That’s it. We’ve got a storage service which saves and serves raw image files. Now we can get to the master!

The master

We now have all the dependencies the master needs. So let’s write it now. Here’s the basic structure:

package main

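import (
    "os"
    "fmt"
    "net/http"
    "io/ioutil"
    "net/url"       // used by the handlers to parse query parameters
    "io"            // used to copy responses back to the client
    "encoding/json" // used to parse Tasks returned by the database
)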

type Task struct {
    Id int `json:"id"`
    State int `json:"state"`
}

var databaseLocation string
var storageLocation string

func main() {
    if !registerInKVStore() {
        return
    }

    http.HandleFunc("/new", newImage)
    http.HandleFunc("/get", getImage)
    http.HandleFunc("/isReady", isReady)
    http.HandleFunc("/getNewTask", getNewTask)
    http.HandleFunc("/registerTaskFinished", registerTaskFinished)
    http.ListenAndServe(":3003", nil)
}

func newImage(w http.ResponseWriter, r *http.Request) {
}

func getImage(w http.ResponseWriter, r *http.Request) {
}

func isReady(w http.ResponseWriter, r *http.Request) {
}

func getNewTask(w http.ResponseWriter, r *http.Request) {
}

func registerTaskFinished(w http.ResponseWriter, r *http.Request) {
}

func registerInKVStore() bool {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return false
    }
    masterAddress := os.Args[1] // The address of itself
    keyValueStoreAddress := os.Args[2]

    response, err := http.Post("http://" + keyValueStoreAddress + "/set?key=masterAddress&value=" + masterAddress, "", nil)
    if err != nil {
        fmt.Println(err)
        return false
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return false
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: Failure when contacting key-value store: ", string(data))
        return false
    }
    return true
}

It’s the structure of the API and the mechanics to register in the k/v store.

We also need to get the storage and database locations in the main function:

    if !registerInKVStore() {
        return
    }
    keyValueStoreAddress := os.Args[2]

    response, err := http.Get("http://" + keyValueStoreAddress + "/get?key=databaseAddress")
    if err != nil {
        // On a transport error the response is nil, so check err before touching it.
        fmt.Println(err)
        return
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get database address.")
        fmt.Println(response.Status)
        return
    }
    data, err := ioutil.ReadAll(response.Body)
    response.Body.Close()
    if err != nil {
        fmt.Println(err)
        return
    }
    databaseLocation = string(data)

    response, err = http.Get("http://" + keyValueStoreAddress + "/get?key=storageAddress")
    if err != nil {
        fmt.Println(err)
        return
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get storage address.")
        fmt.Println(response.Status)
        return
    }
    data, err = ioutil.ReadAll(response.Body)
    response.Body.Close()
    if err != nil {
        fmt.Println(err)
        return
    }
    storageLocation = string(data)

Now we can start implementing all the functionality!

Let’s start with the newImage function, as it contains a good bit of code and mechanics which will be used again in the other functions.
Here’s the beginning:

func newImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        response, err := http.Post("http://" + databaseLocation + "/newTask", "text/plain", nil)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        id, err := ioutil.ReadAll(response.Body)
        if err != nil {
            fmt.Println(err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

As usual, we check if the method is right. Next we register a new Task in the database and get an Id.

We now use this to send the image to the storage:

id, err := ioutil.ReadAll(response.Body)
if err != nil {
    fmt.Println(err)
    return
}

_, err = http.Post("http://" + storageLocation + "/sendImage?id=" + string(id) + "&state=working", "image", r.Body)
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}
fmt.Fprint(w, string(id))

That’s it. The new task will be created, the storage will get a file into the working directory with the name of the file being the id, and the client gets back the id. The important thing here is that we need the raw image in the request. The user form has to be parsed in the frontend service.

Now we can create the function which just checks if a Task is ready:

func isReady(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

We first have to verify all the parameters and the request method. Next we can ask the database for the Task requested:

if len(values.Get("id")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Wrong input")
    return
}

response, err := http.Get("http://" + databaseLocation + "/getById?id=" + values.Get("id"))
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}
data, err := ioutil.ReadAll(response.Body)
if err != nil {
    fmt.Println(err)
    return
}

We also read the response immediately. Now we can parse the Task and respond to the client:

if err != nil {
    fmt.Println(err)
    return
}

myTask := Task{}
err = json.Unmarshal(data, &myTask)
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}

if myTask.State == 2 {
    fmt.Fprint(w, "1")
} else {
    fmt.Fprint(w, "0")
}
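
The parse-and-compare step can also live in a small function of its own, which gives the state value a name. This is only a sketch: readinessReply and stateFinished are hypothetical names, and the only state value taken from the code above is 2 for a finished Task.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// Task mirrors the structure used by the master.
type Task struct {
    Id    int `json:"id"`
    State int `json:"state"`
}

// stateFinished is the value the master compares against.
const stateFinished = 2

// readinessReply (hypothetical helper) turns the database's JSON answer
// into the "0"/"1" reply that is sent to the frontend.
func readinessReply(data []byte) (string, error) {
    var task Task
    if err := json.Unmarshal(data, &task); err != nil {
        return "", err
    }
    if task.State == stateFinished {
        return "1", nil
    }
    return "0", nil
}

func main() {
    fmt.Println(readinessReply([]byte(`{"id": 3, "state": 2}`)))
    fmt.Println(readinessReply([]byte(`{"id": 4, "state": 1}`)))
    fmt.Println(readinessReply([]byte(`not json`)))
}
```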

So now we can implement the last client facing interface, the getImage function:

func getImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Here we verified the request and now we need to get the image from the storage system, and just copy the response to our client:

if len(values.Get("id")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Wrong input")
    return
}

response, err := http.Get("http://" + storageLocation + "/getImage?id=" + values.Get("id") + "&state=finished")
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}

_, err = io.Copy(w, response.Body)
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}

That’s it! The client facing interface is finished!

Implementing the worker facing interface

Now we have to implement the functions to serve the workers.

Both functions will basically be just direct routes to the database and back, so now let’s write ’em too:

func getNewTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        response, err := http.Post("http://" + databaseLocation + "/getNewTask", "text/plain", nil)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(w, response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

func registerTaskFinished(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Post("http://" + databaseLocation + "/finishTask?id=" + values.Get("id"), "text/plain", nil)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(w, response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

There’s not much to explain. They both just forward the request and respond with whatever they get back.

You might think the workers should communicate directly with the database to get new Tasks. With the current implementation that would work perfectly. However, if we later wanted the master to do some extra work for each of those requests, it would be hard to add. Routing the requests through the master keeps the system extensible, and that’s nearly always what we want.

Conclusion

Now we have finished the Master and the Storage system. We now have the dependencies to create the workers and the frontend, which we will implement in the next part. As always, I encourage you to share your opinion in the comments. Have fun extending the system to do what you want to achieve!
