// You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.





// 65 lines
// 1.3 KiB

package main
import (
	"context"
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"code.hollensbe.org/erikh/natsrouter"
	"github.com/nats-io/nats.go"
	"google.golang.org/protobuf/proto"
)
//go:generate protoc --proto_path=.:$GOPATH/src --go_out=. messages.proto
// main is a small publish/consume demo for natsrouter.
//
// It registers a handler for the "foo" route, connects to a NATS server at
// localhost:4222, subscribes to the "router" subject, and then publishes Foo
// messages as fast as it can for five seconds. After ten seconds in total it
// cancels the router context and prints how many messages were sent and how
// many were processed.
func main() {
	r := natsrouter.NewRouter()

	// sent and processed are shared between goroutines: the publisher
	// goroutine updates sent, the handler (invoked via the r.Handle goroutine
	// started below) updates processed, and main reads both at the end. They
	// are therefore only ever accessed through sync/atomic.
	var sent, processed int64

	r.HandleFunc("foo", natsrouter.Handler{
		Func: func(ctx context.Context, msg proto.Message) {
			// InputTypeFunc below supplies *Foo, so this should always hold;
			// the check keeps an unexpected type from panicking the handler.
			foo, ok := msg.(*Foo)
			if !ok {
				log.Printf("unexpected message type %T", msg)
				return
			}
			fmt.Println(foo.Id, foo.Message)
			atomic.AddInt64(&processed, 1)
		},
		InputTypeFunc: func() proto.Message { return &Foo{} },
	})

	conn, err := nats.Connect("localhost:4222")
	if err != nil {
		panic(err)
	}

	sub, err := conn.SubscribeSync("router")
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	// NOTE(review): the meaning of 120000 is defined by natsrouter's Handle
	// and is not visible here — confirm against its documentation.
	go r.Handle(ctx, sub, 120000)

	go func() {
		p := r.GetPublisher(conn, "router")
		// Publish for only the first five seconds; main waits ten seconds in
		// total, which gives the processor time to work through the backlog
		// instead of having more messages pushed onto the queue.
		stop := time.After(5 * time.Second)
		for {
			// Non-blocking check: keep publishing until the timer fires.
			select {
			case <-stop:
				return
			default:
			}

			// Only this goroutine writes sent, so the loaded value is the
			// next message id.
			id := atomic.LoadInt64(&sent)
			if err := p.Publish("foo", &Foo{Id: id, Message: "hello, world"}); err != nil {
				log.Print(err)
				return
			}
			atomic.AddInt64(&sent, 1)
		}
	}()

	time.Sleep(10 * time.Second)
	cancel()
	fmt.Println("Sent:", atomic.LoadInt64(&sent), "Processed:", atomic.LoadInt64(&processed))
}