Compare commits

..

1 Commits

Author SHA1 Message Date
t.behrendt 6d9496185e feat: vertical slice proxy provider (#2)
CD / Create tag (push) Successful in 25s
CD / Build and push (amd64) (push) Successful in 1m31s
CD / Create manifest (push) Successful in 7s
Reviewed-on: #2
Co-authored-by: Timo Behrendt <t.behrendt@t00n.de>
Co-committed-by: Timo Behrendt <t.behrendt@t00n.de>
2026-05-17 21:02:00 +02:00
13 changed files with 284 additions and 796 deletions
-40
View File
@@ -11,8 +11,6 @@ Manual changes to the resources in Authentik will be overwritten by the operator
| Custom Resource | CRD File | Short Name |
| --------------- | ---------------------------------------------------------- | ---------- |
| ProxyProvider | [`proxyProvider.yaml`](`artifacts/crd/proxyProvider.yaml`) | pp |
| Application | [`application.yaml`](`artifacts/crd/application.yaml`) | app |
| PolicyBinding | [`policyBinding.yaml`](`artifacts/crd/policyBinding.yaml`) | pb |
### ProxyProvider
@@ -38,44 +36,6 @@ spec:
The ProxyProvider will be created in Authentik, but will not be assigned to an outpost or an application (Resources are TBD).
### Application
The Application only supports a reduced set of fields.
Example [`application.yaml`](`artifacts/examples/application.yaml`):
```yaml
apiVersion: application.t000-n.de/v1alpha1
kind: Application
metadata:
name: application-example
spec:
name: Application Example
slug: application-example
# The ID of the provider, which can be retrieved from e.g. the ProxyPRovider via "kubectl get pp proxy-provider-example -o jsonpath='{.status.pk}'"
provider: 105
```
### PolicyBinding
The PolicyBinding is used to bind a policy to a target, e.g. allow a group or user to access an application.
The PolicyBinding only supports a reduced set of fields.
Example [`policyBinding.yaml`](`artifacts/examples/policyBinding.yaml`):
```yaml
apiVersion: policybinding.t000-n.de/v1alpha1
kind: PolicyBinding
metadata:
name: policy-binding-example
spec:
group: 14ab813f-a7f9-481b-9b08-781953ae9ebf
# The ID of the target, e.g. an Application, which can be retrieved from e.g. the Application via "kubectl get app application-example -o jsonpath='{.status.pk}'"
target: 8dd85627-9c48-49c2-8afc-d73dd122ffc2
# The order in which the policy is applied. This needs to be unique for each PolicyBinding.
order: 1
```
## Versioning
As soon as the operator covers an entire use case, the version will be raised to v1 and follow default versioning rules. Before that, the version will be v1alpha1.
-8
View File
@@ -1,8 +0,0 @@
apiVersion: application.t000-n.de/v1alpha1
kind: Application
metadata:
name: application-example
spec:
name: Application Example
slug: application-example
provider: 105
-8
View File
@@ -1,8 +0,0 @@
apiVersion: policybinding.t000-n.de/v1alpha1
kind: PolicyBinding
metadata:
name: policy-binding-example
spec:
group: 14ab813f-a7f9-481b-9b08-781953ae9ebf
target: 8dd85627-9c48-49c2-8afc-d73dd122ffc2
order: 1
+1 -1
View File
@@ -35,7 +35,7 @@ type PolicyBinding struct {
type PolicyBindingSpec struct {
Policy string `json:"policy,omitempty"`
Group string `json:"group,omitempty"`
User int32 `json:"user,omitempty"`
User int32 `json:"user"`
Target string `json:"target"`
Order int32 `json:"order"`
}
+83 -27
View File
@@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"slices"
"strconv"
"time"
"golang.org/x/time/rate"
@@ -29,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -38,7 +40,6 @@ import (
"k8s.io/klog/v2"
v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/application/v1alpha1"
controllers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/controllers"
clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned"
operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme"
informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/application/v1alpha1"
@@ -61,14 +62,16 @@ const (
DeleteAuthentikApplicationFinalizer = "application.t000-n.de/delete-authentik-application"
)
type ApplicationController struct {
type Controller struct {
kubeclientset kubernetes.Interface
applicationClientset clientset.Interface
authentik *authentikapi.APIClient
applicationListener listers.ApplicationLister
applicationSynced cache.InformerSynced
controller *controllers.Controller
workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
recorder record.EventRecorder
}
func NewController(
@@ -77,7 +80,7 @@ func NewController(
applicationClientset clientset.Interface,
authentik *authentikapi.APIClient,
applicationInformer informers.ApplicationInformer,
) *ApplicationController {
) *Controller {
logger := klog.FromContext(ctx)
utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme))
@@ -92,36 +95,75 @@ func NewController(
&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
)
c := &ApplicationController{
c := &Controller{
kubeclientset: kubeclientset,
applicationClientset: applicationClientset,
authentik: authentik,
applicationListener: applicationInformer.Lister(),
applicationSynced: applicationInformer.Informer().HasSynced,
workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder: recorder,
}
c.controller = controllers.NewController(
ctx,
workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder,
applicationInformer.Informer().HasSynced,
c.syncHandler,
)
logger.Info("Setting up event handlers")
applicationInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.controller.Enqueue,
AddFunc: c.enqueueApplication,
UpdateFunc: func(_, newObj interface{}) {
c.controller.Enqueue(newObj)
c.enqueueApplication(newObj)
},
})
return c
}
func (c *ApplicationController) Run(ctx context.Context, workers int) error {
return c.controller.Run(ctx, workers)
func (c *Controller) Run(ctx context.Context, workers int) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting Application controller")
logger.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.applicationSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
logger.Info("Starting workers", "count", workers)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
logger.Info("Started workers")
<-ctx.Done()
logger.Info("Shutting down workers")
return nil
}
func (c *ApplicationController) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
objRef, shutdown := c.workqueue.Get()
logger := klog.FromContext(ctx)
if shutdown {
return false
}
defer c.workqueue.Done(objRef)
err := c.syncHandler(ctx, objRef)
if err == nil {
c.workqueue.Forget(objRef)
logger.Info("Successfully synced", "objectName", objRef)
return true
}
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
c.workqueue.AddRateLimited(objRef)
return true
}
func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef)
app, err := c.applicationListener.Applications(objectRef.Namespace).Get(objectRef.Name)
@@ -154,17 +196,22 @@ func (c *ApplicationController) syncHandler(ctx context.Context, objectRef cache
return c.reconcileUpdate(ctx, app)
}
func (c *ApplicationController) ensureFinalizers(ctx context.Context, app *v1alpha1.Application) error {
func (c *Controller) ensureFinalizers(ctx context.Context, app *v1alpha1.Application) error {
app.ObjectMeta.Finalizers = append(app.ObjectMeta.Finalizers, DeleteAuthentikApplicationFinalizer)
return c.updateApplication(ctx, app)
}
func (c *ApplicationController) reconcileDelete(ctx context.Context, app *v1alpha1.Application) error {
r, err := c.authentik.CoreApi.CoreApplicationsDestroy(ctx, app.Status.PK).Execute()
func (c *Controller) reconcileDelete(ctx context.Context, app *v1alpha1.Application) error {
pk, err := strconv.ParseInt(app.Status.PK, 10, 32)
if err != nil {
// This handles an edge-case, where when the Application on Authentik has already been deleted, but the finalizer is still present. We just remove the finalizer and return.
return fmt.Errorf("error parsing PK: %v", err)
}
r, err := c.authentik.ProvidersApi.ProvidersProxyDestroy(ctx, int32(pk)).Execute()
if err != nil {
// This handles an edge-case, where when the ProxyProvider on Authentik has already been deleted, but the finalizer is still present. We just remove the finalizer and return.
if r != nil && r.StatusCode != http.StatusNotFound {
return fmt.Errorf("error when calling `CoreAPI.CoreApplicationsDestroy`: %w with response %v", err, r)
return fmt.Errorf("error when calling `ProvidersAPI.ProvidersProxyDestroy`: %w with response %v", err, r)
}
}
@@ -172,7 +219,7 @@ func (c *ApplicationController) reconcileDelete(ctx context.Context, app *v1alph
return c.updateApplication(ctx, app)
}
func (c *ApplicationController) reconcileUpdate(ctx context.Context, app *v1alpha1.Application) error {
func (c *Controller) reconcileUpdate(ctx context.Context, app *v1alpha1.Application) error {
_, r, err := c.authentik.CoreApi.CoreApplicationsRetrieve(ctx, app.Spec.Slug).Execute()
if err != nil {
if r != nil && r.StatusCode == http.StatusNotFound {
@@ -191,14 +238,14 @@ func (c *ApplicationController) reconcileUpdate(ctx context.Context, app *v1alph
}
resp, r, err := c.authentik.CoreApi.CoreApplicationsPartialUpdate(ctx, app.Spec.Slug).PatchedApplicationRequest(*patchedApplicationRequest).Execute()
if err != nil {
return fmt.Errorf("error when calling `CoreAPI.CoreApplicationsPartialUpdate`: %w with response %v", err, r)
return fmt.Errorf("error when calling `ProvidersAPI.ProvidersProxyPartialUpdate`: %w with response %v", err, r)
}
app.Status.PK = resp.Pk
return c.updateApplicationStatus(ctx, app)
}
func (c *ApplicationController) reconcileCreate(ctx context.Context, app *v1alpha1.Application) error {
func (c *Controller) reconcileCreate(ctx context.Context, app *v1alpha1.Application) error {
applicationRequest := &authentikapi.ApplicationRequest{
Name: app.Spec.Name,
Slug: app.Spec.Slug,
@@ -213,14 +260,23 @@ func (c *ApplicationController) reconcileCreate(ctx context.Context, app *v1alph
return c.updateApplicationStatus(ctx, app)
}
func (c *ApplicationController) updateApplicationStatus(ctx context.Context, app *v1alpha1.Application) error {
func (c *Controller) enqueueApplication(obj interface{}) {
objectRef, err := cache.ObjectToName(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(objectRef)
}
func (c *Controller) updateApplicationStatus(ctx context.Context, app *v1alpha1.Application) error {
appCopy := app.DeepCopy()
_, err := c.applicationClientset.ApplicationV1alpha1().Applications(appCopy.Namespace).UpdateStatus(ctx, appCopy, metav1.UpdateOptions{FieldManager: FieldManager})
return err
}
// Update metadata, spec, etc. of the Application object.
func (c *ApplicationController) updateApplication(ctx context.Context, app *v1alpha1.Application) error {
func (c *Controller) updateApplication(ctx context.Context, app *v1alpha1.Application) error {
appCopy := app.DeepCopy()
_, err := c.applicationClientset.ApplicationV1alpha1().Applications(appCopy.Namespace).Update(ctx, appCopy, metav1.UpdateOptions{FieldManager: FieldManager})
if err != nil {
+17 -3
View File
@@ -220,6 +220,20 @@ func TestController_syncHandler_invalidPK(t *testing.T) {
}
}
func TestController_enqueueApplication(t *testing.T) {
server := newAuthentikTestServer(t, authentikTestHandlers{})
t.Cleanup(server.Close)
ctrl, _, cancel := newTestController(t, testApplication(), server.URL)
t.Cleanup(cancel)
ctrl.enqueueApplication(testApplication())
if ctrl.workqueue.Len() != 1 {
t.Fatalf("workqueue length = %d, want 1", ctrl.workqueue.Len())
}
}
// --- test helpers ---
func testApplication() *v1alpha1.Application {
@@ -240,7 +254,7 @@ func testApplication() *v1alpha1.Application {
}
}
func newTestController(t *testing.T, app *v1alpha1.Application, authentikURL string) (*ApplicationController, context.Context, context.CancelFunc) {
func newTestController(t *testing.T, app *v1alpha1.Application, authentikURL string) (*Controller, context.Context, context.CancelFunc) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
ctrl, _, stop := newTestControllerWithContext(t, ctx, app, authentikURL)
@@ -250,7 +264,7 @@ func newTestController(t *testing.T, app *v1alpha1.Application, authentikURL str
}
}
func newTestControllerWithContext(t *testing.T, ctx context.Context, app *v1alpha1.Application, authentikURL string) (*ApplicationController, context.Context, func()) {
func newTestControllerWithContext(t *testing.T, ctx context.Context, app *v1alpha1.Application, authentikURL string) (*Controller, context.Context, func()) {
t.Helper()
authentikClient := newAuthentikAPIClientForTest(t, authentikURL)
@@ -368,7 +382,7 @@ func writeJSON(t *testing.T, w http.ResponseWriter, status int, body any) {
}
}
func getApplication(t *testing.T, ctrl *ApplicationController, namespace, name string) *v1alpha1.Application {
func getApplication(t *testing.T, ctrl *Controller, namespace, name string) *v1alpha1.Application {
t.Helper()
got, err := ctrl.applicationClientset.ApplicationV1alpha1().Applications(namespace).Get(
-94
View File
@@ -1,94 +0,0 @@
package controllers
import (
"context"
"fmt"
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
type SyncHandler func(ctx context.Context, objRef cache.ObjectName) error
type Controller struct {
workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
recorder record.EventRecorder
synced cache.InformerSynced
syncHandler SyncHandler
}
func NewController(
ctx context.Context,
workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName],
recorder record.EventRecorder,
synced cache.InformerSynced,
syncHandler SyncHandler,
) *Controller {
return &Controller{
workqueue: workqueue,
recorder: recorder,
synced: synced,
syncHandler: syncHandler,
}
}
func (c *Controller) Run(ctx context.Context, workers int) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting PolicyBinding controller")
logger.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.synced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
logger.Info("Starting workers", "count", workers)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
logger.Info("Started workers")
<-ctx.Done()
logger.Info("Shutting down workers")
return nil
}
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
objRef, shutdown := c.workqueue.Get()
logger := klog.FromContext(ctx)
if shutdown {
return false
}
defer c.workqueue.Done(objRef)
err := c.syncHandler(ctx, objRef)
if err == nil {
c.workqueue.Forget(objRef)
logger.Info("Successfully synced", "objectName", objRef)
return true
}
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
c.workqueue.AddRateLimited(objRef)
return true
}
func (c *Controller) Enqueue(obj interface{}) {
objectRef, err := cache.ObjectToName(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(objectRef)
}
-190
View File
@@ -1,190 +0,0 @@
// AI generated tests and not yet reviewed.
package controllers
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)
func newTestController(t *testing.T, synced cache.InformerSynced, syncHandler SyncHandler) (*Controller, workqueue.TypedRateLimitingInterface[cache.ObjectName]) {
t.Helper()
ratelimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](time.Millisecond, time.Second)
q := workqueue.NewTypedRateLimitingQueue(ratelimiter)
t.Cleanup(q.ShutDown)
if synced == nil {
synced = func() bool { return true }
}
ctrl := NewController(
context.Background(),
q,
record.NewFakeRecorder(10),
synced,
syncHandler,
)
return ctrl, q
}
func TestController_processNextWorkItem_success(t *testing.T) {
objRef := cache.ObjectName{Namespace: "default", Name: "test"}
var syncedRef cache.ObjectName
ctrl, q := newTestController(t, nil, func(_ context.Context, ref cache.ObjectName) error {
syncedRef = ref
return nil
})
q.Add(objRef)
if !ctrl.processNextWorkItem(context.Background()) {
t.Fatal("processNextWorkItem() = false, want true")
}
if syncedRef != objRef {
t.Fatalf("syncHandler object = %+v, want %+v", syncedRef, objRef)
}
if q.Len() != 0 {
t.Fatalf("queue length = %d, want 0 after successful sync", q.Len())
}
if q.NumRequeues(objRef) != 0 {
t.Fatalf("requeues = %d, want 0 after successful sync", q.NumRequeues(objRef))
}
}
func TestController_processNextWorkItem_syncError(t *testing.T) {
objRef := cache.ObjectName{Namespace: "default", Name: "test"}
syncErr := errors.New("sync failed")
ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error {
return syncErr
})
q.Add(objRef)
if !ctrl.processNextWorkItem(context.Background()) {
t.Fatal("processNextWorkItem() = false, want true")
}
if q.NumRequeues(objRef) != 1 {
t.Fatalf("requeues = %d, want 1 after sync error", q.NumRequeues(objRef))
}
}
func TestController_processNextWorkItem_shutdown(t *testing.T) {
ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error {
return nil
})
q.ShutDown()
if ctrl.processNextWorkItem(context.Background()) {
t.Fatal("processNextWorkItem() = true, want false on shutdown")
}
}
func TestController_Enqueue(t *testing.T) {
ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error {
return nil
})
obj := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test",
},
}
ctrl.Enqueue(obj)
if q.Len() != 1 {
t.Fatalf("queue length = %d, want 1 after Enqueue", q.Len())
}
}
func TestController_Enqueue_invalidObject(t *testing.T) {
ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error {
return nil
})
ctrl.Enqueue("not-a-kubernetes-object")
if q.Len() != 0 {
t.Fatalf("queue length = %d, want 0 for invalid object", q.Len())
}
}
func TestController_Run_cacheSyncFails(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctrl, _ := newTestController(t, func() bool { return false }, func(context.Context, cache.ObjectName) error {
return nil
})
go func() {
time.Sleep(10 * time.Millisecond)
cancel()
}()
err := ctrl.Run(ctx, 1)
if err == nil {
t.Fatal("Run() error = nil, want cache sync failure")
}
}
func TestController_Run_shutsDownOnCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctrl, _ := newTestController(t, nil, func(context.Context, cache.ObjectName) error {
return nil
})
errCh := make(chan error, 1)
go func() {
errCh <- ctrl.Run(ctx, 1)
}()
time.Sleep(50 * time.Millisecond)
cancel()
select {
case err := <-errCh:
if err != nil {
t.Fatalf("Run() error = %v, want nil on context cancel", err)
}
case <-time.After(2 * time.Second):
t.Fatal("Run() did not return after context cancellation")
}
}
func TestController_runWorker_processesQueuedItem(t *testing.T) {
objRef := cache.ObjectName{Namespace: "default", Name: "test"}
var calls atomic.Int32
ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error {
calls.Add(1)
return nil
})
q.Add(objRef)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go ctrl.runWorker(ctx)
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if calls.Load() == 1 && q.Len() == 0 {
cancel()
return
}
time.Sleep(5 * time.Millisecond)
}
cancel()
t.Fatalf("runWorker did not process queued item: calls=%d queueLen=%d", calls.Load(), q.Len())
}
+89 -47
View File
@@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"slices"
"strconv"
"time"
"golang.org/x/time/rate"
@@ -29,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -38,7 +40,6 @@ import (
"k8s.io/klog/v2"
v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/policybinding/v1alpha1"
controllers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/controllers"
clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned"
operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme"
informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/policybinding/v1alpha1"
@@ -61,14 +62,16 @@ const (
DeleteAuthentikPolicyBindingFinalizer = "policybinding.t000-n.de/delete-authentik-policybinding"
)
type PolicyBindingController struct {
type Controller struct {
kubeclientset kubernetes.Interface
policyBindingClientset clientset.Interface
authentik *authentikapi.APIClient
policyBindingListener listers.PolicyBindingLister
policyBindingSynced cache.InformerSynced
controller *controllers.Controller
workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
recorder record.EventRecorder
}
func NewController(
@@ -77,7 +80,7 @@ func NewController(
policyBindingClientset clientset.Interface,
authentik *authentikapi.APIClient,
policyBindingInformer informers.PolicyBindingInformer,
) *PolicyBindingController {
) *Controller {
logger := klog.FromContext(ctx)
utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme))
@@ -92,36 +95,75 @@ func NewController(
&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
)
c := &PolicyBindingController{
c := &Controller{
kubeclientset: kubeclientset,
policyBindingClientset: policyBindingClientset,
authentik: authentik,
policyBindingListener: policyBindingInformer.Lister(),
policyBindingSynced: policyBindingInformer.Informer().HasSynced,
workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder: recorder,
}
c.controller = controllers.NewController(
ctx,
workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder,
policyBindingInformer.Informer().HasSynced,
c.syncHandler,
)
logger.Info("Setting up event handlers")
policyBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.controller.Enqueue,
AddFunc: c.enqueuePolicyBinding,
UpdateFunc: func(_, newObj interface{}) {
c.controller.Enqueue(newObj)
c.enqueuePolicyBinding(newObj)
},
})
return c
}
func (c *PolicyBindingController) Run(ctx context.Context, workers int) error {
return c.controller.Run(ctx, workers)
func (c *Controller) Run(ctx context.Context, workers int) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting PolicyBinding controller")
logger.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.policyBindingSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
logger.Info("Starting workers", "count", workers)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
logger.Info("Started workers")
<-ctx.Done()
logger.Info("Shutting down workers")
return nil
}
func (c *PolicyBindingController) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
objRef, shutdown := c.workqueue.Get()
logger := klog.FromContext(ctx)
if shutdown {
return false
}
defer c.workqueue.Done(objRef)
err := c.syncHandler(ctx, objRef)
if err == nil {
c.workqueue.Forget(objRef)
logger.Info("Successfully synced", "objectName", objRef)
return true
}
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
c.workqueue.AddRateLimited(objRef)
return true
}
func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef)
pb, err := c.policyBindingListener.PolicyBindings(objectRef.Namespace).Get(objectRef.Name)
@@ -154,17 +196,22 @@ func (c *PolicyBindingController) syncHandler(ctx context.Context, objectRef cac
return c.reconcileUpdate(ctx, pb)
}
func (c *PolicyBindingController) ensureFinalizers(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
func (c *Controller) ensureFinalizers(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
pb.ObjectMeta.Finalizers = append(pb.ObjectMeta.Finalizers, DeleteAuthentikPolicyBindingFinalizer)
return c.updatePolicyBinding(ctx, pb)
}
func (c *PolicyBindingController) reconcileDelete(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
r, err := c.authentik.PoliciesApi.PoliciesBindingsDestroy(ctx, pb.Status.PK).Execute()
func (c *Controller) reconcileDelete(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
pk, err := strconv.ParseInt(pb.Status.PK, 10, 32)
if err != nil {
// This handles an edge-case, where when the PolicyBinding on Authentik has already been deleted, but the finalizer is still present. We just remove the finalizer and return.
return fmt.Errorf("error parsing PK: %v", err)
}
r, err := c.authentik.ProvidersApi.ProvidersProxyDestroy(ctx, int32(pk)).Execute()
if err != nil {
// This handles an edge-case, where when the ProxyProvider on Authentik has already been deleted, but the finalizer is still present. We just remove the finalizer and return.
if r != nil && r.StatusCode != http.StatusNotFound {
return fmt.Errorf("error when calling `PoliciesAPI.PoliciesBindingsDestroy`: %w with response %v", err, r)
return fmt.Errorf("error when calling `ProvidersAPI.ProvidersProxyDestroy`: %w with response %v", err, r)
}
}
@@ -172,7 +219,7 @@ func (c *PolicyBindingController) reconcileDelete(ctx context.Context, pb *v1alp
return c.updatePolicyBinding(ctx, pb)
}
func (c *PolicyBindingController) reconcileUpdate(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
func (c *Controller) reconcileUpdate(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
_, r, err := c.authentik.PoliciesApi.PoliciesBindingsRetrieve(ctx, pb.Status.PK).Execute()
if err != nil {
if r != nil && r.StatusCode == http.StatusNotFound {
@@ -185,43 +232,29 @@ func (c *PolicyBindingController) reconcileUpdate(ctx context.Context, pb *v1alp
}
patchedPolicyBindingRequest := &authentikapi.PatchedPolicyBindingRequest{
Policy: *authentikapi.NewNullableString(&pb.Spec.Policy),
Group: *authentikapi.NewNullableString(&pb.Spec.Group),
User: *authentikapi.NewNullableInt32(&pb.Spec.User),
Target: &pb.Spec.Target,
Order: &pb.Spec.Order,
}
if pb.Spec.Policy != "" {
patchedPolicyBindingRequest.SetPolicy(pb.Spec.Policy)
}
if pb.Spec.Group != "" {
patchedPolicyBindingRequest.SetGroup(pb.Spec.Group)
}
if pb.Spec.User != 0 {
patchedPolicyBindingRequest.SetUser(pb.Spec.User)
}
resp, r, err := c.authentik.PoliciesApi.PoliciesBindingsPartialUpdate(ctx, pb.Status.PK).PatchedPolicyBindingRequest(*patchedPolicyBindingRequest).Execute()
if err != nil {
return fmt.Errorf("error when calling `PoliciesAPI.PoliciesBindingsPartialUpdate`: %w with response %v", err, r)
return fmt.Errorf("error when calling `ProvidersAPI.ProvidersProxyPartialUpdate`: %w with response %v", err, r)
}
pb.Status.PK = resp.Pk
return c.updatePolicyBindingStatus(ctx, pb)
}
func (c *PolicyBindingController) reconcileCreate(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
func (c *Controller) reconcileCreate(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
policyBindingRequest := &authentikapi.PolicyBindingRequest{
Policy: *authentikapi.NewNullableString(&pb.Spec.Policy),
Group: *authentikapi.NewNullableString(&pb.Spec.Group),
User: *authentikapi.NewNullableInt32(&pb.Spec.User),
Target: pb.Spec.Target,
Order: pb.Spec.Order,
}
if pb.Spec.Policy != "" {
policyBindingRequest.SetPolicy(pb.Spec.Policy)
}
if pb.Spec.Group != "" {
policyBindingRequest.SetGroup(pb.Spec.Group)
}
if pb.Spec.User != 0 {
policyBindingRequest.SetUser(pb.Spec.User)
}
resp, r, err := c.authentik.PoliciesApi.PoliciesBindingsCreate(ctx).PolicyBindingRequest(*policyBindingRequest).Execute()
if err != nil {
return fmt.Errorf("error when calling `PoliciesAPI.PoliciesBindingsCreate`: %w with response %v", err, r)
@@ -231,14 +264,23 @@ func (c *PolicyBindingController) reconcileCreate(ctx context.Context, pb *v1alp
return c.updatePolicyBindingStatus(ctx, pb)
}
func (c *PolicyBindingController) updatePolicyBindingStatus(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
func (c *Controller) enqueuePolicyBinding(obj interface{}) {
objectRef, err := cache.ObjectToName(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(objectRef)
}
func (c *Controller) updatePolicyBindingStatus(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
pbCopy := pb.DeepCopy()
_, err := c.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(pbCopy.Namespace).UpdateStatus(ctx, pbCopy, metav1.UpdateOptions{FieldManager: FieldManager})
return err
}
// Update metadata, spec, etc. of the PolicyBinding object.
func (c *PolicyBindingController) updatePolicyBinding(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
func (c *Controller) updatePolicyBinding(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
pbCopy := pb.DeepCopy()
_, err := c.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(pbCopy.Namespace).Update(ctx, pbCopy, metav1.UpdateOptions{FieldManager: FieldManager})
return err
@@ -1,349 +0,0 @@
// AI generated tests and not yet reviewed.
package policybinding
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"slices"
"strings"
"testing"
v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/policybinding/v1alpha1"
operatorfake "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/fake"
operatorinformers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions"
authentikapi "goauthentik.io/api/v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
)
func TestController_syncHandler_create(t *testing.T) {
const wantPK = "42"
server := newAuthentikTestServer(t, authentikTestHandlers{
policyBindingCreate: func(w http.ResponseWriter, _ *http.Request) {
writeJSON(t, w, http.StatusCreated, map[string]any{"pk": wantPK})
},
})
t.Cleanup(server.Close)
ctrl, ctx, cancel := newTestController(t, testPolicyBinding(), server.URL)
t.Cleanup(cancel)
err := ctrl.syncHandler(ctx, cache.ObjectName{Namespace: "default", Name: "test-pb"})
if err != nil {
t.Fatalf("syncHandler() error = %v", err)
}
got := getPolicyBinding(t, ctrl, "default", "test-pb")
if got.Status.PK != wantPK {
t.Fatalf("status.pk = %q, want %q", got.Status.PK, wantPK)
}
}
func TestController_syncHandler_ensureFinalizers(t *testing.T) {
pb := testPolicyBinding()
pb.Status.PK = "42"
server := newAuthentikTestServer(t, authentikTestHandlers{})
t.Cleanup(server.Close)
ctrl, ctx, cancel := newTestController(t, pb, server.URL)
t.Cleanup(cancel)
err := ctrl.syncHandler(ctx, cache.ObjectName{Namespace: pb.Namespace, Name: pb.Name})
if err != nil {
t.Fatalf("syncHandler() error = %v", err)
}
got := getPolicyBinding(t, ctrl, pb.Namespace, pb.Name)
if !slices.Contains(got.Finalizers, DeleteAuthentikPolicyBindingFinalizer) {
t.Fatalf("finalizers = %v, want %q", got.Finalizers, DeleteAuthentikPolicyBindingFinalizer)
}
}
func TestController_syncHandler_update(t *testing.T) {
pb := testPolicyBinding()
pb.Status.PK = "42"
pb.Finalizers = []string{DeleteAuthentikPolicyBindingFinalizer}
server := newAuthentikTestServer(t, authentikTestHandlers{
policyBindingRetrieve: func(w http.ResponseWriter, _ *http.Request) {
writeJSON(t, w, http.StatusOK, map[string]any{"pk": "42"})
},
policyBindingPartialUpdate: func(w http.ResponseWriter, _ *http.Request) {
writeJSON(t, w, http.StatusOK, map[string]any{"pk": "42"})
},
})
t.Cleanup(server.Close)
ctrl, ctx, cancel := newTestController(t, pb, server.URL)
t.Cleanup(cancel)
err := ctrl.syncHandler(ctx, cache.ObjectName{Namespace: pb.Namespace, Name: pb.Name})
if err != nil {
t.Fatalf("syncHandler() error = %v", err)
}
got := getPolicyBinding(t, ctrl, pb.Namespace, pb.Name)
if got.Status.PK != "42" {
t.Fatalf("status.pk = %q, want 42", got.Status.PK)
}
}
func TestController_syncHandler_update_policyBindingNotFound(t *testing.T) {
pb := testPolicyBinding()
pb.Status.PK = "42"
pb.Finalizers = []string{DeleteAuthentikPolicyBindingFinalizer}
server := newAuthentikTestServer(t, authentikTestHandlers{
policyBindingRetrieve: func(w http.ResponseWriter, _ *http.Request) {
http.NotFound(w, nil)
},
})
t.Cleanup(server.Close)
ctrl, ctx, cancel := newTestController(t, pb, server.URL)
t.Cleanup(cancel)
err := ctrl.syncHandler(ctx, cache.ObjectName{Namespace: pb.Namespace, Name: pb.Name})
if err != nil {
t.Fatalf("syncHandler() error = %v", err)
}
got := getPolicyBinding(t, ctrl, pb.Namespace, pb.Name)
if got.Status.PK != "" {
t.Fatalf("status.pk = %q, want empty after policy binding not found", got.Status.PK)
}
}
func TestController_syncHandler_delete(t *testing.T) {
now := metav1.Now()
pb := testPolicyBinding()
pb.Status.PK = "42"
pb.DeletionTimestamp = &now
pb.Finalizers = []string{DeleteAuthentikPolicyBindingFinalizer}
var destroyCalled bool
server := newAuthentikTestServer(t, authentikTestHandlers{
policyBindingDestroy: func(w http.ResponseWriter, r *http.Request) {
destroyCalled = true
if r.Method != http.MethodDelete {
t.Errorf("destroy method = %s, want DELETE", r.Method)
}
w.WriteHeader(http.StatusNoContent)
},
})
t.Cleanup(server.Close)
ctrl, ctx, cancel := newTestController(t, pb, server.URL)
t.Cleanup(cancel)
err := ctrl.syncHandler(ctx, cache.ObjectName{Namespace: pb.Namespace, Name: pb.Name})
if err != nil {
t.Fatalf("syncHandler() error = %v", err)
}
if !destroyCalled {
t.Fatal("expected Authentik destroy call")
}
got := getPolicyBinding(t, ctrl, pb.Namespace, pb.Name)
if slices.Contains(got.Finalizers, DeleteAuthentikPolicyBindingFinalizer) {
t.Fatalf("finalizers = %v, want finalizer removed", got.Finalizers)
}
}
func TestController_syncHandler_delete_policyBindingAlreadyGone(t *testing.T) {
now := metav1.Now()
pb := testPolicyBinding()
pb.Status.PK = "42"
pb.DeletionTimestamp = &now
pb.Finalizers = []string{DeleteAuthentikPolicyBindingFinalizer}
server := newAuthentikTestServer(t, authentikTestHandlers{
policyBindingDestroy: func(w http.ResponseWriter, _ *http.Request) {
http.NotFound(w, nil)
},
})
t.Cleanup(server.Close)
ctrl, ctx, cancel := newTestController(t, pb, server.URL)
t.Cleanup(cancel)
err := ctrl.syncHandler(ctx, cache.ObjectName{Namespace: pb.Namespace, Name: pb.Name})
if err != nil {
t.Fatalf("syncHandler() error = %v", err)
}
got := getPolicyBinding(t, ctrl, pb.Namespace, pb.Name)
if slices.Contains(got.Finalizers, DeleteAuthentikPolicyBindingFinalizer) {
t.Fatalf("finalizers = %v, want finalizer removed after 404", got.Finalizers)
}
}
func TestController_syncHandler_notFound(t *testing.T) {
server := newAuthentikTestServer(t, authentikTestHandlers{})
t.Cleanup(server.Close)
ctrl, ctx, cancel := newTestController(t, nil, server.URL)
t.Cleanup(cancel)
err := ctrl.syncHandler(ctx, cache.ObjectName{Namespace: "default", Name: "missing"})
if err != nil {
t.Fatalf("syncHandler() error = %v, want nil for missing object", err)
}
}
// --- test helpers ---
func testPolicyBinding() *v1alpha1.PolicyBinding {
return &v1alpha1.PolicyBinding{
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Kind: "PolicyBinding",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-pb",
Namespace: "default",
},
Spec: v1alpha1.PolicyBindingSpec{
Group: "14ab813f-a7f9-481b-9b08-781953ae9ebf",
Target: "8dd85627-9c48-49c2-8afc-d73dd122ffc2",
Order: 1,
},
}
}
func newTestController(t *testing.T, pb *v1alpha1.PolicyBinding, authentikURL string) (*PolicyBindingController, context.Context, context.CancelFunc) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
ctrl, _, stop := newTestControllerWithContext(t, ctx, pb, authentikURL)
return ctrl, ctx, func() {
cancel()
stop()
}
}
func newTestControllerWithContext(t *testing.T, ctx context.Context, pb *v1alpha1.PolicyBinding, authentikURL string) (*PolicyBindingController, context.Context, func()) {
t.Helper()
authentikClient := newAuthentikAPIClientForTest(t, authentikURL)
var objects []runtime.Object
if pb != nil {
objects = append(objects, pb)
}
policyBindingClient := operatorfake.NewSimpleClientset(objects...)
informerFactory := operatorinformers.NewSharedInformerFactory(policyBindingClient, 0)
policyBindingInformer := informerFactory.PolicyBinding().V1alpha1().PolicyBindings()
ctrl := NewController(ctx, fake.NewClientset(), policyBindingClient, authentikClient, policyBindingInformer)
informerFactory.Start(ctx.Done())
for informerType, synced := range informerFactory.WaitForCacheSync(ctx.Done()) {
if !synced {
t.Fatalf("informer %v failed to sync", informerType)
}
}
return ctrl, ctx, func() {}
}
func newAuthentikAPIClientForTest(t *testing.T, serverURL string) *authentikapi.APIClient {
t.Helper()
u, err := url.Parse(serverURL)
if err != nil {
t.Fatalf("parse server URL: %v", err)
}
cfg := authentikapi.NewConfiguration()
cfg.Scheme = u.Scheme
cfg.Host = u.Host
return authentikapi.NewAPIClient(cfg)
}
type authentikTestHandlers struct {
policyBindingCreate http.HandlerFunc
policyBindingRetrieve http.HandlerFunc
policyBindingPartialUpdate http.HandlerFunc
policyBindingDestroy http.HandlerFunc
}
func newAuthentikTestServer(t *testing.T, handlers authentikTestHandlers) *httptest.Server {
t.Helper()
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
switch {
case path == "/api/v3/policies/bindings/" && r.Method == http.MethodPost:
if handlers.policyBindingCreate != nil {
handlers.policyBindingCreate(w, r)
return
}
http.NotFound(w, r)
case strings.HasPrefix(path, "/api/v3/policies/bindings/") && strings.HasSuffix(path, "/"):
idPath := strings.TrimPrefix(path, "/api/v3/policies/bindings/")
if idPath == "" {
http.NotFound(w, r)
return
}
switch r.Method {
case http.MethodGet:
if handlers.policyBindingRetrieve != nil {
handlers.policyBindingRetrieve(w, r)
return
}
http.NotFound(w, r)
case http.MethodPatch:
if handlers.policyBindingPartialUpdate != nil {
handlers.policyBindingPartialUpdate(w, r)
return
}
http.NotFound(w, r)
case http.MethodDelete:
if handlers.policyBindingDestroy != nil {
handlers.policyBindingDestroy(w, r)
return
}
http.NotFound(w, r)
default:
http.Error(w, "unexpected method on policy binding instance", http.StatusMethodNotAllowed)
}
default:
http.NotFound(w, r)
}
})
return httptest.NewServer(handler)
}
func writeJSON(t *testing.T, w http.ResponseWriter, status int, body any) {
t.Helper()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
if err := json.NewEncoder(w).Encode(body); err != nil {
t.Fatalf("write JSON response: %v", err)
}
}
func getPolicyBinding(t *testing.T, ctrl *PolicyBindingController, namespace, name string) *v1alpha1.PolicyBinding {
t.Helper()
got, err := ctrl.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(namespace).Get(
context.Background(), name, metav1.GetOptions{},
)
if err != nil {
t.Fatalf("get PolicyBinding: %v", err)
}
return got
}
+73 -23
View File
@@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -39,7 +40,6 @@ import (
"k8s.io/klog/v2"
v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/proxyprovider/v1alpha1"
controllers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/controllers"
clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned"
operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme"
informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/proxyprovider/v1alpha1"
@@ -62,14 +62,16 @@ const (
DeleteAuthentikProxyProviderFinalizer = "proxyprovider.t000-n.de/delete-authentik-proxyprovider"
)
type ProxyProviderController struct {
type Controller struct {
kubeclientset kubernetes.Interface
proxyProviderClientset clientset.Interface
authentik *authentikapi.APIClient
proxyLister listers.ProxyProviderLister
proxySynced cache.InformerSynced
controller *controllers.Controller
workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
recorder record.EventRecorder
}
func NewController(
@@ -78,7 +80,7 @@ func NewController(
proxyProviderClientset clientset.Interface,
authentik *authentikapi.APIClient,
proxyInformer informers.ProxyProviderInformer,
) *ProxyProviderController {
) *Controller {
logger := klog.FromContext(ctx)
utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme))
@@ -93,36 +95,75 @@ func NewController(
&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
)
c := &ProxyProviderController{
c := &Controller{
kubeclientset: kubeclientset,
proxyProviderClientset: proxyProviderClientset,
authentik: authentik,
proxyLister: proxyInformer.Lister(),
proxySynced: proxyInformer.Informer().HasSynced,
workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder: recorder,
}
c.controller = controllers.NewController(
ctx,
workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder,
proxyInformer.Informer().HasSynced,
c.syncHandler,
)
logger.Info("Setting up event handlers")
proxyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.controller.Enqueue,
AddFunc: c.enqueueProxyProvider,
UpdateFunc: func(_, newObj interface{}) {
c.controller.Enqueue(newObj)
c.enqueueProxyProvider(newObj)
},
})
return c
}
func (c *ProxyProviderController) Run(ctx context.Context, workers int) error {
return c.controller.Run(ctx, workers)
func (c *Controller) Run(ctx context.Context, workers int) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting ProxyProvider controller")
logger.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.proxySynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
logger.Info("Starting workers", "count", workers)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
logger.Info("Started workers")
<-ctx.Done()
logger.Info("Shutting down workers")
return nil
}
func (c *ProxyProviderController) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
objRef, shutdown := c.workqueue.Get()
logger := klog.FromContext(ctx)
if shutdown {
return false
}
defer c.workqueue.Done(objRef)
err := c.syncHandler(ctx, objRef)
if err == nil {
c.workqueue.Forget(objRef)
logger.Info("Successfully synced", "objectName", objRef)
return true
}
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
c.workqueue.AddRateLimited(objRef)
return true
}
func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef)
pp, err := c.proxyLister.ProxyProviders(objectRef.Namespace).Get(objectRef.Name)
@@ -155,12 +196,12 @@ func (c *ProxyProviderController) syncHandler(ctx context.Context, objectRef cac
return c.reconcileUpdate(ctx, pp)
}
func (c *ProxyProviderController) ensureFinalizers(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
func (c *Controller) ensureFinalizers(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
pp.ObjectMeta.Finalizers = append(pp.ObjectMeta.Finalizers, DeleteAuthentikProxyProviderFinalizer)
return c.updateProxyProvider(ctx, pp)
}
func (c *ProxyProviderController) reconcileDelete(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
func (c *Controller) reconcileDelete(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
pk, err := strconv.ParseInt(pp.Status.PK, 10, 32)
if err != nil {
return fmt.Errorf("error parsing PK: %v", err)
@@ -178,7 +219,7 @@ func (c *ProxyProviderController) reconcileDelete(ctx context.Context, pp *v1alp
return c.updateProxyProvider(ctx, pp)
}
func (c *ProxyProviderController) reconcileUpdate(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
func (c *Controller) reconcileUpdate(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
// We retrieve the existing PP from the API by slug.
pk, err := strconv.ParseInt(pp.Status.PK, 10, 32)
if err != nil {
@@ -212,7 +253,7 @@ func (c *ProxyProviderController) reconcileUpdate(ctx context.Context, pp *v1alp
return c.updateProxyProviderStatus(ctx, pp)
}
func (c *ProxyProviderController) reconcileCreate(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
func (c *Controller) reconcileCreate(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
proxyProviderRequest := &authentikapi.ProxyProviderRequest{
Name: pp.Spec.Name,
AuthorizationFlow: pp.Spec.AuthorizationFlow,
@@ -229,14 +270,23 @@ func (c *ProxyProviderController) reconcileCreate(ctx context.Context, pp *v1alp
return c.updateProxyProviderStatus(ctx, pp)
}
func (c *ProxyProviderController) updateProxyProviderStatus(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
func (c *Controller) enqueueProxyProvider(obj interface{}) {
objectRef, err := cache.ObjectToName(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(objectRef)
}
func (c *Controller) updateProxyProviderStatus(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
ppCopy := pp.DeepCopy()
_, err := c.proxyProviderClientset.ProxyproviderV1alpha1().ProxyProviders(ppCopy.Namespace).UpdateStatus(ctx, ppCopy, metav1.UpdateOptions{FieldManager: FieldManager})
return err
}
// Update metadata, spec, etc. of the ProxyProvider object.
func (c *ProxyProviderController) updateProxyProvider(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
func (c *Controller) updateProxyProvider(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
ppCopy := pp.DeepCopy()
_, err := c.proxyProviderClientset.ProxyproviderV1alpha1().ProxyProviders(ppCopy.Namespace).Update(ctx, ppCopy, metav1.UpdateOptions{FieldManager: FieldManager})
if err != nil {
@@ -218,6 +218,20 @@ func TestController_syncHandler_invalidPK(t *testing.T) {
}
}
func TestController_enqueueProxyProvider(t *testing.T) {
server := newAuthentikTestServer(t, authentikTestHandlers{})
t.Cleanup(server.Close)
ctrl, _, cancel := newTestController(t, testProxyProvider(), server.URL)
t.Cleanup(cancel)
ctrl.enqueueProxyProvider(testProxyProvider())
if ctrl.workqueue.Len() != 1 {
t.Fatalf("workqueue length = %d, want 1", ctrl.workqueue.Len())
}
}
// --- test helpers ---
func testProxyProvider() *v1alpha1.ProxyProvider {
@@ -239,7 +253,7 @@ func testProxyProvider() *v1alpha1.ProxyProvider {
}
}
func newTestController(t *testing.T, pp *v1alpha1.ProxyProvider, authentikURL string) (*ProxyProviderController, context.Context, context.CancelFunc) {
func newTestController(t *testing.T, pp *v1alpha1.ProxyProvider, authentikURL string) (*Controller, context.Context, context.CancelFunc) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
ctrl, _, stop := newTestControllerWithContext(t, ctx, pp, authentikURL)
@@ -249,7 +263,7 @@ func newTestController(t *testing.T, pp *v1alpha1.ProxyProvider, authentikURL st
}
}
func newTestControllerWithContext(t *testing.T, ctx context.Context, pp *v1alpha1.ProxyProvider, authentikURL string) (*ProxyProviderController, context.Context, func()) {
func newTestControllerWithContext(t *testing.T, ctx context.Context, pp *v1alpha1.ProxyProvider, authentikURL string) (*Controller, context.Context, func()) {
t.Helper()
authentikClient := newAuthentikAPIClientForTest(t, authentikURL)
@@ -358,7 +372,7 @@ func writeJSON(t *testing.T, w http.ResponseWriter, status int, body any) {
}
}
func getProxyProvider(t *testing.T, ctrl *ProxyProviderController, namespace, name string) *v1alpha1.ProxyProvider {
func getProxyProvider(t *testing.T, ctrl *Controller, namespace, name string) *v1alpha1.ProxyProvider {
t.Helper()
got, err := ctrl.proxyProviderClientset.ProxyproviderV1alpha1().ProxyProviders(namespace).Get(
@@ -363,6 +363,7 @@ func schema_pkg_apis_policybinding_v1alpha1_PolicyBindingSpec(ref common.Referen
},
"user": {
SchemaProps: spec.SchemaProps{
Default: 0,
Type: []string{"integer"},
Format: "int32",
},
@@ -382,7 +383,7 @@ func schema_pkg_apis_policybinding_v1alpha1_PolicyBindingSpec(ref common.Referen
},
},
},
Required: []string{"target", "order"},
Required: []string{"user", "target", "order"},
},
},
}