Browse Source

Improve the example and make some tweaks; add concurrency limits

Signed-off-by: Erik Hollensbe <github@hollensbe.org>
master
Erik Hollensbe 2 years ago
parent
commit
99551585f8
  1. 20
      example/example.go
  2. 1
      go.mod
  3. 8
      go.sum
  4. 70
      router.go

20
example/example.go

@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"log"
"time"
"code.hollensbe.org/erikh/natsrouter"
@ -34,15 +35,22 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
go r.Handle(ctx, sub)
go r.Handle(ctx, sub, 120000)
p := r.GetPublisher(conn, "router")
for i := 0; i < 1000; i++ {
if err := p.Publish("foo", &Foo{Id: int64(i), Message: "hello, world"}); err != nil {
panic(err)
i := 0
go func() {
p := r.GetPublisher(conn, "router")
for {
if err := p.Publish("foo", &Foo{Id: int64(i), Message: "hello, world"}); err != nil {
log.Print(err)
break
}
i++
}
}
}()
time.Sleep(2 * time.Second)
cancel()
fmt.Println(i)
}

1
go.mod

@ -6,6 +6,7 @@ require (
github.com/golang/protobuf v1.4.0
github.com/nats-io/nats.go v1.9.2
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.6.0
golang.org/x/crypto v0.0.0-20200429183012-4b2356b1ed79 // indirect
google.golang.org/protobuf v1.22.0
)

8
go.sum

@ -1,3 +1,4 @@
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
@ -7,6 +8,7 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/nats.go v1.9.2 h1:oDeERm3NcZVrPpdR/JpGdWHMv3oJ8yY30YwxKq+DU2s=
@ -18,6 +20,10 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
@ -27,6 +33,8 @@ golang.org/x/crypto v0.0.0-20200429183012-4b2356b1ed79/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=

70
router.go

@ -3,6 +3,8 @@ package natsrouter
import (
"context"
"log"
"runtime"
sync "sync"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
@ -42,34 +44,52 @@ func (r Router) HandleFunc(route string, h Handler) Router {
// Handle takes a nats subscription and reads it until the context or the
// connection is closed. Each message is delivered to a goroutine for
// processing.
// processing, of which `limit` will be spawned for servicing. If `limit` is < 1,
// this value is taken from `runtime.NumCPU()`. Tuning this value is important.
//
// You are responsible for maintaining the subscription.
func (r Router) Handle(ctx context.Context, sub *nats.Subscription) error {
for {
msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
return err
}
m := &Message{}
if err := proto.Unmarshal(msg.Data, m); err != nil {
return errors.Wrap(err, "error unmarshalling routing envelope from NATS")
}
handler, ok := r[m.Route]
if !ok {
log.Printf("could not locate route %v", m.Route)
continue
}
input := handler.InputTypeFunc()
if err := proto.Unmarshal(m.Data, input); err != nil {
return errors.Wrapf(err, "could not unamrshal payload to protobuf type: %T", input)
}
go handler.Func(ctx, input)
func (r Router) Handle(ctx context.Context, sub *nats.Subscription, limit int) {
if limit < 1 {
limit = runtime.NumCPU()
}
wg := &sync.WaitGroup{}
for i := limit; i >= 0; i-- {
go func(wg *sync.WaitGroup) {
wg.Add(1)
for {
msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
log.Print(err)
wg.Done()
return
}
m := &Message{}
if err := proto.Unmarshal(msg.Data, m); err != nil {
log.Print(errors.Wrap(err, "error unmarshalling routing envelope from NATS"))
continue
}
handler, ok := r[m.Route]
if !ok {
log.Printf("could not locate route %v", m.Route)
continue
}
input := handler.InputTypeFunc()
if err := proto.Unmarshal(m.Data, input); err != nil {
log.Print(errors.Wrapf(err, "could not unamrshal payload to protobuf type: %T", input))
continue
}
handler.Func(ctx, input)
}
}(wg)
}
wg.Wait()
}
// Publisher publishes to a route

Loading…
Cancel
Save