Practical Golang: Getting started with NATS and related patterns

Practical Golang: Getting started with NATS and related patterns

Introduction

Microservices… the never disappearing buzzword of our times. They promise a lot, but can be slow or complicated if not implemented correctly. One of the main challenges when developing and using a microservice-based architecture is getting the communication right. Many will ask, why not REST? As I did at some point. Many will actually use it. But the truth is that it leads to tighter coupling, and is synchronous. Microservice architectures are meant to be asynchronous. Also, REST is blocking, which also isn’t good on many occasions.

What are we meant to use for communication? Usually we use:
– RPC – Remote Procedure Call
– Message BUS/Broker

In this article I’ll write about one specific Message BUS called NATS and using it in Go.

There are also other message BUS’ses/Brokers. Some popular ones are Kafka and RabbitMQ.

Why NATS? It’s simple, and astonishingly fast.

Setting up NATS

To use NATS you can do one of the following things:
1. Use the NATS Docker image
2. Get the binaries
3. Use the public NATS server nats://demo.nats.io:4222
4. Build from source

Also, remember to

go get https://github.com/nats-io/nats

the official Go library.

Getting started

In this article we’ll be using protobuffs a lot. So if you want to know more about them, check out my previous article about protobuffs.

First, let’s write one of the key usages of microservices. A fronted, that lists information from other micrservices, but doesn’t care if one of them is down. It will respond to the user anyways. This makes microservices swappable live, one at a time.

In each of our services we’ll need to connect to NATS:

package main

import (
    "github.com/nats-io/nats"
    "fmt"
)

var nc *nats.Conn

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }
}

Now, let’s write the first provider service. It will receive a User Id, and answer with a user name For which we’ll need a transport structure to send its data over NATS. I wrote this short proto file for that:

syntax = "proto3";
package Transport;

message User {
        string id = 1;
        string name = 2;
}

Now we will create the map containing our user names:

var users map[string]string
var nc *nats.Conn

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }

    users = make(map[string]string)
    users["1"] = "Bob"
    users["2"] = "John"
    users["3"] = "Dan"
    users["4"] = "Kate"
}

and finally the part that’s most interesting to us. Subscribing to the topic:

users["4"] = "Kate"

nc.QueueSubscribe("UserNameById", "userNameByIdProviders", replyWithUserId)

Notice that it’s a QueueSubscribe. Which means that if we start 10 instances of this service in the userNameByIdProviders group , only one will get each message sent over UserNameById. Another thing to note is that this function call is asynchronous, so we need to block somehow. This select {} will provide an endless block:

nc.QueueSubscribe("UserNameById", "userNameByIdProviders", replyWithUserId)
select {}
}

Ok, now to the replyWithUserId function:

func replyWithUserId(m *nats.Msg) {
}

Notice that it takes one argument, a pointer to the message.

We’ll unmarshal the data:

func replyWithUserId(m *nats.Msg) {

    myUser := Transport.User{}
    err := proto.Unmarshal(m.Data, &myUser)
    if err != nil {
        fmt.Println(err)
        return
}

get the name and marshal back:

myUser.Name = users[myUser.Id]
data, err := proto.Marshal(&myUser)
if err != nil {
    fmt.Println(err)
    return
}

And, as this shall be a request we’re handling, we respond to the Reply topic, a topic created by the caller exactly for this purpose:

if err != nil {
    fmt.Println(err)
    return
}
fmt.Println("Replying to ", m.Reply)
nc.Publish(m.Reply, data)

}

Ok, now let’s get to the second service. Our time provider service, first the same basic structure:

package main

import (
    "github.com/nats-io/nats"
    "fmt"
    "github.com/cube2222/Blog/NATS/FrontendBackend"
    "github.com/golang/protobuf/proto"
    "os"
    "sync"
    "time"
)

// We use globals because it's a small application demonstrating NATS.

var nc *nats.Conn

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }

    nc.QueueSubscribe("TimeTeller", "TimeTellers", replyWithTime)
    select {} // Block forever
}

This time we’re not getting any data from the caller, so we just marshal our time into this proto structure:

syntax = "proto3";
package Transport;

message Time {
        string time = 1;
}

and send it back:

func replyWithTime(m *nats.Msg) {
    curTime := Transport.Time{time.Now().Format(time.RFC3339)}

    data, err := proto.Marshal(&curTime)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("Replying to ", m.Reply)
    nc.Publish(m.Reply, data)

}

We can now get to our frontend, which will use both those services. First the standard basic structure:

package main

import (
    "net/http"
    "github.com/gorilla/mux"
    "github.com/cube2222/Blog/NATS/FrontendBackend"
    "github.com/golang/protobuf/proto"
    "fmt"
    "github.com/nats-io/nats"
    "time"
    "os"
    "sync"
)

var nc *nats.Conn

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }

    m := mux.NewRouter()
    m.HandleFunc("/{id}", handleUserWithTime)

    http.ListenAndServe(":3000", m)
}

That’s a pretty standard web server, now to the interesting bits, the handleUserWithTime function, which will respond with the user name and time:

func handleUserWithTime(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    myUser := Transport.User{Id: vars["id"]}
    curTime := Transport.Time{}
    wg := sync.WaitGroup{}
    wg.Add(2)
}

We’ve parsed the request arguments and started a WaitGroup with the value two, as we will do one asynchronous request for each of our services. First we’ll marshal the user struct:

go func() {
    data, err := proto.Marshal(&myUser)
    if err != nil || len(myUser.Id) == 0 {
        fmt.Println(err)
        w.WriteHeader(500)
        fmt.Println("Problem with parsing the user Id.")
        return
    }

and, then we make a request. Sending the user data, and waiting at most 100 ms for the response:

fmt.Println("Problem with parsing the user Id.")
return
}

msg, err := nc.Request("UserNameById", data, 100 * time.Millisecond)

now we can check if any error happend, or the response is empty and finish this thread:

msg, err := nc.Request("UserNameById", data, 100 * time.Millisecond)
if err == nil && msg != nil {
    myUserWithName := Transport.User{}
    err := proto.Unmarshal(msg.Data, &myUserWithName)
    if err == nil {
        myUser = myUserWithName
    }
}
wg.Done()
}()

Next we’ll do the request to the Time Tellers.
We again make a request, but its body is nil, as we don’t need to pass any data:

go func() {
    msg, err := nc.Request("TimeTeller", nil, 100*time.Millisecond)
    if err == nil && msg != nil {
        receivedTime := Transport.Time{}
        err := proto.Unmarshal(msg.Data, &receivedTime)
        if err == nil {
            curTime = receivedTime
        }
    }
    wg.Done()
}()

After both requests finished (or failed) we can just respond to the user:

wg.Wait()

fmt.Fprintln(w, "Hello ", myUser.Name, " with id ", myUser.Id, ", the time is ", curTime.Time, ".")
}

Now if you actually test it, you’ll notice that if one of the provider services isn’t active, the frontend will respond anyways, putting a zero’ed value in place of the non-available resource. You could also make a template that shows an error in that place.

Ok, that was already an interesting architecture. Now we can implement…

The Master-Slave pattern

This is such a popular pattern, especially in Go, that we really should know how to implement it. The workers will do simple operations on a text file (count the usage amounts of each word in a comma-separated list).

Now you could think that the Master, should send the files to the Workers over NATS. Wrong. This would lead to a huge slowdown of NATS (at least for bigger files). That’s why the Master will send the files to a file server over a REST API, and the Workers will get it from there. We’ll also learn how to do service discovery over NATS.

First, the File Server. I won’t really go through the file handling part, as it’s a simple get/post API.I will however, go over the service discovery part.

package main

import (
    "net/http"
    "github.com/gorilla/mux"
    "os"
    "io"
    "fmt"
    "github.com/nats-io/nats"
    "github.com/cube2222/Blog/NATS/MasterWorker"
    "github.com/golang/protobuf/proto"
)

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    m := mux.NewRouter()

    m.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        file, err := os.Open("/tmp/" + vars["name"])
        defer file.Close()
        if err != nil {
            w.WriteHeader(404)
        }
        if file != nil {
            _, err := io.Copy(w, file)
            if err != nil {
                w.WriteHeader(500)
            }
        }
    }).Methods("GET")

    m.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        file, err := os.Create("/tmp/" + vars["name"])
        defer file.Close()
        if err != nil {
            w.WriteHeader(500)
        }
        if file != nil {
            _, err := io.Copy(file, r.Body)
            if err != nil {
                w.WriteHeader(500)
            }
        }
    }).Methods("POST")

    RunServiceDiscoverable()

    http.ListenAndServe(":3000", m)
}

Now, what does the RunServiceDiscoverable function do? It connects to the NATS server and responds with its own http address to incoming requests.

func RunServiceDiscoverable() {
    nc, err := nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println("Can't connect to NATS. Service is not discoverable.")
    }
    nc.Subscribe("Discovery.FileServer", func(m *nats.Msg) {
        serviceAddressTransport := Transport.DiscoverableServiceTransport{"http://localhost:3000"}
        data, err := proto.Marshal(&serviceAddressTransport)
        if err == nil {
            nc.Publish(m.Reply, data)
        }
    })
}

The proto file looks like this:

syntax = "proto3";
package Transport;

message DiscoverableServiceTransport {
        string Address = 1;
}

We can now go on with the Master.

The protofile for the Task structure is:

syntax = "proto3";
package Transport;

message Task {
        string uuid = 1;
        string finisheduuid = 2;
        int32 state = 3; // 0 - not started, 1 - in progress, 2 - finished
        int32 id = 4;
}

Our Master will hold a list of tasks with the respecting UUID (at the same time the name of the file), id (the position in the master Tasks slice), and a pointer which holds the position of the last not finished Task, which will get updated on new Task retrieval. It’s pretty similar to the Task storage in my Microservice Architecture series

I’m using github.com/satori/go.uuid for UUID generation.

First, as usual, the basic structure:

package main

import (
    "github.com/satori/go.uuid"
    "github.com/cube2222/Blog/NATS/MasterWorker"
    "os"
    "fmt"
    "github.com/nats-io/nats"
    "github.com/golang/protobuf/proto"
    "time"
    "bytes"
    "net/http"
    "sync"
)

var Tasks []Transport.Task
var TaskMutex sync.Mutex
var oldestFinishedTaskPointer int
var nc *nats.Conn


func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }

    Tasks = make([]Transport.Task, 0, 20)
    TaskMutex = sync.Mutex{}
    oldestFinishedTaskPointer = 0

    initTestTasks()

    wg := sync.WaitGroup{}

    nc.Subscribe("Work.TaskToDo", func (m *nats.Msg) {
    })

    nc.Subscribe("Work.TaskFinished", func (m *nats.Msg) {
    })

    select {} // Block forever
}

Ok, we’ve also already set up the Subscriptions

How does the initTestTasks function work? It’s interesting because it gets the file server address over NATS.

So, we want to create 20 test Tasks, so we run the loop 20 times:

func initTestTasks() {
    for i := 0; i < 20; i++ {
    }
}

We create a new Task and ask the File Server for its address:

for i := 0; i < 20; i++ {
    newTask := Transport.Task{Uuid: uuid.NewV4().String(), State: 0}
    fileServerAddressTransport := Transport.DiscoverableServiceTransport{}
    msg, err := nc.Request("Discovery.FileServer", nil, 1000 * time.Millisecond)
    if err == nil && msg != nil {
        err := proto.Unmarshal(msg.Data, &fileServerAddressTransport)
        if err != nil {
            continue
        }
    }
    if err != nil {
        continue
    }

    fileServerAddress := fileServerAddressTransport.Address
}

Next we finally make the post Request to the file server and add the Task to our Tasks list:

        fileServerAddress := fileServerAddressTransport.Address
        data := make([]byte, 0, 1024)
        buf := bytes.NewBuffer(data)
        fmt.Fprint(buf, "get,my,data,my,get,get,have")
        r, err := http.Post(fileServerAddress + "/" + newTask.Uuid, "", buf)
        if err != nil || r.StatusCode != http.StatusOK {
            continue
        }

        newTask.Id = int32(len(Tasks))
        Tasks = append(Tasks, newTask)
    }

How do we dispatch new Tasks to do? Simply like this:

nc.Subscribe("Work.TaskToDo", func (m *nats.Msg) {
    myTaskPointer, ok := getNextTask()
    if ok {
        data, err := proto.Marshal(myTaskPointer)
        if err == nil {
            nc.Publish(m.Reply, data)
        }
    }
})

How do we get the next Task? We just loop over the Task to find one that is not started. If tasks above our pointer are all finished, then we also move up the pointer. Remember the mutex as this function may be run in parallel:

