Practical Golang: Event multicast/subscription service

Introduction

In a microservice architecture we always need a way for services to communicate. There are various ways to achieve this, including Remote Procedure Calls, REST APIs, and message buses. In this tutorial we’ll write a service that you can use to distribute messages/events across your system.

Design

How will it work? It will accept subscriber registrations (from other microservices). Whenever it gets a message from a microservice, it will forward it to all subscribers with a REST call to each subscriber’s /event URL.

Subscribers will need to call a keep-alive URL regularly, otherwise they will get removed from the subscriber list. This protects us from sending messages to too many ghost subscribers.
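From the subscriber’s point of view, the keep-alive could look roughly like the sketch below. The endpoint name and the address parameter match the service we build in this tutorial; the helper names, the addresses and the 30-second interval are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"time"
)

// keepAliveURL builds the registration URL. serviceAddr is the multicast
// service ("host:port"); ownAddr is where this subscriber can be reached.
func keepAliveURL(serviceAddr, ownAddr string) string {
	return "http://" + serviceAddr + "/registerAndKeepAlive?address=" + url.QueryEscape(ownAddr)
}

// keepAlive re-registers once per interval until stop is closed.
// The interval is an assumption: it only has to be shorter than the
// time after which the service drops silent subscribers.
func keepAlive(serviceAddr, ownAddr string, interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		resp, err := http.Post(keepAliveURL(serviceAddr, ownAddr), "text/plain", nil)
		if err != nil {
			fmt.Println("keep-alive failed:", err)
		} else {
			resp.Body.Close()
		}
		select {
		case <-ticker.C:
		case <-stop:
			return
		}
	}
}

func main() {
	// Only print the URL here; a real subscriber would run something like
	// go keepAlive("localhost:3000", "localhost:4000", 30*time.Second, stop).
	fmt.Println(keepAliveURL("localhost:3000", "localhost:4000"))
}
```

The address is query-escaped because it travels as a URL argument; the service decodes it again when it parses the query.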

Implementation

Let’s start with a basic structure. We’ll define the API and set up our two main data structures:
1. The subscriber list with their register/lastKeepAlive dates.
2. The mutex controlling access to our subscriber list.

package main

import (
    "net/http"
    "time"
    "sync"
    "fmt"
    "net/url"
    "io/ioutil"
    "bytes"
)

var registeredServiceStorage map[string]time.Time
var serviceStorageMutex sync.RWMutex

func main() {
    registeredServiceStorage = make(map[string]time.Time)
    serviceStorageMutex = sync.RWMutex{}

    http.HandleFunc("/registerAndKeepAlive", registerAndKeepAlive)
    http.HandleFunc("/deregister", deregister)
    http.HandleFunc("/sendMessage", handleMessage)
    http.HandleFunc("/listSubscribers", handleSubscriberListing)

    go killZombieServices()
    http.ListenAndServe(":3000", nil)
}

func registerAndKeepAlive(w http.ResponseWriter, r *http.Request) {
}

func deregister(w http.ResponseWriter, r *http.Request) {
}

func handleMessage(w http.ResponseWriter, r *http.Request) {
}

func sendMessageToSubscriber(data []byte, address string) {
}

func handleSubscriberListing(w http.ResponseWriter, r *http.Request) {
}

func killZombieServices() {
}

We initialize our subscriber list and mutex, and also launch, in a separate goroutine, a function that will regularly delete ghost subscribers.

So far so good!
We can now start getting into each function’s implementation.

We can begin with registerAndKeepAlive, which does two things: it registers a new subscriber or updates an existing one. This works because in both cases we just set the map entry for the subscriber’s address to the current time.

func registerAndKeepAlive(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        //Subscriber registration
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

The register function should be called with a POST request. That’s why the first thing we do is check whether the method is right; otherwise we answer with an error. If it’s OK, we register the client:

if r.Method == http.MethodPost {
    values, err := url.ParseQuery(r.URL.RawQuery)
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error:", err)
        return
    }
    if len(values.Get("address")) == 0 {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error:","Wrong input address.")
        return
    }

}

We check if the URL arguments are correct, and finally register the subscriber:

if len(values.Get("address")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:","Wrong input address.")
    return
}

serviceStorageMutex.Lock()
registeredServiceStorage[values.Get("address")] = time.Now()
serviceStorageMutex.Unlock()

fmt.Fprint(w, "success")

Awesome!

Let’s now implement the function which shall delete the entry when the subscriber wants to deregister.

func deregister(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodDelete {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("address")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input address.")
            return
        }

        //Subscriber deletion will come here

    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only DELETE accepted")
    }
}

Again we check that the request method is right and that the address argument is present. If so, we can remove this client from our subscriber list.

if len(values.Get("address")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:","Wrong input address.")
    return
}

serviceStorageMutex.Lock()
delete(registeredServiceStorage, values.Get("address"))
serviceStorageMutex.Unlock()

fmt.Fprint(w, "success")
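Both handlers repeat the same query parsing and address check. If the repetition bothers you, it could be pulled into a small helper. The sketch below is only one way to do it; addressFromQuery is a hypothetical name, not part of the service above.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// addressFromQuery is a hypothetical helper: it parses the raw query
// string and returns the "address" argument, or an error if the query
// is malformed or the argument is missing.
func addressFromQuery(rawQuery string) (string, error) {
	values, err := url.ParseQuery(rawQuery)
	if err != nil {
		return "", err
	}
	address := values.Get("address")
	if len(address) == 0 {
		return "", errors.New("wrong input address")
	}
	return address, nil
}

func main() {
	fmt.Println(addressFromQuery("address=localhost%3A4000"))
	fmt.Println(addressFromQuery("foo=bar"))
}
```

A handler would call it with r.URL.RawQuery and answer with http.StatusBadRequest when it returns an error.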

Now it’s time for the main functionality. Namely handling messages and sending them to all subscribers:

func handleMessage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {

    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

As usual, we check if the request method is correct.

Then, we read the data we got, so we can pass it to multiple concurrent sending functions.

if r.Method == http.MethodPost {

    data, err := ioutil.ReadAll(r.Body)
    if err != nil {
        fmt.Println(err)
    }
    //...
}

We then lock the mutex for reading. That’s important so that we can handle huge amounts of messages efficiently. Basically, it means that we allow others to read while we are reading, because maps support concurrent reads. This is safe as long as no one is modifying the map at the same time.
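To see the difference between the read lock and the write lock in isolation, here is a small sketch. The subscriberStore type and its methods are hypothetical and only wrap the same map-plus-RWMutex idea our service uses.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// subscriberStore is a hypothetical wrapper around a map and an RWMutex.
type subscriberStore struct {
	mu   sync.RWMutex
	seen map[string]time.Time
}

func newSubscriberStore() *subscriberStore {
	return &subscriberStore{seen: make(map[string]time.Time)}
}

// touch takes the write lock: only one goroutine may modify the map.
func (s *subscriberStore) touch(address string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.seen[address] = time.Now()
}

// addresses takes the read lock: many goroutines may hold it at once.
func (s *subscriberStore) addresses() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]string, 0, len(s.seen))
	for address := range s.seen {
		out = append(out, address)
	}
	return out
}

// countAfterConcurrentUse registers n subscribers while readers run in
// parallel, and returns how many subscribers are stored afterwards.
func countAfterConcurrentUse(n int) int {
	store := newSubscriberStore()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(2)
		go func(i int) {
			defer wg.Done()
			store.touch(fmt.Sprintf("service-%d:3000", i))
		}(i)
		go func() {
			defer wg.Done()
			_ = store.addresses()
		}()
	}
	wg.Wait()
	return len(store.addresses())
}

func main() {
	fmt.Println(countAfterConcurrentUse(10))
}
```

Readers never block each other here; they only wait while a writer holds the lock.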

While we lock the map for read, we check the list of subscribers we have to send the message to, and start concurrent functions that will do the sending. As we don’t want to lock the map for the entire sending time, we only need the addresses.

    data, err := ioutil.ReadAll(r.Body)
    if err != nil {
        fmt.Println(err)
    }

    serviceStorageMutex.RLock()
    for address := range registeredServiceStorage {
        go sendMessageToSubscriber(data, address)
    }
    serviceStorageMutex.RUnlock()

    fmt.Fprint(w, "success")

Which means we now have to implement the sendMessageToSubscriber(…) function.

It’s pretty simple: we just make a POST request and print the error if one occurred.

func sendMessageToSubscriber(data []byte, address string) {
    response, err := http.Post("http://" + address + "/event", "", bytes.NewBuffer(data))
    if err != nil {
        fmt.Println(err)
        return
    }
    // Close the body so the underlying connection can be reused.
    response.Body.Close()
}

It’s important to notice that we have to create a buffer from the data, as http.Post(…) expects the body as an io.Reader.
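On the receiving end, each subscriber needs a handler for the /event URL. A minimal sketch could look like this; what a subscriber does with a message is up to it, so storing the bodies in a slice is just an assumption for the demo, and the request is simulated in memory instead of going over the network.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// received collects message bodies; a real subscriber would process them.
var (
	receivedMutex sync.Mutex
	received      []string
)

func handleEvent(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		fmt.Fprint(w, "Error: Only POST accepted")
		return
	}
	data, err := io.ReadAll(r.Body)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprint(w, "Error:", err)
		return
	}
	receivedMutex.Lock()
	received = append(received, string(data))
	receivedMutex.Unlock()
	fmt.Fprint(w, "success")
}

// deliver simulates the service's POST in memory and returns the status code.
func deliver(message string) int {
	request := httptest.NewRequest(http.MethodPost, "/event", strings.NewReader(message))
	recorder := httptest.NewRecorder()
	handleEvent(recorder, request)
	return recorder.Code
}

func main() {
	// A real subscriber would call http.HandleFunc("/event", handleEvent)
	// and http.ListenAndServe on the address it registered with.
	fmt.Println(deliver("hello subscribers"), received)
}
```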

We’ll also implement the function that lists all the subscribers, mainly for debugging purposes. There’s nothing new in it: we check that the method is right, lock the mutex for reading, and finally print the map, formatting the register time as RFC 3339.

func handleSubscriberListing(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        serviceStorageMutex.RLock()

        for address, registerTime := range registeredServiceStorage {
            fmt.Fprintln(w, address, " : ", registerTime.Format(time.RFC3339))
        }

        serviceStorageMutex.RUnlock()
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Now there’s only one function left. The one that will make sure no ghost services stay for too long. It will check all the services once per minute. This way we’re making it cheap on performance:

func killZombieServices() {
    t := time.Tick(1 * time.Minute)

    for range t {
    }
}

This is a nice way to launch the code every minute. We create a channel which will send us the time every minute, and range over it, ignoring the received values.
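If you ever need to stop such a loop, time.Tick won’t help, as it only hands you the channel. A variant with time.NewTicker and a stop channel might look like the sketch below. The names runEvery and countRuns are made up for this example, and the 5 millisecond interval is only there so the demo finishes quickly.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runEvery calls job once per interval until stop is closed.
func runEvery(interval time.Duration, stop <-chan struct{}, job func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // time.Tick gives us no handle to do this
	for {
		select {
		case <-ticker.C:
			job()
		case <-stop:
			return
		}
	}
}

// countRuns lets the job run `want` times on a short interval,
// then stops the loop and returns how often the job counted.
func countRuns(want int) int {
	stop := make(chan struct{})
	var once sync.Once
	count := 0
	runEvery(5*time.Millisecond, stop, func() {
		if count < want {
			count++
		}
		if count >= want {
			once.Do(func() { close(stop) })
		}
	})
	return count
}

func main() {
	fmt.Println(countRuns(3))
}
```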

We can now get the check and remove working.

for range t {
    timeNow := time.Now()
    serviceStorageMutex.Lock()
    for address, timeKeepAlive := range registeredServiceStorage {
        if timeNow.Sub(timeKeepAlive).Minutes() > 2 {
            delete(registeredServiceStorage, address)
        }
    }
    serviceStorageMutex.Unlock()
}

We just range over the subscribers and delete those that haven’t kept their subscription alive.

To add to that, if you wanted you could first make a read-only pass over the subscribers, and immediately after that, make a write-locked deletion of the ones you found. This would allow others to read the map while you’re finding subscribers to delete.
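That two-pass idea could be sketched as follows. The removeStale function and its parameters are hypothetical; the map and mutex have the same names as in our service. Note the re-check in the second pass: a subscriber may have sent a keep-alive between the two locks.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	registeredServiceStorage = make(map[string]time.Time)
	serviceStorageMutex      sync.RWMutex
)

// removeStale deletes subscribers whose last keep-alive is older than
// maxAge and returns how many were removed.
func removeStale(now time.Time, maxAge time.Duration) int {
	// Pass 1: read lock only, so other readers are not blocked.
	serviceStorageMutex.RLock()
	var stale []string
	for address, last := range registeredServiceStorage {
		if now.Sub(last) > maxAge {
			stale = append(stale, address)
		}
	}
	serviceStorageMutex.RUnlock()

	if len(stale) == 0 {
		return 0
	}

	// Pass 2: write lock, re-checking each candidate before deleting it.
	serviceStorageMutex.Lock()
	defer serviceStorageMutex.Unlock()
	removed := 0
	for _, address := range stale {
		if last, ok := registeredServiceStorage[address]; ok && now.Sub(last) > maxAge {
			delete(registeredServiceStorage, address)
			removed++
		}
	}
	return removed
}

// demoRemoveStale fills the map with one fresh and one ghost subscriber.
func demoRemoveStale() (removed, remaining int) {
	now := time.Now()
	serviceStorageMutex.Lock()
	registeredServiceStorage = map[string]time.Time{
		"fresh:3000": now.Add(-30 * time.Second),
		"ghost:3000": now.Add(-5 * time.Minute),
	}
	serviceStorageMutex.Unlock()

	removed = removeStale(now, 2*time.Minute)

	serviceStorageMutex.RLock()
	remaining = len(registeredServiceStorage)
	serviceStorageMutex.RUnlock()
	return removed, remaining
}

func main() {
	fmt.Println(demoRemoveStale())
}
```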

Conclusion

That’s all! Have fun with creating an infrastructure based on such a service!

Web app using Microservices in Go: Part 4 – Worker and Frontend

Introduction

In this part we will finally finish writing our application. We will implement the last two services:
1. The Worker
2. The Frontend

The Worker

The worker will communicate with the Master to get new Tasks. When it gets a Task it will get the corresponding data from the storage and will start working on the task. When it finishes it will send the finished data to the storage service, and if that succeeds it will register the Task as finished to the Master.

That means that you can easily scale the workforce, by turning on additional machines, with the worker service on them. Easy scaling is good!

Implementation

As usual we will start with a basic structure similar to that of our previous services, with one big difference: there won’t be any API here, as the worker will be a client. You could, if you wanted, add an API for debugging purposes, for things like getting the processor usage. But this can also be implemented using 3rd party health checking services.

So here’s our basic structure:

package main

import (
    "os"
    "fmt"
    "net/http"
    "io/ioutil"
    "encoding/json"
    "time"
    "strconv"
    "image"
    "image/png"
    "image/color"
    "bytes"
    "sync"
)

type Task struct {
    Id int `json:"id"`
    State int `json:"state"`
}

var masterLocation string
var storageLocation string
var keyValueStoreAddress string

func main() {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return
    }
    keyValueStoreAddress = os.Args[1]

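    response, err := http.Get("http://" + keyValueStoreAddress + "/get?key=masterAddress")
    if err != nil {
        // response is nil when the request itself fails, so check err first.
        fmt.Println("Error: can't get master address:", err)
        return
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get master address. Status:", response.Status)
        return
    }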
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    masterLocation = string(data)
    if len(masterLocation) == 0 {
        fmt.Println("Error: can't get master address. Length is zero.")
        return
    }

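    response, err = http.Get("http://" + keyValueStoreAddress + "/get?key=storageAddress")
    if err != nil {
        // response is nil when the request itself fails, so check err first.
        fmt.Println("Error: can't get storage address:", err)
        return
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get storage address. Status:", response.Status)
        return
    }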
    data, err = ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    storageLocation = string(data)
    if len(storageLocation) == 0 {
        fmt.Println("Error: can't get storage address. Length is zero.")
        return
    }
}

func getNewTask(masterAddress string) (Task, error) {
}
func getImageFromStorage(storageAddress string, myTask Task) (image.Image, error) {
}
func doWorkOnImage(myImage image.Image) image.Image {
}
func sendImageToStorage(storageAddress string, myTask Task, myImage image.Image) error {
}
func registerFinishedTask(masterAddress string, myTask Task) error {
}

It may seem like a lot, but there is actually nothing new in the densest parts. We define the Task structure first and declare the needed addresses/locations.

Later we check if there are enough arguments passed to the application, and, as we did in the previous parts, we get the addresses of the Master and the Storage. That’s basically all.

For the worker we will want a command line parameter to set the number of concurrent goroutines. That’s why we check that os.Args has at least 3 elements: the program name, the k/v store address, and the thread count.

Now let’s parse the thread count:

storageLocation = string(data)
if len(storageLocation) == 0 {
    fmt.Println("Error: can't get storage address. Length is zero.")
    return
}

threadCount, err := strconv.Atoi(os.Args[2])
if err != nil {
    fmt.Println("Error: Couldn't parse thread count.")
    return
}

We use the strconv.Atoi function to parse the string argument into an int.

And now we come to the main for loop which runs in each goroutine:

myWG := sync.WaitGroup{}
myWG.Add(threadCount)
for i := 0; i < threadCount; i++ {
    go func() {
        for {
            //Do work...
        }
    }()
}
myWG.Wait()

Ok, what are we doing there? The main function has to wait for the goroutines and not just finish execution; that’s why we create a sync.WaitGroup and add the thread count to it. You could add a way to break out of the endless for loop and then call Done() on the WaitGroup. We won’t be adding this, as we just want endless work loops.
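If you did want the loops to end, one possible way is a stop channel that every goroutine checks before taking on more work. This is only a sketch; runWorkers, demoWorkers and the limit of 100 work items are assumptions for the demo.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers starts threadCount goroutines that call work in a loop
// until stop is closed, and returns once all of them have finished.
func runWorkers(threadCount int, stop <-chan struct{}, work func()) {
	var wg sync.WaitGroup
	wg.Add(threadCount)
	for i := 0; i < threadCount; i++ {
		go func() {
			defer wg.Done() // lets wg.Wait() return after the loop ends
			for {
				select {
				case <-stop:
					return
				default:
					work()
				}
			}
		}()
	}
	wg.Wait()
}

// demoWorkers stops the workers after at least 100 pieces of work
// and returns how many were actually done.
func demoWorkers(threadCount int) int64 {
	var done int64
	stop := make(chan struct{})
	var once sync.Once
	runWorkers(threadCount, stop, func() {
		if atomic.AddInt64(&done, 1) >= 100 {
			once.Do(func() { close(stop) })
		}
	})
	return atomic.LoadInt64(&done)
}

func main() {
	fmt.Println(demoWorkers(4) >= 100)
}
```

The exact count can end up slightly above 100, because the other goroutines may finish the piece of work they already started.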

Now we will write down the execution process for each Task.
First we get a new Task:

for {
    myTask, err := getNewTask(masterLocation)
    if err != nil {
        fmt.Println(err)
        fmt.Println("Waiting 2 second timeout...")
        time.Sleep(time.Second * 2)
        continue
    }

If we get an error, we wait 2 seconds so that we don’t fire off a lot of failing requests to the master at once, as this could flood the other services.

If we successfully get the Task, then we get the image to work on from the storage:

myTask, err := getNewTask(masterLocation)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

myImage, err := getImageFromStorage(storageLocation, myTask)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

Ok, now it’s time to do something with the image!!!

myImage, err := getImageFromStorage(storageLocation, myTask)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

myImage = doWorkOnImage(myImage)

We now of course have to save the image back to the storage:

myImage = doWorkOnImage(myImage)

err = sendImageToStorage(storageLocation, myTask, myImage)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

And finally if that succeeds we register our successfully finished Task!

        err = sendImageToStorage(storageLocation, myTask, myImage)
        if err != nil {
            fmt.Println(err)
            fmt.Println("Waiting 2 second timeout...")
            time.Sleep(time.Second * 2)
            continue
        }

        err = registerFinishedTask(masterLocation, myTask)
        if err != nil {
            fmt.Println(err)
            fmt.Println("Waiting 2 second timeout...")
            time.Sleep(time.Second * 2)
            continue
        }
    }
}()

Ok, so now we can continue with implementing these functions. Let’s first implement the getNewTask function:

func getNewTask(masterAddress string) (Task, error) {
    response, err := http.Post("http://" + masterAddress + "/getNewTask", "text/plain", nil)
    if err != nil {
        return Task{-1, -1}, err
    }
    defer response.Body.Close()
    // A non-200 answer leaves err nil, so turn it into an error explicitly.
    if response.StatusCode != http.StatusOK {
        return Task{-1, -1}, fmt.Errorf("getNewTask: unexpected status %s", response.Status)
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        return Task{-1, -1}, err
    }

    myTask := Task{}
    err = json.Unmarshal(data, &myTask)
    if err != nil {
        return Task{-1, -1}, err
    }

    return myTask, nil
}

We make the request to the master and check if it was successful. We read the response body to memory and finally Unmarshal the response body to our Task structure. Finally we return it.
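To see the Unmarshal step on its own, here is a small sketch. The JSON payload is an assumption derived from the json tags on the Task structure; parseTask is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Task struct {
	Id    int `json:"id"`
	State int `json:"state"`
}

// parseTask decodes a JSON document into a Task.
func parseTask(data []byte) (Task, error) {
	var task Task
	err := json.Unmarshal(data, &task)
	return task, err
}

func main() {
	// The payload shape is assumed from the struct tags above.
	task, err := parseTask([]byte(`{"id":7,"state":1}`))
	fmt.Println(task.Id, task.State, err)

	// Malformed input yields an error instead of a half-filled Task.
	_, err = parseTask([]byte(`not json`))
	fmt.Println(err != nil)
}
```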

Now we can implement the function to get the image from storage!

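func getImageFromStorage(storageAddress string, myTask Task) (image.Image, error) {
    response, err := http.Get("http://" + storageAddress + "/getImage?state=working&id=" + strconv.Itoa(myTask.Id))
    if err != nil {
        return nil, err
    }
    defer response.Body.Close()
    // A non-200 answer leaves err nil, so turn it into an error explicitly.
    if response.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("getImageFromStorage: unexpected status %s", response.Status)
    }

    myImage, err := png.Decode(response.Body)
    if err != nil {
        return nil, err
    }

    return myImage, nil
}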

We get the response whose body is the raw image, so we just Decode it and return it if we succeed.

Now that we have our image we can finally do some work on it:

func doWorkOnImage(myImage image.Image) image.Image {
    bounds := myImage.Bounds()
    myCanvas := image.NewRGBA(bounds)

    // Start at bounds.Min: an image's origin is not always (0, 0).
    for i := bounds.Min.X; i < bounds.Max.X; i++ {
        for j := bounds.Min.Y; j < bounds.Max.Y; j++ {
            // RGBA() returns 16-bit channels, so shift them down to 8 bits.
            r, g, b, _ := myImage.At(i, j).RGBA()
            myColor := new(color.RGBA)
            myColor.R = uint8(g >> 8)
            myColor.G = uint8(r >> 8)
            myColor.B = uint8(b >> 8)
            myColor.A = uint8(255)
            myCanvas.Set(i, j, myColor)
        }
    }

    return myCanvas.SubImage(bounds)
}

The work is largely irrelevant, but I’ll explain it anyway. First we create an RGBA image. That’s something like a canvas for drawing, and we create it with the size of our image. Then we draw on the canvas, swapping the red and green channels. Finally we use SubImage to return the modified image, created from our canvas with the bounds of our original image.
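You can check the channel swap on a single pixel. The sketch below uses hypothetical helpers (swapRedGreen, demoSwap) and, unlike the worker, keeps the original alpha value. One detail worth knowing: Color.RGBA() returns 16-bit channel values, so they have to be shifted down by 8 bits before they fit into a uint8.

```go
package main

import (
	"fmt"
	"image"
	"image/color"
)

// swapRedGreen returns a copy of src with the red and green channels swapped.
func swapRedGreen(src image.Image) *image.RGBA {
	bounds := src.Bounds()
	dst := image.NewRGBA(bounds)
	for y := bounds.Min.Y; y < bounds.Max.Y; y++ {
		for x := bounds.Min.X; x < bounds.Max.X; x++ {
			// Channels come back in the range 0..65535.
			r, g, b, a := src.At(x, y).RGBA()
			dst.SetRGBA(x, y, color.RGBA{
				R: uint8(g >> 8),
				G: uint8(r >> 8),
				B: uint8(b >> 8),
				A: uint8(a >> 8),
			})
		}
	}
	return dst
}

// demoSwap runs the swap on a 1x1 image and returns the resulting pixel.
func demoSwap() color.RGBA {
	src := image.NewRGBA(image.Rect(0, 0, 1, 1))
	src.SetRGBA(0, 0, color.RGBA{R: 200, G: 50, B: 10, A: 255})
	return swapRedGreen(src).RGBAAt(0, 0)
}

func main() {
	fmt.Println(demoSwap())
}
```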

After working on the image we have to send it back to the storage system. So let’s implement the sendImageToStorage function:

func sendImageToStorage(storageAddress string, myTask Task, myImage image.Image) error {
    data := []byte{}
    buffer := bytes.NewBuffer(data)
    err := png.Encode(buffer, myImage)
    if err != nil {
        return err
    }
    response, err := http.Post("http://" + storageAddress + "/sendImage?state=finished&id=" + strconv.Itoa(myTask.Id), "image/png", buffer)
    if err != nil {
        return err
    }
    defer response.Body.Close()
    // A non-200 answer leaves err nil, so turn it into an error explicitly.
    if response.StatusCode != http.StatusOK {
        return fmt.Errorf("sendImageToStorage: unexpected status %s", response.Status)
    }

    return nil
}

We create a byte slice, and from it a buffer that gives us both an io.Writer and an io.Reader. We encode our image as PNG into this buffer, and finally send it to the server with a POST. If everything works out, we just return.
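The buffer trick can be tried without any server: encode into the buffer, then read the same buffer back with png.Decode. The helper names in this sketch are hypothetical, and the blank 4x3 image is just test data.

```go
package main

import (
	"bytes"
	"fmt"
	"image"
	"image/png"
)

// encodePNG writes img as PNG into a fresh in-memory buffer.
func encodePNG(img image.Image) (*bytes.Buffer, error) {
	buffer := new(bytes.Buffer)
	if err := png.Encode(buffer, img); err != nil {
		return nil, err
	}
	return buffer, nil
}

// roundTrip encodes a blank image and decodes it again, returning
// the dimensions of the decoded image.
func roundTrip(width, height int) (int, int, error) {
	buffer, err := encodePNG(image.NewRGBA(image.Rect(0, 0, width, height)))
	if err != nil {
		return 0, 0, err
	}
	// The buffer is written by png.Encode and read by png.Decode.
	decoded, err := png.Decode(buffer)
	if err != nil {
		return 0, 0, err
	}
	bounds := decoded.Bounds()
	return bounds.Dx(), bounds.Dy(), nil
}

func main() {
	fmt.Println(roundTrip(4, 3))
}
```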

Once we have successfully saved the image, we can tell the Master that we finished the Task.

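func registerFinishedTask(masterAddress string, myTask Task) error {
    response, err := http.Post("http://" + masterAddress + "/registerTaskFinished?id=" + strconv.Itoa(myTask.Id), "text/plain", nil)
    if err != nil {
        return err
    }
    defer response.Body.Close()
    // A non-200 answer leaves err nil, so turn it into an error explicitly.
    if response.StatusCode != http.StatusOK {
        return fmt.Errorf("registerFinishedTask: unexpected status %s", response.Status)
    }

    return nil
}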

Nothing fancy. Just sending a POST request to notify about finishing the task.

Ok, that’s all regarding the worker. You can turn it on and it will wait for Tasks. Which means we finally need to get the Tasks from the user into our service. And that leads us to…

The Frontend

This one will show the user the website and also parse the user form so the backend services only get the raw image data. It will be the only interface to our application for the user.

As usual, we’ll begin with the basic structure of the file:

package main

import (
    "net/http"
    "fmt"
    "io/ioutil"
    "os"
    "net/url"
    "io"
)

const indexPage = "<html><head><title>Upload file</title></head><body><form enctype=\"multipart/form-data\" action=\"submitTask\" method=\"post\"> <input type=\"file\" name=\"uploadfile\" /> <input type=\"submit\" value=\"upload\" /> </form> </body> </html>"

var keyValueStoreAddress string
var masterLocation string

func main() {
    if len(os.Args) < 2 {
        fmt.Println("Error: Too few arguments.")
        return
    }
    keyValueStoreAddress = os.Args[1]

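    response, err := http.Get("http://" + keyValueStoreAddress + "/get?key=masterAddress")
    if err != nil {
        // response is nil when the request itself fails, so check err first.
        fmt.Println("Error: can't get master address:", err)
        return
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get master address. Status:", response.Status)
        return
    }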
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    masterLocation = string(data)
    if len(masterLocation) == 0 {
        fmt.Println("Error: can't get master address. Length is zero.")
        return
    }

    http.HandleFunc("/", handleIndex)
    http.HandleFunc("/submitTask", handleTask)
    http.HandleFunc("/isReady", handleCheckForReadiness)
    http.HandleFunc("/getImage", serveImage)
    http.ListenAndServe(":80", nil)
}

func handleIndex(w http.ResponseWriter, r *http.Request) {
}

func handleTask(w http.ResponseWriter, r *http.Request) {
}

func handleCheckForReadiness(w http.ResponseWriter, r *http.Request) {
}

func serveImage(w http.ResponseWriter, r *http.Request) {
}

We’ve got the code of our index web page here, and we also have the API declared. After starting the program we check if we have the k/v store address. If we do (we hope so), then we can get the master address from it.

Now we can go on to implementing the functions. We’ll start with the simplest, the index handler:

func handleIndex(w http.ResponseWriter, r *http.Request) {
    fmt.Fprint(w, indexPage)
}

After writing this we can go on to writing the more complicated functions. We will start with the handleTask function. This function is responsible for parsing the user form and sending the raw image data to the master.

func handleTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        err := r.ParseMultipartForm(10000000)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }
        file, _, err := r.FormFile("uploadfile")
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

Ok, what do we have here? We check the method as we always do, and then parse the multipart form. We’ve got a nice lovely magic number there: it sets the maximum number of bytes of the form (here roughly 10 MB) held in RAM. The rest will be stored in temporary files. We then make the request using the file we got:

        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Post("http://" + masterLocation + "/new", "image", file)
        if err != nil || response.StatusCode != http.StatusOK {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        data, err := ioutil.ReadAll(response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        fmt.Fprint(w, string(data))
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

We send the user back the new Task id we got back from the master.
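The multipart parsing can be tried in isolation, without a browser. The sketch below builds an upload request in memory and reads the file back out of it. The field name uploadfile comes from our index page; the helper names, the constant and the file name picture.png are assumptions for the example.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"net/http/httptest"
)

// maxFormMemory is the number of bytes kept in RAM while parsing;
// anything beyond that is stored in temporary files.
const maxFormMemory = 10000000

// readUpload parses the form and returns the content of "uploadfile".
func readUpload(r *http.Request) ([]byte, error) {
	if err := r.ParseMultipartForm(maxFormMemory); err != nil {
		return nil, err
	}
	file, _, err := r.FormFile("uploadfile")
	if err != nil {
		return nil, err
	}
	defer file.Close()
	return io.ReadAll(file)
}

// buildUploadRequest creates the request a browser would send for the form.
func buildUploadRequest(content []byte) (*http.Request, error) {
	body := new(bytes.Buffer)
	writer := multipart.NewWriter(body)
	part, err := writer.CreateFormFile("uploadfile", "picture.png")
	if err != nil {
		return nil, err
	}
	if _, err := part.Write(content); err != nil {
		return nil, err
	}
	// Close writes the final boundary; without it the form is incomplete.
	if err := writer.Close(); err != nil {
		return nil, err
	}
	request := httptest.NewRequest(http.MethodPost, "/submitTask", body)
	request.Header.Set("Content-Type", writer.FormDataContentType())
	return request, nil
}

// demoUpload sends content through both helpers and returns what arrived.
func demoUpload(content string) (string, error) {
	request, err := buildUploadRequest([]byte(content))
	if err != nil {
		return "", err
	}
	data, err := readUpload(request)
	return string(data), err
}

func main() {
	fmt.Println(demoUpload("raw image bytes"))
}
```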

Now we can implement the function which will handle the request to check if the Task is finished and ready:

func handleCheckForReadiness(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Get("http://" + masterLocation + "/isReady?id=" + values.Get("id") + "&state=finished")
        if err != nil || response.StatusCode != http.StatusOK {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        data, err := ioutil.ReadAll(response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        switch string(data) {
        case "0":
            fmt.Fprint(w, "Your image is not ready yet.")
        case "1":
            fmt.Fprint(w, "Your image is ready.")
        default :
            fmt.Fprint(w, "Internal server error.")
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Most of this is the same as the function in the master. We just check if the id is correct and make a request to the master. The interesting part is this:

switch string(data) {
case "0":
    fmt.Fprint(w, "Your image is not ready yet.")
case "1":
    fmt.Fprint(w, "Your image is ready.")
default :
    fmt.Fprint(w, "Internal server error.")
}

We cast the response body to a string, and if it matches 1 or 0 we answer. If it’s something else then we know something went really wrong, and send back an error.

Now we can get to the last function, the function to serve the actual images:

func serveImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Get("http://" + masterLocation + "/get?id=" + values.Get("id") + "&state=finished")
        if err != nil || response.StatusCode != http.StatusOK {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(w, response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

It just checks the id, sends the request, and copies the response to answer the user.

Conclusion

So now it’s all finished. You can start all applications and they will work. You can contact the frontend and make requests, they will all start working and you will get your modified images. Six microservices working beautifully together.

This may be the end of this series, so have fun extending this system on your own.

I may add a finishing part about deployment to container infrastructures, or another series about refactoring the system with 3rd party libraries in the future, but I’m not sure.

All in all, good luck!

UPDATE: Also remember that when running the worker, it’s good to launch it with 2-4x as many goroutines as your system has threads. They have near-zero switching overhead, and this way you’re not unnecessarily blocking while waiting for HTTP responses.

Web app using Microservices in Go: Part 3 – Storage and Master

Introduction

In this part we will implement the next part of the microservices needed for our web app. We will implement the:
* Storage system
* Master

This way we will have the Master API ready when we write the slaves/workers and the frontend. And we’ll already have the database, k/v store and storage when writing the master. So every time we write something we’ll already have all its dependencies.

The storage system

Ok, this one will be pretty easy to write. It just handles files. Let’s build the basic structure, which will include a function to register in our k/v store. For a reference on how that works, check out the previous part. So here’s the basic structure:

package main

import (
    "fmt"
    "net/http"
    "io/ioutil"
    "os"
    "net/url"
    "io"
    "strconv"
)

func main() {
    if !registerInKVStore() {
        return
    }
    http.HandleFunc("/sendImage", receiveImage)
    http.HandleFunc("/getImage", serveImage)
    http.ListenAndServe(":3002", nil)
}

func receiveImage(w http.ResponseWriter, r *http.Request) {
}

func serveImage(w http.ResponseWriter, r *http.Request) {
}

func registerInKVStore() bool {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return false
    }
    storageAddress := os.Args[1] // The address of itself
    keyValueStoreAddress := os.Args[2]

    response, err := http.Post("http://" + keyValueStoreAddress + "/set?key=storageAddress&value=" + storageAddress, "", nil)
    if err != nil {
        fmt.Println(err)
        return false
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return false
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: Failure when contacting key-value store: ", string(data))
        return false
    }
    return true
}

So now we’ll have to handle the file serving/uploading. We will use a state url argument to specify if we are using the not yet finished (aka working) directory, or the finished one.

So first let’s write the receiveImage function which is there to get the files from clients:

func receiveImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }
        if values.Get("state") != "working" && values.Get("state") != "finished" {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input state.")
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

Here we check if the request method is POST, if there is an id, and if the state is working or finished.

Next we can create the file and put in the image:

        if values.Get("state") != "working" && values.Get("state") != "finished" {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input state.")
            return
        }

        _, err = strconv.Atoi(values.Get("id"))
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }

        file, err := os.Create("/tmp/" + values.Get("state") + "/" + values.Get("id") + ".png")
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        // Defer Close only once we know the file was created.
        defer file.Close()

        _, err = io.Copy(file, r.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        fmt.Fprint(w, "success")

We create a file in the /tmp/state directory, named after the id. We also check that the id really is a valid int: we try to parse it, and if that succeeds we use the original string.

We use the io.Copy function to put all the data from the request into the file. That means that the body of our request should be a raw image.
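The checks on state and id are what keep a client from writing to arbitrary paths. Put together in one place, they could look like the sketch below; imagePath is a hypothetical helper, not something the storage service defines.

```go
package main

import (
	"fmt"
	"strconv"
)

// imagePath validates state and id and returns the file path for them.
func imagePath(state, id string) (string, error) {
	if state != "working" && state != "finished" {
		return "", fmt.Errorf("wrong input state: %q", state)
	}
	// Only ids that parse as an int are accepted, which rules out
	// things like "../" inside the id.
	if _, err := strconv.Atoi(id); err != nil {
		return "", fmt.Errorf("wrong input id: %q", id)
	}
	return "/tmp/" + state + "/" + id + ".png", nil
}

func main() {
	fmt.Println(imagePath("working", "42"))
	fmt.Println(imagePath("working", "../secret"))
	fmt.Println(imagePath("elsewhere", "42"))
}
```

Keep in mind that os.Create does not create missing directories, so /tmp/working and /tmp/finished have to exist before the service runs.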

Next we can write the function to serve images which is pretty similar:

func serveImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }
        if values.Get("state") != "working" && values.Get("state") != "finished" {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input state.")
            return
        }

        _, err = strconv.Atoi(values.Get("id"))
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }

        file, err := os.Open("/tmp/" + values.Get("state") + "/" + values.Get("id") + ".png")
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        defer file.Close() // Defer Close only once we know the file was opened.

        _, err = io.Copy(w, file)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Instead of creating the file, we open it. Instead of copying to the file we copy from it. And we check if the method is GET.

That’s it. We’ve got a storage service which saves and serves raw image files. Now we can get to the master!

The master

We now have all the dependencies the master needs. So let’s write it now. Here’s the basic structure:

package main

import (
    "os"
    "fmt"
    "net/http"
    "io/ioutil"
)

type Task struct {
    Id int `json:"id"`
    State int `json:"state"`
}

var databaseLocation string
var storageLocation string

func main() {
    if !registerInKVStore() {
        return
    }

    http.HandleFunc("/new", newImage)
    http.HandleFunc("/get", getImage)
    http.HandleFunc("/isReady", isReady)
    http.HandleFunc("/getNewTask", getNewTask)
    http.HandleFunc("/registerTaskFinished", registerTaskFinished)
    http.ListenAndServe(":3003", nil)
}

func newImage(w http.ResponseWriter, r *http.Request) {
}

func getImage(w http.ResponseWriter, r *http.Request) {
}

func isReady(w http.ResponseWriter, r *http.Request) {
}

func getNewTask(w http.ResponseWriter, r *http.Request) {
}

func registerTaskFinished(w http.ResponseWriter, r *http.Request) {
}

func registerInKVStore() bool {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return false
    }
    masterAddress := os.Args[1] // The address of itself
    keyValueStoreAddress := os.Args[2]

    response, err := http.Post("http://" + keyValueStoreAddress + "/set?key=masterAddress&value=" + masterAddress, "", nil)
    if err != nil {
        fmt.Println(err)
        return false
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return false
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: Failure when contacting key-value store: ", string(data))
        return false
    }
    return true
}

It’s the structure of the API and the mechanics to register in the k/v store.
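
In registerInKVStore the address is concatenated straight into the query string. That works for a plain host:port, but a value containing characters like & or a space would break the request. Here is a minimal sketch of building the same URL with net/url instead. kvSetURL is a hypothetical helper name, and it assumes the /set endpoint with key and value parameters shown above.

```go
package main

import (
	"fmt"
	"net/url"
)

// kvSetURL is a hypothetical helper: it builds the URL for the k/v store's
// /set endpoint and escapes the key and value properly.
func kvSetURL(kvAddress, key, value string) string {
	query := url.Values{}
	query.Set("key", key)
	query.Set("value", value)

	u := url.URL{
		Scheme:   "http",
		Host:     kvAddress,
		Path:     "/set",
		RawQuery: query.Encode(), // Encode sorts the parameters by key and escapes them.
	}
	return u.String()
}

func main() {
	fmt.Println(kvSetURL("localhost:3000", "masterAddress", "localhost:3003"))
}
```

The k/v store parses the query with url.ParseQuery, which decodes the escaped value again, so the stored address stays the same.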

We also need to get the storage and database locations in the main function:

if !registerInKVStore() {
        return
    }
    keyValueStoreAddress := os.Args[2]

    response, err := http.Get("http://" + keyValueStoreAddress + "/get?key=databaseAddress")
    if err != nil {
        fmt.Println(err)
        return
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get database address.")
        fmt.Println(response.Body)
        return
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    databaseLocation = string(data)

    response, err = http.Get("http://" + keyValueStoreAddress + "/get?key=storageAddress")
    if err != nil {
        fmt.Println(err)
        return
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get storage address.")
        fmt.Println(response.Body)
        return
    }
    data, err = ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    storageLocation = string(data)

Now we can start implementing all the functionality!

Let’s start with the newImage function, as it contains a good bit of code and mechanics that will be used again in the other functions.
Here’s the beginning:

func newImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        response, err := http.Post("http://" + databaseLocation + "/newTask", "text/plain", nil)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        id, err := ioutil.ReadAll(response.Body)
        if err != nil {
            fmt.Println(err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

As usual we check if the method is right. Next we register a new Task in the database and get an Id.

We now use this to send the image to the storage:

id, err := ioutil.ReadAll(response.Body)
if err != nil {
    fmt.Println(err)
    return
}

_, err = http.Post("http://" + storageLocation + "/sendImage?id=" + string(id) + "&state=working", "image", r.Body)
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}
fmt.Fprint(w, string(id))

That’s it. The new task will be created, the storage will get a file into the working directory with the name of the file being the id, and the client gets back the id. The important thing here is that we need the raw image in the request. The user form has to be parsed in the frontend service.

Now we can create the function which just checks if a Task is ready:

func isReady(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

We first have to verify all the parameters and the request method. Next we can ask the database for the Task requested:

if len(values.Get("id")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Wrong input")
    return
}

response, err := http.Get("http://" + databaseLocation + "/getById?id=" + values.Get("id"))
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}
data, err := ioutil.ReadAll(response.Body)
if err != nil {
    fmt.Println(err)
    return
}

We also read the response immediately. Now we can parse the Task and respond to the client:

if err != nil {
    fmt.Println(err)
    return
}

myTask := Task{}
err = json.Unmarshal(data, &myTask)
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}

if myTask.State == 2 {
    fmt.Fprint(w, "1")
} else {
    fmt.Fprint(w, "0")
}

So now we can implement the last client facing interface, the getImage function:

if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }
} else {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error: Only GET accepted")
}

Here we verified the request and now we need to get the image from the storage system, and just copy the response to our client:

if len(values.Get("id")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Wrong input")
    return
}

response, err := http.Get("http://" + storageLocation + "/getImage?id=" + values.Get("id") + "&state=finished")
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}

_, err = io.Copy(w, response.Body)
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}

That’s it! The client facing interface is finished!

Implementing the worker facing interface

Now we have to implement the functions to serve the workers.

Both functions will basically be just direct routes to the database and back, so now let’s write ’em too:

func getNewTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        response, err := http.Post("http://" + databaseLocation + "/getNewTask", "text/plain", nil)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(w, response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

func registerTaskFinished(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Post("http://" + databaseLocation + "/finishTask?id=" + values.Get("id"), "text/plain", nil)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(w, response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

There’s not much to explain. Both functions simply forward the request to the database and respond with whatever they get back.

You might think the workers should communicate directly with the database to get new Tasks, and with the current implementation that would work perfectly. However, if we later wanted the master to do some extra work for each of those requests, it would be hard to add. Routing the requests through the master keeps the system extensible, and that’s nearly always what we want.
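
One detail worth knowing about the two forwarding handlers: they copy the database’s response body, but not its status code, so a worker sees a 200 status even when the database answered with an error. Below is a minimal sketch of a forwarding helper that passes both along. relay and demo are hypothetical names that are not part of the master, and io.NopCloser assumes Go 1.16 or newer.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// relay is a hypothetical helper: it copies the upstream status code and
// body to the client and closes the upstream body when done.
func relay(w http.ResponseWriter, upstream *http.Response) error {
	defer upstream.Body.Close()
	w.WriteHeader(upstream.StatusCode) // Must happen before the first write to w.
	_, err := io.Copy(w, upstream.Body)
	return err
}

// demo feeds relay a hand-built upstream response and reports what the
// client would see. It only exists to exercise relay without a network.
func demo() (int, string) {
	upstream := &http.Response{
		StatusCode: http.StatusBadRequest,
		Body:       io.NopCloser(strings.NewReader("Error: No non-started task.")),
	}
	recorder := httptest.NewRecorder()
	if err := relay(recorder, upstream); err != nil {
		return 0, err.Error()
	}
	return recorder.Code, recorder.Body.String()
}

func main() {
	code, body := demo()
	fmt.Println(code, body)
}
```

In getNewTask and registerTaskFinished a call like relay(w, response) could stand in for the io.Copy block, if you want the workers to see the database’s status codes.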

Conclusion

Now we have finished the Master and the Storage system. We now have the dependencies to create the workers and frontend which we will implement in the next part. As always I encourage you to comment about your opinion. Have fun extending the system to do what you want to achieve!

Web app using Microservices in Go: Part 2 – k/v store and Database

Introduction

In this part we will implement part of the microservices needed for our web app. We will implement the:
* key-value store
* Database

This will be a pretty code heavy tutorial so concentrate and have fun!

The key-value store

Design

The design hasn’t changed much. We will save the key-value pairs as a global map, and create a global mutex for concurrent access. We’ll also add the ability to list all key-value pairs for debugging/analytical purposes. We will also add the ability to delete existing entries.

First, let’s create the structure:

package main

import (
    "net/http"
    "sync"
    "net/url"
    "fmt"
)

var keyValueStore map[string]string
var kVStoreMutex sync.RWMutex

func main() {
    keyValueStore = make(map[string]string)
    kVStoreMutex = sync.RWMutex{}
    http.HandleFunc("/get", get)
    http.HandleFunc("/set", set)
    http.HandleFunc("/remove", remove)
    http.HandleFunc("/list", list)
    http.ListenAndServe(":3000", nil)
}

func get(w http.ResponseWriter, r *http.Request) {
}

func set(w http.ResponseWriter, r *http.Request) {
}

func remove(w http.ResponseWriter, r *http.Request) {
}

func list(w http.ResponseWriter, r *http.Request) {
}

And now let’s dive into the implementation.

First, we should add parameter parsing in the get function and verify that the key parameter is right.

func get(w http.ResponseWriter, r *http.Request) {
    if(r.Method == http.MethodGet) {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("key")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input key.")
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted.")
    }
}

The key shouldn’t have a length of 0, hence the length check. We also check if the method is GET; if it isn’t, we say so in the response and set the status code to bad request.
We answer with an explicit Error: before each error message so it doesn’t get misinterpreted by the client as a value.
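
To show why the Error: prefix is handy, here is a sketch of how a client of the k/v store might interpret a response. parseKVResponse is a hypothetical helper that none of our services define. It only assumes the convention described above: a non-OK status code or a body starting with Error: means failure.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"strings"
)

// parseKVResponse is a hypothetical client-side helper. It turns a status
// code and response body from the k/v store into either a value or an error.
func parseKVResponse(statusCode int, body string) (string, error) {
	if statusCode != http.StatusOK || strings.HasPrefix(body, "Error:") {
		message := strings.TrimSpace(strings.TrimPrefix(body, "Error:"))
		return "", errors.New("key-value store: " + message)
	}
	return body, nil
}

func main() {
	value, err := parseKVResponse(http.StatusOK, "localhost:3001")
	fmt.Println(value, err)

	_, err = parseKVResponse(http.StatusBadRequest, "Error:Wrong input key.")
	fmt.Println(err)
}
```

Checking both the status code and the prefix is a bit redundant, but it keeps the client safe even if one of the two is ever forgotten on the server side.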

Now, let’s access our map and send back a response:

if len(values.Get("key")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:","Wrong input key.")
    return
}

kVStoreMutex.RLock()
value := keyValueStore[string(values.Get("key"))]
kVStoreMutex.RUnlock()

fmt.Fprint(w, value)

We copy the value into a variable so that we don’t block the map while sending back the response.
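
Note that with the code above a key that was never set comes back as an empty value with a 200 status. If you want to tell the two cases apart, one option is to wrap the map and its mutex in a small type. The following is just a sketch of that idea. The store type and its methods are hypothetical and not used elsewhere in this series.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical wrapper around the map and its mutex.
type store struct {
	mu   sync.RWMutex
	data map[string]string
}

func newStore() *store {
	return &store{data: make(map[string]string)}
}

// Get copies the value out while holding the read lock and reports
// whether the key was present at all.
func (s *store) Get(key string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	value, ok := s.data[key]
	return value, ok
}

// Set stores the value while holding the write lock.
func (s *store) Set(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

func main() {
	s := newStore()
	s.Set("databaseAddress", "localhost:3001")

	value, ok := s.Get("databaseAddress")
	fmt.Println(value, ok)

	_, ok = s.Get("storageAddress")
	fmt.Println(ok)
}
```

The lock is still released before anything is written to the response, which is the same reason we copy the value into a variable in the handler.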

Now let’s create the set function, it’s actually pretty similar.

func set(w http.ResponseWriter, r *http.Request) {
    if(r.Method == http.MethodPost) {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("key")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", "Wrong input key.")
            return
        }
        if len(values.Get("value")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", "Wrong input value.")
            return
        }

        kVStoreMutex.Lock()
        keyValueStore[string(values.Get("key"))] = string(values.Get("value"))
        kVStoreMutex.Unlock()

        fmt.Fprint(w, "success")
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted.")
    }
}

The only difference is that we also check that a non-empty value parameter is present, and that the method is POST.

Now we can add the implementation of the list function which is also pretty simple:

func list(w http.ResponseWriter, r *http.Request) {
    if(r.Method == http.MethodGet) {
        kVStoreMutex.RLock()
        for key, value := range keyValueStore {
            fmt.Fprintln(w, key, ":", value)
        }
        kVStoreMutex.RUnlock()
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted.")
    }
}

It just ranges over the map and prints everything. Simple yet effective.
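
One thing to keep in mind: Go doesn’t guarantee any particular iteration order for maps, so the listing can come back in a different order on each call. If a stable order helps your debugging, the keys can be sorted first. A minimal sketch, where sortedLines is a hypothetical helper producing the same key : value lines as the handler:

```go
package main

import (
	"fmt"
	"sort"
)

// sortedLines is a hypothetical helper: it returns one "key : value" line
// per entry, ordered by key.
func sortedLines(store map[string]string) []string {
	keys := make([]string, 0, len(store))
	for key := range store {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	lines := make([]string, 0, len(keys))
	for _, key := range keys {
		lines = append(lines, key+" : "+store[key])
	}
	return lines
}

func main() {
	store := map[string]string{
		"storageAddress":  "localhost:3002",
		"databaseAddress": "localhost:3001",
	}
	for _, line := range sortedLines(store) {
		fmt.Println(line)
	}
}
```

In the handler you would build the lines while holding the read lock and write them to the response after releasing it.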

And to finish the key-value store we will implement the remove function:

func remove(w http.ResponseWriter, r *http.Request) {
    if(r.Method == http.MethodDelete) {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("key")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", "Wrong input key.")
            return
        }

        kVStoreMutex.Lock()
        delete(keyValueStore, values.Get("key"))
        kVStoreMutex.Unlock()

        fmt.Fprint(w, "success")
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only DELETE accepted.")
    }
}

It’s the same as setting a value, but instead of setting it we delete it.

The database

Design

After thinking through the design, I decided that it would be better if the database generated the task Ids. This will also make it easier to get the last non-finished task and generate consecutive Ids.

How it will work:
* It will save new tasks, assigning consecutive Ids.
* It will allow getting a new task to do.
* It will allow getting a task by Id.
* It will allow setting a task by Id.
* The state will be represented by an int:
    * 0 – not started
    * 1 – in progress
    * 2 – finished
* It will change the state of a task back to not started if it’s been in progress for too long (maybe someone started to work on it but has crashed).
* It will allow listing all tasks for debugging/analytical purposes.
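
The three state numbers from the list above could also be given names. The code in this tutorial uses plain ints, so treat the following purely as a sketch. The State type, its constants and its methods are assumptions that don’t appear in the database code.

```go
package main

import "fmt"

// State is a hypothetical named type for the task state.
type State int

const (
	NotStarted State = iota // 0
	InProgress              // 1
	Finished                // 2
)

// String returns the human readable name used in the design list.
func (s State) String() string {
	switch s {
	case NotStarted:
		return "not started"
	case InProgress:
		return "in progress"
	case Finished:
		return "finished"
	default:
		return "unknown"
	}
}

// Valid reports whether s is one of the three known states.
func (s State) Valid() bool {
	return s >= NotStarted && s <= Finished
}

func main() {
	fmt.Println(InProgress, int(InProgress), State(7).Valid())
}
```

Since State is still an int underneath, it would be marshalled to JSON as the same 0, 1 or 2.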

Implementation

First, we should create the API; later we will add the implementations, as we did with the key-value store. We will also need a global map as our data store, a variable pointing to the oldest not finished task, and mutexes for accessing the data store and that variable.

package main

import (
    "net/http"
    "net/url"
    "fmt"
    "sync"
)

type Task struct {
}

var datastore map[int]Task
var datastoreMutex sync.RWMutex
var oldestNotFinishedTask int // remember to account for potential int overflow in production. Use something bigger.
var oNFTMutex sync.RWMutex

func main() {

    datastore = make(map[int]Task)
    datastoreMutex = sync.RWMutex{}
    oldestNotFinishedTask = 0
    oNFTMutex = sync.RWMutex{}

    http.HandleFunc("/getById", getById)
    http.HandleFunc("/newTask", newTask)
    http.HandleFunc("/getNewTask", getNewTask)
    http.HandleFunc("/finishTask", finishTask)
    http.HandleFunc("/setById", setById)
    http.HandleFunc("/list", list)
    http.ListenAndServe(":3001", nil)
}

func getById(w http.ResponseWriter, r *http.Request) {
}

func newTask(w http.ResponseWriter, r *http.Request) {
}

func getNewTask(w http.ResponseWriter, r *http.Request) {
}

func finishTask(w http.ResponseWriter, r *http.Request) {
}

func setById(w http.ResponseWriter, r *http.Request) {
}

func list(w http.ResponseWriter, r *http.Request) {
}

We also already declared the Task type which we will use for storage.

So far so good. Now let’s implement all those functions!

First, let’s implement the getById function.

func getById(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        id, err := strconv.Atoi(string(values.Get("id")))
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        datastoreMutex.RLock()
        bIsInError := id < 0 || id >= len(datastore) // Reading the length of a map must be done in a synchronized manner. That's why the mutex is used.
        datastoreMutex.RUnlock()

        if bIsInError {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        datastoreMutex.RLock()
        value := datastore[id]
        datastoreMutex.RUnlock()

        response, err := json.Marshal(value)

        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        fmt.Fprint(w, string(response))
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

We check if the GET method has been used. Later we parse the id argument and check if it’s proper. We then get the id as an int using the strconv.Atoi function. Next we make sure it is not out of bounds for our datastore, which we have to do using mutexes because we’re accessing a map which could be accessed from another thread. If everything is ok, then, again using mutexes, we get the task using the id.

After that we use the JSON library to marshal our struct into a JSON object and if that finishes without problems we send the JSON object to the client.

It’s also time to implement our Task struct:

type Task struct {
    Id int `json:"id"`
    State int `json:"state"`
}

It’s all that’s needed. We also added the information the JSON marshaller needs.

We can now go on with implementing the newTask function:

func newTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        datastoreMutex.Lock()
        taskToAdd := Task{
            Id: len(datastore),
            State: 0,
        }
        datastore[taskToAdd.Id] = taskToAdd
        datastoreMutex.Unlock()

        fmt.Fprint(w, taskToAdd.Id)
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

It’s pretty small actually. It creates a new Task with the next id and adds it to the datastore. After that it sends back the new Task’s Id.

That means we can go on to implementing the function used to list all Tasks, as this helps with debugging during writing.

It’s basically the same as with the key-value store:

func list(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        datastoreMutex.RLock()
        for key, value := range datastore {
            fmt.Fprintln(w, key, ": ", "id:", value.Id, " state:", value.State)
        }
        datastoreMutex.RUnlock()
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Ok, so now we will implement the function which can set the Task by id:

func setById(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        taskToSet := Task{}

        data, err := ioutil.ReadAll(r.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }
        err = json.Unmarshal([]byte(data), &taskToSet)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        bErrored := false
        datastoreMutex.Lock()
        if taskToSet.Id < 0 || taskToSet.Id >= len(datastore) || taskToSet.State > 2 || taskToSet.State < 0 {
            bErrored = true
        } else {
            datastore[taskToSet.Id] = taskToSet
        }
        datastoreMutex.Unlock()

        if bErrored {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: Wrong input")
            return
        }

        fmt.Fprint(w, "success")
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

Nothing new. We read the request body and try to unmarshal it. If that succeeds, we check that the Id isn’t out of bounds and that the state is valid, and only then put the Task into the map. If a check fails we print an error, otherwise we print success.

With that in place we can implement the finishTask function, because it’s very simple:

func finishTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        id, err := strconv.Atoi(string(values.Get("id")))

        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        updatedTask := Task{Id: id, State: 2}

        bErrored := false

        datastoreMutex.Lock()
        if datastore[id].State == 1 {
            datastore[id] = updatedTask
        } else {
            bErrored = true
        }
        datastoreMutex.Unlock()

        if bErrored {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: Wrong input")
            return
        }

        fmt.Fprint(w, "success")
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

It’s pretty similar to the getById function. The difference here is that here we update the state and only if it is currently in progress.

And now to one of the most interesting functions: getNewTask. It has to keep the oldestNotFinishedTask variable up to date, and it also needs to handle the situation when someone takes a task but crashes during work. This would lead to a ghost task forever being in progress. That’s why we’ll add functionality which, 120 seconds after a task has been started, sets it back to not started if it still hasn’t finished:

func getNewTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {

        bErrored := false

        datastoreMutex.RLock()
        if len(datastore) == 0 {
            bErrored = true
        }
        datastoreMutex.RUnlock()

        if bErrored {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: No non-started task.")
            return
        }

        taskToSend := Task{Id: -1, State: 0}

        oNFTMutex.Lock()
        datastoreMutex.Lock()
        for i := oldestNotFinishedTask; i < len(datastore); i++ {
            if datastore[i].State == 2 && i == oldestNotFinishedTask {
                oldestNotFinishedTask++
                continue
            }
            if datastore[i].State == 0 {
                datastore[i] = Task{Id: i, State: 1}
                taskToSend = datastore[i]
                break
            }
        }
        datastoreMutex.Unlock()
        oNFTMutex.Unlock()

        if taskToSend.Id == -1 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: No non-started task.")
            return
        }

        myId := taskToSend.Id

        go func() {
            time.Sleep(time.Second * 120)
            datastoreMutex.Lock()
            if datastore[myId].State == 1 {
                datastore[myId] = Task{Id: myId, State: 0}
            }
            datastoreMutex.Unlock()
        }()

        response, err := json.Marshal(taskToSend)

        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        fmt.Fprint(w, string(response))
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

First we try to find the oldest task that hasn’t started yet. Along the way we update the oldestNotFinishedTask variable: if a task is finished and the variable points to it, the variable gets incremented. If we find a task that’s not started, we set it to in progress, break out of the loop, and send it back to the worker. Before responding, however, we start a function on another goroutine that will change the state of the task back to not started if it’s still in progress after 120 seconds.
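
To make the loop easier to follow, here it is again as a standalone function working on a plain map, without the HTTP handling and the mutexes. This is only a sketch for experimenting. claimNext is a hypothetical name, and in the real handler the same steps run while both locks are held.

```go
package main

import "fmt"

type Task struct {
	Id    int `json:"id"`
	State int `json:"state"`
}

// claimNext is a hypothetical, lock-free version of the loop in getNewTask.
// It returns the claimed task, the updated "oldest not finished" index and
// whether a task was found at all.
func claimNext(store map[int]Task, oldest int) (Task, int, bool) {
	for i := oldest; i < len(store); i++ {
		if store[i].State == 2 && i == oldest {
			// Every task up to and including this one is finished, so later scans can skip it.
			oldest++
			continue
		}
		if store[i].State == 0 {
			store[i] = Task{Id: i, State: 1} // Mark it as in progress.
			return store[i], oldest, true
		}
	}
	return Task{}, oldest, false
}

func main() {
	store := map[int]Task{
		0: {Id: 0, State: 2}, // finished
		1: {Id: 1, State: 1}, // in progress
		2: {Id: 2, State: 0}, // not started
	}
	task, oldest, ok := claimNext(store, 0)
	fmt.Println(task, oldest, ok)
}
```

In this example task 0 is finished, so the index moves on to 1. Task 1 is still in progress, so the index stops there, and task 2 is the one that gets claimed.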

Now the last thing. A database is useless… when you don’t know where it is! That’s why we’ll now implement the mechanism that the database will use to register itself in the key-value store:

func main() {

    if !registerInKVStore() {
        return
    }

    datastore = make(map[int]Task)

and later we define the function:

func registerInKVStore() bool {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return false
    }
    databaseAddress := os.Args[1] // The address of itself
    keyValueStoreAddress := os.Args[2]

    response, err := http.Post("http://" + keyValueStoreAddress + "/set?key=databaseAddress&value=" + databaseAddress, "", nil)
    if err != nil {
        fmt.Println(err)
        return false
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return false
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: Failure when contacting key-value store: ", string(data))
        return false
    }
    return true
}

We check if there are at least 3 arguments. (The first being the executable) We read the current database address from the second argument and the key-value store address from the third argument. We use them to make a POST request where we add a databaseAddress key to the k/v store and set its value to the current database address. If the status code of the response isn’t OK then we know we messed up and we print the error we got. After that we quit the program.
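
The argument handling described above is the same in every service that registers itself, so it could live in a tiny function of its own. A sketch under that assumption follows. parseArgs is a hypothetical helper, and the argument order (own address first, k/v store address second) is the one used by registerInKVStore.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// parseArgs is a hypothetical helper. args is expected to look like os.Args:
// the executable name, then the service's own address, then the address of
// the key-value store.
func parseArgs(args []string) (selfAddress, kvAddress string, err error) {
	if len(args) < 3 {
		return "", "", errors.New("too few arguments")
	}
	return args[1], args[2], nil
}

func main() {
	self, kv, err := parseArgs(os.Args)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println("own address:", self, "k/v store:", kv)
}
```

Because the function takes a slice instead of reading os.Args itself, it can be tried out with any list of strings.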

Conclusion

We have now finished our k/v store and our database. You can even test them now using a REST client. (I used this one.) Remember that the code is subject to change if necessary, although I don’t expect it to. I hope you enjoyed the tutorial! I encourage you to comment, and if you have an opposing view to mine please make sure to express it in a comment too!

UPDATE: I changed the sync.Mutex to sync.RWMutex, and in the places where we only read data I changed mutex.Lock/Unlock to mutex.RLock/RUnlock.

UPDATE2: For some reason I used a slice for the database code although I tested with a map. Sorry for that, corrected it already.

Web app using Microservices in Go: Part 1 – Design

Introduction

Recently it’s a constantly repeated buzzword – Microservices. You can love ’em or hate ’em, but you really shouldn’t ignore ’em. In this short series we’ll create a web app using a microservice architecture. We’ll try not to use 3rd party tools and libraries. Remember though that when creating a production web app it is highly recommended to use 3rd party libraries (even if only to save you time).

We will create the various components in a basic form. We won’t use advanced caching or use a database. We will create a basic key-value store and a simple storage service. We will use the Go language for all this.

UPDATE: as there are comments regarding overcomplication: this is meant to show a scalable and working skeleton for a microservice architecture. If you only want to add some filters to photos, don’t design it like that. It’s overkill.

On further thought and another comment, (Which you can find on the golang Reddit) do design it this way. Software usually lives much longer than we think it will, and such a design will lead to an easily extendable and scalable web app.

The functionality

First we should decide what our web app will do. The web app we’ll create in this series will get an image from a user and give back a unique ID. The image will get modified using complicated and highly sophisticated algorithms, like swapping the blue and red channel, and the user will be able to use the ID to check if the work on the image has been finished already or if it’s still in progress. If it’s finished, they will be able to download the altered image.

Designing the architecture

We want the architecture to be microservices, so we should design it like that. We’ll for sure need a service facing the user, the one that provides the interface for communication with our app. This could also handle authentication, and should be used as the service redirecting the workload to the right sub-services (useful if you plan to integrate more functionality into the app).

We will also want a microservice which will handle all our images. It will get the image, generate an ID, store information related to each task, and save the images. To handle high workloads it’s a good idea to use a master-slave system for our image modification service. The image handler will be the master, and we will create slave microservices which will ask the master for images to work on.

We will also need a key-value datastore for various configuration, a storage system, for saving our images, pre- and post-modification, and a database-ish service holding the information about each task.

This should suffice to begin with.

Here I’d like to also state that the architecture could change during the series if needed. And I encourage you to comment if you think that something could be done better.

Communication

We will also need to define how the services communicate. In this app we will use REST everywhere. You could also use a message bus or Remote Procedure Calls (RPC for short), but I won’t write about them here.

Designing the microservice APIs

Another important thing is to design the APIs of your microservices. We will now design each of them to get an understanding of what they are for.

The key-value store

This one’s mainly for configuration. It will have a simple post-get interface:

  • POST:
    • Arguments:
      • Key
      • Value
    • Response:
      • Success/Failure
  • GET:
    • Arguments:
      • Key
    • Response:
      • Value/Failure
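The post-get interface above could look roughly like this. It is only a sketch: I’m assuming the arguments arrive as the query parameters key and value, and the names kvStore, newKVStore and call are invented for the example. The handler is exercised in memory with net/http/httptest instead of a real listener.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// kvStore is a hypothetical in-memory key-value store guarded by a mutex.
type kvStore struct {
	mu   sync.RWMutex
	data map[string]string
}

func newKVStore() *kvStore {
	return &kvStore{data: make(map[string]string)}
}

// ServeHTTP stores a value on POST and reads it back on GET.
// Assumption: arguments are passed as the query parameters "key" and "value".
func (s *kvStore) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	key := r.URL.Query().Get("key")
	if key == "" {
		http.Error(w, "missing key", http.StatusBadRequest)
		return
	}
	switch r.Method {
	case http.MethodPost:
		value := r.URL.Query().Get("value")
		s.mu.Lock()
		s.data[key] = value
		s.mu.Unlock()
		fmt.Fprint(w, "success")
	case http.MethodGet:
		s.mu.RLock()
		value, ok := s.data[key]
		s.mu.RUnlock()
		if !ok {
			http.Error(w, "key not found", http.StatusNotFound)
			return
		}
		fmt.Fprint(w, value)
	default:
		http.Error(w, "only GET and POST are supported", http.StatusMethodNotAllowed)
	}
}

// call sends one request to the handler in memory and returns status code and body.
func call(h http.Handler, method, target string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(method, target, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	store := newKVStore()
	fmt.Println(call(store, "POST", "/?key=masterAddress&value=localhost:3001")) // prints 200 success
	fmt.Println(call(store, "GET", "/?key=masterAddress"))                       // prints 200 localhost:3001
}
```

Failure is signalled through the HTTP status code here (400, 404 or 405), which is one possible reading of the "Failure" response in the list.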

The storage

Here we will store the images, again using a key-value interface, plus an argument stating whether the image is pre- or post-modification. For the sake of simplicity we will just save each image to a folder named finished or inProgress, depending on its state.

  • POST:
    • Arguments:
      • Key
      • State: pre-/post-modification
      • Data
    • Response:
      • Success/Failure
  • GET:
    • Arguments:
      • Key
      • State: pre-/post-modification
    • Response:
      • Data/Failure
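The folder-per-state idea can be sketched with plain file operations. Everything below is an assumption made for illustration: the state values "pre" and "post", the function names, and the rule that keys must not contain slashes or dots (so a key can’t escape its folder). The HTTP layer is left out.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// stateDir maps the State argument to the folder names mentioned in the text.
// Assumption: the states are spelled "pre" and "post".
func stateDir(state string) (string, error) {
	switch state {
	case "pre":
		return "inProgress", nil
	case "post":
		return "finished", nil
	}
	return "", errors.New("unknown state: " + state)
}

// imagePath builds the file path for a key and rejects keys that could leave the folder.
func imagePath(root, state, key string) (string, error) {
	dir, err := stateDir(state)
	if err != nil {
		return "", err
	}
	if key == "" || strings.ContainsAny(key, `/\.`) {
		return "", errors.New("invalid key")
	}
	return filepath.Join(root, dir, key), nil
}

// saveImage writes the image data (the POST endpoint).
func saveImage(root, state, key string, data []byte) error {
	path, err := imagePath(root, state, key)
	if err != nil {
		return err
	}
	if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
		return err
	}
	return os.WriteFile(path, data, 0o644)
}

// loadImage reads the image data back (the GET endpoint).
func loadImage(root, state, key string) ([]byte, error) {
	path, err := imagePath(root, state, key)
	if err != nil {
		return nil, err
	}
	return os.ReadFile(path)
}

// withTempRoot runs fn against a throwaway storage folder and removes it afterwards.
func withTempRoot(fn func(root string) error) error {
	root, err := os.MkdirTemp("", "storage")
	if err != nil {
		return err
	}
	defer os.RemoveAll(root)
	return fn(root)
}

func main() {
	err := withTempRoot(func(root string) error {
		if err := saveImage(root, "pre", "42", []byte("raw image bytes")); err != nil {
			return err
		}
		data, err := loadImage(root, "pre", "42")
		fmt.Println(string(data), err) // prints raw image bytes <nil>
		return err
	})
	if err != nil {
		fmt.Println("storage demo failed:", err)
	}
}
```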

Database

This one will save our tasks: their Id and whether they are waiting to start, in progress or finished.

  • POST:
    • Arguments:
      • TaskId
      • State: not started/ in progress/ finished
    • Response:
      • Success/Failure
  • GET:
    • Arguments:
      • TaskId
    • Response:
      • State/Failure
  • GET:
    • Path:
      • not started/ in progress/ finished
    • Response:
      • list of TaskIds
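The three endpoints above boil down to three operations on a task table. Here is a minimal in-memory sketch of them without the HTTP part. The type and method names (taskDB, set, get, list) are hypothetical, and a real service would of course persist the data somewhere.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// taskState is one of the three states listed in the API.
type taskState string

const (
	notStarted taskState = "not started"
	inProgress taskState = "in progress"
	finished   taskState = "finished"
)

// taskDB is a hypothetical in-memory stand-in for the database service.
type taskDB struct {
	mu    sync.RWMutex
	tasks map[string]taskState
}

func newTaskDB() *taskDB {
	return &taskDB{tasks: make(map[string]taskState)}
}

// set stores the state of a task (the POST endpoint).
func (db *taskDB) set(id string, state taskState) error {
	switch state {
	case notStarted, inProgress, finished:
	default:
		return errors.New("unknown state: " + string(state))
	}
	db.mu.Lock()
	defer db.mu.Unlock()
	db.tasks[id] = state
	return nil
}

// get returns the state of one task (the GET endpoint taking a TaskId).
func (db *taskDB) get(id string) (taskState, bool) {
	db.mu.RLock()
	defer db.mu.RUnlock()
	state, ok := db.tasks[id]
	return state, ok
}

// list returns the sorted IDs of all tasks in the given state (the GET endpoint taking a path).
func (db *taskDB) list(state taskState) []string {
	db.mu.RLock()
	defer db.mu.RUnlock()
	var ids []string
	for id, s := range db.tasks {
		if s == state {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids)
	return ids
}

func main() {
	db := newTaskDB()
	db.set("1", notStarted)
	db.set("2", inProgress)
	db.set("3", notStarted)
	fmt.Println(db.list(notStarted)) // prints [1 3]
	fmt.Println(db.get("2"))         // prints in progress true
}
```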

The Frontend

The frontend is there mainly to provide a way for the user and the various services to communicate. It can also be used for authentication and authorization.

  • POST:
    • Path:
      • newImage
    • Arguments:
      • Data
    • Response:
      • Id
  • GET:
    • Path:
      • image/isReady
    • Arguments:
      • Id
    • Response:
      • not found/ in progress / finished
  • GET:
    • Path:
      • image/get
    • Arguments:
      • Id
    • Response:
      • Data
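Since the frontend mostly passes requests on, a sketch of it is little more than a routing table. The one below forwards the three frontend paths to the matching paths of the image master service (newImage, isReady, get). The names routes, newFrontend and demo are invented for this example, and the master is faked with a test server that just echoes what it received.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// routes maps each frontend path to the image master path it forwards to.
var routes = map[string]string{
	"/newImage":      "/newImage",
	"/image/isReady": "/isReady",
	"/image/get":     "/get",
}

// newFrontend returns a handler that forwards known paths to the master at masterURL.
// Sketch only: response headers are not copied and there is no authentication.
func newFrontend(masterURL string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		target, ok := routes[r.URL.Path]
		if !ok {
			http.NotFound(w, r)
			return
		}
		dest := masterURL + target
		if r.URL.RawQuery != "" {
			dest += "?" + r.URL.RawQuery
		}
		req, err := http.NewRequest(r.Method, dest, r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		req.ContentLength = r.ContentLength
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			http.Error(w, "master unreachable", http.StatusBadGateway)
			return
		}
		defer resp.Body.Close()
		w.WriteHeader(resp.StatusCode)
		io.Copy(w, resp.Body)
	})
}

// demo sends one request through the frontend to a fake master that echoes
// the method, path and query it received.
func demo(method, target string) (int, string) {
	master := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "%s %s?%s", r.Method, r.URL.Path, r.URL.RawQuery)
	}))
	defer master.Close()

	rec := httptest.NewRecorder()
	newFrontend(master.URL).ServeHTTP(rec, httptest.NewRequest(method, target, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	fmt.Println(demo("GET", "/image/isReady?id=42")) // prints 200 GET /isReady?id=42
}
```

In the real app the master address would not be hard-wired. It could, for example, be read from the key-value store.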

Image master microservice

This one will get new images from the frontend/user and send them to the storage service. It will also create a new task in the database, and orchestrate the workers, which can ask for work and notify it when they have finished.

  • Frontend interface:
    • POST:
      • Path:
        • newImage
      • Arguments:
        • Data
      • Response:
        • Id
    • GET:
      • Path:
        • isReady
      • Arguments:
        • Id
      • Response:
        • not found/ in progress / finished
    • GET:
      • Path:
        • get
      • Arguments:
        • Id
      • Response:
        • Data/Failure
  • Worker interface:
    • GET:
      • Path:
        • getWork
      • Response:
        • Id/noWorkToDo
    • POST:
      • Path:
        • workFinished
      • Arguments:
        • Id
      • Response:
        • Success/Failure
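The bookkeeping behind these two interfaces can be sketched as a small queue plus a state table. This is only an illustration under a few assumptions: IDs are simple counters, the state map stands in for the database service, the storage calls are omitted, and all names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"sync"
)

// master is a hypothetical in-memory model of the image master's bookkeeping.
type master struct {
	mu      sync.Mutex
	nextID  int
	pending []string          // IDs waiting for a worker
	state   map[string]string // stands in for the database service
}

func newMaster() *master {
	return &master{state: make(map[string]string)}
}

// newImage registers a task for an uploaded image and returns its ID.
// Assumption: IDs are consecutive numbers.
func (m *master) newImage() string {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.nextID++
	id := strconv.Itoa(m.nextID)
	m.state[id] = "not started"
	m.pending = append(m.pending, id)
	return id
}

// getWork hands the oldest waiting task to a worker; ok is false when there is no work to do.
func (m *master) getWork() (id string, ok bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.pending) == 0 {
		return "", false
	}
	id, m.pending = m.pending[0], m.pending[1:]
	m.state[id] = "in progress"
	return id, true
}

// workFinished marks a task as done; only tasks that are in progress can finish.
func (m *master) workFinished(id string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.state[id] != "in progress" {
		return errors.New("task " + id + " is not in progress")
	}
	m.state[id] = "finished"
	return nil
}

// isReady answers with one of the three responses from the API.
// Tasks still waiting for a worker are reported as "in progress".
func (m *master) isReady(id string) string {
	m.mu.Lock()
	defer m.mu.Unlock()
	s, ok := m.state[id]
	switch {
	case !ok:
		return "not found"
	case s == "finished":
		return "finished"
	default:
		return "in progress"
	}
}

func main() {
	m := newMaster()
	id := m.newImage()
	fmt.Println(m.isReady(id)) // prints in progress
	work, _ := m.getWork()
	fmt.Println(m.workFinished(work), m.isReady(id)) // prints <nil> finished
}
```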

Image worker microservice

This one doesn’t have any API. It is a client of the image master service, which it finds using the key-value store. It gets the image data to work on from the storage service.
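One cycle of such a worker could be sketched like this. I’m assuming the master answers getWork with the plain ID (or noWorkToDo) in the response body and takes the ID for workFinished as a query parameter named id. The master address is passed in directly instead of being looked up in the key-value store, and the actual image processing is just a callback. All function names are hypothetical, and the master is faked with a test server holding a single task.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strings"
)

// fetchWork asks the master for a task ID; ok is false when there is nothing to do.
func fetchWork(masterURL string) (id string, ok bool, err error) {
	resp, err := http.Get(masterURL + "/getWork")
	if err != nil {
		return "", false, err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", false, err
	}
	id = strings.TrimSpace(string(body))
	if resp.StatusCode != http.StatusOK || id == "" || id == "noWorkToDo" {
		return "", false, nil
	}
	return id, true, nil
}

// reportFinished tells the master that the task is done.
func reportFinished(masterURL, id string) error {
	resp, err := http.Post(masterURL+"/workFinished?id="+url.QueryEscape(id), "text/plain", nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("master answered %s", resp.Status)
	}
	return nil
}

// runOnce performs one ask-process-report cycle and returns the handled ID ("" if idle).
func runOnce(masterURL string, process func(id string) error) (string, error) {
	id, ok, err := fetchWork(masterURL)
	if err != nil || !ok {
		return "", err
	}
	if err := process(id); err != nil {
		return "", err
	}
	return id, reportFinished(masterURL, id)
}

// demo runs two cycles against a fake master that has exactly one task, "7".
func demo() (first, second, reported string, err error) {
	work := make(chan string, 1)
	work <- "7"
	done := make(chan string, 1)

	mux := http.NewServeMux()
	mux.HandleFunc("/getWork", func(w http.ResponseWriter, r *http.Request) {
		select {
		case id := <-work:
			fmt.Fprint(w, id)
		default:
			fmt.Fprint(w, "noWorkToDo")
		}
	})
	mux.HandleFunc("/workFinished", func(w http.ResponseWriter, r *http.Request) {
		done <- r.URL.Query().Get("id")
	})
	master := httptest.NewServer(mux)
	defer master.Close()

	// A real worker would fetch the image from storage and modify it here.
	process := func(id string) error { return nil }

	if first, err = runOnce(master.URL, process); err != nil || first == "" {
		return first, "", "", err
	}
	reported = <-done
	second, err = runOnce(master.URL, process)
	return first, second, reported, err
}

func main() {
	fmt.Println(demo()) // prints 7  7 <nil>
}
```

A long-running worker would call runOnce in a loop and sleep for a while whenever it comes back empty-handed.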

Scheme

The microservice architecture design

Conclusion

This is basically everything regarding the design. In the next part we will write part of the microservices. Again, I encourage you to comment expressing what you think about this design!
