forked from hswaw/hscloud
Serge Bazanski
ba28a04c65
We want to access the clientset (or at least check the fact that we're in a cluster) outside of the Mirko object lifecycle. In reality, this should _probably_ be moved outside of the Mirko library and get a better API than this (ie. one that returns complete information about the state of being in production/dev/...). Change-Id: I86444477e0306a39a1611207855127a7b963603e
316 lines
7.6 KiB
Go
316 lines
7.6 KiB
Go
package mirko
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"net/http/pprof"
|
|
"os"
|
|
"os/signal"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/golang/glog"
|
|
"golang.org/x/net/trace"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/reflection"
|
|
"k8s.io/client-go/kubernetes"
|
|
|
|
"code.hackerspace.pl/hscloud/go/pki"
|
|
"code.hackerspace.pl/hscloud/go/statusz"
|
|
)
|
|
|
|
// Command-line flags shared by every service built on this package. They are
// registered with the default flag set in init().
var (
	// flagListenAddress is the TCP address the gRPC listener binds to.
	flagListenAddress string

	// flagDebugAddress is the TCP address the HTTP debug/status listener
	// binds to.
	flagDebugAddress string

	// flagDebugAllowAll, when set, makes authRequest admit every client
	// instead of only loopback ones.
	flagDebugAllowAll bool
)
|
|
|
|
func init() {
|
|
flag.StringVar(&flagListenAddress, "listen_address", "127.0.0.1:4200", "gRPC listen address")
|
|
flag.StringVar(&flagDebugAddress, "debug_address", "127.0.0.1:4201", "HTTP debug/status listen address")
|
|
flag.BoolVar(&flagDebugAllowAll, "debug_allow_all", false, "HTTP debug/status available to everyone")
|
|
flag.Set("logtostderr", "true")
|
|
}
|
|
|
|
type Mirko struct {
|
|
grpcListen net.Listener
|
|
grpcServer *grpc.Server
|
|
httpListen net.Listener
|
|
httpServer *http.Server
|
|
httpMux *http.ServeMux
|
|
|
|
kubernetesCS *kubernetes.Clientset
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
func New() *Mirko {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
return &Mirko{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
}
|
|
|
|
func authRequest(req *http.Request) (any, sensitive bool) {
|
|
host, _, err := net.SplitHostPort(req.RemoteAddr)
|
|
if err != nil {
|
|
host = req.RemoteAddr
|
|
}
|
|
|
|
if flagDebugAllowAll {
|
|
return true, true
|
|
}
|
|
|
|
switch host {
|
|
case "localhost", "127.0.0.1", "::1":
|
|
return true, true
|
|
default:
|
|
return false, false
|
|
}
|
|
}
|
|
|
|
func (m *Mirko) Listen() error {
|
|
grpc.EnableTracing = true
|
|
trace.AuthRequest = authRequest
|
|
|
|
grpcLis, err := net.Listen("tcp", flagListenAddress)
|
|
if err != nil {
|
|
return fmt.Errorf("net.Listen: %v", err)
|
|
}
|
|
m.grpcListen = grpcLis
|
|
m.grpcServer = grpc.NewServer(pki.WithServerHSPKI()...)
|
|
reflection.Register(m.grpcServer)
|
|
|
|
httpLis, err := net.Listen("tcp", flagDebugAddress)
|
|
if err != nil {
|
|
return fmt.Errorf("net.Listen: %v", err)
|
|
}
|
|
|
|
m.httpMux = http.NewServeMux()
|
|
// Canonical URLs
|
|
m.httpMux.HandleFunc("/debug/status", func(w http.ResponseWriter, r *http.Request) {
|
|
any, _ := authRequest(r)
|
|
if !any {
|
|
http.Error(w, "not allowed", http.StatusUnauthorized)
|
|
return
|
|
}
|
|
statusz.StatusHandler(w, r)
|
|
})
|
|
m.httpMux.HandleFunc("/debug/requests", trace.Traces)
|
|
|
|
// pprof endpoints
|
|
m.httpMux.HandleFunc("/debug/pprof/", pprof.Index)
|
|
m.httpMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
|
|
m.httpMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
|
|
m.httpMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
|
m.httpMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
|
|
|
|
// -z legacy URLs
|
|
m.httpMux.HandleFunc("/statusz", func(w http.ResponseWriter, r *http.Request) {
|
|
http.Redirect(w, r, "/debug/status", http.StatusSeeOther)
|
|
})
|
|
m.httpMux.HandleFunc("/rpcz", func(w http.ResponseWriter, r *http.Request) {
|
|
http.Redirect(w, r, "/debug/requests", http.StatusSeeOther)
|
|
})
|
|
m.httpMux.HandleFunc("/requestz", func(w http.ResponseWriter, r *http.Request) {
|
|
http.Redirect(w, r, "/debug/requests", http.StatusSeeOther)
|
|
})
|
|
m.httpMux.HandleFunc("/profilez", func(w http.ResponseWriter, r *http.Request) {
|
|
http.Redirect(w, r, "/debug/pprof", http.StatusSeeOther)
|
|
})
|
|
|
|
// root redirect
|
|
m.httpMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
|
http.Redirect(w, r, "/debug/status", http.StatusSeeOther)
|
|
})
|
|
|
|
m.kubernetesCS = KubernetesClient()
|
|
|
|
debugParts := strings.Split(flagDebugAddress, ":")
|
|
debugPort := debugParts[len(debugParts)-1]
|
|
statusz.PublicAddress = fmt.Sprintf("http://%s:%s/", m.Address().String(), debugPort)
|
|
|
|
m.httpListen = httpLis
|
|
m.httpServer = &http.Server{
|
|
Addr: flagDebugAddress,
|
|
Handler: m.httpMux,
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Trace logs debug information to either a context trace (if present)
|
|
// or stderr (if not)
|
|
func Trace(ctx context.Context, f string, args ...interface{}) {
|
|
tr, ok := trace.FromContext(ctx)
|
|
if !ok {
|
|
fmtd := fmt.Sprintf(f, args...)
|
|
glog.Warningf("No trace in %v: %s", ctx, fmtd)
|
|
return
|
|
}
|
|
tr.LazyPrintf(f, args...)
|
|
}
|
|
|
|
// GRPC returns the microservice's grpc.Server object
|
|
func (m *Mirko) GRPC() *grpc.Server {
|
|
if m.grpcServer == nil {
|
|
panic("GRPC() called before Listen()")
|
|
}
|
|
return m.grpcServer
|
|
}
|
|
|
|
// HTTPMux returns the microservice's debug HTTP mux
|
|
func (m *Mirko) HTTPMux() *http.ServeMux {
|
|
if m.httpMux == nil {
|
|
panic("HTTPMux() called before Listen()")
|
|
}
|
|
return m.httpMux
|
|
}
|
|
|
|
// Context returns a background microservice context that will be canceled
|
|
// when the service is shut down
|
|
func (m *Mirko) Context() context.Context {
|
|
return m.ctx
|
|
}
|
|
|
|
// Done() returns a channel that will emit a value when the service is
|
|
// shut down. This should be used in the main() function instead of a select{}
|
|
// call, to allow the background context to be canceled fully.
|
|
func (m *Mirko) Done() <-chan struct{} {
|
|
return m.Context().Done()
|
|
}
|
|
|
|
// Serve starts serving HTTP and gRPC requests
|
|
func (m *Mirko) Serve() error {
|
|
errs := make(chan error, 1)
|
|
go func() {
|
|
if err := m.grpcServer.Serve(m.grpcListen); err != nil {
|
|
errs <- err
|
|
}
|
|
}()
|
|
go func() {
|
|
if err := m.httpServer.Serve(m.httpListen); err != nil {
|
|
errs <- err
|
|
}
|
|
}()
|
|
|
|
signalCh := make(chan os.Signal, 1)
|
|
signal.Notify(signalCh, os.Interrupt)
|
|
go func() {
|
|
select {
|
|
case <-signalCh:
|
|
m.cancel()
|
|
}
|
|
}()
|
|
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
select {
|
|
case <-ticker.C:
|
|
glog.Infof("gRPC listening on %s", flagListenAddress)
|
|
glog.Infof("HTTP listening on %s", flagDebugAddress)
|
|
return nil
|
|
case err := <-errs:
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Address returns a linkable address where this service is running, sans port.
|
|
// If running within kubernetes, this will return the pod IP.
|
|
// Otherwise, this will guess the main, 'external' IP address of the machine it's running on.
|
|
// On failures, returns loopback address.
|
|
func (m *Mirko) Address() net.IP {
|
|
// If we're not running in Kubernetes and binding to 127.0.0.1, return loopback.
|
|
if m.kubernetesCS == nil && strings.HasPrefix(flagListenAddress, "127.0.0.1:") {
|
|
return net.ParseIP("127.0.0.1")
|
|
}
|
|
|
|
ifaces, err := net.Interfaces()
|
|
if err != nil {
|
|
glog.Errorf("net.Interface(): %v", err)
|
|
return net.ParseIP("127.0.0.1")
|
|
}
|
|
|
|
addrmap := make(map[string]net.IP)
|
|
|
|
for _, iface := range ifaces {
|
|
addrs, err := iface.Addrs()
|
|
if err != nil {
|
|
glog.Errorf("iface(%q).Addrs(): %v", iface.Name, err)
|
|
continue
|
|
}
|
|
|
|
for _, addr := range addrs {
|
|
var ip net.IP
|
|
switch v := addr.(type) {
|
|
case *net.IPNet:
|
|
ip = v.IP
|
|
case *net.IPAddr:
|
|
ip = v.IP
|
|
default:
|
|
continue
|
|
}
|
|
|
|
if strings.HasPrefix(ip.String(), "fe80:") {
|
|
continue
|
|
}
|
|
addrmap[iface.Name] = ip
|
|
}
|
|
}
|
|
|
|
if m.kubernetesCS != nil {
|
|
addr, ok := addrmap["eth0"]
|
|
if !ok {
|
|
glog.Errorf("Running on Kubernetes but no eth0! Available interfaces: %v", addrmap)
|
|
return net.ParseIP("127.0.0.1")
|
|
}
|
|
|
|
return addr
|
|
}
|
|
|
|
if len(addrmap) == 0 {
|
|
glog.Errorf("No interfaces found!")
|
|
return net.ParseIP("127.0.0.1")
|
|
}
|
|
|
|
// Heuristics ahoy!
|
|
prioritized := []*ifaceWithPriority{}
|
|
for iface, addr := range addrmap {
|
|
prio := &ifaceWithPriority{
|
|
iface: iface,
|
|
addr: addr,
|
|
}
|
|
switch {
|
|
case strings.HasPrefix(iface, "lo"):
|
|
prio.priority = -10
|
|
case strings.HasPrefix(iface, "tap"):
|
|
prio.priority = -5
|
|
case strings.HasPrefix(iface, "tun"):
|
|
prio.priority = -5
|
|
case strings.HasPrefix(iface, "veth"):
|
|
prio.priority = 5
|
|
case strings.HasPrefix(iface, "wl"):
|
|
prio.priority = 5
|
|
case strings.HasPrefix(iface, "enp"):
|
|
prio.priority = 10
|
|
case strings.HasPrefix(iface, "eth"):
|
|
prio.priority = 10
|
|
}
|
|
|
|
prioritized = append(prioritized, prio)
|
|
}
|
|
|
|
sort.Slice(prioritized, func(i, j int) bool { return prioritized[i].priority > prioritized[j].priority })
|
|
return prioritized[0].addr
|
|
}
|
|
|
|
// ifaceWithPriority pairs a network interface's address with the heuristic
// score that Address() uses to rank candidate interfaces.
type ifaceWithPriority struct {
	// iface is the interface name.
	iface string
	// addr is the address selected for that interface.
	addr net.IP
	// priority is the heuristic score; higher values are preferred.
	priority int
}
|