forked from hswaw/hscloud
131 lines
4.0 KiB
Go
131 lines
4.0 KiB
Go
// kubenat implements a data source for undoing NAT on hosts running
// Kubernetes/containerd workloads.
//
// It parses the kernel conntrack NAT translation table to figure out the IP
// address of the pod that was making the connection.
//
// It then uses the containerd API to figure out what pod runs under what IP
// address.
//
// Both conntrack and containerd access are cached and only updated when
// needed. This means that as long as a TCP connection is open, identd will be
// able to respond with its information without having to perform any
// OS/containerd queries.
//
// Unfortunately, there is very little in terms of development/test harnesses
// for kubenat. You will have to have a locally running containerd, or do some
// mounts/forwards from a remote host.
package kubenat
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"net"
	"strconv"
	"time"

	"github.com/cenkalti/backoff"
	"github.com/golang/glog"
)
|
|
|
|
// Resolver is the main interface for kubenat. It runs background processing to
|
|
// update conntrack/containerd state, and resolves Tuple4s into PodInfo.
|
|
type Resolver struct {
|
|
conntrackPath string
|
|
criPath string
|
|
|
|
translationC chan *translationReq
|
|
podInfoC chan *podInfoReq
|
|
}
|
|
|
|
// Tuple4 is a 4-tuple of a TCP connection. Local describes the machine running
// this code, not the listen/connect 'ends' of TCP.
type Tuple4 struct {
	// RemoteIP and RemotePort identify the far end of the connection.
	RemoteIP   net.IP
	RemotePort uint16
	// LocalIP and LocalPort identify the end on the machine running this
	// code.
	LocalIP   net.IP
	LocalPort uint16
}

// String renders the tuple as "L: <local host:port> R: <remote host:port>".
// Host/port pairs are joined with net.JoinHostPort, so IPv6 addresses are
// wrapped in square brackets.
func (t *Tuple4) String() string {
	// strconv.Itoa is a plain integer conversion; there is no need to go
	// through fmt's formatting machinery just to print a port number.
	local := net.JoinHostPort(t.LocalIP.String(), strconv.Itoa(int(t.LocalPort)))
	remote := net.JoinHostPort(t.RemoteIP.String(), strconv.Itoa(int(t.RemotePort)))
	return "L: " + local + " R: " + remote
}
|
|
|
|
// PodInfo identifies the Kubernetes pod that terminates the connection
// described by a Tuple4. It is the result type of Resolver.ResolvePod.
type PodInfo struct {
	// PodIP is the pod's address inside the pod network.
	PodIP net.IP
	// PodTranslatedPort is the port on PodIP that corresponds to the Tuple4
	// this PodInfo was requested for.
	PodTranslatedPort uint16
	// KubernetesNamespace is the Kubernetes namespace the pod runs in.
	KubernetesNamespace string
	// Name is the pod's name, as seen by Kubernetes.
	Name string
}
|
|
|
|
// NewResolver startss a resolver with a given path to /paroc/net/nf_conntrack
|
|
// and a CRI gRPC domain socket.
|
|
func NewResolver(ctx context.Context, conntrackPath, criPath string) (*Resolver, error) {
|
|
r := Resolver{
|
|
conntrackPath: conntrackPath,
|
|
criPath: criPath,
|
|
|
|
translationC: make(chan *translationReq),
|
|
podInfoC: make(chan *podInfoReq),
|
|
}
|
|
// TODO(q3k): bubble up errors from the translation worker into here?
|
|
go r.runTranslationWorker(ctx)
|
|
// The pod worker might fail on CRI connectivity issues, so we attempt to
|
|
// restart it with a backoff if needed.
|
|
go func() {
|
|
bo := backoff.NewExponentialBackOff()
|
|
bo.MaxElapsedTime = 0
|
|
bo.Reset()
|
|
for {
|
|
err := r.runPodWorker(ctx)
|
|
if err == nil || errors.Is(err, ctx.Err()) {
|
|
glog.Infof("podWorker exiting")
|
|
return
|
|
}
|
|
glog.Errorf("podWorker failed: %v", err)
|
|
wait := bo.NextBackOff()
|
|
glog.Errorf("restarting podWorker in %v", wait)
|
|
time.Sleep(wait)
|
|
}
|
|
}()
|
|
|
|
return &r, nil
|
|
}
|
|
|
|
// ResolvePod returns information about a running pod for a given TCP 4-tuple.
|
|
// If the 4-tuple or pod cannot be resolved, an error will be returned.
|
|
func (r *Resolver) ResolvePod(ctx context.Context, t *Tuple4) (*PodInfo, error) {
|
|
// TODO(q3k): expose translation/pod not found errors as package-level
|
|
// vars, or use gRPC statuses?
|
|
podAddr, err := r.translate(ctx, t)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("translate: %w", err)
|
|
}
|
|
if podAddr == nil {
|
|
return nil, fmt.Errorf("translation not found")
|
|
}
|
|
podInfo, err := r.getPodInfo(ctx, podAddr.localIP)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getPodInfo: %w", err)
|
|
}
|
|
if podInfo == nil {
|
|
return nil, fmt.Errorf("pod not found")
|
|
}
|
|
|
|
return &PodInfo{
|
|
PodIP: podAddr.localIP,
|
|
PodTranslatedPort: podAddr.localPort,
|
|
KubernetesNamespace: podInfo.namespace,
|
|
Name: podInfo.name,
|
|
}, nil
|
|
}
|