Kubernetes | Kubebuilder - Stream Ingress基础部分

2021 02 15, Mon

昨天提到Kubebuilder,但是这两天状态不大好,分成两篇写。这篇是上一篇的实现,记录一下实现一个简单的Controller的过程。

说起来Controller也叫Operator,像是操作员一样默默帮我们管理资源的状态。

设计

我们是希望可以有一个类型,可以一个资源对应Nginx Stream模块中的一个Server。在创建资源的时候可以填写协议、监听地址端口和上游,对应

  • 哪一个configmap(TCP?UDP?)
  • 地址端口
  • 上游在集群内

我们希望可以判断上游是否存在,但是考虑到上游可能在集群中也可能不在,我们就加一个资源的属性,用来判断是否需要检查Service。

如果这些属性都正常,我们的操作员才会继续操作,把这些信息写到ConfigMap里面,更新Service,并且更新自己的状态。

需求很简单,那我们

Let’s GO

安装Kubebuilder

最起码我们需要Golang,还需要Kubebuilder和Kustomize的二进制放在PATH里面。要验证结果还需要有一个K8S环境并且配置了kubeconfig。

限于篇幅并且没有必要,这里就不讲了。

Golang下载: https://golang.google.cn

Kubebuilder下载: https://github.com/kubernetes-sigs/kubebuilder/releases/latest

Kubebuilder下载好以后应该包含有开发用的所有二进制文件,主要是kubebuilder、etcd、kube-apiserver、kubectl。在终端里面直接执行kubebuilder没有问题以后我们继续。

% kubebuilder version
Version: version.Version{KubeBuilderVersion:"2.3.1", KubernetesVendor:"1.16.4", GitCommit:"8b53abeb4280186e494b726edf8f54ca7aa64a49", BuildDate:"2020-03-26T16:42:00Z", GoOs:"unknown", GoArch:"unknown"}

Note: 写博客的时候,kubebuilder出了3.x的测试版本,这些版本的releases没有打包其他的东西,需要的话可以自己准备。kubebuilder测试和自动化都需要这些二进制,单独的一个kubebuilder只能生成代码,没有办法测试和自动部署。

项目初始状态

项目一开始我们就是简简单单的需要一个空的go module的目录。kubebuilder需要

% go mod init github.com/jeffguorg/demo-stream-nginx-controller
go: creating new go.mod: module github.com/jeffguorg/demo-stream-nginx-controller
% cat go.mod 
module github.com/jeffguorg/demo-stream-nginx-controller

go 1.16

初始化项目

Kubebuilder提供了工具用来初始化项目结构,也是一条命令就好的

% kubebuilder init --domain jeffthecoder.xyz 
Writing scaffold for you to edit...
Get controller runtime:
$ go get sigs.k8s.io/controller-runtime@v0.5.0
Update go.mod:
$ go mod tidy
Running make:
$ make
/Users/guochao/.asdf/shims/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
Next: define a resource with:
$ kubebuilder create api

kubebuilder会生成所有目录,然后go get下来需要的模块。

尝试运行一下

这个时候项目已经可以运行了,并且有一个自动编译的Makefile,生成资源的逻辑不需要自己手动执行。

% make run  
/Users/guochao/.asdf/shims/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
/Users/guochao/.asdf/shims/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
go run ./main.go
2021-02-05T16:15:16.165+0800    INFO    controller-runtime.metrics      metrics server is starting to listen    {"addr": ":8080"}
2021-02-05T16:15:16.165+0800    INFO    setup   starting manager
2021-02-05T16:15:16.166+0800    INFO    controller-runtime.manager      starting metrics server {"path": "/metrics"}

有细微的差别没关系,毕竟每个人用的目录不一定一样。能看到starting metrics server {"path": "/metrics"}就算成了。

增加API

之前我们提过K8S里面,一类资源就是一个API。比如说一个类型,一个类型的实例,都是API。我们希望增加一个类型,实际上就是增加了一个API。在Kubebuilder中,我们初始化以后,kubebuilder也有提示Next: define a resource with: kubebuilder create api

那我们就建一个API。中间kubebuilder问我们要不要创建资源和要不要创建Controller,我们都选y试试看:

% kubebuilder create api --group nginx-stream --version v1beta1 --kind StreamIngress
Create Resource [y/n]
y
Create Controller [y/n]
y
Writing scaffold for you to edit...
api/v1beta1/streamingress_types.go
controllers/streamingress_controller.go
Running make:
$ make
/Users/guochao/.asdf/shims/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
Error: go [list -e -json -compiled=true -test=false -export=false -deps=true -find=false -tags ignore_autogenerated -- ./...]: exit status 1: go: updates to go.mod needed; try 'go mod tidy' first

Usage:
  controller-gen [flags]

Examples:
        # Generate RBAC manifests and crds for all types under apis/,
        # outputting crds to /tmp/crds and everything else to stdout
        controller-gen rbac:roleName=<role name> crd paths=./apis/... output:crd:dir=/tmp/crds output:stdout

        # Generate deepcopy/runtime.Object implementations for a particular file
        controller-gen object paths=./apis/v1beta1/some_types.go

        # Generate OpenAPI v3 schemas for API packages and merge them into existing CRD manifests
        controller-gen schemapatch:manifests=./manifests output:dir=./manifests paths=./pkg/apis/... 

        # Run all the generators for a given project
        controller-gen paths=./apis/...

        # Explain the markers for generating CRDs, and their arguments
        controller-gen crd -ww


Flags:
  -h, --detailed-help count   print out more detailed help
                              (up to -hhh for the most detailed output, or -hhhh for json output)
      --help                  print out usage and a summary of options
      --version               show version
  -w, --which-markers count   print out all markers available with the requested generators
                              (up to -www for the most detailed output, or -wwww for json output)


Options


generators

+webhook                                                                                                  package  generates (partial) {Mutating,Validating}WebhookConfiguration objects.                        
+schemapatch:manifests=<string>[,maxDescLen=<int>]                                                        package  patches existing CRDs with new schemata.                                                      
+rbac:roleName=<string>                                                                                   package  generates ClusterRole objects.                                                                
+object[:headerFile=<string>][,year=<string>]                                                             package  generates code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.  
+crd[:crdVersions=<[]string>][,maxDescLen=<int>][,preserveUnknownFields=<bool>][,trivialVersions=<bool>]  package  generates CustomResourceDefinition objects.                                                   


generic

+paths=<[]string>  package  represents paths and go-style path patterns to use as package roots.  


output rules (optionally as output:<generator>:...)

+output:artifacts[:code=<string>],config=<string>  package  outputs artifacts to different locations, depending on whether they're package-associated or not.   
+output:dir=<string>                               package  outputs each artifact to the given directory, regardless of if it's package-associated or not.      
+output:none                                       package  skips outputting anything.                                                                          
+output:stdout                                     package  outputs everything to standard-out, with no separation.                                             

run `controller-gen object:headerFile=hack/boilerplate.go.txt paths=./... -w` to see all available markers, or `controller-gen object:headerFile=hack/boilerplate.go.txt paths=./... -h` for usage
make: *** [generate] Error 1

哟,报错了。不过看起来没有什么严重的,文件都已经写好了,只是make的时候发现go module的文件有点问题。那我们go mod tidy一下就好了

看看生成的代码

根据上面的提示,实际上Kubebuilder就是加了两个go文件,如果你用了Git,还会看到实际上还修改了config里面的诸多文件还有main.go和PROJECT这两个文件。

先看看新增的两个go文件都写了什么:

api/v1beta1/groupversion_info.go比较小一些,实际上有用的只有几行,记录了这一组API的版本信息。

/* ... */
// Package v1beta1 contains API Schema definitions for the nginx-stream v1beta1 API group
// +kubebuilder:object:generate=true
// +groupName=nginx-stream.jeffthecoder.xyz
package v1beta1

import (
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/scheme"
)

var (
	// GroupVersion is group version used to register these objects
	GroupVersion = schema.GroupVersion{Group: "nginx-stream.jeffthecoder.xyz", Version: "v1beta1"}

	// SchemeBuilder is used to add go types to the GroupVersionKind scheme
	SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}

	// AddToScheme adds the types in this group-version to the given scheme.
	AddToScheme = SchemeBuilder.AddToScheme
)

另一个文件是streamingress_types.go,其中最主要的是StreamIngress类型,定义了StreamIngress这个CRD会有哪些成员,比如说StreamIngressSpec包含了StreamIngress的spec部分,现在是一个包含了Foo Bar例子的脚手架。文件最后还包含了一个init,把类型注册到了SchemeBuilder中。之后我们要更新CRD类型,也是通过修改这个文件来进行。生成出来的代码如下:

package v1beta1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.

// StreamIngressSpec defines the desired state of StreamIngress
type StreamIngressSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// Foo is an example field of StreamIngress. Edit StreamIngress_types.go to remove/update
	Foo string `json:"foo,omitempty"`
}

// StreamIngressStatus defines the observed state of StreamIngress
type StreamIngressStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file
}

// +kubebuilder:object:root=true

// StreamIngress is the Schema for the streamingresses API
type StreamIngress struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   StreamIngressSpec   `json:"spec,omitempty"`
	Status StreamIngressStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// StreamIngressList contains a list of StreamIngress
type StreamIngressList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []StreamIngress `json:"items"`
}

func init() {
	SchemeBuilder.Register(&StreamIngress{}, &StreamIngressList{})
}

把生成的CRD安装到K8S里面

到目前为止我们还没有写一行代码。可以先把生成的CRD安装到K8S集群中试试看:

% make install                                   
/Users/guochao/.asdf/shims/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
kustomize build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/streamingresses.nginx-stream.jeffthecoder.xyz created
% kubectl get crds
NAME                                            CREATED AT
streamingresses.nginx-stream.jeffthecoder.xyz   2021-02-06T13:55:33Z
% kubectl get crds streamingresses.nginx-stream.jeffthecoder.xyz -o yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
    controller-gen.kubebuilder.io/version: v0.4.1
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"apiextensions.k8s.io/v1","kind":"CustomResourceDefinition","metadata":{"annotations":{"controller-gen.kubebuilder.io/version":"v0.4.1"},"creationTimestamp":null,"name":"streamingresses.nginx-stream.jeffthecoder.xyz"},"spec":{"group":"nginx-stream.jeffthecoder.xyz","names":{"kind":"StreamIngress","listKind":"StreamIngressList","plural":"streamingresses","singular":"streamingress"},"scope":"Namespaced","versions":[{"name":"v1beta1","schema":{"openAPIV3Schema":{"description":"StreamIngress is the Schema for the streamingresses API","properties":{"apiVersion":{"description":"APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources","type":"string"},"kind":{"description":"Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds","type":"string"},"metadata":{"type":"object"},"spec":{"description":"StreamIngressSpec defines the desired state of StreamIngress","properties":{"foo":{"description":"Foo is an example field of StreamIngress. Edit StreamIngress_types.go to remove/update","type":"string"}},"type":"object"},"status":{"description":"StreamIngressStatus defines the observed state of StreamIngress","type":"object"}},"type":"object"}},"served":true,"storage":true}]},"status":{"acceptedNames":{"kind":"","plural":""},"conditions":[],"storedVersions":[]}}
  creationTimestamp: "2021-02-06T13:55:33Z"
  generation: 1
  managedFields:
  - apiVersion: apiextensions.k8s.io/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:status:
        f:acceptedNames:
          f:kind: {}
          f:listKind: {}
          f:plural: {}
          f:singular: {}
        f:conditions: {}
    manager: kube-apiserver
    operation: Update
    time: "2021-02-06T13:55:33Z"
  - apiVersion: apiextensions.k8s.io/v1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .: {}
          f:controller-gen.kubebuilder.io/version: {}
          f:kubectl.kubernetes.io/last-applied-configuration: {}
      f:spec:
        f:conversion:
          .: {}
          f:strategy: {}
        f:group: {}
        f:names:
          f:kind: {}
          f:listKind: {}
          f:plural: {}
          f:singular: {}
        f:scope: {}
        f:versions: {}
      f:status:
        f:storedVersions: {}
    manager: kubectl-client-side-apply
    operation: Update
    time: "2021-02-06T13:55:33Z"
  name: streamingresses.nginx-stream.jeffthecoder.xyz
  resourceVersion: "469"
  uid: 706dec4d-9ad3-4f09-b87d-c699ef11c661