func getNextTask() (*Transport.Task, bool) {
    TaskMutex.Lock()
    defer TaskMutex.Unlock()
    for i := oldestFinishedTaskPointer; i < len(Tasks); i++ {
        if i == oldestFinishedTaskPointer && Tasks[i].State == 2 {
            oldestFinishedTaskPointer++
        } else {
            if Tasks[i].State == 0 {
                Tasks[i].State = 1
                go resetTaskIfNotFinished(i)
                return &Tasks[i], true
            }
        }
    }
    return nil, false
}

We also called the resetTaskIfNotFinished function. It will reset the Task state if it’s still in progress after 2 minutes:

func resetTaskIfNotFinished(i int) {
    time.Sleep(2 * time.Minute)
    TaskMutex.Lock()
    if Tasks[i].State != 2 {
        Tasks[i].State = 0
    }
}

The TaskFinished subscription handler is much simpler, it just sets the Task to finished, and the UUID accordingly to the received protobuffer:

nc.Subscribe("Work.TaskFinished", func (m *nats.Msg) {
    myTask := Transport.Task{}
    err := proto.Unmarshal(m.Data, &myTask)
    if err == nil {
        TaskMutex.Lock()
        Tasks[myTask.Id].State = 2
        Tasks[myTask.Id].Finisheduuid = myTask.Finisheduuid
        TaskMutex.Unlock()
    }
})

That’s all in regards to the Master! We can now move on writing the Worker.

The basic structure:

package main

import (
    "os"
    "fmt"
    "github.com/nats-io/nats"
    "time"
    "github.com/cube2222/Blog/NATS/MasterWorker"
    "github.com/golang/protobuf/proto"
    "net/http"
    "bytes"
    "io/ioutil"
    "sort"
    "strings"
    "github.com/satori/go.uuid"
    "sync"
)

var nc *nats.Conn

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }

    for i := 0; i < 8; i++ {
        go doWork()
    }

    select {} // Block forever
}

Now the main function doing something here is the doWork function. I’ll post it all at once with comments everywhere, as it’s a very long function and this will be the most convenient way to read it:

func doWork() {
    for {
        // We ask for a Task with a 1 second Timeout
        msg, err := nc.Request("Work.TaskToDo", nil, 1 * time.Second)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            continue
        }

        // We unmarshal the Task
        curTask := Transport.Task{}
        err = proto.Unmarshal(msg.Data, &curTask)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            continue
        }

        // We get the FileServer address
        msg, err = nc.Request("Discovery.FileServer", nil, 1000 * time.Millisecond)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            continue
        }

        fileServerAddressTransport := Transport.DiscoverableServiceTransport{}
        err = proto.Unmarshal(msg.Data, &fileServerAddressTransport)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            continue
        }

        // We get the file
        fileServerAddress := fileServerAddressTransport.Address
        r, err := http.Get(fileServerAddress + "/" + curTask.Uuid)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            continue
        }

        data, err := ioutil.ReadAll(r.Body)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            continue
        }

        // We split and count the words
        words := strings.Split(string(data), ",")
        sort.Strings(words)
        wordCounts := make(map[string]int)
        for i := 0; i < len(words); i++{
            wordCounts[words[i]] = wordCounts[words[i]] + 1
        }

        resultData := make([]byte, 0, 1024)
        buf := bytes.NewBuffer(resultData)

        // We print the results to a buffer
        for key, value := range wordCounts {
            fmt.Fprintln(buf, key, ":", value)
        }

        // We generate a new UUID for the finished file
        curTask.Finisheduuid = uuid.NewV4().String()
        r, err = http.Post(fileServerAddress + "/" + curTask.Finisheduuid, "", buf)
        if err != nil || r.StatusCode != http.StatusOK {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err, ":", r.StatusCode)
            continue
        }

        // We marshal the current Task into a protobuffer
        data, err = proto.Marshal(&curTask)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            continue
        }

        // We notify the Master about finishing the Task
        nc.Publish("Work.TaskFinished", data)
    }
}

Awesome, our Master-Slave setup is ready, you can test it if you’d like. After you do, we can now check out the last architecture.

The Events pattern

Imagine you have servers which keep connections to clients over websockets. You want these clients to get live news updates. With this pattern you can. We’ll also learn about a few convenient NATS client abstractions. Like using a encoded connection, or using channels for sending/receiving.

The basic architecture as usual:

package main

import (
    "os"
    "fmt"
    "github.com/nats-io/nats"
    natsp "github.com/nats-io/nats/encoders/protobuf"
    "github.com/cube2222/Blog/NATS/EventSubs"
    "time"
)

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    nc, err := nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }
    ec, err := nats.NewEncodedConn(nc, natsp.PROTOBUF_ENCODER)
    defer ec.Close()
}

Wait… What’s that at the end!? It’s an encoded connection! It will automatically encode our structs into raw data. We’ll use the protobuf one, but there are a default one, a gob one and a json one too.

Here’s the protofile we’ll use:

syntax = "proto3";
package Transport;

message TextMessage {
        int32 id = 1;
        string body = 2;
}

Ok, how can we just publish simple event-structs? Totally intuitive, like that:

defer ec.Close()

for i := 0; i < 5; i++ {
    myMessage := Transport.TextMessage{Id: int32(i), Body: "Hello over standard!"}

    err := ec.Publish("Messaging.Text.Standard", &myMessage)
    if err != nil {
        fmt.Println(err)
    }
}

It’s a little bit counter intuitive with Requests. As the signature differs, it follows like this:

err := ec.Request(topic, *body, *response, timeout)

So our request sending part will look like this:

for i := 5; i < 10; i++ {
    myMessage := Transport.TextMessage{Id: int32(i), Body: "Hello, please respond!"}

    res := Transport.TextMessage{}
    err := ec.Request("Messaging.Text.Respond", &myMessage, &res, 200 * time.Millisecond)
    if err != nil {
        fmt.Println(err)
    }

    fmt.Println(res.Body, " with id ", res.Id)

}

The last thing we can do is sending them via Channels, which is relatively the simplest:

sendChannel := make(chan *Transport.TextMessage)
ec.BindSendChan("Messaging.Text.Channel", sendChannel)
for i := 10; i < 15; i++ {
    myMessage := Transport.TextMessage{Id: int32(i), Body: "Hello over channel!"}

    sendChannel <- &myMessage
}

Now we can write the receiving end. First the same structure as before:

package main

import (
    "github.com/nats-io/nats"
    natsp "github.com/nats-io/nats/encoders/protobuf"
    "os"
    "fmt"
    "github.com/cube2222/Blog/NATS/EventSubs"
)

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    nc, err := nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }
    ec, err := nats.NewEncodedConn(nc, natsp.PROTOBUF_ENCODER)
    defer ec.Close()
}

Ok, first the standard receive which is totally natural:

defer ec.Close()

ec.Subscribe("Messaging.Text.Standard", func(m *Transport.TextMessage) {
    fmt.Println("Got standard message: \"", m.Body, "\" with the Id ", m.Id, ".")
})

Now, the responding, which has a little bit changed syntax again. As the handler function is:

func (subject, reply string, m *Transport.TextMessage)

So the responding looks like this:

ec.Subscribe("Messaging.Text.Respond", func(subject, reply string, m *Transport.TextMessage) {
    fmt.Println("Got ask for response message: \"", m.Body, "\" with the Id ", m.Id, ".")

    newMessage := Transport.TextMessage{Id: m.Id, Body: "Responding!"}
    ec.Publish(reply, &newMessage)
})

And finally using channels, which doesn’t differ nearly at all in comparison to the sending side:

receiveChannel := make(chan *Transport.TextMessage)
ec.BindRecvChan("Messaging.Text.Channel", receiveChannel)

for m := range receiveChannel {
    fmt.Println("Got channel'ed message: \"", m.Body, "\" with the Id ", m.Id, ".")
}

Ok, that’s all in the topic of NATS. I hope you liked it and discovered something new! Please comment if you have any opinions, or don’t like something, or just want me to write about something.

Now go and build something great!

Practical Golang: Using Protobuffs

Introduction

Most apps we make need a means of communication. We usually use JSON, or just plain text. JSON has got especially popular because of the rise of Node.js. The truth though, is, that JSON isn’t really a fast format. The marshaller in Go also isn’t that fast. That’s why in this article we’ll learn how to use google protocol buffers. They are in fact very easy to use, and are much faster than JSON.

Regarding the performance gains, here they are, according to this benchmark:

benchmark iter time/iter bytes alloc allocs
BenchmarkJsonMarshal-8 500000 3714 ns/op 1232 B/op 10 allocs/op
BenchmarkJsonUnmarshal-8 500000 4125 ns/op 416 B/op 7 allocs/op
BenchmarkProtobufMarshal-8 1000000 1554 ns/op 200 B/op 7 allocs/op
BenchmarkProtobufUnmarshal-8 1000000 1055 ns/op 192 B/op 10 allocs/op
BenchmarkGogoprotobufMarshal-8 10000000 211 ns/op 64 B/op 1 allocs/op
BenchmarkGogoprotobufUnmarshal-8 5000000 289 ns/op 96 B/op 3 allocs/op

Ok, now let’s set up the environment.

Setup

First we’ll need to get the protobuffer compiler binaries from here:
https://github.com/google/protobuf/releases/tag/v3.0.0-beta-3
Unpack them somewhere in your PATH.

The next step is to get the golang plugin. Make sure that GOPATH/bin is in your PATH.

go get -u github.com/golang/protobuf/protoc-gen-go

Writing .proto files

Now it’s time to define our structure we’ll use. I’ll create mine in my project root. I’ll call it clientStructure.proto.

First we need to define the version of protobuffers we will use. Here we will use the newest – proto3. We’ll also define the package of the file. This will also be our go package name of the generated file.

syntax = "proto3";
package main;

Ok, now we’ll define our main structure in the file. The Client structure:

message Client {

}

Now it’s time to define the available fields. Fields are refered to by id, so for each field we define the type, name and id like this:

type name = id;

We’ll start with our first 4 fields of our client:

message Client {
        int32 id = 1;
        string name = 2;
        string email = 3;
        string country = 4;
}

We will also define an inner structure Mail:

        string country = 4;

        message Mail {
                string remoteEmail = 1;
                string body = 2;
        }

and finally define the inbox field. It’s an array of mails, which we create using the repeated keyword:

        message Mail {
                string remoteEmail = 1;
                string body = 2;
        }

        repeated Mail inbox = 5;
}

Now let’s compile it!
Open the directory with the protofile, and launch:

protoc --go_out=. clientStructure.proto

This will create a generated GO file with our Client structure.

My file looks like this:

// Code generated by protoc-gen-go.
// source: clientStructure.proto
// DO NOT EDIT!

/*
Package main is a generated protocol buffer package.

It is generated from these files:
    clientStructure.proto

It has these top-level messages:
    Client
*/
package main

import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
const _ = proto.ProtoPackageIsVersion1

type Client struct {
    Id          int32                   `protobuf:"varint,1,opt,name=id" json:"id,omitempty"`
    Name        string               `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"`
    Email    string              `protobuf:"bytes,3,opt,name=email" json:"email,omitempty"`
    Country string               `protobuf:"bytes,4,opt,name=country" json:"country,omitempty"`
    Inbox    []*Client_Mail `protobuf:"bytes,5,rep,name=inbox" json:"inbox,omitempty"`
}

func (m *Client) Reset()                                        { *m = Client{} }
func (m *Client) String() string                        { return proto.CompactTextString(m) }
func (*Client) ProtoMessage()                            {}
func (*Client) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }

func (m *Client) GetInbox() []*Client_Mail {
    if m != nil {
        return m.Inbox
    }
    return nil
}

type Client_Mail struct {
    RemoteEmail string `protobuf:"bytes,1,opt,name=remoteEmail" json:"remoteEmail,omitempty"`
    Body                string `protobuf:"bytes,2,opt,name=body" json:"body,omitempty"`
}

func (m *Client_Mail) Reset()                                       { *m = Client_Mail{} }
func (m *Client_Mail) String() string                       { return proto.CompactTextString(m) }
func (*Client_Mail) ProtoMessage()                           {}
func (*Client_Mail) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0, 0} }

func init() {
    proto.RegisterType((*Client)(nil), "main.Client")
    proto.RegisterType((*Client_Mail)(nil), "main.Client.Mail")
}

