1
0
Fork 0

Merge changes Ib068109f,I9a00487f,I1861fe7c,I254983e5,I3e2bedca, ...

* changes:
  cluster/identd/ident: update README
  cluster/kube: deploy identd
  cluster/identd: implement
  cluster/identd/kubenat: implement
  cluster/identd/cri: import
  cluster/identd/ident: add TestE2E
  cluster/identd/ident: add Query function
  cluster/identd/ident: add IdentError
  cluster/identd/ident: add basic ident protocol server
  cluster/identd/ident: add basic ident protocol client
master
q3k 2021-05-28 23:08:10 +00:00 committed by Gerrit Code Review
commit 7251f2720e
26 changed files with 3908 additions and 0 deletions

View File

@ -0,0 +1,48 @@
load("@io_bazel_rules_docker//container:container.bzl", "container_image", "container_layer", "container_push")
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
go_library(
name = "go_default_library",
srcs = ["main.go"],
importpath = "code.hackerspace.pl/hscloud/cluster/identd",
visibility = ["//visibility:private"],
deps = [
"//cluster/identd/ident:go_default_library",
"//cluster/identd/kubenat:go_default_library",
"//go/mirko:go_default_library",
"@com_github_golang_glog//:go_default_library",
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
],
)
go_binary(
name = "identd",
embed = [":go_default_library"],
visibility = ["//visibility:public"],
)
container_layer(
name = "layer_bin",
files = [
":identd",
],
directory = "/cluster/identd/",
)
container_image(
name = "runtime",
base = "@prodimage-bionic//image",
layers = [
":layer_bin",
],
entrypoint = "/cluster/identd/identd",
)
container_push(
name = "push",
image = ":runtime",
format = "Docker",
registry = "registry.k0.hswaw.net",
repository = "q3k/identd",
tag = "{BUILD_TIMESTAMP}-{STABLE_GIT_COMMIT}",
)

61
cluster/identd/README.md Normal file
View File

@ -0,0 +1,61 @@
hscloud ident server
===
This server implements the ident protocol, as defined by RFC1413, which is
mostly used by IRC servers to determine the 'identity' of an incoming IRC
connection.
This implementation is supposed to run on production hosts which run containerd
with containers whose outgoing connections NATed to the host's public IP
address.
It responds with information about the namespace of the pod that originated the
connection. If the namespace is a personal-$owner namespace, it responds with
the owner of that namespace. Otherwise, it responds with `kns-$namespace`.
In addition, it has hardcoded special behaviour for when the pod terminating
the connection is named `appservice-irc-*` and runs in the `matrix` namespace.
If so, it performs an ident request to that pod on port 1113. This effectively
integrates it with appservice-irc's integrated identd, and allows us to server
correct identities for IRC connections.
Example flow
---
.----------------------------------.
| k8s host |
|----------------------------------|
.-------------. | .-------------. |
| remote host | | | pod | |
|-------------| | ...... |-------------| |
| IRCd<:-:6697----:xxxx-:--< NAT <---:yyyy-:- IRC client | |
| identd-:--------. | '''''' | | |
'.............| | | ^ .-:->identd | |
| | | query | '-------------' |
| | | | |
'--:113-:-->identd- - - -' forward? |
| | |
| | query |
| v |
| ( containerd ) |
'----------------------------------'
In the above diagram, the remote hosts' identd client would query identd for
information about the TCP connection `xxxx,6697`, which identd would attempt to
resolve back into the pod by consulting the NAT table. After that, it can
either return the pod's namespace information to identd, or (if the pod is an
appservice-irc) forward the query to another identd running within the pod,
this time asking for `yyyy,6697`, and passing that reponse to the remote identd
client.
Libraries and building blocks
---
- [ident/](//cluster/identd/ident/) is a Go ident server/client library, reusable across projects.
- [kubenat/](//cluster/idented/kubenat/) is a Go library for figuring out which pod behind a NAT originated a given 4-tuple.
Deployment
---
See //cluster/kube/lib/identd.libsonnet .

View File

@ -0,0 +1,24 @@
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
proto_library(
name = "cri_proto",
srcs = ["api.proto"],
visibility = ["//visibility:public"],
)
go_proto_library(
name = "cri_go_proto",
compilers = ["@io_bazel_rules_go//proto:go_grpc"],
importpath = "code.hackerspace.pl/hscloud/cluster/identd/cri",
proto = ":cri_proto",
visibility = ["//visibility:public"],
)
go_library(
name = "go_default_library",
embed = [":cri_go_proto"],
importpath = "code.hackerspace.pl/hscloud/cluster/identd/cri",
visibility = ["//visibility:public"],
)

1363
cluster/identd/cri/api.proto Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,27 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"client.go",
"errors.go",
"request.go",
"response.go",
"server.go",
],
importpath = "code.hackerspace.pl/hscloud/cluster/identd/ident",
visibility = ["//visibility:public"],
deps = ["@com_github_golang_glog//:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = [
"e2e_test.go",
"request_test.go",
"response_test.go",
"server_test.go",
],
embed = [":go_default_library"],
deps = ["@com_github_go_test_deep//:go_default_library"],
)

View File

@ -0,0 +1,3 @@
Implementation of the IDENT protocol (RFC 1413) protocl in Go.
Currently implements a server and client. Should be well documented enough to be usable.

View File

@ -0,0 +1,225 @@
package ident
import (
"bufio"
"context"
"fmt"
"io"
"net"
"strconv"
"github.com/golang/glog"
)
type DialOption func(d *dialOptions)
type dialOptions struct {
dialer func(context.Context, string, string) (net.Conn, error)
}
// WithDialer configures a Client to use a given dial function instead of the
// default implementation in net.
func WithDialer(dialer func(context.Context, string, string) (net.Conn, error)) DialOption {
return func(d *dialOptions) {
d.dialer = dialer
}
}
// parseTarget interprets a target string (ie. the target address of the Dial
// function) as an ident service address, either a host:port pair, or a host
// (in which case the default ident port, 113, is used).
func parseTarget(s string) (string, uint16, error) {
host, portStr, err := net.SplitHostPort(s)
if err == nil {
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
return "", 0, fmt.Errorf("can't parse port %q: %w", portStr, err)
}
return host, uint16(port), nil
}
// Doesn't look like a host:port pair? Default to port 113.
return s, 113, nil
}
// Dial sets up an ident protocol Client that will connect to the given target.
// Target can be either a host:port pair, or just a host (in which case the
// default ident port, 113, is used).
// This does not actually connect to identd over TCP - that will be done, as
// necessary, as requests are processed (including reconnections if multiple
// requests are processed on a Client which connects to a server that does not
// support long-standing ident donnections).
func Dial(target string, options ...DialOption) (*Client, error) {
host, port, err := parseTarget(target)
if err != nil {
return nil, fmt.Errorf("invalid target: %v", err)
}
dialer := net.Dialer{}
opts := dialOptions{
dialer: dialer.DialContext,
}
for _, opt := range options {
opt(&opts)
}
return &Client{
opts: opts,
target: net.JoinHostPort(host, fmt.Sprintf("%d", port)),
conn: nil,
scanner: nil,
}, nil
}
// Client is an ident protocol client. It maintains a connection to the ident
// server that it's been configured for, reconnecting as necessary. It is not
// safe to be used by multiple goroutines.
type Client struct {
// opts are the dialOptions with which the client has been constructed.
opts dialOptions
// target is the full host:port pair that the client should connect to.
target string
// conn is either nil or an active TCP connection to the ident server.
conn net.Conn
// scannner is either nil or a line-scanner attached to the receive side of
// conn.
scanner *bufio.Scanner
}
func (c *Client) connect(ctx context.Context) error {
glog.V(1).Infof("Dialing IDENT at %q", c.target)
conn, err := c.opts.dialer(ctx, "tcp", c.target)
if err != nil {
return fmt.Errorf("connecting: %w", err)
}
c.conn = conn
c.scanner = bufio.NewScanner(conn)
return nil
}
func (c *Client) disconnect() {
if c.conn == nil {
return
}
c.conn.Close()
c.conn = nil
}
// Do executes the given Request against the server to which the Client is
// connected.
func (c *Client) Do(ctx context.Context, r *Request) (*Response, error) {
glog.V(1).Infof("Do(%+v)", r)
// Connect if needed.
if c.conn == nil {
if err := c.connect(ctx); err != nil {
return nil, err
}
}
// Start a goroutine that will perform the actual request/response
// processing to the server. A successful response will land in resC, while
// any protocl-level error will land in errC.
// We make both channels buffered, because if the context expires without a
// response, we want the goroutine to be able to write to them even though
// we're not receiving anymore. The channel will then be garbage collected.
resC := make(chan *Response, 1)
errC := make(chan error, 1)
go func() {
data := r.encode()
glog.V(3).Infof(" -> %q", data)
_, err := c.conn.Write(data)
if err != nil {
errC <- fmt.Errorf("Write: %w", err)
return
}
if !c.scanner.Scan() {
// scanner.Err() returns nil on EOF. We want that EOF, as the ident
// protocol has special meaning for EOF sent by the server
// (indicating either a lack of support for multiple requests per
// connection, or a refusal to serve at an early stage of the
// connection).
if err := c.scanner.Err(); err != nil {
errC <- fmt.Errorf("Read: %w", err)
} else {
errC <- fmt.Errorf("Read: %w", io.EOF)
}
}
data = c.scanner.Bytes()
glog.V(3).Infof(" <- %q", data)
resp, err := decodeResponse(data)
if err != nil {
errC <- err
} else {
resC <- resp
}
}()
select {
case <-ctx.Done():
// If the context is closed, fail with the context error and kill the
// connection. The running goroutine will error out on any pending
// network I/O and fail at some later point.
// TODO(q3k): make the communication goroutine long-lived and don't
// kill it here, just let it finish whatever it's doing and ignore the
// result.
c.disconnect()
return nil, ctx.Err()
case res := <-resC:
return res, nil
case err := <-errC:
// TODO(q3k): interpret EOF, which can mean different things at
// different times according to the RFC.
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
return nil, err
}
}
// Close closes the Client, closing any underlying TCP connection.
func (c *Client) Close() error {
if c.conn == nil {
return nil
}
return c.conn.Close()
}
// Query performs a single ident protocol request to a server running at target
// and returns the received ident response. If the ident server cannot be
// queries, or the ident server returns an ident error, an error is returned.
//
// This a convenience wrapper around Dial/Do. The given target must be either a
// host or host:port pair. If not given, the port defaults to 113.
//
// Returned ident server error resposes are *IdentError, and can be tested for
// using errors.Is/errors.As. See the IdentError type documentation for more
// information.
//
// The given context will be used to time out the request, either at the
// connection or request stage. If the context is canceled/times out, the
// context error will be returned and the query aborted.
//
// Since Query opens a connection to the ident server just for a single query,
// it should not be used if a single server is going to be queries about
// multiple addresses, and instead Dial/Do should be used to keep a
// long-standing connection if possible.
func Query(ctx context.Context, target string, client, server uint16) (*IdentResponse, error) {
cl, err := Dial(target)
if err != nil {
return nil, fmt.Errorf("could not dial: %w", err)
}
defer cl.Close()
resp, err := cl.Do(ctx, &Request{
ClientPort: client,
ServerPort: server,
})
if err != nil {
return nil, fmt.Errorf("could not query: %w", err)
}
if resp.Ident != nil {
return resp.Ident, nil
}
return nil, &IdentError{Inner: resp.Error}
}

