Practical Golang: Building a simple, distributed one-value database with Hashicorp Serf

Introduction

With the advent of distributed applications, we see new storage solutions emerging constantly.
They include, but are not limited to, Cassandra, Redis, CockroachDB, Consul or RethinkDB.
Most of you probably use one, or more, of them.

They seem to be really complex systems, because they actually are. This can’t be denied.
But it’s pretty easy to write a simple, one value database, featuring high availability.
You probably wouldn’t use anything near this in production, but it should be a fruitful learning experience for you nevertheless.
If you’re interested, read on!

Dependencies

You’ll need to

go get github.com/hashicorp/serf/serf

as a key dependency.

We’ll also use those for convenience’s sake:

"github.com/gorilla/mux"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

Small overview

What will we build? We’ll build a one-value clustered database. Which means, numerous instances of our application will be able to work together.
You’ll be able to set or get the value using a REST interface. The value will then shortly be spread across the cluster using the Gossip protocol.
Which means, every node tells a part of the cluster about the current state of the variable in set intervals. But because later each of those also tells a part of the cluster about the state, the whole cluster ends up having been informed shortly.

It’ll use Serf for easy cluster membership, which uses SWIM under the hood. SWIM is a more advanced Gossip-like algorithm, which you can read on about here.

Now let’s get to the implementation…

Getting started

First, we’ll of course have to put in all our imports:

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "os"
    "strconv"
    "sync"
    "time"

    "github.com/gorilla/mux"
    "github.com/hashicorp/serf/serf"
    "github.com/pkg/errors"
    "golang.org/x/sync/errgroup"
)

Following this, it’s time to write a simple thread-safe, one-value store.
An important thing is, the database will also hold the generation of the variable. This way, when one instance gets notified about a new value, it can check if the incoming notification actually has a higher generation count. Only then, will it change the current local value.
So our database structure will hold exactly this: the number, generation and a mutex.

type oneAndOnlyNumber struct {
    num        int
    generation int
    numMutex   sync.RWMutex
}

func InitTheNumber(val int) *oneAndOnlyNumber {
    return &oneAndOnlyNumber{
        num: val,
    }
}

We’ll also need a way to set and get the value.
Setting the value will also advance the generation count, so when we notify the rest of this cluster, we will overwrite their values and generation counts.

func (n *oneAndOnlyNumber) setValue(newVal int) {
    n.numMutex.Lock()
    defer n.numMutex.Unlock()
    n.num = newVal
    n.generation = n.generation + 1
}

func (n *oneAndOnlyNumber) getValue() (int, int) {
    n.numMutex.RLock()
    defer n.numMutex.RUnlock()
    return n.num, n.generation
}

Finally, we will need a way to notify the database of changes that happened elsewhere, if they have a higher generation count.
For that we’ll have a small notify method, which will return true, if anything has been changed:

func (n *oneAndOnlyNumber) notifyValue(curVal int, curGeneration int) bool {
    if curGeneration > n.generation {
        n.numMutex.Lock()
        defer n.numMutex.Unlock()
        n.generation = curGeneration
        n.num = curVal
        return true
    }
    return false
}

We’ll also create a const describing how many nodes we will notify about the new value every time.

const MembersToNotify = 2

Now let’s get to the actual functioning of the application. First we’ll have to start an instance of serf, using two variables. The address of our instance in the network and the -optional- address of the cluster to join.

func main() {
    cluster, err := setupCluster(
        os.Getenv("ADVERTISE_ADDR"),
        os.Getenv("CLUSTER_ADDR"))
    if err != nil {
        log.Fatal(err)
    }
    defer cluster.Leave()

How does the setupCluster function work, you may ask? Here it is:

func setupCluster(advertiseAddr string, clusterAddr string) (*serf.Serf, error) {
    conf := serf.DefaultConfig()
    conf.Init()
    conf.MemberlistConfig.AdvertiseAddr = advertiseAddr

    cluster, err := serf.Create(conf)
    if err != nil {
        return nil, errors.Wrap(err, "Couldn't create cluster")
    }

    _, err = cluster.Join([]string{clusterAddr}, true)
    if err != nil {
        log.Printf("Couldn't join cluster, starting own: %v\n", err)
    }

    return cluster, nil
}

As we can see, we are creating the cluster, only changing the advertise address.

If the creation fails, we of course return the error.
If the joining fails though, it means that we either didn’t get a cluster address,
or the cluster doesn’t exist (omitting network failures), which means we can safely ignore that and just log it.

To continue with, we initialize the database and the REST API:
(I’ve really chosen the number at random… really!)

    theOneAndOnlyNumber := InitTheNumber(42)
    launchHTTPAPI(theOneAndOnlyNumber)

And this is what the API creation looks like:

func launchHTTPAPI(db *oneAndOnlyNumber) {
    go func() {
        m := mux.NewRouter()

We first asynchronously start our server. Then we declare our getter:

        m.HandleFunc("/get", func(w http.ResponseWriter, r *http.Request) {
            val, _ := db.getValue()
            fmt.Fprintf(w, "%v", val)
        })

our setter:

m.HandleFunc("/set/{newVal}", func(w http.ResponseWriter, r *http.Request) {
            vars := mux.Vars(r)
            newVal, err := strconv.Atoi(vars["newVal"])
            if err != nil {
                w.WriteHeader(http.StatusBadRequest)
                fmt.Fprintf(w, "%v", err)
                return
            }

            db.setValue(newVal)

            fmt.Fprintf(w, "%v", newVal)
        })

and finally the API endpoint which allows other nodes to notify this instance of changes:

        m.HandleFunc("/notify/{curVal}/{curGeneration}", func(w http.ResponseWriter, r *http.Request) {
            vars := mux.Vars(r)
            curVal, err := strconv.Atoi(vars["curVal"])
            if err != nil {
                w.WriteHeader(http.StatusBadRequest)
                fmt.Fprintf(w, "%v", err)
                return
            }
            curGeneration, err := strconv.Atoi(vars["curGeneration"])
            if err != nil {
                w.WriteHeader(http.StatusBadRequest)
                fmt.Fprintf(w, "%v", err)
                return
            }

            if changed := db.notifyValue(curVal, curGeneration); changed {
                log.Printf(
                    "NewVal: %v Gen: %v Notifier: %v",
                    curVal,
                    curGeneration,
                    r.URL.Query().Get("notifier"))
            }
            w.WriteHeader(http.StatusOK)
        })
        log.Fatal(http.ListenAndServe(":8080", m))
    }()
}

It’s also here where we start our server and print some debug info when getting notified of new values by other members of our cluster.


Great, we’ve got a way to talk to our service now. Time to make it actually spread all the information.
We’ll also be printing debug info regularly.

To begin with, let’s initiate our context (that’s always a good idea in the main function).
We’ll also put a value into it, the name of our host, just for the debug logs.
It’s a good thing to put into the context, as it’s not something crucial for the functioning of our program,
and the context will get passed further anyways.

    ctx := context.Background()
    if name, err := os.Hostname(); err == nil {
        ctx = context.WithValue(ctx, "name", name)
    }

Having done this, we can set up our main loop, including the intervals at which we’ll be sending state updates to peers and printing debug info.

    debugDataPrinterTicker := time.Tick(time.Second * 5)
    numberBroadcastTicker := time.Tick(time.Second * 2)
    for {
        select {
        case <-numberBroadcastTicker:
        // Notification code goes here...
        case <-debugDataPrinterTicker:
            log.Printf("Members: %v\n", cluster.Members())

            curVal, curGen := theOneAndOnlyNumber.getValue()
            log.Printf("State: Val: %v Gen: %v\n", curVal, curGen)
        }
    }

Ok, that seems to be it.

Just kidding. Time to finish up our service with the notification code.
We’ll now get a list of other members in the cluster, set a timeout, and asynchronously notify a part of those others.

        case <-numberBroadcastTicker:
            members := getOtherMembers(cluster)

            ctx, _ := context.WithTimeout(ctx, time.Second*2)
            go notifyOthers(ctx, members, theOneAndOnlyNumber)

Now, let’s look at the getOtherMembers function. It’s actually just a function scanning through the memberlist, deleting ourselves and other nodes that aren’t alive at the moment.

func getOtherMembers(cluster *serf.Serf) []serf.Member {
    members := cluster.Members()
    for i := 0; i < len(members); {
        if members[i].Name == cluster.LocalMember().Name || members[i].Status != serf.StatusAlive {
            if i < len(members)-1 {
                members = append(members[:i], members[i + 1:]...)
            } else {
                members = members[:i]
            }
        } else {
            i++
        }
    }
    return members
}

There’s not much to it I suppose. It’s using slicing to cut out or cut off members not conforming to our predicates.

Finally the function we use to notify others:

func notifyOthers(ctx context.Context, otherMembers []serf.Member, db *oneAndOnlyNumber) {
    g, ctx := errgroup.WithContext(ctx)

    if len(otherMembers) <= MembersToNotify {
        for _, member := range otherMembers {
            curMember := member
            g.Go(func() error {
                return notifyMember(ctx, curMember.Addr.String(), db)
            })
        }
    } else {
        randIndex := rand.Int() % len(otherMembers)
        for i := 0; i < MembersToNotify; i++ {
            g.Go(func() error {
                return notifyMember(
                    ctx,
                    otherMembers[(randIndex + i) % len(otherMembers)].Addr.String(),
                    db)
            })
        }
    }

    err := g.Wait()
    if err != nil {
        log.Printf("Error when notifying other members: %v", err)
    }
}

If there are only two members then it sends the notifications to them, otherwise it chooses a random index in the members array and chooses subsequent members from there on.
How does the errgroup work? It’s a nifty library Brian Ketelsen wrote a great article about. It’s basically a wait group which also gathers errors and aborts when one happens.

Now to finish our code, the notifyMember function:

func notifyMember(ctx context.Context, addr string, db *oneAndOnlyNumber) error {
    val, gen := db.getValue()
    req, err := http.NewRequest("POST", fmt.Sprintf("http://%v:8080/notify/%v/%v?notifier=%v", addr, val, gen, ctx.Value("name")), nil)
    if err != nil {
        return errors.Wrap(err, "Couldn't create request")
    }
    req = req.WithContext(ctx)

    _, err = http.DefaultClient.Do(req)
    if err != nil {
        return errors.Wrap(err, "Couldn't make request")
    }
    return nil
}

We craft a path with the formula {nodeAddress}:8080/notify/{curVal}/{curGen}?notifier={selfHostName}
We add the context to the request, so we get the timeout functionality, and finally make the request.

And that’s actually all there is to the code.

Testing our database

We’ll test our database using docker. The necessary dockerfile to put into your project directory looks like this:

FROM alpine
WORKDIR /app
COPY distApp /app/
ENTRYPOINT ["./distApp"]

Now, first build your application (if you’re not on linux, you have to set the env variables GOOS=linux and GOARCH=amd64)
Later build the docker image:

docker build -t distapp .

And finally we can launch it. To supply the necessary environment variables, we’ll need to know what ip address the containers will get.
First run:

docker network inspect bridge

Bridge is the default network containers get assigned to. You should get something like this:

[
    {
        "Name": "bridge",
        "Id": "b56a19697ed9d30488f189d5517fd79f04a4df70c8bbc07d8f3c49a491f10433",
        "Created": "2017-01-29T10:48:05.1592086Z",
        "Scope": "local",
        "Driver": "bridge",
        "EnableIPv6": false,
        "IPAM": {
            "Driver": "default",
            "Options": null,
            "Config": [
                {
                    "Subnet": "172.17.0.0/16",
                    "Gateway": "172.17.0.1" <-- this is what we need
                }
            ]
        },
        "Internal": false,
        "Attachable": false,
        "Containers": {},
        "Options": {
            "com.docker.network.bridge.default_bridge": "true",
            "com.docker.network.bridge.enable_icc": "true",
            "com.docker.network.bridge.enable_ip_masquerade": "true",
            "com.docker.network.bridge.host_binding_ipv4": "0.0.0.0",
            "com.docker.network.bridge.name": "docker0",
            "com.docker.network.driver.mtu": "1500"
        },
        "Labels": {}
    }
]

What’s important for us is the gateway. In this case, our containers would be spawned with IP addresses from 172.17.0.2

So now we can start a few containers:

docker run -e ADVERTISE_ADDR=172.17.0.2 -p 8080:8080 distapp
docker run -e ADVERTISE_ADDR=172.17.0.3 -e CLUSTER_ADDR=172.17.0.2 -p 8081:8080 distapp
docker run -e ADVERTISE_ADDR=172.17.0.4 -e CLUSTER_ADDR=172.17.0.3 -p 8082:8080 distapp
docker run -e ADVERTISE_ADDR=172.17.0.5 -e CLUSTER_ADDR=172.17.0.4 -p 8083:8080 distapp

Next on you can test your deployment by stopping and starting containers, and setting/getting the variables at:

localhost:8080/set/5
localhost:8082/get/5
etc...

Conclusion

What’s important, this is a really basic distributed system, it may become inconsistent (if you update the value on two different machines simultaneously, the cluster will have two values depending on the machine).
If you want to learn more, read about CAP, consensus, Paxos, RAFT, gossip, and data replication, they are all very interesting topics (at least in my opinion).

Anyways, I hope you had fun creating a small distributed system and encourage you to build your own, more advanced one, it’ll be a great learning experience for sure!

The whole code is available on my Github.

Practical Golang: Getting started with NATS and related patterns

Practical Golang: Getting started with NATS and related patterns

Introduction

Microservices… the never disappearing buzzword of our times. They promise a lot, but can be slow or complicated if not implemented correctly. One of the main challenges when developing and using a microservice-based architecture is getting the communication right. Many will ask, why not REST? As I did at some point. Many will actually use it. But the truth is that it leads to tighter coupling, and is synchronous. Microservice architectures are meant to be asynchronous. Also, REST is blocking, which also isn’t good on many occasions.

What are we meant to use for communication? Usually we use:
– RPC – Remote Procedure Call
– Message BUS/Broker

In this article I’ll write about one specific Message BUS called NATS and using it in Go.

There are also other message BUS’ses/Brokers. Some popular ones are Kafka and RabbitMQ.

Why NATS? It’s simple, and astonishingly fast.

Setting up NATS

To use NATS you can do one of the following things:
1. Use the NATS Docker image
2. Get the binaries
3. Use the public NATS server nats://demo.nats.io:4222
4. Build from source

Also, remember to

go get https://github.com/nats-io/nats

the official Go library.

Getting started

In this article we’ll be using protobuffs a lot. So if you want to know more about them, check out my previous article about protobuffs.

First, let’s write one of the key usages of microservices. A fronted, that lists information from other micrservices, but doesn’t care if one of them is down. It will respond to the user anyways. This makes microservices swappable live, one at a time.

In each of our services we’ll need to connect to NATS:

package main

import (
    "github.com/nats-io/nats"
    "fmt"
)

var nc *nats.Conn

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }
}

