Kubernetes | Custom Ingress Controller

2021 01 17, Sun

上次我们提到说Ingress Controller就是一个外部可以访问到的Service,背后的Pod把流量代理到其他地方。提供服务的Pod可以监听Ingress这项资源,根据资源变动来修改代理配置,所以叫做Ingress Controller。

今天我们用Go写一个简单的Ingress Controller。仅供展示操作K8S内置API的方法,帮助理解Ingress Controller具体做了什么,并不具有在生产上使用的能力。

用到的SDK

k8s.io/client-go

k8s.io/client-go是k8s主要用的Go-SDK,可以非常方便的和k8s里面自带的各个group中的API交互。

版本号有三大类,除了一个已经废弃的版本号记法以外,比较常用的版本号和K8S集群的关系是集群版本1.x.y对应v0.x.y。要记得go get k8s.io/client-go@1.x.y。比如说我测试用的minikube集群,k8s版本是1.20.0,那么我对应的sdk版本,就应该用v0.20.0

Gorilla Mux

一个相对来说比较平衡的http路由。不多介绍了。

实现

我们今天要做的就是要写一个服务,根据Ingress配置自己的路径。可以在本地运行,也可以放到集群里面、配置Service后使用。

要解决的就是这么几个问题:

  • 启动http服务
  • 连接集群
  • 根据Ingress配置自己
    • 删除Ingress或者修改Ingress的时候删除路由规则
    • 增加Ingress或者修改Ingress的时候增加路由
    • 更新Ingress的Status

程序使用的参数

监听什么端口,指定使用的kubeconfig,如果不指定就当作是在集群里面运行。

var (
  fListenAddr = flag.String("listen", envString("LISTEN_ADDR", ":1234"), "")
  fKubeConfigPath = flag.String("kubeconfig", envString("KUBECONFIG", ""), "")
)

func main() {
  flag.Parse()
}

// ...

func envString(k, d string) string {
  if v, ok := os.LookupEnv(k); ok {
    return v
  }
  return d
}

共用的变量

我们主要是需要在httpServer和Controller之间共用router mux。除此之外,因为要启动两个goroutine,所以最好是用一个errgroup。

func main() {
  // ...
  ctx := context.Background()
  errGroup, ctx := errgroup.WithContext(ctx)
  router := mux.NewRouter()
}

http服务

用router作为handler,启动一个服务。context用

func main() {
  // ...
  httpServer := http.Server{
    Addr: *fListenAddr,
    Handler: router,
  }
  errGroup.Go(func() error {
    log.Println("listen on ", *fListenAddr)
    return httpServer.ListenAndServe()
  })
}

定义Controller

其实就是构建K8S的连接参数,和router一起保存到struct里面作为上下文。

type Controller struct {
  client *kubernetes.Clientset

  router *mux.Router
}

func New(kubeConfig string, router *mux.Router) (*Controller, error) {
  config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
  if err != nil {
    return nil, err
  }
  clientSet, err := kubernetes.NewForConfig(config)
  if err != nil {
    return nil, err
  }
  return &Controller{client: clientSet, router: router}, nil
}

启动Controller

func (c *Controller) Run(ctx context.Context) error {
  watcher, err := c.client.NetworkingV1().Ingresses("default").Watch(ctx, metav1.ListOptions{})
  if err != nil {
    return err
  }
  for {
    select {
    case event := <-watcher.ResultChan():
      // ...
    
    case <-ctx.Done():
      return nil
    }
  }
}

// ...


func main() {
  // ...
  errGroup.Go(func() error {
    ingressController, err := controller.New(*fKubeConfigPath, router)
    if err != nil {
      return err
    }

    log.Println("watching for ingress")
    return ingressController.Run(ctx)
  })
}

然后main函数里面就只需要阻塞直到全部返回nil或者有一个goroutine返回error。

  if err := errGroup.Wait(); err != nil {
    log.Println(err)
  }

处理消息

