
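Kubernetes | Kubebuilder - Stream Ingress: The Basics
I mentioned Kubebuilder yesterday, but I haven't been in great shape these past two days, so I'm splitting this into two posts. This one implements what the previous post described and records the process of building a simple controller.
A controller is also called an operator: like a human operator, it quietly manages the state of resources for us.
Design
We want a type where one resource corresponds to one server in the Nginx stream module. When creating a resource you fill in the protocol, the listen address and port, and the upstream, which map to:
- which ConfigMap (TCP or UDP?)
- the address and port
- whether the upstream is inside the cluster
We would like to check that the upstream exists, but since the upstream may or may not live in the cluster, we add a property to the resource that says whether the Service needs to be checked.
Only if all of these properties are valid does our operator continue: it writes the information into the ConfigMap, updates the Service, and updates its own status.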
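The checks described in this design can be sketched in plain Go before any Kubernetes machinery is involved. This is only an illustration: `streamSpec`, `validate`, and the `serviceExists` callback are hypothetical names, not the types used later in the post.

```go
package main

import (
	"errors"
	"fmt"
)

// streamSpec is a hypothetical, simplified stand-in for the resource in the design.
type streamSpec struct {
	Protocol  string // "TCP" or "UDP"; selects which ConfigMap gets written
	Listen    uint   // port that Nginx listens on
	Upstream  string // name of the upstream
	IsService bool   // when true, the upstream must be a Service in the cluster
}

// validate rejects a bad spec before anything is written to the ConfigMap or Service.
// serviceExists is an assumed callback that reports whether a Service is known.
func validate(s streamSpec, serviceExists func(name string) bool) error {
	if s.Protocol != "TCP" && s.Protocol != "UDP" {
		return fmt.Errorf("unsupported protocol %q", s.Protocol)
	}
	if s.Listen == 0 || s.Listen > 65535 {
		return fmt.Errorf("listen port %d out of range", s.Listen)
	}
	if s.Upstream == "" {
		return errors.New("upstream must not be empty")
	}
	// Only look up the Service when the resource says the upstream lives in the cluster.
	if s.IsService && !serviceExists(s.Upstream) {
		return fmt.Errorf("upstream service %q not found", s.Upstream)
	}
	return nil
}

func main() {
	known := func(name string) bool { return name == "redis" }
	fmt.Println(validate(streamSpec{Protocol: "TCP", Listen: 6379, Upstream: "redis", IsService: true}, known))
	fmt.Println(validate(streamSpec{Protocol: "TCP", Listen: 6379, Upstream: "external.example.com"}, known))
	fmt.Println(validate(streamSpec{Protocol: "TCP", Listen: 6379, Upstream: "missing", IsService: true}, known))
}
```

The third call fails only because `IsService` is set; the same upstream name with the flag off would pass, which is the point of the extra property.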
The requirements are simple, so
Let's GO
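Installing Kubebuilder
At a minimum we need Golang, plus the Kubebuilder and Kustomize binaries on the PATH. To verify the results we also need a Kubernetes environment with kubeconfig configured.
For brevity, and because it isn't necessary, I won't cover those steps here.
Golang download: https://golang.google.cn
Kubebuilder download: https://github.com/kubernetes-sigs/kubebuilder/releases/latest
Once downloaded, the Kubebuilder release should contain all the binaries needed for development, mainly kubebuilder, etcd, kube-apiserver, and kubectl. Once running kubebuilder directly in a terminal works, we can continue.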
% kubebuilder version
Version: version.Version{KubeBuilderVersion:"2.3.1", KubernetesVendor:"1.16.4", GitCommit:"8b53abeb4280186e494b726edf8f54ca7aa64a49", BuildDate:"2020-03-26T16:42:00Z", GoOs:"unknown", GoArch:"unknown"}
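Note: At the time of writing, kubebuilder had published 3.x pre-releases. Those releases do not bundle the other binaries, so prepare them yourself if you need them. Kubebuilder's tests and automation depend on these binaries; kubebuilder on its own can only generate code and cannot test or deploy automatically.
Initial project state
To start, all we need is an empty directory set up as a Go module, which kubebuilder requires.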
% go mod init github.com/jeffguorg/demo-stream-nginx-controller
go: creating new go.mod: module github.com/jeffguorg/demo-stream-nginx-controller
% cat go.mod
module github.com/jeffguorg/demo-stream-nginx-controller
go 1.16
Initializing the project
Kubebuilder provides a tool to initialize the project structure, and it takes just one command:
% kubebuilder init --domain jeffthecoder.xyz
Writing scaffold for you to edit...
Get controller runtime:
$ go get sigs.k8s.io/controller-runtime@v0.5.0
Update go.mod:
$ go mod tidy
Running make:
$ make
/Users/guochao/.asdf/shims/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
Next: define a resource with:
$ kubebuilder create api
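kubebuilder generates all the directories and then uses go get to fetch the required modules.
Trying it out
At this point the project can already run, and it comes with a Makefile that builds everything automatically, so the resource generation steps don't have to be run by hand.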
% make run
/Users/guochao/.asdf/shims/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
/Users/guochao/.asdf/shims/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
go run ./main.go
2021-02-05T16:15:16.165+0800 INFO controller-runtime.metrics metrics server is starting to listen {"addr": ":8080"}
2021-02-05T16:15:16.165+0800 INFO setup starting manager
2021-02-05T16:15:16.166+0800 INFO controller-runtime.manager starting metrics server {"path": "/metrics"}
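Small differences are fine; not everyone uses the same directories. If you see starting metrics server {"path": "/metrics"}, it worked.
Adding an API
As mentioned before, in Kubernetes each kind of resource is an API. A type and an instance of a type are both APIs, so adding a type really means adding an API. After initialization, kubebuilder also gave us a hint: Next: define a resource with: kubebuilder create api.
So let's create an API. Along the way kubebuilder asks whether to create the resource and whether to create the controller; we answer y to both: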
% kubebuilder create api --group nginx-stream --version v1beta1 --kind StreamIngress
Create Resource [y/n]
y
Create Controller [y/n]
y
Writing scaffold for you to edit...
api/v1beta1/streamingress_types.go
controllers/streamingress_controller.go
Running make:
$ make
/Users/guochao/.asdf/shims/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
Error: go [list -e -json -compiled=true -test=false -export=false -deps=true -find=false -tags ignore_autogenerated -- ./...]: exit status 1: go: updates to go.mod needed; try 'go mod tidy' first
Usage:
controller-gen [flags]
Examples:
# Generate RBAC manifests and crds for all types under apis/,
# outputting crds to /tmp/crds and everything else to stdout
controller-gen rbac:roleName=<role name> crd paths=./apis/... output:crd:dir=/tmp/crds output:stdout
# Generate deepcopy/runtime.Object implementations for a particular file
controller-gen object paths=./apis/v1beta1/some_types.go
# Generate OpenAPI v3 schemas for API packages and merge them into existing CRD manifests
controller-gen schemapatch:manifests=./manifests output:dir=./manifests paths=./pkg/apis/...
# Run all the generators for a given project
controller-gen paths=./apis/...
# Explain the markers for generating CRDs, and their arguments
controller-gen crd -ww
Flags:
-h, --detailed-help count print out more detailed help
(up to -hhh for the most detailed output, or -hhhh for json output)
--help print out usage and a summary of options
--version show version
-w, --which-markers count print out all markers available with the requested generators
(up to -www for the most detailed output, or -wwww for json output)
Options
generators
+webhook package generates (partial) {Mutating,Validating}WebhookConfiguration objects.
+schemapatch:manifests=<string>[,maxDescLen=<int>] package patches existing CRDs with new schemata.
+rbac:roleName=<string> package generates ClusterRole objects.
+object[:headerFile=<string>][,year=<string>] package generates code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
+crd[:crdVersions=<[]string>][,maxDescLen=<int>][,preserveUnknownFields=<bool>][,trivialVersions=<bool>] package generates CustomResourceDefinition objects.
generic
+paths=<[]string> package represents paths and go-style path patterns to use as package roots.
output rules (optionally as output:<generator>:...)
+output:artifacts[:code=<string>],config=<string> package outputs artifacts to different locations, depending on whether they're package-associated or not.
+output:dir=<string> package outputs each artifact to the given directory, regardless of if it's package-associated or not.
+output:none package skips outputting anything.
+output:stdout package outputs everything to standard-out, with no separation.
run `controller-gen object:headerFile=hack/boilerplate.go.txt paths=./... -w` to see all available markers, or `controller-gen object:headerFile=hack/boilerplate.go.txt paths=./... -h` for usage
make: *** [generate] Error 1
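Oops, an error. It doesn't look serious, though: the files have all been written, and make only found a problem with the Go module files. Running go mod tidy fixes it.
A look at the generated code
According to the output above, Kubebuilder added just two Go files. If you use Git, you'll see that it also modified many files under config, as well as main.go and PROJECT.
First, let's see what the newly added Go files contain:
api/v1beta1/groupversion_info.go
This one is small. Only a few lines really matter, and they record the version information for this API group.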
/* ... */
// Package v1beta1 contains API Schema definitions for the nginx-stream v1beta1 API group
// +kubebuilder:object:generate=true
// +groupName=nginx-stream.jeffthecoder.xyz
package v1beta1
import (
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/scheme"
)
var (
// GroupVersion is group version used to register these objects
GroupVersion = schema.GroupVersion{Group: "nginx-stream.jeffthecoder.xyz", Version: "v1beta1"}
// SchemeBuilder is used to add go types to the GroupVersionKind scheme
SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}
// AddToScheme adds the types in this group-version to the given scheme.
AddToScheme = SchemeBuilder.AddToScheme
)
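The other file is streamingress_types.go. Its most important part is the StreamIngress type, which defines the members of the StreamIngress CRD. For example, StreamIngressSpec holds the spec part of StreamIngress, which for now is scaffolding with a Foo Bar example. The file ends with an init that registers the types with SchemeBuilder. Later, when we want to update the CRD type, we do it by editing this file. The generated code is as follows: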
package v1beta1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// StreamIngressSpec defines the desired state of StreamIngress
type StreamIngressSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
// Foo is an example field of StreamIngress. Edit StreamIngress_types.go to remove/update
Foo string `json:"foo,omitempty"`
}
// StreamIngressStatus defines the observed state of StreamIngress
type StreamIngressStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
}
// +kubebuilder:object:root=true
// StreamIngress is the Schema for the streamingresses API
type StreamIngress struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec StreamIngressSpec `json:"spec,omitempty"`
Status StreamIngressStatus `json:"status,omitempty"`
}
// +kubebuilder:object:root=true
// StreamIngressList contains a list of StreamIngress
type StreamIngressList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []StreamIngress `json:"items"`
}
func init() {
SchemeBuilder.Register(&StreamIngress{}, &StreamIngressList{})
}
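Installing the generated CRD into Kubernetes
So far we haven't written a single line of code. We can first try installing the generated CRD into the cluster: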
% make install
/Users/guochao/.asdf/shims/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
kustomize build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/streamingresses.nginx-stream.jeffthecoder.xyz created
% kubectl get crds
NAME CREATED AT
streamingresses.nginx-stream.jeffthecoder.xyz 2021-02-06T13:55:33Z
% kubectl get crds streamingresses.nginx-stream.jeffthecoder.xyz -o yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
kubectl.kubernetes.io/last-applied-configuration: |
{"apiVersion":"apiextensions.k8s.io/v1","kind":"CustomResourceDefinition","metadata":{"annotations":{"controller-gen.kubebuilder.io/version":"v0.4.1"},"creationTimestamp":null,"name":"streamingresses.nginx-stream.jeffthecoder.xyz"},"spec":{"group":"nginx-stream.jeffthecoder.xyz","names":{"kind":"StreamIngress","listKind":"StreamIngressList","plural":"streamingresses","singular":"streamingress"},"scope":"Namespaced","versions":[{"name":"v1beta1","schema":{"openAPIV3Schema":{"description":"StreamIngress is the Schema for the streamingresses API","properties":{"apiVersion":{"description":"APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources","type":"string"},"kind":{"description":"Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds","type":"string"},"metadata":{"type":"object"},"spec":{"description":"StreamIngressSpec defines the desired state of StreamIngress","properties":{"foo":{"description":"Foo is an example field of StreamIngress. Edit StreamIngress_types.go to remove/update","type":"string"}},"type":"object"},"status":{"description":"StreamIngressStatus defines the observed state of StreamIngress","type":"object"}},"type":"object"}},"served":true,"storage":true}]},"status":{"acceptedNames":{"kind":"","plural":""},"conditions":[],"storedVersions":[]}}
creationTimestamp: "2021-02-06T13:55:33Z"
generation: 1
managedFields:
- apiVersion: apiextensions.k8s.io/v1
fieldsType: FieldsV1
fieldsV1:
f:status:
f:acceptedNames:
f:kind: {}
f:listKind: {}
f:plural: {}
f:singular: {}
f:conditions: {}
manager: kube-apiserver
operation: Update
time: "2021-02-06T13:55:33Z"
- apiVersion: apiextensions.k8s.io/v1
fieldsType: FieldsV1
fieldsV1:
f:metadata:
f:annotations:
.: {}
f:controller-gen.kubebuilder.io/version: {}
f:kubectl.kubernetes.io/last-applied-configuration: {}
f:spec:
f:conversion:
.: {}
f:strategy: {}
f:group: {}
f:names:
f:kind: {}
f:listKind: {}
f:plural: {}
f:singular: {}
f:scope: {}
f:versions: {}
f:status:
f:storedVersions: {}
manager: kubectl-client-side-apply
operation: Update
time: "2021-02-06T13:55:33Z"
name: streamingresses.nginx-stream.jeffthecoder.xyz
resourceVersion: "469"
uid: 706dec4d-9ad3-4f09-b87d-c699ef11c661
spec:
conversion:
strategy: None
group: nginx-stream.jeffthecoder.xyz
names:
kind: StreamIngress
listKind: StreamIngressList
plural: streamingresses
singular: streamingress
scope: Namespaced
versions:
- name: v1beta1
schema:
openAPIV3Schema:
description: StreamIngress is the Schema for the streamingresses API
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
description: StreamIngressSpec defines the desired state of StreamIngress
properties:
foo:
description: Foo is an example field of StreamIngress. Edit StreamIngress_types.go
to remove/update
type: string
type: object
status:
description: StreamIngressStatus defines the observed state of StreamIngress
type: object
type: object
served: true
storage: true
status:
acceptedNames:
kind: StreamIngress
listKind: StreamIngressList
plural: streamingresses
singular: streamingress
conditions:
- lastTransitionTime: "2021-02-06T13:55:33Z"
message: no conflicts found
reason: NoConflicts
status: "True"
type: NamesAccepted
- lastTransitionTime: "2021-02-06T13:55:33Z"
message: the initial names have been accepted
reason: InitialNamesAccepted
status: "True"
type: Established
storedVersions:
- v1beta1
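As you can see, make install actually does two things:
- controller-gen generates the resources
- kustomize builds the YAML and applies it to Kubernetes
We can build the YAML with kustomize again and compare it with the kubectl get output above: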
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
creationTimestamp: null
name: streamingresses.nginx-stream.jeffthecoder.xyz
spec:
group: nginx-stream.jeffthecoder.xyz
names:
kind: StreamIngress
listKind: StreamIngressList
plural: streamingresses
singular: streamingress
scope: Namespaced
versions:
- name: v1beta1
schema:
openAPIV3Schema:
description: StreamIngress is the Schema for the streamingresses API
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
description: StreamIngressSpec defines the desired state of StreamIngress
properties:
foo:
description: Foo is an example field of StreamIngress. Edit StreamIngress_types.go to remove/update
type: string
type: object
status:
description: StreamIngressStatus defines the observed state of StreamIngress
type: object
type: object
served: true
storage: true
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []
In the CRD generated by kustomize, we can clearly see that the schema corresponds to the types in streamingress_types.go. For example, foo under spec corresponds to the Foo member of StreamIngressSpec.
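The link between the Go member Foo and the schema property foo is the struct's json tag. As a small illustration using only the standard library, the sketch below copies the shape of the scaffolded spec into a hypothetical `fooSpec` type and shows what the tag does to the serialized name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fooSpec copies the shape of the scaffolded StreamIngressSpec, for illustration only.
type fooSpec struct {
	Foo string `json:"foo,omitempty"`
}

// encode returns the JSON form of a spec.
func encode(s fooSpec) string {
	b, err := json.Marshal(s)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	fmt.Println(encode(fooSpec{Foo: "bar"})) // {"foo":"bar"}
	fmt.Println(encode(fooSpec{}))           // {} because omitempty drops the empty string
}
```

The lower-case name comes from the tag, not from the Go field name, which is why the scaffold insists that every new field carries a json tag.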
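Some other files were modified too
Besides generating the files for the StreamIngress type, the steps above also modified some other files. Briefly:
- PROJECT: records the resource:
  resources:
  - group: nginx-stream
    kind: StreamIngress
    version: v1beta1
- config/: during kubebuilder create api, kubebuilder adds files under config, mainly kustomize configuration generated from the type and used to produce the concrete YAML. This includes the CRD definition, the RBAC definitions, and a sample StreamIngress instance.
- controllers/: a StreamIngressReconciler type is added in the controllers directory. It is mainly responsible for watching StreamIngress resources and moving them between states.
- main.go: the main function is modified to register StreamIngressReconciler with the Manager.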
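Modifying the type
The Foo Bar sample doesn't really meet our needs. Let's change it according to the requirements:
- which ConfigMap (TCP or UDP?): Protocol (default TCP)
- address and port
  - address (required)
  - port (required)
- upstream is inside the cluster (default True)
Accordingly, StreamIngressSpec in streamingress_types.go needs to change: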
// StreamAddress defines the address for a stream server or upstream
type StreamAddress struct {
Namespace string `json:"namespace,omitempty"`
Service string `json:"service"`
Port uint `json:"port"`
}
// StreamIngressSpec defines the desired state of StreamIngress
type StreamIngressSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
Protocol string `json:"protocol,omitempty"`
Listen uint `json:"listen"`
Upstream StreamAddress `json:"upstream"`
IsService bool `json:"is_service,omitempty"`
}
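make install again
When we generate and install the CRD again, we should see that config/crd/bases/nginx-stream.jeffthecoder.xyz_streamingresses.yaml has been modified. Note that the captured output lists host and port under upstream, whereas the StreamAddress struct above declares namespace, service, and port; the output appears to come from a different revision of the type.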
% make install
/Users/guochao/.asdf/shims/controller-gen "crd:trivialVersions=true" rbac:roleName=manager-role webhook paths="./..." output:crd:artifacts:config=config/crd/bases
kustomize build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/streamingresses.nginx-stream.jeffthecoder.xyz configured
% git diff
diff --git a/config/crd/bases/nginx-stream.jeffthecoder.xyz_streamingresses.yaml b/config/crd/bases/nginx-stream.jeffthecoder.xyz_streamingresses.yaml
index ff12f1e..1c21c6a 100644
--- a/config/crd/bases/nginx-stream.jeffthecoder.xyz_streamingresses.yaml
+++ b/config/crd/bases/nginx-stream.jeffthecoder.xyz_streamingresses.yaml
@@ -36,10 +36,38 @@ spec:
spec:
description: StreamIngressSpec defines the desired state of StreamIngress
properties:
- foo:
- description: Foo is an example field of StreamIngress. Edit StreamIngress_types.go
- to remove/update
+ is_service:
+ type: boolean
+ listen:
+ type: integer
+ protocol:
type: string
+ upstream:
+ description: StreamAddress defines the address for a stream server
+ or upstream
+ properties:
+ host:
+ type: string
+ port:
+ type: integer
+ required:
+ - host
+ - port
+ type: object
+ required:
+ - listen
+ - protocol
+ - upstream
type: object
status:
description: StreamIngressStatus defines the observed state of StreamIngress
Trying to create a StreamIngress
As mentioned above, kubebuilder created a sample StreamIngress instance. Let's try applying it:
% kubectl apply -f config/samples/nginx-stream_v1beta1_streamingress.yaml
error: error validating "config/samples/nginx-stream_v1beta1_streamingress.yaml": error validating data: [ValidationError(StreamIngress.spec): unknown field "foo" in xyz.jeffthecoder.nginx-stream.v1beta1.StreamIngress.spec, ValidationError(StreamIngress.spec): missing required field "listen" in xyz.jeffthecoder.nginx-stream.v1beta1.StreamIngress.spec, ValidationError(StreamIngress.spec): missing required field "protocol" in xyz.jeffthecoder.nginx-stream.v1beta1.StreamIngress.spec, ValidationError(StreamIngress.spec): missing required field "upstream" in xyz.jeffthecoder.nginx-stream.v1beta1.StreamIngress.spec]; if you choose to ignore these errors, turn validation off with --validate=false
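Oops, an error. The message says it is a validation error: foo is an unknown field, and listen, protocol, and upstream must be filled in. This is the CRD schema at work. At this point, the CRD definition I have is: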
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.4.1
creationTimestamp: null
name: streamingresses.nginx-stream.jeffthecoder.xyz
spec:
group: nginx-stream.jeffthecoder.xyz
names:
kind: StreamIngress
listKind: StreamIngressList
plural: streamingresses
singular: streamingress
scope: Namespaced
versions:
- name: v1beta1
schema:
openAPIV3Schema:
description: StreamIngress is the Schema for the streamingresses API
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
description: StreamIngressSpec defines the desired state of StreamIngress
properties:
is_service:
type: boolean
listen:
type: integer
protocol:
type: string
upstream:
description: StreamAddress defines the address for a stream server
or upstream
properties:
host:
type: string
port:
type: integer
required:
- host
- port
type: object
required:
- listen
- upstream
type: object
status:
description: StreamIngressStatus defines the observed state of StreamIngress
type: object
type: object
served: true
storage: true
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []
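Each version under versions has its own schema.openAPIV3Schema, which defines which fields exist and which are required. On apply, Kubernetes validates the data against these definitions to avoid mistakes.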
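To make the two kinds of failure concrete (an unknown field and a missing required field), here is a rough standard-library sketch. It is not how the API server validates objects; the `spec` and `upstream` types and the `check` function are hypothetical mirrors of the schema shown above, and unlike the real validation it stops at the first error.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// upstream and spec are simplified, hypothetical mirrors of the CRD schema.
type upstream struct {
	Host string `json:"host"`
	Port uint   `json:"port"`
}

type spec struct {
	Protocol  string    `json:"protocol,omitempty"`
	Listen    *uint     `json:"listen"`   // pointer so that "absent" can be detected
	Upstream  *upstream `json:"upstream"` // pointer so that "absent" can be detected
	IsService bool      `json:"is_service,omitempty"`
}

// check rejects unknown fields and missing required fields.
func check(raw string) error {
	var s spec
	dec := json.NewDecoder(strings.NewReader(raw))
	dec.DisallowUnknownFields() // a field such as "foo" is now an error
	if err := dec.Decode(&s); err != nil {
		return err
	}
	if s.Listen == nil {
		return errors.New(`missing required field "listen"`)
	}
	if s.Upstream == nil {
		return errors.New(`missing required field "upstream"`)
	}
	return nil
}

func main() {
	fmt.Println(check(`{"foo": "bar"}`))
	fmt.Println(check(`{"listen": 8080}`))
	fmt.Println(check(`{"listen": 8080, "upstream": {"host": "some-tcp-port", "port": 1234}}`))
}
```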
So let's write a valid definition that conforms to the schema:
apiVersion: nginx-stream.jeffthecoder.xyz/v1beta1
kind: StreamIngress
metadata:
name: streamingress-sample
spec:
# Add fields here
listen: 8080
upstream:
host: some-tcp-port
port: 1234
This time the apply succeeds:
% kubectl apply -f config/samples/nginx-stream_v1beta1_streamingress.yaml
streamingress.nginx-stream.jeffthecoder.xyz/streamingress-sample created
Making StreamIngress take effect
Now that we have a StreamIngress, we need to make it take effect. This is where we modify the controller.
The original code is in controllers/streamingress_controller.go:
package controllers
import (
"context"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
nginxstreamv1beta1 "github.com/jeffguorg/demo-stream-nginx-controller/api/v1beta1"
)
// StreamIngressReconciler reconciles a StreamIngress object
type StreamIngressReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=nginx-stream.jeffthecoder.xyz,resources=streamingresses,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=nginx-stream.jeffthecoder.xyz,resources=streamingresses/status,verbs=get;update;patch
func (r *StreamIngressReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
_ = context.Background()
_ = r.Log.WithValues("streamingress", req.NamespacedName)
// your logic here
return ctrl.Result{}, nil
}
func (r *StreamIngressReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&nginxstreamv1beta1.StreamIngress{}).
Complete(r)
}
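The core type is StreamIngressReconciler, which has two functions. One is Reconcile: whenever the controller runtime decides that a resource's state needs to be checked or reconciled, it calls this function. The other is SetupWithManager, which registers StreamIngressReconciler with the runtime at startup. The +kubebuilder comment lines above Reconcile are read by controller-gen to generate resources and rules.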
In Reconcile we can fetch and modify the state of various resources, for example:
// requires "time" in the import list in addition to the scaffolded imports
func (r *StreamIngressReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	logger := r.Log.WithValues("streamingress", req.NamespacedName)
	// your logic here
	var (
		requestIngress nginxstreamv1beta1.StreamIngress
	)
	// req.NamespacedName is the object key of the resource being reconciled
	if err := r.Get(ctx, req.NamespacedName, &requestIngress); err != nil {
		logger.Error(err, "failed to get stream ingress")
		return ctrl.Result{RequeueAfter: time.Second * 5}, err
	}
	// WithValues only returns a new logger, so log explicitly to see the object
	logger.Info("fetched stream ingress", "spec", requestIngress.Spec)
	return ctrl.Result{}, nil
}
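This gives us the object currently being reconciled.
Following the logic we worked out earlier, here we need to:
- check whether the StreamIngress has any problems
- update the Service to expose the port
- update the ConfigMap to tell Nginx that this port needs to be proxied
The code is roughly as follows (commented-out lines are the unchanged scaffolding):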
// package controllers
// import (
// "context"
// "fmt"
// "strconv"
// "time"
// "github.com/go-logr/logr"
// v1 "k8s.io/api/core/v1"
// "k8s.io/apimachinery/pkg/runtime"
// "k8s.io/apimachinery/pkg/util/intstr"
// ctrl "sigs.k8s.io/controller-runtime"
// "sigs.k8s.io/controller-runtime/pkg/client"
// "github.com/jeffguorg/demo-stream-nginx-controller/api/v1beta1"
// nginxstreamv1beta1 "github.com/jeffguorg/demo-stream-nginx-controller/api/v1beta1"
// )
// // StreamIngressReconciler reconciles a StreamIngress object
// type StreamIngressReconciler struct {
// client.Client
// Log logr.Logger
// Scheme *runtime.Scheme
TCPConfigMapKey client.ObjectKey
UDPConfigMapKey client.ObjectKey
ServiceKey client.ObjectKey
// }
// // +kubebuilder:rbac:groups=nginx-stream.jeffthecoder.xyz,resources=streamingresses,verbs=get;list;watch;create;update;patch;delete
// // +kubebuilder:rbac:groups=nginx-stream.jeffthecoder.xyz,resources=streamingresses/status,verbs=get;update;patch
// func (r *StreamIngressReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
logger := r.Log.WithValues("streamingress", req.NamespacedName)
// fetch stream ingress and validate
var (
requestIngress v1beta1.StreamIngress
ingressList v1beta1.StreamIngressList
)
if err := r.Get(ctx, client.ObjectKey{Namespace: req.Namespace, Name: req.Name}, &requestIngress); err != nil {
logger.Error(err, "failed to get stream ingress")
return ctrl.Result{RequeueAfter: time.Second * 5}, err
}
// look up ingresses on the same protocol and port through the "listen" index,
// whose key has the form "protocol/port" (see the IndexField call in main.go)
if err := r.List(ctx, &ingressList, client.MatchingFields{
"listen": fmt.Sprintf("%v/%v", requestIngress.Spec.Protocol, requestIngress.Spec.Listen),
}); err != nil {
logger.Error(err, "failed to list stream ingress")
return ctrl.Result{RequeueAfter: time.Second * 5}, err
}
if len(ingressList.Items) > 1 {
logger.WithValues("count", len(ingressList.Items)).Info("multiple listen on one port")
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}
// check if upstream service exists
var (
upstreamService v1.Service
upstreamServicePortIsOpen = false
)
if err := r.Get(ctx, client.ObjectKey{Namespace: requestIngress.Spec.Upstream.Namespace, Name: requestIngress.Spec.Upstream.Service}, &upstreamService); err != nil {
logger.WithValues("service", fmt.Sprintf("%v/%v", requestIngress.Spec.Upstream.Namespace, requestIngress.Spec.Upstream.Service)).Info("unable to get service")
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}
for _, port := range upstreamService.Spec.Ports {
if port.Port == int32(requestIngress.Spec.Upstream.Port) {
upstreamServicePortIsOpen = true
}
}
if !upstreamServicePortIsOpen {
logger.WithValues("upstream-port", requestIngress.Spec.Upstream.Port).Info("upstream service is not open")
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}
// fetch service and config map to update
var (
service v1.Service
configMap v1.ConfigMap
portIsAlreadyOnService = false
)
if err := r.Get(ctx, r.ServiceKey, &service); err != nil {
logger.Error(err, "failed to get service")
return ctrl.Result{RequeueAfter: time.Second * 5}, err
}
if requestIngress.Spec.Protocol == string(v1.ProtocolUDP) {
if err := r.Get(ctx, r.UDPConfigMapKey, &configMap); err != nil {
logger.Error(err, "failed to get UDP config map")
return ctrl.Result{RequeueAfter: time.Second * 5}, err
}
} else {
if err := r.Get(ctx, r.TCPConfigMapKey, &configMap); err != nil {
logger.Error(err, "failed to get TCP config map")
return ctrl.Result{RequeueAfter: time.Second * 5}, err
}
}
// check if port is opened and open it if needed
for idx, port := range service.Spec.Ports {
if port.Protocol == v1.Protocol(requestIngress.Spec.Protocol) && port.Port == int32(requestIngress.Spec.Listen) {
service.Spec.Ports[idx].TargetPort = intstr.FromInt(int(requestIngress.Spec.Listen))
portIsAlreadyOnService = true
}
}
if !portIsAlreadyOnService {
service.Spec.Ports = append(service.Spec.Ports, v1.ServicePort{
Protocol: v1.Protocol(requestIngress.Spec.Protocol),
Port: int32(requestIngress.Spec.Listen),
TargetPort: intstr.FromInt(int(requestIngress.Spec.Listen)),
})
}
if err := r.Update(ctx, &service); err != nil {
logger.Error(err, "failed to update service")
return ctrl.Result{RequeueAfter: time.Second * 5}, err
}
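// update configmap; Data is nil when the ConfigMap has no entries yet, and writing to a nil map panics
if configMap.Data == nil {
configMap.Data = map[string]string{}
}
configMap.Data[strconv.FormatInt(int64(requestIngress.Spec.Listen), 10)] = fmt.Sprintf("%v/%v:%v", requestIngress.Spec.Upstream.Namespace, requestIngress.Spec.Upstream.Service, requestIngress.Spec.Upstream.Port)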
if err := r.Update(ctx, &configMap); err != nil {
logger.Error(err, "failed to update configMap")
return ctrl.Result{RequeueAfter: time.Second * 5}, err
}
// ok
return ctrl.Result{}, nil
// }
// func (r *StreamIngressReconciler) SetupWithManager(mgr ctrl.Manager) error {
// return ctrl.NewControllerManagedBy(mgr).
// For(&nginxstreamv1beta1.StreamIngress{}).
// Complete(r)
// }
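Roughly, it checks whether StreamIngresses conflict with each other, whether the upstream Service exists, and whether the port is open on that Service, then updates the Service and the ConfigMap.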
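After making these changes, running make run directly produces the error Index with name field:listen does not exist.
This happens because asking the apiserver every time we list something is slow and unwise, so the controller runtime keeps a local cache. We need to set up the cache index when the program initializes, so we edit main.go and add some flags while we're at it: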
// package main
// import (
// "flag"
"fmt"
// "os"
// "k8s.io/apimachinery/pkg/runtime"
// clientgoscheme "k8s.io/client-go/kubernetes/scheme"
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
// ctrl "sigs.k8s.io/controller-runtime"
// "sigs.k8s.io/controller-runtime/pkg/client"
// "sigs.k8s.io/controller-runtime/pkg/log/zap"
// nginxstreamv1beta1 "github.com/jeffguorg/demo-stream-nginx-controller/api/v1beta1"
// "github.com/jeffguorg/demo-stream-nginx-controller/controllers"
// // +kubebuilder:scaffold:imports
// )
// var (
// scheme = runtime.NewScheme()
// setupLog = ctrl.Log.WithName("setup")
// )
// func init() {
// _ = clientgoscheme.AddToScheme(scheme)
// _ = nginxstreamv1beta1.AddToScheme(scheme)
// // +kubebuilder:scaffold:scheme
// }
// func main() {
// var metricsAddr string
// var enableLeaderElection bool
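var (
tcpConfigMapName string
udpConfigMapName string
ingressServiceName string
ingressNamespace string
)
// the flag names and empty defaults below are illustrative assumptions; choose names that fit your deployment
flag.StringVar(&tcpConfigMapName, "tcp-configmap", "", "Name of the ConfigMap that holds TCP stream servers.")
flag.StringVar(&udpConfigMapName, "udp-configmap", "", "Name of the ConfigMap that holds UDP stream servers.")
flag.StringVar(&ingressServiceName, "ingress-service", "", "Name of the Service that exposes Nginx.")
flag.StringVar(&ingressNamespace, "ingress-namespace", "", "Namespace of the ConfigMaps and the Service.")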
// flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
// flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
// "Enable leader election for controller manager. "+
// "Enabling this will ensure there is only one active controller manager.")
// flag.Parse()
// ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
// mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
// Scheme: scheme,
// MetricsBindAddress: metricsAddr,
// Port: 9443,
// LeaderElection: enableLeaderElection,
// LeaderElectionID: "3c21781e.jeffthecoder.xyz",
// })
// if err != nil {
// setupLog.Error(err, "unable to start manager")
// os.Exit(1)
// }
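// register the "listen" index in the cache; each key has the form "protocol/port"
cache := mgr.GetCache()
if err := cache.IndexField(&nginxstreamv1beta1.StreamIngress{}, "listen", func(o runtime.Object) []string {
switch o := o.(type) {
case *nginxstreamv1beta1.StreamIngress:
return []string{fmt.Sprintf("%v/%v", o.Spec.Protocol, o.Spec.Listen)}
}
return []string{}
}); err != nil {
setupLog.Error(err, "unable to index field", "field", "listen")
os.Exit(1)
}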
// if err = (&controllers.StreamIngressReconciler{
// Client: mgr.GetClient(),
// Log: ctrl.Log.WithName("controllers").WithName("StreamIngress"),
// Scheme: mgr.GetScheme(),
TCPConfigMapKey: client.ObjectKey{Namespace: ingressNamespace, Name: tcpConfigMapName},
UDPConfigMapKey: client.ObjectKey{Namespace: ingressNamespace, Name: udpConfigMapName},
ServiceKey: client.ObjectKey{Namespace: ingressNamespace, Name: ingressServiceName},
// }).SetupWithManager(mgr); err != nil {
// setupLog.Error(err, "unable to create controller", "controller", "StreamIngress")
// os.Exit(1)
// }
// // +kubebuilder:scaffold:builder
// setupLog.Info("starting manager")
// if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
// setupLog.Error(err, "problem running manager")
// os.Exit(1)
// }
}
That's about it for the basic version of the controller.