Now, let’s write the first provider service. It will receive a User Id, and answer with a user name For which we’ll need a transport structure to send its data over NATS. I wrote this short proto file for that:

syntax = "proto3";
package Transport;

message User {
        string id = 1;
        string name = 2;
}

Now we will create the map containing our user names:

var users map[string]string
var nc *nats.Conn

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }

    users = make(map[string]string)
    users["1"] = "Bob"
    users["2"] = "John"
    users["3"] = "Dan"
    users["4"] = "Kate"
}

and finally the part that’s most interesting to us. Subscribing to the topic:

users["4"] = "Kate"

nc.QueueSubscribe("UserNameById", "userNameByIdProviders", replyWithUserId)

Notice that it’s a QueueSubscribe. Which means that if we start 10 instances of this service in the userNameByIdProviders group , only one will get each message sent over UserNameById. Another thing to note is that this function call is asynchronous, so we need to block somehow. This select {} will provide an endless block:

nc.QueueSubscribe("UserNameById", "userNameByIdProviders", replyWithUserId)
select {}
}

Ok, now to the replyWithUserId function:

func replyWithUserId(m *nats.Msg) {
}

Notice that it takes one argument, a pointer to the message.

We’ll unmarshal the data:

func replyWithUserId(m *nats.Msg) {

    myUser := Transport.User{}
    err := proto.Unmarshal(m.Data, &myUser)
    if err != nil {
        fmt.Println(err)
        return
}

get the name and marshal back:

myUser.Name = users[myUser.Id]
data, err := proto.Marshal(&myUser)
if err != nil {
    fmt.Println(err)
    return
}

And, as this shall be a request we’re handling, we respond to the Reply topic, a topic created by the caller exactly for this purpose:

if err != nil {
    fmt.Println(err)
    return
}
fmt.Println("Replying to ", m.Reply)
nc.Publish(m.Reply, data)

}

Ok, now let’s get to the second service. Our time provider service, first the same basic structure:

package main

import (
    "github.com/nats-io/nats"
    "fmt"
    "github.com/cube2222/Blog/NATS/FrontendBackend"
    "github.com/golang/protobuf/proto"
    "os"
    "sync"
    "time"
)

// We use globals because it's a small application demonstrating NATS.

var nc *nats.Conn

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }

    nc.QueueSubscribe("TimeTeller", "TimeTellers", replyWithTime)
    select {} // Block forever
}

This time we’re not getting any data from the caller, so we just marshal our time into this proto structure:

syntax = "proto3";
package Transport;

message Time {
        string time = 1;
}

and send it back:

func replyWithTime(m *nats.Msg) {
    curTime := Transport.Time{time.Now().Format(time.RFC3339)}

    data, err := proto.Marshal(&curTime)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("Replying to ", m.Reply)
    nc.Publish(m.Reply, data)

}

We can now get to our frontend, which will use both those services. First the standard basic structure:

package main

import (
    "net/http"
    "github.com/gorilla/mux"
    "github.com/cube2222/Blog/NATS/FrontendBackend"
    "github.com/golang/protobuf/proto"
    "fmt"
    "github.com/nats-io/nats"
    "time"
    "os"
    "sync"
)

var nc *nats.Conn

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }

    m := mux.NewRouter()
    m.HandleFunc("/{id}", handleUserWithTime)

    http.ListenAndServe(":3000", m)
}

That’s a pretty standard web server, now to the interesting bits, the handleUserWithTime function, which will respond with the user name and time:

func handleUserWithTime(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    myUser := Transport.User{Id: vars["id"]}
    curTime := Transport.Time{}
    wg := sync.WaitGroup{}
    wg.Add(2)
}

We’ve parsed the request arguments and started a WaitGroup with the value two, as we will do one asynchronous request for each of our services. First we’ll marshal the user struct:

go func() {
    data, err := proto.Marshal(&myUser)
    if err != nil || len(myUser.Id) == 0 {
        fmt.Println(err)
        w.WriteHeader(500)
        fmt.Println("Problem with parsing the user Id.")
        return
    }

and, then we make a request. Sending the user data, and waiting at most 100 ms for the response:

fmt.Println("Problem with parsing the user Id.")
return
}

msg, err := nc.Request("UserNameById", data, 100 * time.Millisecond)

now we can check if any error happend, or the response is empty and finish this thread:

msg, err := nc.Request("UserNameById", data, 100 * time.Millisecond)
if err == nil && msg != nil {
    myUserWithName := Transport.User{}
    err := proto.Unmarshal(msg.Data, &myUserWithName)
    if err == nil {
        myUser = myUserWithName
    }
}
wg.Done()
}()

