需求背景分析 基于 k8s
的二次开发过程中,有些场景我们会定制化的去开发自己的 CRD
+ Controller
,即 Operator
来实现基于k8s
的云原生化的部署与自动化运维功能,暂且称之为底层的基座能力 。
如果我们想基于底层能力,并想要将其封装为控制台来供上层业务调用的话,我们需要有机会能够去控制与使用这样的接口能力,基于对Client-go
的使用,也许有胖友会想到dynamic client
的使用,但是作为设计与开发人员,我们应该清醒的认识到对于序列化与反序列化过程,扔一堆map
是多么的头疼(除非恰巧业务开发人员与Operator
设计设计者是同一人>..<)。
我们能不能有机会像使用k8s
中的原生资源如Deployments
、Service
等一样方便的去使用呢?
必备概念与技能 在进行具体分析前,建议胖友先去了解下Kubernetes API
概念 ,同时具备查阅Kubernetes API
的能力。
为了方便举例,我在本地k8s
集群注册了emqx-operator
中的 CRD
自定义资源,将其视为与 k8s
原生资源同等地位。
1 2 emqxbrokers.apps.emqx.io 2021-12-09T03:59:28Z emqxenterprises.apps.emqx.io 2021-12-09T03:59:28Z
另外可以通过kubectl api-version
查看其API
相关信息:
1 2 $kubectl api-versions | grep emqxapps.emqx.io/v1beta1
通过 kubectl api-resoures
查看相关 Group
,Version
,Kind
信息。
1 2 3 4 $kubectl api-resources NAME SHORTNAMES APIVERSION NAMESPACED KIND emqxbrokers emqx apps.emqx.io/v1beta1 true EmqxBroker emqxenterprises emqx-ee apps.emqx.io/v1beta1 true EmqxEnterprise
相信胖友应该都掌握如上知识概念及具备如上的基础技能了。^.^
设计实现 关于 client-go
的官方文档描述还是蛮少的,作为设计与开发者,胖友们得具备源码分析能力。下面让我们切入client-go
官方 Repo
中。其中examples
的create-update-delete-deployment
的示例展示了如何使用client-go
库来进行rest
请求的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 func main () { var kubeconfig *string if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig" , filepath.Join(home, ".kube" , "config" ), "(optional) absolute path to the kubeconfig file" ) } else { kubeconfig = flag.String("kubeconfig" , "" , "absolute path to the kubeconfig file" ) } flag.Parse() config, err := clientcmd.BuildConfigFromFlags("" , *kubeconfig) ... clientset, err := kubernetes.NewForConfig(config) deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault) fmt.Println("Creating deployment..." ) result, err := deploymentsClient.Create(context.TODO(), deployment, metav1.CreateOptions{}) if err != nil { panic (err) } fmt.Printf("Created deployment %q.\n" , result.GetObjectMeta().GetName()) prompt() fmt.Println("Updating deployment..." ) retryErr := retry.RetryOnConflict(retry.DefaultRetry, func () error { result, getErr := deploymentsClient.Get(context.TODO(), "demo-deployment" , metav1.GetOptions{}) if getErr != nil { panic (fmt.Errorf("Failed to get latest version of Deployment: %v" , getErr)) } result.Spec.Replicas = int32Ptr(1 ) result.Spec.Template.Spec.Containers[0 ].Image = "nginx:1.13" _, updateErr := deploymentsClient.Update(context.TODO(), result, metav1.UpdateOptions{}) return updateErr }) if retryErr != nil { panic (fmt.Errorf("Update failed: %v" , retryErr)) } fmt.Println("Updated deployment..." ) ... fmt.Printf("Listing deployments in namespace %q:\n" , apiv1.NamespaceDefault) list, err := deploymentsClient.List(context.TODO(), metav1.ListOptions{}) if err != nil { panic (err) } for _, d := range list.Items { fmt.Printf(" * %s (%d replicas)\n" , d.Name, *d.Spec.Replicas) } ... fmt.Println("Deleting deployment..." ) deletePolicy := metav1.DeletePropagationForeground if err := deploymentsClient.Delete(context.TODO(), "demo-deployment" , metav1.DeleteOptions{ PropagationPolicy: &deletePolicy, }); err != nil { panic (err) } fmt.Println("Deleted deployment." ) }
设计的重点就在于如何设计与实现CRD
对应的 resource
的clientset
。
clientset
的设计通过源码我们发现实际上就是通过 Config
去构造 rest http
的客户端。
NewForConfig
1 2 3 4 5 6 7 8 9 10 11 func NewForConfig (c *rest.Config) (*Clientset, error) { configShallowCopy := *c httpClient, err := rest.HTTPClientFor(&configShallowCopy) if err != nil { return nil , err } return NewForConfigAndClient(&configShallowCopy, httpClient) }
Clientset
1 2 3 4 5 6 7 8 9 10 11 12 13 type Interface interface { Discovery() discovery.DiscoveryInterface ... AppsV1() appsv1.AppsV1Interface ... } type Clientset struct { *discovery.DiscoveryClient ... appsV1 *appsv1.AppsV1Client ... }
到了这里我们发现 Clientset
就是对 AppSV1
下具体资源比如Deployment
的抽象,对外暴露引用,我们最终需要的也是提供这样的一个抽象层面。
实际上AppsV1
也是一个维度的抽象,让我们继续往下看:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 type AppsV1Interface interface { RESTClient() rest.Interface ... DeploymentsGetter ... } type AppsV1Client struct { restClient rest.Interface } ... func (c *AppsV1Client) Deployments (namespace string ) DeploymentInterface { return newDeployments(c, namespace) } ... func NewForConfig (c *rest.Config) (*AppsV1Client, error) { config := *c if err := setConfigDefaults(&config); err != nil { return nil , err } httpClient, err := rest.HTTPClientFor(&config) if err != nil { return nil , err } return NewForConfigAndClient(&config, httpClient) } func NewForConfigAndClient (c *rest.Config, h *http.Client) (*AppsV1Client, error) { ... return &AppsV1Client{client}, nil } ... func setConfigDefaults (config *rest.Config) error { gv := v1.SchemeGroupVersion config.GroupVersion = &gv config.APIPath = "/apis" config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() if config.UserAgent == "" { config.UserAgent = rest.DefaultKubernetesUserAgent() } return nil } func (c *AppsV1Client) RESTClient () rest .Interface { if c == nil { return nil } return c.restClient }
相信看到这里我们已经有了清晰的认识了,实际上这里就是去构造对应具体Resource
的rest client
。
Resource
的client
大体构造我们是有了,那么客户端如何知道去请求啥URL
的了?
这里考察我们对概念的理解与掌握,我们应该对SchemeGroupVersion
这样的关键变量有敏锐的捕捉能力,我们去看看这个SchemeGroupVersion
究竟是啥?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 const GroupName = "apps" var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1" }func Resource (resource string ) schema .GroupResource { return SchemeGroupVersion.WithResource(resource).GroupResource() } var ( SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) localSchemeBuilder = &SchemeBuilder AddToScheme = localSchemeBuilder.AddToScheme ) func addKnownTypes (scheme *runtime.Scheme) error { scheme.AddKnownTypes(SchemeGroupVersion, &Deployment{}, &DeploymentList{}, &StatefulSet{}, &StatefulSetList{}, &DaemonSet{}, &DaemonSetList{}, &ReplicaSet{}, &ReplicaSetList{}, &ControllerRevision{}, &ControllerRevisionList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil }
相信看到这里一切都柳暗花明了,其实就是告知k8s
我注册了这些schema
,k8s
知道了这些Resource
的存在,那么当我去请求的Resouce
操作的时候能够按照我们的预期达到功能实现。
CRD
关键实现那么对于自定义的 CRD
实现肯定少不了如上分心的setConfigDefaults
实现,如下为针对与emqx-operator
实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 var ( emqxbrokerGVR = schema.GroupVersion{Group: "apps.emqx.io" , Version: "v1beta1" } ) func setConfigDefaults (config *rest.Config) error { gv := emqxbrokerGVR config.GroupVersion = &gv config.APIPath = "/apis" config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() if config.UserAgent == "" { config.UserAgent = rest.DefaultKubernetesUserAgent() } return nil }
到了这里对于Client
的构造就算结束了,这其中的彩蛋还包括请求API
的URL
。
CRD
资源操作具体实现如上我们主要讲述了如何去构造client
,那么对于CRD
的具体操作我们是如何实现的呢?
这里让我们将注意力再返回到AppsV1Interface
:
1 2 3 4 5 6 type AppsV1Interface interface { RESTClient() rest.Interface ... DeploymentsGetter ... }
让我们看看DeploymentGetter
究竟是啥。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 ... type DeploymentsGetter interface { Deployments(namespace string ) DeploymentInterface } type DeploymentInterface interface { Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (*v1.Deployment, error) Update(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error) UpdateStatus(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error) Delete(ctx context.Context, name string , opts metav1.DeleteOptions) error DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error Get(ctx context.Context, name string , opts metav1.GetOptions) (*v1.Deployment, error) List(ctx context.Context, opts metav1.ListOptions) (*v1.DeploymentList, error) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) Patch(ctx context.Context, name string , pt types.PatchType, data []byte , opts metav1.PatchOptions, subresources ...string ) (result *v1.Deployment, err error) Apply(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error) ApplyStatus(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error) GetScale(ctx context.Context, deploymentName string , options metav1.GetOptions) (*autoscalingv1.Scale, error) UpdateScale(ctx context.Context, deploymentName string , scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error) ApplyScale(ctx context.Context, deploymentName string , scale *applyconfigurationsautoscalingv1.ScaleApplyConfiguration, opts metav1.ApplyOptions) (*autoscalingv1.Scale, error) DeploymentExpansion } ...
看到这里实际上一切都已经明了,通过Interface
抽象了对于Deployment
的操作,具体的实现就不展开分析了。
验证 环境准备 运行环境为通过minikube
启动的本地k8s
集群,另外在集群中注册CRD
:
1 2 3 $kubectl get crd | grep emqx emqxbrokers.apps.emqx.io 2021-12-09T03:59:28Z emqxenterprises.apps.emqx.io 2021-12-09T03:59:28Z
验证CRD Client
下面让我们验证下Client
实际运行情况,验证对自定义的CRD
实例的Create
,Get
,List
,Delete
的验证。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 func DemoForEmqxBroker (config *rest.Config, ns string ) { clientset, err := pkg.NewForConfig(config) if err != nil { panic (err) } emqxbrokerClient := clientset.EmqxBrokersV1Beta1().EmqxBrokers(ns) Prompt() fmt.Println("[> create emqxbroker" ) emqxbroker, err := emqxbrokerClient.Create(context.TODO(), resource.GenerateEmqxbroker(ns), metav1.CreateOptions{}) if err != nil { panic (err) } fmt.Printf("create emqxbroker: %+v\n" , emqxbroker) Prompt() fmt.Println("[> get emqxbroker" ) eb, err := emqxbrokerClient.Get(context.TODO(), "emqx" , metav1.GetOptions{}) if err != nil { panic (err) } fmt.Printf("emqxbroker found: %+v\n" , eb) Prompt() fmt.Println("[> list emqxbroker" ) eblist, err := emqxbrokerClient.List(context.TODO(), metav1.ListOptions{}) if err != nil { panic (err) } fmt.Printf("emqxbroker list: %+v\n" , eblist) Prompt() fmt.Println("[> delete emqxbroker" ) err = emqxbrokerClient.Delete(context.TODO(), "emqx" , metav1.DeleteOptions{}) if err != nil { panic (err) } fmt.Printf("Delete emqxbroker successfully" ) }
1 2 [> create emqxbroker create emqxbroker: EmqxBroker instance [emqx],Image [emqx/emqx:4.3.10]
查看下K8s
集群中实例的情况:
1 2 3 4 5 6 7 8 $kubectl get emqx emqx NAME AGE emqx 2m26s $kubectl get pods NAME READY STATUS RESTARTS AGE emqx-0 1/1 Running 0 2m30s emqx-1 1/1 Running 0 2m30s emqx-2 1/1 Running 0 2m30s
1 2 [> get emqxbroker emqxbroker found: EmqxBroker instance [emqx],Image [emqx/emqx:4.3.10]
1 2 [> list emqxbroker emqxbroker list: &{TypeMeta:{Kind: APIVersion:} ListMeta:{SelfLink: ResourceVersion:157139 Continue: RemainingItemCount:<nil>} Items:[{TypeMeta:{Kind:EmqxBroker APIVersion:apps.emqx.io/v1beta1} ObjectMeta:{Name:emqx GenerateName: Namespace:default SelfLink: UID:74896493-0134-460a-a04d-d3bdaed21902 ResourceVersion:157121 Generation:1 CreationTimestamp:2021-12-26 20:23:15 +0800 CST DeletionTimestamp:<nil> DeletionGracePeriodSeconds:<nil> Labels:map[] Annotations:map[] OwnerReferences:[] Finalizers:[] ClusterName: ManagedFields:[{Manager:Go-http-client Operation:Update APIVersion:apps.emqx.io/v1beta1 Time:2021-12-26 20:23:15 +0800 CST FieldsType:FieldsV1 FieldsV1:{"f:spec" :{"." :{},"f:image" :{},"f:labels" :{"." :{},"f:cluster" :{}},"f:listener" :{"." :{},"f:nodePorts" :{},"f:ports" :{}},"f:replicas" :{},"f:resources" :{},"f:serviceAccountName" :{}}} Subresource:} {Manager:__debug_bin4215542756 Operation:Update APIVersion:apps.emqx.io/v1beta1 Time:2021-12-26 20:23:15 +0800 CST FieldsType:FieldsV1 FieldsV1:{"f:status" :{"." :{},"f:conditions" :{}}} Subresource:}]} Spec:{Replicas:0xc000492ec8 Image:emqx/emqx:4.3.10 ServiceAccountName:emqx Resources:{Limits:map[] Requests:map[]} Storage:<nil> Labels:map[cluster:emqx] Listener:{Type: LoadBalancerIP: LoadBalancerSourceRanges:[] ExternalIPs:[] Ports:{MQTT:0 MQTTS:0 WS:0 WSS:0 Dashboard:0 API:0} NodePorts:{MQTT:0 MQTTS:0 WS:0 WSS:0 Dashboard:0 API:0}} Affinity:nil ToleRations:[] NodeSelector:map[] ImagePullPolicy: ExtraVolumes:[] ExtraVolumeMounts:[] Env:[] ACL:[] Plugins:[] Modules:[]} Status:{Conditions:[{Type:Healthy Status:True LastUpdateTime:2021-12-26T20:27:26+08:00 LastUpdateAt:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2021-12-26T20:23:16+08:00 Reason:Cluster available Message:Cluster ok} {Type:Creating Status:True LastUpdateTime:2021-12-26T20:23:15+08:00 LastUpdateAt:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2021-12-26T20:23:15+08:00 Reason:Creating Message:Bootstrap emqx cluster}]}}]}
1 2 [> delete emqxbroker Delete emqxbroker successfull