需求背景分析

在基于 k8s 的二次开发过程中,有些场景下我们会定制开发自己的 CRD + Controller,即 Operator,来实现基于 k8s 的云原生部署与自动化运维功能,暂且称之为底层的基座能力。

如果我们想基于底层能力,将其封装为控制台供上层业务调用,就需要能够控制与使用这样的接口能力。基于对 client-go 的使用,也许有胖友会想到 dynamic client,但作为设计与开发人员,我们应该清醒地认识到:在序列化与反序列化过程中处理一堆 map[string]interface{} 是多么头疼(除非恰巧业务开发人员与 Operator 设计者是同一人 >..<)。

我们能不能像使用 k8s 中的原生资源(如 Deployment、Service 等)一样,方便地使用 CRD 资源呢?

必备概念与技能

在进行具体分析前,建议胖友先去了解下Kubernetes API 概念,同时具备查阅Kubernetes API的能力。

为了方便举例,我在本地 k8s 集群注册了 emqx-operator 中的 CRD 自定义资源(kubectl get crd 的输出如下),将其视为与 k8s 原生资源同等地位。

1
2
emqxbrokers.apps.emqx.io                    2021-12-09T03:59:28Z
emqxenterprises.apps.emqx.io 2021-12-09T03:59:28Z

另外可以通过 kubectl api-versions 查看其 API 相关信息:

1
2
$kubectl api-versions | grep emqx
apps.emqx.io/v1beta1

通过 kubectl api-resources 查看相关的 Group、Version、Kind 信息。

1
2
3
4
$kubectl api-resources              
NAME SHORTNAMES APIVERSION NAMESPACED KIND
emqxbrokers emqx apps.emqx.io/v1beta1 true EmqxBroker
emqxenterprises emqx-ee apps.emqx.io/v1beta1 true EmqxEnterprise

相信胖友应该都掌握如上知识概念及具备如上的基础技能了。^.^

设计实现

client-go 的官方文档描述还是比较少的,作为设计与开发者,胖友们得具备源码分析能力。下面让我们切入 client-go 官方 Repo,其中 examples/create-update-delete-deployment 示例展示了如何使用 client-go 库发起 REST 请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()

// Config 的初始化
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
...
// clientset 的构造
clientset, err := kubernetes.NewForConfig(config)

// 使用 resource client 进行 resource 资源的操作
deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)

// Create Deployment
fmt.Println("Creating deployment...")
result, err := deploymentsClient.Create(context.TODO(), deployment, metav1.CreateOptions{})
if err != nil {
panic(err)
}
fmt.Printf("Created deployment %q.\n", result.GetObjectMeta().GetName())

// Update Deployment
prompt()
fmt.Println("Updating deployment...")
// You have two options to Update() this Deployment:
//
// 1. Modify the "deployment" variable and call: Update(deployment).
// This works like the "kubectl replace" command and it overwrites/loses changes
// made by other clients between you Create() and Update() the object.
// 2. Modify the "result" returned by Get() and retry Update(result) until
// you no longer get a conflict error. This way, you can preserve changes made
// by other clients between Create() and Update(). This is implemented below
// using the retry utility package included with client-go. (RECOMMENDED)
//
// More Info:
// https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency

retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Deployment before attempting update
// RetryOnConflict uses exponential backoff to avoid exhausting the apiserver
result, getErr := deploymentsClient.Get(context.TODO(), "demo-deployment", metav1.GetOptions{})
if getErr != nil {
panic(fmt.Errorf("Failed to get latest version of Deployment: %v", getErr))
}

result.Spec.Replicas = int32Ptr(1) // reduce replica count
result.Spec.Template.Spec.Containers[0].Image = "nginx:1.13" // change nginx version
_, updateErr := deploymentsClient.Update(context.TODO(), result, metav1.UpdateOptions{})
return updateErr
})
if retryErr != nil {
panic(fmt.Errorf("Update failed: %v", retryErr))
}
fmt.Println("Updated deployment...")

// List Deployments
...
fmt.Printf("Listing deployments in namespace %q:\n", apiv1.NamespaceDefault)
list, err := deploymentsClient.List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err)
}
for _, d := range list.Items {
fmt.Printf(" * %s (%d replicas)\n", d.Name, *d.Spec.Replicas)
}

// Delete Deployment
...
fmt.Println("Deleting deployment...")
deletePolicy := metav1.DeletePropagationForeground
if err := deploymentsClient.Delete(context.TODO(), "demo-deployment", metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
}); err != nil {
panic(err)
}
fmt.Println("Deleted deployment.")
}

设计的重点就在于如何设计与实现 CRD 对应的 resource client 与 clientset。

clientset 的设计

通过源码我们发现实际上就是通过 Config 去构造 rest http 的客户端。

NewForConfig

1
2
3
4
5
6
7
8
9
10
11
// NewForConfig creates a Clientset for the given config.
//
// It works on a shallow copy of c rather than on c itself. From that copy it
// builds a single *http.Client that is shared by all group clients. The copy
// and the HTTP client are then passed to NewForConfigAndClient. Any error from
// building the HTTP client is returned unchanged, with a nil Clientset.
func NewForConfig(c *rest.Config) (*Clientset, error) {
	cfg := *c

	// One HTTP client (and therefore one transport) for every group client.
	sharedHTTP, err := rest.HTTPClientFor(&cfg)
	if err != nil {
		return nil, err
	}
	return NewForConfigAndClient(&cfg, sharedHTTP)
}

Clientset

1
2
3
4
5
6
7
8
9
10
11
12
13
// Interface lists the accessors exposed by the clientset: a discovery client
// plus one typed client per API group/version (most entries elided as "...").
type Interface interface {
Discovery() discovery.DiscoveryInterface
...
AppsV1() appsv1.AppsV1Interface // typed client for the apps/v1 group (e.g. Deployments)
...
}

// Clientset holds the discovery client and the per-group typed clients
// (only the apps/v1 client is shown in this excerpt).
type Clientset struct {
*discovery.DiscoveryClient
...
appsV1 *appsv1.AppsV1Client
...
}

到这里我们发现,Clientset 就是对各 API 组(如 AppsV1)下具体资源(比如 Deployment)客户端的聚合抽象,并对外暴露访问入口;我们最终需要提供的也是这样一个抽象层。

实际上AppsV1 也是一个维度的抽象,让我们继续往下看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// AppsV1Interface is the typed client for the apps/v1 group. It exposes the
// underlying REST client and one getter per resource (only Deployments shown;
// the others are elided as "...").
type AppsV1Interface interface {
RESTClient() rest.Interface
...
DeploymentsGetter
...
}

// AppsV1Client is used to interact with features provided by the apps group.
// It wraps a single rest.Interface, returned by RESTClient().
type AppsV1Client struct {
restClient rest.Interface
}

...
// Deployments returns a DeploymentInterface bound to this AppsV1Client and
// scoped to the given namespace.
func (c *AppsV1Client) Deployments(namespace string) DeploymentInterface {
	scoped := newDeployments(c, namespace)
	return scoped
}
...
// NewForConfig creates an AppsV1Client from c.
//
// It copies c and applies the apps/v1 defaults to the copy via
// setConfigDefaults (group/version, API path, serializer, user agent). It then
// builds an HTTP client from the defaulted copy and delegates to
// NewForConfigAndClient. Errors from either step are returned with a nil client.
func NewForConfig(c *rest.Config) (*AppsV1Client, error) {
	defaulted := *c
	// setConfigDefaults is what decides which group/version (and therefore which
	// request URL prefix) this client talks to.
	if err := setConfigDefaults(&defaulted); err != nil {
		return nil, err
	}

	hc, err := rest.HTTPClientFor(&defaulted)
	if err != nil {
		return nil, err
	}
	return NewForConfigAndClient(&defaulted, hc)
}

// NewForConfigAndClient creates an AppsV1Client from config c and an existing
// HTTP client h. The construction of the REST client (`client`) is elided in
// this excerpt.
func NewForConfigAndClient(c *rest.Config, h *http.Client) (*AppsV1Client, error) {
...
return &AppsV1Client{client}, nil
}

...
// setConfigDefaults fills in the REST settings every apps/v1 client needs.
// It sets the group/version taken from v1.SchemeGroupVersion, the "/apis" path
// prefix, and the serializer from scheme.Codecs without conversion. It also sets
// the default Kubernetes User-Agent when config.UserAgent is empty. It always
// returns nil.
func setConfigDefaults(config *rest.Config) error {
	// Take a local copy so config.GroupVersion does not point at the package variable.
	groupVersion := v1.SchemeGroupVersion
	config.APIPath = "/apis"
	config.GroupVersion = &groupVersion
	config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()

	// Respect a caller-supplied User-Agent; only fill it in when missing.
	if len(config.UserAgent) == 0 {
		config.UserAgent = rest.DefaultKubernetesUserAgent()
	}
	return nil
}

// RESTClient returns the REST client wrapped by this AppsV1Client.
// Calling it on a nil *AppsV1Client is safe and yields nil.
func (c *AppsV1Client) RESTClient() rest.Interface {
	if c != nil {
		return c.restClient
	}
	return nil
}

看到这里,相信我们已经有了清晰的认识:实际上这里就是去构造具体 Resource 对应的 rest client。

Resource 的 client 大体构造我们已经有了,那么客户端如何知道该请求什么 URL 呢?

这里考察我们对概念的理解与掌握,我们应该对SchemeGroupVersion这样的关键变量有敏锐的捕捉能力,我们去看看这个SchemeGroupVersion究竟是啥?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// GroupName is the API group name used by this package.
const GroupName = "apps"

// SchemeGroupVersion is the group/version (apps/v1) used to register this
// package's objects.
var SchemeGroupVersion = schema.GroupVersion{Version: "v1", Group: GroupName}

// Resource qualifies an unqualified resource name with this package's group
// and returns the resulting GroupResource.
func Resource(resource string) schema.GroupResource {
	versioned := SchemeGroupVersion.WithResource(resource)
	return versioned.GroupResource()
}

// SchemeBuilder collects the registration functions for this package (here
// addKnownTypes). AddToScheme is SchemeBuilder's AddToScheme method, used to
// apply those registrations to a *runtime.Scheme.
var (
// TODO: move SchemeBuilder with zz_generated.deepcopy.go to k8s.io/api.
// localSchemeBuilder and AddToScheme will stay in k8s.io/kubernetes.
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
localSchemeBuilder = &SchemeBuilder
AddToScheme = localSchemeBuilder.AddToScheme
)

// addKnownTypes registers the apps/v1 object and list types of this package
// with scheme under SchemeGroupVersion. It then calls metav1.AddToGroupVersion
// for the same group/version. It always returns nil.
func addKnownTypes(scheme *runtime.Scheme) error {
	knownTypes := []runtime.Object{
		&Deployment{}, &DeploymentList{},
		&StatefulSet{}, &StatefulSetList{},
		&DaemonSet{}, &DaemonSetList{},
		&ReplicaSet{}, &ReplicaSetList{},
		&ControllerRevision{}, &ControllerRevisionList{},
	}
	scheme.AddKnownTypes(SchemeGroupVersion, knownTypes...)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}

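看到这里一切都柳暗花明了。SchemeGroupVersion 决定了请求的 Group 与 Version,与 setConfigDefaults 中的 APIPath 一起拼出请求前缀 /apis/apps/v1。addKnownTypes 则把这些 Go 类型注册到客户端的 scheme 中,让 client 知道这些 Resource 类型与 GroupVersionKind 的对应关系。这样在请求 Resource 时,client 就能正确地进行序列化与反序列化。注意这是客户端侧的类型注册;API Server 对 CRD 的认知来自集群中注册的 CRD 本身。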

CRD 关键实现

那么对于自定义 CRD 的实现,肯定少不了如上分析的 setConfigDefaults,如下为针对 emqx-operator 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// emqxbrokerGVR is the API group/version of the emqx-operator CRDs
// (apps.emqx.io/v1beta1). Despite the "GVR" suffix it is a GroupVersion,
// not a GroupVersionResource.
var emqxbrokerGVR = schema.GroupVersion{Version: "v1beta1", Group: "apps.emqx.io"}

// setConfigDefaults prepares config for the apps.emqx.io/v1beta1 API. It sets
// the group/version, the "/apis" prefix, and the serializer. It also sets the
// default Kubernetes User-Agent when none is configured. It always returns nil.
func setConfigDefaults(config *rest.Config) error {
	// Copy first so config.GroupVersion never points at the package variable.
	groupVersion := emqxbrokerGVR
	config.APIPath = "/apis"
	config.GroupVersion = &groupVersion
	// NOTE(review): confirm which `scheme` package is imported here. The
	// apps.emqx.io types must be decodable by its codecs.
	config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()

	if len(config.UserAgent) == 0 {
		config.UserAgent = rest.DefaultKubernetesUserAgent()
	}
	return nil
}

到这里 Client 的构造就算结束了。其中的彩蛋还包括请求 API 的 URL:APIPath(/apis)与 GroupVersion(apps.emqx.io/v1beta1)共同确定了请求前缀 /apis/apps.emqx.io/v1beta1。

CRD 资源操作具体实现

如上我们主要讲述了如何去构造client,那么对于CRD的具体操作我们是如何实现的呢?

这里让我们将注意力再返回到AppsV1Interface

1
2
3
4
5
6
// AppsV1Interface is the typed client for the apps/v1 group: the underlying
// REST client plus one embedded getter per resource (only DeploymentsGetter
// is shown; the rest are elided as "...").
type AppsV1Interface interface {
RESTClient() rest.Interface
...
DeploymentsGetter
...
}

让我们看看 DeploymentsGetter 究竟是啥。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
...
// DeploymentsGetter returns a DeploymentInterface for the given namespace.
type DeploymentsGetter interface {
Deployments(namespace string) DeploymentInterface
}

// DeploymentInterface has methods to work with Deployment resources.
// The first group covers CRUD/list/watch/patch/apply on the Deployment object
// and its status. The Scale methods operate on the scale subresource
// (autoscalingv1.Scale).
type DeploymentInterface interface {
Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (*v1.Deployment, error)
Update(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
UpdateStatus(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Deployment, error)
List(ctx context.Context, opts metav1.ListOptions) (*v1.DeploymentList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Deployment, err error)
Apply(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
ApplyStatus(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
GetScale(ctx context.Context, deploymentName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
UpdateScale(ctx context.Context, deploymentName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error)
ApplyScale(ctx context.Context, deploymentName string, scale *applyconfigurationsautoscalingv1.ScaleApplyConfiguration, opts metav1.ApplyOptions) (*autoscalingv1.Scale, error)

// Presumably a hook for extra hand-written methods defined elsewhere — not shown in this excerpt.
DeploymentExpansion
}
...

看到这里一切都已经明了:通过 Interface 抽象了对 Deployment 的操作,具体实现就不展开分析了。对于 CRD,这部分 clientset 代码通常无需手写,可以借助 k8s.io/code-generator 中的 client-gen 自动生成。

验证

环境准备

运行环境为通过minikube启动的本地k8s集群,另外在集群中注册CRD:

1
2
3
$kubectl get crd | grep emqx 
emqxbrokers.apps.emqx.io 2021-12-09T03:59:28Z
emqxenterprises.apps.emqx.io 2021-12-09T03:59:28Z

验证CRD Client

下面让我们验证 Client 的实际运行情况,即对自定义 CRD 实例的 Create、Get、List、Delete 操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// DemoForEmqxBroker demonstrates the typed EmqxBroker client against a live
// cluster. It creates an EmqxBroker in namespace ns, then gets, lists and
// deletes that same instance, calling Prompt() before each step. Any error
// panics, which is acceptable for demo code only.
//
// config is used to build the emqx clientset via pkg.NewForConfig.
func DemoForEmqxBroker(config *rest.Config, ns string) {
	ctx := context.TODO()

	// Build the emqx clientset and the EmqxBroker client scoped to ns.
	clientset, err := pkg.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	emqxbrokerClient := clientset.EmqxBrokersV1Beta1().EmqxBrokers(ns)

	// Create emqxbroker instance.
	Prompt()
	fmt.Println("[> create emqxbroker")
	emqxbroker, err := emqxbrokerClient.Create(ctx, resource.GenerateEmqxbroker(ns), metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("create emqxbroker: %+v\n", emqxbroker)

	// Use the name the API server returned instead of a hard-coded "emqx", so
	// Get and Delete always target the object created above, whatever name
	// GenerateEmqxbroker chooses.
	name := emqxbroker.Name

	// Get emqxbroker instance.
	Prompt()
	fmt.Println("[> get emqxbroker")
	eb, err := emqxbrokerClient.Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("emqxbroker found: %+v\n", eb)

	// List emqxbroker instances in ns.
	Prompt()
	fmt.Println("[> list emqxbroker")
	eblist, err := emqxbrokerClient.List(ctx, metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("emqxbroker list: %+v\n", eblist)

	// Delete emqxbroker instance.
	Prompt()
	fmt.Println("[> delete emqxbroker")
	if err := emqxbrokerClient.Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
		panic(err)
	}
	// Println terminates the line so any following output starts on a new line.
	fmt.Println("Delete emqxbroker successfully")
}
  • Create
1
2
[> create emqxbroker
create emqxbroker: EmqxBroker instance [emqx],Image [emqx/emqx:4.3.10]

查看下K8s集群中实例的情况:

1
2
3
4
5
6
7
8
$kubectl get emqx emqx         
NAME AGE
emqx 2m26s
$kubectl get pods
NAME READY STATUS RESTARTS AGE
emqx-0 1/1 Running 0 2m30s
emqx-1 1/1 Running 0 2m30s
emqx-2 1/1 Running 0 2m30s
  • Get
1
2
[> get emqxbroker
emqxbroker found: EmqxBroker instance [emqx],Image [emqx/emqx:4.3.10]
  • List
1
2
[> list emqxbroker
emqxbroker list: &{TypeMeta:{Kind: APIVersion:} ListMeta:{SelfLink: ResourceVersion:157139 Continue: RemainingItemCount:<nil>} Items:[{TypeMeta:{Kind:EmqxBroker APIVersion:apps.emqx.io/v1beta1} ObjectMeta:{Name:emqx GenerateName: Namespace:default SelfLink: UID:74896493-0134-460a-a04d-d3bdaed21902 ResourceVersion:157121 Generation:1 CreationTimestamp:2021-12-26 20:23:15 +0800 CST DeletionTimestamp:<nil> DeletionGracePeriodSeconds:<nil> Labels:map[] Annotations:map[] OwnerReferences:[] Finalizers:[] ClusterName: ManagedFields:[{Manager:Go-http-client Operation:Update APIVersion:apps.emqx.io/v1beta1 Time:2021-12-26 20:23:15 +0800 CST FieldsType:FieldsV1 FieldsV1:{"f:spec":{".":{},"f:image":{},"f:labels":{".":{},"f:cluster":{}},"f:listener":{".":{},"f:nodePorts":{},"f:ports":{}},"f:replicas":{},"f:resources":{},"f:serviceAccountName":{}}} Subresource:} {Manager:__debug_bin4215542756 Operation:Update APIVersion:apps.emqx.io/v1beta1 Time:2021-12-26 20:23:15 +0800 CST FieldsType:FieldsV1 FieldsV1:{"f:status":{".":{},"f:conditions":{}}} Subresource:}]} Spec:{Replicas:0xc000492ec8 Image:emqx/emqx:4.3.10 ServiceAccountName:emqx Resources:{Limits:map[] Requests:map[]} Storage:<nil> Labels:map[cluster:emqx] Listener:{Type: LoadBalancerIP: LoadBalancerSourceRanges:[] ExternalIPs:[] Ports:{MQTT:0 MQTTS:0 WS:0 WSS:0 Dashboard:0 API:0} NodePorts:{MQTT:0 MQTTS:0 WS:0 WSS:0 Dashboard:0 API:0}} Affinity:nil ToleRations:[] NodeSelector:map[] ImagePullPolicy: ExtraVolumes:[] ExtraVolumeMounts:[] Env:[] ACL:[] Plugins:[] Modules:[]} Status:{Conditions:[{Type:Healthy Status:True LastUpdateTime:2021-12-26T20:27:26+08:00 LastUpdateAt:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2021-12-26T20:23:16+08:00 Reason:Cluster available Message:Cluster ok} {Type:Creating Status:True LastUpdateTime:2021-12-26T20:23:15+08:00 LastUpdateAt:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2021-12-26T20:23:15+08:00 Reason:Creating Message:Bootstrap emqx cluster}]}}]}
  • Delete
1
2
[> delete emqxbroker
Delete emqxbroker successfully