Next we’ll do the request to the Time Tellers.
We again make a request, but its body is nil, as we don’t need to pass any data:

go func() {
    msg, err := nc.Request("TimeTeller", nil, 100*time.Millisecond)
    if err == nil && msg != nil {
        receivedTime := Transport.Time{}
        err := proto.Unmarshal(msg.Data, &receivedTime)
        if err == nil {
            curTime = receivedTime
        }
    }
    wg.Done()
}()

After both requests finished (or failed) we can just respond to the user:

wg.Wait()

fmt.Fprintln(w, "Hello ", myUser.Name, " with id ", myUser.Id, ", the time is ", curTime.Time, ".")
}

Now if you actually test it, you’ll notice that if one of the provider services isn’t active, the frontend will respond anyways, putting a zero’ed value in place of the non-available resource. You could also make a template that shows an error in that place.

Ok, that was already an interesting architecture. Now we can implement…

The Master-Slave pattern

This is such a popular pattern, especially in Go, that we really should know how to implement it. The workers will do simple operations on a text file (count the usage amounts of each word in a comma-separated list).

Now you could think that the Master, should send the files to the Workers over NATS. Wrong. This would lead to a huge slowdown of NATS (at least for bigger files). That’s why the Master will send the files to a file server over a REST API, and the Workers will get it from there. We’ll also learn how to do service discovery over NATS.

First, the File Server. I won’t really go through the file handling part, as it’s a simple get/post API.I will however, go over the service discovery part.

package main

import (
    "net/http"
    "github.com/gorilla/mux"
    "os"
    "io"
    "fmt"
    "github.com/nats-io/nats"
    "github.com/cube2222/Blog/NATS/MasterWorker"
    "github.com/golang/protobuf/proto"
)

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    m := mux.NewRouter()

    m.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        file, err := os.Open("/tmp/" + vars["name"])
        defer file.Close()
        if err != nil {
            w.WriteHeader(404)
        }
        if file != nil {
            _, err := io.Copy(w, file)
            if err != nil {
                w.WriteHeader(500)
            }
        }
    }).Methods("GET")

    m.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        file, err := os.Create("/tmp/" + vars["name"])
        defer file.Close()
        if err != nil {
            w.WriteHeader(500)
        }
        if file != nil {
            _, err := io.Copy(file, r.Body)
            if err != nil {
                w.WriteHeader(500)
            }
        }
    }).Methods("POST")

    RunServiceDiscoverable()

    http.ListenAndServe(":3000", m)
}

Now, what does the RunServiceDiscoverable function do? It connects to the NATS server and responds with its own http address to incoming requests.

func RunServiceDiscoverable() {
    nc, err := nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println("Can't connect to NATS. Service is not discoverable.")
    }
    nc.Subscribe("Discovery.FileServer", func(m *nats.Msg) {
        serviceAddressTransport := Transport.DiscoverableServiceTransport{"http://localhost:3000"}
        data, err := proto.Marshal(&serviceAddressTransport)
        if err == nil {
            nc.Publish(m.Reply, data)
        }
    })
}

The proto file looks like this:

syntax = "proto3";
package Transport;

message DiscoverableServiceTransport {
        string Address = 1;
}

We can now go on with the Master.

The protofile for the Task structure is:

syntax = "proto3";
package Transport;

message Task {
        string uuid = 1;
        string finisheduuid = 2;
        int32 state = 3; // 0 - not started, 1 - in progress, 2 - finished
        int32 id = 4;
}

Our Master will hold a list of tasks with the respecting UUID (at the same time the name of the file), id (the position in the master Tasks slice), and a pointer which holds the position of the last not finished Task, which will get updated on new Task retrieval. It’s pretty similar to the Task storage in my Microservice Architecture series

I’m using github.com/satori/go.uuid for UUID generation.

First, as usual, the basic structure:

package main

import (
    "github.com/satori/go.uuid"
    "github.com/cube2222/Blog/NATS/MasterWorker"
    "os"
    "fmt"
    "github.com/nats-io/nats"
    "github.com/golang/protobuf/proto"
    "time"
    "bytes"
    "net/http"
    "sync"
)

var Tasks []Transport.Task
var TaskMutex sync.Mutex
var oldestFinishedTaskPointer int
var nc *nats.Conn


func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }

    Tasks = make([]Transport.Task, 0, 20)
    TaskMutex = sync.Mutex{}
    oldestFinishedTaskPointer = 0

    initTestTasks()

    wg := sync.WaitGroup{}

    nc.Subscribe("Work.TaskToDo", func (m *nats.Msg) {
    })

    nc.Subscribe("Work.TaskFinished", func (m *nats.Msg) {
    })

    select {} // Block forever
}

Ok, we’ve also already set up the Subscriptions

How does the initTestTasks function work? It’s interesting because it gets the file server address over NATS.

So, we want to create 20 test Tasks, so we run the loop 20 times:

func initTestTasks() {
    for i := 0; i < 20; i++ {
    }
}

We create a new Task and ask the File Server for its address:

for i := 0; i < 20; i++ {
    newTask := Transport.Task{Uuid: uuid.NewV4().String(), State: 0}
    fileServerAddressTransport := Transport.DiscoverableServiceTransport{}
    msg, err := nc.Request("Discovery.FileServer", nil, 1000 * time.Millisecond)
    if err == nil && msg != nil {
        err := proto.Unmarshal(msg.Data, &fileServerAddressTransport)
        if err != nil {
            continue
        }
    }
    if err != nil {
        continue
    }

    fileServerAddress := fileServerAddressTransport.Address
}

Next we finally make the post Request to the file server and add the Task to our Tasks list:

        fileServerAddress := fileServerAddressTransport.Address
        data := make([]byte, 0, 1024)
        buf := bytes.NewBuffer(data)
        fmt.Fprint(buf, "get,my,data,my,get,get,have")
        r, err := http.Post(fileServerAddress + "/" + newTask.Uuid, "", buf)
        if err != nil || r.StatusCode != http.StatusOK {
            continue
        }

        newTask.Id = int32(len(Tasks))
        Tasks = append(Tasks, newTask)
    }

How do we dispatch new Tasks to do? Simply like this:

nc.Subscribe("Work.TaskToDo", func (m *nats.Msg) {
    myTaskPointer, ok := getNextTask()
    if ok {
        data, err := proto.Marshal(myTaskPointer)
        if err == nil {
            nc.Publish(m.Reply, data)
        }
    }
})

How do we get the next Task? We just loop over the Task to find one that is not started. If tasks above our pointer are all finished, then we also move up the pointer. Remember the mutex as this function may be run in parallel:

func getNextTask() (*Transport.Task, bool) {
    TaskMutex.Lock()
    defer TaskMutex.Unlock()
    for i := oldestFinishedTaskPointer; i < len(Tasks); i++ {
        if i == oldestFinishedTaskPointer && Tasks[i].State == 2 {
            oldestFinishedTaskPointer++
        } else {
            if Tasks[i].State == 0 {
                Tasks[i].State = 1
                go resetTaskIfNotFinished(i)
                return &Tasks[i], true
            }
        }
    }
    return nil, false
}

We also called the resetTaskIfNotFinished function. It will reset the Task state if it’s still in progress after 2 minutes:

func resetTaskIfNotFinished(i int) {
    time.Sleep(2 * time.Minute)
    TaskMutex.Lock()
    if Tasks[i].State != 2 {
        Tasks[i].State = 0
    }
}

The TaskFinished subscription handler is much simpler, it just sets the Task to finished, and the UUID accordingly to the received protobuffer:

nc.Subscribe("Work.TaskFinished", func (m *nats.Msg) {
    myTask := Transport.Task{}
    err := proto.Unmarshal(m.Data, &myTask)
    if err == nil {
        TaskMutex.Lock()
        Tasks[myTask.Id].State = 2
        Tasks[myTask.Id].Finisheduuid = myTask.Finisheduuid
        TaskMutex.Unlock()
    }
})

That’s all in regards to the Master! We can now move on writing the Worker.

The basic structure:

package main

import (
    "os"
    "fmt"
    "github.com/nats-io/nats"
    "time"
    "github.com/cube2222/Blog/NATS/MasterWorker"
    "github.com/golang/protobuf/proto"
    "net/http"
    "bytes"
    "io/ioutil"
    "sort"
    "strings"
    "github.com/satori/go.uuid"
    "sync"
)

var nc *nats.Conn

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }

    for i := 0; i < 8; i++ {
        go doWork()
    }

    select {} // Block forever
}

Now the main function doing something here is the doWork function. I’ll post it all at once with comments everywhere, as it’s a very long function and this will be the most convenient way to read it:

func doWork() {
    for {
        // We ask for a Task with a 1 second Timeout
        msg, err := nc.Request("Work.TaskToDo", nil, 1 * time.Second)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            continue
        }

        // We unmarshal the Task
        curTask := Transport.Task{}
        err = proto.Unmarshal(msg.Data, &curTask)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            continue
        }

        // We get the FileServer address
        msg, err = nc.Request("Discovery.FileServer", nil, 1000 * time.Millisecond)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            continue
        }

        fileServerAddressTransport := Transport.DiscoverableServiceTransport{}
        err = proto.Unmarshal(msg.Data, &fileServerAddressTransport)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            continue
        }

        // We get the file
        fileServerAddress := fileServerAddressTransport.Address
        r, err := http.Get(fileServerAddress + "/" + curTask.Uuid)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            continue
        }

        data, err := ioutil.ReadAll(r.Body)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            continue
        }

        // We split and count the words
        words := strings.Split(string(data), ",")
        sort.Strings(words)
        wordCounts := make(map[string]int)
        for i := 0; i < len(words); i++{
            wordCounts[words[i]] = wordCounts[words[i]] + 1
        }

        resultData := make([]byte, 0, 1024)
        buf := bytes.NewBuffer(resultData)

        // We print the results to a buffer
        for key, value := range wordCounts {
            fmt.Fprintln(buf, key, ":", value)
        }

        // We generate a new UUID for the finished file
        curTask.Finisheduuid = uuid.NewV4().String()
        r, err = http.Post(fileServerAddress + "/" + curTask.Finisheduuid, "", buf)
        if err != nil || r.StatusCode != http.StatusOK {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err, ":", r.StatusCode)
            continue
        }

        // We marshal the current Task into a protobuffer
        data, err = proto.Marshal(&curTask)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            continue
        }

        // We notify the Master about finishing the Task
        nc.Publish("Work.TaskFinished", data)
    }
}