spec:
  conversion:
    strategy: None
  group: nginx-stream.jeffthecoder.xyz
  names:
    kind: StreamIngress
    listKind: StreamIngressList
    plural: streamingresses
    singular: streamingress
  scope: Namespaced
  versions:
  - name: v1beta1
    schema:
      openAPIV3Schema:
        description: StreamIngress is the Schema for the streamingresses API
        properties:
          apiVersion:
            description: 'APIVersion defines the versioned schema of this representation
              of an object. Servers should convert recognized schemas to the latest
              internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
            type: string
          kind:
            description: 'Kind is a string value representing the REST resource this
              object represents. Servers may infer this from the endpoint the client
              submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
            type: string
          metadata:
            type: object
          spec:
            description: StreamIngressSpec defines the desired state of StreamIngress
            properties:
              foo:
                description: Foo is an example field of StreamIngress. Edit StreamIngress_types.go
                  to remove/update
                type: string
            type: object
          status:
            description: StreamIngressStatus defines the observed state of StreamIngress
            type: object
        type: object
    served: true
    storage: true
status:
  acceptedNames:
    kind: StreamIngress
    listKind: StreamIngressList
    plural: streamingresses
    singular: streamingress
  conditions:
  - lastTransitionTime: "2021-02-06T13:55:33Z"
    message: no conflicts found
    reason: NoConflicts
    status: "True"
    type: NamesAccepted
  - lastTransitionTime: "2021-02-06T13:55:33Z"
    message: the initial names have been accepted
    reason: InitialNamesAccepted
    status: "True"
    type: Established
  storedVersions:
  - v1beta1

可以看到实际上make install做了两件事情:

  • controller-gen生成资源
  • kustomize生成yaml并且应用到k8s中

我们可以用kustomize再生成一下yaml,然后和上面kubectl get的结果比较一下:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
    controller-gen.kubebuilder.io/version: v0.4.1
  creationTimestamp: null
  name: streamingresses.nginx-stream.jeffthecoder.xyz
spec:
  group: nginx-stream.jeffthecoder.xyz
  names:
    kind: StreamIngress
    listKind: StreamIngressList
    plural: streamingresses
    singular: streamingress
  scope: Namespaced
  versions:
  - name: v1beta1
    schema:
      openAPIV3Schema:
        description: StreamIngress is the Schema for the streamingresses API
        properties:
          apiVersion:
            description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
            type: string
          kind:
            description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
            type: string
          metadata:
            type: object
          spec:
            description: StreamIngressSpec defines the desired state of StreamIngress
            properties:
              foo:
                description: Foo is an example field of StreamIngress. Edit StreamIngress_types.go to remove/update
                type: string
            type: object
          status:
            description: StreamIngressStatus defines the observed state of StreamIngress
            type: object
        type: object
    served: true
    storage: true
status:
  acceptedNames:
    kind: ""
    plural: ""
  conditions: []
  storedVersions: []

在kustomize生成的CRD中我们可以明显看出来,schema和streamingress_types.go中的类型是对应的。比如说spec中的foo对应StreamIngressSpec中的Foo成员

还修改了一些其他的文件

上面的步骤中,除了生成了streamingress类型有关的文件以外,还修改了一些其他的文件,略微提一下:

  • PROJECT: 记录了一下资源:
    resources:
    - group: nginx-stream
      kind: StreamIngress
      version: v1beta1
    
  • configs/: 在kubebuilder create api的过程中,kubebuilder会在configs中增加一些文件,主要是根据类型生成kustomize的配置,用来生成具体的yaml,其中包括crd的定义、rbac的定义以及StreaIngress的一个样例实例。
  • controllers/: 在controllers目录中增加了一个StreamIngressReconciler类型,这个类型主要就是负责监视和迁移StreamIngress的状态的。
  • main.go: 修改了main函数,在main函数中,为Manager注册了StreamIngressReconciler

修改类型

这个Foo Bar样例其实不大满足我们的需求。我们可以根据需求修改一下:

  • 哪一个configmap(TCP?UDP?): Protocol(默认TCP)
  • 地址端口
    • 地址(必需)
    • 端口(必需)
  • 上游在集群内(默认True)

对应的streamingress_types.go中,StreamIngressSpec就需要改一下:

// StreamAddress defines the address for a stream server or upstream
type StreamAddress struct {
	Namespace string `json:"namespace,omitempty"`
	Service   string `json:"service"`
	Port      uint   `json:"port"`
}

// StreamIngressSpec defines the desired state of StreamIngress
type StreamIngressSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	Protocol string `json:"protocol,omitempty"`
	Listen   uint   `json:"listen"`

	Upstream  StreamAddress `json:"upstream"`
	IsService bool          `json:"is_service,omitempty"`
}

