Web app using Microservices in Go: Part 4 – Worker and Frontend

Previous part

Introduction

In this part we will finally finish writing our application. We will implement the last two services:
1. The Worker
2. The Frontend

The Worker

The worker will communicate with the Master to get new Tasks. When it gets a Task it will get the corresponding data from the storage and will start working on the task. When it finishes it will send the finished data to the storage service, and if that succeeds it will register the Task as finished to the Master.

That means that you can easily scale the workforce, by turning on additional machines, with the worker service on them. Easy scaling is good!

Implementation

As usual we will start with a basic structure which is similar to the structure of our previous services. Although there is one big difference. There won’t be any API here as the worker will be a client. You could, if you wanted, add an API for debugging purposes. Things like getting the processor usage. But this can also be implemented using 3rd party health checking services.

So here’s our basic structure:

package main

import (
    "os"
    "fmt"
    "net/http"
    "io/ioutil"
    "encoding/json"
    "time"
    "strconv"
    "image"
    "image/png"
    "image/color"
    "bytes"
    "sync"
)

type Task struct {
    Id int `json:"id"`
    State int `json:"state"`
}

var masterLocation string
var storageLocation string
var keyValueStoreAddress string

func main() {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return
    }
    keyValueStoreAddress = os.Args[1]

    response, err := http.Get("http://" + keyValueStoreAddress + "/get?key=masterAddress")
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get master address.")
        fmt.Println(response.Body)
        return
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    masterLocation = string(data)
    if len(masterLocation) == 0 {
        fmt.Println("Error: can't get master address. Length is zero.")
        return
    }

    response, err = http.Get("http://" + keyValueStoreAddress + "/get?key=storageAddress")
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get storage address.")
        fmt.Println(response.Body)
        return
    }
    data, err = ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    storageLocation = string(data)
    if len(storageLocation) == 0 {
        fmt.Println("Error: can't get storage address. Length is zero.")
        return
    }
}

func getNewTask(masterAddress string) (Task, error) {
}
func getImageFromStorage(storageAddress string, myTask Task) (image.Image, error) {
}
func doWorkOnImage(myImage image.Image) image.Image {
}
func sendImageToStorage(storageAddress string, myTask Task, myImage image.Image) error {
}
func registerFinishedTask(masterAddress string, myTask Task) error {
}

It may seem to be much but there is nothing new actually in the most dense parts. We define the Task structure first and declare the needed addresses/locations.

Later we check if there are enough arguments passed to the application, and, as we did in the previous parts, we get the addresses of the Master and the Storage. That’s basically all.

For the worker we will want a command line parameter to set the number of concurrent threads. That’s why we check if there are 3 Arguments.

Now let’s parse the thread count:

storageLocation = string(data)
if len(storageLocation) == 0 {
    fmt.Println("Error: can't get storage address. Length is zero.")
    return
}

threadCount, err := strconv.Atoi(os.Args[2])
if err != nil {
    fmt.Println("Error: Couldn't parse thread count.")
    return
}

We use the atoi function to parse the string argument to an int.

And now we come to the main for loop which runs on each thread:

myWG := sync.WaitGroup{}
myWG.Add(threadCount)
for i := 0; i < threadCount; i++ {
    go func() {
        for {
            //Do work...
        }
    }()
}
myWG.Wait()

Ok, what are we doing there? We create a waitGroup. The main function has to be waiting for the goroutines and not just finish execution, that’s why we create a waitGroup and add the thread count. You could add a functionality to break the endless for loop and after that use the Done() function on the waitgroup. We won’t be adding this as we just want endless for work loops.

Now we will write down the execution process for each Task.
First we get a new Task:

for {
    myTask, err := getNewTask(masterLocation)
    if err != nil {
        fmt.Println(err)
        fmt.Println("Waiting 2 second timeout...")
        time.Sleep(time.Second * 2)
        continue
    }

If we error, then we wait 2 seconds, so it doesn’t make a lot of errored requests to the master at once. As this could flood the other services.

If we successfully get the Task, then we get the image to work on from the storage:

myTask, err := getNewTask(masterLocation)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

myImage, err := getImageFromStorage(storageLocation, myTask)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

Ok, now it’s time to do something with the image!!!

myImage, err := getImageFromStorage(storageLocation, myTask)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

myImage = doWorkOnImage(myImage)

We now of course have to save the image back to the storage:

myImage = doWorkOnImage(myImage)

err = sendImageToStorage(storageLocation, myTask, myImage)
if err != nil {
    fmt.Println(err)
    fmt.Println("Waiting 2 second timeout...")
    time.Sleep(time.Second * 2)
    continue
}

And finally if that succeeds we register our successfully finished Task!

        err = sendImageToStorage(storageLocation, myTask, myImage)
        if err != nil {
            fmt.Println(err)
            fmt.Println("Waiting 2 second timeout...")
            time.Sleep(time.Second * 2)
            continue
        }

        err = registerFinishedTask(masterLocation, myTask)
        if err != nil {
            fmt.Println(err)
            fmt.Println("Waiting 2 second timeout...")
            time.Sleep(time.Second * 2)
            continue
        }
    }
}()

Ok, so now we can continue with implementing these functions. Let’s first implement the getNewTask function:

func getNewTask(masterAddress string) (Task, error) {
    response, err := http.Post("http://" + masterAddress + "/getNewTask", "text/plain", nil)
    if err != nil || response.StatusCode != http.StatusOK {
        return Task{-1, -1}, err
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        return Task{-1, -1}, err
    }

    myTask := Task{}
    err = json.Unmarshal(data, &myTask)
    if err != nil {
        return Task{-1, -1}, err
    }

    return myTask, nil
}

We make the request to the master and check if it was successful. We read the response body to memory and finally Unmarshal the response body to our Task structure. Finally we return it.

Now we can implement the function to get the image from storage!

func getImageFromStorage(storageAddress string, myTask Task) (image.Image, error) {
    response, err := http.Get("http://" + storageAddress + "/getImage?state=working&id=" + strconv.Itoa(myTask.Id))
    if err != nil || response.StatusCode != http.StatusOK {
        return nil, err
    }

    myImage, err := png.Decode(response.Body)
    if err != nil {
        return nil, err
    }

    return myImage, nil
}

We get the response whose body is the raw image, so we just Decode it and return it if we succeed.

Now that we have our image we can finally do some work on it:

func doWorkOnImage(myImage image.Image) image.Image {
    myCanvas := image.NewRGBA(myImage.Bounds())

    for i := 0; i < myCanvas.Rect.Max.X; i++ {
        for j := 0; j < myCanvas.Rect.Max.Y; j++ {
            r, g, b, _ := myImage.At(i, j).RGBA()
            myColor := new(color.RGBA)
            myColor.R = uint8(g)
            myColor.G = uint8(r)
            myColor.B = uint8(b)
            myColor.A = uint8(255)
            myCanvas.Set(i, j, myColor)
        }
    }

    return myCanvas.SubImage(myImage.Bounds())
}

The work is largely irrelevant, but I’ll explain it anyways. First we create a RGBA. That’s something like a canvas for drawing, and we create it with the size of our image. Later we draw on the canvas swapping the red with the green channel. Later we use the RGBA to return a new modified image, created from our canvas with the size of our original image.

After working on the image we have to send it back to the storage system. So let’s implement the sendImageToStorage function:

func sendImageToStorage(storageAddress string, myTask Task, myImage image.Image) error {
    data := []byte{}
    buffer := bytes.NewBuffer(data)
    err := png.Encode(buffer, myImage)
    if err != nil {
        return err
    }
    response, err := http.Post("http://" + storageAddress + "/sendImage?state=finished&id=" + strconv.Itoa(myTask.Id), "image/png", buffer)
    if err != nil || response.StatusCode != http.StatusOK {
        return err
    }

    return nil
}