Awesome, our Master-Slave setup is ready, you can test it if you’d like. After you do, we can now check out the last architecture.

The Events pattern

Imagine you have servers which keep connections to clients over websockets. You want these clients to get live news updates. With this pattern you can. We’ll also learn about a few convenient NATS client abstractions. Like using a encoded connection, or using channels for sending/receiving.

The basic architecture as usual:

package main

import (
    "os"
    "fmt"
    "github.com/nats-io/nats"
    natsp "github.com/nats-io/nats/encoders/protobuf"
    "github.com/cube2222/Blog/NATS/EventSubs"
    "time"
)

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    nc, err := nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }
    ec, err := nats.NewEncodedConn(nc, natsp.PROTOBUF_ENCODER)
    defer ec.Close()
}

Wait… What’s that at the end!? It’s an encoded connection! It will automatically encode our structs into raw data. We’ll use the protobuf one, but there are a default one, a gob one and a json one too.

Here’s the protofile we’ll use:

syntax = "proto3";
package Transport;

message TextMessage {
        int32 id = 1;
        string body = 2;
}

Ok, how can we just publish simple event-structs? Totally intuitive, like that:

defer ec.Close()

for i := 0; i < 5; i++ {
    myMessage := Transport.TextMessage{Id: int32(i), Body: "Hello over standard!"}

    err := ec.Publish("Messaging.Text.Standard", &myMessage)
    if err != nil {
        fmt.Println(err)
    }
}

It’s a little bit counter intuitive with Requests. As the signature differs, it follows like this:

err := ec.Request(topic, *body, *response, timeout)

So our request sending part will look like this:

for i := 5; i < 10; i++ {
    myMessage := Transport.TextMessage{Id: int32(i), Body: "Hello, please respond!"}

    res := Transport.TextMessage{}
    err := ec.Request("Messaging.Text.Respond", &myMessage, &res, 200 * time.Millisecond)
    if err != nil {
        fmt.Println(err)
    }

    fmt.Println(res.Body, " with id ", res.Id)

}

The last thing we can do is sending them via Channels, which is relatively the simplest:

sendChannel := make(chan *Transport.TextMessage)
ec.BindSendChan("Messaging.Text.Channel", sendChannel)
for i := 10; i < 15; i++ {
    myMessage := Transport.TextMessage{Id: int32(i), Body: "Hello over channel!"}

    sendChannel <- &myMessage
}

Now we can write the receiving end. First the same structure as before:

package main

import (
    "github.com/nats-io/nats"
    natsp "github.com/nats-io/nats/encoders/protobuf"
    "os"
    "fmt"
    "github.com/cube2222/Blog/NATS/EventSubs"
)

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    nc, err := nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
    }
    ec, err := nats.NewEncodedConn(nc, natsp.PROTOBUF_ENCODER)
    defer ec.Close()
}

Ok, first the standard receive which is totally natural:

defer ec.Close()

ec.Subscribe("Messaging.Text.Standard", func(m *Transport.TextMessage) {
    fmt.Println("Got standard message: \"", m.Body, "\" with the Id ", m.Id, ".")
})

Now, the responding, which has a little bit changed syntax again. As the handler function is:

func (subject, reply string, m *Transport.TextMessage)

So the responding looks like this:

ec.Subscribe("Messaging.Text.Respond", func(subject, reply string, m *Transport.TextMessage) {
    fmt.Println("Got ask for response message: \"", m.Body, "\" with the Id ", m.Id, ".")

    newMessage := Transport.TextMessage{Id: m.Id, Body: "Responding!"}
    ec.Publish(reply, &newMessage)
})

And finally using channels, which doesn’t differ nearly at all in comparison to the sending side:

receiveChannel := make(chan *Transport.TextMessage)
ec.BindRecvChan("Messaging.Text.Channel", receiveChannel)

for m := range receiveChannel {
    fmt.Println("Got channel'ed message: \"", m.Body, "\" with the Id ", m.Id, ".")
}

Ok, that’s all in the topic of NATS. I hope you liked it and discovered something new! Please comment if you have any opinions, or don’t like something, or just want me to write about something.

Now go and build something great!

Practical Golang: Using Protobuffs

Introduction

Most apps we make need a means of communication. We usually use JSON, or just plain text. JSON has got especially popular because of the rise of Node.js. The truth though, is, that JSON isn’t really a fast format. The marshaller in Go also isn’t that fast. That’s why in this article we’ll learn how to use google protocol buffers. They are in fact very easy to use, and are much faster than JSON.

Regarding the performance gains, here they are, according to this benchmark:

benchmark iter time/iter bytes alloc allocs
BenchmarkJsonMarshal-8 500000 3714 ns/op 1232 B/op 10 allocs/op
BenchmarkJsonUnmarshal-8 500000 4125 ns/op 416 B/op 7 allocs/op
BenchmarkProtobufMarshal-8 1000000 1554 ns/op 200 B/op 7 allocs/op
BenchmarkProtobufUnmarshal-8 1000000 1055 ns/op 192 B/op 10 allocs/op
BenchmarkGogoprotobufMarshal-8 10000000 211 ns/op 64 B/op 1 allocs/op
BenchmarkGogoprotobufUnmarshal-8 5000000 289 ns/op 96 B/op 3 allocs/op

Ok, now let’s set up the environment.

Setup

First we’ll need to get the protobuffer compiler binaries from here:
https://github.com/google/protobuf/releases/tag/v3.0.0-beta-3
Unpack them somewhere in your PATH.

The next step is to get the golang plugin. Make sure that GOPATH/bin is in your PATH.

go get -u github.com/golang/protobuf/protoc-gen-go

Writing .proto files

Now it’s time to define our structure we’ll use. I’ll create mine in my project root. I’ll call it clientStructure.proto.

First we need to define the version of protobuffers we will use. Here we will use the newest – proto3. We’ll also define the package of the file. This will also be our go package name of the generated file.

syntax = "proto3";
package main;

Ok, now we’ll define our main structure in the file. The Client structure:

message Client {

}

Now it’s time to define the available fields. Fields are refered to by id, so for each field we define the type, name and id like this:

type name = id;

We’ll start with our first 4 fields of our client:

message Client {
        int32 id = 1;
        string name = 2;
        string email = 3;
        string country = 4;
}

We will also define an inner structure Mail:

        string country = 4;

        message Mail {
                string remoteEmail = 1;
                string body = 2;
        }

and finally define the inbox field. It’s an array of mails, which we create using the repeated keyword:

        message Mail {
                string remoteEmail = 1;
                string body = 2;
        }

        repeated Mail inbox = 5;
}

Now let’s compile it!
Open the directory with the protofile, and launch:

protoc --go_out=. clientStructure.proto

This will create a generated GO file with our Client structure.

My file looks like this:

// Code generated by protoc-gen-go.
// source: clientStructure.proto
// DO NOT EDIT!

/*
Package main is a generated protocol buffer package.

It is generated from these files:
    clientStructure.proto

It has these top-level messages:
    Client
*/
package main

import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
const _ = proto.ProtoPackageIsVersion1

type Client struct {
    Id          int32                   `protobuf:"varint,1,opt,name=id" json:"id,omitempty"`
    Name        string               `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"`
    Email    string              `protobuf:"bytes,3,opt,name=email" json:"email,omitempty"`
    Country string               `protobuf:"bytes,4,opt,name=country" json:"country,omitempty"`
    Inbox    []*Client_Mail `protobuf:"bytes,5,rep,name=inbox" json:"inbox,omitempty"`
}

func (m *Client) Reset()                                        { *m = Client{} }
func (m *Client) String() string                        { return proto.CompactTextString(m) }
func (*Client) ProtoMessage()                            {}
func (*Client) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }

func (m *Client) GetInbox() []*Client_Mail {
    if m != nil {
        return m.Inbox
    }
    return nil
}

type Client_Mail struct {
    RemoteEmail string `protobuf:"bytes,1,opt,name=remoteEmail" json:"remoteEmail,omitempty"`
    Body                string `protobuf:"bytes,2,opt,name=body" json:"body,omitempty"`
}

func (m *Client_Mail) Reset()                                       { *m = Client_Mail{} }
func (m *Client_Mail) String() string                       { return proto.CompactTextString(m) }
func (*Client_Mail) ProtoMessage()                           {}
func (*Client_Mail) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0, 0} }

func init() {
    proto.RegisterType((*Client)(nil), "main.Client")
    proto.RegisterType((*Client_Mail)(nil), "main.Client.Mail")
}

