a mail experiment
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

387 lines
8.1 KiB

package main
import (
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"sync"
"github.com/emersion/go-imap"
"github.com/emersion/go-imap/client"
"github.com/emersion/go-maildir"
"github.com/urfave/cli/v2"
"go.etcd.io/bbolt"
)
type mailbox struct {
mailbox *imap.MailboxInfo
uids []uint32
}
type message struct {
mailbox *imap.MailboxInfo
message *imap.Message
messageKey string
}
func main() {
app := cli.NewApp()
app.Authors = []*cli.Author{{Name: "Erik Hollensbe", Email: "github@hollensbe.org"}}
app.Flags = []cli.Flag{
&cli.UintFlag{
Name: "connections",
Usage: "Count of connections to spawn + 1 for listing mailboxes",
Aliases: []string{"c", "conn"},
Value: 5,
},
&cli.UintFlag{
Name: "batch",
Usage: "batch messages by this many",
Aliases: []string{"b"},
Value: 1000,
},
&cli.StringFlag{
Name: "username",
Usage: "Username to login with",
Aliases: []string{"u", "user"},
EnvVars: []string{"SINK_USERNAME"},
},
&cli.StringFlag{
Name: "password",
Usage: "Password to login with",
Aliases: []string{"p", "pass"},
EnvVars: []string{"SINK_PASSWORD"},
},
&cli.StringFlag{
Name: "host",
Usage: "Host to connect to: addr:port",
Aliases: []string{"t"},
EnvVars: []string{"SINK_HOST"},
Value: "imap.gmail.com:993",
},
&cli.StringFlag{
Name: "db",
Usage: "Host to connect to: addr:port",
Aliases: []string{"d"},
EnvVars: []string{"SINK_DB"},
Value: "/tmp/sink.db",
},
&cli.BoolFlag{
Name: "noverify",
Aliases: []string{"n"},
Usage: "Don't verify TLS certificates",
},
&cli.StringFlag{
Name: "namespace",
Aliases: []string{"ns", "N"},
Usage: "If non-empty, prepend this to the name of each mailbox, including INBOX",
},
}
app.Action = run
if err := app.Run(os.Args); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
func createMaildir(path, rootDir string) (maildir.Dir, error) {
dir := maildir.Dir(path)
if path != rootDir {
up := filepath.Dir(path)
rel, err := filepath.Rel(rootDir, up)
if err != nil {
return dir, err
}
if strings.Contains(rel, "../") {
return dir, fmt.Errorf("path %q is below root", path)
}
if _, err := os.Stat(up); err != nil {
if d, err := createMaildir(up, rootDir); err != nil {
return d, err
}
}
}
// FIXME rel checks, etc.
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
return dir, err
}
if err := dir.Init(); err != nil {
return dir, fmt.Errorf("could not init maikdir: %v", err)
}
if err := dir.Clean(); err != nil {
return dir, err
}
return dir, nil
}
func run(ctx *cli.Context) error {
if ctx.Args().Len() != 1 {
return errors.New("invalid arguments; try --help")
}
db, err := bbolt.Open(ctx.String("db"), 0600, nil)
if err != nil {
return err
}
defer db.Close()
tlsConfig := &tls.Config{InsecureSkipVerify: ctx.Bool("noverify")}
conns := ctx.Uint("connections")
batch := ctx.Uint("batch")
host := ctx.String("host")
username := ctx.String("username")
password := ctx.String("password")
rootDir := ctx.Args().First()
if os.Getenv("SINK_NO_WRITE") == "" {
if createMaildir(rootDir, rootDir); err != nil {
return err
}
}
c, err := client.DialTLS(host, tlsConfig)
if err != nil {
return fmt.Errorf("While dialing imap server: %v", err)
}
fmt.Println("IMAP connected")
if err := c.Login(username, password); err != nil {
return fmt.Errorf("Failed to login: %v", err)
}
defer c.Logout()
fmt.Println("Logged in")
mailboxes := make(chan *imap.MailboxInfo)
done := make(chan error, 1)
go func() { done <- c.List(ctx.String("namespace"), "*", mailboxes) }()
mbs := []*imap.MailboxInfo{}
for mb := range mailboxes {
mbs = append(mbs, mb)
}
if err := <-done; err != nil {
return err
}
mailinfo := make(chan mailbox, conns)
listdone := make(chan error, 1)
messages := make(chan message, conns*batch)
allDone := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(int(conns))
go func() {
for _, mb := range mbs {
fmt.Println("Spooling", mb.Name)
selected := true
for _, attr := range mb.Attributes {
if attr == imap.NoSelectAttr {
selected = false
break
}
}
if selected {
stat, err := c.Select(mb.Name, true)
if err != nil {
listdone <- fmt.Errorf("Selecting mailbox %q: %v", mb.Name, err)
return
}
if stat.Messages > 0 {
uids, err := c.UidSearch(imap.NewSearchCriteria())
if err != nil {
listdone <- err
return
}
tx, err := db.Begin(true)
if err != nil {
listdone <- err
return
}
bucket, err := tx.CreateBucketIfNotExists([]byte(mb.Name))
if err != nil {
listdone <- err
return
}
filtered := []uint32{}
for _, uid := range uids {
byt := make([]byte, 8)
binary.BigEndian.PutUint64(byt, uint64(uid))
if value := bucket.Get(byt); value == nil {
filtered = append(filtered, uid)
}
}
if err := tx.Commit(); err != nil {
listdone <- err
return
}
for i := uint(0); i < uint(len(filtered)); i += batch {
b := i + batch
if b >= uint(len(filtered)) {
b = uint(len(filtered)) - 1
}
mailinfo <- mailbox{mailbox: mb, uids: filtered[i:b]}
}
}
}
}
close(mailinfo)
wg.Wait()
close(allDone)
}()
for i := uint(0); i < conns; i++ {
go func(wg *sync.WaitGroup) {
defer wg.Done()
c, err := client.DialTLS(ctx.String("host"), tlsConfig)
if err != nil {
listdone <- fmt.Errorf("While dialing imap server: %v", err)
return
}
if err := c.Login(ctx.String("username"), ctx.String("password")); err != nil {
listdone <- fmt.Errorf("Failed to login: %v", err)
return
}
defer c.Logout()
fmt.Println("Subconnection login")
for m := range mailinfo {
name := m.mailbox.Name
fmt.Printf("%p: selecting %q\n", c, name)
if _, err := c.Select(name, true); err != nil {
listdone <- err
return
}
for offset := uint(0); offset < uint(len(m.uids)); offset += batch {
set := &imap.SeqSet{}
for i := offset; i < offset+batch; i++ {
if uint(len(m.uids)) <= i {
break
}
set.AddNum(m.uids[i])
}
msgs := make(chan *imap.Message, batch)
done := make(chan error, 1)
section := imap.BodySectionName{Peek: true}
go func() {
done <- c.UidFetch(set, []imap.FetchItem{imap.FetchEnvelope, section.FetchItem()}, msgs)
}()
if err := <-done; err != nil {
listdone <- fmt.Errorf("%q: UidSearch response: %v: %v", name, err, set)
return
}
for msg := range msgs {
dir, err := createMaildir(path.Join(rootDir, m.mailbox.Name), rootDir)
if err != nil {
listdone <- err
}
messageKey := ""
if os.Getenv("SINK_NO_WRITE") == "" {
r := msg.GetBody(&section)
key, w, err := dir.Create(nil)
if err != nil {
listdone <- fmt.Errorf("%q: error creating entry in maildir: %v", name, err)
return
}
messageKey = key
if _, err := io.Copy(w, r); err != nil {
listdone <- fmt.Errorf("%q: failure writing mail entry: %v", name, err)
return
}
w.Close()
}
messages <- message{
mailbox: m.mailbox,
message: msg,
messageKey: messageKey,
}
}
}
}
}(wg)
}
count := 0
for {
select {
case err := <-listdone:
return err
case <-allDone:
return nil
case m := <-messages:
err := db.Update(func(tx *bbolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(m.mailbox.Name))
if err != nil {
return err
}
byt := make([]byte, 8)
binary.BigEndian.PutUint64(byt, uint64(m.message.Uid))
return bucket.Put(byt, []byte(m.messageKey))
})
if err != nil {
return err
}
from := "No From Address"
if len(m.message.Envelope.From) > 0 {
from = m.message.Envelope.From[0].Address()
}
fmt.Printf("Message #%d: %s %s\n", count, from, m.message.Envelope.Subject)
count++
}
}
}