再次make install

我们再次生成crd并且安装的时候,应该就可以看到config/crd/bases/nginx-stream.jeffthecoder.xyz_streamingresses.yaml被修改了:

% make install
/Users/guochao/.asdf/shims/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
kustomize build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/streamingresses.nginx-stream.jeffthecoder.xyz configured
% git diff
diff --git a/config/crd/bases/nginx-stream.jeffthecoder.xyz_streamingresses.yaml b/config/crd/bases/nginx-stream.jeffthecoder.xyz_streamingresses.yaml
index ff12f1e..1c21c6a 100644
--- a/config/crd/bases/nginx-stream.jeffthecoder.xyz_streamingresses.yaml
+++ b/config/crd/bases/nginx-stream.jeffthecoder.xyz_streamingresses.yaml
@@ -36,10 +36,38 @@ spec:
           spec:
             description: StreamIngressSpec defines the desired state of StreamIngress
             properties:
-              foo:
-                description: Foo is an example field of StreamIngress. Edit StreamIngress_types.go
-                  to remove/update
+              is_service:
+                type: boolean
+              listen:
+                type: integer
+              protocol:
                 type: string
+              upstream:
+                description: StreamAddress defines the address for a stream server
+                  or upstream
+                properties:
+                  host:
+                    type: string
+                  port:
+                    type: integer
+                required:
+                - host
+                - port
+                type: object
+            required:
+            - listen
+            - protocol
+            - upstream
             type: object
           status:
             description: StreamIngressStatus defines the observed state of StreamIngress

创建一个StreamIngress试试看

上面我们提到kubebuilder建了一个StreaIngress的一个样例实例,我们可以试着把这个样例apply进去试试看:

% kubectl apply -f config/samples/nginx-stream_v1beta1_streamingress.yaml
error: error validating "config/samples/nginx-stream_v1beta1_streamingress.yaml": error validating data: [ValidationError(StreamIngress.spec): unknown field "foo" in xyz.jeffthecoder.nginx-stream.v1beta1.StreamIngress.spec, ValidationError(StreamIngress.spec): missing required field "listen" in xyz.jeffthecoder.nginx-stream.v1beta1.StreamIngress.spec, ValidationError(StreamIngress.spec): missing required field "protocol" in xyz.jeffthecoder.nginx-stream.v1beta1.StreamIngress.spec, ValidationError(StreamIngress.spec): missing required field "upstream" in xyz.jeffthecoder.nginx-stream.v1beta1.StreamIngress.spec]; if you choose to ignore these errors, turn validation off with --validate=false

诶呀,出错了。看提示是说validation error,foo不知道是什么东西,listen protocol和upstream这三个需要填写进去。这是crd的schema起作用了。到这一步我获得的crd定义如下:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
    controller-gen.kubebuilder.io/version: v0.4.1
  creationTimestamp: null
  name: streamingresses.nginx-stream.jeffthecoder.xyz
spec:
  group: nginx-stream.jeffthecoder.xyz
  names:
    kind: StreamIngress
    listKind: StreamIngressList
    plural: streamingresses
    singular: streamingress
  scope: Namespaced
  versions:
  - name: v1beta1
    schema:
      openAPIV3Schema:
        description: StreamIngress is the Schema for the streamingresses API
        properties:
          apiVersion:
            description: 'APIVersion defines the versioned schema of this representation
              of an object. Servers should convert recognized schemas to the latest
              internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
            type: string
          kind:
            description: 'Kind is a string value representing the REST resource this
              object represents. Servers may infer this from the endpoint the client
              submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
            type: string
          metadata:
            type: object
          spec:
            description: StreamIngressSpec defines the desired state of StreamIngress
            properties:
              is_service:
                type: boolean
              listen:
                type: integer
              protocol:
                type: string
              upstream:
                description: StreamAddress defines the address for a stream server
                  or upstream
                properties:
                  host:
                    type: string
                  port:
                    type: integer
                required:
                - host
                - port
                type: object
            required:
            - listen
            - upstream
            type: object
          status:
            description: StreamIngressStatus defines the observed state of StreamIngress
            type: object
        type: object
    served: true
    storage: true
status:
  acceptedNames:
    kind: ""
    plural: ""
  conditions: []
  storedVersions: []