We create a data byte slice, and from that a data buffer which allows us to use it as a readwriter interface. We then use this interface to encode our image to png into, and finally send it using a POST to the server. If everything works out, then we just return.

When we successfully saved the image, we can register to the Master that we finished the Task.

func registerFinishedTask(masterAddress string, myTask Task) error {
    response, err := http.Post("http://" + masterAddress + "/registerTaskFinished?id=" + strconv.Itoa(myTask.Id), "test/plain", nil)
    if err != nil || response.StatusCode != http.StatusOK {
        return err
    }

    return nil
}

Nothing fancy. Just sending a POST request to notify about finishing the task.

Ok, that’s all regarding the worker. You can turn it on and it will wait for Tasks to get. Which means we need to get the Tasks from the user to our service finally. And that leads us to…

The Frontend

This one will show the user the website and also parse the user form so the backend services only get the raw image data. It will be the only interface to our application for the user.

As usual, we’ll begin with the basic structure of the file:

package main

import (
    "net/http"
    "fmt"
    "io/ioutil"
    "os"
    "net/url"
    "io"
)

const indexPage = "<html><head><title>Upload file</title></head><body><form enctype=\"multipart/form-data\" action=\"submitTask\" method=\"post\"> <input type=\"file\" name=\"uploadfile\" /> <input type=\"submit\" value=\"upload\" /> </form> </body> </html>"

var keyValueStoreAddress string
var masterLocation string

func main() {
    if len(os.Args) < 2 {
        fmt.Println("Error: Too few arguments.")
        return
    }
    keyValueStoreAddress = os.Args[1]

    response, err := http.Get("http://" + keyValueStoreAddress + "/get?key=masterAddress")
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get master address.")
        fmt.Println(response.Body)
        return
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    masterLocation = string(data)
    if len(masterLocation) == 0 {
        fmt.Println("Error: can't get master address. Length is zero.")
        return
    }

    http.HandleFunc("/", handleIndex)
    http.HandleFunc("/submitTask", handleTask)
    http.HandleFunc("/isReady", handleCheckForReadiness)
    http.HandleFunc("/getImage", serveImage)
    http.ListenAndServe(":80", nil)
}

func handleIndex(w http.ResponseWriter, r *http.Request) {
}

func handleTask(w http.ResponseWriter, r *http.Request) {
}

func handleCheckForReadiness(w http.ResponseWriter, r *http.Request) {
}

func serveImage(w http.ResponseWriter, r *http.Request) {
}

We’ve got the code of our index web page here, and we also have the API declared. After starting the program we check if we have the k/v store address. If we do (we hope so), then we can get the master address from it.

Now we can go on to implementing the functions. We’ll start with the simples. The index handler:

func handleIndex(w http.ResponseWriter, r *http.Request) {
    fmt.Fprint(w, indexPage)
}

After writing this we can go on to writing the more complicated functions. We will start with the handleTask function. This function is responsible for parsing the user form and sending the raw image data to the master.

func handleTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
    err := r.ParseMultipartForm(10000000)
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Wrong input")
        return
    }
    file, _, err := r.FormFile("uploadfile")
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Wrong input")
        return
    }

Ok, what do we have here? We check the method as we always do, and later parse the multipart form. We’ve got a nice lovely magic number there. The number is responsible for setting the max size of the form held in RAM. The rest will be stored in temporary files. We later do the request using the file we got:

    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Wrong input")
        return
    }

    response, err := http.Post("http://" + masterLocation + "/new", "image", file)
    if err != nil || response.StatusCode != http.StatusOK {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error:", err)
        return
    }

    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error:", err)
        return
    }

    fmt.Fprint(w, string(data))
} else {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error: Only POST accepted")
}

We send the user back the new Task id we got back from the master.

Now we can implement the function which will handle the request to check if the Task is finished and ready:

func handleCheckForReadiness(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Get("http://" + masterLocation + "/isReady?id=" + values.Get("id") + "&state=finished")
        if err != nil || response.StatusCode != http.StatusOK {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        data, err := ioutil.ReadAll(response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        switch string(data) {
        case "0":
            fmt.Fprint(w, "Your image is not ready yet.")
        case "1":
            fmt.Fprint(w, "Your image is ready.")
        default :
            fmt.Fprint(w, "Internal server error.")
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Most of this is the same as the function in the master. We just check if the id is correct and make a request to the master. The interesting part is this:

switch string(data) {
case "0":
    fmt.Fprint(w, "Your image is not ready yet.")
case "1":
    fmt.Fprint(w, "Your image is ready.")
default :
    fmt.Fprint(w, "Internal server error.")
}

We cast the response body to a string, and if it matches 1 or 0 we answer. If it’s something else then we know something went really wrong, and send back an error.

Now we can get to the last function, the function to serve the actual images:

func serveImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Get("http://" + masterLocation + "/get?id=" + values.Get("id") + "&state=finished")
        if err != nil || response.StatusCode != http.StatusOK {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(w, response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

It just checks the id, sends the request, and copies the response to answer the user.

Conclusion

So now it’s all finished. You can start all applications and they will work. You can contact the frontend and make requests, they will all start working and you will get your modified images. Six microservices working beautifully together.

This may be the end of this series, so have fun extending this system alone.

I may add a finishing part about deployment to container infrastructues, or a another extending series about refactoring the system with 3rd party libraries in the future but I’m not sure.

All in all, good luck!

UPDATE: Also remember, that when running the worker, it’s good to launch it with goroutines in the number of 2-4x your system threads. They have near-0 overhead in switching and this way your not unnecessarily blocking when waiting for http responses.

Web app using Microservices in Go: Part 3 – Storage and Master

Previous part

Introduction

In this part we will implement the next part of the microservices needed for our web app. We will implement the:
* Storage system
* Master

This way we will have the Master API ready when we’ll be writing the slaves/workers and the frontend. And we’ll already have the database, k/v store and storage when writing the master. SO every time we write something we’ll already have all its dependencies.

The storage system

Ok, this one will be pretty easy to write. Just handling files. Let’s build the basic structure, which will include a function to register in our k/v store. For reference how it works check out the previous part. So here’s the basic structure:

package main

import (
    "fmt"
    "net/http"
    "io/ioutil"
    "os"
    "net/url"
    "io"
)

func main() {
    if !registerInKVStore() {
        return
    }
    http.HandleFunc("/sendImage", receiveImage)
    http.HandleFunc("/getImage", serveImage)
    http.ListenAndServe(":3002", nil)
}

func receiveImage(w http.ResponseWriter, r *http.Request) {
}

func serveImage(w http.ResponseWriter, r *http.Request) {
}

func registerInKVStore() bool {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return false
    }
    storageAddress := os.Args[1] // The address of itself
    keyValueStoreAddress := os.Args[2]

    response, err := http.Post("http://" + keyValueStoreAddress + "/set?key=storageAddress&value=" + storageAddress, "", nil)
    if err != nil {
        fmt.Println(err)
        return false
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return false
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: Failure when contacting key-value store: ", string(data))
        return false
    }
    return true
}

So now we’ll have to handle the file serving/uploading. We will use a state url argument to specify if we are using the not yet finished (aka working) directory, or the finished one.

So first let’s write the receiveImage function which is there to get the files from clients:

func receiveImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }
        if values.Get("state") != "working" && values.Get("state") != "finished" {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input state.")
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

Here we check if the request method is POST, if there is an id, and if the state is working or finished.

Next we can create the file and put in the image:

if values.Get("state") != "working" && values.Get("state") != "finished" {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input state.")
            return
        }

        _, err = strconv.Atoi(values.Get("id"))
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }

        file, err := os.Create("/tmp/" + values.Get("state") + "/" + values.Get("id") + ".png")
        defer file.Close()
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(file, r.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        fmt.Fprint(w, "success")

We create a file in the tmp/state directory with the right id. Another thing we do is check if the id really is a valid int. We parse it to an int, to see if it succeeds and if it does then we use it, as a string.

we use the io.Copy function to put all the data from the request to the file. That means that the body of our request should be a raw image.

Next we can write the function to serve images which is pretty similar:

func serveImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }
        if values.Get("state") != "working" && values.Get("state") != "finished" {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input state.")
            return
        }

        _, err = strconv.Atoi(values.Get("id"))
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input id.")
            return
        }

        file, err := os.Open("/tmp/" + values.Get("state") + "/" + values.Get("id") + ".png")
        defer file.Close()
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(w, file)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Instead of creating the file, we open it. Instead of copying to the file we copy from it. And we check if the method is GET.

That’s it. We’ve got a storage service which saves and servers raw image files. Now we can get to the master!

The master

We now have all the dependencies the master needs. So let’s write it now. Here’s the basic structure:

package main

import (
    "os"
    "fmt"
    "net/http"
    "io/ioutil"
)

type Task struct {
    Id int `json:"id"`
    State int `json:"state"`
}

var databaseLocation string
var storageLocation string

func main() {
    if !registerInKVStore() {
        return
    }

    http.HandleFunc("/new", newImage)
    http.HandleFunc("/get", getImage)
    http.HandleFunc("/isReady", isReady)
    http.HandleFunc("/getNewTask", getNewTask)
    http.HandleFunc("/registerTaskFinished", registerTaskFinished)
    http.ListenAndServe(":3003", nil)
}

func newImage(w http.ResponseWriter, r *http.Request) {
}

func getImage(w http.ResponseWriter, r *http.Request) {
}

func isReady(w http.ResponseWriter, r *http.Request) {
}

func getNewTask(w http.ResponseWriter, r *http.Request) {
}

func registerTaskFinished(w http.ResponseWriter, r *http.Request) {
}

func registerInKVStore() bool {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return false
    }
    masterAddress := os.Args[1] // The address of itself
    keyValueStoreAddress := os.Args[2]

    response, err := http.Post("http://" + keyValueStoreAddress + "/set?key=masterAddress&value=" + masterAddress, "", nil)
    if err != nil {
        fmt.Println(err)
        return false
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return false
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: Failure when contacting key-value store: ", string(data))
        return false
    }
    return true
}

It’s the structure of the API and the mechanics to register in the k/v store.

We also need to get the storage and database locations in the main function:

if !registerInKVStore() {
        return
    }
    keyValueStoreAddress = os.Args[2]

    response, err := http.Get("http://" + keyValueStoreAddress + "/get?key=databaseAddress")
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get database address.")
        fmt.Println(response.Body)
        return
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    databaseLocation = string(data)

    response, err = http.Get("http://" + keyValueStoreAddress + "/get?key=storageAddress")
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: can't get storage address.")
        fmt.Println(response.Body)
        return
    }
    data, err = ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return
    }
    storageLocation = string(data)