我们上面定义了Run函数,里面不停的处理Watcher的事件。我们写一个简单的逻辑来处理这些事件就行。

就是上面说的,删除或者修改的时候先删掉route,增加或者修改的时候加上route,这样就可以处理ingress的变化。

  for {
    select {
    case event := <-watcher.ResultChan():
      switch obj := event.Object.(type) {
      case *v1.Ingress:
        switch event.Type {
        case watch.Added:
          c.add(obj)
        case watch.Deleted:
          c.remove(obj)
        case watch.Modified:
          c.remove(obj)
          c.add(obj)
        }
      }
    case <-ctx.Done():
      return nil
    }
  }

增加和删除的逻辑

这部分比较简单,无非就是一条条创建规则,每条规则都有一个名字,删除的时候设置这个名字的handler为nil(这里有点小问题,我没找到哪里删除route,设置为nil还是会影响性能)。


func (c *Controller) add(ing *v1.Ingress) {
  for _, rule := range ing.Spec.Rules {
    for _, path := range rule.IngressRuleValue.HTTP.Paths {
      route := c.router.Name(fmt.Sprintf("%v:%v", rule.Host, path.Path))
      if rule.Host != "" {
        route = route.Host(rule.Host)
      }
      if path.PathType == nil || *path.PathType == v1.PathTypeImplementationSpecific || *path.PathType == v1.PathTypePrefix {
        route = route.PathPrefix(path.Path)
      } else {
        route = route.Path(path.Path)
      }
      log.Printf("%v/%v(%v) -> %v:%v", rule.Host, path.Path, *path.PathType, path.Backend.Service.Name, path.Backend.Service.Port.Number)
      route.Handler(c.ProxyToService(path.Backend.Service.Name, path.Backend.Service.Port.Number))
    }
  }
}

func (c *Controller) remove(ing *v1.Ingress) {
  for _, rule := range ing.Spec.Rules {
    for _, path := range rule.IngressRuleValue.HTTP.Paths {
      c.router.Name(fmt.Sprintf("%v:%v", rule.Host, path.Path)).Handler(nil)
      log.Printf("%v/%v(%v) -> nil", rule.Host, path.Path, *path.PathType)
    }
  }
}

代理的逻辑

上面的代码中还缺一块儿,就是ProxyToService函数的定义。这个函数会返回一个http.Handler,用来处理请求,实际上就是发给其他的service

package controller

import (
  "context"
  "fmt"
  v1 "k8s.io/api/core/v1"
  metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "net/http"
  "net/http/httputil"
  "net/url"
)

func (c Controller) ProxyToService(serviceName string, port int32) http.Handler {
  return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    if svc, err := c.client.CoreV1().Services("default").Get(context.Background(), serviceName, metaV1.GetOptions{}); err != nil {
      http.Error(w, err.Error(), http.StatusInternalServerError)
      return
    } else {
      target, err := url.Parse(fmt.Sprintf("http://%v:%v", svc.Spec.ExternalName, port))
      if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
      }
      // Add proxy headers here
      if svc.Spec.Type == v1.ServiceTypeExternalName {
        r.Host = r.URL.Host
      }
      httputil.NewSingleHostReverseProxy(target).ServeHTTP(w, r)
    }
  })
}

这个时候我们已经可以用这段代码作为ingress controller了。我们可以apply下面这段定义:

kind: Service
apiVersion: v1
metadata:
  name: external-blog
spec:
  type: ExternalName
  externalName: blog.jeffthecoder.xyz
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: minimal-ingress
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  rules:
  - http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: external-blog
            port:
              number: 80

访问http://127.0.0.1:1234已经可以访问到对应的服务了。