View File

@ -0,0 +1,68 @@
package ident
import (
"context"
"errors"
"net"
"testing"
)
// TestE2E performs an end-to-end test of the ident client and server,
// exercising as much functionality as possible.
func TestE2E(t *testing.T) {
lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Listen: %v", err)
}
defer lis.Close()
isrv := NewServer()
isrv.HandleFunc(func(ctx context.Context, w ResponseWriter, r *Request) {
switch r.ServerPort {
case 1:
w.SendError(NoUser)
default:
w.SendIdent(&IdentResponse{
UserID: "q3k",
})
}
})
go isrv.Serve(lis)
target := lis.Addr().String()
ctx := context.Background()
// Full end-to-end test, from high-level client Query call, to server
// handler.
// This call should succeed.
resp, err := Query(ctx, target, 123, 234)
if err != nil {
t.Fatalf("Query: %v", err)
}
if want, got := "q3k", resp.UserID; want != got {
t.Errorf("Resp.UserID: wanted %q, got %q", want, got)
}
// This call should error out and return an error that can be converted
// back into ErrorResponse.
resp, err = Query(ctx, target, 123, 1)
if err == nil {
t.Fatalf("Query returned nil error")
}
identErr := &IdentError{}
if errors.As(err, &identErr) {
if want, got := NoUser, identErr.Inner; want != got {
t.Errorf("Query error is %q, wanted %q", want, got)
}
} else {
t.Errorf("Query error did not match AnyError")
}
// Test matches against specific errors.
if errors.Is(err, &IdentError{UnknownError}) {
t.Errorf("Query error should not have matched UnknownError")
}
if !errors.Is(err, &IdentError{NoUser}) {
t.Errorf("Query error should have matcher NoUser")
}
}

View File

@ -0,0 +1,38 @@
package ident
// IdentError is an ErrorResponse received from a ident server, wrapped as a Go
// error type.
// When using errors.Is/errors.As against an IdentError, the Inner field
// controls the matching behaviour.
// - If set, the error will match if the tested error is an IdentError with
// same ErrorResponse as the IdentError tested against
// - If not set, the error will always match if the tested error is any
// IdentError.
//
// For example:
// errors.Is(err, &IdentError{}
// will be true if err is any *IdentError, but:
// errors.Is(err, &IdentError{NoUser})
// will be true only if err is an *IdentError with an Inner NoUser.
type IdentError struct {
// Inner is the ErrorResponse contained by this error.
Inner ErrorResponse
}
func (e *IdentError) Error() string {
return string(e.Inner)
}
func (e *IdentError) Is(target error) bool {
t, ok := target.(*IdentError)
if !ok {
return false
}
if t.Inner == "" {
return true
}
if t.Inner == e.Inner {
return true
}
return false
}

View File

@ -0,0 +1,58 @@
package ident
import (
"fmt"
"net"
"regexp"
"strconv"
)
// Request is an ident protocol request, as seen by the client or server.
type Request struct {
// ClientPort is the port number on the client side of the indent protocol,
// ie. the port local to the ident client.
ClientPort uint16
// ServerPort is the port number on the server side of the ident protocol,
// ie. the port local to the ident server.
ServerPort uint16
// ClientAddress is the address of the ident protocol client. This is set
// by the ident Server before invoking handlers, and is ignored by the
// ident protocol Client.
// In handlers this can be used to ensure that responses are only returned
// to clients who are running on the remote side of the connection that
// they are querying about.
ClientAddress net.Addr
}
// encode encodes ths Request as per RFC1413, including the terminating \r\n.
func (r *Request) encode() []byte {
return []byte(fmt.Sprintf("%d,%d\r\n", r.ServerPort, r.ClientPort))
}
var (
// reRequest matches request from RFC1413, but allows extra whitespace
// between significant tokens.
reRequest = regexp.MustCompile(`^\s*(\d{1,5})\s*,\s*(\d{1,5})\s*$`)
)
// decodeRequest parses the given bytes as an ident request. The data must be
// stripped of the trailing \r\n.
func decodeRequest(data []byte) (*Request, error) {
match := reRequest.FindStringSubmatch(string(data))
if match == nil {
return nil, fmt.Errorf("unparseable request")
}
serverPort, err := strconv.ParseUint(match[1], 10, 16)
if err != nil {
return nil, fmt.Errorf("invalid server port: %w", err)
}
clientPort, err := strconv.ParseUint(match[2], 10, 16)
if err != nil {
return nil, fmt.Errorf("invalid client port: %w", err)
}
return &Request{
ClientPort: uint16(clientPort),
ServerPort: uint16(serverPort),
}, nil
}

View File

@ -0,0 +1,14 @@
package ident
import "testing"
// TestRequestEncode exercises the (simple) functionality of Reequest.encode.
func TestRequestEncode(t *testing.T) {
r := Request{
ClientPort: 123,
ServerPort: 234,
}
if want, got := "234,123\r\n", string(r.encode()); want != got {
t.Errorf("Wanted %q, got %q", want, got)
}
}

View File

