forked from hswaw/hscloud
177 lines
4.4 KiB
Go
177 lines
4.4 KiB
Go
|
package kubenat
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"net"
|
||
|
|
||
|
"github.com/golang/glog"
|
||
|
"google.golang.org/grpc"
|
||
|
"google.golang.org/grpc/codes"
|
||
|
"google.golang.org/grpc/status"
|
||
|
|
||
|
"code.hackerspace.pl/hscloud/cluster/identd/cri"
|
||
|
)
|
||
|
|
||
|
// podInfoReq is a request passed to the podWorker.
|
||
|
type podInfoReq struct {
|
||
|
local net.IP
|
||
|
res chan *podInfoResp
|
||
|
}
|
||
|
|
||
|
// podInfoResp is the answer a podWorker sends back over the res channel of a
// podInfoReq.
type podInfoResp struct {
	// name and namespace identify the pod, as taken from the metadata of its
	// CRI PodSandboxStatus.
	name, namespace string
}
|
||
|
|
||
|
// reply sends a reply to the given podInfoReq based on a CRI PodSandboxStatus,
|
||
|
// sending nil if the status is nil.
|
||
|
func (r *podInfoReq) reply(s *cri.PodSandboxStatus) {
|
||
|
if s == nil {
|
||
|
r.res <- nil
|
||
|
return
|
||
|
}
|
||
|
r.res <- &podInfoResp{
|
||
|
name: s.Metadata.Name,
|
||
|
namespace: s.Metadata.Namespace,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// getPodInfo performs a podInfoReq/podInfoResp exchange under a context that
|
||
|
// can be used to time out the query.
|
||
|
func (r *Resolver) getPodInfo(ctx context.Context, local net.IP) (*podInfoResp, error) {
|
||
|
resC := make(chan *podInfoResp, 1)
|
||
|
r.podInfoC <- &podInfoReq{
|
||
|
local: local,
|
||
|
res: resC,
|
||
|
}
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return nil, ctx.Err()
|
||
|
case res := <-resC:
|
||
|
return res, nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// podStatus is a cache of data retrieved from CRI.
|
||
|
type podStatus struct {
|
||
|
// info is a map from pod sandbox ID to PodSandboxStatus as retrieved from
|
||
|
// CRI.
|
||
|
info map[string]*cri.PodSandboxStatus
|
||
|
// byIP is a map from pod IP (as string) to pod sandbox ID.
|
||
|
byIP map[string]string
|
||
|
}
|
||
|
|
||
|
// update performs an update of the podStatus from CRI. It only retrieves
|
||
|
// information about pods that it doesn't yet have, and ensures that pods which
|
||
|
// do not exist in CRI are also removed from podStatus.
|
||
|
// TODO(q3k): make sure we don't cache PodSandboxStatus too early, eg. when
|
||
|
// it's not yet fully running?
|
||
|
func (p *podStatus) update(ctx context.Context, client cri.RuntimeServiceClient) error {
|
||
|
res, err := client.ListPodSandbox(ctx, &cri.ListPodSandboxRequest{})
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("ListPodSandbox: %w", err)
|
||
|
}
|
||
|
|
||
|
// set of all pod sandbox IDs in CRI.
|
||
|
want := make(map[string]bool)
|
||
|
// set of pod sandbox IDs in CRI that are not in podStatus.
|
||
|
missing := make(map[string]bool)
|
||
|
for _, item := range res.Items {
|
||
|
want[item.Id] = true
|
||
|
if _, ok := p.info[item.Id]; ok {
|
||
|
continue
|
||
|
}
|
||
|
missing[item.Id] = true
|
||
|
}
|
||
|
|
||
|
// Get information about missing pod IDs into podStatus.
|
||
|
for id, _ := range missing {
|
||
|
res, err := client.PodSandboxStatus(ctx, &cri.PodSandboxStatusRequest{
|
||
|
PodSandboxId: id,
|
||
|
})
|
||
|
if err != nil {
|
||
|
if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
|
||
|
continue
|
||
|
} else {
|
||
|
return fmt.Errorf("while getting sandbox %s: %v", id, err)
|
||
|
}
|
||
|
}
|
||
|
p.info[id] = res.Status
|
||
|
}
|
||
|
|
||
|
// byIP is fully repopulated on each update.
|
||
|
p.byIP = make(map[string]string)
|
||
|
|
||
|
// remove is the set of pods sandbox IDs that should be removed from podStatus.
|
||
|
remove := make(map[string]bool)
|
||
|
// Populate remove and p.byId in a single pass.
|
||
|
for id, info := range p.info {
|
||
|
if _, ok := want[id]; !ok {
|
||
|
remove[id] = true
|
||
|
continue
|
||
|
}
|
||
|
if info.Network == nil {
|
||
|
continue
|
||
|
}
|
||
|
if info.Network.Ip == "" {
|
||
|
continue
|
||
|
}
|
||
|
p.byIP[info.Network.Ip] = id
|
||
|
}
|
||
|
// Remove stale pod sandbox IDs from podStatus.
|
||
|
for id, _ := range remove {
|
||
|
delete(p.info, id)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// findByPodID returns a PodSandboxStatus for the pod running under a given pod
|
||
|
// IP address, or nil if not found.
|
||
|
func (p *podStatus) findByPodIP(ip net.IP) *cri.PodSandboxStatus {
|
||
|
id, ok := p.byIP[ip.String()]
|
||
|
if !ok {
|
||
|
return nil
|
||
|
}
|
||
|
return p.info[id]
|
||
|
}
|
||
|
|
||
|
// runPodWorker runs the CRI cache 'pod worker'. It responds to requests over
|
||
|
// podInfoC until ctx is canceled.
|
||
|
func (r *Resolver) runPodWorker(ctx context.Context) error {
|
||
|
conn, err := grpc.Dial(fmt.Sprintf("unix://%s", r.criPath), grpc.WithInsecure())
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("Dial: %w", err)
|
||
|
}
|
||
|
defer conn.Close()
|
||
|
client := cri.NewRuntimeServiceClient(conn)
|
||
|
|
||
|
ps := &podStatus{
|
||
|
info: make(map[string]*cri.PodSandboxStatus),
|
||
|
}
|
||
|
if err := ps.update(ctx, client); err != nil {
|
||
|
return fmt.Errorf("initial pod update: %w", err)
|
||
|
}
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case req := <-r.podInfoC:
|
||
|
info := ps.findByPodIP(req.local)
|
||
|
if info != nil {
|
||
|
req.reply(info)
|
||
|
continue
|
||
|
}
|
||
|
err := ps.update(ctx, client)
|
||
|
if err != nil {
|
||
|
glog.Errorf("Updating pods failed: %v", err)
|
||
|
continue
|
||
|
}
|
||
|
req.reply(ps.findByPodIP(req.local))
|
||
|
case <-ctx.Done():
|
||
|
return ctx.Err()
|
||
|
}
|
||
|
}
|
||
|
}
|