Now we can start implementing all the functionality!

Let’s start with the newImage function as it contains a good bit of code and mechanics which will be again used in the other funtions.
Here’s the beginning:

func newImage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        response, err := http.Post("http://" + databaseLocation + "/newTask", "text/plain", nil)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        id, err := ioutil.ReadAll(response.Body)
        if err != nil {
            fmt.Println(err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

As usual we check if the method is right. Next we register a new Task in the database and get and Id.

We now use this to send the image to the storage:

id, err := ioutil.ReadAll(response.Body)
if err != nil {
    fmt.Println(err)
    return
}

_, err = http.Post("http://" + storageLocation + "/sendImage?id=" + string(id) + "&state=working", "image", r.Body)
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}
fmt.Fprint(w, string(id))

That’s it. The new task will be created, the storage will get a file into the working directory with the name of the file being the id, and the client gets back the id. The important thing here is that we need the raw image in the request. The user form has to be parsed in the frontend service.

Now we can create the function which just checks if a Task is ready:

func isReady(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

We first have to verify all the parameters and the request method. Next we can ask the database for the Task requested:

if len(values.Get("id")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Wrong input")
    return
}

response, err := http.Get("http://" + databaseLocation + "/getById?id=" + values.Get("id"))
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}
data, err := ioutil.ReadAll(response.Body)
if err != nil {
    fmt.Println(err)
    return
}

We also read the response immediately. Now we can parse the Task and respond to the client:

if err != nil {
    fmt.Println(err)
    return
}

myTask := Task{}
json.Unmarshal(data, &myTask)

if(myTask.State == 2) {
    fmt.Fprint(w, "1")
} else {
    fmt.Fprint(w, "0")
}

So now we can implement the last client facing interface, the getImage function:

if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }
} else {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error: Only GET accepted")
}

Here we verified the request and now we need to get the image from the storage system, and just copy the response to our client:

if len(values.Get("id")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Wrong input")
    return
}

response, err := http.Get("http://" + storageLocation + "/getImage?id=" + values.Get("id") + "&state=finished")
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}

_, err = io.Copy(w, response.Body)
if err != nil {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:", err)
    return
}

That’s it! The client facing interface is finished!

Implementing the worker facing interface

Now we have to implement the functions to serve the workers.

Both functions will basically be just direct routes to the database and back, so now let’s write ’em too:

func getNewTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        response, err := http.Post("http://" + databaseLocation + "/getNewTask", "text/plain", nil)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(w, response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

func registerTaskFinished(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        response, err := http.Post("http://" + databaseLocation + "/finishTask?id=" + values.Get("id"), "test/plain", nil)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }

        _, err = io.Copy(w, response.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

There’s not much to explain. They are both just passing further the request and responding with what they get.

You could think the workers should communicate directly with the database to get new Tasks. And with the current implementation it would work perfectly. However, if we wanted to add some functionality the master wanted to do for each of those requests it would be hard to implement. So this way is very extensible, and that’s nearly always what we want.

Conclusion

Now we have finished the Master and the Storage system. We now have the dependencies to create the workers and frontend which we will implement in the next part. As always I encourage you to comment about your opinion. Have fun extending the system to do what you want to achieve!

Next part

Web app using Microservices in Go: Part 2 – k/v store and Database

Previous part

Introduction

In this part we will implement part of the microservices needed for our web app. We will implement the:
* key-value store
* Database

This will be a pretty code heavy tutorial so concentrate and have fun!

The key-value store

Design

The design hasn’t changed much. We will save the key-value pairs as a global map, and create a global mutex for concurrent access. We’ll also add the ability to list all key-value pairs for debugging/analytical purposes. We will also add the ability to delete existing entries.

First, let’s create the structure:

package main

import (
    "net/http"
    "sync"
    "net/url"
    "fmt"
)

var keyValueStore map[string]string
var kVStoreMutex sync.RWMutex

func main() {
    keyValueStore = make(map[string]string)
    kVStoreMutex = sync.RWMutex{}
    http.HandleFunc("/get", get)
    http.HandleFunc("/set", set)
    http.HandleFunc("/remove", remove)
    http.HandleFunc("/list", list)
    http.ListenAndServe(":3000", nil)
}

func get(w http.ResponseWriter, r *http.Request) {
}

func set(w http.ResponseWriter, r *http.Request) {
}

func remove(w http.ResponseWriter, r *http.Request) {
}

func list(w http.ResponseWriter, r *http.Request) {
}

And now let’s dive into the implementation.

First, we should add parameter parsing in the get function and verify that the key parameter is right.

func get(w http.ResponseWriter, r *http.Request) {
    if(r.Method == http.MethodGet) {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("key")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input key.")
            return
        }
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted.")
    }
}

The key shouldn’t have a length of 0, hence the length check. We also check if the method is GET, if it isn’t we print it and set the status code to bad request.
We answer with an explicit Error: before each error message so it doesn’t get misinterpreted by the client as a value.

Now, let’s access our map and send back a response:

if len(values.Get("key")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:","Wrong input key.")
    return
}

kVStoreMutex.RLock()
value := keyValueStore[string(values.Get("key"))]
kVStoreMutex.RUnlock()

fmt.Fprint(w, value)

We copy the value into a variable so that we don’t block the map while sending back the response.

Now let’s create the set function, it’s actually pretty similar.

func set(w http.ResponseWriter, r *http.Request) {
    if(r.Method == http.MethodPost) {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("key")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", "Wrong input key.")
            return
        }
        if len(values.Get("value")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", "Wrong input value.")
            return
        }

        kVStoreMutex.Lock()
        keyValueStore[string(values.Get("key"))] = string(values.Get("value"))
        kVStoreMutex.Unlock()

        fmt.Fprint(w, "success")
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted.")
    }
}