@ -0,0 +1,183 @@
package ident
import (
"fmt"
"regexp"
"strconv"
"strings"
)
// Response is an ident protocol response, as seen by the client or server.
type Response struct {
// ClientPort is the port number on the client side of the indent protocol,
// ie. the port local to the ident client.
ClientPort uint16
// ServerPort is the port number on the server side of the ident protocol,
// ie. the port local to the ident server.
ServerPort uint16
// Exactly one of {Error, Ident} must be non-zero.
// Error is either NoError (the zero value) or one of the ErrorResponse
// types if this response represents an ident protocol error reply.
Error ErrorResponse
// Ident is either nil or a IdentResponse if this response represents an
// ident protocol ident reply.
Ident *IdentResponse
}
// ErrorResponse is error-type from RFC1413, indicating one of the possible
// errors returned by the ident protocol server.
type ErrorResponse string
const (
// NoError is an ErrorResponse that indicates a lack of error.
NoError ErrorResponse = ""
// InvalidPort indicates that either the local or foreign port was
// improperly specified.
InvalidPort ErrorResponse = "INVALID-PORT"
// NoUser indicates that the port pair is not currently in use or currently
// not owned by an identifiable entity.
NoUser ErrorResponse = "NO-USER"
// HiddenUser indicates that the server was able to identify the user of
// this port, but the information was not returned at the request of the
// user.
HiddenUser ErrorResponse = "HIDDEN-USER"
// UnknownError indicates that the server could not determine the
// connection owner for an unknown reason.
UnknownError ErrorResponse = "UNKNOWN-ERROR"
)
// IsStandardError returns whether ErrorResponse represents a standard error.
func (e ErrorResponse) IsStandardError() bool {
switch e {
case InvalidPort, NoUser, HiddenUser, UnknownError:
return true
default:
return false
}
}
// IsNonStandardError returns ehther the ErrorResponse represents a
// non-standard error.
func (e ErrorResponse) IsNonStandardError() bool {
return len(e) > 0 && e[0] == 'X'
}
func (e ErrorResponse) IsError() bool {
if e.IsStandardError() {
return true
}
if e.IsNonStandardError() {
return true
}
return false
}
// IdentResponse is the combined opsys, charset and user-id fields from
// RFC1413. It represents a non-error response from the ident protocol server.
type IdentResponse struct {
// OperatingSystem is an operating system identifier as per RFC1340. This
// is usually UNIX. OTHER has a special meaning, see RFC1413 for more
// information.
OperatingSystem string
// CharacterSet a character set as per RFC1340, defaulting to US-ASCII.
CharacterSet string
// UserID is the 'normal' user identification of the owner of the
// connection, unless the operating system is set to OTHER. See RFC1413 for
// more information.
UserID string
}
// encode encodes the given Response. If the Response is unencodable/malformed,
// nil is returned.
func (r *Response) encode() []byte {
// Both Error and Ident cannot be set at once.
if r.Error != "" && r.Ident != nil {
return nil
}
if r.Error != "" {
if !r.Error.IsError() {
return nil
}
return []byte(fmt.Sprintf("%d,%d:ERROR:%s\r\n", r.ServerPort, r.ClientPort, r.Error))
}
if r.Ident != nil {
id := r.Ident
os := id.OperatingSystem
if os == "" {
return nil
}
// For compatibility, do not set US-ASCII explicitly.
if id.CharacterSet != "" && id.CharacterSet != "US-ASCII" {
os += "," + id.CharacterSet
}
return []byte(fmt.Sprintf("%d,%d:USERID:%s:%s\r\n", r.ServerPort, r.ClientPort, os, id.UserID))
}
// Malformed response, return nil.
return nil
}
var (
// reErrorReply matches error-reply from RFC1413, but also allows extra
// whitespace between significant tokens. It does not ensure that the
// error-type is one of the standardized values.
reErrorReply = regexp.MustCompile(`^\s*(\d{1,5})\s*,\s*(\d{1,5})\s*:\s*ERROR\s*:\s*(.+)$`)
// reIdentReply matches ident-reply from RFC1413, but also allows extra
// whitespace between significant tokens. It does not ensure that that
// opsys-field and user-id parts are RFC compliant.
reIdentReply = regexp.MustCompile(`^\s*(\d{1,5})\s*,\s*(\d{1,5})\s*:\s*USERID\s*:\s*([^:,]+)(,([^:]+))?\s*:(.+)$`)
)
// decodeResponse parses the given bytes as an ident response. The data must be
// stripped of the trailing \r\n.
func decodeResponse(data []byte) (*Response, error) {
if match := reErrorReply.FindStringSubmatch(string(data)); match != nil {
serverPort, err := strconv.ParseUint(match[1], 10, 16)
if err != nil {
return nil, fmt.Errorf("invalid server port: %w", err)
}
clientPort, err := strconv.ParseUint(match[2], 10, 16)
if err != nil {
return nil, fmt.Errorf("invalid client port: %w", err)
}
errResp := ErrorResponse(strings.TrimSpace(match[3]))
if !errResp.IsError() {
// The RFC doesn't tell us what we should do in this case. For
// reliability, we downcast any unknown error to UNKNOWN-ERROR.
errResp = UnknownError
}
return &Response{
ClientPort: uint16(clientPort),
ServerPort: uint16(serverPort),
Error: errResp,
}, nil
}
if match := reIdentReply.FindStringSubmatch(string(data)); match != nil {
serverPort, err := strconv.ParseUint(match[1], 10, 16)
if err != nil {
return nil, fmt.Errorf("invalid server port: %w", err)
}
clientPort, err := strconv.ParseUint(match[2], 10, 16)
if err != nil {
return nil, fmt.Errorf("invalid client port: %w", err)
}
os := strings.TrimSpace(match[3])
charset := strings.TrimSpace(match[5])
if charset == "" {
charset = "US-ASCII"
}
userid := strings.TrimSpace(match[6])
return &Response{
ClientPort: uint16(clientPort),
ServerPort: uint16(serverPort),
Ident: &IdentResponse{
OperatingSystem: os,
CharacterSet: charset,
UserID: userid,
},
}, nil
}
return nil, fmt.Errorf("unparseable response")
}

View File

@ -0,0 +1,142 @@
package ident
import (
"testing"
"github.com/go-test/deep"
)
// TestResponseDecode exercises the response decode implementation.
func TestResponseDecode(t *testing.T) {
for i, te := range []struct {
data string
want *Response
}{
// 0: Everything okay, server returned error.
{"123, 234 : ERROR : INVALID-PORT", &Response{
ServerPort: 123,
ClientPort: 234,
Error: InvalidPort,
}},
// 1: Everything okay, server returned error but also added some weird
// whitespace.
{" 123\t ,234 :ERROR: NO-USER ", &Response{
ServerPort: 123,
ClientPort: 234,
Error: NoUser,
}},
// 2: Everything okay, server returned a simple ident response.
{"123,234 : USERID : UNIX :q3k", &Response{
ServerPort: 123,
ClientPort: 234,
Ident: &IdentResponse{
OperatingSystem: "UNIX",
CharacterSet: "US-ASCII",
UserID: "q3k",
},
}},
// 3: Everything okay, server returned an ident response with a
// charset.
{"123,234 : USERID : UNIX, PETSCII :q3k", &Response{
ServerPort: 123,
ClientPort: 234,
Ident: &IdentResponse{
OperatingSystem: "UNIX",
CharacterSet: "PETSCII",
UserID: "q3k",
},
}},
} {
res, err := decodeResponse([]byte(te.data))
if err != nil {
if te.want != nil {
t.Errorf("%d: wanted result, got err %v", i, err)
}
} else {
if diff := deep.Equal(te.want, res); diff != nil {
t.Errorf("%d: %s", i, diff)
}
}
}
}
// TestResponseEncode exercises the encode method of Response.
func TestResponseEncode(t *testing.T) {
for i, te := range []struct {
r *Response
want string
}{
/// Standard features
// 0: simple error
{&Response{
ServerPort: 123,
ClientPort: 234,
Error: InvalidPort,
}, "123,234:ERROR:INVALID-PORT\r\n"},
// 1: simple response, stripped US-ASCII
{&Response{
ServerPort: 123,
ClientPort: 234,
Ident: &IdentResponse{
OperatingSystem: "UNIX",
UserID: "q3k",
},
}, "123,234:USERID:UNIX:q3k\r\n"},
// 2: simple response, stripped US-ASCII
{&Response{
ServerPort: 123,
ClientPort: 234,
Ident: &IdentResponse{
OperatingSystem: "UNIX",
CharacterSet: "US-ASCII",
UserID: "q3k",
},
}, "123,234:USERID:UNIX:q3k\r\n"},
/// Unusual features
// 3: unusual response
{&Response{
ServerPort: 123,
ClientPort: 234,
Ident: &IdentResponse{
OperatingSystem: "SUN",
CharacterSet: "PETSCII",
UserID: "q3k",
},
}, "123,234:USERID:SUN,PETSCII:q3k\r\n"},
// 4: non-standard error code
{&Response{
ServerPort: 123,
ClientPort: 234,
Error: ErrorResponse("XNOMANA"),
}, "123,234:ERROR:XNOMANA\r\n"},
/// Errors
// 5: invalid error code (should return nil)
{&Response{
ServerPort: 123,
ClientPort: 234,
Error: ErrorResponse("NOT ENOUGH MANA"),
}, ""},
// 6: no error/ident set (should return nil)
{&Response{
ServerPort: 123,
ClientPort: 234,
}, ""},
// 7: both error and ident set (should return nil)
{&Response{
ServerPort: 123,
ClientPort: 234,
Ident: &IdentResponse{
OperatingSystem: "UNIX",
CharacterSet: "US-ASCII",
UserID: "q3k",
},
Error: InvalidPort,
}, ""},
} {
if want, got := te.want, string(te.r.encode()); want != got {
t.Errorf("%d: wanted %q, got %q", i, want, got)
}
}
}

View File