var fileDescriptor0 = []byte{
    // 191 bytes of a gzipped FileDescriptorProto
    0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0x12, 0x4d, 0xce, 0xc9, 0x4c,
    0xcd, 0x2b, 0x09, 0x2e, 0x29, 0x2a, 0x4d, 0x2e, 0x29, 0x2d, 0x4a, 0xd5, 0x2b, 0x28, 0xca, 0x2f,
    0xc9, 0x17, 0x62, 0xc9, 0x4d, 0xcc, 0xcc, 0x53, 0x3a, 0xcc, 0xc8, 0xc5, 0xe6, 0x0c, 0x96, 0x17,
    0xe2, 0xe3, 0x62, 0xca, 0x4c, 0x91, 0x60, 0x54, 0x60, 0xd4, 0x60, 0x0d, 0x02, 0xb2, 0x84, 0x84,
    0xb8, 0x58, 0xf2, 0x12, 0x73, 0x53, 0x25, 0x98, 0x80, 0x22, 0x9c, 0x41, 0x60, 0xb6, 0x90, 0x08,
    0x17, 0x6b, 0x2a, 0x50, 0x5f, 0x8e, 0x04, 0x33, 0x58, 0x10, 0xc2, 0x11, 0x92, 0xe0, 0x62, 0x4f,
    0xce, 0x2f, 0xcd, 0x2b, 0x29, 0xaa, 0x94, 0x60, 0x01, 0x8b, 0xc3, 0xb8, 0x42, 0xea, 0x5c, 0xac,
    0x99, 0x79, 0x49, 0xf9, 0x15, 0x12, 0xac, 0x0a, 0xcc, 0x1a, 0xdc, 0x46, 0x82, 0x7a, 0x20, 0x4b,
    0xf5, 0x20, 0x16, 0xea, 0xf9, 0x02, 0xf5, 0x06, 0x41, 0xe4, 0xa5, 0x6c, 0xb8, 0x58, 0x40, 0x5c,
    0x21, 0x05, 0x2e, 0xee, 0xa2, 0xd4, 0xdc, 0xfc, 0x92, 0x54, 0x57, 0xb0, 0x35, 0x8c, 0x60, 0xe3,
    0x90, 0x85, 0x40, 0xce, 0x4a, 0xca, 0x4f, 0xa9, 0x84, 0x39, 0x0b, 0xc4, 0x4e, 0x62, 0x03, 0x7b,
    0xc9, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xfd, 0x42, 0x16, 0x7b, 0xeb, 0x00, 0x00, 0x00,
}

Using Protocol Buffers in Go

The Server

Let’s first create the server. We will just receive a protobuf in a POST body, and print the contents.

First the basic structure and the imports:

package main

import (
    "github.com/golang/protobuf/proto"
    "net/http"
    "fmt"
    "io/ioutil"
)

func main() {
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
    })

    http.ListenAndServe(":3000", nil)
}

So, let’s start with a Client structure to fill, and read the body.

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        myClient := Client{}

        data, err := ioutil.ReadAll(r.Body)

        if err != nil {
            fmt.Println(err)
        }
    })

We’ll unmarshall the data, passing in a reference to the Client to fill in, and check for errors.

        if err != nil {
            fmt.Println(err)
        }

        if err := proto.Unmarshal(data, &myClient); err != nil {
            fmt.Println(err)
        }

and finally print it all

        if err := proto.Unmarshal(data, &myClient); err != nil {
            fmt.Println(err)
        }

        println(myClient.Id, ":", myClient.Name, ":", myClient.Email, ":", myClient.Country)

        for _, mail := range myClient.Inbox {
            fmt.Println(mail.RemoteEmail, ":", mail.Body)
        }
    })

Now let’s start with…

The Client

The Client will just fill in the Client structure, and send it to the server.

The structure:

package main

import (
    "github.com/golang/protobuf/proto"
    "net/http"
    "fmt"
    "bytes"
)

func main() {
}

We create the Client structure and fill it in. For the purposes of this article we’ll use, the oh so creatively named, John Doe.

func main() {
    myClient := Client{Id: 526, Name: "John Doe", Email: "[email protected]", Country: "US"}
    clientInbox := make([]*Client_Mail, 0, 20)
    clientInbox = append(clientInbox,
        &Client_Mail{RemoteEmail: "[email protected]", Body: "Hello. Greetings. Bye."},
        &Client_Mail{RemoteEmail: "[email protected]", Body: "Bye, Greetings, hello."})

    myClient.Inbox = clientInbox
}

We’ll marshall the John (the Client) into raw data and finally send him to the server.

    myClient.Inbox = clientInbox

    data, err := proto.Marshal(&myClient)
    if err != nil {
        fmt.Println(err)
        return
    }

    _, err = http.Post("http://localhost:3000", "", bytes.NewBuffer(data))

    if err != nil {
        fmt.Println(err)
        return
    }
}

Note that we used bytes.NewBuffer, so our raw data satisfies the Reader requirement for the request body.

Conclusion

As you can see, protobuffs are really easy to use and provide an actual speed boost in your application. Hope you’ll try to use them instead of JSON or other forms of transport in your next project. You can get more information about the more advanced functionalities here: https://developers.google.com/protocol-buffers/docs/gotutorial

Happy coding!

Practical Golang: Writing a simple login middleware

Introduction

In this part we’ll be creating a simple middleware you can easily apply to your handlers to get authentication/authorization. Middleware like this is an awesome way to add additional functionality to your Go server. Here we will only do authorization as we will only ask for a password, not a login. Although if you want, then you can easily extend this system to any authentication/authorization you’d like.

Implementation

We will mainly use the stdlib, and will use cookies to remember who’s already logged in. To generate the cookie values we will use the go.uuid library. So remember to

    go get github.com/satori/go.uuid

We will start with a basic structure of our system with an already existing “Hello World!!!” handler:

package main

import (
    "net/http"
    "fmt"
    "github.com/satori/go.uuid"
    "sync"
)

const loginPage = "<html><head><title>Login</title></head><body><form action=\"login\" method=\"post\"> <input type=\"password\" name=\"password\" /> <input type=\"submit\" value=\"login\" /> </form> </body> </html>"

func main() {

    http.Handle("/hello", helloWorldHandler{})
    http.HandleFunc("/login", handleLogin)

    http.ListenAndServe(":3000", nil)
}

type helloWorldHandler struct {
}

func (h helloWorldHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintln(w, "Hello World!!!")
}

type authenticationMiddleware struct {
    wrappedHandler http.Handler
}

func (h authenticationMiddleware) ServeHTTP(w http.ResponseWriter,r *http.Request) {
}

func authenticate(h http.Handler) authenticationMiddleware {
    return authenticationMiddleware{h}
}

func handleLogin(w http.ResponseWriter, r *http.Request) {
}

Ok, lets go over this code.

package main

import (
    "net/http"
    "fmt"
    "github.com/satori/go.uuid"
    "sync"
)

const loginPage = "<html><head><title>Login</title></head><body><form action=\"login\" method=\"post\"> <input type=\"password\" name=\"password\" /> <input type=\"submit\" value=\"login\" /> </form> </body> </html>"

func main() {

    http.Handle("/hello", helloWorldHandler{})
    http.HandleFunc("/login", handleLogin)

    http.ListenAndServe(":3000", nil)
}

type helloWorldHandler struct {
}

func (h helloWorldHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintln(w, "Hello World!!!")
}

First we declare the package, imports and the html code of the login page.

We also declare the basic hello world handler.

Now to get to the interesting part.

type authenticationMiddleware struct {
    wrappedHandler http.Handler
}

func (h authenticationMiddleware) ServeHTTP(w http.ResponseWriter,r *http.Request) {
}

func authenticate(h http.Handler) authenticationMiddleware {
    return authenticationMiddleware{h}
}

func handleLogin(w http.ResponseWriter, r *http.Request) {
}

we create a handler which will supply authorization, and if authorized successfully, will let the user through to the underlying handler. We also define the authenticate method, a simple function wrapper over creating a struct, and a function to handle the login.

That means we can also define the last route, the secured hello world route.

func main() {
    http.Handle("/hello", helloWorldHandler{})
    http.Handle("/secureHello", authenticate(helloWorldHandler{}))
    http.HandleFunc("/login", handleLogin)

    http.ListenAndServe(":3000", nil)
}

Ok, we will also need a simple client struct, which will just save if the target client session is authorized, and a map containing our Clients with cookie values being the keys.

var sessionStore map[string]Client
var storageMutex sync.RWMutex

type Client struct {
    loggedIn bool
}

We also need the mutex for concurrent map access. In the main function we initialize the map:

func main() {
    sessionStore = make(map[string]Client)
    http.Handle("/hello", helloWorldHandler{})

Now we can go to the authenticationMiddleware’s ServeHTTP function:

We’ll begin with checking if the cookie is present, if it isn’t there, we’ll continue and create a new. If the error is nonstandard then we just return.

func (h authenticationMiddleware) ServeHTTP(w http.ResponseWriter,r *http.Request) {
    cookie, err := r.Cookie("session")
    if err != nil {
        if err != http.ErrNoCookie {
            fmt.Fprint(w, err)
            return
        } else {
            err = nil
        }
    }

We later check, unless the cookie exists, if it’s saved in our map. If it’s not, then we will later generate a new one.

var present bool
var client Client
if cookie != nil {
    storageMutex.RLock()
    client, present = sessionStore[cookie.Value]
    storageMutex.RUnlock()
} else {
    present = false
}

Now, if the cookie wasn’t present, then we can generate a new one!:

if present == false {
    cookie = &http.Cookie{
        Name: "session",
        Value: uuid.NewV4().String(),
    }
    client = Client{false}
    storageMutex.Lock()
    sessionStore[cookie.Value] = client
    storageMutex.Unlock()
}

We can then set the cookie to our response writer, and if the client isn’t logged in, send him the login page, however, if he is logged in, then we can send him what he wanted:

    http.SetCookie(w, cookie)
    if client.loggedIn == false {
        fmt.Fprint(w, loginPage)
        return
    }
    if client.loggedIn == true {
        h.wrappedHandler.ServeHTTP(w, r)
        return
    }
}

So far so good, that’s actually already about the main part of the middleware, now we’ll write a function just for handling the login logic. The handleLogin function:

func handleLogin(w http.ResponseWriter, r *http.Request) {
    cookie, err := r.Cookie("session")
    if err != nil {
        if err != http.ErrNoCookie {
            fmt.Fprint(w, err)
            return
        } else {
            err = nil
        }
    }
    var present bool
    var client Client
    if cookie != nil {
        storageMutex.RLock()
        client, present = sessionStore[cookie.Value]
        storageMutex.RUnlock()
    } else {
        present = false
    }

    if present == false {
        cookie = &http.Cookie{
            Name: "session",
            Value: uuid.NewV4().String(),
        }
        client = Client{false}
        storageMutex.Lock()
        sessionStore[cookie.Value] = client
        storageMutex.Unlock()
    }
    http.SetCookie(w, cookie)}
}

