You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

139 lines
3.5 KiB

package natsrouter
import (
sync "sync"
//go:generate protoc --proto_path=.:$GOPATH/src --go_out=. router.proto
// Router maps a literal route string to the Handler registered for it.
// Routes are looked up by plain map indexing, so they are matched exactly:
// there are no wildcards and no HTTP-style path patterns. Data is carried in
// the protobuf messages handed to handlers, not encoded in the route.
//
// NOTE(review): this is an unguarded map; Handle reads it from its worker
// goroutines while HandleFunc writes to it, so routes presumably need to be
// registered before Handle is started — confirm with callers.
type Router map[string]Handler
// Handler pairs a HandlerFunc with a factory for the message type it accepts.
// The routed payload is unmarshalled into the proto.Message returned by
// InputTypeFunc, and that value is then passed to Func.
type Handler struct {
// Func is called with the decoded payload.
Func HandlerFunc
// InputTypeFunc returns the proto.Message the payload is unmarshalled into.
// It is called once per routed message, so it should return a new value
// each time rather than a shared one.
InputTypeFunc func() proto.Message
// HandlerFunc receives a decoded proto.Message and processes it. It also
// receives the context that was passed to Router.Handle, which will be
// cancelled if the service closes while the request is in flight.
type HandlerFunc func(context context.Context, msg proto.Message)
// NewRouter creates a new router for use.
func NewRouter() Router {
return Router{}
// HandleFunc handles a route with a HandlerFunc. Handler routes are literal
// strings and will not match wildcards. Routes will be overwritten if this is
// called twice with the same string route.
func (r Router) HandleFunc(route string, h Handler) Router {
r[route] = h
return r
// Handle takes a nats subscription and reads it until the context or the
// connection is closed. Each message is delivered to a goroutine for
// processing, of which `limit` will be spawned for servicing. If `limit` is < 1,
// this value is taken from `runtime.NumCPU()`. Tuning this value is important,
// and the defaults are extremely conservative. Tuning this value to something
// like `runtime.NumCPU()*10000` would be worth trying, to set the scale of
// things.
//
// Every message read from the subscription is decoded as a Message envelope.
// The envelope's Route selects the Handler, and its Data is unmarshalled into
// the value returned by that handler's InputTypeFunc before being passed to
// the handler's Func together with ctx.
//
// You are responsible for maintaining the subscription.
//
// NOTE(review): the worker loop counts from limit down to 0 inclusive, which
// is limit+1 iterations rather than the `limit` workers documented above —
// confirm whether `i > 0` was intended.
// NOTE(review): this copy of the function appears to be missing its closing
// braces and some statements (what follows each logged error, and how wg is
// added to and waited on) — verify the control flow against the canonical
// file before relying on it.
func (r Router) Handle(ctx context.Context, sub *nats.Subscription, limit int) {
// Fall back to one worker per CPU when the caller supplies no usable limit.
if limit < 1 {
limit = runtime.NumCPU()
wg := &sync.WaitGroup{}
for i := limit; i >= 0; i-- {
go func(wg *sync.WaitGroup) {
// Each worker loops, asking the subscription for the next message under ctx.
for {
msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
// Cancellation and deadline expiry are distinguished from other receive errors.
if err != context.Canceled && err != context.DeadlineExceeded {
// Decode the routing envelope; it carries the route name and the raw payload.
m := &Message{}
if err := proto.Unmarshal(msg.Data, m); err != nil {
log.Print(errors.Wrap(err, "error unmarshalling routing envelope from NATS"))
// Look up the handler by the exact route string carried in the envelope.
handler, ok := r[m.Route]
if !ok {
log.Printf("could not locate route %v", m.Route)
// Ask the handler for its input type and decode the payload into it.
input := handler.InputTypeFunc()
if err := proto.Unmarshal(m.Data, input); err != nil {
// NOTE(review): "unamrshal" below is a typo for "unmarshal"; it is runtime
// log output, so it is left untouched here.
log.Print(errors.Wrapf(err, "could not unamrshal payload to protobuf type: %T", input))
handler.Func(ctx, input)
// Publisher publishes routed messages to a single NATS subject. It is built
// by Router.GetPublisher and refuses routes that have no handler registered
// on that Router.
type Publisher struct {
// conn is the NATS connection envelopes are published on.
conn *nats.Conn
// r is consulted by Publish to check that a route has a handler.
r Router
// subject is the NATS subject every envelope is published to.
subject string
// GetPublisher returns a publisher
func (r Router) GetPublisher(conn *nats.Conn, subject string) *Publisher {
return &Publisher{
conn: conn,
r: r,
subject: subject,
// Publish publishes a message to the route
func (p *Publisher) Publish(route string, msg proto.Message) error {
if _, ok := p.r[route]; !ok {
return errors.Errorf("handler does not exist for route %q", route)
bytes, err := proto.Marshal(msg)
if err != nil {
return errors.Wrap(err, "could not marshal payload")
m := &Message{
Route: route,
Data: bytes,
marshalled, err := proto.Marshal(m)
if err != nil {
return errors.Wrap(err, "could not marshal envelope")
return p.conn.Publish(p.subject, marshalled)