@ -0,0 +1,282 @@
package ident
import (
"bufio"
"context"
"fmt"
"net"
"sync"
"time"
"github.com/golang/glog"
)
// NewServer returns an ident Server.
func NewServer() *Server {
return &Server{
handler: unsetHandler,
}
}
// Server is an ident protocol server (per RFC1413). It must be configured with
// a HandlerFunc before Serve is called.
// Multiple goroutines may invoke methods on Server simultaneously, but the
// Server can only Serve one listener at a time.
type Server struct {
handler HandlerFunc
// mu guards stopC and stop.
mu sync.Mutex
// stopC is set if Serve() is already running. If it gets closed, Serve()
// will quit and set stopC to nil.
stopC chan struct{}
// stop can be set to true if Serve() is not yet running but has already
// been requested to Stop() (eg. if Serve() is ran in a goroutine which
// hasn't yet scheduled). It will be set back to false when Serve() sees it
// set and exits.
stop bool
}
// ResponseWriter is passed to HandlerFuncs and is used to signal to the Server
// that the HandlerFunc wants to respond to the incoming Request in a certain
// way.
// Only the goroutine that the HandlerFunc has been started in may invoke
// methods on the ResponseWriter.
type ResponseWriter interface {
// SendError returns an ident ErrorResponse to the ident client. This can
// only be called once, and cannot be called after SendIdent.
SendError(ErrorResponse) error
// SendIdent returns an ident IdentResponse to the ident client. This can
// only be called once, and cannot be called after SendError.
SendIdent(*IdentResponse) error
}
// HandlerFunc is a function that will be called to serve a given ident
// Request. Users of the Server must implement this and configure a Server to
// use it by invoking Server.HandleFunc.
// Each HandlerFunc will be started in its own goroutine. When HandlerFunc
// returns, the Server will attempt to serve more incoming requests from the
// ident client.
// The Server does not limit the amount of concurrent ident connections that it
// serves. If the Server user wishes to limit concurrency, she must do it
// herself, eg. by using a semaphore. The Server will continue accepting new
// connections and starting new HandlerFuncs, if the user code needs to push
// back it should return as early as possible. There currently is no way to
// make the Server refuse connections above some concurrncy limit.
// The Server does not impose any execution timeout on handlers. If the Server
// user wishes to impose an execution timeout, she must do it herself, eg.
// using context.WithTimeout or time.After.
// The passed Context will be canceled when the ident client disconnects or the
// Server shuts down. The HandlerFunc must return as early as it can detect
// that the context is done.
type HandlerFunc func(ctx context.Context, w ResponseWriter, r *Request)
// responseWriter implements ResponseWriter for a Server.
type responseWriter struct {
conn net.Conn
req *Request
responded bool
}
// sendResponse sends a Response to the ident client. The Response must already
// be fully populated.
func (w *responseWriter) sendResponse(r *Response) error {
if w.responded {
return fmt.Errorf("handler already sent a response")
}
w.responded = true
data := r.encode()
if data == nil {
return fmt.Errorf("failed to encode response")
}
glog.V(3).Infof(" -> %q", data)
_, err := w.conn.Write(data)
if err != nil {
return fmt.Errorf("writing response failed: %w", err)
}
return nil
}
func (w *responseWriter) SendError(e ErrorResponse) error {
if !e.IsError() {
return fmt.Errorf("error response must contain a valid error")
}
return w.sendResponse(&Response{
ClientPort: w.req.ClientPort,
ServerPort: w.req.ServerPort,
Error: e,
})
}
func (w *responseWriter) SendIdent(i *IdentResponse) error {
ir := *i
// TODO(q3k): enforce RFC1413 limits.
if ir.OperatingSystem == "" {
ir.OperatingSystem = "UNIX"
}
if ir.UserID == "" {
return fmt.Errorf("ident response must have UserID set")
}
return w.sendResponse(&Response{
ClientPort: w.req.ClientPort,
ServerPort: w.req.ServerPort,
Ident: &ir,
})
}
var (
unsetHandlerErrorOnce sync.Once
)
// unsetHandler is the default handler that is configured for a Server. It
// returns UNKNOWN-ERROR to the ident client and logs an error once if it's
// called (telling the user about a misconfiguration / programming error).
func unsetHandler(ctx context.Context, w ResponseWriter, r *Request) {
unsetHandlerErrorOnce.Do(func() {
glog.Errorf("Server with no handler configured - will always return UNKNOWN-ERROR")
})
w.SendError(UnknownError)
}
// HandleFunc sets the HandlerFunc that the server will call for every incoming
// ident request. If a HandlerFunc is already set, it will be overwritten by
// the given new function.
func (s *Server) HandleFunc(fn HandlerFunc) {
s.handler = fn
}
// Serve runs the ident server, blocking until a transport-level error occurs
// or Stop() is invoked. The returned error will be nil on Stop(), and will
// wrap the underlying transport-level error otherwise.
//
// Only one invokation of Serve() can be run at a time, but Serve can be called
// again after Stop() is called, and can be ran on a different Listener - no
// state is kept in between subsequent Serve() runs.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
if s.stopC != nil {
s.mu.Unlock()
return fmt.Errorf("cannot Serve() an already serving server")
}
// Stop() has been invoked before Serve() started.
if s.stop == true {
s.stop = false
s.mu.Unlock()
return nil
}
// Set stopC to allow Stop() calls to stop this running Serve(). It will be
// set to nil on exit.
stopC := make(chan struct{})
s.stopC = stopC
s.mu.Unlock()
defer func() {
s.mu.Lock()
s.stopC = nil
s.mu.Unlock()
}()
ctx, ctxC := context.WithCancel(context.Background())
for {
lisConnC := make(chan net.Conn)
lisErrC := make(chan error)
go func() {
conn, err := lis.Accept()
select {
case <-stopC:
// Server stopped, drop the accepted connection (if any)
// and return.
glog.V(2).Infof("Accept goroutine stopping...")
if err == nil {
conn.Close()
}
return
default:
}
if err == nil {
glog.V(5).Infof("Accept ok: %v", conn.RemoteAddr())
lisConnC <- conn
} else {
glog.V(5).Infof("Accept err: %v", err)
lisErrC <- err
}
}()
select {
case <-stopC:
// Server stopped, return.
ctxC()
return nil
case err := <-lisErrC:
ctxC()
// Accept() failed, return error.
return err
case conn := <-lisConnC:
// Accept() succeeded, serve request.
go s.serve(ctx, conn)
}
}
}
func (s *Server) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
if s.stopC != nil {
close(s.stopC)
} else {
s.stop = true
}
}
func (s *Server) serve(ctx context.Context, conn net.Conn) {
glog.V(2).Infof("Serving connection %v", conn.RemoteAddr())
scanner := bufio.NewScanner(conn)
// The RFC does not place a limit on the request line length, only on
// response length. We set an arbitrary limit to 1024 bytes.
scanner.Buffer(nil, 1024)
for {
// Implement an arbitrary timeout for receiving data from client.
// TODO(q3k): make this configurable
go func() {
timer := time.NewTimer(10 * time.Second)
defer timer.Stop()
select {
case <-ctx.Done():
return
case <-timer.C:
glog.V(1).Infof("Connection %v: terminating on receive timeout", conn.RemoteAddr())
conn.Close()
}
}()
if !scanner.Scan() {
err := scanner.Err()
if err == nil {
// EOF, just return.
return
}
// Some other transport level error occured, or the request line
// was too long. We can only log this and be done.
glog.V(1).Infof("Connection %v: scan failed: %v", conn.RemoteAddr(), err)
conn.Close()
return
}
data := scanner.Bytes()
glog.V(3).Infof(" <- %q", data)
req, err := decodeRequest(data)
if err != nil {
glog.V(1).Infof("Connection %v: could not decode request: %v", conn.RemoteAddr(), err)
conn.Close()
return
}
req.ClientAddress = conn.RemoteAddr()
rw := responseWriter{
req: req,
conn: conn,
}
s.handler(ctx, &rw, req)
if !rw.responded {
glog.Warningf("Connection %v: handler did not send response, sending UNKNOWN-ERROR", conn.RemoteAddr())
rw.SendError(UnknownError)
}
}
}

View File

@ -0,0 +1,158 @@
package ident
import (
"bufio"
"context"
"fmt"
"io"
"net"
"strings"
"testing"
)
// loopback sets up a net.Listener on any available TCP port and returns it and
// a dialer function that returns open connections to that listener.
func loopback(t *testing.T) (net.Listener, func() net.Conn) {
t.Helper()
lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Listen: %v", err)
}
return lis, func() net.Conn {
t.Helper()
conn, err := net.Dial("tcp", lis.Addr().String())
if err != nil {
t.Fatalf("Dial: %v", err)
}
return conn
}
}
// dumbHandler is a handler that returns USERID:UNIX:q3k for every request.
func dumbHandler(ctx context.Context, w ResponseWriter, r *Request) {
w.SendIdent(&IdentResponse{
UserID: "q3k",
})
}
// reqRessps send an ident query to the conn and expects a response with
// USERID:UNIX:q3k on the scanner.
func reqResp(t *testing.T, conn net.Conn, scanner *bufio.Scanner, client, server uint16) {
t.Helper()
if _, err := fmt.Fprintf(conn, "%d,%d\r\n", client, server); err != nil {
t.Fatalf("Write: %v", err)
}
if !scanner.Scan() {
t.Fatalf("Scan: %v", scanner.Err())
}
if want, got := fmt.Sprintf("%d,%d:USERID:UNIX:q3k", client, server), scanner.Text(); want != got {
t.Fatalf("Wanted %q, got %q", want, got)
}
}
// TestServeSimple exercises the basic Server functionality: responding to
// ident requests.
func TestServeSimple(t *testing.T) {
lis, dial := loopback(t)
defer lis.Close()
isrv := NewServer()
isrv.HandleFunc(dumbHandler)
go isrv.Serve(lis)
conn := dial()
defer conn.Close()
scanner := bufio.NewScanner(conn)
// Send a request, expect response.
reqResp(t, conn, scanner, 123, 234)
// Send another request on the same conn, expect response.
reqResp(t, conn, scanner, 234, 345)
// Send another request in parallel, expect response.
conn2 := dial()
defer conn2.Close()
scanner2 := bufio.NewScanner(conn2)
reqResp(t, conn2, scanner2, 345, 456)
}
// TestServeError exercises situations where the server has to deal with
// nasty/broken clients.
func TestServeErrors(t *testing.T) {
lis, dial := loopback(t)
defer lis.Close()
isrv := NewServer()
isrv.HandleFunc(dumbHandler)
go isrv.Serve(lis)
conn := dial()
defer conn.Close()
// Send something that's not ident.
fmt.Fprintf(conn, "GET / HTTP/1.1\r\n\r\n")
// Expect EOF on read.
data := make([]byte, 100)
_, err := conn.Read(data)
if want, got := io.EOF, err; want != got {
t.Fatalf("Expected %v, got %v", want, got)
}
conn = dial()
defer conn.Close()
// Send a very long request line, expect to not be served.
fmt.Fprintf(conn, "123,%s123\r\n", strings.Repeat(" ", 4096))
data = make([]byte, 100)
_, err = conn.Read(data)
// In a large write, the connection will be closed by the server before
// we're finished writing. That will cause the connection to be reset, not
// just EOF'd as above.
if err == nil {
t.Fatalf("Read did not fail")
}
}
// TestServerRestart ensures that the server's serve/stop logic works as expected.
func TestServerRestart(t *testing.T) {
lis, dial := loopback(t)
defer lis.Close()
isrv := NewServer()
isrv.HandleFunc(dumbHandler)
// Stop the server before it's even started.
isrv.Stop()
// The server should now exit immediately.
if err := isrv.Serve(lis); err != nil {
t.Fatalf("Serve: %v", err)
}
// On a subsequent run it should, however, start and serve.
go isrv.Serve(lis)
conn := dial()
defer conn.Close()
scanner := bufio.NewScanner(conn)
// Send a request, expect response.
reqResp(t, conn, scanner, 123, 234)
// Attempting another simultaneous Serve() shoud fail.
if err := isrv.Serve(lis); err == nil {
t.Fatal("Serve() returned nil, wanted error")
}
// Send a request, expect response.
reqResp(t, conn, scanner, 234, 345)
// Stop server, restart server.
isrv.Stop()
go isrv.Serve(lis)
// Send a request, expect response.
reqResp(t, conn, scanner, 345, 456)
}