其中versions下面的各个版本,每个都有自己的schema.openAPIV3Schema,用来定义有什么东西和需要什么东西,在apply的时候k8s会根据这些定义来验证数据有效性,避免出错。

那么我们就定义一个符合schema的、有效的定义吧:

apiVersion: nginx-stream.jeffthecoder.xyz/v1beta1
kind: StreamIngress
metadata:
  name: streamingress-sample
spec:
  # Add fields here
  listen: 8080
  upstream:
    host: some-tcp-port
    port: 1234

这一次我们再apply的时候就没有问题了:

% kubectl apply -f config/samples/nginx-stream_v1beta1_streamingress.yaml
streamingress.nginx-stream.jeffthecoder.xyz/streamingress-sample created

让StreamIngress生效

现在我们有了StreamIngress,我们需要让它生效。这个时候我们就需要修改controller的部分了:

原始的代码在controllers/streamingress_controller.go

package controllers

import (
	"context"

	"github.com/go-logr/logr"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	nginxstreamv1beta1 "github.com/jeffguorg/demo-stream-nginx-controller/api/v1beta1"
)

// StreamIngressReconciler reconciles a StreamIngress object
type StreamIngressReconciler struct {
	client.Client
	Log    logr.Logger
	Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=nginx-stream.jeffthecoder.xyz,resources=streamingresses,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nginx-stream.jeffthecoder.xyz,resources=streamingresses/status,verbs=get;update;patch

func (r *StreamIngressReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	_ = context.Background()
	_ = r.Log.WithValues("streamingress", req.NamespacedName)

	// your logic here


	return ctrl.Result{}, nil
}

func (r *StreamIngressReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&nginxstreamv1beta1.StreamIngress{}).
		Complete(r)
}

其中核心的类型就是StreamIngressReconciler,有两个函数,一个是Reconcile,每次Controller运行时判断需要检查或者梳理资源状态,就会由运行时调用这个函数,另一个是SetupWithManager,主要是在启动的时候在运行时中注册StreamIngressReconciler。上面有几行注释,主要是供controller-gen参考来生成各种资源或者规则的。

我们可以在Reconcile中获取修改各种资源的状态,比如说:

func (r *StreamIngressReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	logger := r.Log.WithValues("streamingress", req.NamespacedName)

	// your logic here
	var (
		requestIngress v1beta1.StreamIngress
	)
	if err := r.Get(ctx, client.ObjectKey{Namespace: req.Namespace, Name: req.Name}, &requestIngress); err != nil {
		logger.Error(err, "failed to list stream ingress")
		return ctrl.Result{RequeueAfter: time.Second * 5}, err
	} else {
		logger.WithValues("request ingress", requestIngress)
	}

	return ctrl.Result{}, nil
}

我们就会获取到当前在reconcile的对象。

按照我们之前梳理的逻辑,我们需要在这里

  • 检查StreamIngress有没有什么问题
  • 更新Service,把端口暴露出来
  • 更新ConfigMap,告诉Nginx这个端口需要代理

代码大致如下:

// package controllers

// import (
// 	"context"
// 	"fmt"
// 	"strconv"
// 	"time"

// 	"github.com/go-logr/logr"
// 	v1 "k8s.io/api/core/v1"
// 	"k8s.io/apimachinery/pkg/runtime"
// 	"k8s.io/apimachinery/pkg/util/intstr"
// 	ctrl "sigs.k8s.io/controller-runtime"
// 	"sigs.k8s.io/controller-runtime/pkg/client"

// 	"github.com/jeffguorg/demo-stream-nginx-controller/api/v1beta1"
// 	nginxstreamv1beta1 "github.com/jeffguorg/demo-stream-nginx-controller/api/v1beta1"
// )

// // StreamIngressReconciler reconciles a StreamIngress object
// type StreamIngressReconciler struct {
// 	client.Client
// 	Log    logr.Logger
// 	Scheme *runtime.Scheme

	TCPConfigMapKey client.ObjectKey
	UDPConfigMapKey client.ObjectKey
	ServiceKey      client.ObjectKey
// }

// // +kubebuilder:rbac:groups=nginx-stream.jeffthecoder.xyz,resources=streamingresses,verbs=get;list;watch;create;update;patch;delete
// // +kubebuilder:rbac:groups=nginx-stream.jeffthecoder.xyz,resources=streamingresses/status,verbs=get;update;patch

