Browse Source

other stuff

Signed-off-by: Erik Hollensbe <github@hollensbe.org>
master
Erik Hollensbe 2 years ago
parent
commit
043fb89c9f
  1. 19
      example/example.go
  2. 10
      router.go

19
example/example.go

@ -15,10 +15,12 @@ import (
func main() {
r := natsrouter.NewRouter()
processed := 0
r.HandleFunc("foo", natsrouter.Handler{
Func: func(ctx context.Context, msg proto.Message) {
foo := msg.(*Foo)
fmt.Println(foo.Id, foo.Message)
processed++
},
InputTypeFunc: func() proto.Message { return &Foo{} },
})
@ -37,20 +39,27 @@ func main() {
go r.Handle(ctx, sub, 120000)
i := 0
sent := 0
go func() {
p := r.GetPublisher(conn, "router")
after := time.After(5 * time.Second) // this gives the processor more time to process things instead of pushing more shit onto the queue
for {
if err := p.Publish("foo", &Foo{Id: int64(i), Message: "hello, world"}); err != nil {
select {
case <-after:
return
default:
}
if err := p.Publish("foo", &Foo{Id: int64(sent), Message: "hello, world"}); err != nil {
log.Print(err)
break
}
i++
sent++
}
}()
time.Sleep(2 * time.Second)
time.Sleep(10 * time.Second)
cancel()
fmt.Println(i)
fmt.Println("Sent:", sent, "Processed:", processed)
}

10
router.go

@ -45,7 +45,10 @@ func (r Router) HandleFunc(route string, h Handler) Router {
// Handle takes a nats subscription and reads it until the context or the
// connection is closed. Each message is delivered to a goroutine for
// processing, of which `limit` will be spawned for servicing. If `limit` is < 1,
// this value is taken from `runtime.NumCPU()`. Tuning this value is important.
// this value is taken from `runtime.NumCPU()`. Tuning this value is important,
// and the defaults are extremely conservative. Tuning this value to something
// like `runtime.NumCPU()*10000` would be worth trying, to set the scale of
// things.
//
// You are responsible for maintaining the subscription.
func (r Router) Handle(ctx context.Context, sub *nats.Subscription, limit int) {
@ -61,7 +64,10 @@ func (r Router) Handle(ctx context.Context, sub *nats.Subscription, limit int) {
for {
msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
log.Print(err)
if err != context.Canceled && err != context.DeadlineExceeded {
log.Print(err)
}
wg.Done()
return
}

Loading…
Cancel
Save