From 7311822b8380c6fe805df05cdd52c5f6023cd278 Mon Sep 17 00:00:00 2001 From: Timo Behrendt Date: Mon, 18 May 2026 20:41:17 +0200 Subject: [PATCH] refactor: consolidate common controller code --- pkg/controllers/application/controller.go | 96 +++------ .../application/controller_test.go | 20 +- pkg/controllers/controller.go | 94 +++++++++ pkg/controllers/controller_test.go | 190 ++++++++++++++++++ pkg/controllers/policybinding/controller.go | 96 +++------ .../policybinding/controller_test.go | 20 +- pkg/controllers/proxyprovider/controller.go | 96 +++------ .../proxyprovider/controller_test.go | 20 +- 8 files changed, 362 insertions(+), 270 deletions(-) create mode 100644 pkg/controllers/controller.go create mode 100644 pkg/controllers/controller_test.go diff --git a/pkg/controllers/application/controller.go b/pkg/controllers/application/controller.go index 4875452..7afa0d7 100644 --- a/pkg/controllers/application/controller.go +++ b/pkg/controllers/application/controller.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -39,6 +38,7 @@ import ( "k8s.io/klog/v2" v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/application/v1alpha1" + controllers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/controllers" clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned" operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme" informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/application/v1alpha1" @@ -61,16 +61,14 @@ const ( DeleteAuthentikApplicationFinalizer = "application.t000-n.de/delete-authentik-application" ) -type Controller struct { +type ApplicationController struct { kubeclientset kubernetes.Interface applicationClientset clientset.Interface authentik *authentikapi.APIClient applicationListener listers.ApplicationLister - applicationSynced cache.InformerSynced - workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName] - recorder record.EventRecorder + controller *controllers.Controller } func NewController( @@ -79,7 +77,7 @@ func NewController( applicationClientset clientset.Interface, authentik *authentikapi.APIClient, applicationInformer informers.ApplicationInformer, -) *Controller { +) *ApplicationController { logger := klog.FromContext(ctx) utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme)) @@ -94,75 +92,36 @@ func NewController( &workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)}, ) - c := &Controller{ + c := &ApplicationController{ kubeclientset: kubeclientset, applicationClientset: applicationClientset, authentik: authentik, applicationListener: applicationInformer.Lister(), - applicationSynced: applicationInformer.Informer().HasSynced, - workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter), - recorder: recorder, } + c.controller = controllers.NewController( + ctx, + workqueue.NewTypedRateLimitingQueue(ratelimiter), + recorder, + applicationInformer.Informer().HasSynced, + c.syncHandler, + ) logger.Info("Setting up event handlers") applicationInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.enqueueApplication, + AddFunc: c.controller.Enqueue, UpdateFunc: func(_, newObj interface{}) { - c.enqueueApplication(newObj) + c.controller.Enqueue(newObj) }, }) return c } -func (c *Controller) Run(ctx context.Context, workers int) error { - defer utilruntime.HandleCrash() - defer c.workqueue.ShutDown() - logger := klog.FromContext(ctx) - - logger.Info("Starting Application controller") - - logger.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(ctx.Done(), c.applicationSynced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - logger.Info("Starting workers", "count", workers) - for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, c.runWorker, time.Second) - } - - logger.Info("Started workers") - <-ctx.Done() - logger.Info("Shutting down workers") - return nil +func (c *ApplicationController) Run(ctx context.Context, workers int) error { + return c.controller.Run(ctx, workers) } -func (c *Controller) runWorker(ctx context.Context) { - for c.processNextWorkItem(ctx) { - } -} - -func (c *Controller) processNextWorkItem(ctx context.Context) bool { - objRef, shutdown := c.workqueue.Get() - logger := klog.FromContext(ctx) - if shutdown { - return false - } - defer c.workqueue.Done(objRef) - - err := c.syncHandler(ctx, objRef) - if err == nil { - c.workqueue.Forget(objRef) - logger.Info("Successfully synced", "objectName", objRef) - return true - } - utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef) - c.workqueue.AddRateLimited(objRef) - return true -} - -func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error { +func (c *ApplicationController) syncHandler(ctx context.Context, objectRef cache.ObjectName) error { logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef) app, err := c.applicationListener.Applications(objectRef.Namespace).Get(objectRef.Name) @@ -195,12 +154,12 @@ func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName return c.reconcileUpdate(ctx, app) } -func (c *Controller) ensureFinalizers(ctx context.Context, app *v1alpha1.Application) error { +func (c *ApplicationController) ensureFinalizers(ctx context.Context, app *v1alpha1.Application) error { app.ObjectMeta.Finalizers = append(app.ObjectMeta.Finalizers, DeleteAuthentikApplicationFinalizer) return c.updateApplication(ctx, app) } -func (c *Controller) reconcileDelete(ctx context.Context, app *v1alpha1.Application) error { +func (c *ApplicationController) reconcileDelete(ctx context.Context, app *v1alpha1.Application) error { r, err := c.authentik.CoreApi.CoreApplicationsDestroy(ctx, app.Status.PK).Execute() if err != nil { // This handles an edge-case, where when the Application on Authentik has already been deleted, but the finalizer is still present. We just remove the finalizer and return. @@ -213,7 +172,7 @@ func (c *Controller) reconcileDelete(ctx context.Context, app *v1alpha1.Applicat return c.updateApplication(ctx, app) } -func (c *Controller) reconcileUpdate(ctx context.Context, app *v1alpha1.Application) error { +func (c *ApplicationController) reconcileUpdate(ctx context.Context, app *v1alpha1.Application) error { _, r, err := c.authentik.CoreApi.CoreApplicationsRetrieve(ctx, app.Spec.Slug).Execute() if err != nil { if r != nil && r.StatusCode == http.StatusNotFound { @@ -239,7 +198,7 @@ func (c *Controller) reconcileUpdate(ctx context.Context, app *v1alpha1.Applicat return c.updateApplicationStatus(ctx, app) } -func (c *Controller) reconcileCreate(ctx context.Context, app *v1alpha1.Application) error { +func (c *ApplicationController) reconcileCreate(ctx context.Context, app *v1alpha1.Application) error { applicationRequest := &authentikapi.ApplicationRequest{ Name: app.Spec.Name, Slug: app.Spec.Slug, @@ -254,23 +213,14 @@ func (c *Controller) reconcileCreate(ctx context.Context, app *v1alpha1.Applicat return c.updateApplicationStatus(ctx, app) } -func (c *Controller) enqueueApplication(obj interface{}) { - objectRef, err := cache.ObjectToName(obj) - if err != nil { - utilruntime.HandleError(err) - return - } - c.workqueue.Add(objectRef) -} - -func (c *Controller) updateApplicationStatus(ctx context.Context, app *v1alpha1.Application) error { +func (c *ApplicationController) updateApplicationStatus(ctx context.Context, app *v1alpha1.Application) error { appCopy := app.DeepCopy() _, err := c.applicationClientset.ApplicationV1alpha1().Applications(appCopy.Namespace).UpdateStatus(ctx, appCopy, metav1.UpdateOptions{FieldManager: FieldManager}) return err } // Update metadata, spec, etc. of the Application object. -func (c *Controller) updateApplication(ctx context.Context, app *v1alpha1.Application) error { +func (c *ApplicationController) updateApplication(ctx context.Context, app *v1alpha1.Application) error { appCopy := app.DeepCopy() _, err := c.applicationClientset.ApplicationV1alpha1().Applications(appCopy.Namespace).Update(ctx, appCopy, metav1.UpdateOptions{FieldManager: FieldManager}) if err != nil { diff --git a/pkg/controllers/application/controller_test.go b/pkg/controllers/application/controller_test.go index 4f28b63..a7cf098 100644 --- a/pkg/controllers/application/controller_test.go +++ b/pkg/controllers/application/controller_test.go @@ -220,20 +220,6 @@ func TestController_syncHandler_invalidPK(t *testing.T) { } } -func TestController_enqueueApplication(t *testing.T) { - server := newAuthentikTestServer(t, authentikTestHandlers{}) - t.Cleanup(server.Close) - - ctrl, _, cancel := newTestController(t, testApplication(), server.URL) - t.Cleanup(cancel) - - ctrl.enqueueApplication(testApplication()) - - if ctrl.workqueue.Len() != 1 { - t.Fatalf("workqueue length = %d, want 1", ctrl.workqueue.Len()) - } -} - // --- test helpers --- func testApplication() *v1alpha1.Application { @@ -254,7 +240,7 @@ func testApplication() *v1alpha1.Application { } } -func newTestController(t *testing.T, app *v1alpha1.Application, authentikURL string) (*Controller, context.Context, context.CancelFunc) { +func newTestController(t *testing.T, app *v1alpha1.Application, authentikURL string) (*ApplicationController, context.Context, context.CancelFunc) { t.Helper() ctx, cancel := context.WithCancel(context.Background()) ctrl, _, stop := newTestControllerWithContext(t, ctx, app, authentikURL) @@ -264,7 +250,7 @@ func newTestController(t *testing.T, app *v1alpha1.Application, authentikURL str } } -func newTestControllerWithContext(t *testing.T, ctx context.Context, app *v1alpha1.Application, authentikURL string) (*Controller, context.Context, func()) { +func newTestControllerWithContext(t *testing.T, ctx context.Context, app *v1alpha1.Application, authentikURL string) (*ApplicationController, context.Context, func()) { t.Helper() authentikClient := newAuthentikAPIClientForTest(t, authentikURL) @@ -382,7 +368,7 @@ func writeJSON(t *testing.T, w http.ResponseWriter, status int, body any) { } } -func getApplication(t *testing.T, ctrl *Controller, namespace, name string) *v1alpha1.Application { +func getApplication(t *testing.T, ctrl *ApplicationController, namespace, name string) *v1alpha1.Application { t.Helper() got, err := ctrl.applicationClientset.ApplicationV1alpha1().Applications(namespace).Get( diff --git a/pkg/controllers/controller.go b/pkg/controllers/controller.go new file mode 100644 index 0000000..b305f73 --- /dev/null +++ b/pkg/controllers/controller.go @@ -0,0 +1,94 @@ +package controllers + +import ( + "context" + "fmt" + "time" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +type SyncHandler func(ctx context.Context, objRef cache.ObjectName) error + +type Controller struct { + workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName] + recorder record.EventRecorder + synced cache.InformerSynced + syncHandler SyncHandler +} + +func NewController( + ctx context.Context, + workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName], + recorder record.EventRecorder, + synced cache.InformerSynced, + syncHandler SyncHandler, +) *Controller { + return &Controller{ + workqueue: workqueue, + recorder: recorder, + synced: synced, + syncHandler: syncHandler, + } +} + +func (c *Controller) Run(ctx context.Context, workers int) error { + defer utilruntime.HandleCrash() + defer c.workqueue.ShutDown() + logger := klog.FromContext(ctx) + + logger.Info("Starting PolicyBinding controller") + + logger.Info("Waiting for informer caches to sync") + if ok := cache.WaitForCacheSync(ctx.Done(), c.synced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + logger.Info("Starting workers", "count", workers) + for i := 0; i < workers; i++ { + go wait.UntilWithContext(ctx, c.runWorker, time.Second) + } + + logger.Info("Started workers") + <-ctx.Done() + logger.Info("Shutting down workers") + return nil +} + +func (c *Controller) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *Controller) processNextWorkItem(ctx context.Context) bool { + objRef, shutdown := c.workqueue.Get() + logger := klog.FromContext(ctx) + if shutdown { + return false + } + defer c.workqueue.Done(objRef) + + err := c.syncHandler(ctx, objRef) + if err == nil { + c.workqueue.Forget(objRef) + logger.Info("Successfully synced", "objectName", objRef) + return true + } + utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef) + c.workqueue.AddRateLimited(objRef) + return true +} + +func (c *Controller) Enqueue(obj interface{}) { + objectRef, err := cache.ObjectToName(obj) + if err != nil { + utilruntime.HandleError(err) + return + } + c.workqueue.Add(objectRef) +} diff --git a/pkg/controllers/controller_test.go b/pkg/controllers/controller_test.go new file mode 100644 index 0000000..833763e --- /dev/null +++ b/pkg/controllers/controller_test.go @@ -0,0 +1,190 @@ +// AI generated tests and not yet reviewed. +package controllers + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" +) + +func newTestController(t *testing.T, synced cache.InformerSynced, syncHandler SyncHandler) (*Controller, workqueue.TypedRateLimitingInterface[cache.ObjectName]) { + t.Helper() + + ratelimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](time.Millisecond, time.Second) + q := workqueue.NewTypedRateLimitingQueue(ratelimiter) + t.Cleanup(q.ShutDown) + + if synced == nil { + synced = func() bool { return true } + } + + ctrl := NewController( + context.Background(), + q, + record.NewFakeRecorder(10), + synced, + syncHandler, + ) + return ctrl, q +} + +func TestController_processNextWorkItem_success(t *testing.T) { + objRef := cache.ObjectName{Namespace: "default", Name: "test"} + + var syncedRef cache.ObjectName + ctrl, q := newTestController(t, nil, func(_ context.Context, ref cache.ObjectName) error { + syncedRef = ref + return nil + }) + q.Add(objRef) + + if !ctrl.processNextWorkItem(context.Background()) { + t.Fatal("processNextWorkItem() = false, want true") + } + if syncedRef != objRef { + t.Fatalf("syncHandler object = %+v, want %+v", syncedRef, objRef) + } + if q.Len() != 0 { + t.Fatalf("queue length = %d, want 0 after successful sync", q.Len()) + } + if q.NumRequeues(objRef) != 0 { + t.Fatalf("requeues = %d, want 0 after successful sync", q.NumRequeues(objRef)) + } +} + +func TestController_processNextWorkItem_syncError(t *testing.T) { + objRef := cache.ObjectName{Namespace: "default", Name: "test"} + syncErr := errors.New("sync failed") + + ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error { + return syncErr + }) + q.Add(objRef) + + if !ctrl.processNextWorkItem(context.Background()) { + t.Fatal("processNextWorkItem() = false, want true") + } + if q.NumRequeues(objRef) != 1 { + t.Fatalf("requeues = %d, want 1 after sync error", q.NumRequeues(objRef)) + } +} + +func TestController_processNextWorkItem_shutdown(t *testing.T) { + ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error { + return nil + }) + q.ShutDown() + + if ctrl.processNextWorkItem(context.Background()) { + t.Fatal("processNextWorkItem() = true, want false on shutdown") + } +} + +func TestController_Enqueue(t *testing.T) { + ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error { + return nil + }) + + obj := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test", + }, + } + ctrl.Enqueue(obj) + + if q.Len() != 1 { + t.Fatalf("queue length = %d, want 1 after Enqueue", q.Len()) + } +} + +func TestController_Enqueue_invalidObject(t *testing.T) { + ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error { + return nil + }) + + ctrl.Enqueue("not-a-kubernetes-object") + + if q.Len() != 0 { + t.Fatalf("queue length = %d, want 0 for invalid object", q.Len()) + } +} + +func TestController_Run_cacheSyncFails(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctrl, _ := newTestController(t, func() bool { return false }, func(context.Context, cache.ObjectName) error { + return nil + }) + + go func() { + time.Sleep(10 * time.Millisecond) + cancel() + }() + + err := ctrl.Run(ctx, 1) + if err == nil { + t.Fatal("Run() error = nil, want cache sync failure") + } +} + +func TestController_Run_shutsDownOnCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + ctrl, _ := newTestController(t, nil, func(context.Context, cache.ObjectName) error { + return nil + }) + + errCh := make(chan error, 1) + go func() { + errCh <- ctrl.Run(ctx, 1) + }() + + time.Sleep(50 * time.Millisecond) + cancel() + + select { + case err := <-errCh: + if err != nil { + t.Fatalf("Run() error = %v, want nil on context cancel", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Run() did not return after context cancellation") + } +} + +func TestController_runWorker_processesQueuedItem(t *testing.T) { + objRef := cache.ObjectName{Namespace: "default", Name: "test"} + var calls atomic.Int32 + + ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error { + calls.Add(1) + return nil + }) + q.Add(objRef) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go ctrl.runWorker(ctx) + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if calls.Load() == 1 && q.Len() == 0 { + cancel() + return + } + time.Sleep(5 * time.Millisecond) + } + cancel() + t.Fatalf("runWorker did not process queued item: calls=%d queueLen=%d", calls.Load(), q.Len()) +} diff --git a/pkg/controllers/policybinding/controller.go b/pkg/controllers/policybinding/controller.go index e0fbbc0..7ad18e6 100644 --- a/pkg/controllers/policybinding/controller.go +++ b/pkg/controllers/policybinding/controller.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -39,6 +38,7 @@ import ( "k8s.io/klog/v2" v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/policybinding/v1alpha1" + controllers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/controllers" clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned" operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme" informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/policybinding/v1alpha1" @@ -61,16 +61,14 @@ const ( DeleteAuthentikPolicyBindingFinalizer = "policybinding.t000-n.de/delete-authentik-policybinding" ) -type Controller struct { +type PolicyBindingController struct { kubeclientset kubernetes.Interface policyBindingClientset clientset.Interface authentik *authentikapi.APIClient policyBindingListener listers.PolicyBindingLister - policyBindingSynced cache.InformerSynced - workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName] - recorder record.EventRecorder + controller *controllers.Controller } func NewController( @@ -79,7 +77,7 @@ func NewController( policyBindingClientset clientset.Interface, authentik *authentikapi.APIClient, policyBindingInformer informers.PolicyBindingInformer, -) *Controller { +) *PolicyBindingController { logger := klog.FromContext(ctx) utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme)) @@ -94,75 +92,36 @@ func NewController( &workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)}, ) - c := &Controller{ + c := &PolicyBindingController{ kubeclientset: kubeclientset, policyBindingClientset: policyBindingClientset, authentik: authentik, policyBindingListener: policyBindingInformer.Lister(), - policyBindingSynced: policyBindingInformer.Informer().HasSynced, - workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter), - recorder: recorder, } + c.controller = controllers.NewController( + ctx, + workqueue.NewTypedRateLimitingQueue(ratelimiter), + recorder, + policyBindingInformer.Informer().HasSynced, + c.syncHandler, + ) logger.Info("Setting up event handlers") policyBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.enqueuePolicyBinding, + AddFunc: c.controller.Enqueue, UpdateFunc: func(_, newObj interface{}) { - c.enqueuePolicyBinding(newObj) + c.controller.Enqueue(newObj) }, }) return c } -func (c *Controller) Run(ctx context.Context, workers int) error { - defer utilruntime.HandleCrash() - defer c.workqueue.ShutDown() - logger := klog.FromContext(ctx) - - logger.Info("Starting PolicyBinding controller") - - logger.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(ctx.Done(), c.policyBindingSynced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - logger.Info("Starting workers", "count", workers) - for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, c.runWorker, time.Second) - } - - logger.Info("Started workers") - <-ctx.Done() - logger.Info("Shutting down workers") - return nil +func (c *PolicyBindingController) Run(ctx context.Context, workers int) error { + return c.controller.Run(ctx, workers) } -func (c *Controller) runWorker(ctx context.Context) { - for c.processNextWorkItem(ctx) { - } -} - -func (c *Controller) processNextWorkItem(ctx context.Context) bool { - objRef, shutdown := c.workqueue.Get() - logger := klog.FromContext(ctx) - if shutdown { - return false - } - defer c.workqueue.Done(objRef) - - err := c.syncHandler(ctx, objRef) - if err == nil { - c.workqueue.Forget(objRef) - logger.Info("Successfully synced", "objectName", objRef) - return true - } - utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef) - c.workqueue.AddRateLimited(objRef) - return true -} - -func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error { +func (c *PolicyBindingController) syncHandler(ctx context.Context, objectRef cache.ObjectName) error { logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef) pb, err := c.policyBindingListener.PolicyBindings(objectRef.Namespace).Get(objectRef.Name) @@ -195,12 +154,12 @@ func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName return c.reconcileUpdate(ctx, pb) } -func (c *Controller) ensureFinalizers(ctx context.Context, pb *v1alpha1.PolicyBinding) error { +func (c *PolicyBindingController) ensureFinalizers(ctx context.Context, pb *v1alpha1.PolicyBinding) error { pb.ObjectMeta.Finalizers = append(pb.ObjectMeta.Finalizers, DeleteAuthentikPolicyBindingFinalizer) return c.updatePolicyBinding(ctx, pb) } -func (c *Controller) reconcileDelete(ctx context.Context, pb *v1alpha1.PolicyBinding) error { +func (c *PolicyBindingController) reconcileDelete(ctx context.Context, pb *v1alpha1.PolicyBinding) error { r, err := c.authentik.PoliciesApi.PoliciesBindingsDestroy(ctx, pb.Status.PK).Execute() if err != nil { // This handles an edge-case, where when the PolicyBinding on Authentik has already been deleted, but the finalizer is still present. We just remove the finalizer and return. @@ -213,7 +172,7 @@ func (c *Controller) reconcileDelete(ctx context.Context, pb *v1alpha1.PolicyBin return c.updatePolicyBinding(ctx, pb) } -func (c *Controller) reconcileUpdate(ctx context.Context, pb *v1alpha1.PolicyBinding) error { +func (c *PolicyBindingController) reconcileUpdate(ctx context.Context, pb *v1alpha1.PolicyBinding) error { _, r, err := c.authentik.PoliciesApi.PoliciesBindingsRetrieve(ctx, pb.Status.PK).Execute() if err != nil { if r != nil && r.StatusCode == http.StatusNotFound { @@ -248,7 +207,7 @@ func (c *Controller) reconcileUpdate(ctx context.Context, pb *v1alpha1.PolicyBin return c.updatePolicyBindingStatus(ctx, pb) } -func (c *Controller) reconcileCreate(ctx context.Context, pb *v1alpha1.PolicyBinding) error { +func (c *PolicyBindingController) reconcileCreate(ctx context.Context, pb *v1alpha1.PolicyBinding) error { policyBindingRequest := &authentikapi.PolicyBindingRequest{ Target: pb.Spec.Target, Order: pb.Spec.Order, @@ -272,23 +231,14 @@ func (c *Controller) reconcileCreate(ctx context.Context, pb *v1alpha1.PolicyBin return c.updatePolicyBindingStatus(ctx, pb) } -func (c *Controller) enqueuePolicyBinding(obj interface{}) { - objectRef, err := cache.ObjectToName(obj) - if err != nil { - utilruntime.HandleError(err) - return - } - c.workqueue.Add(objectRef) -} - -func (c *Controller) updatePolicyBindingStatus(ctx context.Context, pb *v1alpha1.PolicyBinding) error { +func (c *PolicyBindingController) updatePolicyBindingStatus(ctx context.Context, pb *v1alpha1.PolicyBinding) error { pbCopy := pb.DeepCopy() _, err := c.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(pbCopy.Namespace).UpdateStatus(ctx, pbCopy, metav1.UpdateOptions{FieldManager: FieldManager}) return err } // Update metadata, spec, etc. of the PolicyBinding object. -func (c *Controller) updatePolicyBinding(ctx context.Context, pb *v1alpha1.PolicyBinding) error { +func (c *PolicyBindingController) updatePolicyBinding(ctx context.Context, pb *v1alpha1.PolicyBinding) error { pbCopy := pb.DeepCopy() _, err := c.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(pbCopy.Namespace).Update(ctx, pbCopy, metav1.UpdateOptions{FieldManager: FieldManager}) return err diff --git a/pkg/controllers/policybinding/controller_test.go b/pkg/controllers/policybinding/controller_test.go index ad9abee..b0e30a4 100644 --- a/pkg/controllers/policybinding/controller_test.go +++ b/pkg/controllers/policybinding/controller_test.go @@ -198,20 +198,6 @@ func TestController_syncHandler_notFound(t *testing.T) { } } -func TestController_enqueuePolicyBinding(t *testing.T) { - server := newAuthentikTestServer(t, authentikTestHandlers{}) - t.Cleanup(server.Close) - - ctrl, _, cancel := newTestController(t, testPolicyBinding(), server.URL) - t.Cleanup(cancel) - - ctrl.enqueuePolicyBinding(testPolicyBinding()) - - if ctrl.workqueue.Len() != 1 { - t.Fatalf("workqueue length = %d, want 1", ctrl.workqueue.Len()) - } -} - // --- test helpers --- func testPolicyBinding() *v1alpha1.PolicyBinding { @@ -232,7 +218,7 @@ func testPolicyBinding() *v1alpha1.PolicyBinding { } } -func newTestController(t *testing.T, pb *v1alpha1.PolicyBinding, authentikURL string) (*Controller, context.Context, context.CancelFunc) { +func newTestController(t *testing.T, pb *v1alpha1.PolicyBinding, authentikURL string) (*PolicyBindingController, context.Context, context.CancelFunc) { t.Helper() ctx, cancel := context.WithCancel(context.Background()) ctrl, _, stop := newTestControllerWithContext(t, ctx, pb, authentikURL) @@ -242,7 +228,7 @@ func newTestController(t *testing.T, pb *v1alpha1.PolicyBinding, authentikURL st } } -func newTestControllerWithContext(t *testing.T, ctx context.Context, pb *v1alpha1.PolicyBinding, authentikURL string) (*Controller, context.Context, func()) { +func newTestControllerWithContext(t *testing.T, ctx context.Context, pb *v1alpha1.PolicyBinding, authentikURL string) (*PolicyBindingController, context.Context, func()) { t.Helper() authentikClient := newAuthentikAPIClientForTest(t, authentikURL) @@ -350,7 +336,7 @@ func writeJSON(t *testing.T, w http.ResponseWriter, status int, body any) { } } -func getPolicyBinding(t *testing.T, ctrl *Controller, namespace, name string) *v1alpha1.PolicyBinding { +func getPolicyBinding(t *testing.T, ctrl *PolicyBindingController, namespace, name string) *v1alpha1.PolicyBinding { t.Helper() got, err := ctrl.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(namespace).Get( diff --git a/pkg/controllers/proxyprovider/controller.go b/pkg/controllers/proxyprovider/controller.go index 0a73aee..eb312ed 100644 --- a/pkg/controllers/proxyprovider/controller.go +++ b/pkg/controllers/proxyprovider/controller.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -40,6 +39,7 @@ import ( "k8s.io/klog/v2" v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/proxyprovider/v1alpha1" + controllers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/controllers" clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned" operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme" informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/proxyprovider/v1alpha1" @@ -62,16 +62,14 @@ const ( DeleteAuthentikProxyProviderFinalizer = "proxyprovider.t000-n.de/delete-authentik-proxyprovider" ) -type Controller struct { +type ProxyProviderController struct { kubeclientset kubernetes.Interface proxyProviderClientset clientset.Interface authentik *authentikapi.APIClient proxyLister listers.ProxyProviderLister - proxySynced cache.InformerSynced - workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName] - recorder record.EventRecorder + controller *controllers.Controller } func NewController( @@ -80,7 +78,7 @@ func NewController( proxyProviderClientset clientset.Interface, authentik *authentikapi.APIClient, proxyInformer informers.ProxyProviderInformer, -) *Controller { +) *ProxyProviderController { logger := klog.FromContext(ctx) utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme)) @@ -95,75 +93,36 @@ func NewController( &workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)}, ) - c := &Controller{ + c := &ProxyProviderController{ kubeclientset: kubeclientset, proxyProviderClientset: proxyProviderClientset, authentik: authentik, proxyLister: proxyInformer.Lister(), - proxySynced: proxyInformer.Informer().HasSynced, - workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter), - recorder: recorder, } + c.controller = controllers.NewController( + ctx, + workqueue.NewTypedRateLimitingQueue(ratelimiter), + recorder, + proxyInformer.Informer().HasSynced, + c.syncHandler, + ) logger.Info("Setting up event handlers") proxyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.enqueueProxyProvider, + AddFunc: c.controller.Enqueue, UpdateFunc: func(_, newObj interface{}) { - c.enqueueProxyProvider(newObj) + c.controller.Enqueue(newObj) }, }) return c } -func (c *Controller) Run(ctx context.Context, workers int) error { - defer utilruntime.HandleCrash() - defer c.workqueue.ShutDown() - logger := klog.FromContext(ctx) - - logger.Info("Starting ProxyProvider controller") - - logger.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(ctx.Done(), c.proxySynced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - logger.Info("Starting workers", "count", workers) - for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, c.runWorker, time.Second) - } - - logger.Info("Started workers") - <-ctx.Done() - logger.Info("Shutting down workers") - return nil +func (c *ProxyProviderController) Run(ctx context.Context, workers int) error { + return c.controller.Run(ctx, workers) } -func (c *Controller) runWorker(ctx context.Context) { - for c.processNextWorkItem(ctx) { - } -} - -func (c *Controller) processNextWorkItem(ctx context.Context) bool { - objRef, shutdown := c.workqueue.Get() - logger := klog.FromContext(ctx) - if shutdown { - return false - } - defer c.workqueue.Done(objRef) - - err := c.syncHandler(ctx, objRef) - if err == nil { - c.workqueue.Forget(objRef) - logger.Info("Successfully synced", "objectName", objRef) - return true - } - utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef) - c.workqueue.AddRateLimited(objRef) - return true -} - -func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error { +func (c *ProxyProviderController) syncHandler(ctx context.Context, objectRef cache.ObjectName) error { logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef) pp, err := c.proxyLister.ProxyProviders(objectRef.Namespace).Get(objectRef.Name) @@ -196,12 +155,12 @@ func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName return c.reconcileUpdate(ctx, pp) } -func (c *Controller) ensureFinalizers(ctx context.Context, pp *v1alpha1.ProxyProvider) error { +func (c *ProxyProviderController) ensureFinalizers(ctx context.Context, pp *v1alpha1.ProxyProvider) error { pp.ObjectMeta.Finalizers = append(pp.ObjectMeta.Finalizers, DeleteAuthentikProxyProviderFinalizer) return c.updateProxyProvider(ctx, pp) } -func (c *Controller) reconcileDelete(ctx context.Context, pp *v1alpha1.ProxyProvider) error { +func (c *ProxyProviderController) reconcileDelete(ctx context.Context, pp *v1alpha1.ProxyProvider) error { pk, err := strconv.ParseInt(pp.Status.PK, 10, 32) if err != nil { return fmt.Errorf("error parsing PK: %v", err) @@ -219,7 +178,7 @@ func (c *Controller) reconcileDelete(ctx context.Context, pp *v1alpha1.ProxyProv return c.updateProxyProvider(ctx, pp) } -func (c *Controller) reconcileUpdate(ctx context.Context, pp *v1alpha1.ProxyProvider) error { +func (c *ProxyProviderController) reconcileUpdate(ctx context.Context, pp *v1alpha1.ProxyProvider) error { // We retrieve the existing PP from the API by slug. pk, err := strconv.ParseInt(pp.Status.PK, 10, 32) if err != nil { @@ -253,7 +212,7 @@ func (c *Controller) reconcileUpdate(ctx context.Context, pp *v1alpha1.ProxyProv return c.updateProxyProviderStatus(ctx, pp) } -func (c *Controller) reconcileCreate(ctx context.Context, pp *v1alpha1.ProxyProvider) error { +func (c *ProxyProviderController) reconcileCreate(ctx context.Context, pp *v1alpha1.ProxyProvider) error { proxyProviderRequest := &authentikapi.ProxyProviderRequest{ Name: pp.Spec.Name, AuthorizationFlow: pp.Spec.AuthorizationFlow, @@ -270,23 +229,14 @@ func (c *Controller) reconcileCreate(ctx context.Context, pp *v1alpha1.ProxyProv return c.updateProxyProviderStatus(ctx, pp) } -func (c *Controller) enqueueProxyProvider(obj interface{}) { - objectRef, err := cache.ObjectToName(obj) - if err != nil { - utilruntime.HandleError(err) - return - } - c.workqueue.Add(objectRef) -} - -func (c *Controller) updateProxyProviderStatus(ctx context.Context, pp *v1alpha1.ProxyProvider) error { +func (c *ProxyProviderController) updateProxyProviderStatus(ctx context.Context, pp *v1alpha1.ProxyProvider) error { ppCopy := pp.DeepCopy() _, err := c.proxyProviderClientset.ProxyproviderV1alpha1().ProxyProviders(ppCopy.Namespace).UpdateStatus(ctx, ppCopy, metav1.UpdateOptions{FieldManager: FieldManager}) return err } // Update metadata, spec, etc. of the ProxyProvider object. -func (c *Controller) updateProxyProvider(ctx context.Context, pp *v1alpha1.ProxyProvider) error { +func (c *ProxyProviderController) updateProxyProvider(ctx context.Context, pp *v1alpha1.ProxyProvider) error { ppCopy := pp.DeepCopy() _, err := c.proxyProviderClientset.ProxyproviderV1alpha1().ProxyProviders(ppCopy.Namespace).Update(ctx, ppCopy, metav1.UpdateOptions{FieldManager: FieldManager}) if err != nil { diff --git a/pkg/controllers/proxyprovider/controller_test.go b/pkg/controllers/proxyprovider/controller_test.go index 43d308c..0c37f47 100644 --- a/pkg/controllers/proxyprovider/controller_test.go +++ b/pkg/controllers/proxyprovider/controller_test.go @@ -218,20 +218,6 @@ func TestController_syncHandler_invalidPK(t *testing.T) { } } -func TestController_enqueueProxyProvider(t *testing.T) { - server := newAuthentikTestServer(t, authentikTestHandlers{}) - t.Cleanup(server.Close) - - ctrl, _, cancel := newTestController(t, testProxyProvider(), server.URL) - t.Cleanup(cancel) - - ctrl.enqueueProxyProvider(testProxyProvider()) - - if ctrl.workqueue.Len() != 1 { - t.Fatalf("workqueue length = %d, want 1", ctrl.workqueue.Len()) - } -} - // --- test helpers --- func testProxyProvider() *v1alpha1.ProxyProvider { @@ -253,7 +239,7 @@ func testProxyProvider() *v1alpha1.ProxyProvider { } } -func newTestController(t *testing.T, pp *v1alpha1.ProxyProvider, authentikURL string) (*Controller, context.Context, context.CancelFunc) { +func newTestController(t *testing.T, pp *v1alpha1.ProxyProvider, authentikURL string) (*ProxyProviderController, context.Context, context.CancelFunc) { t.Helper() ctx, cancel := context.WithCancel(context.Background()) ctrl, _, stop := newTestControllerWithContext(t, ctx, pp, authentikURL) @@ -263,7 +249,7 @@ func newTestController(t *testing.T, pp *v1alpha1.ProxyProvider, authentikURL st } } -func newTestControllerWithContext(t *testing.T, ctx context.Context, pp *v1alpha1.ProxyProvider, authentikURL string) (*Controller, context.Context, func()) { +func newTestControllerWithContext(t *testing.T, ctx context.Context, pp *v1alpha1.ProxyProvider, authentikURL string) (*ProxyProviderController, context.Context, func()) { t.Helper() authentikClient := newAuthentikAPIClientForTest(t, authentikURL) @@ -372,7 +358,7 @@ func writeJSON(t *testing.T, w http.ResponseWriter, status int, body any) { } } -func getProxyProvider(t *testing.T, ctrl *Controller, namespace, name string) *v1alpha1.ProxyProvider { +func getProxyProvider(t *testing.T, ctrl *ProxyProviderController, namespace, name string) *v1alpha1.ProxyProvider { t.Helper() got, err := ctrl.proxyProviderClientset.ProxyproviderV1alpha1().ProxyProviders(namespace).Get(