Practical Golang: Getting started with NATS and related patterns

Introduction

Microservices… the never-disappearing buzzword of our times. They promise a lot, but can be slow or complicated if not implemented correctly. One of the main challenges when developing and using a microservice-based architecture is getting the communication right. Many will ask, why not REST? As I did at some point. Many actually use it. But the truth is that it leads to tighter coupling and is synchronous: the caller blocks until it gets a response, which isn’t good on many occasions. Microservice architectures are meant to be asynchronous.

What are we meant to use for communication? Usually we use:
– RPC – Remote Procedure Call
– Message bus/broker

In this article I’ll write about one specific message bus called NATS and using it in Go.

There are also other message buses/brokers. Some popular ones are Kafka and RabbitMQ.

Why NATS? It’s simple, and astonishingly fast.

Setting up NATS

To use NATS you can do one of the following things:
1. Use the NATS Docker image
2. Get the binaries
3. Use the public NATS server nats://demo.nats.io:4222
4. Build from source

Also, remember to

go get github.com/nats-io/nats

the official Go library.
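
If you pick the Docker route, the official nats image should give you a server listening on the default port 4222:

docker run -p 4222:4222 nats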

Getting started

In this article we’ll be using protobufs a lot, so if you want to know more about them, check out my previous article about protobufs.

First, let’s write one of the key usages of microservices: a frontend that lists information from other microservices, but doesn’t care if one of them is down. It will respond to the user anyway. This makes microservices swappable live, one at a time.

In each of our services we’ll need to connect to NATS:

package main

import (
    "github.com/nats-io/nats"
    "fmt"
    "os"
)

var nc *nats.Conn

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
}

Now, let’s write the first provider service. It will receive a user id and answer with the user’s name, for which we’ll need a transport structure to send the data over NATS. I wrote this short proto file for that:

syntax = "proto3";
package Transport;

message User {
        string id = 1;
        string name = 2;
}
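
To generate the Go code from this schema you can use protoc with the Go plugin. Something along these lines should work, assuming protoc and protoc-gen-go are installed (the file name is whatever you saved the schema as):

protoc --go_out=. user.proto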

Now we will create the map containing our user names:

var users map[string]string
var nc *nats.Conn

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }

    users = make(map[string]string)
    users["1"] = "Bob"
    users["2"] = "John"
    users["3"] = "Dan"
    users["4"] = "Kate"
}

and finally the part that’s most interesting to us: subscribing to the topic:

users["4"] = "Kate"

nc.QueueSubscribe("UserNameById", "userNameByIdProviders", replyWithUserId)

Notice that it’s a QueueSubscribe, which means that if we start 10 instances of this service in the userNameByIdProviders group, only one of them will get each message sent over UserNameById. Another thing to note is that this call is asynchronous, so we need to block somehow. An empty select {} provides an endless block:

nc.QueueSubscribe("UserNameById", "userNameByIdProviders", replyWithUserId)
select {}
}
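
As an aside, a plain Subscribe (without a queue group) would instead deliver every message to every running instance. A quick sketch for contrast:

// With a plain subscription every instance gets its own copy of each message.
nc.Subscribe("UserNameById", replyWithUserId)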

Ok, now to the replyWithUserId function:

func replyWithUserId(m *nats.Msg) {
}

Notice that it takes one argument, a pointer to the message.
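
For reference, these are the nats.Msg fields we’ll rely on in our handlers:

// m.Data  - the raw payload, the []byte we unmarshal our protobufs from
// m.Reply - the reply subject, set automatically when the sender used Request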

We’ll unmarshal the data:

func replyWithUserId(m *nats.Msg) {

    myUser := Transport.User{}
    err := proto.Unmarshal(m.Data, &myUser)
    if err != nil {
        fmt.Println(err)
        return
    }

get the name and marshal back:

myUser.Name = users[myUser.Id]
data, err := proto.Marshal(&myUser)
if err != nil {
    fmt.Println(err)
    return
}

And, since it’s a request we’re handling, we respond on the Reply topic, a topic created by the caller exactly for this purpose:

if err != nil {
    fmt.Println(err)
    return
}
fmt.Println("Replying to ", m.Reply)
nc.Publish(m.Reply, data)

}

Ok, now let’s get to the second service, our time provider. First the same basic structure:

package main

import (
    "github.com/nats-io/nats"
    "fmt"
    "github.com/cube2222/Blog/NATS/FrontendBackend"
    "github.com/golang/protobuf/proto"
    "os"
    "time"
)

// We use globals because it's a small application demonstrating NATS.

var nc *nats.Conn

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }

    nc.QueueSubscribe("TimeTeller", "TimeTellers", replyWithTime)
    select {} // Block forever
}

This time we’re not getting any data from the caller, so we just marshal our time into this proto structure:

syntax = "proto3";
package Transport;

message Time {
        string time = 1;
}

and send it back:

func replyWithTime(m *nats.Msg) {
    curTime := Transport.Time{Time: time.Now().Format(time.RFC3339)}

    data, err := proto.Marshal(&curTime)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("Replying to ", m.Reply)
    nc.Publish(m.Reply, data)

}

We can now get to our frontend, which will use both those services. First the standard basic structure:

package main

import (
    "net/http"
    "github.com/gorilla/mux"
    "github.com/cube2222/Blog/NATS/FrontendBackend"
    "github.com/golang/protobuf/proto"
    "fmt"
    "github.com/nats-io/nats"
    "time"
    "os"
    "sync"
)

var nc *nats.Conn

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }
    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }

    m := mux.NewRouter()
    m.HandleFunc("/{id}", handleUserWithTime)

    http.ListenAndServe(":3000", m)
}

That’s a pretty standard web server. Now to the interesting bits: the handleUserWithTime function, which will respond with the user name and the time:

func handleUserWithTime(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    myUser := Transport.User{Id: vars["id"]}
    curTime := Transport.Time{}
    wg := sync.WaitGroup{}
    wg.Add(2)
}

We’ve parsed the request arguments and created a WaitGroup with a count of two, as we’ll make one asynchronous request to each of our services. First we marshal the user struct:

go func() {
    data, err := proto.Marshal(&myUser)
    if err != nil || len(myUser.Id) == 0 {
        fmt.Println(err)
        w.WriteHeader(500)
        fmt.Println("Problem with parsing the user Id.")
        wg.Done() // Release the WaitGroup, or wg.Wait() would block forever.
        return
    }

and then we make the request, sending the user data and waiting at most 100 ms for the response:

wg.Done()
return
}

msg, err := nc.Request("UserNameById", data, 100 * time.Millisecond)

Now we check whether an error happened or the response is empty; we only use the answer if the request succeeded, and mark this goroutine as done either way:

msg, err := nc.Request("UserNameById", data, 100 * time.Millisecond)
if err == nil && msg != nil {
    myUserWithName := Transport.User{}
    err := proto.Unmarshal(msg.Data, &myUserWithName)
    if err == nil {
        myUser = myUserWithName
    }
}
wg.Done()
}()

Next we’ll do the request to the Time Tellers.
We again make a request, but its body is nil, as we don’t need to pass any data:

go func() {
    msg, err := nc.Request("TimeTeller", nil, 100*time.Millisecond)
    if err == nil && msg != nil {
        receivedTime := Transport.Time{}
        err := proto.Unmarshal(msg.Data, &receivedTime)
        if err == nil {
            curTime = receivedTime
        }
    }
    wg.Done()
}()

After both requests have finished (or failed), we can respond to the user:

wg.Wait()

fmt.Fprintln(w, "Hello ", myUser.Name, " with id ", myUser.Id, ", the time is ", curTime.Time, ".")
}

Now if you actually test it, you’ll notice that if one of the provider services isn’t running, the frontend will respond anyway, putting a zeroed value in place of the unavailable resource. You could also make a template that shows an error in that place.
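
If you want to try it out end to end, start a NATS server, run both providers and the frontend with the server address as the argument, and query the frontend. The file names here are hypothetical:

go run frontend.go nats://localhost:4222 &
go run usernameprovider.go nats://localhost:4222 &
go run timeprovider.go nats://localhost:4222 &
curl http://localhost:3000/1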

Ok, that was already an interesting architecture. Now we can implement…

The Master-Slave pattern

This is such a popular pattern, especially in Go, that we really should know how to implement it. The workers will do a simple operation on a text file: counting how many times each word appears in a comma-separated list.

Now you could think that the Master should send the files to the Workers over NATS. Wrong. This would slow NATS down considerably (at least for bigger files). That’s why the Master will send the files to a file server over a REST API, and the Workers will get them from there. We’ll also learn how to do service discovery over NATS.

First, the File Server. I won’t really go through the file handling part, as it’s a simple get/post API. I will, however, go over the service discovery part.

package main

import (
    "net/http"
    "github.com/gorilla/mux"
    "os"
    "io"
    "fmt"
    "github.com/nats-io/nats"
    "github.com/cube2222/Blog/NATS/MasterWorker"
    "github.com/golang/protobuf/proto"
)

func main() {

    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    m := mux.NewRouter()

    m.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        file, err := os.Open("/tmp/" + vars["name"])
        if err != nil {
            w.WriteHeader(404)
            return
        }
        defer file.Close()
        if _, err := io.Copy(w, file); err != nil {
            w.WriteHeader(500)
        }
    }).Methods("GET")

    m.HandleFunc("/{name}", func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        file, err := os.Create("/tmp/" + vars["name"])
        if err != nil {
            w.WriteHeader(500)
            return
        }
        defer file.Close()
        if _, err := io.Copy(file, r.Body); err != nil {
            w.WriteHeader(500)
        }
    }).Methods("POST")

    RunServiceDiscoverable()

    http.ListenAndServe(":3000", m)
}

Now, what does the RunServiceDiscoverable function do? It connects to the NATS server and responds with its own HTTP address to incoming discovery requests.

func RunServiceDiscoverable() {
    nc, err := nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println("Can't connect to NATS. Service is not discoverable.")
        return
    }
    nc.Subscribe("Discovery.FileServer", func(m *nats.Msg) {
        serviceAddressTransport := Transport.DiscoverableServiceTransport{Address: "http://localhost:3000"}
        data, err := proto.Marshal(&serviceAddressTransport)
        if err == nil {
            nc.Publish(m.Reply, data)
        }
    })
}

The proto file looks like this:

syntax = "proto3";
package Transport;

message DiscoverableServiceTransport {
        string Address = 1;
}

We can now go on with the Master.

The protofile for the Task structure is:

syntax = "proto3";
package Transport;

message Task {
        string uuid = 1;
        string finisheduuid = 2;
        int32 state = 3; // 0 - not started, 1 - in progress, 2 - finished
        int32 id = 4;
}

Our Master will hold a list of Tasks, each with its UUID (which is also the name of the file) and its id (the position in the master’s Tasks slice), plus a pointer to the oldest unfinished Task, which gets updated on new Task retrieval. It’s pretty similar to the Task storage in my Microservice Architecture series.

I’m using github.com/satori/go.uuid for UUID generation.

First, as usual, the basic structure:

package main

import (
    "github.com/satori/go.uuid"
    "github.com/cube2222/Blog/NATS/MasterWorker"
    "os"
    "fmt"
    "github.com/nats-io/nats"
    "github.com/golang/protobuf/proto"
    "time"
    "bytes"
    "net/http"
    "sync"
)

var Tasks []Transport.Task
var TaskMutex sync.Mutex
var oldestFinishedTaskPointer int
var nc *nats.Conn


func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }

    Tasks = make([]Transport.Task, 0, 20)
    TaskMutex = sync.Mutex{}
    oldestFinishedTaskPointer = 0

    initTestTasks()

    nc.Subscribe("Work.TaskToDo", func (m *nats.Msg) {
    })

    nc.Subscribe("Work.TaskFinished", func (m *nats.Msg) {
    })

    select {} // Block forever
}

Ok, we’ve also already set up the subscriptions.

How does the initTestTasks function work? It’s interesting because it gets the file server address over NATS.

We want to create 20 test Tasks, so we run the loop 20 times:

func initTestTasks() {
    for i := 0; i < 20; i++ {
    }
}

We create a new Task and ask the File Server for its address:

for i := 0; i < 20; i++ {
    newTask := Transport.Task{Uuid: uuid.NewV4().String(), State: 0}
    fileServerAddressTransport := Transport.DiscoverableServiceTransport{}
    msg, err := nc.Request("Discovery.FileServer", nil, 1000 * time.Millisecond)
    if err == nil && msg != nil {
        err := proto.Unmarshal(msg.Data, &fileServerAddressTransport)
        if err != nil {
            continue
        }
    }
    if err != nil {
        continue
    }

    fileServerAddress := fileServerAddressTransport.Address
}

Next we finally make the post Request to the file server and add the Task to our Tasks list:

        fileServerAddress := fileServerAddressTransport.Address
        data := make([]byte, 0, 1024)
        buf := bytes.NewBuffer(data)
        fmt.Fprint(buf, "get,my,data,my,get,get,have")
        r, err := http.Post(fileServerAddress + "/" + newTask.Uuid, "", buf)
        if err != nil || r.StatusCode != http.StatusOK {
            continue
        }

        newTask.Id = int32(len(Tasks))
        Tasks = append(Tasks, newTask)
    }

How do we dispatch new Tasks to do? Simply like this:

nc.Subscribe("Work.TaskToDo", func (m *nats.Msg) {
    myTaskPointer, ok := getNextTask()
    if ok {
        data, err := proto.Marshal(myTaskPointer)
        if err == nil {
            nc.Publish(m.Reply, data)
        }
    }
})

How do we get the next Task? We just loop over the Tasks to find one that hasn’t been started. If the Tasks at our pointer are all finished, we also move the pointer up. Remember the mutex, as this function may be run in parallel:

func getNextTask() (*Transport.Task, bool) {
    TaskMutex.Lock()
    defer TaskMutex.Unlock()
    for i := oldestFinishedTaskPointer; i < len(Tasks); i++ {
        if i == oldestFinishedTaskPointer && Tasks[i].State == 2 {
            oldestFinishedTaskPointer++
        } else {
            if Tasks[i].State == 0 {
                Tasks[i].State = 1
                go resetTaskIfNotFinished(i)
                return &Tasks[i], true
            }
        }
    }
    return nil, false
}

We also called the resetTaskIfNotFinished function. It will reset the Task state if it’s still in progress after 2 minutes:

func resetTaskIfNotFinished(i int) {
    time.Sleep(2 * time.Minute)
    TaskMutex.Lock()
    defer TaskMutex.Unlock() // Don't forget to release the lock.
    if Tasks[i].State != 2 {
        Tasks[i].State = 0
    }
}

The TaskFinished subscription handler is much simpler: it just sets the Task to finished and sets the UUID according to the received protobuffer:

nc.Subscribe("Work.TaskFinished", func (m *nats.Msg) {
    myTask := Transport.Task{}
    err := proto.Unmarshal(m.Data, &myTask)
    if err == nil {
        TaskMutex.Lock()
        Tasks[myTask.Id].State = 2
        Tasks[myTask.Id].Finisheduuid = myTask.Finisheduuid
        TaskMutex.Unlock()
    }
})

That’s all as regards the Master! We can now move on to writing the Worker.

The basic structure:

package main

import (
    "os"
    "fmt"
    "github.com/nats-io/nats"
    "time"
    "github.com/cube2222/Blog/NATS/MasterWorker"
    "github.com/golang/protobuf/proto"
    "net/http"
    "bytes"
    "io/ioutil"
    "sort"
    "strings"
    "github.com/satori/go.uuid"
)

var nc *nats.Conn

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    var err error

    nc, err = nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }

    for i := 0; i < 8; i++ {
        go doWork()
    }

    select {} // Block forever
}

Now, the function doing the actual work here is doWork. I’ll post it all at once, with comments throughout, as it’s a very long function and this will be the most convenient way to read it:

func doWork() {
    for {
        // We ask for a Task with a 1 second Timeout
        msg, err := nc.Request("Work.TaskToDo", nil, 1 * time.Second)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        // We unmarshal the Task
        curTask := Transport.Task{}
        err = proto.Unmarshal(msg.Data, &curTask)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        // We get the FileServer address
        msg, err = nc.Request("Discovery.FileServer", nil, 1000 * time.Millisecond)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        fileServerAddressTransport := Transport.DiscoverableServiceTransport{}
        err = proto.Unmarshal(msg.Data, &fileServerAddressTransport)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        // We get the file
        fileServerAddress := fileServerAddressTransport.Address
        r, err := http.Get(fileServerAddress + "/" + curTask.Uuid)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        data, err := ioutil.ReadAll(r.Body)
        r.Body.Close()
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        // We split and count the words
        words := strings.Split(string(data), ",")
        sort.Strings(words)
        wordCounts := make(map[string]int)
        for i := 0; i < len(words); i++ {
            wordCounts[words[i]] = wordCounts[words[i]] + 1
        }

        resultData := make([]byte, 0, 1024)
        buf := bytes.NewBuffer(resultData)

        // We print the results to a buffer
        for key, value := range wordCounts {
            fmt.Fprintln(buf, key, ":", value)
        }

        // We generate a new UUID for the finished file
        curTask.Finisheduuid = uuid.NewV4().String()
        r, err = http.Post(fileServerAddress + "/" + curTask.Finisheduuid, "", buf)
        if err != nil || r.StatusCode != http.StatusOK {
            // Careful: when err != nil, r is nil, so we don't touch r here.
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        // We marshal the current Task into a protobuffer
        data, err = proto.Marshal(&curTask)
        if err != nil {
            fmt.Println("Something went wrong. Waiting 2 seconds before retrying:", err)
            time.Sleep(2 * time.Second)
            continue
        }

        // We notify the Master about finishing the Task
        nc.Publish("Work.TaskFinished", data)
    }
}

Awesome, our Master-Slave setup is ready. You can test it if you’d like. After you do, we can check out the last architecture.

The Events pattern

Imagine you have servers which keep connections to clients over websockets, and you want these clients to get live news updates. With this pattern you can do that. We’ll also learn about a few convenient NATS client abstractions, like using an encoded connection, or using channels for sending/receiving.

The basic architecture as usual:

package main

import (
    "os"
    "fmt"
    "github.com/nats-io/nats"
    natsp "github.com/nats-io/nats/encoders/protobuf"
    "github.com/cube2222/Blog/NATS/EventSubs"
    "time"
)

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    nc, err := nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    ec, err := nats.NewEncodedConn(nc, natsp.PROTOBUF_ENCODER)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer ec.Close()
}

Wait… What’s that at the end!? It’s an encoded connection! It will automatically encode our structs into raw data. We’ll use the protobuf encoder here, but there are also default, gob and JSON encoders built in.
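
For instance, if we wanted JSON on the wire instead, only the encoder constant would change. A sketch:

ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)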

Here’s the protofile we’ll use:

syntax = "proto3";
package Transport;

message TextMessage {
        int32 id = 1;
        string body = 2;
}

Ok, how can we just publish simple event-structs? Totally intuitive, like that:

defer ec.Close()

for i := 0; i < 5; i++ {
    myMessage := Transport.TextMessage{Id: int32(i), Body: "Hello over standard!"}

    err := ec.Publish("Messaging.Text.Standard", &myMessage)
    if err != nil {
        fmt.Println(err)
    }
}

Requests are a little less intuitive, as the signature differs. It looks like this:

err := ec.Request(topic, *body, *response, timeout)

So our request sending part will look like this:

for i := 5; i < 10; i++ {
    myMessage := Transport.TextMessage{Id: int32(i), Body: "Hello, please respond!"}

    res := Transport.TextMessage{}
    err := ec.Request("Messaging.Text.Respond", &myMessage, &res, 200 * time.Millisecond)
    if err != nil {
        fmt.Println(err)
    }

    fmt.Println(res.Body, " with id ", res.Id)

}

The last thing we can do is send messages via channels, which is probably the simplest way:

sendChannel := make(chan *Transport.TextMessage)
ec.BindSendChan("Messaging.Text.Channel", sendChannel)
for i := 10; i < 15; i++ {
    myMessage := Transport.TextMessage{Id: int32(i), Body: "Hello over channel!"}

    sendChannel <- &myMessage
}

Now we can write the receiving end. First the same structure as before:

package main

import (
    "github.com/nats-io/nats"
    natsp "github.com/nats-io/nats/encoders/protobuf"
    "os"
    "fmt"
    "github.com/cube2222/Blog/NATS/EventSubs"
)

func main() {
    if len(os.Args) != 2 {
        fmt.Println("Wrong number of arguments. Need NATS server address.")
        return
    }

    nc, err := nats.Connect(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    ec, err := nats.NewEncodedConn(nc, natsp.PROTOBUF_ENCODER)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer ec.Close()
}

Ok, first the standard receive which is totally natural:

defer ec.Close()

ec.Subscribe("Messaging.Text.Standard", func(m *Transport.TextMessage) {
    fmt.Println("Got standard message: \"", m.Body, "\" with the Id ", m.Id, ".")
})

Now, the responding side, whose syntax again differs a little, as the handler function is:

func (subject, reply string, m *Transport.TextMessage)

So the responding looks like this:

ec.Subscribe("Messaging.Text.Respond", func(subject, reply string, m *Transport.TextMessage) {
    fmt.Println("Got ask for response message: \"", m.Body, "\" with the Id ", m.Id, ".")

    newMessage := Transport.TextMessage{Id: m.Id, Body: "Responding!"}
    ec.Publish(reply, &newMessage)
})

And finally, using channels, which barely differs from the sending side:

receiveChannel := make(chan *Transport.TextMessage)
ec.BindRecvChan("Messaging.Text.Channel", receiveChannel)

for m := range receiveChannel {
    fmt.Println("Got channel'ed message: \"", m.Body, "\" with the Id ", m.Id, ".")
}

Ok, that’s all on the topic of NATS. I hope you liked it and discovered something new! Please comment if you have any opinions, or don’t like something, or just want me to write about something.

Now go and build something great!

Practical Golang: Event multicast/subscription service

Introduction

In our microservice architectures we always need a method for communicating between services. There are various ways to achieve this. These include, but are not limited to: remote procedure calls, REST APIs, and message buses. In this comprehensive tutorial we’ll write a service which you can use to distribute messages/events across your system.

Design

How will it work? It will accept registering subscribers (other microservices). Whenever it gets a message from a microservice, it will send it further to all subscribers, using a REST call to each subscriber’s /event URL.

Subscribers will need to call a keep-alive URL regularly, otherwise they will get removed from the subscriber list. This protects us from sending messages to too many ghost subscribers.
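
From the subscriber’s side, keeping the registration alive could look like this. A minimal sketch, assuming the event service runs on localhost:3000, the subscriber is reachable at localhost:4000, and a 30-second interval is frequent enough:

package main

import (
    "fmt"
    "net/http"
    "net/url"
    "time"
)

func main() {
    const eventService = "http://localhost:3000"
    const myAddress = "localhost:4000" // Our own address, as other services should reach us.

    // Re-register every 30 seconds so we never get treated as a ghost.
    for range time.Tick(30 * time.Second) {
        resp, err := http.Post(eventService+"/registerAndKeepAlive?address="+url.QueryEscape(myAddress), "", nil)
        if err != nil {
            fmt.Println("Keep-alive failed:", err)
            continue
        }
        resp.Body.Close()
    }
}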

Implementation

Let’s start with a basic structure. We’ll define the API and set up our two main data structures:
1. The subscriber list with their register/lastKeepAlive dates.
2. The mutex controlling access to our subscriber list.

package main

import (
    "net/http"
    "time"
    "sync"
    "fmt"
    "net/url"
    "io/ioutil"
    "bytes"
)

var registeredServiceStorage map[string]time.Time
var serviceStorageMutex sync.RWMutex

func main() {
    registeredServiceStorage = make(map[string]time.Time)
    serviceStorageMutex = sync.RWMutex{}

    http.HandleFunc("/registerAndKeepAlive", registerAndKeepAlive)
    http.HandleFunc("/deregister", deregister)
    http.HandleFunc("/sendMessage", handleMessage)
    http.HandleFunc("/listSubscribers", handleSubscriberListing)

    go killZombieServices()
    http.ListenAndServe(":3000", nil)
}

func registerAndKeepAlive(w http.ResponseWriter, r *http.Request) {
}

func deregister(w http.ResponseWriter, r *http.Request) {
}

func handleMessage(w http.ResponseWriter, r *http.Request) {
}

func sendMessageToSubscriber(data []byte, address string) {
}

func handleSubscriberListing(w http.ResponseWriter, r *http.Request) {
}

func killZombieServices() {
}

We initialize our subscriber list and mutex, and also launch, on another goroutine, a function that will regularly delete ghost subscribers.

So far so good!
We can now start getting into each functions implementation.

We can begin with registerAndKeepAlive, which does both things: registering a new subscriber and updating an existing one. This works because in both cases we just update the map entry for the subscriber address to contain the current time.

func registerAndKeepAlive(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {
        //Subscriber registration
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

The register function should be called with a POST request. That’s why the first thing we do is check if the method is right; otherwise we answer with an error. If it’s ok, we register the client:

if r.Method == http.MethodPost {
    values, err := url.ParseQuery(r.URL.RawQuery)
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error:", err)
        return
    }
    if len(values.Get("address")) == 0 {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error:","Wrong input address.")
        return
    }

}

We check if the URL arguments are correct, and finally register the subscriber:

if len(values.Get("address")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:","Wrong input address.")
    return
}

serviceStorageMutex.Lock()
registeredServiceStorage[values.Get("address")] = time.Now()
serviceStorageMutex.Unlock()

fmt.Fprint(w, "success")

Awesome!

Let’s now implement the function which shall delete the entry when the subscriber wants to deregister.

func deregister(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodDelete {
        values, err := url.ParseQuery(r.URL.RawQuery)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:", err)
            return
        }
        if len(values.Get("address")) == 0 {
            w.WriteHeader(http.StatusBadRequest)
            fmt.Fprint(w, "Error:","Wrong input address.")
            return
        }

        //Subscriber deletion will come here

    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only DELETE accepted")
    }
}

Again we check whether the request method is good and the address argument is correct. If that’s the case, we can remove this client from our subscriber list.

if len(values.Get("address")) == 0 {
    w.WriteHeader(http.StatusBadRequest)
    fmt.Fprint(w, "Error:","Wrong input address.")
    return
}

serviceStorageMutex.Lock()
delete(registeredServiceStorage, values.Get("address"))
serviceStorageMutex.Unlock()

fmt.Fprint(w, "success")

Now it’s time for the main functionality. Namely handling messages and sending them to all subscribers:

func handleMessage(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodPost {

    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only POST accepted")
    }
}

As usual, we check if the request method is correct.

Then, we read the data we got, so we can pass it to multiple concurrent sending functions.

if r.Method == http.MethodPost {

    data, err := ioutil.ReadAll(r.Body)
    if err != nil {
        fmt.Println(err)
    }
    //...
}

We then lock the mutex for reading. That’s important so that we can handle huge amounts of messages efficiently. Basically, it means that we allow others to read while we are reading, because maps support concurrent reads. We can do this as long as no one is modifying the map.

While we hold the read lock, we collect the subscribers we have to send the message to and start concurrent functions that will do the sending. As we don’t want to keep the map locked for the entire sending time, we only take the addresses.

    data, err := ioutil.ReadAll(r.Body)
    if err != nil {
        fmt.Println(err)
    }

    serviceStorageMutex.RLock()
    for address := range registeredServiceStorage {
        go sendMessageToSubscriber(data, address)
    }
    serviceStorageMutex.RUnlock()

    fmt.Fprint(w, "success")

Which means we now have to implement the sendMessageToSubscriber(…) function.

It’s pretty simple: we just make a POST request, and print the error if one happened.

func sendMessageToSubscriber(data []byte, address string) {
    _, err := http.Post("http://" + address + "/event", "", bytes.NewBuffer(data))
    if err != nil {
        fmt.Println(err)
    }
}

It’s important to note that we have to create a buffer from the data, as the http.Post(…) function needs a reader.
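
For completeness, a subscriber needs to serve the /event URL we’re posting to. A minimal sketch of the receiving side, with the port and the handler body being assumptions:

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
)

func main() {
    // Handle events pushed to us by the multicast service.
    http.HandleFunc("/event", func(w http.ResponseWriter, r *http.Request) {
        data, err := ioutil.ReadAll(r.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            return
        }
        // Do whatever your service needs to do with the event.
        fmt.Println("Got event:", string(data))
    })

    http.ListenAndServe(":4000", nil)
}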

We’ll also implement the function which makes it possible to list all the subscribers, mainly for debugging purposes. There’s nothing new in it: we check if the method is right, lock the mutex for reading, and finally print the map with the register time in a readable format.

func handleSubscriberListing(w http.ResponseWriter, r *http.Request) {
    if r.Method == http.MethodGet {
        serviceStorageMutex.RLock()

        for address, registerTime := range registeredServiceStorage {
            fmt.Fprintln(w, address, " : ", registerTime.Format(time.RFC3339))
        }

        serviceStorageMutex.RUnlock()
    } else {
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "Error: Only GET accepted")
    }
}

Now there’s only one function left: the one that will make sure no ghost services stay around for too long. It will check all the services once per minute, which keeps it cheap on performance:

func killZombieServices() {
    t := time.Tick(1 * time.Minute)

    for range t {
    }
}

This is a nice way to launch the code every minute. We create a channel which sends us the time every minute, and range over it, ignoring the received values.

We can now get the check and remove working.

for range t {
    timeNow := time.Now()
    serviceStorageMutex.Lock()
    for address, timeKeepAlive := range registeredServiceStorage {
        if timeNow.Sub(timeKeepAlive).Minutes() > 2 {
            delete(registeredServiceStorage, address)
        }
    }
    serviceStorageMutex.Unlock()
}

We just range over the subscribers and delete those that haven’t kept their subscription alive.

To add to that, if you wanted you could first make a read-only pass over the subscribers, and immediately after that, make a write-locked deletion of the ones you found. This would allow others to read the map while you’re finding subscribers to delete.
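
That two-phase variant could look like this sketch. Note the re-check under the write lock, since a keep-alive may have arrived between the two passes:

for range t {
    timeNow := time.Now()

    // Phase 1: collect stale subscribers while holding only the read lock.
    stale := make([]string, 0)
    serviceStorageMutex.RLock()
    for address, timeKeepAlive := range registeredServiceStorage {
        if timeNow.Sub(timeKeepAlive).Minutes() > 2 {
            stale = append(stale, address)
        }
    }
    serviceStorageMutex.RUnlock()

    // Phase 2: delete them under the write lock, re-checking each entry.
    serviceStorageMutex.Lock()
    for _, address := range stale {
        if timeKeepAlive, ok := registeredServiceStorage[address]; ok && timeNow.Sub(timeKeepAlive).Minutes() > 2 {
            delete(registeredServiceStorage, address)
        }
    }
    serviceStorageMutex.Unlock()
}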

Conclusion

That’s all! Have fun with creating an infrastructure based on such a service!