The only difference is that we also check if there is a right value parameter and check if the method is POST.

Now we can add the implementation of the list function which is also pretty simple:

func list(w http.ResponseWriter, r *http.Request) {
    if(r.Method == http.MethodGet) {
        kVStoreMutex.RLock()
        for key, value := range keyValueStore {
            fmt.Fprintln(w, key, ":", value)
        }
        kVStoreMutex.RUnlock()
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted.")
    }
}

It just ranges over the map and prints everything. Simple yet effective.

And to finish the key-value store we will implement the remove function:

func remove(w http.ResponseWriter, r *http.Request) {
    if(r.Method == http.MethodDelete) {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("key")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", "Wrong input key.")
            return
        }

        kVStoreMutex.Lock()
        delete(keyValueStore, values.Get("key"))
        kVStoreMutex.Unlock()

        fmt.Fprint(w, "success")
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only DELETE accepted.")
    }
}

It’s the same as setting a value, but instead of setting it we delete it.

The database

Design

After thinking through the design, I decided that it would be better if the database generated the task Id‘s. This will also make it easier to get the last non-finished task and generate consecutive Id‘s

How it will work:
* It will save new tasks assigning consecutive Id‘s.
* It will allow to get a new task to do.
* It will allow to get a task by Id.
* It will allow to set a task by Id.
* The state will be represented by an int:
* 0 – not started
* 1 – in progress
* 2 – finished
* It will change the state of a task to not started if it’s been too long in progress. (maybe someone started to work on it but has crashed)
* It will allow to list all tasks for debugging/analytical purposes.

Database microservice post

Implementation

First, we should create the API and later we will add the implementations of the functionality as before with the key-value store. We will also need a global map being our data store, a variable pointing to the oldest not started task, and mutexes for accessing the datastore and pointer.

package main

import (
    "net/http"
    "net/url"
    "fmt"
)

type Task struct {
}

var datastore map[int]Task
var datastoreMutex sync.RWMutex
var oldestNotFinishedTask int // remember to account for potential int overflow in production. Use something bigger.
var oNFTMutex sync.RWMutex

func main() {

    datastore = make(map[int]Task)
    datastoreMutex = sync.RWMutex{}
    oldestNotFinishedTask = 0
    oNFTMutex = sync.RWMutex{}

    http.HandleFunc("/getById", getById)
    http.HandleFunc("/newTask", newTask)
    http.HandleFunc("/getNewTask", getNewTask)
    http.HandleFunc("/finishTask", finishTask)
    http.HandleFunc("/setById", setById)
    http.HandleFunc("/list", list)
    http.ListenAndServe(":3001", nil)
}

func getById(w http.ResponseWriter, r *http.Request) {
}

func newTask(w http.ResponseWriter, r *http.Request) {
}

func getNewTask(w http.ResponseWriter, r *http.Request) {
}

func finishTask(w http.ResponseWriter, r *http.Request) {
}

func setById(w http.ResponseWriter, r *http.Request) {
}

func list(w http.ResponseWriter, r *http.Request) {
}

We also already declared the Task type which we will use for storage.

So far so good. Now let’s implement all those functions!

First, let’s implement the getById function.

