You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

139 lines
3.5 KiB

package natsrouter
import (
"context"
"log"
"runtime"
sync "sync"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
)
//go:generate protoc --proto_path=.:$GOPATH/src --go_out=. router.proto
// Router is the routing table: it maps a route name to the Handler that
// services it. Route names are used as plain map keys, so they are matched
// literally; pass data through the message structs rather than encoding it in
// the route. There is no HTTP-style (pattern or wildcard) routing.
type Router map[string]Handler
// Handler pairs a HandlerFunc with a factory for the message type it expects.
// InputTypeFunc must return a proto.Message; the payload carried in the
// routing envelope is unmarshalled into that value before Func is called.
type Handler struct {
// Func is invoked with the decoded payload for every message on the route.
Func HandlerFunc
// InputTypeFunc is called once per incoming message to obtain the value
// the payload is unmarshalled into.
InputTypeFunc func() proto.Message
}
// HandlerFunc receives a decoded proto.Message and processes it. It also
// receives the context that was passed to Router.Handle, which will be
// cancelled if the service closes while the request is in flight.
type HandlerFunc func(context context.Context, msg proto.Message)
// NewRouter returns an empty, ready-to-use Router. The returned map is
// non-nil, so routes can be registered on it immediately.
func NewRouter() Router {
	return make(Router)
}
// HandleFunc registers h as the Handler for route and returns the same Router
// so that registrations can be chained.
//
// The route is stored as a literal map key: it is compared exactly and never
// treated as a wildcard. Registering the same route a second time replaces
// the earlier Handler.
func (r Router) HandleFunc(route string, h Handler) Router {
	// Plain map assignment: an existing entry for route is overwritten.
	r[route] = h

	return r
}
// Handle takes a nats subscription and reads it until the context is done or
// the subscription returns an error. It blocks until every worker goroutine
// has exited.
//
// Exactly `limit` worker goroutines are started. Each worker loops: it fetches
// the next message, decodes the routing envelope, looks up the Handler for
// the envelope's route, decodes the payload into the Handler's input type and
// calls the Handler synchronously. At most `limit` handlers therefore run at
// once. If `limit` is < 1, this value is taken from `runtime.NumCPU()`. Tuning
// this value is important, and the defaults are extremely conservative.
// Tuning this value to something like `runtime.NumCPU()*10000` would be worth
// trying, to set the scale of things.
//
// Messages whose envelope or payload cannot be unmarshalled, or whose route
// is not registered, are logged and dropped. A worker exits on the first
// error returned while fetching a message; that error is logged unless it is
// context.Canceled or context.DeadlineExceeded.
//
// You are responsible for maintaining the subscription.
func (r Router) Handle(ctx context.Context, sub *nats.Subscription, limit int) {
	if limit < 1 {
		limit = runtime.NumCPU()
	}

	var wg sync.WaitGroup
	// Register all workers before any of them is started. Calling Add from
	// inside the goroutine races with Wait below, which could otherwise
	// observe a zero counter and return before the workers have begun.
	wg.Add(limit)

	for i := 0; i < limit; i++ {
		go func() {
			defer wg.Done()

			for {
				msg, err := sub.NextMsgWithContext(ctx)
				if err != nil {
					// Context termination is the normal shutdown path and is
					// not worth logging; anything else is.
					if err != context.Canceled && err != context.DeadlineExceeded {
						log.Print(err)
					}
					return
				}

				// Outer envelope: carries the route name and the raw payload.
				m := &Message{}
				if err := proto.Unmarshal(msg.Data, m); err != nil {
					log.Print(errors.Wrap(err, "error unmarshalling routing envelope from NATS"))
					continue
				}

				handler, ok := r[m.Route]
				if !ok {
					log.Printf("could not locate route %v", m.Route)
					continue
				}

				// Inner payload: decoded into a fresh value of the type the
				// handler declared.
				input := handler.InputTypeFunc()
				if err := proto.Unmarshal(m.Data, input); err != nil {
					log.Print(errors.Wrapf(err, "could not unamrshal payload to protobuf type: %T", input))
					continue
				}

				handler.Func(ctx, input)
			}
		}()
	}

	wg.Wait()
}
// Publisher publishes routed messages to a single NATS subject. Obtain one
// from Router.GetPublisher.
type Publisher struct {
// conn is the NATS connection messages are published on.
conn *nats.Conn
// r is the Router consulted to check that a route has a registered Handler
// before a message is published to it.
r Router
// subject is the NATS subject every message is published to.
subject string
}
// GetPublisher returns a Publisher bound to this Router that sends its
// messages over conn on the given NATS subject.
func (r Router) GetPublisher(conn *nats.Conn, subject string) *Publisher {
	pub := new(Publisher)
	pub.conn = conn
	pub.subject = subject
	pub.r = r

	return pub
}
// Publish sends msg to the given route on the Publisher's subject.
//
// The route must already be registered on the Router the Publisher was
// created from; otherwise an error is returned and nothing is sent. msg is
// marshalled, wrapped in a routing envelope carrying the route name, and the
// marshalled envelope is published on the NATS connection. Marshalling
// failures are returned wrapped; the result of the NATS publish call is
// returned as-is.
func (p *Publisher) Publish(route string, msg proto.Message) error {
	_, registered := p.r[route]
	if !registered {
		return errors.Errorf("handler does not exist for route %q", route)
	}

	payload, err := proto.Marshal(msg)
	if err != nil {
		return errors.Wrap(err, "could not marshal payload")
	}

	// Wrap the payload in the envelope the receiving Router decodes first.
	envelope, err := proto.Marshal(&Message{Route: route, Data: payload})
	if err != nil {
		return errors.Wrap(err, "could not marshal envelope")
	}

	return p.conn.Publish(p.subject, envelope)
}