
Kubernetes | Custom Ingress Controller
Last time we mentioned that an Ingress Controller is just an externally reachable Service whose backing Pods proxy traffic elsewhere. The Pods providing that service can watch the Ingress resource and adjust their proxy configuration as it changes, which is why it's called an Ingress Controller.
Today we'll write a simple Ingress Controller in Go. It only demonstrates how to drive the built-in Kubernetes APIs and helps explain what an Ingress Controller actually does; it is in no way production-ready.
The SDKs we use
k8s.io/client-go
The main Go SDK for Kubernetes; it makes it very convenient to talk to the APIs in each of the cluster's built-in groups.
There are three broad families of version numbers. Setting aside one deprecated numbering scheme, the usual rule relating them to the cluster is that cluster version 1.x.y corresponds to SDK version v0.x.y, so remember to go get k8s.io/client-go@v0.x.y. For example, my test minikube cluster runs Kubernetes 1.20.0, so the matching SDK version is v0.20.0.
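Concretely, pinning the SDK to match that 1.20.0 cluster looks like:

	go get k8s.io/client-go@v0.20.0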
Gorilla Mux
A reasonably well-balanced HTTP router. It needs no further introduction.
Implementation
What we're building today is a service that configures its own routes according to Ingress resources. It can run locally, or be deployed into the cluster and used behind a Service.
There are just a few problems to solve:
- Start the HTTP service
- Connect to the cluster
- Configure ourselves according to the Ingresses
- Remove routing rules when an Ingress is deleted or modified
- Add routes when an Ingress is added or modified
- Update the Ingress Status
Program flags
Which port to listen on and which kubeconfig to use; if no kubeconfig is given, we assume we are running inside the cluster.
var (
	fListenAddr     = flag.String("listen", envString("LISTEN_ADDR", ":1234"), "")
	fKubeConfigPath = flag.String("kubeconfig", envString("KUBECONFIG", ""), "")
)

func main() {
	flag.Parse()
}

// ...

func envString(k, d string) string {
	if v, ok := os.LookupEnv(k); ok {
		return v
	}
	return d
}
Shared variables
We mainly need to share the router mux between the httpServer and the Controller. Beyond that, since we're starting two goroutines, it's best to use an errgroup.
func main() {
	// ...
	ctx := context.Background()
	errGroup, ctx := errgroup.WithContext(ctx)
	router := mux.NewRouter()
}
The HTTP service
Start a server with the router as its handler; the errgroup context can be used to shut the server down, as sketched after the snippet.
func main() {
	// ...
	httpServer := http.Server{
		Addr:    *fListenAddr,
		Handler: router,
	}
	errGroup.Go(func() error {
		log.Println("listen on ", *fListenAddr)
		return httpServer.ListenAndServe()
	})
}
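Note that the snippet above never actually uses ctx. A minimal sketch of how the wiring could look (my addition, not part of the demo repo): a second goroutine that shuts the server down once the errgroup's context is canceled.

	errGroup.Go(func() error {
		// Block until another goroutine in the group fails, then give
		// in-flight requests five seconds to finish.
		<-ctx.Done()
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		return httpServer.Shutdown(shutdownCtx)
	})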
Defining the Controller
This really just builds the Kubernetes connection parameters and stores them, together with the router, in a struct to serve as our context.
package controller

import (
	"github.com/gorilla/mux"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

type Controller struct {
	client *kubernetes.Clientset
	router *mux.Router
}

func New(kubeConfig string, router *mux.Router) (*Controller, error) {
	// With an empty kubeconfig path, BuildConfigFromFlags falls back to
	// the in-cluster configuration.
	config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
	if err != nil {
		return nil, err
	}
	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	return &Controller{client: clientSet, router: router}, nil
}
Starting the Controller
func (c *Controller) Run(ctx context.Context) error {
	watcher, err := c.client.NetworkingV1().Ingresses("default").Watch(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}
	for {
		select {
		case event := <-watcher.ResultChan():
			// ...
		case <-ctx.Done():
			return nil
		}
	}
}
// ...

func main() {
	// ...
	errGroup.Go(func() error {
		ingressController, err := controller.New(*fKubeConfigPath, router)
		if err != nil {
			return err
		}
		log.Println("watching for ingress")
		return ingressController.Run(ctx)
	})
}
Then all main has to do is block until every goroutine returns nil, or one of them returns an error.

	if err := errGroup.Wait(); err != nil {
		log.Println(err)
	}
Handling events
The Run function we defined above keeps processing events from the watcher; a simple piece of logic to handle those events is all we need.
It's just what we said earlier: on delete or modify, remove the route first; on add or modify, add the route. That is enough to follow changes to the Ingresses.
// (imports: v1 "k8s.io/api/networking/v1" and "k8s.io/apimachinery/pkg/watch")
for {
	select {
	case event, ok := <-watcher.ResultChan():
		// The result channel closes when the watch expires; return
		// instead of spinning on zero-value events.
		if !ok {
			return nil
		}
		switch obj := event.Object.(type) {
		case *v1.Ingress:
			switch event.Type {
			case watch.Added:
				c.add(obj)
			case watch.Deleted:
				c.remove(obj)
			case watch.Modified:
				c.remove(obj)
				c.add(obj)
			}
		}
	case <-ctx.Done():
		return nil
	}
}
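A side note: a raw watch like this expires after a while, which is exactly why the result channel can close. Production controllers are usually built on shared informers instead, which re-list and re-watch automatically and keep a local cache. A rough sketch of the same logic on top of informers (a hypothetical runWithInformer of mine, assuming imports of k8s.io/client-go/informers and k8s.io/client-go/tools/cache, and reusing the add/remove helpers defined below):

	func (c *Controller) runWithInformer(ctx context.Context) {
		factory := informers.NewSharedInformerFactory(c.client, 10*time.Minute)
		ingressInformer := factory.Networking().V1().Ingresses().Informer()
		ingressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			// Map the informer callbacks onto the same add/remove helpers.
			AddFunc: func(obj interface{}) { c.add(obj.(*v1.Ingress)) },
			UpdateFunc: func(oldObj, newObj interface{}) {
				c.remove(oldObj.(*v1.Ingress))
				c.add(newObj.(*v1.Ingress))
			},
			DeleteFunc: func(obj interface{}) {
				// DeleteFunc may receive a tombstone instead of an Ingress.
				if ing, ok := obj.(*v1.Ingress); ok {
					c.remove(ing)
				}
			},
		})
		factory.Start(ctx.Done()) // spawns the list/watch goroutines
		factory.WaitForCacheSync(ctx.Done())
		<-ctx.Done()
	}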
The add and remove logic
This part is fairly simple: create the rules one by one, give each rule a name, and on removal set that name's handler to nil. (There's a small wart here: I didn't find any way to actually delete a route, and a nil-handler route still costs a little performance on every match; see the sketch after the code.)
func (c *Controller) add(ing *v1.Ingress) {
	for _, rule := range ing.Spec.Rules {
		for _, path := range rule.IngressRuleValue.HTTP.Paths {
			// A missing pathType means ImplementationSpecific; don't
			// dereference a nil pointer below.
			pathType := v1.PathTypeImplementationSpecific
			if path.PathType != nil {
				pathType = *path.PathType
			}
			name := fmt.Sprintf("%v:%v", rule.Host, path.Path)
			// Reuse the named route if it already exists (on Modified
			// events); registering the name again would leave the old
			// route in place, shadowing the new one.
			route := c.router.Get(name)
			if route == nil {
				route = c.router.Name(name)
				if rule.Host != "" {
					route = route.Host(rule.Host)
				}
				if pathType == v1.PathTypeExact {
					route = route.Path(path.Path)
				} else {
					route = route.PathPrefix(path.Path)
				}
			}
			log.Printf("%v/%v(%v) -> %v:%v", rule.Host, path.Path, pathType, path.Backend.Service.Name, path.Backend.Service.Port.Number)
			route.Handler(c.ProxyToService(path.Backend.Service.Name, path.Backend.Service.Port.Number))
		}
	}
}

func (c *Controller) remove(ing *v1.Ingress) {
	for _, rule := range ing.Spec.Rules {
		for _, path := range rule.IngressRuleValue.HTTP.Paths {
			// Get looks up the previously registered named route;
			// router.Name would register a fresh catch-all route instead.
			if route := c.router.Get(fmt.Sprintf("%v:%v", rule.Host, path.Path)); route != nil {
				route.Handler(nil)
				log.Printf("%v/%v -> nil", rule.Host, path.Path)
			}
		}
	}
}
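As for the route-deletion problem: gorilla/mux simply has no API for removing a route. One way around it (a sketch of mine, not what the demo repo does) is to stop mutating one long-lived router, keep the desired routes in a map keyed by Ingress, rebuild a fresh mux.Router on every event, and swap it in atomically behind a tiny wrapper:

	// swappableHandler lets us replace the whole router at once instead
	// of mutating routes in place.
	type swappableHandler struct {
		mu sync.RWMutex
		h  http.Handler
	}

	func (s *swappableHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
		s.mu.RLock()
		h := s.h
		s.mu.RUnlock()
		h.ServeHTTP(w, r)
	}

	// Swap installs a freshly built router; in-flight requests keep
	// using the old one.
	func (s *swappableHandler) Swap(h http.Handler) {
		s.mu.Lock()
		s.h = h
		s.mu.Unlock()
	}

The swappableHandler would be what we hand to http.Server instead of the router; removed routes then disappear for real, with no nil-handler leftovers.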
The proxy logic
One piece is still missing from the code above: the definition of ProxyToService. This function returns an http.Handler that handles a request by forwarding it to the target Service.
package controller

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httputil"
	"net/url"

	v1 "k8s.io/api/core/v1"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (c *Controller) ProxyToService(serviceName string, port int32) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		svc, err := c.client.CoreV1().Services("default").Get(context.Background(), serviceName, metaV1.GetOptions{})
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		// For ExternalName services the target is the external DNS name;
		// for everything else, the cluster IP (only reachable when the
		// controller itself runs inside the cluster).
		host := svc.Spec.ExternalName
		if svc.Spec.Type != v1.ServiceTypeExternalName {
			host = svc.Spec.ClusterIP
		}
		target, err := url.Parse(fmt.Sprintf("http://%v:%v", host, port))
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		// Add proxy headers here
		if svc.Spec.Type == v1.ServiceTypeExternalName {
			// Clear the inbound Host so the reverse proxy sends the
			// external name as the Host header instead of our own address.
			r.Host = r.URL.Host
		}
		httputil.NewSingleHostReverseProxy(target).ServeHTTP(w, r)
	})
}
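Querying the API server on every single request is wasteful. If the controller runs in-cluster, one alternative (a sketch of mine, assuming standard cluster DNS and the hard-coded default namespace used throughout this demo; the name proxyViaDNS is hypothetical) is to skip the lookup and address the Service by its DNS name:

	// proxyViaDNS builds the proxy once, relying on cluster DNS to
	// resolve the Service instead of asking the API server per request.
	func (c *Controller) proxyViaDNS(serviceName string, port int32) http.Handler {
		target, err := url.Parse(fmt.Sprintf("http://%v.default.svc.cluster.local:%v", serviceName, port))
		if err != nil {
			// The format string is static, so this cannot fail at runtime.
			panic(err)
		}
		return httputil.NewSingleHostReverseProxy(target)
	}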
At this point we can already use this code as an ingress controller. We can apply the following definition:
kind: Service
apiVersion: v1
metadata:
  name: external-blog
spec:
  type: ExternalName
  externalName: blog.jeffthecoder.xyz
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: minimal-ingress
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  rules:
  - http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: external-blog
            port:
              number: 80
Visiting http://127.0.0.1:1234 now reaches the corresponding service.
curl http://127.0.0.1:1234/ -v
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 1234 (#0)
> GET / HTTP/1.1
> Host: 127.0.0.1:1234
> User-Agent: curl/7.64.1
> Accept: */*
>
< HTTP/1.1 301 Moved Permanently
< Content-Length: 278
< Content-Type: text/html
< Date: Sat, 16 Jan 2021 17:45:41 GMT
< Eagleid: 74cf71a016108191416415202e
< Location: https://blog.jeffthecoder.xyz/
< Server: Tengine
< Timing-Allow-Origin: *
< Via: kunlun2.cn1241[,0]
<
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html>
<head><title>301 Moved Permanently</title></head>
<body bgcolor="white">
<h1>301 Moved Permanently</h1>
<p>The requested resource has been assigned a new permanent URI.</p>
<hr/>Powered by Tengine</body>
</html>
* Connection #0 to host 127.0.0.1 left intact
* Closing connection 0
What's left is filling in the rest of the proxy logic; alternatively, we could change this part to configure some other proxy tool instead.
Updating the Status
Most Kubernetes resources carry a status, so let's update our Status a little too. Before the update, our Ingress looks like this:
$ kubectl get ingress
NAME              CLASS    HOSTS   ADDRESS   PORTS   AGE
minimal-ingress   <none>   *                 80      23m
$ kubectl get ingress minimal-ingress -o yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  ......
status:
  loadBalancer: {}
We don't really have any special information to publish; purely as a demonstration, let's set a hostname:
// func (c *Controller) Run(ctx context.Context) error {
//     watcher, err := c.client.NetworkingV1().Ingresses("default").Watch(ctx, metav1.ListOptions{})
//     if err != nil {
//         return err
//     }
//     for {
//         select {
//         case event := <-watcher.ResultChan():
//             switch obj := event.Object.(type) {
//             case *v1.Ingress:
//                 switch event.Type {
//                 case watch.Added:
//                     c.add(obj)

// In client-go v0.20, the Ingress status uses the LoadBalancerIngress
// type from core/v1 (imported here as corev1 "k8s.io/api/core/v1").
obj.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{
	{
		Hostname: "my-ingress",
	},
}
if _, err := c.client.NetworkingV1().Ingresses("default").UpdateStatus(ctx, obj, metav1.UpdateOptions{}); err != nil {
	log.Print("error while updating ingress: ", err)
}

//                 case watch.Deleted:
//                     c.remove(obj)
//                 case watch.Modified:
//                     c.remove(obj)
//                     c.add(obj)
//                 }
//             }
//         case <-ctx.Done():
//             return nil
//         }
//     }
// }
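One caveat (my note, not from the original): if the object changed between the watch event and our update, UpdateStatus fails with a conflict. A common pattern is to re-fetch and retry with retry.RetryOnConflict from k8s.io/client-go/util/retry:

	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Re-fetch the latest version on every attempt.
		current, err := c.client.NetworkingV1().Ingresses("default").Get(ctx, obj.Name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		current.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{{Hostname: "my-ingress"}}
		_, err = c.client.NetworkingV1().Ingresses("default").UpdateStatus(ctx, current, metav1.UpdateOptions{})
		return err
	})
	if err != nil {
		log.Print("error while updating ingress: ", err)
	}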
After the update, it shows:
$ kubectl get ingress
NAME              CLASS    HOSTS   ADDRESS      PORTS   AGE
minimal-ingress   <none>   *       my-ingress   80      23m
$ kubectl get ingress minimal-ingress -o yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"networking.k8s.io/v1","kind":"Ingress","metadata":{"annotations":{"nginx.ingress.kubernetes.io/rewrite-target":"/"},"name":"minimal-ingress","namespace":"default"},"spec":{"rules":[{"http":{"paths":[{"backend":{"service":{"name":"external-blog","port":{"number":80}}},"path":"/","pathType":"Prefix"}]}}]}}
    nginx.ingress.kubernetes.io/rewrite-target: /
  creationTimestamp: "2021-01-15T00:24:02Z"
  generation: 1
  managedFields:
  - apiVersion: networking.k8s.io/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .: {}
          f:kubectl.kubernetes.io/last-applied-configuration: {}
          f:nginx.ingress.kubernetes.io/rewrite-target: {}
      f:spec:
        f:rules: {}
    manager: kubectl-client-side-apply
    operation: Update
    time: "2021-01-15T00:24:02Z"
  - apiVersion: networking.k8s.io/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:status:
        f:loadBalancer:
          f:ingress: {}
    manager: ___8go_build_demo_ingress_controller_cmd_controller
    operation: Update
    time: "2021-01-15T00:34:25Z"
  name: minimal-ingress
  namespace: default
  resourceVersion: "102482"
  uid: 5555e564-31a8-4f96-aeb5-1d8ab634848f
spec:
  rules:
  - http:
      paths:
      - backend:
          service:
            name: external-blog
            port:
              number: 80
        path: /
        pathType: Prefix
status:
  loadBalancer:
    ingress:
    - hostname: my-ingress
Full code
GitHub: https://github.com/jeffguorg/demo-ingress-controller
Gitee mirror: https://gitee.com/jeffguorg/demo-ingress-controller