View File

@ -0,0 +1,34 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"kubenat.go",
"pods.go",
"translation.go",
],
importpath = "code.hackerspace.pl/hscloud/cluster/identd/kubenat",
visibility = ["//visibility:public"],
deps = [
"//cluster/identd/cri:go_default_library",
"@com_github_cenkalti_backoff//:go_default_library",
"@com_github_golang_glog//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"kubenat_test.go",
"pods_test.go",
"translation_test.go",
],
embed = [":go_default_library"],
deps = [
"@com_github_go_test_deep//:go_default_library",
"@com_github_golang_glog//:go_default_library",
],
)

View File

@ -0,0 +1,130 @@
// kubenat implements a data source for undoing NAT on hosts running
// Kubernetes/containerd workloads.
//
// It parses the kernel conntrack NAT translation table to figure out the IP
// address of the pod that was making the connection.
//
// It then uses the containerd API to figure out what pod runs under what IP
// address.
//
// Both conntrack and containerd access is cached and only updated when needed.
// This means that as long as a TCP connection is open, identd will be able to
// respond about its information without having to perform any OS/containerd
// queries.
//
// Unfortunately, there is very little in terms of development/test harnesses
// for kubenat. You will have to have a locally running containerd, or do some
// mounts/forwards from a remote host.
package kubenat
import (
"context"
"errors"
"fmt"
"net"
"time"
"github.com/cenkalti/backoff"
"github.com/golang/glog"
)
// Resolver is the main interface for kubenat. It runs background processing to
// update conntrack/containerd state, and resolves Tuple4s into PodInfo.
type Resolver struct {
conntrackPath string
criPath string
translationC chan *translationReq
podInfoC chan *podInfoReq
}
// Tuple4 is a 4-tuple of a TCP connection. Local describes the machine running
// this code, not the listen/connect 'ends' of TCP.
type Tuple4 struct {
RemoteIP net.IP
RemotePort uint16
LocalIP net.IP
LocalPort uint16
}
func (t *Tuple4) String() string {
local := net.JoinHostPort(t.LocalIP.String(), fmt.Sprintf("%d", t.LocalPort))
remote := net.JoinHostPort(t.RemoteIP.String(), fmt.Sprintf("%d", t.RemotePort))
return fmt.Sprintf("L: %s R: %s", local, remote)
}
// PodInfo describes a Kubernetes pod which terminates a given Tuple4 connection.
type PodInfo struct {
// PodIP is the IP address of the pod within the pod network.
PodIP net.IP
// PodTranslatedPort is the port on the PodIP corresponding to the Tuple4
// that this PodInfo was requested for.
PodTranslatedPort uint16
// KubernetesNamespace is the kubernetes namespace in which this pod is
// running.
KubernetesNamespace string
// Name is the name of the pod, as seen by kubernetes.
Name string
}
// NewResolver startss a resolver with a given path to /paroc/net/nf_conntrack
// and a CRI gRPC domain socket.
func NewResolver(ctx context.Context, conntrackPath, criPath string) (*Resolver, error) {
r := Resolver{
conntrackPath: conntrackPath,
criPath: criPath,
translationC: make(chan *translationReq),
podInfoC: make(chan *podInfoReq),
}
// TODO(q3k): bubble up errors from the translation worker into here?
go r.runTranslationWorker(ctx)
// The pod worker might fail on CRI connectivity issues, so we attempt to
// restart it with a backoff if needed.
go func() {
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = 0
bo.Reset()
for {
err := r.runPodWorker(ctx)
if err == nil || errors.Is(err, ctx.Err()) {
glog.Infof("podWorker exiting")
return
}
glog.Errorf("podWorker failed: %v", err)
wait := bo.NextBackOff()
glog.Errorf("restarting podWorker in %v", wait)
time.Sleep(wait)
}
}()
return &r, nil
}
// ResolvePod returns information about a running pod for a given TCP 4-tuple.
// If the 4-tuple or pod cannot be resolved, an error will be returned.
func (r *Resolver) ResolvePod(ctx context.Context, t *Tuple4) (*PodInfo, error) {
// TODO(q3k): expose translation/pod not found errors as package-level
// vars, or use gRPC statuses?
podAddr, err := r.translate(ctx, t)
if err != nil {
return nil, fmt.Errorf("translate: %w", err)
}
if podAddr == nil {
return nil, fmt.Errorf("translation not found")
}
podInfo, err := r.getPodInfo(ctx, podAddr.localIP)
if err != nil {
return nil, fmt.Errorf("getPodInfo: %w", err)
}
if podInfo == nil {
return nil, fmt.Errorf("pod not found")
}
return &PodInfo{
PodIP: podAddr.localIP,
PodTranslatedPort: podAddr.localPort,
KubernetesNamespace: podInfo.namespace,
Name: podInfo.name,
}, nil
}

View File

@ -0,0 +1,43 @@
package kubenat
import (
"context"
"flag"
"net"
"testing"
)
func TestResolvePod(t *testing.T) {
t.Skip("needs containerd running on host and unhardcoded test data")
flag.Set("logtostderr", "true")
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
r, err := NewResolver(ctx, "/tmp/conntrack", "/tmp/containerd.sock")
if err != nil {
t.Fatalf("NewResolver: %v", err)
}
pi, err := r.ResolvePod(ctx, &Tuple4{
RemoteIP: net.IPv4(185, 191, 225, 10),
RemotePort: 6697,
LocalIP: net.IPv4(185, 236, 240, 36),
LocalPort: 53449,
})
if err != nil {
t.Fatalf("ResolvePod: %v", err)
}
if want, got := net.IPv4(10, 10, 26, 23), pi.PodIP; !want.Equal(got) {
t.Errorf("Wanted pod IP %v, got %v", want, got)
}
if want, got := uint16(54782), pi.PodTranslatedPort; want != got {
t.Errorf("Wanted pod port %d, got %d", want, got)
}
if want, got := "matrix", pi.KubernetesNamespace; want != got {
t.Errorf("Wanted pod namespace %q, got %q", want, got)
}
if want, got := "appservice-irc-freenode-68977cdd5f-kfzl6", pi.Name; want != got {
t.Errorf("Wanted pod name %q, got %q", want, got)
}
}

View File