curl http://127.0.0.1:1234/ -v
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 1234 (#0)
> GET / HTTP/1.1
> Host: 127.0.0.1:1234
> User-Agent: curl/7.64.1
> Accept: */*
>
< HTTP/1.1 301 Moved Permanently
< Content-Length: 278
< Content-Type: text/html
< Date: Sat, 16 Jan 2021 17:45:41 GMT
< Eagleid: 74cf71a016108191416415202e
< Location: https://blog.jeffthecoder.xyz/
< Server: Tengine
< Timing-Allow-Origin: *
< Via: kunlun2.cn1241[,0]
<
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body bgcolor="white">
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
* Connection #0 to host 127.0.0.1 left intact
* Closing connection 0

剩下的就是补足代理的其他逻辑,或者我们可以把这部分逻辑改成配置其他的代理工具。

更新Status

K8S的大部分资源都是带有status的,我们也稍微更新一下Status吧。更新以前,我们的Ingress看起来是这样子的

$ kubectl get ingress
NAME              CLASS    HOSTS   ADDRESS      PORTS   AGE
minimal-ingress   <none>   *                    80      23m
$ kubectl get ingress minimal-ingress -o yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
......
status:
  loadBalancer: {}

我们也没有什么特别的信息要更上去,只是作为演示,我们设置一下hostname:

// func (c *Controller) Run(ctx context.Context) error {
//   watcher, err := c.client.NetworkingV1().Ingresses("default").Watch(ctx, metav1.ListOptions{})
//   if err != nil {
//     return err
//   }
//   for {
//     select {
//     case event := <-watcher.ResultChan():
//       switch obj := event.Object.(type) {
//       case *v1.Ingress:
//         switch event.Type {
//         case watch.Added:
//           c.add(obj)
          obj.Status.LoadBalancer.Ingress = []networkv1.LoadBalancerIngress{
            {
              Hostname: "my-ingress",
            },
          }
          if _, err := c.client.NetworkingV1().Ingresses("default").UpdateStatus(ctx, obj, metav1.UpdateOptions{}); err != nil {
            log.Print("error while update ingress: ", err)
          }
//         case watch.Deleted:
//           c.remove(obj)
//         case watch.Modified:
//           c.remove(obj)
//           c.add(obj)
//         }
//       }
//     case <-ctx.Done():
//       return nil
//     }
//   }
// }

更新以后,就会展示:

$ kubectl get ingress
NAME              CLASS    HOSTS   ADDRESS      PORTS   AGE
minimal-ingress   <none>   *       my-ingress   80      23m
$ kubectl get ingress minimal-ingress -o yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"networking.k8s.io/v1","kind":"Ingress","metadata":{"annotations":{"nginx.ingress.kubernetes.io/rewrite-target":"/"},"name":"minimal-ingress","namespace":"default"},"spec":{"rules":[{"http":{"paths":[{"backend":{"service":{"name":"external-blog","port":{"number":80}}},"path":"/","pathType":"Prefix"}]}}]}}
    nginx.ingress.kubernetes.io/rewrite-target: /
  creationTimestamp: "2021-01-15T00:24:02Z"
  generation: 1
  managedFields:
  - apiVersion: networking.k8s.io/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .: {}
          f:kubectl.kubernetes.io/last-applied-configuration: {}
          f:nginx.ingress.kubernetes.io/rewrite-target: {}
      f:spec:
        f:rules: {}
    manager: kubectl-client-side-apply
    operation: Update
    time: "2021-01-15T00:24:02Z"
  - apiVersion: networking.k8s.io/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:status:
        f:loadBalancer:
          f:ingress: {}
    manager: ___8go_build_demo_ingress_controller_cmd_controller
    operation: Update
    time: "2021-01-15T00:34:25Z"
  name: minimal-ingress
  namespace: default
  resourceVersion: "102482"
  uid: 5555e564-31a8-4f96-aeb5-1d8ab634848f
spec:
  rules:
  - http:
      paths:
      - backend:
          service:
            name: external-blog
            port:
              number: 80
        path: /
        pathType: Prefix
status:
  loadBalancer:
    ingress:
    - hostname: my-ingress

完整代码

Github: https://github.com/jeffguorg/demo-ingress-controller Gitee同步: https://gitee.com/jeffguorg/demo-ingress-controller