package main import ( "encoding/json" "flag" "log" "os" "os/signal" "syscall" "time" "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" ) const ( defaultAnnotation = "initializer.kubernetes.io/abc" defaultInitializerName = "abc.initializer.kubernetes.io" ) var ( annotation string initializerName string ) func main() { flag.StringVar(&annotation, "annotation", defaultAnnotation, "The annotation to trigger initialization") flag.StringVar(&initializerName, "initializer-name", defaultInitializerName, "The initializer name") flag.Parse() log.Println("Starting the Kubernetes initializer...") log.Printf("Initializer name set to: %s", initializerName) clusterConfig, err := rest.InClusterConfig() if err != nil { log.Fatal(err.Error()) } clientset, err := kubernetes.NewForConfig(clusterConfig) if err != nil { log.Fatal(err) } // Watch uninitialized Deployments in all namespaces. restClient := clientset.AppsV1().RESTClient() watchlist := cache.NewListWatchFromClient(restClient, "deployments", corev1.NamespaceAll, fields.Everything()) // Wrap the returned watchlist to workaround the inability to include // the `IncludeUninitialized` list option when setting up watch clients. includeUninitializedWatchlist := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { options.IncludeUninitialized = true return watchlist.List(options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { options.IncludeUninitialized = true return watchlist.Watch(options) }, } resyncPeriod := 30 * time.Second _, controller := cache.NewInformer(includeUninitializedWatchlist, &v1.Deployment{}, resyncPeriod, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { err := initializeDeployment(obj.(*v1.Deployment), clientset) if err != nil { log.Println(err) } }, }, ) stop := make(chan struct{}) go controller.Run(stop) signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) <-signalChan log.Println("Shutdown signal received, exiting...") close(stop) } func initializeDeployment(deployment *v1.Deployment, clientset *kubernetes.Clientset) error { if deployment.ObjectMeta.GetInitializers() != nil { pendingInitializers := deployment.ObjectMeta.GetInitializers().Pending if len(pendingInitializers) == 0 { return nil } if initializerName == pendingInitializers[0].Name { log.Printf("Initializing deployment: %s", deployment.Name) initializedDeployment := deployment.DeepCopy() // Remove self from the list of pending Initializers while preserving ordering. if len(pendingInitializers) == 1 { initializedDeployment.ObjectMeta.Initializers = nil } else { initializedDeployment.ObjectMeta.Initializers.Pending = append(pendingInitializers[:0], pendingInitializers[1:]...) } // Modify the Deployment here initializedDeployment.Annotations["k8s-meetup"] = "hosted-by-dockercloud" oldData, err := json.Marshal(deployment) if err != nil { return err } newData, err := json.Marshal(initializedDeployment) if err != nil { return err } patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Deployment{}) if err != nil { return err } _, err = clientset.AppsV1beta1().Deployments(deployment.Namespace).Patch(deployment.Name, types.StrategicMergePatchType, patchBytes) if err != nil { return err } } } return nil }