diff --git a/mirko/README b/mirko/README new file mode 100644 index 0000000..d2c369f --- /dev/null +++ b/mirko/README @@ -0,0 +1,56 @@ +Mirko, the HSWAW microservice helper library +============================================ + +Wanna write a Go microservice for HSWAW? Can't be arsed to copy paste code? This is the library for you! + +Usage (dev) +----------- + + package main + + import ( + "code.hackerspace.pl/q3k/mirko" + ) + + func main() { + m := mirko.New() + + // setup/checks before TCP ports are opened... + // ... + + if err := m.Listen(); err != nil { + glog.Exitf("Listen(): %v", err) + } + + // register your gRPC and http handlers... + // (relfection and basic debug http is automatically registered) + // pb.RegisterFooServer(m.GRPC(), s) + // m.HTTPMux().HandleFunc("/debug/foo", fooHandler) + + if err := m.Serve(); err != nil { + glog.Exitf("Serve(): %v", err) + } + + // start any other background processing... + // (you can use m.Context() to get a context that will get + // canceled when the service is about to shut down) + + <-m.Done() + } + +Usage (running) +--------------- + +The following flags are automatically registered: + + - `-listen_address` (default: `127.0.0.1:4200`): where to listen for gRPC requests + - `-debug_address` (default: `127.0.0.1:4201`): where to listen for debug HTTP requests + - `-debug_allow_all` (default: false): whether to allow all IP address (vs. localhost) to connect to debug endpoint + +Since this library also includes [hspki](https://code.hackerspace.pl/q3k/hspki), you also get all the typical `-hspki_{...}` flags included. + +The following debug HTTP handlers are installed: + + - `/debug/status`: show the [statusz](https://github.com/q3k/statusz) page + - `/debug/requests`: show the [net/trace](https://godoc.org/golang.org/x/net/trace) page (including gRPC traces) + diff --git a/mirko/mirko.go b/mirko/mirko.go new file mode 100644 index 0000000..6ce91fa --- /dev/null +++ b/mirko/mirko.go @@ -0,0 +1,206 @@ +package mirko + +import ( + "context" + "flag" + "fmt" + "net" + "net/http" + "os" + "os/signal" + "time" + + "code.hackerspace.pl/q3k/hspki" + "github.com/golang/glog" + "github.com/q3k/statusz" + "golang.org/x/net/trace" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +var ( + flagListenAddress string + flagDebugAddress string + flagDebugAllowAll bool +) + +func init() { + flag.StringVar(&flagListenAddress, "listen_address", "127.0.0.1:4200", "gRPC listen address") + flag.StringVar(&flagDebugAddress, "debug_address", "127.0.0.1:4201", "HTTP debug/status listen address") + flag.BoolVar(&flagDebugAllowAll, "debug_allow_all", false, "HTTP debug/status available to everyone") + flag.Set("logtostderr", "true") +} + +type Mirko struct { + grpcListen net.Listener + grpcServer *grpc.Server + httpListen net.Listener + httpServer *http.Server + httpMux *http.ServeMux + + ctx context.Context + cancel context.CancelFunc + waiters []chan bool +} + +func New() *Mirko { + ctx, cancel := context.WithCancel(context.Background()) + return &Mirko{ + ctx: ctx, + cancel: cancel, + waiters: []chan bool{}, + } +} + +func authRequest(req *http.Request) (any, sensitive bool) { + host, _, err := net.SplitHostPort(req.RemoteAddr) + if err != nil { + host = req.RemoteAddr + } + + if flagDebugAllowAll { + return true, true + } + + switch host { + case "localhost", "127.0.0.1", "::1": + return true, true + default: + return false, false + } +} + +func (m *Mirko) Listen() error { + grpc.EnableTracing = true + trace.AuthRequest = authRequest + + grpcLis, err := net.Listen("tcp", flagListenAddress) + if err != nil { + return fmt.Errorf("net.Listen: %v", err) + } + m.grpcListen = grpcLis + m.grpcServer = grpc.NewServer(hspki.WithServerHSPKI()...) + reflection.Register(m.grpcServer) + + httpLis, err := net.Listen("tcp", flagDebugAddress) + if err != nil { + return fmt.Errorf("net.Listen: %v", err) + } + + m.httpMux = http.NewServeMux() + // Canonical URLs + m.httpMux.HandleFunc("/debug/status", func(w http.ResponseWriter, r *http.Request) { + any, _ := authRequest(r) + if !any { + http.Error(w, "not allowed", http.StatusUnauthorized) + return + } + statusz.StatusHandler(w, r) + }) + m.httpMux.HandleFunc("/debug/requests", trace.Traces) + + // -z legacy URLs + m.httpMux.HandleFunc("/statusz", func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, "/debug/status", http.StatusSeeOther) + }) + m.httpMux.HandleFunc("/rpcz", func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, "/debug/requests", http.StatusSeeOther) + }) + m.httpMux.HandleFunc("/requestz", func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, "/debug/requests", http.StatusSeeOther) + }) + + // root redirect + m.httpMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, "/debug/status", http.StatusSeeOther) + }) + + m.httpListen = httpLis + m.httpServer = &http.Server{ + Addr: flagDebugAddress, + Handler: m.httpMux, + } + + return nil +} + +// Trace logs debug information to either a context trace (if present) +// or stderr (if not) +func Trace(ctx context.Context, f string, args ...interface{}) { + tr, ok := trace.FromContext(ctx) + if !ok { + fmtd := fmt.Sprintf(f, args...) + glog.Warningf("No trace in %v: %s", ctx, fmtd) + return + } + tr.LazyPrintf(f, args...) +} + +// GRPC returns the microservice's grpc.Server object +func (m *Mirko) GRPC() *grpc.Server { + if m.grpcServer == nil { + panic("GRPC() called before Listen()") + } + return m.grpcServer +} + +// HTTPMux returns the microservice's debug HTTP mux +func (m *Mirko) HTTPMux() *http.ServeMux { + if m.httpMux == nil { + panic("HTTPMux() called before Listen()") + } + return m.httpMux +} + +// Context returns a background microservice context that will be canceled +// when the service is shut down +func (m *Mirko) Context() context.Context { + return m.ctx +} + +// Done() returns a channel that will emit a value when the service is +// shut down. This should be used in the main() function instead of a select{} +// call, to allow the background context to be canceled fully. +func (m *Mirko) Done() chan bool { + c := make(chan bool, 1) + m.waiters = append(m.waiters, c) + return c +} + +// Serve starts serving HTTP and gRPC requests +func (m *Mirko) Serve() error { + errs := make(chan error, 1) + go func() { + if err := m.grpcServer.Serve(m.grpcListen); err != nil { + errs <- err + } + }() + go func() { + if err := m.httpServer.Serve(m.httpListen); err != nil { + errs <- err + } + }() + + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, os.Interrupt) + go func() { + select { + case <-signalCh: + m.cancel() + time.Sleep(time.Second) + for _, w := range m.waiters { + w <- true + } + } + }() + + ticker := time.NewTicker(1 * time.Second) + select { + case <-ticker.C: + glog.Infof("gRPC listening on %s", flagListenAddress) + glog.Infof("HTTP listening on %s", flagDebugAddress) + return nil + case err := <-errs: + return err + } +}