package mirko import ( "context" "flag" "fmt" "net" "net/http" "net/http/pprof" "os" "os/signal" "sort" "strings" "time" "github.com/golang/glog" "golang.org/x/net/trace" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "k8s.io/client-go/kubernetes" "code.hackerspace.pl/hscloud/go/pki" "code.hackerspace.pl/hscloud/go/statusz" ) var ( flagListenAddress string flagDebugAddress string flagDebugAllowAll bool ) func init() { flag.StringVar(&flagListenAddress, "listen_address", "127.0.0.1:4200", "gRPC listen address") flag.StringVar(&flagDebugAddress, "debug_address", "127.0.0.1:4201", "HTTP debug/status listen address") flag.BoolVar(&flagDebugAllowAll, "debug_allow_all", false, "HTTP debug/status available to everyone") flag.Set("logtostderr", "true") } type Mirko struct { grpcListen net.Listener grpcServer *grpc.Server httpListen net.Listener httpServer *http.Server httpMux *http.ServeMux kubernetesCS *kubernetes.Clientset ctx context.Context cancel context.CancelFunc } func New() *Mirko { ctx, cancel := context.WithCancel(context.Background()) return &Mirko{ ctx: ctx, cancel: cancel, } } func authRequest(req *http.Request) (any, sensitive bool) { host, _, err := net.SplitHostPort(req.RemoteAddr) if err != nil { host = req.RemoteAddr } if flagDebugAllowAll { return true, true } switch host { case "localhost", "127.0.0.1", "::1": return true, true default: return false, false } } func (m *Mirko) Listen() error { grpc.EnableTracing = true trace.AuthRequest = authRequest grpcLis, err := net.Listen("tcp", flagListenAddress) if err != nil { return fmt.Errorf("net.Listen: %v", err) } m.grpcListen = grpcLis m.grpcServer = grpc.NewServer(pki.WithServerHSPKI()...) reflection.Register(m.grpcServer) httpLis, err := net.Listen("tcp", flagDebugAddress) if err != nil { return fmt.Errorf("net.Listen: %v", err) } m.httpMux = http.NewServeMux() // Canonical URLs m.httpMux.HandleFunc("/debug/status", func(w http.ResponseWriter, r *http.Request) { any, _ := authRequest(r) if !any { http.Error(w, "not allowed", http.StatusUnauthorized) return } statusz.StatusHandler(w, r) }) m.httpMux.HandleFunc("/debug/requests", trace.Traces) // pprof endpoints m.httpMux.HandleFunc("/debug/pprof/", pprof.Index) m.httpMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) m.httpMux.HandleFunc("/debug/pprof/profile", pprof.Profile) m.httpMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) m.httpMux.HandleFunc("/debug/pprof/trace", pprof.Trace) // -z legacy URLs m.httpMux.HandleFunc("/statusz", func(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, "/debug/status", http.StatusSeeOther) }) m.httpMux.HandleFunc("/rpcz", func(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, "/debug/requests", http.StatusSeeOther) }) m.httpMux.HandleFunc("/requestz", func(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, "/debug/requests", http.StatusSeeOther) }) m.httpMux.HandleFunc("/profilez", func(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, "/debug/pprof", http.StatusSeeOther) }) // root redirect m.httpMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, "/debug/status", http.StatusSeeOther) }) m.kubernetesConnect() debugParts := strings.Split(flagDebugAddress, ":") debugPort := debugParts[len(debugParts)-1] statusz.PublicAddress = fmt.Sprintf("http://%s:%s/", m.Address().String(), debugPort) m.httpListen = httpLis m.httpServer = &http.Server{ Addr: flagDebugAddress, Handler: m.httpMux, } return nil } // Trace logs debug information to either a context trace (if present) // or stderr (if not) func Trace(ctx context.Context, f string, args ...interface{}) { tr, ok := trace.FromContext(ctx) if !ok { fmtd := fmt.Sprintf(f, args...) glog.Warningf("No trace in %v: %s", ctx, fmtd) return } tr.LazyPrintf(f, args...) } // GRPC returns the microservice's grpc.Server object func (m *Mirko) GRPC() *grpc.Server { if m.grpcServer == nil { panic("GRPC() called before Listen()") } return m.grpcServer } // HTTPMux returns the microservice's debug HTTP mux func (m *Mirko) HTTPMux() *http.ServeMux { if m.httpMux == nil { panic("HTTPMux() called before Listen()") } return m.httpMux } // Context returns a background microservice context that will be canceled // when the service is shut down func (m *Mirko) Context() context.Context { return m.ctx } // Done() returns a channel that will emit a value when the service is // shut down. This should be used in the main() function instead of a select{} // call, to allow the background context to be canceled fully. func (m *Mirko) Done() <-chan struct{} { return m.Context().Done() } // Serve starts serving HTTP and gRPC requests func (m *Mirko) Serve() error { errs := make(chan error, 1) go func() { if err := m.grpcServer.Serve(m.grpcListen); err != nil { errs <- err } }() go func() { if err := m.httpServer.Serve(m.httpListen); err != nil { errs <- err } }() signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt) go func() { select { case <-signalCh: m.cancel() } }() ticker := time.NewTicker(1 * time.Second) select { case <-ticker.C: glog.Infof("gRPC listening on %s", flagListenAddress) glog.Infof("HTTP listening on %s", flagDebugAddress) return nil case err := <-errs: return err } } // Address returns a linkable address where this service is running, sans port. // If running within kubernetes, this will return the pod IP. // Otherwise, this will guess the main, 'external' IP address of the machine it's running on. // On failures, returns loopback address. func (m *Mirko) Address() net.IP { // If we're not running in Kubernetes and binding to 127.0.0.1, return loopback. if m.kubernetesCS == nil && strings.HasPrefix(flagListenAddress, "127.0.0.1:") { return net.ParseIP("127.0.0.1") } ifaces, err := net.Interfaces() if err != nil { glog.Errorf("net.Interface(): %v", err) return net.ParseIP("127.0.0.1") } addrmap := make(map[string]net.IP) for _, iface := range ifaces { addrs, err := iface.Addrs() if err != nil { glog.Errorf("iface(%q).Addrs(): %v", iface.Name, err) continue } for _, addr := range addrs { var ip net.IP switch v := addr.(type) { case *net.IPNet: ip = v.IP case *net.IPAddr: ip = v.IP default: continue } if strings.HasPrefix(ip.String(), "fe80:") { continue } addrmap[iface.Name] = ip } } if m.kubernetesCS != nil { addr, ok := addrmap["eth0"] if !ok { glog.Errorf("Running on Kubernetes but no eth0! Available interfaces: %v", addrmap) return net.ParseIP("127.0.0.1") } return addr } if len(addrmap) == 0 { glog.Errorf("No interfaces found!") return net.ParseIP("127.0.0.1") } // Heuristics ahoy! prioritized := []*ifaceWithPriority{} for iface, addr := range addrmap { prio := &ifaceWithPriority{ iface: iface, addr: addr, } switch { case strings.HasPrefix(iface, "lo"): prio.priority = -10 case strings.HasPrefix(iface, "tap"): prio.priority = -5 case strings.HasPrefix(iface, "tun"): prio.priority = -5 case strings.HasPrefix(iface, "veth"): prio.priority = 5 case strings.HasPrefix(iface, "wl"): prio.priority = 5 case strings.HasPrefix(iface, "enp"): prio.priority = 10 case strings.HasPrefix(iface, "eth"): prio.priority = 10 } prioritized = append(prioritized, prio) } sort.Slice(prioritized, func(i, j int) bool { return prioritized[i].priority > prioritized[j].priority }) return prioritized[0].addr } type ifaceWithPriority struct { iface string addr net.IP priority int }