I have added a new CRD, ApiGateway, to Kubernetes and I want to watch for new or changed resources of that type.
This works with a simple REST client, as shown in the example below.
But I'd like to watch for these resources with k8s.io/client-go/kubernetes.
While it is simple to watch the standard resources, as in the client-go example below, I can't get anything working for CRDs. Is it possible to do this with client-go?
client-go example for standard resources
import (
    ....
    "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func handleNewServices(clientset *kubernetes.Clientset) {
    for {
        serviceStreamWatcher, err := clientset.CoreV1().Services("").Watch(metav1.ListOptions{})
        if err != nil {
            panic(err.Error())
        }
        //fmt.Printf("%T\n", serviceStreamWatcher)
        for {
            select {
            case event := <-serviceStreamWatcher.ResultChan():
                service := event.Object.(*v1.Service)
                for key, value := range service.Labels {
                    fmt.Printf("Key, Value: %s %s\n", key, value)
                }
                ...
RestClient (working fine)
package main

import (
    "net/http"
    ....
)

func main() {
    for {
        // The group "cw.com" in the URL must match spec.group in api-gateway-crd.yaml
        // The resource "apigateways" in the URL must match spec.names.plural in api-gateway-crd.yaml
        resp, err := http.Get("http://localhost:8001/apis/cw.com/v1/apigateways?watch=true")
        if err != nil {
            panic(err)
        }
        decoder := json.NewDecoder(resp.Body)
        for {
            var event v1.ApiGatewayWatchEvent
            if err := decoder.Decode(&event); err == io.EOF {
                break
            } else if err != nil {
                log.Fatal(err)
            }
            log.Printf("Received watch event: %s: %s: \n", event.Type, event.Object.Metadata.Name)
        }
        // Close explicitly: a defer inside this endless loop would never run
        resp.Body.Close()
    }
}
CRD
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: apigateways.cw.com
spec:
  scope: Namespaced
  group: cw.com
  version: v1
  names:
    kind: ApiGateway
    singular: apigateway
    plural: apigateways
The custom resource created from a CRD object can be either namespaced or cluster-scoped, as specified in the CRD's spec.
A custom resource definition (CRD) is a powerful feature introduced in Kubernetes 1.7. The standard Kubernetes distribution ships with many built-in API objects and resources. CRDs enable IT admins to introduce unique objects or types into the Kubernetes cluster to meet their custom requirements.
To create a new CRD, we set the apiVersion key to "apiextensions.k8s.io/v1beta1". The kind key specifies what kind of object you want to create; as we are about to create a CRD, we put "CustomResourceDefinition" as the value. The metadata key holds the data that uniquely identifies the object, such as its name.
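The CRD's group, version, plural name, and scope together determine the REST path that a watch request hits. Here is a small sketch of that mapping; watchPath is a hypothetical helper written for this example, not something client-go provides.

```go
package main

import "fmt"

// watchPath builds the API server path for watching a custom resource.
// An empty namespace gives the path used for cluster-scoped resources
// or for watching a namespaced resource across all namespaces.
func watchPath(group, version, namespace, plural string) string {
	p := "/apis/" + group + "/" + version
	if namespace != "" {
		p += "/namespaces/" + namespace
	}
	return p + "/" + plural + "?watch=true"
}

func main() {
	// Values taken from the CRD above: spec.group, spec.version, spec.names.plural.
	fmt.Println(watchPath("cw.com", "v1", "", "apigateways"))
	fmt.Println(watchPath("cw.com", "v1", "default", "apigateways"))
}
```

The first call reproduces the path used in the REST client example; the second narrows the watch to the default namespace.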
If you think about it, client-go knows about resources such as deployments, services, and pods. But it doesn't recognize your CRD ApiGateway.
So client-go can't be used as a client for your custom resources (wait for it) unless you make them recognizable to client-go!
How?!
You have to generate your own client for the CRDs. Kubernetes already has the tools to auto-generate the clients; all you need to do is specify the structs of the API. This is known as code generation.
Here is a blog post about code generation by Stefan Schimanski (who is one of the top contributors to Kubernetes).
Example Controller
Here is a sample-controller example given by Kubernetes itself. The pkg folder contains all the APIs and the client. main.go and controller.go contain the sample code to watch for the CRD and do some task accordingly.
!!Update!!
It's easier now to generate client configs and controllers with kubebuilder (GitHub repo), which is maintained by kubernetes-sigs.
Using the dynamic package of client-go may be a good choice for working with CRDs.
Basic watch example:
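// rest.Config{} is a placeholder; pass the config for your cluster here
cliSet, err := dynamic.NewForConfig(&rest.Config{})
if err != nil {
    return err
}
watcher, err := cliSet.Resource(schema.GroupVersionResource{
    // replace these with your CRD's group, version, and plural resource name
    Group:    CRDGroup,
    Version:  CRDVersion,
    Resource: CRDResourceName,
}).Watch(context.Background(), metav1.ListOptions{})
if err != nil {
    return err
}
// events arrive on watcher.ResultChan()
defer watcher.Stop()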
Advanced watch, informer example:
// rest.Config{} is a placeholder; pass the config for your cluster here
cliSet, err := dynamic.NewForConfig(&rest.Config{})
if err != nil {
    return err
}
fac := dynamicinformer.NewFilteredDynamicSharedInformerFactory(cliSet, 0, metav1.NamespaceAll, nil)
informer := fac.ForResource(schema.GroupVersionResource{
    // replace these with your CRD's group, version, and plural resource name
    Group:    CRDGroup,
    Version:  CRDVersion,
    Resource: CRDResourceName,
}).Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        // convert the dynamic object to your CRD struct
        typedObj := obj.(*unstructured.Unstructured)
        bytes, err := typedObj.MarshalJSON()
        if err != nil {
            return
        }
        var crdObj *crd.CRD
        if err := json.Unmarshal(bytes, &crdObj); err != nil {
            return
        }
    },
    UpdateFunc: func(oldObj, newObj interface{}) {
    },
})
// the handlers only fire once the factory has been started with a stop channel
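The conversion step inside AddFunc can be tried out without a cluster. An unstructured.Unstructured keeps its data in an Object field of type map[string]interface{}, so the sketch below runs the same JSON round trip on a plain map. The ApiGateway struct, its spec.host field, and the sample values are assumptions for illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ApiGatewaySpec and its Host field are hypothetical; use your CRD's real spec.
type ApiGatewaySpec struct {
	Host string `json:"host"`
}

// ApiGateway is a hypothetical typed view of the custom resource.
type ApiGateway struct {
	Kind     string `json:"kind"`
	Metadata struct {
		Name string `json:"name"`
	} `json:"metadata"`
	Spec ApiGatewaySpec `json:"spec"`
}

// toApiGateway converts an untyped object map into the typed struct
// by marshalling it to JSON and unmarshalling it again.
func toApiGateway(obj map[string]interface{}) (*ApiGateway, error) {
	raw, err := json.Marshal(obj)
	if err != nil {
		return nil, fmt.Errorf("marshal: %w", err)
	}
	var gw ApiGateway
	if err := json.Unmarshal(raw, &gw); err != nil {
		return nil, fmt.Errorf("unmarshal: %w", err)
	}
	return &gw, nil
}

func main() {
	// Stand-in for the Object field of an *unstructured.Unstructured.
	obj := map[string]interface{}{
		"apiVersion": "cw.com/v1",
		"kind":       "ApiGateway",
		"metadata":   map[string]interface{}{"name": "gw-a"},
		"spec":       map[string]interface{}{"host": "example.test"},
	}
	gw, err := toApiGateway(obj)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %s %s\n", gw.Kind, gw.Metadata.Name, gw.Spec.Host)
}
```

In a real handler, runtime.DefaultUnstructuredConverter.FromUnstructured from k8s.io/apimachinery is an alternative that avoids the intermediate JSON bytes.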