@ -0,0 +1,176 @@
package kubenat
import (
"context"
"fmt"
"net"
"github.com/golang/glog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"code.hackerspace.pl/hscloud/cluster/identd/cri"
)
// podInfoReq is a request passed to the podWorker.
type podInfoReq struct {
local net.IP
res chan *podInfoResp
}
// podInfoResp is a response from a podWorker, sent over the res channel in a
// podInfoReq.
type podInfoResp struct {
name string
namespace string
}
// reply sends a reply to the given podInfoReq based on a CRI PodSandboxStatus,
// sending nil if the status is nil.
func (r *podInfoReq) reply(s *cri.PodSandboxStatus) {
if s == nil {
r.res <- nil
return
}
r.res <- &podInfoResp{
name: s.Metadata.Name,
namespace: s.Metadata.Namespace,
}
}
// getPodInfo performs a podInfoReq/podInfoResp exchange under a context that
// can be used to time out the query.
func (r *Resolver) getPodInfo(ctx context.Context, local net.IP) (*podInfoResp, error) {
resC := make(chan *podInfoResp, 1)
r.podInfoC <- &podInfoReq{
local: local,
res: resC,
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case res := <-resC:
return res, nil
}
}
// podStatus is a cache of data retrieved from CRI.
type podStatus struct {
// info is a map from pod sandbox ID to PodSandboxStatus as retrieved from
// CRI.
info map[string]*cri.PodSandboxStatus
// byIP is a map from pod IP (as string) to pod sandbox ID.
byIP map[string]string
}
// update performs an update of the podStatus from CRI. It only retrieves
// information about pods that it doesn't yet have, and ensures that pods which
// do not exist in CRI are also removed from podStatus.
// TODO(q3k): make sure we don't cache PodSandboxStatus too early, eg. when
// it's not yet fully running?
func (p *podStatus) update(ctx context.Context, client cri.RuntimeServiceClient) error {
res, err := client.ListPodSandbox(ctx, &cri.ListPodSandboxRequest{})
if err != nil {
return fmt.Errorf("ListPodSandbox: %w", err)
}
// set of all pod sandbox IDs in CRI.
want := make(map[string]bool)
// set of pod sandbox IDs in CRI that are not in podStatus.
missing := make(map[string]bool)
for _, item := range res.Items {
want[item.Id] = true
if _, ok := p.info[item.Id]; ok {
continue
}
missing[item.Id] = true
}
// Get information about missing pod IDs into podStatus.
for id, _ := range missing {
res, err := client.PodSandboxStatus(ctx, &cri.PodSandboxStatusRequest{
PodSandboxId: id,
})
if err != nil {
if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
continue
} else {
return fmt.Errorf("while getting sandbox %s: %v", id, err)
}
}
p.info[id] = res.Status
}
// byIP is fully repopulated on each update.
p.byIP = make(map[string]string)
// remove is the set of pods sandbox IDs that should be removed from podStatus.
remove := make(map[string]bool)
// Populate remove and p.byId in a single pass.
for id, info := range p.info {
if _, ok := want[id]; !ok {
remove[id] = true
continue
}
if info.Network == nil {
continue
}
if info.Network.Ip == "" {
continue
}
p.byIP[info.Network.Ip] = id
}
// Remove stale pod sandbox IDs from podStatus.
for id, _ := range remove {
delete(p.info, id)
}
return nil
}
// findByPodID returns a PodSandboxStatus for the pod running under a given pod
// IP address, or nil if not found.
func (p *podStatus) findByPodIP(ip net.IP) *cri.PodSandboxStatus {
id, ok := p.byIP[ip.String()]
if !ok {
return nil
}
return p.info[id]
}
// runPodWorker runs the CRI cache 'pod worker'. It responds to requests over
// podInfoC until ctx is canceled.
func (r *Resolver) runPodWorker(ctx context.Context) error {
conn, err := grpc.Dial(fmt.Sprintf("unix://%s", r.criPath), grpc.WithInsecure())
if err != nil {
return fmt.Errorf("Dial: %w", err)
}
defer conn.Close()
client := cri.NewRuntimeServiceClient(conn)
ps := &podStatus{
info: make(map[string]*cri.PodSandboxStatus),
}
if err := ps.update(ctx, client); err != nil {
return fmt.Errorf("initial pod update: %w", err)
}
for {
select {
case req := <-r.podInfoC:
info := ps.findByPodIP(req.local)
if info != nil {
req.reply(info)
continue
}
err := ps.update(ctx, client)
if err != nil {
glog.Errorf("Updating pods failed: %v", err)
continue
}
req.reply(ps.findByPodIP(req.local))
case <-ctx.Done():
return ctx.Err()
}
}
}

View File

@ -0,0 +1,42 @@
package kubenat
import (
"context"
"flag"
"net"
"testing"
"github.com/golang/glog"
)
func TestPodWorker(t *testing.T) {
t.Skip("needs containerd running on host and unhardcoded test data")
flag.Set("logtostderr", "true")
r := &Resolver{
criPath: "/tmp/containerd.sock",
podInfoC: make(chan *podInfoReq),
}
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
go func() {
err := r.runPodWorker(ctx)
if err != nil && err != ctx.Err() {
glog.Errorf("runPodWorker: %v", err)
}
}()
res, err := r.getPodInfo(ctx, net.IPv4(10, 10, 26, 23))
if err != nil {
t.Fatalf("got err: %v", err)
}
if res == nil {
t.Fatalf("got nil pod response")
}
if want, got := "matrix", res.namespace; want != got {
t.Errorf("namespace: got %q, wanted %q", want, got)
}
}

View File

@ -0,0 +1,341 @@
package kubenat
import (
"bufio"
"bytes"
"context"
"fmt"
"io/ioutil"
"net"
"strconv"
"strings"
"github.com/golang/glog"
)
// translationReq is a request passed to the translationWorker.
type translationReq struct {
t *Tuple4
res chan *translationResp
}
// translationResp is a response from the translationWorker, sent over the res
// channel in a translationReq.
type translationResp struct {
localIP net.IP
localPort uint16
}
// reply sends a reply to the given translationReq based on a conntrackEntry,
// sending nil if the entry is nil.
func (r *translationReq) reply(e *conntrackEntry) {
if e == nil {
r.res <- nil
return
}
localPort, err := strconv.ParseUint(e.request["sport"], 10, 16)
if err != nil {
r.res <- nil
return
}
r.res <- &translationResp{
localIP: net.ParseIP(e.request["src"]),
localPort: uint16(localPort),
}
}
// translate performs a translationReq/translationResp exchange under a context
// that can be used to time out the query.
func (r *Resolver) translate(ctx context.Context, t *Tuple4) (*translationResp, error) {
resC := make(chan *translationResp, 1)
r.translationC <- &translationReq{
t: t,
res: resC,
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case res := <-resC:
return res, nil
}
}
// conntrackEntry is an entry parsed from /proc/net/nf_conntrack. The format is
// not well documented, and the best resource I could find is:
// https://stackoverflow.com/questions/16034698/details-of-proc-net-ip-conntrack-and-proc-net-nf-conntrack
type conntrackEntry struct {
// networkProtocol is currently always "ipv4".
networkProtocol string
// transmissionProtocol is currently "tcp" or "udp".
transmissionProtocol string
invalidateTimeout int64
state string
// request key-value pairs. For NAT, these are entries relating to the
// connection as seen as the 'inside' of the NAT, eg. the pod-originated
// connection.
request map[string]string
// response key-value parirs. For NAT, these are entries relating to the
// connection as seen by the 'outside' of the NAT, eg. the internet.
response map[string]string
tags map[string]bool
}
// conntrackParseEntry parses a line from /proc/net/nf_conntrack into a conntrackEntry.
func conntrackParseEntry(line string) (*conntrackEntry, error) {
entry := conntrackEntry{
request: make(map[string]string),
response: make(map[string]string),
tags: make(map[string]bool),
}
fields := strings.Fields(line)
if len(fields) < 5 {
// This should never happen unless the file format drastically
// changed. Don't bother to parse the rest, error early, and let
// someone debug this.
return nil, fmt.Errorf("invalid field count: %v", fields)
}
switch fields[0] {
case "ipv4":
if fields[1] != "2" {
return nil, fmt.Errorf("ipv4 with proto number %q, wanted 2", fields[1])
}
// TODO(q3k): support IPv6 when we get it on prod.
default:
return nil, nil
}
entry.networkProtocol = fields[0]
rest := fields[5:]
switch fields[2] {
case "tcp":
if fields[3] != "6" {
return nil, fmt.Errorf("tcp with proto number %q, wanted 6", fields[3])
}
if len(fields) < 6 {
return nil, fmt.Errorf("tcp with missing state field")
}
entry.state = fields[5]
rest = fields[6:]
case "udp":
if fields[3] != "17" {
return nil, fmt.Errorf("udp with proto number %q, wanted 17", fields[3])
}
default:
return nil, nil
}
entry.transmissionProtocol = fields[2]
invalidateTimeout, err := strconv.ParseInt(fields[4], 10, 64)
if err != nil {
return nil, fmt.Errorf("unparseable timeout %q", fields[4])
}
entry.invalidateTimeout = invalidateTimeout
for _, el := range rest {
parts := strings.Split(el, "=")
switch len(parts) {
case 1:
// This is a tag.
tag := parts[0]
// Ensure the tag starts and ends with [] (eg. [ASSURED].
if !strings.HasPrefix(tag, "[") || !strings.HasSuffix(tag, "]") {
continue
}
// Strip [ and ].
tag = tag[1:]
tag = tag[:len(tag)-1]
if _, ok := entry.tags[tag]; ok {
return nil, fmt.Errorf("repeated tag %q", tag)
}
entry.tags[tag] = true
case 2:
// This is a k/v field.
k := parts[0]
v := parts[1]
if _, ok := entry.request[k]; ok {
if _, ok := entry.response[k]; ok {
return nil, fmt.Errorf("field %q encountered more than twice", k)
} else {
entry.response[k] = v
}
} else {
entry.request[k] = v
}
default:
return nil, fmt.Errorf("unparseable column %q", el)
}
}
return &entry, nil
}
// conntrackParse parses the contents of a /proc/net/nf_conntrack file into
// multiple entries. If the majority of the entries could not be parsed, an
// error is returned.
func conntrackParse(data []byte) ([]conntrackEntry, error) {
buf := bytes.NewBuffer(data)
scanner := bufio.NewScanner(buf)
var res []conntrackEntry
var errors []error
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
entry, err := conntrackParseEntry(line)
if err != nil {
glog.Errorf("Error while parsing %q: %v", line, err)
errors = append(errors, err)
} else if entry != nil {
res = append(res, *entry)
}
}
if len(errors) == 0 || len(errors) < len(res) {
return res, nil
} else {
return nil, fmt.Errorf("encountered too many errors during conntrack parse, check logs; first error: %w", errors[0])
}
}
// contrackIndex is an index into a list of conntrackEntries. It allows lookup
// by request/response k/v pairs.
type conntrackIndex struct {
entries []conntrackEntry
// byRequest is a map from key to value to list of indixes into entries.
byRequest map[string]map[string][]int
// byResponse is a map from key to value to list of indixes into entries.
byResponse map[string]map[string][]int
}
// buildIndex builds a conntrackIndex from a list of conntrackEntries.
func buildIndex(entries []conntrackEntry) *conntrackIndex {
ix := conntrackIndex{
entries: entries,
byRequest: make(map[string]map[string][]int),
byResponse: make(map[string]map[string][]int),
}
for i, entry := range ix.entries {
for k, v := range entry.request {
if _, ok := ix.byRequest[k]; !ok {
ix.byRequest[k] = make(map[string][]int)
}
ix.byRequest[k][v] = append(ix.byRequest[k][v], i)
}
for k, v := range entry.response {
if _, ok := ix.byResponse[k]; !ok {
ix.byResponse[k] = make(map[string][]int)
}
ix.byResponse[k][v] = append(ix.byResponse[k][v], i)
}
}
return &ix
}
// getByRequest returns conntrackEntries that match a given k/v pair in their
// request fields.
func (c *conntrackIndex) getByRequest(k, v string) []*conntrackEntry {
m, ok := c.byRequest[k]
if !ok {
return nil
}
ixes, ok := m[v]
if !ok {
return nil
}
res := make([]*conntrackEntry, len(ixes))
for i, ix := range ixes {
res[i] = &c.entries[ix]
}
return res
}
// getByResponse returns conntrackEntries that match a given k/v pair in their
// response fields.
func (c *conntrackIndex) getByResponse(k, v string) []*conntrackEntry {
m, ok := c.byResponse[k]
if !ok {
return nil
}
ixes, ok := m[v]
if !ok {
return nil
}
res := make([]*conntrackEntry, len(ixes))
for i, ix := range ixes {
res[i] = &c.entries[ix]
}
return res
}
// find returns a conntrackEntry corresponding to a TCP connection defined on
// the 'outside' of the NAT by a 4-tuple, or nil if no such connection is
// found.
func (c *conntrackIndex) find(t *Tuple4) *conntrackEntry {
// TODO(q3k): support IPv6
if t.RemoteIP.To4() == nil || t.LocalIP.To4() == nil {
return nil
}
entries := c.getByResponse("src", t.RemoteIP.String())
for _, entry := range entries {
if entry.transmissionProtocol != "tcp" {
continue
}
if entry.response["sport"] != fmt.Sprintf("%d", t.RemotePort) {
continue
}
if entry.response["dst"] != t.LocalIP.String() {
continue
}
if entry.response["dport"] != fmt.Sprintf("%d", t.LocalPort) {
continue
}
return entry
}
return nil
}
// runTranslationWorker runs the conntrack 'translation worker'. It responds to
// requests over translationC until ctx is canceled.
func (r *Resolver) runTranslationWorker(ctx context.Context) {
var ix *conntrackIndex
readConntrack := func() {
var entries []conntrackEntry
data, err := ioutil.ReadFile(r.conntrackPath)
if err != nil {
glog.Errorf("Failed to read conntrack file: %v", err)
} else {
entries, err = conntrackParse(data)
if err != nil {
glog.Errorf("failed to parse conntrack entries: %v", err)
}
}
ix = buildIndex(entries)
}
readConntrack()
for {
select {
case req := <-r.translationC:
entry := ix.find(req.t)
if entry != nil {
req.reply(entry)
} else {
readConntrack()
entry = ix.find(req.t)
if entry != nil {
req.reply(entry)
} else {
req.reply(nil)
}
}
case <-ctx.Done():
return
}
}
}