var fileDescriptor0 = []byte{
    // 191 bytes of a gzipped FileDescriptorProto
    0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0x12, 0x4d, 0xce, 0xc9, 0x4c,
    0xcd, 0x2b, 0x09, 0x2e, 0x29, 0x2a, 0x4d, 0x2e, 0x29, 0x2d, 0x4a, 0xd5, 0x2b, 0x28, 0xca, 0x2f,
    0xc9, 0x17, 0x62, 0xc9, 0x4d, 0xcc, 0xcc, 0x53, 0x3a, 0xcc, 0xc8, 0xc5, 0xe6, 0x0c, 0x96, 0x17,
    0xe2, 0xe3, 0x62, 0xca, 0x4c, 0x91, 0x60, 0x54, 0x60, 0xd4, 0x60, 0x0d, 0x02, 0xb2, 0x84, 0x84,
    0xb8, 0x58, 0xf2, 0x12, 0x73, 0x53, 0x25, 0x98, 0x80, 0x22, 0x9c, 0x41, 0x60, 0xb6, 0x90, 0x08,
    0x17, 0x6b, 0x2a, 0x50, 0x5f, 0x8e, 0x04, 0x33, 0x58, 0x10, 0xc2, 0x11, 0x92, 0xe0, 0x62, 0x4f,
    0xce, 0x2f, 0xcd, 0x2b, 0x29, 0xaa, 0x94, 0x60, 0x01, 0x8b, 0xc3, 0xb8, 0x42, 0xea, 0x5c, 0xac,
    0x99, 0x79, 0x49, 0xf9, 0x15, 0x12, 0xac, 0x0a, 0xcc, 0x1a, 0xdc, 0x46, 0x82, 0x7a, 0x20, 0x4b,
    0xf5, 0x20, 0x16, 0xea, 0xf9, 0x02, 0xf5, 0x06, 0x41, 0xe4, 0xa5, 0x6c, 0xb8, 0x58, 0x40, 0x5c,
    0x21, 0x05, 0x2e, 0xee, 0xa2, 0xd4, 0xdc, 0xfc, 0x92, 0x54, 0x57, 0xb0, 0x35, 0x8c, 0x60, 0xe3,
    0x90, 0x85, 0x40, 0xce, 0x4a, 0xca, 0x4f, 0xa9, 0x84, 0x39, 0x0b, 0xc4, 0x4e, 0x62, 0x03, 0x7b,
    0xc9, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xfd, 0x42, 0x16, 0x7b, 0xeb, 0x00, 0x00, 0x00,
}

Using Protocol Buffers in Go

The Server

Let’s first create the server. We will just receive a protobuf in a POST body, and print the contents.

First the basic structure and the imports:

package main

import (
    "github.com/golang/protobuf/proto"
    "net/http"
    "fmt"
    "io/ioutil"
)

func main() {
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
    })

    http.ListenAndServe(":3000", nil)
}

So, let’s start with a Client structure to fill, and read the body.

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        myClient := Client{}

        data, err := ioutil.ReadAll(r.Body)

        if err != nil {
            fmt.Println(err)
        }
    })

We’ll unmarshall the data, passing in a reference to the Client to fill in, and check for errors.

        if err != nil {
            fmt.Println(err)
        }

        if err := proto.Unmarshal(data, &myClient); err != nil {
            fmt.Println(err)
        }

and finally print it all

        if err := proto.Unmarshal(data, &myClient); err != nil {
            fmt.Println(err)
        }

        println(myClient.Id, ":", myClient.Name, ":", myClient.Email, ":", myClient.Country)

        for _, mail := range myClient.Inbox {
            fmt.Println(mail.RemoteEmail, ":", mail.Body)
        }
    })

Now let’s start with…

The Client

The Client will just fill in the Client structure, and send it to the server.

The structure:

package main

import (
    "github.com/golang/protobuf/proto"
    "net/http"
    "fmt"
    "bytes"
)

func main() {
}

We create the Client structure and fill it in. For the purposes of this article we’ll use, the oh so creatively named, John Doe.

func main() {
    myClient := Client{Id: 526, Name: "John Doe", Email: "[email protected]", Country: "US"}
    clientInbox := make([]*Client_Mail, 0, 20)
    clientInbox = append(clientInbox,
        &Client_Mail{RemoteEmail: "[email protected]", Body: "Hello. Greetings. Bye."},
        &Client_Mail{RemoteEmail: "[email protected]", Body: "Bye, Greetings, hello."})

    myClient.Inbox = clientInbox
}

We’ll marshall the John (the Client) into raw data and finally send him to the server.

    myClient.Inbox = clientInbox

    data, err := proto.Marshal(&myClient)
    if err != nil {
        fmt.Println(err)
        return
    }

    _, err = http.Post("http://localhost:3000", "", bytes.NewBuffer(data))

    if err != nil {
        fmt.Println(err)
        return
    }
}

Note that we used bytes.NewBuffer, so our raw data satisfies the Reader requirement for the request body.

Conclusion

As you can see, protobuffs are really easy to use and provide an actual speed boost in your application. Hope you’ll try to use them instead of JSON or other forms of transport in your next project. You can get more information about the more advanced functionalities here: https://developers.google.com/protocol-buffers/docs/gotutorial

Happy coding!

Practical Golang: Writing a simple login middleware

Introduction

In this part we’ll be creating a simple middleware you can easily apply to your handlers to get authentication/authorization. Middleware like this is an awesome way to add additional functionality to your Go server. Here we will only do authorization as we will only ask for a password, not a login. Although if you want, then you can easily extend this system to any authentication/authorization you’d like.

Implementation

We will mainly use the stdlib, and will use cookies to remember who’s already logged in. To generate the cookie values we will use the go.uuid library. So remember to

    go get github.com/satori/go.uuid

We will start with a basic structure of our system with an already existing “Hello World!!!” handler:

package main

import (
    "net/http"
    "fmt"
    "github.com/satori/go.uuid"
    "sync"
)

const loginPage = "<html><head><title>Login</title></head><body><form action=\"login\" method=\"post\"> <input type=\"password\" name=\"password\" /> <input type=\"submit\" value=\"login\" /> </form> </body> </html>"

func main() {

    http.Handle("/hello", helloWorldHandler{})
    http.HandleFunc("/login", handleLogin)

    http.ListenAndServe(":3000", nil)
}

type helloWorldHandler struct {
}

func (h helloWorldHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintln(w, "Hello World!!!")
}

type authenticationMiddleware struct {
    wrappedHandler http.Handler
}

func (h authenticationMiddleware) ServeHTTP(w http.ResponseWriter,r *http.Request) {
}

func authenticate(h http.Handler) authenticationMiddleware {
    return authenticationMiddleware{h}
}

func handleLogin(w http.ResponseWriter, r *http.Request) {
}

Ok, lets go over this code.

package main

import (
    "net/http"
    "fmt"
    "github.com/satori/go.uuid"
    "sync"
)

const loginPage = "<html><head><title>Login</title></head><body><form action=\"login\" method=\"post\"> <input type=\"password\" name=\"password\" /> <input type=\"submit\" value=\"login\" /> </form> </body> </html>"

func main() {

    http.Handle("/hello", helloWorldHandler{})
    http.HandleFunc("/login", handleLogin)

    http.ListenAndServe(":3000", nil)
}

type helloWorldHandler struct {
}

func (h helloWorldHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintln(w, "Hello World!!!")
}

First we declare the package, imports and the html code of the login page.

We also declare the basic hello world handler.

Now to get to the interesting part.

type authenticationMiddleware struct {
    wrappedHandler http.Handler
}

func (h authenticationMiddleware) ServeHTTP(w http.ResponseWriter,r *http.Request) {
}

func authenticate(h http.Handler) authenticationMiddleware {
    return authenticationMiddleware{h}
}

func handleLogin(w http.ResponseWriter, r *http.Request) {
}

we create a handler which will supply authorization, and if authorized successfully, will let the user through to the underlying handler. We also define the authenticate method, a simple function wrapper over creating a struct, and a function to handle the login.

That means we can also define the last route, the secured hello world route.

func main() {
    http.Handle("/hello", helloWorldHandler{})
    http.Handle("/secureHello", authenticate(helloWorldHandler{}))
    http.HandleFunc("/login", handleLogin)

    http.ListenAndServe(":3000", nil)
}

Ok, we will also need a simple client struct, which will just save if the target client session is authorized, and a map containing our Clients with cookie values being the keys.

var sessionStore map[string]Client
var storageMutex sync.RWMutex

type Client struct {
    loggedIn bool
}

We also need the mutex for concurrent map access. In the main function we initialize the map:

func main() {
    sessionStore = make(map[string]Client)
    http.Handle("/hello", helloWorldHandler{})

Now we can go to the authenticationMiddleware’s ServeHTTP function:

We’ll begin with checking if the cookie is present, if it isn’t there, we’ll continue and create a new. If the error is nonstandard then we just return.

func (h authenticationMiddleware) ServeHTTP(w http.ResponseWriter,r *http.Request) {
    cookie, err := r.Cookie("session")
    if err != nil {
        if err != http.ErrNoCookie {
            fmt.Fprint(w, err)
            return
        } else {
            err = nil
        }
    }

We later check, unless the cookie exists, if it’s saved in our map. If it’s not, then we will later generate a new one.

var present bool
var client Client
if cookie != nil {
    storageMutex.RLock()
    client, present = sessionStore[cookie.Value]
    storageMutex.RUnlock()
} else {
    present = false
}

Now, if the cookie wasn’t present, then we can generate a new one!:

if present == false {
    cookie = &http.Cookie{
        Name: "session",
        Value: uuid.NewV4().String(),
    }
    client = Client{false}
    storageMutex.Lock()
    sessionStore[cookie.Value] = client
    storageMutex.Unlock()
}

We can then set the cookie to our response writer, and if the client isn’t logged in, send him the login page, however, if he is logged in, then we can send him what he wanted:

    http.SetCookie(w, cookie)
    if client.loggedIn == false {
        fmt.Fprint(w, loginPage)
        return
    }
    if client.loggedIn == true {
        h.wrappedHandler.ServeHTTP(w, r)
        return
    }
}

So far so good, that’s actually already about the main part of the middleware, now we’ll write a function just for handling the login logic. The handleLogin function:

func handleLogin(w http.ResponseWriter, r *http.Request) {
    cookie, err := r.Cookie("session")
    if err != nil {
        if err != http.ErrNoCookie {
            fmt.Fprint(w, err)
            return
        } else {
            err = nil
        }
    }
    var present bool
    var client Client
    if cookie != nil {
        storageMutex.RLock()
        client, present = sessionStore[cookie.Value]
        storageMutex.RUnlock()
    } else {
        present = false
    }

    if present == false {
        cookie = &http.Cookie{
            Name: "session",
            Value: uuid.NewV4().String(),
        }
        client = Client{false}
        storageMutex.Lock()
        sessionStore[cookie.Value] = client
        storageMutex.Unlock()
    }
    http.SetCookie(w, cookie)}
}

First we created the part which is accountable for the cookie handling, as in the recent function. Now we get to the form parsing and the actual login part.

http.SetCookie(w, cookie)
err = r.ParseForm()
if err != nil {
    fmt.Fprint(w, err)
    return
}

if subtle.ConstantTimeCompare([]byte(r.FormValue("password")), []byte("password123")) == 1 {
    //login user
} else {
    fmt.Fprintln(w, "Wrong password.")
}

We parse the login form and check if the login conditions are met. Here we only need the password to be correct. If it is we log the client in:

if subtle.ConstantTimeCompare([]byte(r.FormValue("password")), []byte("password123")) == 1 {
    client.loggedIn = true
    fmt.Fprintln(w, "Thank you for logging in.")
    storageMutex.Lock()
    sessionStore[cookie.Value] = client
    storageMutex.Unlock()
}

We use subtle.ConstantTimeCompare as it protects us from time-based attacks. (Thanks for the tip in the reddit comment.)

That’s basically all we need. Now you can secure the routes you want easily.

Conclusion

Remember, that for a secure implementation you need encrypt the network traffic using SSL/TLS, otherwise, somebody can just read the cookie and impersonate your user.

Another thing to consider is redirecting the user to the page he wanted to get to after logging in.

Have fun with creating other interesting middleware!

Practical Golang: Event multicast/subscription service

Introduction

In our microservice architectures we always need a method for communicating between services. There are various ways to achieve this. Few of them are, but are not limited to: Remote Procedure Call, REST API’s, message BUSses. In this comprehensive tutorial we’ll write a service, which you can use to distribute messages/events across your system.

Design

How will it work? It will accept registering subscribers (other microservices). Whenever it gets a message from a microservice, it will send it further to all subscribers, using a REST call to the other microservices /event URL.

Subscribers will need to call a keep-alive URL regularly, otherwise they will get removed from the subscriber list. This protects us from sending messages to too many ghost subscribers.

Implementation

Let’s start with a basic structure. We’ll define the API and set up our two main data structures:
1. The subscriber list with their register/lastKeepAlive dates.
2. The mutex controlling access to our subscriber list.

package main

import (
    "net/http"
    "time"
    "sync"
    "fmt"
    "net/url"
    "io/ioutil"
    "bytes"
)

var registeredServiceStorage map[string]time.Time
var serviceStorageMutex sync.RWMutex