First we created the part which is accountable for the cookie handling, as in the recent function. Now we get to the form parsing and the actual login part.

http.SetCookie(w, cookie)
err = r.ParseForm()
if err != nil {
    fmt.Fprint(w, err)
    return
}

if subtle.ConstantTimeCompare([]byte(r.FormValue("password")), []byte("password123")) == 1 {
    //login user
} else {
    fmt.Fprintln(w, "Wrong password.")
}

We parse the login form and check if the login conditions are met. Here we only need the password to be correct. If it is we log the client in:

if subtle.ConstantTimeCompare([]byte(r.FormValue("password")), []byte("password123")) == 1 {
    client.loggedIn = true
    fmt.Fprintln(w, "Thank you for logging in.")
    storageMutex.Lock()
    sessionStore[cookie.Value] = client
    storageMutex.Unlock()
}

We use subtle.ConstantTimeCompare as it protects us from time-based attacks. (Thanks for the tip in the reddit comment.)

That’s basically all we need. Now you can secure the routes you want easily.

Conclusion

Remember, that for a secure implementation you need encrypt the network traffic using SSL/TLS, otherwise, somebody can just read the cookie and impersonate your user.

Another thing to consider is redirecting the user to the page he wanted to get to after logging in.

Have fun with creating other interesting middleware!

Practical Golang: Event multicast/subscription service

Introduction

In our microservice architectures we always need a method for communicating between services. There are various ways to achieve this. Few of them are, but are not limited to: Remote Procedure Call, REST API’s, message BUSses. In this comprehensive tutorial we’ll write a service, which you can use to distribute messages/events across your system.

Design

How will it work? It will accept registering subscribers (other microservices). Whenever it gets a message from a microservice, it will send it further to all subscribers, using a REST call to the other microservices /event URL.

Subscribers will need to call a keep-alive URL regularly, otherwise they will get removed from the subscriber list. This protects us from sending messages to too many ghost subscribers.

Implementation

Let’s start with a basic structure. We’ll define the API and set up our two main data structures:
1. The subscriber list with their register/lastKeepAlive dates.
2. The mutex controlling access to our subscriber list.

package main

import (
    "net/http"
    "time"
    "sync"
    "fmt"
    "net/url"
    "io/ioutil"
    "bytes"
)

var registeredServiceStorage map[string]time.Time
var serviceStorageMutex sync.RWMutex

func main() {
    registeredServiceStorage = make(map[string]time.Time)
    serviceStorageMutex = sync.RWMutex{}

    http.HandleFunc("/registerAndKeepAlive", registerAndKeepAlive)
    http.HandleFunc("/deregister", deregister)
    http.HandleFunc("/sendMessage", handleMessage)
    http.HandleFunc("/listSubscribers", handleSubscriberListing)

    go killZombieServices()
    http.ListenAndServe(":3000", nil)
}

func registerAndKeepAlive(w http.ResponseWriter, r *http.Request) {
}

func deregister(w http.ResponseWriter, r *http.Request) {
}

func handleMessage(w http.ResponseWriter, r *http.Request) {
}

func sendMessageToSubscriber(data []byte, address string) {
}

func handleSubscriberListing(w http.ResponseWriter, r *http.Request) {
}

func killZombieServices() {
}

We initialize our subscriber list and mutex, and also launch, on another thread, a function that will regularly delete ghost subscribers.

So far so good!
We can now start getting into each functions implementation.

We can begin with the registerAndKeepAlive which does both things. Registering a new subscriber, or updating an existing one. This works because in both cases we just update the map entry with the subscriber address to contain the current time.

func registerAndKeepAlive(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        //Subscriber registration
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

The register function should be called with a POST request. That’s why the first thing we do, is checking if the method is right, otherwise we answer with an error. If it’s ok, then we register the client:

if r.Method == http.MethodPost {
    values, err := url.ParseQuery(r.URL.RawQuery)
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error:", err)
        return
    }
    if len(values.Get("address")) == 0 {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error:","Wrong input address.")
        return
    }

}

We check if the URL arguments are correct, and finally register the subscriber:

if len(values.Get("address")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:","Wrong input address.")
    return
}

serviceStorageMutex.Lock()
registeredServiceStorage[values.Get("address")] = time.Now()
serviceStorageMutex.Unlock()

fmt.Fprint(w, "success")

Awesome!

Let’s now implement the function which shall delete the entry when the subscriber wants to deregister.

func deregister(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodDelete {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("address")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input address.")
            return
        }

        //Subscriber deletion will come here

    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only DELETE accepted")
    }
}

Again do we check if the request method is good and if the address argument is correct. If that’s the case, then we can remove this client from our subscriber list.

if len(values.Get("address")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:","Wrong input address.")
    return
}

serviceStorageMutex.Lock()
delete(registeredServiceStorage, values.Get("address"))
serviceStorageMutex.Unlock()

fmt.Fprint(w, "success")

Now it’s time for the main functionality. Namely handling messages and sending them to all subscribers:

func handleMessage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {

    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

As usual, we check if the request method is correct.

Then, we read the data we got, so we can pass it to multiple concurrent sending functions.

if r.Method == http.MethodPost {

    data, err := ioutil.ReadAll(r.Body)
    if err != nil {
        fmt.Println(err)
    }
    //...
}

We then lock the mutex for read. That’s important so that we can handle huge amounts of messages efficiently. Basically, it means that we allow others to read while we are reading, because concurrent reading is supported by maps. We can use this unless there’s no one modifying the map.

While we lock the map for read, we check the list of subscribers we have to send the message to, and start concurrent functions that will do the sending. As we don’t want to lock the map for the entire sending time, we only need the addresses.

    data, err := ioutil.ReadAll(r.Body)
    if err != nil {
        fmt.Println(err)
    }

    serviceStorageMutex.RLock()
    for address, _ := range registeredServiceStorage {
        go sendMessageToSubscriber(data, address)
    }
    serviceStorageMutex.RUnlock()

    fmt.Fprint(w, "success")

Which means we now have to implement the sendMessageToSubscriber(…) function.

It’s pretty simple, we just make a post, and print an error if it happened.

func sendMessageToSubscriber(data []byte, address string) {
    _, err := http.Post("http://" + address + "/event", "", bytes.NewBuffer(data))
    if err != nil {
        fmt.Println(err)
    }
}

It’s important to notice, that we have to create a buffer from the data, as the http.Post(…) function needs a reader type data structure.

We’ll also implement the function which makes it possible to list all the subscribers. Mainly for debugging purposes. There’s nothing new in it. We check if the method is alright, lock the mutex for read, and finally print the map with a correct format of the register time.

func handleSubscriberListing(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        serviceStorageMutex.RLock()

        for address, registerTime := range registeredServiceStorage {
            fmt.Fprintln(w, address, " : ", registerTime.Format(time.RFC3339))
        }

        serviceStorageMutex.RUnlock()
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Now there’s only one function left. The one that will make sure no ghost services stay for too long. It will check all the services once per minute. This way we’re making it cheap on performance:

func killZombieServices() {
    t := time.Tick(1 * time.Minute)

    for range t {
    }
}

This is a nice way to launch the code every minute. We create a channel which will send us the time every minute, and range over it, ignoring the received values.

We can now get the check and remove working.

for range t {
    timeNow := time.Now()
    serviceStorageMutex.Lock()
    for address, timeKeepAlive := range registeredServiceStorage {
        if timeNow.Sub(timeKeepAlive).Minutes() > 2 {
            delete(registeredServiceStorage, address)
        }
    }
    serviceStorageMutex.Unlock()
}

We just range over the subscribers and delete those that haven’t kept their subscription alive.

To add to that, if you wanted you could first make a read-only pass over the subscribers, and immediately after that, make a write-locked deletion of the ones you found. This would allow others to read the map while you’re finding subscribers to delete.

Conclusion

That’s all! Have fun with creating an infrastructure based on such a service!

Practical Golang: Using Google Drive and Calendar

Introduction

Integrating Google services into your app can lead to a lot of nice features for your users, and can create a seamless experience for them. In this tutorial we’ll learn how to use the most useful functionalities of Google Calendar and Google Drive.

The theory

To begin with, we should understand the methodology of using the Google API in Golang. For most of their API’s I’ve skimmed through it works like that:

  1. Create an OAuth2 client from the OAuth2 access token.
  2. Use the client to create an app service, this will be our interface we’ll use to communicate with Google services.
  3. We create a request object and set the needed parameters.
  4. We start the action, usually using the Do() function on the request object.

After you learn it for one Google service, it will be trivial for you to use it for any other.

Dependencies

Here we will need the Google Calendar and Google Drive libraries, both of which we need version 3 of. (the newest at the moment)

So make sure to:

go get google.golang.org/api/calendar/v3
go get google.golang.org/api/drive/v3

The basic structure

For both of our apps we’ll need the same basic OAuth2 app structure. You can learn more about it in my previous article

package main

import (
    "fmt"
    "net/http"
    "golang.org/x/oauth2"
    "os"
    "golang.org/x/oauth2/google"
    "golang.org/x/net/context"
    "time"
)

var (
    googleOauthConfig = &oauth2.Config{
        RedirectURL:    "http://localhost:3000/GoogleCallback",
        ClientID:     os.Getenv("googlekey"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        ClientSecret: os.Getenv("googlesecret"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        Scopes:       []string{},
        Endpoint:     google.Endpoint,
    }
// Some random string, random for each request
    oauthStateString = "random"
)

const htmlIndex = `<html><body>
<a href="/GoogleLogin">Log in with Google</a>
</body></html>
`

func main() {
    http.HandleFunc("/", handleMain)
    http.HandleFunc("/GoogleLogin", handleGoogleLogin)
    http.HandleFunc("/GoogleCallback", handleGoogleCallback)
    fmt.Println(http.ListenAndServe(":3000", nil))
}
func handleMain(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, htmlIndex)
}

func handleGoogleLogin(w http.ResponseWriter, r *http.Request) {
    url := googleOauthConfig.AuthCodeURL(oauthStateString)
    http.Redirect(w, r, url, http.StatusTemporaryRedirect)
}

func handleGoogleCallback(w http.ResponseWriter, r *http.Request) {
    state := r.FormValue("state")
    if state != oauthStateString {
        fmt.Printf("invalid oauth state, expected '%s', got '%s'\n", oauthStateString, state)
        http.Redirect(w, r, "/", http.StatusTemporaryRedirect)
        return
    }

    code := r.FormValue("code")
    token, err := googleOauthConfig.Exchange(oauth2.NoContext, code)
    if err != nil {
        fmt.Printf("oauthConf.Exchange() failed with '%s'\n", err)
        http.Redirect(w, r, "/", http.StatusTemporaryRedirect)
        return
    }

    client := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(token))
}

There is one thing I haven’t covered in my previous blog post, namely the last line:

client := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(token))

We need an OAuth2 client to use the Google API, so we create one. It takes a context, for lack of which we just use the background context. It also needs a token source. As we only want to make one request and know that this token will suffice we create a static token source which will always generate the same token which we’ve passed to it.

Creating the Calendar app.

Preperation

First, as described in my previous article, you should enable the Google Calendar API in the Google Cloud Console for you app.

Also, we’ll need to ask for permission, so add the https://www.googleapis.com/auth/calendar scope to our googleOauthConfig:

googleOauthConfig = &oauth2.Config{
        RedirectURL:    "http://localhost:3000/GoogleCallback",
        ClientID:     os.Getenv("googlekey"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        ClientSecret: os.Getenv("googlesecret"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        Scopes:       []string{"https://www.googleapis.com/auth/calendar"},
        Endpoint:     google.Endpoint,
    }

Remember to import google.golang.org/api/calendar/v3

The main code

We’ll add everything we write right after creating our OAuth2 client.

First, as I described before, we’ll need an app service, here it will be the calendar service, so let’s create it!

client := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(token))

calendarService, err := calendar.New(client)
if err != nil {
  fmt.Fprintln(w, err)
  return
}

It just uses the OAuth client to create the service and errors out if something goes wrong.

Listing events

Now we will create a request, add a few optional parameters to it and start it. We’ll build it up step by step.

calendarService, err := calendar.New(client)
if err != nil {
  fmt.Fprintln(w, err)
  return
}

calendarService.Events.List("primary")

This creates a request to list all events in your primary calendar. You could also name a specific calendar, but using primary will take the primary calendar of that user.

So… I think we don’t really care about the events 5 years ago. So let’s only take the upcoming ones.

calendarService.Events.List("primary").TimeMin(time.Now().Format(time.RFC3339))

We add an option TimeMin which takes a DateTime by string… No idea why it isn’t a nice struct like time.DateTime. You also need to format it as a string in the RFC3339 format.

Ok… but that could be a lot of events, so we’ll just take the 5 first:

calendarService.Events.List("primary").TimeMin(time.Now().Format(time.RFC3339)).MaxResults(5)

Now we just have to Do() it, and store the results:

calendarEvents, err := calendarService.Events.List("primary").TimeMin(time.Now().Format(time.RFC3339)).MaxResults(5).Do()
if err != nil {
  fmt.Fprintln(w, err)
  return
}

How can we now do something with the results? Simple! :

calendarEvents, err := calendarService.Events.List("primary").TimeMin(time.Now().Format(time.RFC3339)).MaxResults(5).Do()
if err != nil {
  fmt.Fprintln(w, err)
  return
}

if len(calendarEvents.Items) > 0 {
    for _, i := range calendarEvents.Items {
        fmt.Fprintln(w, i.Summary, " ", i.Start.DateTime)
    }
}

We access a list of events using the Items field in the calendarEvents variable, if there is at least one element, then for each element we write the summary and start time to the response writer using a for range loop.

Creating an event

Ok, we already know how to list events, now let’s create an event!
First, we need an event object:

if len(calendarEvents.Items) > 0 {
    for _, i := range calendarEvents.Items {
        fmt.Fprintln(w, i.Summary, " ", i.Start.DateTime)
    }
}
newEvent := calendar.Event{
    Summary: "Testevent",
    Start: &calendar.EventDateTime{DateTime: time.Date(2016, 3, 11, 12, 24, 0, 0, time.UTC).Format(time.RFC3339)},
    End: &calendar.EventDateTime{DateTime: time.Date(2016, 3, 11, 13, 24, 0, 0, time.UTC).Format(time.RFC3339)},
}

We create an Event struct and pass in the summary – title of the event.
We also pass the start and finish DateTime. We create a date using the stdlib time package, and then convert it to a string in the RFC3339 format. There are tons of other optional fields you can specify if you want to.

Next we need to create an insert request object:

newEvent := calendar.Event{
    Summary: "Testevent",
    Start: &calendar.EventDateTime{DateTime: time.Date(2016, 3, 11, 12, 24, 0, 0, time.UTC).Format(time.RFC3339)},
    End: &calendar.EventDateTime{DateTime: time.Date(2016, 3, 11, 13, 24, 0, 0, time.UTC).Format(time.RFC3339)},
}
calendarService.Events.Insert("primary", &newEvent)

The Insert request takes two arguments, the calendar name and an event object.

As usual we neeed to Do() the request! and saving the resulting created event can also come handy in the future:

createdEvent, err := calendarService.Events.Insert("primary", &newEvent).Do()
if err != nil {
    fmt.Fprintln(w, err)
    return
}

In the end let’s just print some kind of confirmation to the user:

fmt.Fprintln(w, "New event in your calendar: \"", createdEvent.Summary, "\" starting at ", createdEvent.Start.DateTime)

Hint

You can define the event ID yourself before creating it, but you can also let the Google Calendar service generate an ID for us as we did.

Creating the Drive app

Preperation

First, enable the Google Drive API in the Google Cloud Console

Again we will need to ask the user for permission. So we have to use the https://www.googleapis.com/auth/drive and https://www.googleapis.com/auth/drive.file scopes:

googleOauthConfig = &oauth2.Config{
        RedirectURL:    "http://localhost:3000/GoogleCallback",
        ClientID:     os.Getenv("googlekey"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        ClientSecret: os.Getenv("googlesecret"), // from https://console.developers.google.com/project/<your-project-id>/apiui/credential
        Scopes:       []string{"https://www.googleapis.com/auth/drive", "https://www.googleapis.com/auth/drive.file"},
        Endpoint:     google.Endpoint,
    }

Remember to import google.golang.org/api/drive/v3″

The main code

Again we need to start from our basic OAuth2 app structure with an OAuth2 client, and create an app service. Here, this will be the Drive service:

client := oauth2.NewClient(context.Background(), oauth2.StaticTokenSource(token))

driveService, err := drive.New(client)
if err != nil {
    fmt.Fprintln(w, err)
    return
}

Listing files

Ok, to begin with let’s learn how to list files from the Google Drive. There is an important concept you should understand before we start. Whenever we request a list of files from Google Drive, those can literally be thousands of files. That’s why we get one page of files, which includes files metadata, not the content, and a NextPageToken, which we can use to get the next page.

As before with the calendar app, we create a list request.

driveService, err := drive.New(client)
if err != nil {
    fmt.Fprintln(w, err)
    return
}

driveService.Files.List()

But we can also set an option. The ordering for example:

driveService.Files.List().OrderBy("name")

Ok, now let’s Do() it and save the results:

myFilesList, err := driveService.Files.List().OrderBy("name").Do()
if err != nil {
    fmt.Fprintf(w, "Couldn't retrieve files ", err)
}

As I wrote before, this is one page of files, now let’s go over it:

if len(myFilesList.Files) > 0 {
    for _, i := range myFilesList.Files {
        fmt.Fprintln(w, i.Name, " ", i.Id)
    }
} else {
    fmt.Fprintln(w, "No files found.")
}

We check if there are any files, and if there are, we range over them printing the name and Id of each one, else we print that there are none.

Now, let’s get more pages, it’s the myFilesList.NextPageToken holding the token for the next page. If it is an empty string, then this was the last page. While it is present we load new pages into our myFilesList variable.

for myFilesList.NextPageToken != "" {
    myFilesList, err = driveService.Files.List().OrderBy("name").PageToken(myFilesList.NextPageToken).Do()
    if err != nil {
        fmt.Fprintf(w, "Couldn't retrieve files ", err)
        break
    }
    fmt.Fprintln(w, "Next Page: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    if len(myFilesList.Files) > 0 {
        for _, i := range myFilesList.Files {
            fmt.Fprintln(w, i.Name, " ", i.Id)
        }
    } else {
        fmt.Fprintln(w, "No files found.")
    }
}

To retrieve the next page we add the PageToken option to our file listing request

    myFilesList, err = driveService.Files.List().OrderBy("name").PageToken(myFilesList.NextPageToken).Do()

Whenever we start printing the new page, we notify a user that a new page just started. Later we check if we have any files, and if we do, then we range over them as before, printing the names and Id’s

Creating a file

We already know how to list files in our Google Drive. Now let’s learn how to create a file and add some content to it!

First, we need to create the file metadata:

for myFilesList.NextPageToken != "" {
    myFilesList, err = driveService.Files.List().OrderBy("name").PageToken(myFilesList.NextPageToken).Do()
    if err != nil {
        fmt.Fprintf(w, "Couldn't retrieve files ", err)
        break
    }
    fmt.Fprintln(w, "Next Page: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    if len(myFilesList.Files) > 0 {
        for _, i := range myFilesList.Files {
            fmt.Fprintln(w, i.Name, " ", i.Id)
        }
    } else {
        fmt.Fprintln(w, "No files found.")
    }
}


myFile := drive.File{Name: "cats.png"}

Again, the Id will be generated for us if we do not provide any.

Putting the file into Google Drive is pretty easy now. We create a create request, referencing our file metadata in it:

myFile := drive.File{Name: "cats.png"}

driveService.Files.Create(&myFile)

and Do() it, saving the results:

createdFile, err := driveService.Files.Create(&myFile).Do()
if err != nil {
    fmt.Fprintf(w, "Couldn't create file ", err)
}

Ok, if you started this code, you would get a cats.png file. However, it’s empty. So let’s add some data to it!

To add it, we’ll first need some data. We can load it from a file:

createdFile, err := driveService.Files.Create(&myFile).Do()
if err != nil {
    fmt.Fprintf(w, "Couldn't create file ", err)
}
myImage, err := os.Open("/tmp/image.png")
if err != nil {
    fmt.Fprintln(w, err)
}

Now we have to create the updated file metadata:

myImage, err := os.Open("/tmp/image.png")
if err != nil {
    fmt.Fprintln(w, err)
}
updatedFile := drive.File{Name: "catsUpdated.png"}

We have to construct an update request:

driveService.Files.Update(createdFile.Id, &updatedFile)

Here we specify the Id of the file to modify, and the new metadata. We’ll now add the data to the update request, and Do() it, checking for errors and ignoring the result.

_, err = driveService.Files.Update(createdFile.Id, &updatedFile).Media(myImage).Do()
if err != nil {
    fmt.Fprintln(w, err)
}
fmt.Fprintln(w, createdFile.Id)

In the end we send the new file id to the user.

Hint

You could add the Media option already to the create request if you wanted.

Conclusion

I suppose this was a good short introduction into the structure of the Google API. After learning these, you should have few to no problems using the other API’s.

Have fun integrating it into your app!

Practical Golang: Using websockets

Introduction

This is the first post in the practical Golang series. Posts in it are meant to provide short and informative introductions to various topics.

This one is a about websockets, which are an awesome and easy way to provide communication between your web app and server.

Here we will use the gorilla websocket library, but you could also use a few others.

We will create two basic apps which should cover most day to day usage:
1. A client subscribing to a server to get continues information.
2. A client-ping server-pong app.

Dependencies

Make sure to go get:

go get github.com/gorilla/websocket

What’s the theory?

  1. The client connects to your server using his web browser.
  2. He gets back the website.
  3. He connects to your server through his websocket client using javascript.
  4. Your server accepts it using a standard http handler.
  5. You create a websocket connection from the http connection.
  6. You communicate with the client.
  7. The connection gets closed by one of the sides.

Creating the subscription app

Preparations

Next to our main go file we will need a html folder to place our html file in.

Let’s name it creatively, like index.html

Now the contents:

<!DOCTYPE HTML>
<html>
<head>

    <script type="text/javascript">
         function myWebsocketStart()
         {
               var ws = new WebSocket("ws://localhost:3000/websocket");

               ws.onmessage = function (evt)
               {
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + evt.data
               };

               ws.onclose = function()
               {
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + "Connection closed";
               };

         }

    </script>

</head>
<body>
<button onclick="javascript:myWebsocketStart()">Subscribe</button>
<textarea id="textarea1">MyTextArea</textarea>
</body>
</html>

I’ll just go over it quickly as the main subject here is the go code.

We create a button and a textarea, after the user clicks the button he connects to our websocket. Whenever he receives a message, or the connection gets closed, we append the info to our textarea.

We will also need our, so creatively named, main.go file, with the basic structure and file server written:

package main

import (
    "github.com/gorilla/websocket"
    "net/http"
    "os"
    "fmt"
    "io/ioutil"
    "time"
    "encoding/json"
)

func main() {
    indexFile, err := os.Open("html/index.html")
    if err != nil {
        fmt.Println(err)
    }
    index, err := ioutil.ReadAll(indexFile)
    if err != nil {
        fmt.Println(err)
    }
    http.HandleFunc("/websocket", func(w http.ResponseWriter, r *http.Request) {
    })
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintf(w, string(index))
    })
    http.ListenAndServe(":3000", nil)
}

Awesome, now let’s create the websocket part.

Writing the websocket code

Little bit of planning

Our server will create a Person object containing a name and age in seconds. Every two seconds it will send the client the current state of the person.

The meat

First we’ll need to define our Person type:

type Person struct {
    Name string
    Age  int
}

We’ll also need to create an upgraded variable, in which we define our read and write buffer sizes.

var upgrader = websocket.Upgrader{
    ReadBufferSize: 1024,
    WriteBufferSize: 1024,
}

Now, how do we create the websocket connection? Pretty easily in fact:

http.HandleFunc("/websocket", func(w http.ResponseWriter, r *http.Request) {
  conn, err := upgrader.Upgrade(w, r, nil)
  if err != nil {
    fmt.Println(err)
    return
  }
  fmt.Println("Client subscribed")
}

That’s all we need to have a client. Now let’s create Bill, our person, right after we get the client subscribed:

fmt.Println("Client subscribed")

myPerson := Person{
  Name: "Bill",
  Age:  0,
}

Now we need the main websocket handling code, which we will wrap into an endless for loop, which we get out of only if the channel closes or Bill gets 40 seconds old.

for {
  time.Sleep(2 * time.Second)
  if myPerson.Age < 40 {
    myJson, err := json.Marshal(myPerson)
    if err != nil {
      fmt.Println(err)
      return
    }
    err = conn.WriteMessage(websocket.TextMessage, myJson)
    if err != nil {
      fmt.Println(err)
      break
    }
    myPerson.Age += 2
  } else {
    conn.Close()
    break
  }
}
fmt.Println("Client unsubscribed")

We send the messages using conn.WriteMessage in which we specify the message type, can be binary or text, and the content. If Bill is 40 years old or more, we break out of the loop. So far so good, but what if we want bidirectional communication?

Creating the ping-pong app

Preparations

As before, we will need a html folder for our html file with the creative name index.html

And here’s the code:

<!DOCTYPE HTML>
<html>
<head>

    <script type="text/javascript">
         function myWebsocketStart()
         {
               var ws = new WebSocket("ws://localhost:3000/websocket");

               ws.onopen = function()
               {
                  // Web Socket is connected, send data using send()
                  ws.send("ping");
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + "First message sent";
               };

               ws.onmessage = function (evt)
               {
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + evt.data
                  if(evt.data == "pong") {
                    setTimeout(function(){ws.send("ping");}, 2000);
                  }
               };

               ws.onclose = function()
               {
                  var myTextArea = document.getElementById("textarea1");
                  myTextArea.value = myTextArea.value + "\n" + "Connection closed";
               };

         }

    </script>

</head>
<body>
<button onclick="javascript:myWebsocketStart()">Start websocket!</button>
<textarea id="textarea1">MyTextArea</textarea>
</body>
</html>

The only differences are, that when we open the connection, we send a “ping” message and notify our user about it. Now, whenever we get back a “pong” message, we append it to our textarea and after 2 seconds we answer with a “ping” message again.

We will again need the basic go file structure with the upgrader defined already, and the connection created:

package main

import (
    "github.com/gorilla/websocket"
    "net/http"
    "os"
    "fmt"
    "io/ioutil"
    "time"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize: 1024,
    WriteBufferSize: 1024,
}

func main() {
    indexFile, err := os.Open("html/index.html")
    if err != nil {
        fmt.Println(err)
    }
    index, err := ioutil.ReadAll(indexFile)
    if err != nil {
        fmt.Println(err)
    }
    http.HandleFunc("/websocket", func(w http.ResponseWriter, r *http.Request) {
        conn, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            fmt.Println(err)
            return
        }
    })
    http.HandleFunc("/", func(w http.ResponseWriter, r * http.Request) {
        fmt.Fprintf(w, string(index))
    })
    http.ListenAndServe(":3000", nil)
}

Writing the websocket code

Ok, so now, whenever we get a “ping” message, we wait 2 seconds and answer with a “pong” message. If we get anything else, we just close the connection.

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
  fmt.Println(err)
  return
}
for {
  msgType, msg, err := conn.ReadMessage()
  if err != nil {
    fmt.Println(err)
    return
  }
  if string(msg) == "ping" {
    fmt.Println("ping")
    time.Sleep(2 * time.Second)
    err = conn.WriteMessage(msgType, []byte("pong"))
    if err != nil {
      fmt.Println(err)
      return
    }
  } else {
    conn.Close()
    fmt.Println(string(msg))
    return
  }
}

Using the ReadMessage function on our connection we get the type, content, and maybe error. We check the message and answer.

Conclusion

That’s actually all, have fun with it and build something great!