View File

@ -0,0 +1,130 @@
package kubenat
import (
"context"
"flag"
"io/ioutil"
"net"
"os"
"testing"
"github.com/go-test/deep"
)
// testConntrack is the anonymized content of a production host.
// The first entry is an appservice-irc connection from a pod to an IRC server.
// The second connection is an UDP connection between two pods.
// The third to last entry is not a NAT entry, but an incoming external
// connection.
// The fourth connection has a mangled/incomplete entry.
const testConntrack = `
ipv4 2 tcp 6 86384 ESTABLISHED src=10.10.26.23 dst=192.0.2.180 sport=51336 dport=6697 src=192.0.2.180 dst=185.236.240.36 sport=6697 dport=28706 [ASSURED] mark=0 zone=0 use=2
ipv4 2 udp 17 35 src=10.10.24.162 dst=10.10.26.108 sport=49347 dport=53 src=10.10.26.108 dst=10.10.24.162 sport=53 dport=49347 [ASSURED] mark=0 zone=0 use=2
ipv4 2 tcp 6 2 SYN_SENT src=198.51.100.67 dst=185.236.240.56 sport=51053 dport=3359 [UNREPLIED] src=185.236.240.56 dst=198.51.100.67 sport=3359 dport=51053 mark=0 zone=0 use=2
ipv4 2 tcp 6 2
`
// TestConntrackParse exercises the conntrack parser for all entries in testConntrack.
func TestConntrackParse(t *testing.T) {
// Last line is truncated and should be ignored.
got, err := conntrackParse([]byte(testConntrack))
if err != nil {
t.Fatalf("conntrackParse: %v", err)
}
want := []conntrackEntry{
{
"ipv4", "tcp", 86384, "ESTABLISHED",
map[string]string{
"src": "10.10.26.23", "dst": "192.0.2.180", "sport": "57640", "dport": "6697",
"mark": "0", "zone": "0", "use": "2",
},
map[string]string{
"src": "192.0.2.180", "dst": "185.236.240.36", "sport": "6697", "dport": "28706",
},
map[string]bool{
"ASSURED": true,
},
},
{
"ipv4", "udp", 35, "",
map[string]string{
"src": "10.10.24.162", "dst": "10.10.26.108", "sport": "49347", "dport": "53",
"mark": "0", "zone": "0", "use": "2",
},
map[string]string{
"src": "10.10.26.108", "dst": "10.10.24.162", "sport": "53", "dport": "49347",
},
map[string]bool{
"ASSURED": true,
},
},
{
"ipv4", "tcp", 2, "SYN_SENT",
map[string]string{
"src": "198.51.100.67", "dst": "185.236.240.56", "sport": "51053", "dport": "3359",
"mark": "0", "zone": "0", "use": "2",
},
map[string]string{
"src": "185.236.240.56", "dst": "198.51.100.67", "sport": "3359", "dport": "51053",
},
map[string]bool{
"UNREPLIED": true,
},
},
}
if diff := deep.Equal(want, got); diff != nil {
t.Error(diff)
}
ix := buildIndex(got)
if want, got := 0, len(ix.getByRequest("src", "1.2.3.4")); want != got {
t.Errorf("by request, src, 1.2.3.4 should have returned %d result, wanted %d", want, got)
}
if want, got := 1, len(ix.getByRequest("src", "10.10.26.23")); want != got {
t.Errorf("by request, src, 1.2.3.4 should have returned %d result, wanted %d", want, got)
}
if want, got := "10.10.26.23", ix.getByRequest("src", "10.10.26.23")[0].request["src"]; want != got {
t.Errorf("by request, wanted src %q, got %q", want, got)
}
if want, got := 3, len(ix.getByRequest("mark", "0")); want != got {
t.Errorf("by request, mark, 0 should have returned %d result, wanted %d", want, got)
}
}
// TestTranslationWorker exercises a translation worker with a
// testConntrack-backed conntrack file.
func TestTranslationWorker(t *testing.T) {
flag.Set("logtostderr", "true")
tmpfile, err := ioutil.TempFile("", "conntack")
if err != nil {
t.Fatal(err)
}
defer os.Remove(tmpfile.Name())
if _, err := tmpfile.Write([]byte(testConntrack)); err != nil {
t.Fatal(err)
}
r := &Resolver{
conntrackPath: tmpfile.Name(),
translationC: make(chan *translationReq),
}
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
go r.runTranslationWorker(ctx)
res, err := r.translate(ctx, &Tuple4{
RemoteIP: net.ParseIP("192.0.2.180"),
RemotePort: 6697,
LocalIP: net.ParseIP("185.236.240.36"),
LocalPort: 28706,
})
if err != nil {
t.Fatalf("translate: %v", err)
}
if want, got := net.ParseIP("10.10.26.23"), res.localIP; !want.Equal(got) {
t.Errorf("local ip: wanted %v, got %v", want, got)
}
if want, got := uint16(51336), res.localPort; want != got {
t.Errorf("local port: wanted %d, got %d", want, got)
}
}

195
cluster/identd/main.go Normal file
View File

