Browse Source

refactors + initial stab at sync db

Signed-off-by: Erik Hollensbe <github@hollensbe.org>
main
Erik Hollensbe 6 months ago
parent
commit
3a0ffeff01
  1. 1
      go.mod
  2. 3
      go.sum
  3. 83
      sink.go

1
go.mod

@ -6,5 +6,6 @@ require (
github.com/emersion/go-imap v1.0.6
github.com/emersion/go-maildir v0.2.0
github.com/urfave/cli/v2 v2.3.0
go.etcd.io/bbolt v1.3.5
golang.org/x/text v0.3.5-0.20201125200606-c27b9fd57aec // indirect
)

3
go.sum

@ -22,6 +22,9 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.5-0.20201125200606-c27b9fd57aec h1:A1qYjneJuzBZZ2gIB8rd6zrfq6l7SoEMJ8EsSilNK/U=

83
sink.go

@ -2,6 +2,7 @@ package main
import (
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
"io"
@ -12,6 +13,7 @@ import (
"github.com/emersion/go-imap/client"
"github.com/emersion/go-maildir"
"github.com/urfave/cli/v2"
"go.etcd.io/bbolt"
)
type mailbox struct {
@ -19,6 +21,11 @@ type mailbox struct {
uids []uint32
}
type message struct {
mailbox *imap.MailboxInfo
message *imap.Message
}
func main() {
app := cli.NewApp()
app.Authors = []*cli.Author{{Name: "Erik Hollensbe", Email: "github@hollensbe.org"}}
@ -55,6 +62,13 @@ func main() {
EnvVars: []string{"SINK_HOST"},
Value: "imap.gmail.com:993",
},
&cli.StringFlag{
Name: "db",
Usage: "Host to connect to: addr:port",
Aliases: []string{"d"},
EnvVars: []string{"SINK_DB"},
Value: "/tmp/sink.db",
},
&cli.BoolFlag{
Name: "noverify",
Aliases: []string{"n"},
@ -75,6 +89,12 @@ func run(ctx *cli.Context) error {
return errors.New("invalid arguments; try --help")
}
db, err := bbolt.Open(ctx.String("db"), 0600, nil)
if err != nil {
return err
}
defer db.Close()
tlsConfig := &tls.Config{InsecureSkipVerify: ctx.Bool("noverify")}
conns := ctx.Uint("connections")
@ -126,7 +146,7 @@ func run(ctx *cli.Context) error {
mailinfo := make(chan mailbox, conns)
listdone := make(chan error, 1)
messages := make(chan *imap.Message, conns*batch)
messages := make(chan message, conns*batch)
allDone := make(chan struct{})
wg := &sync.WaitGroup{}
@ -158,12 +178,39 @@ func run(ctx *cli.Context) error {
return
}
for i := uint(0); i < uint(len(uids)); i += batch {
tx, err := db.Begin(true)
if err != nil {
listdone <- err
return
}
bucket, err := tx.CreateBucketIfNotExists([]byte(mb.Name))
if err != nil {
listdone <- err
return
}
filtered := []uint32{}
for _, uid := range uids {
byt := make([]byte, 8)
binary.BigEndian.PutUint64(byt, uint64(uid))
if value := bucket.Get(byt); value == nil {
filtered = append(filtered, uid)
}
}
if err := tx.Commit(); err != nil {
listdone <- err
return
}
for i := uint(0); i < uint(len(filtered)); i += batch {
b := i + batch
if b >= uint(len(uids)) {
b = uint(len(uids)) - 1
if b >= uint(len(filtered)) {
b = uint(len(filtered)) - 1
}
mailinfo <- mailbox{mailbox: mb, uids: uids[i:b]}
mailinfo <- mailbox{mailbox: mb, uids: filtered[i:b]}
}
}
}
@ -246,7 +293,10 @@ func run(ctx *cli.Context) error {
}(msg, &section)
}
messages <- msg
messages <- message{
mailbox: m.mailbox,
message: msg,
}
}
}
}
@ -262,13 +312,28 @@ func run(ctx *cli.Context) error {
case <-allDone:
return nil
case m := <-messages:
err := db.Update(func(tx *bbolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(m.mailbox.Name))
if err != nil {
return err
}
byt := make([]byte, 8)
binary.BigEndian.PutUint64(byt, uint64(m.message.Uid))
return bucket.Put(byt, byt)
})
if err != nil {
return err
}
from := "No From Address"
if len(m.Envelope.From) > 0 {
from = m.Envelope.From[0].Address()
if len(m.message.Envelope.From) > 0 {
from = m.message.Envelope.From[0].Address()
}
fmt.Printf("Message #%d: %s %s\n", count, from, m.Envelope.Subject)
fmt.Printf("Message #%d: %s %s\n", count, from, m.message.Envelope.Subject)
count++
}
}

Loading…
Cancel
Save