Message integration patterns

Request-reply pattern

Service A wants Service B to do some work. How does Service B deliver its response, given that it may be handling requests from many other services at the same time?

The solution is for the requestor to tell the responder where to send the response: each request names a reply destination (in this case, a channel).

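package main

import "fmt"

// Request carries the argument and the channel on which the reply is expected.
type Request struct {
	someArg string
	replyTo chan<- Response
}

type Response struct {
	reply string
}

// responder serves requests from any requestor and replies on the channel
// named in each request.
func responder(c <-chan Request) {
	for request := range c {
		var resp Response
		resp.reply = "reply-to-" + request.someArg
		request.replyTo <- resp
	}
}

// requestor sends five requests and waits for each reply on its own channel.
func requestor(c chan<- Request) {
	myChannel := make(chan Response)
	for i := 0; i < 5; i++ {
		c <- Request{fmt.Sprintf("message%d", i), myChannel}
		resp := <-myChannel
		fmt.Printf("request %d, response %s\n", i, resp.reply)
	}
	// cleanup after my work is done
	close(myChannel)
}

func main() {
	requestChannel := make(chan Request)
	go responder(requestChannel)
	// Run the requestor in the main goroutine so the program exits when it
	// finishes, instead of sleeping for a fixed time.
	requestor(requestChannel)
	close(requestChannel) // lets the responder's range loop end
}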
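
The reply channel matters most when one responder serves several requestors at once, as the problem statement describes. The following is a minimal sketch of that situation, not part of the original example: the helper `ask` and the service names A, C and D are hypothetical, and the reply is simplified to a plain string.

```go
package main

import (
	"fmt"
	"sync"
)

// Request names the channel on which its sender expects the reply.
type Request struct {
	arg     string
	replyTo chan<- string
}

// responder serves every requestor from one shared channel and answers on
// whatever reply channel each request carries.
func responder(requests <-chan Request) {
	for r := range requests {
		r.replyTo <- "reply-to-" + r.arg
	}
}

// ask (hypothetical helper) sends one request on the shared channel and
// waits for the answer on a private reply channel.
func ask(requests chan<- Request, arg string) string {
	reply := make(chan string, 1) // buffered so the responder never blocks on a reply
	requests <- Request{arg: arg, replyTo: reply}
	return <-reply
}

func main() {
	requests := make(chan Request)
	go responder(requests)

	var wg sync.WaitGroup
	for _, name := range []string{"A", "C", "D"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			fmt.Printf("service %s got %q\n", name, ask(requests, "from-"+name))
		}(name)
	}
	wg.Wait()
	close(requests)
}
```

Because every request carries its own reply channel, each service receives only its own answer, even though the order in which the three lines are printed can vary between runs.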

The correlation identifier pattern

In a microservice architecture, a message may flow through multiple services. Each message should therefore carry a unique identifier, so that log entries and events produced by different services can be correlated during debugging.

We can use Sonyflake, Sony's ID generator inspired by Twitter's Snowflake, to generate IDs such as 20f8707d6000108.
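
To illustrate how the identifier travels with a message, here is a minimal sketch using only the standard library. It assumes a hypothetical `Message` envelope and two hypothetical processing steps, `validate` and `enrich`. The ID is a random hex string used as a stand-in for a real generator such as Sonyflake, so it has none of Sonyflake's ordering properties.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// Message is a hypothetical envelope; CorrelationID travels with the payload.
type Message struct {
	CorrelationID string
	Payload       string
}

// newCorrelationID returns 16 random hex characters. It is a stand-in for a
// real ID generator, used here only to keep the sketch dependency-free.
func newCorrelationID() (string, error) {
	b := make([]byte, 8)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// validate is a hypothetical service: it logs with the ID and passes the
// message on unchanged.
func validate(m Message) Message {
	fmt.Printf("[%s] validate: %q\n", m.CorrelationID, m.Payload)
	return m
}

// enrich is a hypothetical service: it changes the payload but copies the
// correlation ID into the outgoing message.
func enrich(m Message) Message {
	fmt.Printf("[%s] enrich: %q\n", m.CorrelationID, m.Payload)
	return Message{CorrelationID: m.CorrelationID, Payload: m.Payload + " (enriched)"}
}

func main() {
	id, err := newCorrelationID()
	if err != nil {
		panic(err)
	}
	out := enrich(validate(Message{CorrelationID: id, Payload: "order-42"}))
	fmt.Printf("[%s] done: %q\n", out.CorrelationID, out.Payload)
}
```

Every log line starts with the same identifier, so the path of one message can be found by searching the logs of all services for that value.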

Pipes and filters pattern

A single event triggers a flow, that is, a sequence of processing steps (filters) connected by channels (pipes).

package main

import "fmt"

// emitter sends 0..till-1 on the returned channel and then closes it.
func emitter(till int) <-chan int {
	out := make(chan int)
	go func() {
		for i := 0; i < till; i++ {
			out <- i
		}
		close(out)
	}()
	return out
}

// xSquare squares every value it receives.
func xSquare(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for x := range in {
			out <- x * x
		}
		close(out)
	}()
	return out
}

// addC adds the constant c to every value it receives.
func addC(in <-chan int, c int) <-chan int {
	out := make(chan int)
	go func() {
		for x := range in {
			out <- x + c
		}
		close(out) // close forward
	}()
	return out
}

func main() {
	// y = x*x + c
	out := addC(
		xSquare(emitter(3)),
		5)
	for y := range out {
		fmt.Println(y)
	}
}

This will print the following:

5
6
9

The content-based router pattern

A message's destination is not always fixed; it often depends on the content of the message itself.

The content-based router pattern examines each message and routes it based on the data or metadata it contains.

Govaluate (https://github.com/Knetic/govaluate) is a rules-based expression evaluation library that can be used for this:

expression, err := govaluate.NewEvaluableExpression("(requests_made * requests_succeeded / 100) >= 90")
if err != nil {
	log.Fatal(err)
}
parameters := make(map[string]interface{}, 8)
parameters["requests_made"] = 100
parameters["requests_succeeded"] = 80
result, err := expression.Evaluate(parameters)
if err != nil {
	log.Fatal(err)
}
// result is now the bool value false (100 * 80 / 100 = 80, which is below 90).
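
The routing step itself can be sketched with the standard library alone. The example below is an illustration under stated assumptions, not the Govaluate approach: the `Order` type, its `Region` field, and the `route` function are hypothetical, and the routing rule is a plain map lookup rather than an evaluated expression. The channels are buffered so the demo can run without extra goroutines.

```go
package main

import "fmt"

// Order is a hypothetical message; Region is the field the router inspects.
type Order struct {
	ID     int
	Region string
}

// route forwards each message to the channel registered for its Region, or
// to fallback when no route matches. It closes every output once the input
// is drained.
func route(in <-chan Order, routes map[string]chan<- Order, fallback chan<- Order) {
	for o := range in {
		if dst, ok := routes[o.Region]; ok {
			dst <- o
		} else {
			fallback <- o
		}
	}
	for _, dst := range routes {
		close(dst)
	}
	close(fallback)
}

func main() {
	// Buffers are large enough to hold every message in this demo.
	in := make(chan Order, 4)
	eu := make(chan Order, 4)
	us := make(chan Order, 4)
	other := make(chan Order, 4)

	for _, o := range []Order{{1, "eu"}, {2, "us"}, {3, "apac"}, {4, "eu"}} {
		in <- o
	}
	close(in)

	route(in, map[string]chan<- Order{"eu": eu, "us": us}, other)

	for o := range eu {
		fmt.Println("eu handler got order", o.ID)
	}
	for o := range us {
		fmt.Println("us handler got order", o.ID)
	}
	for o := range other {
		fmt.Println("fallback handler got order", o.ID)
	}
}
```

Here orders 1 and 4 reach the eu handler, order 2 reaches the us handler, and order 3, whose region has no registered route, goes to the fallback. A rules engine could replace the map lookup when the routing condition is more complex than a single field.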

The fan-in pattern

Sometimes messages must be sourced from multiple producers and processed in one place. The fan-in pattern merges them into a single channel:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// fanIn merges two inputs using one forwarding goroutine per input.
func fanIn(input1, input2 <-chan string) <-chan string {
	c := make(chan string)
	go func() {
		for {
			c <- <-input1
		}
	}()
	go func() {
		for {
			c <- <-input2 // Write the message to the FanIn channel, Blocking Call.
		}
	}()
	return c
}

// emitter produces an endless stream of messages at random intervals.
func emitter(name string) <-chan string {
	c := make(chan string)
	go func() {
		for i := 0; ; i++ {
			c <- fmt.Sprintf("[%s] says %d", name, i)
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) // Sleep for some time
		}
	}()
	return c
}

// fanInSelect is an alternative to fanIn that uses a single goroutine and
// select to forward whichever input is ready.
func fanInSelect(input1, input2 <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		for {
			select {
			case in := <-input1:
				out <- in
			case in := <-input2:
				out <- in
			}
		}
	}()
	return out
}

func main() {
	c := fanIn(emitter("Source1"), emitter("Source2"))
	for i := 0; i < 10; i++ {
		fmt.Println(<-c)
	}
}
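
The example above merges exactly two endless sources and never closes its output. As a complementary sketch, the version below fans in any number of finite inputs and closes the merged channel once all of them are done. The names `merge` and `source` are hypothetical, and integer messages are assumed for brevity.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge fans in any number of input channels and closes the output once
// every input has been closed and drained.
func merge(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	// Close the output only after all forwarding goroutines have finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// source is a hypothetical finite producer that emits the given values and
// then closes its channel.
func source(values ...int) <-chan int {
	c := make(chan int)
	go func() {
		defer close(c)
		for _, v := range values {
			c <- v
		}
	}()
	return c
}

func main() {
	var got []int
	for v := range merge(source(1, 2, 3), source(10, 20), source(100)) {
		got = append(got, v)
	}
	sort.Ints(got) // arrival order across sources is not deterministic
	fmt.Println(got)
}
```

Closing the merged channel lets the consumer use a plain range loop and stop cleanly, which the endless version cannot do.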