@ -0,0 +1,195 @@
package main
import (
"context"
"errors"
"flag"
"fmt"
"net"
"os"
"os/signal"
"strings"
"github.com/golang/glog"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"code.hackerspace.pl/hscloud/cluster/identd/ident"
"code.hackerspace.pl/hscloud/cluster/identd/kubenat"
"code.hackerspace.pl/hscloud/go/mirko"
)
func init() {
flag.Set("logtostderr", "true")
}
var (
flagIdentdListen = "127.0.0.1:8113"
flagContainerdSocket = "/var/run/containerd/containerd.sock"
flagConntrackProc = "/proc/net/nf_conntrack"
flagPodName = ""
flagPodNamespace = ""
)
func main() {
flag.StringVar(&flagIdentdListen, "identd_listen", flagIdentdListen, "Address at which to listen for incoming ident protocol connections")
flag.StringVar(&flagContainerdSocket, "identd_containerd_socket", flagContainerdSocket, "Containerd gRPC socket path")
flag.StringVar(&flagConntrackProc, "identd_conntrack_proc", flagConntrackProc, "Conntrack procfs file")
flag.StringVar(&flagPodName, "identd_pod_name", flagPodName, "Name of this pod, if on k8s. Needed for public IP resolution.")
flag.StringVar(&flagPodNamespace, "identd_pod_namespace", flagPodNamespace, "Namespace where this pod is running, if on k8s. Needed for public IP resolution.")
flag.Parse()
ctx, ctxC := context.WithCancel(context.Background())
resolver, err := kubenat.NewResolver(ctx, flagConntrackProc, flagContainerdSocket)
if err != nil {
glog.Exitf("Could not start kubenet resolver: %v", err)
}
var localIP net.IP
localIPStr, _, err := net.SplitHostPort(flagIdentdListen)
if err != nil {
glog.Warningf("Could not parse identd listen flag %q", flagIdentdListen)
} else {
localIP = net.ParseIP(localIPStr)
if localIP == nil || !localIP.IsGlobalUnicast() {
glog.Warningf("Could not parse unicast IP from identd flag %q", localIPStr)
localIP = nil
}
}
if localIP == nil {
glog.Infof("Could not figure out public IP address for identd, attempting to retrieve from k8s...")
cs := mirko.KubernetesClient()
if cs == nil {
glog.Exitf("Not in k8s and identd_listen set to invalid public IP address - exiting.")
}
if flagPodName == "" {
glog.Exitf("identd_pod_name must be set")
}
if flagPodNamespace == "" {
glog.Exitf("identd_pod_namespace must be set")
}
pod, err := cs.CoreV1().Pods(flagPodNamespace).Get(ctx, flagPodName, v1.GetOptions{})
if err != nil {
glog.Exitf("Could not find pod %q in namespace %q: %v", flagPodName, flagPodNamespace, err)
}
ipStr := pod.Status.HostIP
if ipStr == "" {
glog.Exitf("HostIP in status of pod %q is empty", flagPodName)
}
glog.Infof("Resolved k8s node IP to %s", ipStr)
localIP = net.ParseIP(ipStr)
if localIP == nil {
glog.Exitf("HostIP in status of pod %q is unparseable", flagPodName, ipStr)
}
}
glog.Infof("Will respond to identd queries on %s...", localIP)
s := &service{
resolver: resolver,
localIP: localIP,
}
lis, err := net.Listen("tcp", flagIdentdListen)
if err != nil {
glog.Exitf("Could not listen for identd: %v", err)
}
isrv := ident.NewServer()
isrv.HandleFunc(s.handleIdent)
go func() {
glog.Infof("Starting identd on %s...", flagIdentdListen)
err := isrv.Serve(lis)
if err != nil {
glog.Exitf("identd Serve: %v", err)
}
}()
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
go func() {
<-signalChan
ctxC()
}()
<-ctx.Done()
glog.Infof("Stopping identd...")
isrv.Stop()
lis.Close()
}
type service struct {
resolver *kubenat.Resolver
localIP net.IP
}
func (s *service) handleIdent(ctx context.Context, w ident.ResponseWriter, r *ident.Request) {
clientIPStr, _, err := net.SplitHostPort(r.ClientAddress.String())
if err != nil {
glog.Errorf("Unparseable ClientAddres %q", r.ClientAddress)
w.SendError(ident.UnknownError)
return
}
clientIP := net.ParseIP(clientIPStr)
if clientIP == nil {
glog.Errorf("Unparseable ClientAddres IP %q", r.ClientAddress)
w.SendError(ident.UnknownError)
return
}
t4 := kubenat.Tuple4{
RemoteIP: clientIP,
RemotePort: r.ClientPort,
LocalIP: s.localIP,
LocalPort: r.ServerPort,
}
glog.Infof("Running query for %s...", t4.String())
info, err := s.resolver.ResolvePod(ctx, &t4)
if err != nil {
glog.Errorf("ResolvePod(%q): %v", t4.String(), err)
w.SendError(ident.NoUser)
return
}
ns := info.KubernetesNamespace
pod := info.Name
if ns == "matrix" && strings.HasPrefix(pod, "appservice-irc-") {
target := net.JoinHostPort(info.PodIP.String(), "1113")
clientPort := r.ClientPort
serverPort := info.PodTranslatedPort
glog.Infof("Forwarding to appservice-irc at %q, clientPort: %d, serverPort: %d", target, clientPort, serverPort)
res, err := ident.Query(ctx, target, clientPort, serverPort)
if err != nil {
var identErr *ident.IdentError
if errors.As(err, &identErr) {
glog.Infof("appservice-irc: %s", identErr.Inner)
w.SendError(identErr.Inner)
} else {
glog.Infof("appservice-irc: error: %v", err)
w.SendError(ident.UnknownError)
}
} else {
glog.Infof("Response from appservice-irc: %q", res.UserID)
w.SendIdent(&ident.IdentResponse{
UserID: res.UserID,
})
}
return
}
// default to kns-*
user := fmt.Sprintf("kns-%s", ns)
// q3k's old personal namespace.
if ns == "q3k" {
user = "q3k"
}
// personal-* namespaces.
if strings.HasPrefix(ns, "personal-") {
user = strings.TrimPrefix(ns, "personal-")
}
glog.Infof("Returning %q (from %q) for %q", user, ns, t4.String())
w.SendIdent(&ident.IdentResponse{
UserID: user,
})
}

View File

@ -10,6 +10,7 @@ local policies = import "../../kube/policies.libsonnet";
local calico = import "lib/calico.libsonnet";
local certmanager = import "lib/cert-manager.libsonnet";
local coredns = import "lib/coredns.libsonnet";
local identd = import "lib/identd.libsonnet";
local metallb = import "lib/metallb.libsonnet";
local metrics = import "lib/metrics.libsonnet";
local nginx = import "lib/nginx.libsonnet";
@ -204,6 +205,9 @@ local pki = import "lib/pki.libsonnet";
},
},
// Ident service
identd: identd.Environment {},
// Rook Ceph storage operator.
rook: rook.Operator {
operator+: {

View File

@ -0,0 +1,7 @@
// Only the identd instance in k0.
local k0 = (import "k0.libsonnet").k0;
{
identd: k0.cluster.identd,
}

View File

@ -0,0 +1,112 @@
// Deploys identd, an ident protocol (RFC1413) service on all cluster nodes.
//
// See //cluster/identd for more information about the service and how it
// works.
//
// Deployment notes:
// - We run the service within the host network namespace, as that's the only
// way to reliably route traffic destined for a given node into a service
// running on that node. We could use NodePort services, but low-numbered
// ports are denied by kubelet with out current configuration.
// - We pass the host containerd socket to the service, and the service runs
// as root. This means that the service basically has root on the machine.
// The fact that it's directly exposed to the Internet isn't great, but the
// only alternative seems to to split this up into two services - one
// responsible for maintaining the state of translations/pods (running as
// root), another responding to the actual queries. Considering this is Go
// and we run a bare minimum of code (no mirko default service), this is
// probably good enough and not worth the extra complexity.
// - The service has to be able to resolve the node IP on which it's running,
// to know what address to look up in the conntrack table. Currently it does
// so by connection to the k8s API and getting information about its own pod
// and then getting the node's IP address from there.
// This might be overly complicated, as perhaps we can
// 1. figure out the server IP from the incoming ident connections, or
// 2. find a better way to retrieve the nodeIP via the 'downstream API'
// TODO(q3k): figure this out.
local kube = import "../../../kube/kube.libsonnet";
local policies = import "../../../kube/policies.libsonnet";
{
Environment: {
local env = self,
local cfg = env.cfg,
cfg:: {
namespace: "identd",
image: "registry.k0.hswaw.net/q3k/identd:315532800-9f29c14dad77036d0820948139b5aee3fac25d59",
},
namespace: kube.Namespace(cfg.namespace),
local ns = self.namespace,
allowInsecure: policies.AllowNamespaceInsecure(cfg.namespace),
sa: ns.Contain(kube.ServiceAccount("identd")),
role: ns.Contain(kube.Role("access-pod-info")) {
rules: [
{
apiGroups: [""],
resources: ["pods"],
verbs: ["get"],
},
],
},
rb: ns.Contain(kube.RoleBinding("identd")) {
roleRef_: env.role,
subjects_: [env.sa],
},
daemonset: ns.Contain(kube.DaemonSet("identd")) {
spec+: {
template+: {
spec+: {
serviceAccountName: env.sa.metadata.name,
hostNetwork: true,
containers_: {
default: kube.Container("default") {
image: cfg.image,
env_: {
POD_NAME: kube.FieldRef("metadata.name"),
POD_NAMESPACE: kube.FieldRef("metadata.namespace"),
},
command: [
"/cluster/identd/identd",
"-identd_listen", "0.0.0.0:113",
"-identd_conntrack_proc", "/host/conntrack",
"-identd_containerd_socket", "/host/containerd.sock",
// Used by the service to figure out which
// node it's running on.
"-identd_pod_name", "$(POD_NAME)",
"-identd_pod_namespace", "$(POD_NAMESPACE)",
"-logtostderr",
],
volumeMounts_: {
conntrack: { mountPath: "/host/conntrack", },
containerd: { mountPath: "/host/containerd.sock", },
},
resources: {
requests: {
cpu: "0.1",
memory: "64M",
},
// Allow identd to spike to 1 CPU. This
// makes it faster when fetching
// information from containerd.
limits: {
cpu: "1",
memory: "256M",
},
},
},
},
volumes_: {
conntrack: kube.HostPathVolume("/proc/net/nf_conntrack", "File"),
containerd: kube.HostPathVolume("/var/run/containerd/containerd.sock", "Socket"),
},
},
},
},
},
}
}