forked from hswaw/hscloud
188 lines
4.2 KiB
Go
188 lines
4.2 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
pb "code.hackerspace.pl/hscloud/bgpwtf/cccampix/proto"
|
|
"code.hackerspace.pl/hscloud/bgpwtf/cccampix/verifier/model"
|
|
"code.hackerspace.pl/hscloud/go/mirko"
|
|
"code.hackerspace.pl/hscloud/go/pki"
|
|
"code.hackerspace.pl/hscloud/go/statusz"
|
|
"github.com/golang/glog"
|
|
"github.com/lib/pq"
|
|
"golang.org/x/net/trace"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
type processorState struct {
|
|
name string
|
|
p processor
|
|
lastRun *time.Time
|
|
lastErr error
|
|
}
|
|
|
|
type routerState struct {
|
|
name string
|
|
last *time.Time
|
|
version string
|
|
}
|
|
|
|
func (p *processorState) nextRun() *time.Time {
|
|
if p.lastRun == nil {
|
|
return nil
|
|
}
|
|
nr := p.p.NextRun(*p.lastRun, p.lastErr != nil)
|
|
return &nr
|
|
}
|
|
|
|
type service struct {
|
|
model model.Model
|
|
|
|
processors map[string]*processorState
|
|
processorsMu sync.RWMutex
|
|
|
|
routers map[string]*routerState
|
|
routersLastVersion string
|
|
routersMu sync.RWMutex
|
|
|
|
requiredChecks []string
|
|
|
|
pgp pb.PGPEncryptorClient
|
|
}
|
|
|
|
func (s *service) run(ctx context.Context) {
|
|
t := time.NewTicker(time.Second)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-t.C:
|
|
break
|
|
}
|
|
|
|
s.runProcessors(ctx)
|
|
}
|
|
}
|
|
|
|
func (s *service) runProcessors(ctx context.Context) {
|
|
s.processorsMu.RLock()
|
|
defer s.processorsMu.RUnlock()
|
|
|
|
now := time.Now()
|
|
|
|
for _, p := range s.processors {
|
|
nr := p.nextRun()
|
|
if nr == nil || nr.Before(now) {
|
|
glog.Infof("Running processor %q...", p.name)
|
|
start := time.Now()
|
|
|
|
tr := trace.New(fmt.Sprintf("processor.%s", p.name), "Run")
|
|
pctx := trace.NewContext(ctx, tr)
|
|
err := p.p.RunAll(pctx, s.model)
|
|
tr.LazyPrintf("Processor done: %v", err)
|
|
tr.Finish()
|
|
|
|
if err != nil {
|
|
glog.Errorf("Running processor %q failed: %v", p.name, err)
|
|
} else {
|
|
diff := time.Since(start)
|
|
tr.LazyPrintf("Took %s", diff.String())
|
|
glog.Infof("Processor %q took %s", p.name, diff.String())
|
|
}
|
|
p.lastErr = err
|
|
p.lastRun = &now
|
|
}
|
|
}
|
|
}
|
|
|
|
var (
|
|
flagDSN string
|
|
flagIRR string
|
|
flagOctoRPKI string
|
|
flagPGPEncryptor string
|
|
flagPeeringDB string
|
|
)
|
|
|
|
func main() {
|
|
flag.StringVar(&flagDSN, "dsn", "", "PostrgreSQL connection string")
|
|
flag.StringVar(&flagIRR, "irr", "", "Address of irr service")
|
|
flag.StringVar(&flagOctoRPKI, "octorpki", "", "Address of octorpki service")
|
|
flag.StringVar(&flagPGPEncryptor, "pgpencryptor", "", "Address of pgpencryptor service")
|
|
flag.StringVar(&flagPeeringDB, "peeringdb", "", "Address of peeringdb service")
|
|
flag.Parse()
|
|
|
|
// Picking an existing postgres-like driver for sqlx.BindType to work
|
|
// See: https://github.com/jmoiron/sqlx/blob/ed7c52c43ee1e12a35efbcfea8dbae2d62a90370/bind.go#L24
|
|
mirko.TraceSQL(&pq.Driver{}, "pgx")
|
|
mi := mirko.New()
|
|
|
|
m, err := model.Connect(mi.Context(), "pgx", flagDSN)
|
|
if err != nil {
|
|
glog.Exitf("Failed to create model: %v", err)
|
|
}
|
|
|
|
err = m.MigrateUp()
|
|
if err != nil {
|
|
glog.Exitf("Failed to migrate up: %v", err)
|
|
}
|
|
|
|
if err := mi.Listen(); err != nil {
|
|
glog.Exitf("Listen failed: %v", err)
|
|
}
|
|
|
|
conn, err := grpc.Dial(flagPGPEncryptor, pki.WithClientHSPKI())
|
|
if err != nil {
|
|
glog.Exitf("could not connect to pgpencryptor service: %v", err)
|
|
}
|
|
|
|
s := &service{
|
|
model: m,
|
|
processors: make(map[string]*processorState),
|
|
requiredChecks: []string{"irr", "pgp"},
|
|
routers: make(map[string]*routerState),
|
|
pgp: pb.NewPGPEncryptorClient(conn),
|
|
}
|
|
|
|
must := func(p processor, err error) processor {
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return p
|
|
}
|
|
s.addProcessor(must(newPeeringDB(flagPeeringDB)))
|
|
s.addProcessor(must(newIRR(flagIRR)))
|
|
s.addProcessor(must(newSecretGen()))
|
|
s.addProcessor(must(newRPKI(flagOctoRPKI)))
|
|
s.addProcessor(must(newPGP(s.pgp)))
|
|
statusz.AddStatusPart("Processors", processorsFragment, s.statuszProcessors)
|
|
|
|
go s.run(mi.Context())
|
|
|
|
pb.RegisterVerifierServer(mi.GRPC(), s)
|
|
|
|
if err := mi.Serve(); err != nil {
|
|
glog.Exitf("Serve failed: %v", err)
|
|
}
|
|
|
|
<-mi.Done()
|
|
}
|
|
|
|
func (s *service) addProcessor(p processor) {
|
|
s.processorsMu.Lock()
|
|
defer s.processorsMu.Unlock()
|
|
|
|
name := p.Name()
|
|
if _, ok := s.processors[name]; ok {
|
|
panic(fmt.Sprintf("duplicated processor %q", name))
|
|
}
|
|
s.processors[name] = &processorState{
|
|
name: name,
|
|
p: p,
|
|
lastRun: nil,
|
|
}
|
|
}
|