func main() {
    registeredServiceStorage = make(map[string]time.Time)
    serviceStorageMutex = sync.RWMutex{}

    http.HandleFunc("/registerAndKeepAlive", registerAndKeepAlive)
    http.HandleFunc("/deregister", deregister)
    http.HandleFunc("/sendMessage", handleMessage)
    http.HandleFunc("/listSubscribers", handleSubscriberListing)

    go killZombieServices()
    http.ListenAndServe(":3000", nil)
}

func registerAndKeepAlive(w http.ResponseWriter, r *http.Request) {
}

func deregister(w http.ResponseWriter, r *http.Request) {
}

func handleMessage(w http.ResponseWriter, r *http.Request) {
}

func sendMessageToSubscriber(data []byte, address string) {
}

func handleSubscriberListing(w http.ResponseWriter, r *http.Request) {
}

func killZombieServices() {
}

We initialize our subscriber list and mutex, and also launch, on another thread, a function that will regularly delete ghost subscribers.

So far so good!
We can now start getting into each functions implementation.

We can begin with the registerAndKeepAlive which does both things. Registering a new subscriber, or updating an existing one. This works because in both cases we just update the map entry with the subscriber address to contain the current time.

func registerAndKeepAlive(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        //Subscriber registration
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

The register function should be called with a POST request. That’s why the first thing we do, is checking if the method is right, otherwise we answer with an error. If it’s ok, then we register the client:

if r.Method == http.MethodPost {
    values, err := url.ParseQuery(r.URL.RawQuery)
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error:", err)
        return
    }
    if len(values.Get("address")) == 0 {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error:","Wrong input address.")
        return
    }

}

We check if the URL arguments are correct, and finally register the subscriber:

if len(values.Get("address")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:","Wrong input address.")
    return
}

serviceStorageMutex.Lock()
registeredServiceStorage[values.Get("address")] = time.Now()
serviceStorageMutex.Unlock()

fmt.Fprint(w, "success")

Awesome!

Let’s now implement the function which shall delete the entry when the subscriber wants to deregister.

func deregister(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodDelete {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("address")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input address.")
            return
        }

        //Subscriber deletion will come here

    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only DELETE accepted")
    }
}

Again do we check if the request method is good and if the address argument is correct. If that’s the case, then we can remove this client from our subscriber list.

if len(values.Get("address")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:","Wrong input address.")
    return
}

serviceStorageMutex.Lock()
delete(registeredServiceStorage, values.Get("address"))
serviceStorageMutex.Unlock()

fmt.Fprint(w, "success")

Now it’s time for the main functionality. Namely handling messages and sending them to all subscribers:

func handleMessage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {

    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

As usual, we check if the request method is correct.

Then, we read the data we got, so we can pass it to multiple concurrent sending functions.

if r.Method == http.MethodPost {

    data, err := ioutil.ReadAll(r.Body)
    if err != nil {
        fmt.Println(err)
    }
    //...
}

We then lock the mutex for read. That’s important so that we can handle huge amounts of messages efficiently. Basically, it means that we allow others to read while we are reading, because concurrent reading is supported by maps. We can use this unless there’s no one modifying the map.

While we lock the map for read, we check the list of subscribers we have to send the message to, and start concurrent functions that will do the sending. As we don’t want to lock the map for the entire sending time, we only need the addresses.

    data, err := ioutil.ReadAll(r.Body)
    if err != nil {
        fmt.Println(err)
    }

    serviceStorageMutex.RLock()
    for address, _ := range registeredServiceStorage {
        go sendMessageToSubscriber(data, address)
    }
    serviceStorageMutex.RUnlock()

    fmt.Fprint(w, "success")

Which means we now have to implement the sendMessageToSubscriber(…) function.

It’s pretty simple, we just make a post, and print an error if it happened.

func sendMessageToSubscriber(data []byte, address string) {
    _, err := http.Post("http://" + address + "/event", "", bytes.NewBuffer(data))
    if err != nil {
        fmt.Println(err)
    }
}

It’s important to notice, that we have to create a buffer from the data, as the http.Post(…) function needs a reader type data structure.

We’ll also implement the function which makes it possible to list all the subscribers. Mainly for debugging purposes. There’s nothing new in it. We check if the method is alright, lock the mutex for read, and finally print the map with a correct format of the register time.

func handleSubscriberListing(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        serviceStorageMutex.RLock()

        for address, registerTime := range registeredServiceStorage {
            fmt.Fprintln(w, address, " : ", registerTime.Format(time.RFC3339))
        }

        serviceStorageMutex.RUnlock()
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Now there’s only one function left. The one that will make sure no ghost services stay for too long. It will check all the services once per minute. This way we’re making it cheap on performance:

func killZombieServices() {
    t := time.Tick(1 * time.Minute)

    for range t {
    }
}

This is a nice way to launch the code every minute. We create a channel which will send us the time every minute, and range over it, ignoring the received values.

We can now get the check and remove working.

for range t {
    timeNow := time.Now()
    serviceStorageMutex.Lock()
    for address, timeKeepAlive := range registeredServiceStorage {
        if timeNow.Sub(timeKeepAlive).Minutes() > 2 {
            delete(registeredServiceStorage, address)
        }
    }
    serviceStorageMutex.Unlock()
}

We just range over the subscribers and delete those that haven’t kept their subscription alive.

To add to that, if you wanted you could first make a read-only pass over the subscribers, and immediately after that, make a write-locked deletion of the ones you found. This would allow others to read the map while you’re finding subscribers to delete.

Conclusion

That’s all! Have fun with creating an infrastructure based on such a service!

Web app using Microservices in Go: Part 4 – Worker and Frontend

Previous part

Introduction

In this part we will finally finish writing our application. We will implement the last two services:
1. The Worker
2. The Frontend

The Worker

The worker will communicate with the Master to get new Tasks. When it gets a Task it will get the corresponding data from the storage and will start working on the task. When it finishes it will send the finished data to the storage service, and if that succeeds it will register the Task as finished to the Master.

That means that you can easily scale the workforce, by turning on additional machines, with the worker service on them. Easy scaling is good!

Implementation

As usual we will start with a basic structure which is similar to the structure of our previous services. Although there is one big difference. There won’t be any API here as the worker will be a client. You could, if you wanted, add an API for debugging purposes. Things like getting the processor usage. But this can also be implemented using 3rd party health checking services.

So here’s our basic structure:

package main

import (
    "os"
    "fmt"
    "net/http"
    "io/ioutil"
    "encoding/json"
    "time"
    "strconv"
    "image"
    "image/png"
    "image/color"
    "bytes"
    "sync"
)

type Task struct {
    Id int `json:"id"`
    State int `json:"state"`
}

var masterLocation string
var storageLocation string
var keyValueStoreAddress string

func main() {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return
    }
    keyValueStoreAddress = os.Args[1]

    response, err := http.Get("http://" + keyValueStoreAddress + "/get?key=masterAddress")
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get master address.")
        fmt.Println(response.Body)
        return
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    masterLocation = string(data)
    if len(masterLocation) == 0 {
        fmt.Println("Error: can't get master address. Length is zero.")
        return
    }

    response, err = http.Get("http://" + keyValueStoreAddress + "/get?key=storageAddress")
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get storage address.")
        fmt.Println(response.Body)
        return
    }
    data, err = ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    storageLocation = string(data)
    if len(storageLocation) == 0 {
        fmt.Println("Error: can't get storage address. Length is zero.")
        return
    }
}

func getNewTask(masterAddress string) (Task, error) {
}
func getImageFromStorage(storageAddress string, myTask Task) (image.Image, error) {
}
func doWorkOnImage(myImage image.Image) image.Image {
}
func sendImageToStorage(storageAddress string, myTask Task, myImage image.Image) error {
}
func registerFinishedTask(masterAddress string, myTask Task) error {
}

It may seem to be much but there is nothing new actually in the most dense parts. We define the Task structure first and declare the needed addresses/locations.

Later we check if there are enough arguments passed to the application, and, as we did in the previous parts, we get the addresses of the Master and the Storage. That’s basically all.

For the worker we will want a command line parameter to set the number of concurrent threads. That’s why we check if there are 3 Arguments.

Now let’s parse the thread count:

storageLocation = string(data)
if len(storageLocation) == 0 {
    fmt.Println("Error: can't get storage address. Length is zero.")
    return
}

threadCount, err := strconv.Atoi(os.Args[2])
if err != nil {
    fmt.Println("Error: Couldn't parse thread count.")
    return
}

We use the atoi function to parse the string argument to an int.

And now we come to the main for loop which runs on each thread:

myWG := sync.WaitGroup{}
myWG.Add(threadCount)
for i := 0; i < threadCount; i++ {
    go func() {
        for {
            //Do work...
        }
    }()
}
myWG.Wait()

Ok, what are we doing there? We create a waitGroup. The main function has to be waiting for the goroutines and not just finish execution, that’s why we create a waitGroup and add the thread count. You could add a functionality to break the endless for loop and after that use the Done() function on the waitgroup. We won’t be adding this as we just want endless for work loops.

Now we will write down the execution process for each Task.
First we get a new Task:

for {
    myTask, err := getNewTask(masterLocation)
    if err != nil {
        fmt.Println(err)
        fmt.Println("Waiting 2 second timeout...")
        time.Sleep(time.Second * 2)
        continue
    }

If we error, then we wait 2 seconds, so it doesn’t make a lot of errored requests to the master at once. As this could flood the other services.

If we successfully get the Task, then we get the image to work on from the storage:

myTask, err := getNewTask(masterLocation)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

myImage, err := getImageFromStorage(storageLocation, myTask)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

Ok, now it’s time to do something with the image!!!

myImage, err := getImageFromStorage(storageLocation, myTask)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

myImage = doWorkOnImage(myImage)

We now of course have to save the image back to the storage:

myImage = doWorkOnImage(myImage)

err = sendImageToStorage(storageLocation, myTask, myImage)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

And finally if that succeeds we register our successfully finished Task!

        err = sendImageToStorage(storageLocation, myTask, myImage)
        if err != nil {
            fmt.Println(err)
            fmt.Println("Waiting 2 second timeout...")
            time.Sleep(time.Second * 2)
            continue
        }

        err = registerFinishedTask(masterLocation, myTask)
        if err != nil {
            fmt.Println(err)
            fmt.Println("Waiting 2 second timeout...")
            time.Sleep(time.Second * 2)
            continue
        }
    }
}()

Ok, so now we can continue with implementing these functions. Let’s first implement the getNewTask function:

func getNewTask(masterAddress string) (Task, error) {
    response, err := http.Post("http://" + masterAddress + "/getNewTask", "text/plain", nil)
    if err != nil || response.StatusCode != http.StatusOK {
        return Task{-1, -1}, err
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        return Task{-1, -1}, err
    }

    myTask := Task{}
    err = json.Unmarshal(data, &myTask)
    if err != nil {
        return Task{-1, -1}, err
    }

    return myTask, nil
}

We make the request to the master and check if it was successful. We read the response body to memory and finally Unmarshal the response body to our Task structure. Finally we return it.

Now we can implement the function to get the image from storage!

func getImageFromStorage(storageAddress string, myTask Task) (image.Image, error) {
    response, err := http.Get("http://" + storageAddress + "/getImage?state=working&id=" + strconv.Itoa(myTask.Id))
    if err != nil || response.StatusCode != http.StatusOK {
        return nil, err
    }

    myImage, err := png.Decode(response.Body)
    if err != nil {
        return nil, err
    }

    return myImage, nil
}

We get the response whose body is the raw image, so we just Decode it and return it if we succeed.

Now that we have our image we can finally do some work on it:

func doWorkOnImage(myImage image.Image) image.Image {
    myCanvas := image.NewRGBA(myImage.Bounds())

    for i := 0; i < myCanvas.Rect.Max.X; i++ {
        for j := 0; j < myCanvas.Rect.Max.Y; j++ {
            r, g, b, _ := myImage.At(i, j).RGBA()
            myColor := new(color.RGBA)
            myColor.R = uint8(g)
            myColor.G = uint8(r)
            myColor.B = uint8(b)
            myColor.A = uint8(255)
            myCanvas.Set(i, j, myColor)
        }
    }

    return myCanvas.SubImage(myImage.Bounds())
}

The work is largely irrelevant, but I’ll explain it anyways. First we create a RGBA. That’s something like a canvas for drawing, and we create it with the size of our image. Later we draw on the canvas swapping the red with the green channel. Later we use the RGBA to return a new modified image, created from our canvas with the size of our original image.

After working on the image we have to send it back to the storage system. So let’s implement the sendImageToStorage function:

func sendImageToStorage(storageAddress string, myTask Task, myImage image.Image) error {
    data := []byte{}
    buffer := bytes.NewBuffer(data)
    err := png.Encode(buffer, myImage)
    if err != nil {
        return err
    }
    response, err := http.Post("http://" + storageAddress + "/sendImage?state=finished&id=" + strconv.Itoa(myTask.Id), "image/png", buffer)
    if err != nil || response.StatusCode != http.StatusOK {
        return err
    }

    return nil
}

We create a data byte slice, and from that a data buffer which allows us to use it as a readwriter interface. We then use this interface to encode our image to png into, and finally send it using a POST to the server. If everything works out, then we just return.

When we successfully saved the image, we can register to the Master that we finished the Task.

func registerFinishedTask(masterAddress string, myTask Task) error {
    response, err := http.Post("http://" + masterAddress + "/registerTaskFinished?id=" + strconv.Itoa(myTask.Id), "test/plain", nil)
    if err != nil || response.StatusCode != http.StatusOK {
        return err
    }

    return nil
}

Nothing fancy. Just sending a POST request to notify about finishing the task.

Ok, that’s all regarding the worker. You can turn it on and it will wait for Tasks to get. Which means we need to get the Tasks from the user to our service finally. And that leads us to…

The Frontend

This one will show the user the website and also parse the user form so the backend services only get the raw image data. It will be the only interface to our application for the user.

As usual, we’ll begin with the basic structure of the file:

package main

import (
    "net/http"
    "fmt"
    "io/ioutil"
    "os"
    "net/url"
    "io"
)

const indexPage = "<html><head><title>Upload file</title></head><body><form enctype=\"multipart/form-data\" action=\"submitTask\" method=\"post\"> <input type=\"file\" name=\"uploadfile\" /> <input type=\"submit\" value=\"upload\" /> </form> </body> </html>"

var keyValueStoreAddress string
var masterLocation string

func main() {
    if len(os.Args) < 2 {
        fmt.Println("Error: Too few arguments.")
        return
    }
    keyValueStoreAddress = os.Args[1]

    response, err := http.Get("http://" + keyValueStoreAddress + "/get?key=masterAddress")
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get master address.")
        fmt.Println(response.Body)
        return
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    masterLocation = string(data)
    if len(masterLocation) == 0 {
        fmt.Println("Error: can't get master address. Length is zero.")
        return
    }

    http.HandleFunc("/", handleIndex)
    http.HandleFunc("/submitTask", handleTask)
    http.HandleFunc("/isReady", handleCheckForReadiness)
    http.HandleFunc("/getImage", serveImage)
    http.ListenAndServe(":80", nil)
}

func handleIndex(w http.ResponseWriter, r *http.Request) {
}

func handleTask(w http.ResponseWriter, r *http.Request) {
}

func handleCheckForReadiness(w http.ResponseWriter, r *http.Request) {
}

func serveImage(w http.ResponseWriter, r *http.Request) {
}

We’ve got the code of our index web page here, and we also have the API declared. After starting the program we check if we have the k/v store address. If we do (we hope so), then we can get the master address from it.

Now we can go on to implementing the functions. We’ll start with the simples. The index handler:

func handleIndex(w http.ResponseWriter, r *http.Request) {
    fmt.Fprint(w, indexPage)
}

After writing this we can go on to writing the more complicated functions. We will start with the handleTask function. This function is responsible for parsing the user form and sending the raw image data to the master.

func handleTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
    err := r.ParseMultipartForm(10000000)
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Wrong input")
        return
    }
    file, _, err := r.FormFile("uploadfile")
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Wrong input")
        return
    }

Ok, what do we have here? We check the method as we always do, and later parse the multipart form. We’ve got a nice lovely magic number there. The number is responsible for setting the max size of the form held in RAM. The rest will be stored in temporary files. We later do the request using the file we got:

    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Wrong input")
        return
    }

    response, err := http.Post("http://" + masterLocation + "/new", "image", file)
    if err != nil || response.StatusCode != http.StatusOK {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error:", err)
        return
    }

    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error:", err)
        return
    }

    fmt.Fprint(w, string(data))
} else {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error: Only POST accepted")
}

We send the user back the new Task id we got back from the master.

Now we can implement the function which will handle the request to check if the Task is finished and ready:

func handleCheckForReadiness(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Get("http://" + masterLocation + "/isReady?id=" + values.Get("id") + "&state=finished")
        if err != nil || response.StatusCode != http.StatusOK {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        data, err := ioutil.ReadAll(response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        switch string(data) {
        case "0":
            fmt.Fprint(w, "Your image is not ready yet.")
        case "1":
            fmt.Fprint(w, "Your image is ready.")
        default :
            fmt.Fprint(w, "Internal server error.")
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Most of this is the same as the function in the master. We just check if the id is correct and make a request to the master. The interesting part is this:

switch string(data) {
case "0":
    fmt.Fprint(w, "Your image is not ready yet.")
case "1":
    fmt.Fprint(w, "Your image is ready.")
default :
    fmt.Fprint(w, "Internal server error.")
}

We cast the response body to a string, and if it matches 1 or 0 we answer. If it’s something else then we know something went really wrong, and send back an error.

Now we can get to the last function, the function to serve the actual images:

func serveImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Get("http://" + masterLocation + "/get?id=" + values.Get("id") + "&state=finished")
        if err != nil || response.StatusCode != http.StatusOK {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(w, response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

It just checks the id, sends the request, and copies the response to answer the user.

Conclusion

So now it’s all finished. You can start all applications and they will work. You can contact the frontend and make requests, they will all start working and you will get your modified images. Six microservices working beautifully together.

This may be the end of this series, so have fun extending this system alone.

I may add a finishing part about deployment to container infrastructues, or a another extending series about refactoring the system with 3rd party libraries in the future but I’m not sure.

All in all, good luck!

UPDATE: Also remember, that when running the worker, it’s good to launch it with goroutines in the number of 2-4x your system threads. They have near-0 overhead in switching and this way your not unnecessarily blocking when waiting for http responses.

Web app using Microservices in Go: Part 3 – Storage and Master

Previous part

Introduction

In this part we will implement the next part of the microservices needed for our web app. We will implement the:
* Storage system
* Master

This way we will have the Master API ready when we’ll be writing the slaves/workers and the frontend. And we’ll already have the database, k/v store and storage when writing the master. SO every time we write something we’ll already have all its dependencies.

The storage system

Ok, this one will be pretty easy to write. Just handling files. Let’s build the basic structure, which will include a function to register in our k/v store. For reference how it works check out the previous part. So here’s the basic structure:

package main

import (
    "fmt"
    "net/http"
    "io/ioutil"
    "os"
    "net/url"
    "io"
)

func main() {
    if !registerInKVStore() {
        return
    }
    http.HandleFunc("/sendImage", receiveImage)
    http.HandleFunc("/getImage", serveImage)
    http.ListenAndServe(":3002", nil)
}

func receiveImage(w http.ResponseWriter, r *http.Request) {
}

func serveImage(w http.ResponseWriter, r *http.Request) {
}

func registerInKVStore() bool {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return false
    }
    storageAddress := os.Args[1] // The address of itself
    keyValueStoreAddress := os.Args[2]

    response, err := http.Post("http://" + keyValueStoreAddress + "/set?key=storageAddress&value=" + storageAddress, "", nil)
    if err != nil {
        fmt.Println(err)
        return false
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return false
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: Failure when contacting key-value store: ", string(data))
        return false
    }
    return true
}

So now we’ll have to handle the file serving/uploading. We will use a state url argument to specify if we are using the not yet finished (aka working) directory, or the finished one.

So first let’s write the receiveImage function which is there to get the files from clients:

func receiveImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }
        if values.Get("state") != "working" && values.Get("state") != "finished" {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input state.")
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

Here we check if the request method is POST, if there is an id, and if the state is working or finished.

Next we can create the file and put in the image:

if values.Get("state") != "working" && values.Get("state") != "finished" {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input state.")
            return
        }

        _, err = strconv.Atoi(values.Get("id"))
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }

        file, err := os.Create("/tmp/" + values.Get("state") + "/" + values.Get("id") + ".png")
        defer file.Close()
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(file, r.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        fmt.Fprint(w, "success")

We create a file in the tmp/state directory with the right id. Another thing we do is check if the id really is a valid int. We parse it to an int, to see if it succeeds and if it does then we use it, as a string.

we use the io.Copy function to put all the data from the request to the file. That means that the body of our request should be a raw image.

Next we can write the function to serve images which is pretty similar:

func serveImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }
        if values.Get("state") != "working" && values.Get("state") != "finished" {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input state.")
            return
        }

        _, err = strconv.Atoi(values.Get("id"))
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }

        file, err := os.Open("/tmp/" + values.Get("state") + "/" + values.Get("id") + ".png")
        defer file.Close()
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(w, file)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Instead of creating the file, we open it. Instead of copying to the file we copy from it. And we check if the method is GET.

That’s it. We’ve got a storage service which saves and servers raw image files. Now we can get to the master!

The master

We now have all the dependencies the master needs. So let’s write it now. Here’s the basic structure:

package main

import (
    "os"
    "fmt"
    "net/http"
    "io/ioutil"
)

type Task struct {
    Id int `json:"id"`
    State int `json:"state"`
}

var databaseLocation string
var storageLocation string

func main() {
    if !registerInKVStore() {
        return
    }

    http.HandleFunc("/new", newImage)
    http.HandleFunc("/get", getImage)
    http.HandleFunc("/isReady", isReady)
    http.HandleFunc("/getNewTask", getNewTask)
    http.HandleFunc("/registerTaskFinished", registerTaskFinished)
    http.ListenAndServe(":3003", nil)
}

func newImage(w http.ResponseWriter, r *http.Request) {
}

func getImage(w http.ResponseWriter, r *http.Request) {
}

func isReady(w http.ResponseWriter, r *http.Request) {
}

func getNewTask(w http.ResponseWriter, r *http.Request) {
}

func registerTaskFinished(w http.ResponseWriter, r *http.Request) {
}

func registerInKVStore() bool {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return false
    }
    masterAddress := os.Args[1] // The address of itself
    keyValueStoreAddress := os.Args[2]

    response, err := http.Post("http://" + keyValueStoreAddress + "/set?key=masterAddress&value=" + masterAddress, "", nil)
    if err != nil {
        fmt.Println(err)
        return false
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return false
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: Failure when contacting key-value store: ", string(data))
        return false
    }
    return true
}

It’s the structure of the API and the mechanics to register in the k/v store.

We also need to get the storage and database locations in the main function:

if !registerInKVStore() {
        return
    }
    keyValueStoreAddress = os.Args[2]

    response, err := http.Get("http://" + keyValueStoreAddress + "/get?key=databaseAddress")
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get database address.")
        fmt.Println(response.Body)
        return
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    databaseLocation = string(data)

    response, err = http.Get("http://" + keyValueStoreAddress + "/get?key=storageAddress")
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get storage address.")
        fmt.Println(response.Body)
        return
    }
    data, err = ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    storageLocation = string(data)

Now we can start implementing all the functionality!

Let’s start with the newImage function as it contains a good bit of code and mechanics which will be again used in the other funtions.
Here’s the beginning:

func newImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        response, err := http.Post("http://" + databaseLocation + "/newTask", "text/plain", nil)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        id, err := ioutil.ReadAll(response.Body)
        if err != nil {
            fmt.Println(err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

As usual we check if the method is right. Next we register a new Task in the database and get and Id.

We now use this to send the image to the storage:

id, err := ioutil.ReadAll(response.Body)
if err != nil {
    fmt.Println(err)
    return
}

_, err = http.Post("http://" + storageLocation + "/sendImage?id=" + string(id) + "&state=working", "image", r.Body)
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}
fmt.Fprint(w, string(id))

That’s it. The new task will be created, the storage will get a file into the working directory with the name of the file being the id, and the client gets back the id. The important thing here is that we need the raw image in the request. The user form has to be parsed in the frontend service.

Now we can create the function which just checks if a Task is ready:

func isReady(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

We first have to verify all the parameters and the request method. Next we can ask the database for the Task requested:

if len(values.Get("id")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Wrong input")
    return
}

response, err := http.Get("http://" + databaseLocation + "/getById?id=" + values.Get("id"))
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}
data, err := ioutil.ReadAll(response.Body)
if err != nil {
    fmt.Println(err)
    return
}

We also read the response immediately. Now we can parse the Task and respond to the client:

if err != nil {
    fmt.Println(err)
    return
}

myTask := Task{}
json.Unmarshal(data, &myTask)

if(myTask.State == 2) {
    fmt.Fprint(w, "1")
} else {
    fmt.Fprint(w, "0")
}

So now we can implement the last client facing interface, the getImage function:

if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }
} else {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error: Only GET accepted")
}

Here we verified the request and now we need to get the image from the storage system, and just copy the response to our client:

if len(values.Get("id")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Wrong input")
    return
}

