Golang patterns
Processor
state/state.go
package state
import (
"context"
"errors"
)
// executing state management
// op names an arithmetic operation that can be placed in a WorkRequest.
type op string

// Supported operations.
//
// Each constant carries the op type explicitly. In a const block the type is
// only repeated implicitly when the whole expression is omitted, so a line
// such as `Subtract = "sub"` would declare an untyped string constant rather
// than an op.
const (
	Add      op = "add"
	Subtract op = "sub"
	Multiply op = "mult"
	Divide   op = "div"
)
type (
	// WorkRequest is one unit of work: an operation and its two operands.
	WorkRequest struct {
		Operation      op
		Value1, Value2 int64
	}

	// WorkResponse is the outcome of handling a WorkRequest.
	//
	// Wr points back at the request that was handled, Result holds the
	// computed value, and Err is non-nil when no result could be computed.
	WorkResponse struct {
		Wr     *WorkRequest
		Result int64
		Err    error
	}
)
// Processor receives work requests from in, runs each one through Process and
// sends the response on out. It is meant to be run in its own goroutine.
//
// Processor returns when ctx is cancelled or when in is closed. Cancellation
// is also honoured while waiting to deliver a response, so the goroutine does
// not stay blocked forever if nobody is reading from out any more.
func Processor(ctx context.Context, in chan *WorkRequest, out chan *WorkResponse) {
	for {
		select {
		case <-ctx.Done():
			return
		case wr, ok := <-in:
			if !ok {
				// A closed channel yields nil requests forever; stop instead
				// of processing them.
				return
			}
			select {
			case out <- Process(wr):
			case <-ctx.Done():
				return
			}
		}
	}
}
// Process executes a single work request and returns its outcome.
//
// The returned response always carries wr in its Wr field. On success Result
// holds the value of applying wr.Operation to Value1 and Value2. On failure
// Err is set and Result is left at zero. Err is set for a nil request, for a
// division by zero, and for an operation other than Add, Subtract, Multiply
// or Divide.
func Process(wr *WorkRequest) *WorkResponse {
	resp := WorkResponse{Wr: wr}
	if wr == nil {
		// Report a nil request as an error rather than panicking on the
		// field access below.
		resp.Err = errors.New("nil work request")
		return &resp
	}
	switch wr.Operation {
	case Add:
		resp.Result = wr.Value1 + wr.Value2
	case Subtract:
		resp.Result = wr.Value1 - wr.Value2
	case Multiply:
		resp.Result = wr.Value1 * wr.Value2
	case Divide:
		if wr.Value2 == 0 {
			resp.Err = errors.New("Divide by 0")
			break
		}
		resp.Result = wr.Value1 / wr.Value2
	default:
		resp.Err = errors.New("unsupported operation")
	}
	return &resp
}
main.go
package main
import (
"anhthii/go-examples/woker-pool/state"
"context"
"fmt"
)
// main starts a single state.Processor goroutine, feeds it a fixed batch of
// requests and prints one line per response.
func main() {
	// Both channels are buffered with room for the whole batch, so the sends
	// below complete without waiting for the processor.
	in := make(chan *state.WorkRequest, 10)
	out := make(chan *state.WorkResponse, 10)

	ctx, cancel := context.WithCancel(context.Background())
	// Cancelling on exit tells the processor goroutine to stop.
	defer cancel()

	go state.Processor(ctx, in, out)

	// Keyed fields keep these literals valid if WorkRequest's fields are ever
	// reordered or extended.
	reqs := []state.WorkRequest{
		{Operation: state.Add, Value1: 3, Value2: 4},
		{Operation: state.Subtract, Value1: 5, Value2: 2},
		{Operation: state.Multiply, Value1: 9, Value2: 9},
		{Operation: state.Divide, Value1: 8, Value2: 2},
		{Operation: state.Divide, Value1: 8, Value2: 0},
	}
	for i := range reqs {
		in <- &reqs[i]
	}

	// Read exactly one response per request sent.
	for range reqs {
		resp := <-out
		fmt.Printf("Request: %+v, Result: %v, Error: %v\n", resp.Wr, resp.Result, resp.Err)
	}
}
Handler patterns
// MyMessageHandler is a stateless handler that forwards every message it
// receives to delegateChannel instead of processing it inline.
type MyMessageHandler struct{}

// HandleMessage disables the automatic response for m, hands m over to
// delegateChannel and returns nil straight away. Finishing or requeueing the
// message is left to whoever receives it from delegateChannel.
func (*MyMessageHandler) HandleMessage(m *nsq.Message) error {
	m.DisableAutoResponse()
	delegateChannel <- m
	return nil
}
// Worker goroutine: drains delegateChannel until it is closed and responds
// to each message explicitly according to the result of doSomeWork.
go func() {
	for msg := range delegateChannel {
		// doSomeWork stands for some long-winded task.
		if err := doSomeWork(msg); err == nil {
			msg.Finish()
			continue
		}
		// The work failed: requeue the message, passing -1 as the delay.
		msg.Requeue(-1)
	}
}()
cfg := nsq.NewConfig()
// Maximum number of messages to allow in flight (concurrency knob).
cfg.MaxInFlight = 1000

topicName := "my_topic"
channelName := "my_chan"
cons, err := nsq.NewConsumer(topicName, channelName, cfg)
if err != nil {
	// Pass the error as a value rather than as the format string, so that a
	// '%' in its text is printed literally instead of being read as a verb.
	log.Fatal(err)
}

// AddConcurrentHandlers is an alternative to AddHandler that enables
// concurrent processing; the second argument is the number of goroutines to
// spawn for processing.
cons.AddConcurrentHandlers(&MyMessageHandler{}, 20)