func getById(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        id, err := strconv.Atoi(string(values.Get("id")))
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        datastoreMutex.RLock()
        bIsInError := err != nil || id >= len(datastore) // Reading the length of a slice must be done in a synchronized manner. That's why the mutex is used.
        datastoreMutex.RUnlock()

        if bIsInError {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        datastoreMutex.RLock()
        value := datastore[id]
        datastoreMutex.RUnlock()

        response, err := json.Marshal(value)

        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        fmt.Fprint(w, string(response))
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

We check if the GET method has been used. Later we parse the id argument and check if it’s proper. We then get the id as an int using the strconv.Atoi function. Next we make sure it is not out of bounds for our datastore, which we have to do using mutexes because we’re accessing a map which could be accessed from another thread. If everything is ok, then, again using mutexes, we get the task using the id.

After that we use the JSON library to marshal our struct into a JSON object and if that finishes without problems we send the JSON object to the client.

It’s also time to implement our Task struct:

type Task struct {
    Id int `json:"id"`
    State int `json:"state"`
}

It’s all that’s needed. We also added the information the JSON marshaller needs.

We can now go on with implementing the newTask function:

func newTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        datastoreMutex.Lock()
        taskToAdd := Task{
            Id: len(datastore),
            State: 0,
        }
        datastore[taskToAdd.Id] = taskToAdd
        datastoreMutex.Unlock()

        fmt.Fprint(w, taskToAdd.Id)
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

It’s pretty small actually. Creating a new Task with the next id and adding it to the datastore. After that it sends back the new Tasks Id.

That means we can go on to implementing the function used to list all Tasks, as this helps with debugging during writing.

It’s basically the same as with the key-value store:

func list(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        datastoreMutex.RLock()
        for key, value := range datastore {
            fmt.Fprintln(w, key, ": ", "id:", value.Id, " state:", value.State)
        }
        datastoreMutex.RUnlock()
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Ok, so now we will implement the function which can set the Task by id:

func setById(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        taskToSet := Task{}

        data, err := ioutil.ReadAll(r.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }
        err = json.Unmarshal([]byte(data), &taskToSet)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        bErrored := false
        datastoreMutex.Lock()
        if taskToSet.Id >= len(datastore) || taskToSet.State > 2 || taskToSet.State < 0 {
            bErrored = true
        } else {
            datastore[taskToSet.Id] = taskToSet
        }
        datastoreMutex.Unlock()

        if bErrored {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: Wrong input")
            return
        }

        fmt.Fprint(w, "success")
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

Nothing new. We get the request and try to unmarshal it. If it succeeds we put it into the map, checking if it isn’t out of bounds or if the state is invalid. If it is then we print an error, otherwise we print success.

If we already have this we can now implement the finish task function, because it’s very simple:

func finishTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            fmt.Fprint(w, err)
            return
        }
        if len(values.Get("id")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Wrong input")
            return
        }

        id, err := strconv.Atoi(string(values.Get("id")))

        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        updatedTask := Task{Id: id, State: 2}

        bErrored := false

        datastoreMutex.Lock()
        if datastore[id].State == 1 {
            datastore[id] = updatedTask
        } else {
            bErrored = true
        }
        datastoreMutex.Unlock()

        if bErrored {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: Wrong input")
            return
        }

        fmt.Fprint(w, "success")
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

It’s pretty similar to the getById function. The difference here is that here we update the state and only if it is currently in progress.

And now to one of the most interesting functions. The getNewTask function. It has to handle updating the oldest known finished task, and it also needs to handle the situation when someone takes a task but crashes during work. This would lead to a ghost task forever being in progress. That’s why we’ll add functionality which after 120 seconds from starting a task will set it back to not started:

func getNewTask(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {

        bErrored := false

        datastoreMutex.RLock()
        if len(datastore) == 0 {
            bErrored = true
        }
        datastoreMutex.RUnlock()

        if bErrored {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: No non-started task.")
            return
        }

        taskToSend := Task{Id: -1, State: 0}

        oNFTMutex.Lock()
        datastoreMutex.Lock()
        for i := oldestNotFinishedTask; i < len(datastore); i++ {
            if datastore[i].State == 2 && i == oldestNotFinishedTask {
                oldestNotFinishedTask++
                continue
            }
            if datastore[i].State == 0 {
                datastore[i] = Task{Id: i, State: 1}
                taskToSend = datastore[i]
                break
            }
        }
        datastoreMutex.Unlock()
        oNFTMutex.Unlock()

        if taskToSend.Id == -1 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error: No non-started task.")
            return
        }

        myId := taskToSend.Id

        go func() {
            time.Sleep(time.Second * 120)
            datastoreMutex.Lock()
            if datastore[myId].State == 1 {
                datastore[myId] = Task{Id: myId, State: 0}
            }
            datastoreMutex.Unlock()
        }()

        response, err := json.Marshal(taskToSend)

        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, err)
            return
        }

        fmt.Fprint(w, string(response))
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

First we try to find the oldest task that hasn’t started yet. By the way we update the oldestNotFinishedTask variable. If a task is finished and is pointed on by the variable, the variable get’s incremented. If we find something that’s not started, then we break out of the loop and send it back to the user setting it to in progress. However, on the way we start a function on another thread that will change the state of the task back to not started if it’s still in progress after 120 seconds.

Now the last thing. A database is useless… when you don’t know where it is! That’s why we’ll now implement the mechanism that the database will use to register itself in the key-value store:

func main() {

    if !registerInKVStore() {
        return
    }

    datastore = make(map[int]Task)

and later we define the function:

func registerInKVStore() bool {
    if len(os.Args) < 3 {
        fmt.Println("Error: Too few arguments.")
        return false
    }
    databaseAddress := os.Args[1] // The address of itself
    keyValueStoreAddress := os.Args[2]

    response, err := http.Post("http://" + keyValueStoreAddress + "/set?key=databaseAddress&value=" + databaseAddress, "", nil)
    if err != nil {
        fmt.Println(err)
        return false
    }
    data, err := ioutil.ReadAll(response.Body)
    if err != nil {
        fmt.Println(err)
        return false
    }
    if response.StatusCode != http.StatusOK {
        fmt.Println("Error: Failure when contacting key-value store: ", string(data))
        return false
    }
    return true
}

We check if there are at least 3 arguments. (The first being the executable) We read the current database address from the second argument and the key-value store address from the third argument. We use them to make a POST request where we add a databaseAddress key to the k/v store and set its value to the current database address. If the status code of the response isn’t OK then we know we messed up and we print the error we got. After that we quit the program.

Conclusion

We now have finished our k/v store and our database. You can even test them now using a REST client. (I used this one.) Remember that the code is subject to change if it will be necessary but I don’t think so. I hope you enjoyed the tutorial! I encourage you to comment, and if you have an opposing view to mine please make sure to express it in a comment too!

UPDATE: I changed the sync.Mutex to sync.RWMutex, and in the places where we only read data I changed mutex.Lock/Unlock to mutex.RLock/RUnlock.

UPDATE2: For some reason I used a slice for the database code although I tested with a map. Sorry for that, corrected it already.

Next part

Web app using Microservices in Go: Part 1 – Design

Introduction

Recently it’s a constantly repeated buzzword – Microservices. You can love ’em or hate ’em, but you really shouldn’t ignore ’em. In this short series we’ll create a web app using a microservice architecture. We’ll try not to use 3rd party tools and libraries. Remember though that when creating a production web app it is highly recommended to use 3rd party libraries (even if only to save you time).

We will create the various components in a basic form. We won’t use advanced caching or use a database. We will create a basic key-value store and a simple storage service. We will use the Go language for all this.

UPDATE: as there are comments regarding overcomplication: this is meant to show a scalable and working skeleton for a microservice architecture. If you only want to add some filters to photos, don’t design it like that. It’s overkill.

On further thought and another comment, (Which you can find on the golang Reddit) do design it this way. Software usually lives much longer than we think it will, and such a design will lead to an easily extendable and scalable web app.

The functionality

First we should decide what our web app will do. The web app we’ll create in this series will get an image from a user and give back an unique ID. The image will get modified using complicated and highly sophisticated algorithms, like swapping the blue and red channel, and the user will be able to use the ID to check if the work on the image has been finished already or if it’s still in progress. If it’s finished he will be able to download the altered image.

Designing the architecture

We want the architecture to be microservices, so we should design it like that. We’ll for sure need a service facing the user, the one that provides the interface for communication with our app. This could also handle authentication, and should be used as the service redirecting the workload to the right sub-services. (useful if you plan to integrate more funcionality into the app)

We will also want a microservice which will handle all our images. It will get the image, generate an ID, store information related to each task, and save the images. To handle high workloads it’s a good idea to use a master-slave system for our image modification service. The image handler will be the master, and we will create slave microservices which will ask the master for images to work on.

We will also need a key-value datastore for various configuration, a storage system, for saving our images, pre- and post-modification, and a database-ish service holding the information about each task.

This should suffice to begin with.

Here I’d like to also state that the architecture could change during the series if needed. And I encourage you to comment if you think that something could be done better.

Communication

We will also need to define the method the services communicate by. In this app we will use REST everywhere. You could also use a message BUS or Remote Procedure Calls – short RPC, but I won’t write about them here.

Designing the microservice API’s

Another important thing is to design the API‘s of you microservices. We will now design each of them to get an understanding about what they are for.

The key-value store

This one’s mainly for configuration. It will have a simple post-get interface:

  • POST:
    • Arguments:
      • Key
      • Value
    • Response:
      • Success/Failure
  • GET:
    • Arguments:
      • Key
    • Response:
      • Value/Failure

The storage

Here we will store the images, again using a key-value interface and an argument stating if this one’s pre- or post-modification. For the sake of simplicity we will just save the image to a folder named, depending on the state of the image, finished/inProgress.

  • POST:
    • Arguments:
      • Key
      • State: pre-/post-modification
      • Data
    • Response:
      • Success/Failure
  • GET:
    • Arguments:
      • Key
      • State: pre-/post-modification
    • Response:
      • Data/Failure

Database

This one will save our tasks. If they are waiting to start, in progress or finished, their Id.

  • POST:
    • Arguments:
      • TaskId
      • State: not started/ in progress/ finished
    • Response:
      • Success/Failure
  • GET:
    • Arguments:
      • TaskId
    • Response:
      • State/Failure
  • GET:
    • Path:
      • not started/ in progress/ finished
    • Reponse:
      • list of TaskId’s

The Frontend

The frontend is there mainly to provide a communication way between the various services and the user. It can also be used for authentication and authorization.

  • POST:
    • Path:
      • newImage
    • Arguments:
      • Data
    • Response:
      • Id
  • GET:
    • Path:
      • image/isReady
    • Arguments:
      • Id
    • Response:
      • not found/ in progress / finished
  • GET:
    • Path:
      • image/get
    • Arguments:
      • Id
    • Response:
      • Data

Image master microservice

This one will get new images from the fronted/user and send them to the storage service. It will also create a new task in the database, and orchestrate the workers who can ask for work and notify when it’s finished.

  • Frontend interface:
    • POST:
      • Path:
        • newImage
      • Arguments:
        • Data
      • Response:
        • Id
    • GET:
      • Path:
        • isReady
      • Arguments:
        • Id
      • Response:
        • not found/ in progress / finished
    • GET:
      • Path:
        • get
      • Arguments:
        • Id
      • Response:
        • Data/Failure
  • Worker interface:
    • GET:
      • Path:
        • getWork
      • Response:
        • Id/noWorkToDo
    • POST:
      • Path:
        • workFinished
      • Arguments:
        • Id
      • Response:
        • Success/Failure

Image worker microservice

This one doesn’t have any API. It is a client to the master image service, which he finds using the key-value store. He gets the image data to work on from the storage service.

Scheme

The microservice architecure design

Conclusion

This is basically everything regarding the design. In the next part we will write part of the microservices. Again, I encourage you to comment expressing what you think about this design!

Next part

Practical Golang: Using Google Drive and Calendar

Introduction

Integrating Google services into your app can lead to a lot of nice features for your users, and can create a seamless experience for them. In this tutorial we’ll learn how to use the most useful functionalities of Google Calendar and Google Drive.

The theory

To begin with, we should understand the methodology of using the Google API in Golang. For most of their API’s I’ve skimmed through it works like that:

  1. Create an OAuth2 client from the OAuth2 access token.
  2. Use the client to create an app service, this will be our interface we’ll use to communicate with Google services.
  3. We create a request object and set the needed parameters.
  4. We start the action, usually using the Do() function on the request object.

After you learn it for one Google service, it will be trivial for you to use it for any other.

Dependencies

Here we will need the Google Calendar and Google Drive libraries, both of which we need version 3 of. (the newest at the moment)

So make sure to:

go get google.golang.org/api/calendar/v3
go get google.golang.org/api/drive/v3

The basic structure

For both of our apps we’ll need the same basic OAuth2 app structure. You can learn more about it in my previous article

package main

import (
    "fmt"
    "net/http"
    "golang.org/x/oauth2"
    "os"
    "golang.org/x/oauth2/google"
    "golang.org/x/net/context"
    "time"
)

var (
    googleOauthConfig = &oauth2.Config{
        RedirectURL:    "http://localhost:3000/GoogleCallback",
        ClientID:     os.Getenv("googlekey"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        ClientSecret: os.Getenv("googlesecret"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        Scopes:       []string{},
        Endpoint:     google.Endpoint,
    }
// Some random string, random for each request
    oauthStateString = "random"
)

const htmlIndex = `<html><body>
<a href="/GoogleLogin">Log in with Google</a>
</body></html>
`

func main() {
    http.HandleFunc("/", handleMain)
    http.HandleFunc("/GoogleLogin", handleGoogleLogin)
    http.HandleFunc("/GoogleCallback", handleGoogleCallback)
    fmt.Println(http.ListenAndServe(":3000", nil))
}
func handleMain(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, htmlIndex)
}

func handleGoogleLogin(w http.ResponseWriter, r *http.Request) {
    url := googleOauthConfig.AuthCodeURL(oauthStateString)
    http.Redirect(w, r, url, http.StatusTemporaryRedirect)
}

func handleGoogleCallback(w http.ResponseWriter, r *http.Request) {
    state := r.FormValue("state")
    if state != oauthStateString {
        fmt.Printf("invalid oauth state, expected '%s', got '%s'\n", oauthStateString, state)
        http.Redirect(w, r, "/", http.StatusTemporaryRedirect)
        return
    }

    code := r.FormValue("code")
    token, err := googleOauthConfig.Exchange(oauth2.NoContext, code)
    if err != nil {
        fmt.Printf("oauthConf.Exchange() failed with '%s'\n", err)
        http.Redirect(w, r, "/", http.StatusTemporaryRedirect)
        return
    }

    client := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(token))
}

There is one thing I haven’t covered in my previous blog post, namely the last line:

client := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(token))

We need an OAuth2 client to use the Google API, so we create one. It takes a context, for lack of which we just use the background context. It also needs a token source. As we only want to make one request and know that this token will suffice we create a static token source which will always generate the same token which we’ve passed to it.

Creating the Calendar app.

Preperation

First, as described in my previous article, you should enable the Google Calendar API in the Google Cloud Console for you app.

Also, we’ll need to ask for permission, so add the https://www.googleapis.com/auth/calendar scope to our googleOauthConfig:

googleOauthConfig = &oauth2.Config{
        RedirectURL:    "http://localhost:3000/GoogleCallback",
        ClientID:     os.Getenv("googlekey"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        ClientSecret: os.Getenv("googlesecret"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        Scopes:       []string{"https://www.googleapis.com/auth/calendar"},
        Endpoint:     google.Endpoint,
    }

Remember to import google.golang.org/api/calendar/v3

The main code

We’ll add everything we write right after creating our OAuth2 client.

First, as I described before, we’ll need an app service, here it will be the calendar service, so let’s create it!

client := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(token))

calendarService, err := calendar.New(client)
if err != nil {
  fmt.Fprintln(w, err)
  return
}

It just uses the OAuth client to create the service and errors out if something goes wrong.

Listing events

Now we will create a request, add a few optional parameters to it and start it. We’ll build it up step by step.

calendarService, err := calendar.New(client)
if err != nil {
  fmt.Fprintln(w, err)
  return
}

calendarService.Events.List("primary")

This creates a request to list all events in your primary calendar. You could also name a specific calendar, but using primary will take the primary calendar of that user.

So… I think we don’t really care about the events 5 years ago. So let’s only take the upcoming ones.

calendarService.Events.List("primary").TimeMin(time.Now().Format(time.RFC3339))

We add an option TimeMin which takes a DateTime by string… No idea why it isn’t a nice struct like time.DateTime. You also need to format it as a string in the RFC3339 format.

Ok… but that could be a lot of events, so we’ll just take the 5 first:

calendarService.Events.List("primary").TimeMin(time.Now().Format(time.RFC3339)).MaxResults(5)

Now we just have to Do() it, and store the results:

calendarEvents, err := calendarService.Events.List("primary").TimeMin(time.Now().Format(time.RFC3339)).MaxResults(5).Do()
if err != nil {
  fmt.Fprintln(w, err)
  return
}

How can we now do something with the results? Simple! :

calendarEvents, err := calendarService.Events.List("primary").TimeMin(time.Now().Format(time.RFC3339)).MaxResults(5).Do()
if err != nil {
  fmt.Fprintln(w, err)
  return
}

if len(calendarEvents.Items) > 0 {
    for _, i := range calendarEvents.Items {
        fmt.Fprintln(w, i.Summary, " ", i.Start.DateTime)
    }
}

We access a list of events using the Items field in the calendarEvents variable, if there is at least one element, then for each element we write the summary and start time to the response writer using a for range loop.

Creating an event

Ok, we already know how to list events, now let’s create an event!
First, we need an event object:

if len(calendarEvents.Items) > 0 {
    for _, i := range calendarEvents.Items {
        fmt.Fprintln(w, i.Summary, " ", i.Start.DateTime)
    }
}
newEvent := calendar.Event{
    Summary: "Testevent",
    Start: &calendar.EventDateTime{DateTime: time.Date(2016, 3, 11, 12, 24, 0, 0, time.UTC).Format(time.RFC3339)},
    End: &calendar.EventDateTime{DateTime: time.Date(2016, 3, 11, 13, 24, 0, 0, time.UTC).Format(time.RFC3339)},
}

We create an Event struct and pass in the summary – title of the event.
We also pass the start and finish DateTime. We create a date using the stdlib time package, and then convert it to a string in the RFC3339 format. There are tons of other optional fields you can specify if you want to.

Next we need to create an insert request object:

newEvent := calendar.Event{
    Summary: "Testevent",
    Start: &calendar.EventDateTime{DateTime: time.Date(2016, 3, 11, 12, 24, 0, 0, time.UTC).Format(time.RFC3339)},
    End: &calendar.EventDateTime{DateTime: time.Date(2016, 3, 11, 13, 24, 0, 0, time.UTC).Format(time.RFC3339)},
}
calendarService.Events.Insert("primary", &newEvent)

The Insert request takes two arguments, the calendar name and an event object.

As usual we neeed to Do() the request! and saving the resulting created event can also come handy in the future:

createdEvent, err := calendarService.Events.Insert("primary", &newEvent).Do()
if err != nil {
    fmt.Fprintln(w, err)
    return
}

In the end let’s just print some kind of confirmation to the user:

fmt.Fprintln(w, "New event in your calendar: \"", createdEvent.Summary, "\" starting at ", createdEvent.Start.DateTime)

Hint

You can define the event ID yourself before creating it, but you can also let the Google Calendar service generate an ID for us as we did.

Creating the Drive app

Preperation

First, enable the Google Drive API in the Google Cloud Console

Again we will need to ask the user for permission. So we have to use the https://www.googleapis.com/auth/drive and https://www.googleapis.com/auth/drive.file scopes:

googleOauthConfig = &oauth2.Config{
        RedirectURL:    "http://localhost:3000/GoogleCallback",
        ClientID:     os.Getenv("googlekey"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        ClientSecret: os.Getenv("googlesecret"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        Scopes:       []string{"https://www.googleapis.com/auth/drive", "https://www.googleapis.com/auth/drive.file"},
        Endpoint:     google.Endpoint,
    }

Remember to import google.golang.org/api/drive/v3″

The main code

Again we need to start from our basic OAuth2 app structure with an OAuth2 client, and create an app service. Here, this will be the Drive service:

client := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(token))

driveService, err := drive.New(client)
if err != nil {
    fmt.Fprintln(w, err)
    return
}

Listing files

Ok, to begin with let’s learn how to list files from the Google Drive. There is an important concept you should understand before we start. Whenever we request a list of files from Google Drive, those can literally be thousands of files. That’s why we get one page of files, which includes files metadata, not the content, and a NextPageToken, which we can use to get the next page.

As before with the calendar app, we create a list request.

driveService, err := drive.New(client)
if err != nil {
    fmt.Fprintln(w, err)
    return
}

driveService.Files.List()

But we can also set an option. The ordering for example:

driveService.Files.List().OrderBy("name")

Ok, now let’s Do() it and save the results:

myFilesList, err := driveService.Files.List().OrderBy("name").Do()
if err != nil {
    fmt.Fprintf(w, "Couldn't retrieve files ", err)
}

As I wrote before, this is one page of files, now let’s go over it:

if len(myFilesList.Files) > 0 {
    for _, i := range myFilesList.Files {
        fmt.Fprintln(w, i.Name, " ", i.Id)
    }
} else {
    fmt.Fprintln(w, "No files found.")
}

We check if there are any files, and if there are, we range over them printing the name and Id of each one, else we print that there are none.

Now, let’s get more pages, it’s the myFilesList.NextPageToken holding the token for the next page. If it is an empty string, then this was the last page. While it is present we load new pages into our myFilesList variable.

for myFilesList.NextPageToken != "" {
    myFilesList, err = driveService.Files.List().OrderBy("name").PageToken(myFilesList.NextPageToken).Do()
    if err != nil {
        fmt.Fprintf(w, "Couldn't retrieve files ", err)
        break
    }
    fmt.Fprintln(w, "Next Page: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    if len(myFilesList.Files) > 0 {
        for _, i := range myFilesList.Files {
            fmt.Fprintln(w, i.Name, " ", i.Id)
        }
    } else {
        fmt.Fprintln(w, "No files found.")
    }
}

To retrieve the next page we add the PageToken option to our file listing request

    myFilesList, err = driveService.Files.List().OrderBy("name").PageToken(myFilesList.NextPageToken).Do()

Whenever we start printing the new page, we notify a user that a new page just started. Later we check if we have any files, and if we do, then we range over them as before, printing the names and Id’s

Creating a file

We already know how to list files in our Google Drive. Now let’s learn how to create a file and add some content to it!

First, we need to create the file metadata:

for myFilesList.NextPageToken != "" {
    myFilesList, err = driveService.Files.List().OrderBy("name").PageToken(myFilesList.NextPageToken).Do()
    if err != nil {
        fmt.Fprintf(w, "Couldn't retrieve files ", err)
        break
    }
    fmt.Fprintln(w, "Next Page: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    if len(myFilesList.Files) > 0 {
        for _, i := range myFilesList.Files {
            fmt.Fprintln(w, i.Name, " ", i.Id)
        }
    } else {
        fmt.Fprintln(w, "No files found.")
    }
}


myFile := drive.File{Name: "cats.png"}

Again, the Id will be generated for us if we do not provide any.

Putting the file into Google Drive is pretty easy now. We create a create request, referencing our file metadata in it:

myFile := drive.File{Name: "cats.png"}

driveService.Files.Create(&myFile)

and Do() it, saving the results:

createdFile, err := driveService.Files.Create(&myFile).Do()
if err != nil {
    fmt.Fprintf(w, "Couldn't create file ", err)
}

Ok, if you started this code, you would get a cats.png file. However, it’s empty. So let’s add some data to it!

To add it, we’ll first need some data. We can load it from a file:

createdFile, err := driveService.Files.Create(&myFile).Do()
if err != nil {
    fmt.Fprintf(w, "Couldn't create file ", err)
}
myImage, err := os.Open("/tmp/image.png")
if err != nil {
    fmt.Fprintln(w, err)
}

Now we have to create the updated file metadata:

myImage, err := os.Open("/tmp/image.png")
if err != nil {
    fmt.Fprintln(w, err)
}
updatedFile := drive.File{Name: "catsUpdated.png"}

We have to construct an update request:

driveService.Files.Update(createdFile.Id, &updatedFile)

Here we specify the Id of the file to modify, and the new metadata. We’ll now add the data to the update request, and Do() it, checking for errors and ignoring the result.

_, err = driveService.Files.Update(createdFile.Id, &updatedFile).Media(myImage).Do()
if err != nil {
    fmt.Fprintln(w, err)
}
fmt.Fprintln(w, createdFile.Id)

In the end we send the new file id to the user.

Hint

You could add the Media option already to the create request if you wanted.

Conclusion

I suppose this was a good short introduction into the structure of the Google API. After learning these, you should have few to no problems using the other API’s.

Have fun integrating it into your app!

Practical Golang: Using websockets

Introduction

This is the first post in the practical Golang series. Posts in it are meant to provide short and informative introductions to various topics.

This one is a about websockets, which are an awesome and easy way to provide communication between your web app and server.

Here we will use the gorilla websocket library, but you could also use a few others.

We will create two basic apps which should cover most day to day usage:
1. A client subscribing to a server to get continues information.
2. A client-ping server-pong app.

Dependencies

Make sure to go get:

go get github.com/gorilla/websocket

What’s the theory?

  1. The client connects to your server using his web browser.
  2. He gets back the website.
  3. He connects to your server through his websocket client using javascript.
  4. Your server accepts it using a standard http handler.
  5. You create a websocket connection from the http connection.
  6. You communicate with the client.
  7. The connection gets closed by one of the sides.

Creating the subscription app

Preparations

Next to our main go file we will need a html folder to place our html file in.

Let’s name it creatively, like index.html

Now the contents:

<!DOCTYPE HTML>
<html>
<head>

    <script type="text/javascript">
         function myWebsocketStart()
         {
               var ws = new WebSocket("ws://localhost:3000/websocket");

               ws.onmessage = function (evt)
               {
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + evt.data
               };

               ws.onclose = function()
               {
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + "Connection closed";
               };

         }

    </script>

</head>
<body>
<button onclick="javascript:myWebsocketStart()">Subscribe</button>
<textarea id="textarea1">MyTextArea</textarea>
</body>
</html>

I’ll just go over it quickly as the main subject here is the go code.

We create a button and a textarea, after the user clicks the button he connects to our websocket. Whenever he receives a message, or the connection gets closed, we append the info to our textarea.

We will also need our, so creatively named, main.go file, with the basic structure and file server written:

package main

import (
    "github.com/gorilla/websocket"
    "net/http"
    "os"
    "fmt"
    "io/ioutil"
    "time"
    "encoding/json"
)

func main() {
    indexFile, err := os.Open("html/index.html")
    if err != nil {
        fmt.Println(err)
    }
    index, err := ioutil.ReadAll(indexFile)
    if err != nil {
        fmt.Println(err)
    }
    http.HandleFunc("/websocket", func(w http.ResponseWriter, r *http.Request) {
    })
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintf(w, string(index))
    })
    http.ListenAndServe(":3000", nil)
}

Awesome, now let’s create the websocket part.

Writing the websocket code

Little bit of planning

Our server will create a Person object containing a name and age in seconds. Every two seconds it will send the client the current state of the person.

The meat

First we’ll need to define our Person type:

type Person struct {
    Name string
    Age  int
}

We’ll also need to create an upgraded variable, in which we define our read and write buffer sizes.

var upgrader = websocket.Upgrader{
    ReadBufferSize: 1024,
    WriteBufferSize: 1024,
}

Now, how do we create the websocket connection? Pretty easily in fact:

http.HandleFunc("/websocket", func(w http.ResponseWriter, r *http.Request) {
  conn, err := upgrader.Upgrade(w, r, nil)
  if err != nil {
    fmt.Println(err)
    return
  }
  fmt.Println("Client subscribed")
}

That’s all we need to have a client. Now let’s create Bill, our person, right after we get the client subscribed:

fmt.Println("Client subscribed")

myPerson := Person{
  Name: "Bill",
  Age:  0,
}

Now we need the main websocket handling code, which we will wrap into an endless for loop, which we get out of only if the channel closes or Bill gets 40 seconds old.

for {
  time.Sleep(2 * time.Second)
  if myPerson.Age < 40 {
    myJson, err := json.Marshal(myPerson)
    if err != nil {
      fmt.Println(err)
      return
    }
    err = conn.WriteMessage(websocket.TextMessage, myJson)
    if err != nil {
      fmt.Println(err)
      break
    }
    myPerson.Age += 2
  } else {
    conn.Close()
    break
  }
}
fmt.Println("Client unsubscribed")

We send the messages using conn.WriteMessage in which we specify the message type, can be binary or text, and the content. If Bill is 40 years old or more, we break out of the loop. So far so good, but what if we want bidirectional communication?

Creating the ping-pong app

Preparations

As before, we will need a html folder for our html file with the creative name index.html

And here’s the code:

<!DOCTYPE HTML>
<html>
<head>

    <script type="text/javascript">
         function myWebsocketStart()
         {
               var ws = new WebSocket("ws://localhost:3000/websocket");

               ws.onopen = function()
               {
                  // Web Socket is connected, send data using send()
                  ws.send("ping");
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + "First message sent";
               };

               ws.onmessage = function (evt)
               {
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + evt.data
                  if(evt.data == "pong") {
                    setTimeout(function(){ws.send("ping");}, 2000);
                  }
               };

               ws.onclose = function()
               {
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + "Connection closed";
               };

         }

    </script>

</head>
<body>
<button onclick="javascript:myWebsocketStart()">Start websocket!</button>
<textarea id="textarea1">MyTextArea</textarea>
</body>
</html>

The only differences are, that when we open the connection, we send a “ping” message and notify our user about it. Now, whenever we get back a “pong” message, we append it to our textarea and after 2 seconds we answer with a “ping” message again.

We will again need the basic go file structure with the upgrader defined already, and the connection created:

package main

import (
    "github.com/gorilla/websocket"
    "net/http"
    "os"
    "fmt"
    "io/ioutil"
    "time"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize: 1024,
    WriteBufferSize: 1024,
}

func main() {
    indexFile, err := os.Open("html/index.html")
    if err != nil {
        fmt.Println(err)
    }
    index, err := ioutil.ReadAll(indexFile)
    if err != nil {
        fmt.Println(err)
    }
    http.HandleFunc("/websocket", func(w http.ResponseWriter, r *http.Request) {
        conn, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            fmt.Println(err)
            return
        }
    })
    http.HandleFunc("/", func(w http.ResponseWriter, r * http.Request) {
        fmt.Fprintf(w, string(index))
    })
    http.ListenAndServe(":3000", nil)
}

Writing the websocket code

Ok, so now, whenever we get a “ping” message, we wait 2 seconds and answer with a “pong” message. If we get anything else, we just close the connection.

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
  fmt.Println(err)
  return
}
for {
  msgType, msg, err := conn.ReadMessage()
  if err != nil {
    fmt.Println(err)
    return
  }
  if string(msg) == "ping" {
    fmt.Println("ping")
    time.Sleep(2 * time.Second)
    err = conn.WriteMessage(msgType, []byte("pong"))
    if err != nil {
      fmt.Println(err)
      return
    }
  } else {
    conn.Close()
    fmt.Println(string(msg))
    return
  }
}

Using the ReadMessage function on our connection we get the type, content, and maybe error. We check the message and answer.

Conclusion

That’s actually all, have fun with it and build something great!

Getting started with Apache Spark and Scala on Windows

Introduction

This tutorial is intended for people who really need to run Apache Spark on windows. Usually it would be better to run it in a Linux VM or on Docker.

There are a few things that cause problems with Spark on windows. But using this way of installation I managed to minimize the impact.

Getting the needed files

First, download the spark-1.6.0-bin-hadoop2.6.tgz file from the Apache Spark Downloads website. You should also install a program to open it, for example 7zip.

Next, download hadoop2.6.0 for Windows

This is all we will need.

Putting everything together

Extract both files to their own folders.

Open the environment variables settings. Add your Spark bin folder to your PATH variable and create a new SPARK_HOME environment variable and set it to your Apache Spark base folder.

Next, create another environment variable named HADOOP_HOME and set it to your hadoop base folder.

Preparing the configuration files

Open the conf folder in your Apache Spark base location. There should be a file called log4j.properties.template, change the name to log4j.properties.

For the sake of convenience you can open the file and change the value of rootCategory from INFO to WARN.

Launching it

After trying different ways, this one seems to be the least error-prone:

To launch the master open the command line and type in:

spark-class.cmd org.apache.spark.deploy.master.Master

Next, open your browser, and navigate to: http://localhost:8080/

There you should see the address of your spark master, like spark://192.168.1.27:7077

Open another command line, and start one worker using:

spark-class.cmd org.apache.spark.deploy.worker.Worker masteraddress

That’s it, spark is running.

To run the spark shell, use:

spark-shell –master masteraddress

To submit a packaged application jar, use:

spark-submit –class mainclassname –master masteraddress packagedjarlocation

Have fun with it!