response, err := http.Get("http://" + storageLocation + "/getImage?id=" + values.Get("id") + "&state=finished")
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}

_, err = io.Copy(w, response.Body)
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}

That’s it! The client facing interface is finished!

Implementing the worker facing interface

Now we have to implement the functions to serve the workers.

Both functions will basically be just direct routes to the database and back, so now let’s write ’em too:

func getNewTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        response, err := http.Post("http://" + databaseLocation + "/getNewTask", "text/plain", nil)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(w, response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

func registerTaskFinished(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Post("http://" + databaseLocation + "/finishTask?id=" + values.Get("id"), "test/plain", nil)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(w, response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

There’s not much to explain. They are both just passing further the request and responding with what they get.

You could think the workers should communicate directly with the database to get new Tasks. And with the current implementation it would work perfectly. However, if we wanted to add some functionality the master wanted to do for each of those requests it would be hard to implement. So this way is very extensible, and that’s nearly always what we want.

Conclusion

Now we have finished the Master and the Storage system. We now have the dependencies to create the workers and frontend which we will implement in the next part. As always I encourage you to comment about your opinion. Have fun extending the system to do what you want to achieve!

Next part

Web app using Microservices in Go: Part 2 – k/v store and Database

Previous part

Introduction

In this part we will implement part of the microservices needed for our web app. We will implement the:
* key-value store
* Database

This will be a pretty code heavy tutorial so concentrate and have fun!

The key-value store

Design

The design hasn’t changed much. We will save the key-value pairs as a global map, and create a global mutex for concurrent access. We’ll also add the ability to list all key-value pairs for debugging/analytical purposes. We will also add the ability to delete existing entries.

First, let’s create the structure:

package main

import (
    "net/http"
    "sync"
    "net/url"
    "fmt"
)

var keyValueStore map[string]string
var kVStoreMutex sync.RWMutex

func main() {
    keyValueStore = make(map[string]string)
    kVStoreMutex = sync.RWMutex{}
    http.HandleFunc("/get", get)
    http.HandleFunc("/set", set)
    http.HandleFunc("/remove", remove)
    http.HandleFunc("/list", list)
    http.ListenAndServe(":3000", nil)
}

func get(w http.ResponseWriter, r *http.Request) {
}

func set(w http.ResponseWriter, r *http.Request) {
}

func remove(w http.ResponseWriter, r *http.Request) {
}

func list(w http.ResponseWriter, r *http.Request) {
}

And now let’s dive into the implementation.

First, we should add parameter parsing in the get function and verify that the key parameter is right.

func get(w http.ResponseWriter, r *http.Request) {
    if(r.Method == http.MethodGet) {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("key")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input key.")
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted.")
    }
}

The key shouldn’t have a length of 0, hence the length check. We also check if the method is GET, if it isn’t we print it and set the status code to bad request.
We answer with an explicit Error: before each error message so it doesn’t get misinterpreted by the client as a value.

Now, let’s access our map and send back a response:

if len(values.Get("key")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:","Wrong input key.")
    return
}

kVStoreMutex.RLock()
value := keyValueStore[string(values.Get("key"))]
kVStoreMutex.RUnlock()

fmt.Fprint(w, value)

We copy the value into a variable so that we don’t block the map while sending back the response.

Now let’s create the set function, it’s actually pretty similar.

func set(w http.ResponseWriter, r *http.Request) {
    if(r.Method == http.MethodPost) {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("key")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", "Wrong input key.")
            return
        }
        if len(values.Get("value")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", "Wrong input value.")
            return
        }

        kVStoreMutex.Lock()
        keyValueStore[string(values.Get("key"))] = string(values.Get("value"))
        kVStoreMutex.Unlock()

        fmt.Fprint(w, "success")
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted.")
    }
}

The only difference is that we also check if there is a right value parameter and check if the method is POST.

Now we can add the implementation of the list function which is also pretty simple:

func list(w http.ResponseWriter, r *http.Request) {
    if(r.Method == http.MethodGet) {
        kVStoreMutex.RLock()
        for key, value := range keyValueStore {
            fmt.Fprintln(w, key, ":", value)
        }
        kVStoreMutex.RUnlock()
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted.")
    }
}

It just ranges over the map and prints everything. Simple yet effective.

And to finish the key-value store we will implement the remove function:

func remove(w http.ResponseWriter, r *http.Request) {
    if(r.Method == http.MethodDelete) {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("key")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", "Wrong input key.")
            return
        }

        kVStoreMutex.Lock()
        delete(keyValueStore, values.Get("key"))
        kVStoreMutex.Unlock()

        fmt.Fprint(w, "success")
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only DELETE accepted.")
    }
}

It’s the same as setting a value, but instead of setting it we delete it.

The database

Design

After thinking through the design, I decided that it would be better if the database generated the task Id‘s. This will also make it easier to get the last non-finished task and generate consecutive Id‘s

How it will work:
* It will save new tasks assigning consecutive Id‘s.
* It will allow to get a new task to do.
* It will allow to get a task by Id.
* It will allow to set a task by Id.
* The state will be represented by an int:
* 0 – not started
* 1 – in progress
* 2 – finished
* It will change the state of a task to not started if it’s been too long in progress. (maybe someone started to work on it but has crashed)
* It will allow to list all tasks for debugging/analytical purposes.

Database microservice post

Implementation

First, we should create the API and later we will add the implementations of the functionality as before with the key-value store. We will also need a global map being our data store, a variable pointing to the oldest not started task, and mutexes for accessing the datastore and pointer.

package main

import (
    "net/http"
    "net/url"
    "fmt"
)

type Task struct {
}

var datastore map[int]Task
var datastoreMutex sync.RWMutex
var oldestNotFinishedTask int // remember to account for potential int overflow in production. Use something bigger.
var oNFTMutex sync.RWMutex

func main() {

    datastore = make(map[int]Task)
    datastoreMutex = sync.RWMutex{}
    oldestNotFinishedTask = 0
    oNFTMutex = sync.RWMutex{}

    http.HandleFunc("/getById", getById)
    http.HandleFunc("/newTask", newTask)
    http.HandleFunc("/getNewTask", getNewTask)
    http.HandleFunc("/finishTask", finishTask)
    http.HandleFunc("/setById", setById)
    http.HandleFunc("/list", list)
    http.ListenAndServe(":3001", nil)
}

func getById(w http.ResponseWriter, r *http.Request) {
}

func newTask(w http.ResponseWriter, r *http.Request) {
}

func getNewTask(w http.ResponseWriter, r *http.Request) {
}

func finishTask(w http.ResponseWriter, r *http.Request) {
}

func setById(w http.ResponseWriter, r *http.Request) {
}

func list(w http.ResponseWriter, r *http.Request) {
}

We also already declared the Task type which we will use for storage.

So far so good. Now let’s implement all those functions!

First, let’s implement the getById function.

func getById(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        id, err := strconv.Atoi(string(values.Get("id")))
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        datastoreMutex.RLock()
        bIsInError := err != nil || id >= len(datastore) // Reading the length of a slice must be done in a synchronized manner. That's why the mutex is used.
        datastoreMutex.RUnlock()

        if bIsInError {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        datastoreMutex.RLock()
        value := datastore[id]
        datastoreMutex.RUnlock()

        response, err := json.Marshal(value)

        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        fmt.Fprint(w, string(response))
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

We check if the GET method has been used. Later we parse the id argument and check if it’s proper. We then get the id as an int using the strconv.Atoi function. Next we make sure it is not out of bounds for our datastore, which we have to do using mutexes because we’re accessing a map which could be accessed from another thread. If everything is ok, then, again using mutexes, we get the task using the id.

After that we use the JSON library to marshal our struct into a JSON object and if that finishes without problems we send the JSON object to the client.

It’s also time to implement our Task struct:

type Task struct {
    Id int `json:"id"`
    State int `json:"state"`
}

It’s all that’s needed. We also added the information the JSON marshaller needs.

We can now go on with implementing the newTask function:

func newTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        datastoreMutex.Lock()
        taskToAdd := Task{
            Id: len(datastore),
            State: 0,
        }
        datastore[taskToAdd.Id] = taskToAdd
        datastoreMutex.Unlock()

        fmt.Fprint(w, taskToAdd.Id)
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

It’s pretty small actually. Creating a new Task with the next id and adding it to the datastore. After that it sends back the new Tasks Id.

That means we can go on to implementing the function used to list all Tasks, as this helps with debugging during writing.

It’s basically the same as with the key-value store:

func list(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        datastoreMutex.RLock()
        for key, value := range datastore {
            fmt.Fprintln(w, key, ": ", "id:", value.Id, " state:", value.State)
        }
        datastoreMutex.RUnlock()
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Ok, so now we will implement the function which can set the Task by id:

func setById(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        taskToSet := Task{}

        data, err := ioutil.ReadAll(r.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }
        err = json.Unmarshal([]byte(data), &taskToSet)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        bErrored := false
        datastoreMutex.Lock()
        if taskToSet.Id >= len(datastore) || taskToSet.State > 2 || taskToSet.State < 0 {
            bErrored = true
        } else {
            datastore[taskToSet.Id] = taskToSet
        }
        datastoreMutex.Unlock()

        if bErrored {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: Wrong input")
            return
        }

        fmt.Fprint(w, "success")
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

Nothing new. We get the request and try to unmarshal it. If it succeeds we put it into the map, checking if it isn’t out of bounds or if the state is invalid. If it is then we print an error, otherwise we print success.

If we already have this we can now implement the finish task function, because it’s very simple:

func finishTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        id, err := strconv.Atoi(string(values.Get("id")))

        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        updatedTask := Task{Id: id, State: 2}

        bErrored := false

        datastoreMutex.Lock()
        if datastore[id].State == 1 {
            datastore[id] = updatedTask
        } else {
            bErrored = true
        }
        datastoreMutex.Unlock()

        if bErrored {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: Wrong input")
            return
        }

        fmt.Fprint(w, "success")
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

It’s pretty similar to the getById function. The difference here is that here we update the state and only if it is currently in progress.

And now to one of the most interesting functions. The getNewTask function. It has to handle updating the oldest known finished task, and it also needs to handle the situation when someone takes a task but crashes during work. This would lead to a ghost task forever being in progress. That’s why we’ll add functionality which after 120 seconds from starting a task will set it back to not started:

func getNewTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {

        bErrored := false

        datastoreMutex.RLock()
        if len(datastore) == 0 {
            bErrored = true
        }
        datastoreMutex.RUnlock()

        if bErrored {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: No non-started task.")
            return
        }

        taskToSend := Task{Id: -1, State: 0}

        oNFTMutex.Lock()
        datastoreMutex.Lock()
        for i := oldestNotFinishedTask; i < len(datastore); i++ {
            if datastore[i].State == 2 && i == oldestNotFinishedTask {
                oldestNotFinishedTask++
                continue
            }
            if datastore[i].State == 0 {
                datastore[i] = Task{Id: i, State: 1}
                taskToSend = datastore[i]
                break
            }
        }
        datastoreMutex.Unlock()
        oNFTMutex.Unlock()

        if taskToSend.Id == -1 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: No non-started task.")
            return
        }

        myId := taskToSend.Id

        go func() {
            time.Sleep(time.Second * 120)
            datastoreMutex.Lock()
            if datastore[myId].State == 1 {
                datastore[myId] = Task{Id: myId, State: 0}
            }
            datastoreMutex.Unlock()
        }()

        response, err := json.Marshal(taskToSend)

        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        fmt.Fprint(w, string(response))
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

First we try to find the oldest task that hasn’t started yet. By the way we update the oldestNotFinishedTask variable. If a task is finished and is pointed on by the variable, the variable get’s incremented. If we find something that’s not started, then we break out of the loop and send it back to the user setting it to in progress. However, on the way we start a function on another thread that will change the state of the task back to not started if it’s still in progress after 120 seconds.

Now the last thing. A database is useless… when you don’t know where it is! That’s why we’ll now implement the mechanism that the database will use to register itself in the key-value store:

func main() {

    if !registerInKVStore() {
        return
    }

    datastore = make(map[int]Task)

and later we define the function:

func registerInKVStore() bool {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return false
    }
    databaseAddress := os.Args[1] // The address of itself
    keyValueStoreAddress := os.Args[2]

    response, err := http.Post("http://" + keyValueStoreAddress + "/set?key=databaseAddress&value=" + databaseAddress, "", nil)
    if err != nil {
        fmt.Println(err)
        return false
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return false
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: Failure when contacting key-value store: ", string(data))
        return false
    }
    return true
}

We check if there are at least 3 arguments. (The first being the executable) We read the current database address from the second argument and the key-value store address from the third argument. We use them to make a POST request where we add a databaseAddress key to the k/v store and set its value to the current database address. If the status code of the response isn’t OK then we know we messed up and we print the error we got. After that we quit the program.

Conclusion

We now have finished our k/v store and our database. You can even test them now using a REST client. (I used this one.) Remember that the code is subject to change if it will be necessary but I don’t think so. I hope you enjoyed the tutorial! I encourage you to comment, and if you have an opposing view to mine please make sure to express it in a comment too!

UPDATE: I changed the sync.Mutex to sync.RWMutex, and in the places where we only read data I changed mutex.Lock/Unlock to mutex.RLock/RUnlock.

UPDATE2: For some reason I used a slice for the database code although I tested with a map. Sorry for that, corrected it already.

Next part

Web app using Microservices in Go: Part 1 – Design

Introduction

Recently it’s a constantly repeated buzzword – Microservices. You can love ’em or hate ’em, but you really shouldn’t ignore ’em. In this short series we’ll create a web app using a microservice architecture. We’ll try not to use 3rd party tools and libraries. Remember though that when creating a production web app it is highly recommended to use 3rd party libraries (even if only to save you time).

We will create the various components in a basic form. We won’t use advanced caching or use a database. We will create a basic key-value store and a simple storage service. We will use the Go language for all this.

UPDATE: as there are comments regarding overcomplication: this is meant to show a scalable and working skeleton for a microservice architecture. If you only want to add some filters to photos, don’t design it like that. It’s overkill.

On further thought and another comment, (Which you can find on the golang Reddit) do design it this way. Software usually lives much longer than we think it will, and such a design will lead to an easily extendable and scalable web app.

The functionality

First we should decide what our web app will do. The web app we’ll create in this series will get an image from a user and give back an unique ID. The image will get modified using complicated and highly sophisticated algorithms, like swapping the blue and red channel, and the user will be able to use the ID to check if the work on the image has been finished already or if it’s still in progress. If it’s finished he will be able to download the altered image.

Designing the architecture

We want the architecture to be microservices, so we should design it like that. We’ll for sure need a service facing the user, the one that provides the interface for communication with our app. This could also handle authentication, and should be used as the service redirecting the workload to the right sub-services. (useful if you plan to integrate more funcionality into the app)

We will also want a microservice which will handle all our images. It will get the image, generate an ID, store information related to each task, and save the images. To handle high workloads it’s a good idea to use a master-slave system for our image modification service. The image handler will be the master, and we will create slave microservices which will ask the master for images to work on.

We will also need a key-value datastore for various configuration, a storage system, for saving our images, pre- and post-modification, and a database-ish service holding the information about each task.

This should suffice to begin with.

Here I’d like to also state that the architecture could change during the series if needed. And I encourage you to comment if you think that something could be done better.

Communication

We will also need to define the method the services communicate by. In this app we will use REST everywhere. You could also use a message BUS or Remote Procedure Calls – short RPC, but I won’t write about them here.

Designing the microservice API’s

Another important thing is to design the API‘s of you microservices. We will now design each of them to get an understanding about what they are for.

The key-value store

This one’s mainly for configuration. It will have a simple post-get interface:

  • POST:
    • Arguments:
      • Key
      • Value
    • Response:
      • Success/Failure
  • GET:
    • Arguments:
      • Key
    • Response:
      • Value/Failure

The storage

Here we will store the images, again using a key-value interface and an argument stating if this one’s pre- or post-modification. For the sake of simplicity we will just save the image to a folder named, depending on the state of the image, finished/inProgress.

  • POST:
    • Arguments:
      • Key
      • State: pre-/post-modification
      • Data
    • Response:
      • Success/Failure
  • GET:
    • Arguments:
      • Key
      • State: pre-/post-modification
    • Response:
      • Data/Failure

Database

This one will save our tasks. If they are waiting to start, in progress or finished, their Id.

  • POST:
    • Arguments:
      • TaskId
      • State: not started/ in progress/ finished
    • Response:
      • Success/Failure
  • GET:
    • Arguments:
      • TaskId
    • Response:
      • State/Failure
  • GET:
    • Path:
      • not started/ in progress/ finished
    • Reponse:
      • list of TaskId’s

The Frontend

The frontend is there mainly to provide a communication way between the various services and the user. It can also be used for authentication and authorization.

  • POST:
    • Path:
      • newImage
    • Arguments:
      • Data
    • Response:
      • Id
  • GET:
    • Path:
      • image/isReady
    • Arguments:
      • Id
    • Response:
      • not found/ in progress / finished
  • GET:
    • Path:
      • image/get
    • Arguments:
      • Id
    • Response:
      • Data

Image master microservice

This one will get new images from the fronted/user and send them to the storage service. It will also create a new task in the database, and orchestrate the workers who can ask for work and notify when it’s finished.

  • Frontend interface:
    • POST:
      • Path:
        • newImage
      • Arguments:
        • Data
      • Response:
        • Id
    • GET:
      • Path:
        • isReady
      • Arguments:
        • Id
      • Response:
        • not found/ in progress / finished
    • GET:
      • Path:
        • get
      • Arguments:
        • Id
      • Response:
        • Data/Failure
  • Worker interface:
    • GET:
      • Path:
        • getWork
      • Response:
        • Id/noWorkToDo
    • POST:
      • Path:
        • workFinished
      • Arguments:
        • Id
      • Response:
        • Success/Failure

Image worker microservice

This one doesn’t have any API. It is a client to the master image service, which he finds using the key-value store. He gets the image data to work on from the storage service.

Scheme

The microservice architecure design

Conclusion

This is basically everything regarding the design. In the next part we will write part of the microservices. Again, I encourage you to comment expressing what you think about this design!

Next part

Practical Golang: Using Google Drive and Calendar

Introduction

Integrating Google services into your app can lead to a lot of nice features for your users, and can create a seamless experience for them. In this tutorial we’ll learn how to use the most useful functionalities of Google Calendar and Google Drive.

The theory

To begin with, we should understand the methodology of using the Google API in Golang. For most of their API’s I’ve skimmed through it works like that:

  1. Create an OAuth2 client from the OAuth2 access token.
  2. Use the client to create an app service, this will be our interface we’ll use to communicate with Google services.
  3. We create a request object and set the needed parameters.
  4. We start the action, usually using the Do() function on the request object.

After you learn it for one Google service, it will be trivial for you to use it for any other.

Dependencies

Here we will need the Google Calendar and Google Drive libraries, both of which we need version 3 of. (the newest at the moment)

So make sure to:

go get google.golang.org/api/calendar/v3
go get google.golang.org/api/drive/v3

The basic structure

For both of our apps we’ll need the same basic OAuth2 app structure. You can learn more about it in my previous article

package main

import (
    "fmt"
    "net/http"
    "golang.org/x/oauth2"
    "os"
    "golang.org/x/oauth2/google"
    "golang.org/x/net/context"
    "time"
)

var (
    googleOauthConfig = &oauth2.Config{
        RedirectURL:    "http://localhost:3000/GoogleCallback",
        ClientID:     os.Getenv("googlekey"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        ClientSecret: os.Getenv("googlesecret"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        Scopes:       []string{},
        Endpoint:     google.Endpoint,
    }
// Some random string, random for each request
    oauthStateString = "random"
)

const htmlIndex = `<html><body>
<a href="/GoogleLogin">Log in with Google</a>
</body></html>
`

func main() {
    http.HandleFunc("/", handleMain)
    http.HandleFunc("/GoogleLogin", handleGoogleLogin)
    http.HandleFunc("/GoogleCallback", handleGoogleCallback)
    fmt.Println(http.ListenAndServe(":3000", nil))
}
func handleMain(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, htmlIndex)
}

func handleGoogleLogin(w http.ResponseWriter, r *http.Request) {
    url := googleOauthConfig.AuthCodeURL(oauthStateString)
    http.Redirect(w, r, url, http.StatusTemporaryRedirect)
}

func handleGoogleCallback(w http.ResponseWriter, r *http.Request) {
    state := r.FormValue("state")
    if state != oauthStateString {
        fmt.Printf("invalid oauth state, expected '%s', got '%s'\n", oauthStateString, state)
        http.Redirect(w, r, "/", http.StatusTemporaryRedirect)
        return
    }

    code := r.FormValue("code")
    token, err := googleOauthConfig.Exchange(oauth2.NoContext, code)
    if err != nil {
        fmt.Printf("oauthConf.Exchange() failed with '%s'\n", err)
        http.Redirect(w, r, "/", http.StatusTemporaryRedirect)
        return
    }

    client := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(token))
}

There is one thing I haven’t covered in my previous blog post, namely the last line:

client := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(token))

We need an OAuth2 client to use the Google API, so we create one. It takes a context, for lack of which we just use the background context. It also needs a token source. As we only want to make one request and know that this token will suffice we create a static token source which will always generate the same token which we’ve passed to it.

Creating the Calendar app.

Preperation

First, as described in my previous article, you should enable the Google Calendar API in the Google Cloud Console for you app.

Also, we’ll need to ask for permission, so add the https://www.googleapis.com/auth/calendar scope to our googleOauthConfig:

googleOauthConfig = &oauth2.Config{
        RedirectURL:    "http://localhost:3000/GoogleCallback",
        ClientID:     os.Getenv("googlekey"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        ClientSecret: os.Getenv("googlesecret"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        Scopes:       []string{"https://www.googleapis.com/auth/calendar"},
        Endpoint:     google.Endpoint,
    }

Remember to import google.golang.org/api/calendar/v3

The main code

We’ll add everything we write right after creating our OAuth2 client.

First, as I described before, we’ll need an app service, here it will be the calendar service, so let’s create it!

client := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(token))

calendarService, err := calendar.New(client)
if err != nil {
  fmt.Fprintln(w, err)
  return
}

It just uses the OAuth client to create the service and errors out if something goes wrong.

Listing events

Now we will create a request, add a few optional parameters to it and start it. We’ll build it up step by step.

calendarService, err := calendar.New(client)
if err != nil {
  fmt.Fprintln(w, err)
  return
}

calendarService.Events.List("primary")

This creates a request to list all events in your primary calendar. You could also name a specific calendar, but using primary will take the primary calendar of that user.

So… I think we don’t really care about the events 5 years ago. So let’s only take the upcoming ones.

calendarService.Events.List("primary").TimeMin(time.Now().Format(time.RFC3339))

We add an option TimeMin which takes a DateTime by string… No idea why it isn’t a nice struct like time.DateTime. You also need to format it as a string in the RFC3339 format.

Ok… but that could be a lot of events, so we’ll just take the 5 first:

calendarService.Events.List("primary").TimeMin(time.Now().Format(time.RFC3339)).MaxResults(5)

Now we just have to Do() it, and store the results:

calendarEvents, err := calendarService.Events.List("primary").TimeMin(time.Now().Format(time.RFC3339)).MaxResults(5).Do()
if err != nil {
  fmt.Fprintln(w, err)
  return
}

How can we now do something with the results? Simple! :

calendarEvents, err := calendarService.Events.List("primary").TimeMin(time.Now().Format(time.RFC3339)).MaxResults(5).Do()
if err != nil {
  fmt.Fprintln(w, err)
  return
}

if len(calendarEvents.Items) > 0 {
    for _, i := range calendarEvents.Items {
        fmt.Fprintln(w, i.Summary, " ", i.Start.DateTime)
    }
}

We access a list of events using the Items field in the calendarEvents variable, if there is at least one element, then for each element we write the summary and start time to the response writer using a for range loop.

Creating an event

Ok, we already know how to list events, now let’s create an event!
First, we need an event object:

if len(calendarEvents.Items) > 0 {
    for _, i := range calendarEvents.Items {
        fmt.Fprintln(w, i.Summary, " ", i.Start.DateTime)
    }
}
newEvent := calendar.Event{
    Summary: "Testevent",
    Start: &calendar.EventDateTime{DateTime: time.Date(2016, 3, 11, 12, 24, 0, 0, time.UTC).Format(time.RFC3339)},
    End: &calendar.EventDateTime{DateTime: time.Date(2016, 3, 11, 13, 24, 0, 0, time.UTC).Format(time.RFC3339)},
}

We create an Event struct and pass in the summary – title of the event.
We also pass the start and finish DateTime. We create a date using the stdlib time package, and then convert it to a string in the RFC3339 format. There are tons of other optional fields you can specify if you want to.

Next we need to create an insert request object:

newEvent := calendar.Event{
    Summary: "Testevent",
    Start: &calendar.EventDateTime{DateTime: time.Date(2016, 3, 11, 12, 24, 0, 0, time.UTC).Format(time.RFC3339)},
    End: &calendar.EventDateTime{DateTime: time.Date(2016, 3, 11, 13, 24, 0, 0, time.UTC).Format(time.RFC3339)},
}
calendarService.Events.Insert("primary", &newEvent)

The Insert request takes two arguments, the calendar name and an event object.

As usual we neeed to Do() the request! and saving the resulting created event can also come handy in the future:

createdEvent, err := calendarService.Events.Insert("primary", &newEvent).Do()
if err != nil {
    fmt.Fprintln(w, err)
    return
}

In the end let’s just print some kind of confirmation to the user:

fmt.Fprintln(w, "New event in your calendar: \"", createdEvent.Summary, "\" starting at ", createdEvent.Start.DateTime)

Hint

You can define the event ID yourself before creating it, but you can also let the Google Calendar service generate an ID for us as we did.

Creating the Drive app

Preperation

First, enable the Google Drive API in the Google Cloud Console

Again we will need to ask the user for permission. So we have to use the https://www.googleapis.com/auth/drive and https://www.googleapis.com/auth/drive.file scopes:

googleOauthConfig = &oauth2.Config{
        RedirectURL:    "http://localhost:3000/GoogleCallback",
        ClientID:     os.Getenv("googlekey"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        ClientSecret: os.Getenv("googlesecret"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        Scopes:       []string{"https://www.googleapis.com/auth/drive", "https://www.googleapis.com/auth/drive.file"},
        Endpoint:     google.Endpoint,
    }

Remember to import google.golang.org/api/drive/v3″

The main code

Again we need to start from our basic OAuth2 app structure with an OAuth2 client, and create an app service. Here, this will be the Drive service:

client := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(token))

driveService, err := drive.New(client)
if err != nil {
    fmt.Fprintln(w, err)
    return
}

Listing files

Ok, to begin with let’s learn how to list files from the Google Drive. There is an important concept you should understand before we start. Whenever we request a list of files from Google Drive, those can literally be thousands of files. That’s why we get one page of files, which includes files metadata, not the content, and a NextPageToken, which we can use to get the next page.

As before with the calendar app, we create a list request.

driveService, err := drive.New(client)
if err != nil {
    fmt.Fprintln(w, err)
    return
}

driveService.Files.List()

But we can also set an option. The ordering for example:

driveService.Files.List().OrderBy("name")

Ok, now let’s Do() it and save the results:

myFilesList, err := driveService.Files.List().OrderBy("name").Do()
if err != nil {
    fmt.Fprintf(w, "Couldn't retrieve files ", err)
}

As I wrote before, this is one page of files, now let’s go over it:

if len(myFilesList.Files) > 0 {
    for _, i := range myFilesList.Files {
        fmt.Fprintln(w, i.Name, " ", i.Id)
    }
} else {
    fmt.Fprintln(w, "No files found.")
}

We check if there are any files, and if there are, we range over them printing the name and Id of each one, else we print that there are none.

Now, let’s get more pages, it’s the myFilesList.NextPageToken holding the token for the next page. If it is an empty string, then this was the last page. While it is present we load new pages into our myFilesList variable.

for myFilesList.NextPageToken != "" {
    myFilesList, err = driveService.Files.List().OrderBy("name").PageToken(myFilesList.NextPageToken).Do()
    if err != nil {
        fmt.Fprintf(w, "Couldn't retrieve files ", err)
        break
    }
    fmt.Fprintln(w, "Next Page: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    if len(myFilesList.Files) > 0 {
        for _, i := range myFilesList.Files {
            fmt.Fprintln(w, i.Name, " ", i.Id)
        }
    } else {
        fmt.Fprintln(w, "No files found.")
    }
}

To retrieve the next page we add the PageToken option to our file listing request

    myFilesList, err = driveService.Files.List().OrderBy("name").PageToken(myFilesList.NextPageToken).Do()

Whenever we start printing the new page, we notify a user that a new page just started. Later we check if we have any files, and if we do, then we range over them as before, printing the names and Id’s

Creating a file

We already know how to list files in our Google Drive. Now let’s learn how to create a file and add some content to it!

First, we need to create the file metadata:

for myFilesList.NextPageToken != "" {
    myFilesList, err = driveService.Files.List().OrderBy("name").PageToken(myFilesList.NextPageToken).Do()
    if err != nil {
        fmt.Fprintf(w, "Couldn't retrieve files ", err)
        break
    }
    fmt.Fprintln(w, "Next Page: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    if len(myFilesList.Files) > 0 {
        for _, i := range myFilesList.Files {
            fmt.Fprintln(w, i.Name, " ", i.Id)
        }
    } else {
        fmt.Fprintln(w, "No files found.")
    }
}


myFile := drive.File{Name: "cats.png"}

Again, the Id will be generated for us if we do not provide any.

Putting the file into Google Drive is pretty easy now. We create a create request, referencing our file metadata in it:

myFile := drive.File{Name: "cats.png"}

driveService.Files.Create(&myFile)

and Do() it, saving the results:

createdFile, err := driveService.Files.Create(&myFile).Do()
if err != nil {
    fmt.Fprintf(w, "Couldn't create file ", err)
}

Ok, if you started this code, you would get a cats.png file. However, it’s empty. So let’s add some data to it!

To add it, we’ll first need some data. We can load it from a file:

createdFile, err := driveService.Files.Create(&myFile).Do()
if err != nil {
    fmt.Fprintf(w, "Couldn't create file ", err)
}
myImage, err := os.Open("/tmp/image.png")
if err != nil {
    fmt.Fprintln(w, err)
}

Now we have to create the updated file metadata:

myImage, err := os.Open("/tmp/image.png")
if err != nil {
    fmt.Fprintln(w, err)
}
updatedFile := drive.File{Name: "catsUpdated.png"}

We have to construct an update request:

driveService.Files.Update(createdFile.Id, &updatedFile)

Here we specify the Id of the file to modify, and the new metadata. We’ll now add the data to the update request, and Do() it, checking for errors and ignoring the result.

_, err = driveService.Files.Update(createdFile.Id, &updatedFile).Media(myImage).Do()
if err != nil {
    fmt.Fprintln(w, err)
}
fmt.Fprintln(w, createdFile.Id)

In the end we send the new file id to the user.

Hint

You could add the Media option already to the create request if you wanted.

Conclusion

I suppose this was a good short introduction into the structure of the Google API. After learning these, you should have few to no problems using the other API’s.

Have fun integrating it into your app!

Practical Golang: Using websockets

Introduction

This is the first post in the practical Golang series. Posts in it are meant to provide short and informative introductions to various topics.

This one is a about websockets, which are an awesome and easy way to provide communication between your web app and server.

Here we will use the gorilla websocket library, but you could also use a few others.

We will create two basic apps which should cover most day to day usage:
1. A client subscribing to a server to get continues information.
2. A client-ping server-pong app.

Dependencies

Make sure to go get:

go get github.com/gorilla/websocket

What’s the theory?

  1. The client connects to your server using his web browser.
  2. He gets back the website.
  3. He connects to your server through his websocket client using javascript.
  4. Your server accepts it using a standard http handler.
  5. You create a websocket connection from the http connection.
  6. You communicate with the client.
  7. The connection gets closed by one of the sides.

Creating the subscription app

Preparations

Next to our main go file we will need a html folder to place our html file in.

Let’s name it creatively, like index.html

Now the contents:

<!DOCTYPE HTML>
<html>
<head>

    <script type="text/javascript">
         function myWebsocketStart()
         {
               var ws = new WebSocket("ws://localhost:3000/websocket");

               ws.onmessage = function (evt)
               {
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + evt.data
               };

               ws.onclose = function()
               {
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + "Connection closed";
               };

         }

    </script>

</head>
<body>
<button onclick="javascript:myWebsocketStart()">Subscribe</button>
<textarea id="textarea1">MyTextArea</textarea>
</body>
</html>

I’ll just go over it quickly as the main subject here is the go code.

We create a button and a textarea, after the user clicks the button he connects to our websocket. Whenever he receives a message, or the connection gets closed, we append the info to our textarea.

We will also need our, so creatively named, main.go file, with the basic structure and file server written:

package main

import (
    "github.com/gorilla/websocket"
    "net/http"
    "os"
    "fmt"
    "io/ioutil"
    "time"
    "encoding/json"
)

func main() {
    indexFile, err := os.Open("html/index.html")
    if err != nil {
        fmt.Println(err)
    }
    index, err := ioutil.ReadAll(indexFile)
    if err != nil {
        fmt.Println(err)
    }
    http.HandleFunc("/websocket", func(w http.ResponseWriter, r *http.Request) {
    })
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintf(w, string(index))
    })
    http.ListenAndServe(":3000", nil)
}

Awesome, now let’s create the websocket part.

Writing the websocket code

Little bit of planning

Our server will create a Person object containing a name and age in seconds. Every two seconds it will send the client the current state of the person.

The meat

First we’ll need to define our Person type:

type Person struct {
    Name string
    Age  int
}

We’ll also need to create an upgraded variable, in which we define our read and write buffer sizes.

var upgrader = websocket.Upgrader{
    ReadBufferSize: 1024,
    WriteBufferSize: 1024,
}

Now, how do we create the websocket connection? Pretty easily in fact:

http.HandleFunc("/websocket", func(w http.ResponseWriter, r *http.Request) {
  conn, err := upgrader.Upgrade(w, r, nil)
  if err != nil {
    fmt.Println(err)
    return
  }
  fmt.Println("Client subscribed")
}

That’s all we need to have a client. Now let’s create Bill, our person, right after we get the client subscribed:

fmt.Println("Client subscribed")

myPerson := Person{
  Name: "Bill",
  Age:  0,
}

Now we need the main websocket handling code, which we will wrap into an endless for loop, which we get out of only if the channel closes or Bill gets 40 seconds old.

for {
  time.Sleep(2 * time.Second)
  if myPerson.Age < 40 {
    myJson, err := json.Marshal(myPerson)
    if err != nil {
      fmt.Println(err)
      return
    }
    err = conn.WriteMessage(websocket.TextMessage, myJson)
    if err != nil {
      fmt.Println(err)
      break
    }
    myPerson.Age += 2
  } else {
    conn.Close()
    break
  }
}
fmt.Println("Client unsubscribed")

We send the messages using conn.WriteMessage in which we specify the message type, can be binary or text, and the content. If Bill is 40 years old or more, we break out of the loop. So far so good, but what if we want bidirectional communication?

Creating the ping-pong app

Preparations

As before, we will need a html folder for our html file with the creative name index.html

And here’s the code:

<!DOCTYPE HTML>
<html>
<head>

    <script type="text/javascript">
         function myWebsocketStart()
         {
               var ws = new WebSocket("ws://localhost:3000/websocket");

               ws.onopen = function()
               {
                  // Web Socket is connected, send data using send()
                  ws.send("ping");
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + "First message sent";
               };

               ws.onmessage = function (evt)
               {
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + evt.data
                  if(evt.data == "pong") {
                    setTimeout(function(){ws.send("ping");}, 2000);
                  }
               };

               ws.onclose = function()
               {
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + "Connection closed";
               };

         }

    </script>

</head>
<body>
<button onclick="javascript:myWebsocketStart()">Start websocket!</button>
<textarea id="textarea1">MyTextArea</textarea>
</body>
</html>

The only differences are, that when we open the connection, we send a “ping” message and notify our user about it. Now, whenever we get back a “pong” message, we append it to our textarea and after 2 seconds we answer with a “ping” message again.

We will again need the basic go file structure with the upgrader defined already, and the connection created:

package main

import (
    "github.com/gorilla/websocket"
    "net/http"
    "os"
    "fmt"
    "io/ioutil"
    "time"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize: 1024,
    WriteBufferSize: 1024,
}

func main() {
    indexFile, err := os.Open("html/index.html")
    if err != nil {
        fmt.Println(err)
    }
    index, err := ioutil.ReadAll(indexFile)
    if err != nil {
        fmt.Println(err)
    }
    http.HandleFunc("/websocket", func(w http.ResponseWriter, r *http.Request) {
        conn, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            fmt.Println(err)
            return
        }
    })
    http.HandleFunc("/", func(w http.ResponseWriter, r * http.Request) {
        fmt.Fprintf(w, string(index))
    })
    http.ListenAndServe(":3000", nil)
}

Writing the websocket code

Ok, so now, whenever we get a “ping” message, we wait 2 seconds and answer with a “pong” message. If we get anything else, we just close the connection.

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
  fmt.Println(err)
  return
}
for {
  msgType, msg, err := conn.ReadMessage()
  if err != nil {
    fmt.Println(err)
    return
  }
  if string(msg) == "ping" {
    fmt.Println("ping")
    time.Sleep(2 * time.Second)
    err = conn.WriteMessage(msgType, []byte("pong"))
    if err != nil {
      fmt.Println(err)
      return
    }
  } else {
    conn.Close()
    fmt.Println(string(msg))
    return
  }
}

Using the ReadMessage function on our connection we get the type, content, and maybe error. We check the message and answer.

Conclusion

That’s actually all, have fun with it and build something great!