/* Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package main import ( "context" "fmt" "strconv" "time" "golang.org/x/time/rate" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" appsinformers "k8s.io/client-go/informers/apps/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" appslisters "k8s.io/client-go/listers/apps/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" v1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/proxyprovider/v1" clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned" operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme" informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/proxyprovider/v1" listers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/listers/proxyprovider/v1" authentikapi "goauthentik.io/api/v3" ) const controllerAgentName = "proxy-provider-controller" const ( SuccessSynced = "Synced" ErrResourceExists = "ErrResourceExists" MessageResourceExists = "Resource %q already exists and is not managed by ProxyProvider" MessageResourceSynced = "ProxyProvider synced successfully" FieldManager = controllerAgentName ) type Controller struct { kubeclientset kubernetes.Interface proxyProviderClientset clientset.Interface authentik *authentikapi.APIClient deploymentsLister appslisters.DeploymentLister deploymentsSynced cache.InformerSynced proxyLister listers.ProxyProviderLister proxySynced cache.InformerSynced workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName] recorder record.EventRecorder } func NewController( ctx context.Context, kubeclientset kubernetes.Interface, proxyProviderClientset clientset.Interface, authentik *authentikapi.APIClient, deploymentInformer appsinformers.DeploymentInformer, proxyInformer informers.ProxyProviderInformer, ) *Controller { logger := klog.FromContext(ctx) utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme)) logger.V(4).Info("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) eventBroadcaster.StartStructuredLogging(0) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) ratelimiter := workqueue.NewTypedMaxOfRateLimiter( workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](5*time.Millisecond, 1000*time.Second), &workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)}, ) c := &Controller{ kubeclientset: kubeclientset, proxyProviderClientset: proxyProviderClientset, authentik: authentik, deploymentsLister: deploymentInformer.Lister(), deploymentsSynced: deploymentInformer.Informer().HasSynced, proxyLister: proxyInformer.Lister(), proxySynced: proxyInformer.Informer().HasSynced, workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter), recorder: recorder, } logger.Info("Setting up event handlers") proxyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.enqueueProxyProvider, UpdateFunc: func(_, newObj interface{}) { c.enqueueProxyProvider(newObj) }, }) deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleObject, UpdateFunc: func(old, new interface{}) { newDepl := new.(*appsv1.Deployment) oldDepl := old.(*appsv1.Deployment) if newDepl.ResourceVersion == oldDepl.ResourceVersion { return } c.handleObject(new) }, DeleteFunc: c.handleObject, }) return c } func (c *Controller) Run(ctx context.Context, workers int) error { defer utilruntime.HandleCrash() defer c.workqueue.ShutDown() logger := klog.FromContext(ctx) logger.Info("Starting ProxyProvider controller") logger.Info("Waiting for informer caches to sync") if ok := cache.WaitForCacheSync(ctx.Done(), c.deploymentsSynced, c.proxySynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } logger.Info("Starting workers", "count", workers) for i := 0; i < workers; i++ { go wait.UntilWithContext(ctx, c.runWorker, time.Second) } logger.Info("Started workers") <-ctx.Done() logger.Info("Shutting down workers") return nil } func (c *Controller) runWorker(ctx context.Context) { for c.processNextWorkItem(ctx) { } } func (c *Controller) processNextWorkItem(ctx context.Context) bool { objRef, shutdown := c.workqueue.Get() logger := klog.FromContext(ctx) if shutdown { return false } defer c.workqueue.Done(objRef) err := c.syncHandler(ctx, objRef) if err == nil { c.workqueue.Forget(objRef) logger.Info("Successfully synced", "objectName", objRef) return true } utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef) c.workqueue.AddRateLimited(objRef) return true } func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error { logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef) pp, err := c.proxyLister.ProxyProviders(objectRef.Namespace).Get(objectRef.Name) if err != nil { if errors.IsNotFound(err) { logger.V(4).Info("ProxyProvider no longer exists") return nil } return err } logger.V(4).Info("sync ProxyProvider", "name", pp.Name) if pp.Status.PK != "" { // We retrieve the existing PP from the API by slug. pk, err := strconv.ParseInt(pp.Status.PK, 10, 32) if err != nil { return fmt.Errorf("error parsing PK: %v", err) } _, _, err = c.authentik.ProvidersApi.ProvidersAllRetrieve(ctx, int32(pk)).Execute() if err != nil { return fmt.Errorf("error retrieving existing ProxyProvider: %v", err) } // We update the existing PP with the new spec. proxyProviderRequest := &authentikapi.ProxyProviderRequest{ Name: pp.Spec.Name, AuthorizationFlow: pp.Spec.AuthorizationFlow, InvalidationFlow: pp.Spec.InvalidationFlow, ExternalHost: pp.Spec.ExternalHost, Mode: authentikapi.PROXYMODE_FORWARD_SINGLE.Ptr(), } resp, r, err := c.authentik.ProvidersApi.ProvidersProxyUpdate(ctx, int32(pk)).ProxyProviderRequest(*proxyProviderRequest).Execute() if err != nil { return fmt.Errorf("error when calling `ProvidersAPI.ProvidersProxyUpdate`: %w with response %v", err, r) } pp.Status.PK = strconv.Itoa(int(resp.Pk)) err = c.updateProxyProviderStatus(ctx, pp) if err != nil { return fmt.Errorf("error updating ProxyProvider status: %v", err) } } else { proxyProviderRequest := &authentikapi.ProxyProviderRequest{ Name: pp.Spec.Name, AuthorizationFlow: pp.Spec.AuthorizationFlow, InvalidationFlow: pp.Spec.InvalidationFlow, ExternalHost: pp.Spec.ExternalHost, Mode: authentikapi.PROXYMODE_FORWARD_SINGLE.Ptr(), } resp, r, err := c.authentik.ProvidersApi.ProvidersProxyCreate(ctx).ProxyProviderRequest(*proxyProviderRequest).Execute() if err != nil { return fmt.Errorf("error when calling `ProvidersAPI.ProvidersProxyCreate`: %w with response %v", err, r) } pp.Status.PK = strconv.Itoa(int(resp.Pk)) err = c.updateProxyProviderStatus(ctx, pp) if err != nil { return fmt.Errorf("error updating ProxyProvider status: %v", err) } } return nil } func (c *Controller) enqueueProxyProvider(obj interface{}) { objectRef, err := cache.ObjectToName(obj) if err != nil { utilruntime.HandleError(err) return } c.workqueue.Add(objectRef) } func (c *Controller) handleObject(obj interface{}) { // Optional: resolve Deployment owners back to ProxyProvider and enqueue. _, ok := obj.(metav1.Object) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) return } _, ok = tombstone.Obj.(metav1.Object) if !ok { utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a metav1.Object %#v", obj)) return } } } func (c *Controller) updateProxyProviderStatus(ctx context.Context, pp *v1.ProxyProvider) error { ppCopy := pp.DeepCopy() ppCopy.Status.PK = pp.Status.PK _, err := c.proxyProviderClientset.ProxyproviderV1().ProxyProviders(pp.Namespace).UpdateStatus(ctx, ppCopy, metav1.UpdateOptions{FieldManager: FieldManager}) return err }