// func (r *StreamIngressReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	logger := r.Log.WithValues("streamingress", req.NamespacedName)

	// fetch stream ingress and validate
	var (
		requestIngress v1beta1.StreamIngress
		ingressList    v1beta1.StreamIngressList
	)
	if err := r.Get(ctx, client.ObjectKey{Namespace: req.Namespace, Name: req.Name}, &requestIngress); err != nil {
		logger.Error(err, "failed to list stream ingress")
		return ctrl.Result{RequeueAfter: time.Second * 5}, err
	}

	if err := r.List(ctx, &ingressList, client.MatchingFields{
		".spec.listen":   strconv.FormatUint(uint64(requestIngress.Spec.Listen), 10),
		".spec.protocol": requestIngress.Spec.Protocol,
	}); err != nil {
		logger.Error(err, "failed to list stream ingress")
		return ctrl.Result{RequeueAfter: time.Second * 5}, err
	}

	if len(ingressList.Items) > 1 {
		logger.WithValues("count", len(ingressList.Items)).Info("multiple listen on one port")
		return ctrl.Result{RequeueAfter: time.Second * 5}, nil
	}

	// check if upstream service exists
	var (
		upstreamService           v1.Service
		upstreamServicePortIsOpen = false
	)
	if err := r.Get(ctx, client.ObjectKey{Namespace: requestIngress.Spec.Upstream.Namespace, Name: requestIngress.Spec.Upstream.Service}, &upstreamService); err != nil {
		logger.WithValues("service", fmt.Sprintf("%v/%v", requestIngress.Spec.Upstream.Namespace, requestIngress.Spec.Upstream.Service)).Info("unable to get service")
		return ctrl.Result{RequeueAfter: time.Second * 5}, nil
	}
	for _, port := range upstreamService.Spec.Ports {
		if port.Port == int32(requestIngress.Spec.Upstream.Port) {
			upstreamServicePortIsOpen = true
		}
	}
	if !upstreamServicePortIsOpen {
		logger.WithValues("upstream-port", requestIngress.Spec.Upstream.Port).Info("upstream service is not open")
		return ctrl.Result{RequeueAfter: time.Second * 5}, nil
	}

	// fetch service and config map to update
	var (
		service   v1.Service
		configMap v1.ConfigMap

		portIsAlreadyOnService = false
	)
	if err := r.Get(ctx, r.ServiceKey, &service); err != nil {
		logger.Error(err, "failed to get service")
		return ctrl.Result{RequeueAfter: time.Second * 5}, err
	}
	if requestIngress.Spec.Protocol == string(v1.ProtocolUDP) {
		if err := r.Get(ctx, r.UDPConfigMapKey, &configMap); err != nil {
			logger.Error(err, "failed to list stream ingress")
			return ctrl.Result{RequeueAfter: time.Second * 5}, err
		}
	} else {
		if err := r.Get(ctx, r.TCPConfigMapKey, &configMap); err != nil {
			logger.Error(err, "failed to list stream ingress")
			return ctrl.Result{RequeueAfter: time.Second * 5}, err
		}
	}

	// check if port is opened and open it if needed
	for idx, port := range service.Spec.Ports {
		if port.Protocol == v1.Protocol(requestIngress.Spec.Protocol) && port.Port == int32(requestIngress.Spec.Listen) {
			service.Spec.Ports[idx].TargetPort = intstr.FromInt(int(requestIngress.Spec.Listen))
			portIsAlreadyOnService = true
		}
	}
	if !portIsAlreadyOnService {
		service.Spec.Ports = append(service.Spec.Ports, v1.ServicePort{
			Protocol:   v1.Protocol(requestIngress.Spec.Protocol),
			Port:       int32(requestIngress.Spec.Listen),
			TargetPort: intstr.FromInt(int(requestIngress.Spec.Listen)),
		})
	}
	if err := r.Update(ctx, &service); err != nil {
		logger.Error(err, "failed to update service")
		return ctrl.Result{RequeueAfter: time.Second * 5}, err
	}

	// update configmap
	configMap.Data[strconv.FormatInt(int64(requestIngress.Spec.Listen), 10)] = fmt.Sprintf("%v/%v:%v", requestIngress.Spec.Upstream.Namespace, requestIngress.Spec.Upstream.Service, requestIngress.Spec.Upstream.Port)
	if err := r.Update(ctx, &configMap); err != nil {
		logger.Error(err, "failed to update configMap")
		return ctrl.Result{RequeueAfter: time.Second * 5}, err
	}

  // ok
	return ctrl.Result{}, nil
// }

// func (r *StreamIngressReconciler) SetupWithManager(mgr ctrl.Manager) error {
// 	return ctrl.NewControllerManagedBy(mgr).
// 		For(&nginxstreamv1beta1.StreamIngress{}).
// 		Complete(r)
// }

大致来说就是检查StreamIngress之间有没有冲突,上游Service存在不存在,Service上端口有没有打开,然后更新Service更新ConfigMap就好了。

修改结束以后我们直接make run一下,会得到错误Index with name field:listen does not exist

这是因为每次列举各种东西的时候都请求一次apiserver很慢,也不明智,所以controller运行时在本地有缓存。我们需要在程序初始化的时候设置一下缓存,改一下main.go,顺便加上flag:

// package main

// import (
// 	"flag"
	"fmt"
// 	"os"

// 	"k8s.io/apimachinery/pkg/runtime"
// 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
// 	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
// 	ctrl "sigs.k8s.io/controller-runtime"
// 	"sigs.k8s.io/controller-runtime/pkg/client"
// 	"sigs.k8s.io/controller-runtime/pkg/log/zap"

// 	nginxstreamv1beta1 "github.com/jeffguorg/demo-stream-nginx-controller/api/v1beta1"
// 	"github.com/jeffguorg/demo-stream-nginx-controller/controllers"
// 	// +kubebuilder:scaffold:imports
// )

// var (
// 	scheme   = runtime.NewScheme()
// 	setupLog = ctrl.Log.WithName("setup")
// )

// func init() {
// 	_ = clientgoscheme.AddToScheme(scheme)

// 	_ = nginxstreamv1beta1.AddToScheme(scheme)
// 	// +kubebuilder:scaffold:scheme
// }

// func main() {
// 	var metricsAddr string
// 	var enableLeaderElection bool

	var (
		tcpConfigMapName   string
		udpConfigMapName   string
		ingressServiceName string
		ingressNamespace   string
	)

	// flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
	// flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
	// 	"Enable leader election for controller manager. "+
	// 		"Enabling this will ensure there is only one active controller manager.")
	// flag.Parse()

	// ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

	// mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
	// 	Scheme:             scheme,
	// 	MetricsBindAddress: metricsAddr,
	// 	Port:               9443,
	// 	LeaderElection:     enableLeaderElection,
	// 	LeaderElectionID:   "3c21781e.jeffthecoder.xyz",
	// })
	// if err != nil {
	// 	setupLog.Error(err, "unable to start manager")
	// 	os.Exit(1)
	// }

	cache := mgr.GetCache()
	cache.IndexField(&nginxstreamv1beta1.StreamIngress{}, "listen", func(o runtime.Object) []string {
		switch o := o.(type) {
		case *nginxstreamv1beta1.StreamIngress:
			return []string{fmt.Sprintf("%v/%v", o.Spec.Protocol, o.Spec.Listen)}
		}
		return []string{}
	})

	// if err = (&controllers.StreamIngressReconciler{
	// 	Client: mgr.GetClient(),
	// 	Log:    ctrl.Log.WithName("controllers").WithName("StreamIngress"),
	// 	Scheme: mgr.GetScheme(),

		TCPConfigMapKey: client.ObjectKey{Namespace: ingressNamespace, Name: tcpConfigMapName},
		UDPConfigMapKey: client.ObjectKey{Namespace: ingressNamespace, Name: udpConfigMapName},
		ServiceKey:      client.ObjectKey{Namespace: ingressNamespace, Name: ingressServiceName},
	// }).SetupWithManager(mgr); err != nil {
	// 	setupLog.Error(err, "unable to create controller", "controller", "StreamIngress")
	// 	os.Exit(1)
	// }
	// // +kubebuilder:scaffold:builder

	// setupLog.Info("starting manager")
	// if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
	// 	setupLog.Error(err, "problem running manager")
	// 	os.Exit(1)
	// }
}

基础版controller大概就是这样了。

https://github.com/jeffguorg/demo-stream-nginx-controller/tree/7ecf7d0ebf4ee69fd97e70f3b404928fcb66414b