Compare commits

..

1 Commits

Author SHA1 Message Date
t.behrendt 6d9496185e feat: vertical slice proxy provider (#2)
CD / Create tag (push) Successful in 25s
CD / Build and push (amd64) (push) Successful in 1m31s
CD / Create manifest (push) Successful in 7s
Reviewed-on: #2
Co-authored-by: Timo Behrendt <t.behrendt@t00n.de>
Co-committed-by: Timo Behrendt <t.behrendt@t00n.de>
2026-05-17 21:02:00 +02:00
16 changed files with 288 additions and 806 deletions
+1 -1
View File
@@ -9,6 +9,6 @@ COPY . .
RUN CGO_ENABLED=0 GOOS=linux GOARCH=${GOARCH} \ RUN CGO_ENABLED=0 GOOS=linux GOARCH=${GOARCH} \
go build -trimpath -ldflags="-s -w" -o main . go build -trimpath -ldflags="-s -w" -o main .
FROM gcr.io/distroless/static-debian12@sha256:9c346e4be81b5ca7ff31a0d89eaeade58b0f95cfd3baed1f36083ddb47ca3160 FROM gcr.io/distroless/static-debian12@sha256:20bc6c0bc4d625a22a8fde3e55f6515709b32055ef8fb9cfbddaa06d1760f838
COPY --from=build /app/main / COPY --from=build /app/main /
CMD ["/main"] CMD ["/main"]
-40
View File
@@ -11,8 +11,6 @@ Manual changes to the resources in Authentik will be overwritten by the operator
| Custom Resource | CRD File | Short Name | | Custom Resource | CRD File | Short Name |
| --------------- | ---------------------------------------------------------- | ---------- | | --------------- | ---------------------------------------------------------- | ---------- |
| ProxyProvider | [`proxyProvider.yaml`](`artifacts/crd/proxyProvider.yaml`) | pp | | ProxyProvider | [`proxyProvider.yaml`](`artifacts/crd/proxyProvider.yaml`) | pp |
| Application | [`application.yaml`](`artifacts/crd/application.yaml`) | app |
| PolicyBinding | [`policyBinding.yaml`](`artifacts/crd/policyBinding.yaml`) | pb |
### ProxyProvider ### ProxyProvider
@@ -38,44 +36,6 @@ spec:
The ProxyProvider will be created in Authentik, but will not be assigned to an outpost or an application (Resources are TBD). The ProxyProvider will be created in Authentik, but will not be assigned to an outpost or an application (Resources are TBD).
### Application
The Application only supports a reduced set of fields.
Example [`application.yaml`](`artifacts/examples/application.yaml`):
```yaml
apiVersion: application.t000-n.de/v1alpha1
kind: Application
metadata:
name: application-example
spec:
name: Application Example
slug: application-example
# The ID of the provider, which can be retrieved from e.g. the ProxyPRovider via "kubectl get pp proxy-provider-example -o jsonpath='{.status.pk}'"
provider: 105
```
### PolicyBinding
The PolicyBinding is used to bind a policy to a target, e.g. allow a group or user to access an application.
The PolicyBinding only supports a reduced set of fields.
Example [`policyBinding.yaml`](`artifacts/examples/policyBinding.yaml`):
```yaml
apiVersion: policybinding.t000-n.de/v1alpha1
kind: PolicyBinding
metadata:
name: policy-binding-example
spec:
group: 14ab813f-a7f9-481b-9b08-781953ae9ebf
# The ID of the target, e.g. an Application, which can be retrieved from e.g. the Application via "kubectl get app application-example -o jsonpath='{.status.pk}'"
target: 8dd85627-9c48-49c2-8afc-d73dd122ffc2
# The order in which the policy is applied. This needs to be unique for each PolicyBinding.
order: 1
```
## Versioning ## Versioning
As soon as the operator covers an entire use case, the version will be raised to v1 and follow default versioning rules. Before that, the version will be v1alpha1. As soon as the operator covers an entire use case, the version will be raised to v1 and follow default versioning rules. Before that, the version will be v1alpha1.
-8
View File
@@ -1,8 +0,0 @@
apiVersion: application.t000-n.de/v1alpha1
kind: Application
metadata:
name: application-example
spec:
name: Application Example
slug: application-example
provider: 105
-8
View File
@@ -1,8 +0,0 @@
apiVersion: policybinding.t000-n.de/v1alpha1
kind: PolicyBinding
metadata:
name: policy-binding-example
spec:
group: 14ab813f-a7f9-481b-9b08-781953ae9ebf
target: 8dd85627-9c48-49c2-8afc-d73dd122ffc2
order: 1
+3 -3
View File
@@ -7,9 +7,9 @@ godebug default=go1.26
require ( require (
goauthentik.io/api/v3 v3.2026020.16 goauthentik.io/api/v3 v3.2026020.16
golang.org/x/time v0.15.0 golang.org/x/time v0.15.0
k8s.io/api v0.36.0 k8s.io/api v0.0.0-20260509204538-0dfb117cc6ec
k8s.io/apimachinery v0.36.0 k8s.io/apimachinery v0.0.0-20260513183604-f9371b815e42
k8s.io/client-go v0.36.0 k8s.io/client-go v0.0.0-20260509205101-ca52b81a2940
k8s.io/klog/v2 v2.140.0 k8s.io/klog/v2 v2.140.0
k8s.io/kube-openapi v0.0.0-20260511211612-da4e56fe5676 k8s.io/kube-openapi v0.0.0-20260511211612-da4e56fe5676
sigs.k8s.io/structured-merge-diff/v6 v6.4.0 sigs.k8s.io/structured-merge-diff/v6 v6.4.0
-6
View File
@@ -121,16 +121,10 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.0.0-20260509204538-0dfb117cc6ec h1:xf12Yh3ltN4fnNyP0CyyM0TwNVnZDfLJjV3+bf9fPFY= k8s.io/api v0.0.0-20260509204538-0dfb117cc6ec h1:xf12Yh3ltN4fnNyP0CyyM0TwNVnZDfLJjV3+bf9fPFY=
k8s.io/api v0.0.0-20260509204538-0dfb117cc6ec/go.mod h1:C+fcNlNQ9TcKHspN+DD7UybdfnjDAGyBjfCd6W7ogbY= k8s.io/api v0.0.0-20260509204538-0dfb117cc6ec/go.mod h1:C+fcNlNQ9TcKHspN+DD7UybdfnjDAGyBjfCd6W7ogbY=
k8s.io/api v0.36.0 h1:SgqDhZzHdOtMk40xVSvCXkP9ME0H05hPM3p9AB1kL80=
k8s.io/api v0.36.0/go.mod h1:m1LVrGPNYax5NBHdO+QuAedXyuzTt4RryI/qnmNvs34=
k8s.io/apimachinery v0.0.0-20260513183604-f9371b815e42 h1:rWdGOTor3z0WSyZcRl9ms4dn9Cw9CqmNBqXuf2z0k1k= k8s.io/apimachinery v0.0.0-20260513183604-f9371b815e42 h1:rWdGOTor3z0WSyZcRl9ms4dn9Cw9CqmNBqXuf2z0k1k=
k8s.io/apimachinery v0.0.0-20260513183604-f9371b815e42/go.mod h1:hiubQ6UTHIdr0bS8ExXOJEywFVOoudnldm/l/NiNVlA= k8s.io/apimachinery v0.0.0-20260513183604-f9371b815e42/go.mod h1:hiubQ6UTHIdr0bS8ExXOJEywFVOoudnldm/l/NiNVlA=
k8s.io/apimachinery v0.36.0 h1:jZyPzhd5Z+3h9vJLt0z9XdzW9VzNzWAUw+P1xZ9PXtQ=
k8s.io/apimachinery v0.36.0/go.mod h1:FklypaRJt6n5wUIwWXIP6GJlIpUizTgfo1T/As+Tyxc=
k8s.io/client-go v0.0.0-20260509205101-ca52b81a2940 h1:n5t5Jx3VpLdiAGxIvIHsZDmsExtZVwghUPLM3wFi6Go= k8s.io/client-go v0.0.0-20260509205101-ca52b81a2940 h1:n5t5Jx3VpLdiAGxIvIHsZDmsExtZVwghUPLM3wFi6Go=
k8s.io/client-go v0.0.0-20260509205101-ca52b81a2940/go.mod h1:0e7OLwg7kdXISVFwn7ishFdvxfVgi7wsqHqsQPHl61w= k8s.io/client-go v0.0.0-20260509205101-ca52b81a2940/go.mod h1:0e7OLwg7kdXISVFwn7ishFdvxfVgi7wsqHqsQPHl61w=
k8s.io/client-go v0.36.0 h1:pOYi7C4RHChYjMiHpZSpSbIM6ZxVbRXBy7CuiIwqA3c=
k8s.io/client-go v0.36.0/go.mod h1:ZKKcpwF0aLYfkHFCjillCKaTK/yBkEDHTDXCFY6AS9Y=
k8s.io/klog/v2 v2.140.0 h1:Tf+J3AH7xnUzZyVVXhTgGhEKnFqye14aadWv7bzXdzc= k8s.io/klog/v2 v2.140.0 h1:Tf+J3AH7xnUzZyVVXhTgGhEKnFqye14aadWv7bzXdzc=
k8s.io/klog/v2 v2.140.0/go.mod h1:o+/RWfJ6PwpnFn7OyAG3QnO47BFsymfEfrz6XyYSSp0= k8s.io/klog/v2 v2.140.0/go.mod h1:o+/RWfJ6PwpnFn7OyAG3QnO47BFsymfEfrz6XyYSSp0=
k8s.io/kube-openapi v0.0.0-20260511211612-da4e56fe5676 h1:ahjrVu/DBcaAhw/GcblfaOvvQ2wi8kqXWvn62nud3UU= k8s.io/kube-openapi v0.0.0-20260511211612-da4e56fe5676 h1:ahjrVu/DBcaAhw/GcblfaOvvQ2wi8kqXWvn62nud3UU=
+1 -1
View File
@@ -35,7 +35,7 @@ type PolicyBinding struct {
type PolicyBindingSpec struct { type PolicyBindingSpec struct {
Policy string `json:"policy,omitempty"` Policy string `json:"policy,omitempty"`
Group string `json:"group,omitempty"` Group string `json:"group,omitempty"`
User int32 `json:"user,omitempty"` User int32 `json:"user"`
Target string `json:"target"` Target string `json:"target"`
Order int32 `json:"order"` Order int32 `json:"order"`
} }
+83 -27
View File
@@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"slices" "slices"
"strconv"
"time" "time"
"golang.org/x/time/rate" "golang.org/x/time/rate"
@@ -29,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -38,7 +40,6 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/application/v1alpha1" v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/application/v1alpha1"
controllers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/controllers"
clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned" clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned"
operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme" operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme"
informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/application/v1alpha1" informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/application/v1alpha1"
@@ -61,14 +62,16 @@ const (
DeleteAuthentikApplicationFinalizer = "application.t000-n.de/delete-authentik-application" DeleteAuthentikApplicationFinalizer = "application.t000-n.de/delete-authentik-application"
) )
type ApplicationController struct { type Controller struct {
kubeclientset kubernetes.Interface kubeclientset kubernetes.Interface
applicationClientset clientset.Interface applicationClientset clientset.Interface
authentik *authentikapi.APIClient authentik *authentikapi.APIClient
applicationListener listers.ApplicationLister applicationListener listers.ApplicationLister
applicationSynced cache.InformerSynced
controller *controllers.Controller workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
recorder record.EventRecorder
} }
func NewController( func NewController(
@@ -77,7 +80,7 @@ func NewController(
applicationClientset clientset.Interface, applicationClientset clientset.Interface,
authentik *authentikapi.APIClient, authentik *authentikapi.APIClient,
applicationInformer informers.ApplicationInformer, applicationInformer informers.ApplicationInformer,
) *ApplicationController { ) *Controller {
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme)) utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme))
@@ -92,36 +95,75 @@ func NewController(
&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)}, &workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
) )
c := &ApplicationController{ c := &Controller{
kubeclientset: kubeclientset, kubeclientset: kubeclientset,
applicationClientset: applicationClientset, applicationClientset: applicationClientset,
authentik: authentik, authentik: authentik,
applicationListener: applicationInformer.Lister(), applicationListener: applicationInformer.Lister(),
applicationSynced: applicationInformer.Informer().HasSynced,
workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder: recorder,
} }
c.controller = controllers.NewController(
ctx,
workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder,
applicationInformer.Informer().HasSynced,
c.syncHandler,
)
logger.Info("Setting up event handlers") logger.Info("Setting up event handlers")
applicationInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ applicationInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.controller.Enqueue, AddFunc: c.enqueueApplication,
UpdateFunc: func(_, newObj interface{}) { UpdateFunc: func(_, newObj interface{}) {
c.controller.Enqueue(newObj) c.enqueueApplication(newObj)
}, },
}) })
return c return c
} }
func (c *ApplicationController) Run(ctx context.Context, workers int) error { func (c *Controller) Run(ctx context.Context, workers int) error {
return c.controller.Run(ctx, workers) defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting Application controller")
logger.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.applicationSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
logger.Info("Starting workers", "count", workers)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
logger.Info("Started workers")
<-ctx.Done()
logger.Info("Shutting down workers")
return nil
} }
func (c *ApplicationController) syncHandler(ctx context.Context, objectRef cache.ObjectName) error { func (c *Controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
objRef, shutdown := c.workqueue.Get()
logger := klog.FromContext(ctx)
if shutdown {
return false
}
defer c.workqueue.Done(objRef)
err := c.syncHandler(ctx, objRef)
if err == nil {
c.workqueue.Forget(objRef)
logger.Info("Successfully synced", "objectName", objRef)
return true
}
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
c.workqueue.AddRateLimited(objRef)
return true
}
func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef) logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef)
app, err := c.applicationListener.Applications(objectRef.Namespace).Get(objectRef.Name) app, err := c.applicationListener.Applications(objectRef.Namespace).Get(objectRef.Name)
@@ -154,17 +196,22 @@ func (c *ApplicationController) syncHandler(ctx context.Context, objectRef cache
return c.reconcileUpdate(ctx, app) return c.reconcileUpdate(ctx, app)
} }
func (c *ApplicationController) ensureFinalizers(ctx context.Context, app *v1alpha1.Application) error { func (c *Controller) ensureFinalizers(ctx context.Context, app *v1alpha1.Application) error {
app.ObjectMeta.Finalizers = append(app.ObjectMeta.Finalizers, DeleteAuthentikApplicationFinalizer) app.ObjectMeta.Finalizers = append(app.ObjectMeta.Finalizers, DeleteAuthentikApplicationFinalizer)
return c.updateApplication(ctx, app) return c.updateApplication(ctx, app)
} }
func (c *ApplicationController) reconcileDelete(ctx context.Context, app *v1alpha1.Application) error { func (c *Controller) reconcileDelete(ctx context.Context, app *v1alpha1.Application) error {
r, err := c.authentik.CoreApi.CoreApplicationsDestroy(ctx, app.Status.PK).Execute() pk, err := strconv.ParseInt(app.Status.PK, 10, 32)
if err != nil { if err != nil {
// This handles an edge-case, where when the Application on Authentik has already been deleted, but the finalizer is still present. We just remove the finalizer and return. return fmt.Errorf("error parsing PK: %v", err)
}
r, err := c.authentik.ProvidersApi.ProvidersProxyDestroy(ctx, int32(pk)).Execute()
if err != nil {
// This handles an edge-case, where when the ProxyProvider on Authentik has already been deleted, but the finalizer is still present. We just remove the finalizer and return.
if r != nil && r.StatusCode != http.StatusNotFound { if r != nil && r.StatusCode != http.StatusNotFound {
return fmt.Errorf("error when calling `CoreAPI.CoreApplicationsDestroy`: %w with response %v", err, r) return fmt.Errorf("error when calling `ProvidersAPI.ProvidersProxyDestroy`: %w with response %v", err, r)
} }
} }
@@ -172,7 +219,7 @@ func (c *ApplicationController) reconcileDelete(ctx context.Context, app *v1alph
return c.updateApplication(ctx, app) return c.updateApplication(ctx, app)
} }
func (c *ApplicationController) reconcileUpdate(ctx context.Context, app *v1alpha1.Application) error { func (c *Controller) reconcileUpdate(ctx context.Context, app *v1alpha1.Application) error {
_, r, err := c.authentik.CoreApi.CoreApplicationsRetrieve(ctx, app.Spec.Slug).Execute() _, r, err := c.authentik.CoreApi.CoreApplicationsRetrieve(ctx, app.Spec.Slug).Execute()
if err != nil { if err != nil {
if r != nil && r.StatusCode == http.StatusNotFound { if r != nil && r.StatusCode == http.StatusNotFound {
@@ -191,14 +238,14 @@ func (c *ApplicationController) reconcileUpdate(ctx context.Context, app *v1alph
} }
resp, r, err := c.authentik.CoreApi.CoreApplicationsPartialUpdate(ctx, app.Spec.Slug).PatchedApplicationRequest(*patchedApplicationRequest).Execute() resp, r, err := c.authentik.CoreApi.CoreApplicationsPartialUpdate(ctx, app.Spec.Slug).PatchedApplicationRequest(*patchedApplicationRequest).Execute()
if err != nil { if err != nil {
return fmt.Errorf("error when calling `CoreAPI.CoreApplicationsPartialUpdate`: %w with response %v", err, r) return fmt.Errorf("error when calling `ProvidersAPI.ProvidersProxyPartialUpdate`: %w with response %v", err, r)
} }
app.Status.PK = resp.Pk app.Status.PK = resp.Pk
return c.updateApplicationStatus(ctx, app) return c.updateApplicationStatus(ctx, app)
} }
func (c *ApplicationController) reconcileCreate(ctx context.Context, app *v1alpha1.Application) error { func (c *Controller) reconcileCreate(ctx context.Context, app *v1alpha1.Application) error {
applicationRequest := &authentikapi.ApplicationRequest{ applicationRequest := &authentikapi.ApplicationRequest{
Name: app.Spec.Name, Name: app.Spec.Name,
Slug: app.Spec.Slug, Slug: app.Spec.Slug,
@@ -213,14 +260,23 @@ func (c *ApplicationController) reconcileCreate(ctx context.Context, app *v1alph
return c.updateApplicationStatus(ctx, app) return c.updateApplicationStatus(ctx, app)
} }
func (c *ApplicationController) updateApplicationStatus(ctx context.Context, app *v1alpha1.Application) error { func (c *Controller) enqueueApplication(obj interface{}) {
objectRef, err := cache.ObjectToName(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(objectRef)
}
func (c *Controller) updateApplicationStatus(ctx context.Context, app *v1alpha1.Application) error {
appCopy := app.DeepCopy() appCopy := app.DeepCopy()
_, err := c.applicationClientset.ApplicationV1alpha1().Applications(appCopy.Namespace).UpdateStatus(ctx, appCopy, metav1.UpdateOptions{FieldManager: FieldManager}) _, err := c.applicationClientset.ApplicationV1alpha1().Applications(appCopy.Namespace).UpdateStatus(ctx, appCopy, metav1.UpdateOptions{FieldManager: FieldManager})
return err return err
} }
// Update metadata, spec, etc. of the Application object. // Update metadata, spec, etc. of the Application object.
func (c *ApplicationController) updateApplication(ctx context.Context, app *v1alpha1.Application) error { func (c *Controller) updateApplication(ctx context.Context, app *v1alpha1.Application) error {
appCopy := app.DeepCopy() appCopy := app.DeepCopy()
_, err := c.applicationClientset.ApplicationV1alpha1().Applications(appCopy.Namespace).Update(ctx, appCopy, metav1.UpdateOptions{FieldManager: FieldManager}) _, err := c.applicationClientset.ApplicationV1alpha1().Applications(appCopy.Namespace).Update(ctx, appCopy, metav1.UpdateOptions{FieldManager: FieldManager})
if err != nil { if err != nil {
+17 -3
View File
@@ -220,6 +220,20 @@ func TestController_syncHandler_invalidPK(t *testing.T) {
} }
} }
func TestController_enqueueApplication(t *testing.T) {
server := newAuthentikTestServer(t, authentikTestHandlers{})
t.Cleanup(server.Close)
ctrl, _, cancel := newTestController(t, testApplication(), server.URL)
t.Cleanup(cancel)
ctrl.enqueueApplication(testApplication())
if ctrl.workqueue.Len() != 1 {
t.Fatalf("workqueue length = %d, want 1", ctrl.workqueue.Len())
}
}
// --- test helpers --- // --- test helpers ---
func testApplication() *v1alpha1.Application { func testApplication() *v1alpha1.Application {
@@ -240,7 +254,7 @@ func testApplication() *v1alpha1.Application {
} }
} }
func newTestController(t *testing.T, app *v1alpha1.Application, authentikURL string) (*ApplicationController, context.Context, context.CancelFunc) { func newTestController(t *testing.T, app *v1alpha1.Application, authentikURL string) (*Controller, context.Context, context.CancelFunc) {
t.Helper() t.Helper()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
ctrl, _, stop := newTestControllerWithContext(t, ctx, app, authentikURL) ctrl, _, stop := newTestControllerWithContext(t, ctx, app, authentikURL)
@@ -250,7 +264,7 @@ func newTestController(t *testing.T, app *v1alpha1.Application, authentikURL str
} }
} }
func newTestControllerWithContext(t *testing.T, ctx context.Context, app *v1alpha1.Application, authentikURL string) (*ApplicationController, context.Context, func()) { func newTestControllerWithContext(t *testing.T, ctx context.Context, app *v1alpha1.Application, authentikURL string) (*Controller, context.Context, func()) {
t.Helper() t.Helper()
authentikClient := newAuthentikAPIClientForTest(t, authentikURL) authentikClient := newAuthentikAPIClientForTest(t, authentikURL)
@@ -368,7 +382,7 @@ func writeJSON(t *testing.T, w http.ResponseWriter, status int, body any) {
} }
} }
func getApplication(t *testing.T, ctrl *ApplicationController, namespace, name string) *v1alpha1.Application { func getApplication(t *testing.T, ctrl *Controller, namespace, name string) *v1alpha1.Application {
t.Helper() t.Helper()
got, err := ctrl.applicationClientset.ApplicationV1alpha1().Applications(namespace).Get( got, err := ctrl.applicationClientset.ApplicationV1alpha1().Applications(namespace).Get(
-94
View File
@@ -1,94 +0,0 @@
package controllers
import (
"context"
"fmt"
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
type SyncHandler func(ctx context.Context, objRef cache.ObjectName) error
type Controller struct {
workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
recorder record.EventRecorder
synced cache.InformerSynced
syncHandler SyncHandler
}
func NewController(
ctx context.Context,
workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName],
recorder record.EventRecorder,
synced cache.InformerSynced,
syncHandler SyncHandler,
) *Controller {
return &Controller{
workqueue: workqueue,
recorder: recorder,
synced: synced,
syncHandler: syncHandler,
}
}
func (c *Controller) Run(ctx context.Context, workers int) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting PolicyBinding controller")
logger.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.synced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
logger.Info("Starting workers", "count", workers)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
logger.Info("Started workers")
<-ctx.Done()
logger.Info("Shutting down workers")
return nil
}
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
objRef, shutdown := c.workqueue.Get()
logger := klog.FromContext(ctx)
if shutdown {
return false
}
defer c.workqueue.Done(objRef)
err := c.syncHandler(ctx, objRef)
if err == nil {
c.workqueue.Forget(objRef)
logger.Info("Successfully synced", "objectName", objRef)
return true
}
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
c.workqueue.AddRateLimited(objRef)
return true
}
func (c *Controller) Enqueue(obj interface{}) {
objectRef, err := cache.ObjectToName(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(objectRef)
}
-190
View File
@@ -1,190 +0,0 @@
// AI generated tests and not yet reviewed.
package controllers
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)
func newTestController(t *testing.T, synced cache.InformerSynced, syncHandler SyncHandler) (*Controller, workqueue.TypedRateLimitingInterface[cache.ObjectName]) {
t.Helper()
ratelimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](time.Millisecond, time.Second)
q := workqueue.NewTypedRateLimitingQueue(ratelimiter)
t.Cleanup(q.ShutDown)
if synced == nil {
synced = func() bool { return true }
}
ctrl := NewController(
context.Background(),
q,
record.NewFakeRecorder(10),
synced,
syncHandler,
)
return ctrl, q
}
func TestController_processNextWorkItem_success(t *testing.T) {
objRef := cache.ObjectName{Namespace: "default", Name: "test"}
var syncedRef cache.ObjectName
ctrl, q := newTestController(t, nil, func(_ context.Context, ref cache.ObjectName) error {
syncedRef = ref
return nil
})
q.Add(objRef)
if !ctrl.processNextWorkItem(context.Background()) {
t.Fatal("processNextWorkItem() = false, want true")
}
if syncedRef != objRef {
t.Fatalf("syncHandler object = %+v, want %+v", syncedRef, objRef)
}
if q.Len() != 0 {
t.Fatalf("queue length = %d, want 0 after successful sync", q.Len())
}
if q.NumRequeues(objRef) != 0 {
t.Fatalf("requeues = %d, want 0 after successful sync", q.NumRequeues(objRef))
}
}
func TestController_processNextWorkItem_syncError(t *testing.T) {
objRef := cache.ObjectName{Namespace: "default", Name: "test"}
syncErr := errors.New("sync failed")
ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error {
return syncErr
})
q.Add(objRef)
if !ctrl.processNextWorkItem(context.Background()) {
t.Fatal("processNextWorkItem() = false, want true")
}
if q.NumRequeues(objRef) != 1 {
t.Fatalf("requeues = %d, want 1 after sync error", q.NumRequeues(objRef))
}
}
func TestController_processNextWorkItem_shutdown(t *testing.T) {
ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error {
return nil
})
q.ShutDown()
if ctrl.processNextWorkItem(context.Background()) {
t.Fatal("processNextWorkItem() = true, want false on shutdown")
}
}
func TestController_Enqueue(t *testing.T) {
ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error {
return nil
})
obj := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test",
},
}
ctrl.Enqueue(obj)
if q.Len() != 1 {
t.Fatalf("queue length = %d, want 1 after Enqueue", q.Len())
}
}
func TestController_Enqueue_invalidObject(t *testing.T) {
ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error {
return nil
})
ctrl.Enqueue("not-a-kubernetes-object")
if q.Len() != 0 {
t.Fatalf("queue length = %d, want 0 for invalid object", q.Len())
}
}
func TestController_Run_cacheSyncFails(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctrl, _ := newTestController(t, func() bool { return false }, func(context.Context, cache.ObjectName) error {
return nil
})
go func() {
time.Sleep(10 * time.Millisecond)
cancel()
}()
err := ctrl.Run(ctx, 1)
if err == nil {
t.Fatal("Run() error = nil, want cache sync failure")
}
}
func TestController_Run_shutsDownOnCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctrl, _ := newTestController(t, nil, func(context.Context, cache.ObjectName) error {
return nil
})
errCh := make(chan error, 1)
go func() {
errCh <- ctrl.Run(ctx, 1)
}()
time.Sleep(50 * time.Millisecond)
cancel()
select {
case err := <-errCh:
if err != nil {
t.Fatalf("Run() error = %v, want nil on context cancel", err)
}
case <-time.After(2 * time.Second):
t.Fatal("Run() did not return after context cancellation")
}
}
func TestController_runWorker_processesQueuedItem(t *testing.T) {
objRef := cache.ObjectName{Namespace: "default", Name: "test"}
var calls atomic.Int32
ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error {
calls.Add(1)
return nil
})
q.Add(objRef)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go ctrl.runWorker(ctx)
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if calls.Load() == 1 && q.Len() == 0 {
cancel()
return
}
time.Sleep(5 * time.Millisecond)
}
cancel()
t.Fatalf("runWorker did not process queued item: calls=%d queueLen=%d", calls.Load(), q.Len())
}
+89 -47
View File
@@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"slices" "slices"
"strconv"
"time" "time"
"golang.org/x/time/rate" "golang.org/x/time/rate"
@@ -29,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -38,7 +40,6 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/policybinding/v1alpha1" v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/policybinding/v1alpha1"
controllers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/controllers"
clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned" clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned"
operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme" operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme"
informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/policybinding/v1alpha1" informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/policybinding/v1alpha1"
@@ -61,14 +62,16 @@ const (
DeleteAuthentikPolicyBindingFinalizer = "policybinding.t000-n.de/delete-authentik-policybinding" DeleteAuthentikPolicyBindingFinalizer = "policybinding.t000-n.de/delete-authentik-policybinding"
) )
type PolicyBindingController struct { type Controller struct {
kubeclientset kubernetes.Interface kubeclientset kubernetes.Interface
policyBindingClientset clientset.Interface policyBindingClientset clientset.Interface
authentik *authentikapi.APIClient authentik *authentikapi.APIClient
policyBindingListener listers.PolicyBindingLister policyBindingListener listers.PolicyBindingLister
policyBindingSynced cache.InformerSynced
controller *controllers.Controller workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
recorder record.EventRecorder
} }
func NewController( func NewController(
@@ -77,7 +80,7 @@ func NewController(
policyBindingClientset clientset.Interface, policyBindingClientset clientset.Interface,
authentik *authentikapi.APIClient, authentik *authentikapi.APIClient,
policyBindingInformer informers.PolicyBindingInformer, policyBindingInformer informers.PolicyBindingInformer,
) *PolicyBindingController { ) *Controller {
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme)) utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme))
@@ -92,36 +95,75 @@ func NewController(
&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)}, &workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
) )
c := &PolicyBindingController{ c := &Controller{
kubeclientset: kubeclientset, kubeclientset: kubeclientset,
policyBindingClientset: policyBindingClientset, policyBindingClientset: policyBindingClientset,
authentik: authentik, authentik: authentik,
policyBindingListener: policyBindingInformer.Lister(), policyBindingListener: policyBindingInformer.Lister(),
policyBindingSynced: policyBindingInformer.Informer().HasSynced,
workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder: recorder,
} }
c.controller = controllers.NewController(
ctx,
workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder,
policyBindingInformer.Informer().HasSynced,
c.syncHandler,
)
logger.Info("Setting up event handlers") logger.Info("Setting up event handlers")
policyBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ policyBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.controller.Enqueue, AddFunc: c.enqueuePolicyBinding,
UpdateFunc: func(_, newObj interface{}) { UpdateFunc: func(_, newObj interface{}) {
c.controller.Enqueue(newObj) c.enqueuePolicyBinding(newObj)
}, },
}) })
return c return c
} }
func (c *PolicyBindingController) Run(ctx context.Context, workers int) error { func (c *Controller) Run(ctx context.Context, workers int) error {
return c.controller.Run(ctx, workers) defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting PolicyBinding controller")
logger.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.policyBindingSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
logger.Info("Starting workers", "count", workers)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
logger.Info("Started workers")
<-ctx.Done()
logger.Info("Shutting down workers")
return nil
} }
func (c *PolicyBindingController) syncHandler(ctx context.Context, objectRef cache.ObjectName) error { func (c *Controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
objRef, shutdown := c.workqueue.Get()
logger := klog.FromContext(ctx)
if shutdown {
return false
}
defer c.workqueue.Done(objRef)
err := c.syncHandler(ctx, objRef)
if err == nil {
c.workqueue.Forget(objRef)
logger.Info("Successfully synced", "objectName", objRef)
return true
}
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
c.workqueue.AddRateLimited(objRef)
return true
}
func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef) logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef)
pb, err := c.policyBindingListener.PolicyBindings(objectRef.Namespace).Get(objectRef.Name) pb, err := c.policyBindingListener.PolicyBindings(objectRef.Namespace).Get(objectRef.Name)
@@ -154,17 +196,22 @@ func (c *PolicyBindingController) syncHandler(ctx context.Context, objectRef cac
return c.reconcileUpdate(ctx, pb) return c.reconcileUpdate(ctx, pb)
} }
func (c *PolicyBindingController) ensureFinalizers(ctx context.Context, pb *v1alpha1.PolicyBinding) error { func (c *Controller) ensureFinalizers(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
pb.ObjectMeta.Finalizers = append(pb.ObjectMeta.Finalizers, DeleteAuthentikPolicyBindingFinalizer) pb.ObjectMeta.Finalizers = append(pb.ObjectMeta.Finalizers, DeleteAuthentikPolicyBindingFinalizer)
return c.updatePolicyBinding(ctx, pb) return c.updatePolicyBinding(ctx, pb)
} }
func (c *PolicyBindingController) reconcileDelete(ctx context.Context, pb *v1alpha1.PolicyBinding) error { func (c *Controller) reconcileDelete(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
r, err := c.authentik.PoliciesApi.PoliciesBindingsDestroy(ctx, pb.Status.PK).Execute() pk, err := strconv.ParseInt(pb.Status.PK, 10, 32)
if err != nil { if err != nil {
// This handles an edge-case, where when the PolicyBinding on Authentik has already been deleted, but the finalizer is still present. We just remove the finalizer and return. return fmt.Errorf("error parsing PK: %v", err)
}
r, err := c.authentik.ProvidersApi.ProvidersProxyDestroy(ctx, int32(pk)).Execute()
if err != nil {
// This handles an edge-case, where when the ProxyProvider on Authentik has already been deleted, but the finalizer is still present. We just remove the finalizer and return.
if r != nil && r.StatusCode != http.StatusNotFound { if r != nil && r.StatusCode != http.StatusNotFound {
return fmt.Errorf("error when calling `PoliciesAPI.PoliciesBindingsDestroy`: %w with response %v", err, r) return fmt.Errorf("error when calling `ProvidersAPI.ProvidersProxyDestroy`: %w with response %v", err, r)
} }
} }
@@ -172,7 +219,7 @@ func (c *PolicyBindingController) reconcileDelete(ctx context.Context, pb *v1alp
return c.updatePolicyBinding(ctx, pb) return c.updatePolicyBinding(ctx, pb)
} }
func (c *PolicyBindingController) reconcileUpdate(ctx context.Context, pb *v1alpha1.PolicyBinding) error { func (c *Controller) reconcileUpdate(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
_, r, err := c.authentik.PoliciesApi.PoliciesBindingsRetrieve(ctx, pb.Status.PK).Execute() _, r, err := c.authentik.PoliciesApi.PoliciesBindingsRetrieve(ctx, pb.Status.PK).Execute()
if err != nil { if err != nil {
if r != nil && r.StatusCode == http.StatusNotFound { if r != nil && r.StatusCode == http.StatusNotFound {
@@ -185,43 +232,29 @@ func (c *PolicyBindingController) reconcileUpdate(ctx context.Context, pb *v1alp
} }
patchedPolicyBindingRequest := &authentikapi.PatchedPolicyBindingRequest{ patchedPolicyBindingRequest := &authentikapi.PatchedPolicyBindingRequest{
Policy: *authentikapi.NewNullableString(&pb.Spec.Policy),
Group: *authentikapi.NewNullableString(&pb.Spec.Group),
User: *authentikapi.NewNullableInt32(&pb.Spec.User),
Target: &pb.Spec.Target, Target: &pb.Spec.Target,
Order: &pb.Spec.Order, Order: &pb.Spec.Order,
} }
if pb.Spec.Policy != "" {
patchedPolicyBindingRequest.SetPolicy(pb.Spec.Policy)
}
if pb.Spec.Group != "" {
patchedPolicyBindingRequest.SetGroup(pb.Spec.Group)
}
if pb.Spec.User != 0 {
patchedPolicyBindingRequest.SetUser(pb.Spec.User)
}
resp, r, err := c.authentik.PoliciesApi.PoliciesBindingsPartialUpdate(ctx, pb.Status.PK).PatchedPolicyBindingRequest(*patchedPolicyBindingRequest).Execute() resp, r, err := c.authentik.PoliciesApi.PoliciesBindingsPartialUpdate(ctx, pb.Status.PK).PatchedPolicyBindingRequest(*patchedPolicyBindingRequest).Execute()
if err != nil { if err != nil {
return fmt.Errorf("error when calling `PoliciesAPI.PoliciesBindingsPartialUpdate`: %w with response %v", err, r) return fmt.Errorf("error when calling `ProvidersAPI.ProvidersProxyPartialUpdate`: %w with response %v", err, r)
} }
pb.Status.PK = resp.Pk pb.Status.PK = resp.Pk
return c.updatePolicyBindingStatus(ctx, pb) return c.updatePolicyBindingStatus(ctx, pb)
} }
func (c *PolicyBindingController) reconcileCreate(ctx context.Context, pb *v1alpha1.PolicyBinding) error { func (c *Controller) reconcileCreate(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
policyBindingRequest := &authentikapi.PolicyBindingRequest{ policyBindingRequest := &authentikapi.PolicyBindingRequest{
Policy: *authentikapi.NewNullableString(&pb.Spec.Policy),
Group: *authentikapi.NewNullableString(&pb.Spec.Group),
User: *authentikapi.NewNullableInt32(&pb.Spec.User),
Target: pb.Spec.Target, Target: pb.Spec.Target,
Order: pb.Spec.Order, Order: pb.Spec.Order,
} }
if pb.Spec.Policy != "" {
policyBindingRequest.SetPolicy(pb.Spec.Policy)
}
if pb.Spec.Group != "" {
policyBindingRequest.SetGroup(pb.Spec.Group)
}
if pb.Spec.User != 0 {
policyBindingRequest.SetUser(pb.Spec.User)
}
resp, r, err := c.authentik.PoliciesApi.PoliciesBindingsCreate(ctx).PolicyBindingRequest(*policyBindingRequest).Execute() resp, r, err := c.authentik.PoliciesApi.PoliciesBindingsCreate(ctx).PolicyBindingRequest(*policyBindingRequest).Execute()
if err != nil { if err != nil {
return fmt.Errorf("error when calling `PoliciesAPI.PoliciesBindingsCreate`: %w with response %v", err, r) return fmt.Errorf("error when calling `PoliciesAPI.PoliciesBindingsCreate`: %w with response %v", err, r)
@@ -231,14 +264,23 @@ func (c *PolicyBindingController) reconcileCreate(ctx context.Context, pb *v1alp
return c.updatePolicyBindingStatus(ctx, pb) return c.updatePolicyBindingStatus(ctx, pb)
} }
func (c *PolicyBindingController) updatePolicyBindingStatus(ctx context.Context, pb *v1alpha1.PolicyBinding) error { func (c *Controller) enqueuePolicyBinding(obj interface{}) {
objectRef, err := cache.ObjectToName(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(objectRef)
}
func (c *Controller) updatePolicyBindingStatus(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
pbCopy := pb.DeepCopy() pbCopy := pb.DeepCopy()
_, err := c.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(pbCopy.Namespace).UpdateStatus(ctx, pbCopy, metav1.UpdateOptions{FieldManager: FieldManager}) _, err := c.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(pbCopy.Namespace).UpdateStatus(ctx, pbCopy, metav1.UpdateOptions{FieldManager: FieldManager})
return err return err
} }
// Update metadata, spec, etc. of the PolicyBinding object. // Update metadata, spec, etc. of the PolicyBinding object.
func (c *PolicyBindingController) updatePolicyBinding(ctx context.Context, pb *v1alpha1.PolicyBinding) error { func (c *Controller) updatePolicyBinding(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
pbCopy := pb.DeepCopy() pbCopy := pb.DeepCopy()
_, err := c.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(pbCopy.Namespace).Update(ctx, pbCopy, metav1.UpdateOptions{FieldManager: FieldManager}) _, err := c.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(pbCopy.Namespace).Update(ctx, pbCopy, metav1.UpdateOptions{FieldManager: FieldManager})
return err return err
@@ -1,349 +0,0 @@
// AI generated tests and not yet reviewed.
package policybinding
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"slices"
"strings"
"testing"
v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/policybinding/v1alpha1"
operatorfake "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/fake"
operatorinformers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions"
authentikapi "goauthentik.io/api/v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
)
func TestController_syncHandler_create(t *testing.T) {
const wantPK = "42"
server := newAuthentikTestServer(t, authentikTestHandlers{
policyBindingCreate: func(w http.ResponseWriter, _ *http.Request) {
writeJSON(t, w, http.StatusCreated, map[string]any{"pk": wantPK})
},
})
t.Cleanup(server.Close)
ctrl, ctx, cancel := newTestController(t, testPolicyBinding(), server.URL)
t.Cleanup(cancel)
err := ctrl.syncHandler(ctx, cache.ObjectName{Namespace: "default", Name: "test-pb"})
if err != nil {
t.Fatalf("syncHandler() error = %v", err)
}
got := getPolicyBinding(t, ctrl, "default", "test-pb")
if got.Status.PK != wantPK {
t.Fatalf("status.pk = %q, want %q", got.Status.PK, wantPK)
}
}
func TestController_syncHandler_ensureFinalizers(t *testing.T) {
pb := testPolicyBinding()
pb.Status.PK = "42"
server := newAuthentikTestServer(t, authentikTestHandlers{})
t.Cleanup(server.Close)
ctrl, ctx, cancel := newTestController(t, pb, server.URL)
t.Cleanup(cancel)
err := ctrl.syncHandler(ctx, cache.ObjectName{Namespace: pb.Namespace, Name: pb.Name})
if err != nil {
t.Fatalf("syncHandler() error = %v", err)
}
got := getPolicyBinding(t, ctrl, pb.Namespace, pb.Name)
if !slices.Contains(got.Finalizers, DeleteAuthentikPolicyBindingFinalizer) {
t.Fatalf("finalizers = %v, want %q", got.Finalizers, DeleteAuthentikPolicyBindingFinalizer)
}
}
func TestController_syncHandler_update(t *testing.T) {
pb := testPolicyBinding()
pb.Status.PK = "42"
pb.Finalizers = []string{DeleteAuthentikPolicyBindingFinalizer}
server := newAuthentikTestServer(t, authentikTestHandlers{
policyBindingRetrieve: func(w http.ResponseWriter, _ *http.Request) {
writeJSON(t, w, http.StatusOK, map[string]any{"pk": "42"})
},
policyBindingPartialUpdate: func(w http.ResponseWriter, _ *http.Request) {
writeJSON(t, w, http.StatusOK, map[string]any{"pk": "42"})
},
})
t.Cleanup(server.Close)
ctrl, ctx, cancel := newTestController(t, pb, server.URL)
t.Cleanup(cancel)
err := ctrl.syncHandler(ctx, cache.ObjectName{Namespace: pb.Namespace, Name: pb.Name})
if err != nil {
t.Fatalf("syncHandler() error = %v", err)
}
got := getPolicyBinding(t, ctrl, pb.Namespace, pb.Name)
if got.Status.PK != "42" {
t.Fatalf("status.pk = %q, want 42", got.Status.PK)
}
}
func TestController_syncHandler_update_policyBindingNotFound(t *testing.T) {
pb := testPolicyBinding()
pb.Status.PK = "42"
pb.Finalizers = []string{DeleteAuthentikPolicyBindingFinalizer}
server := newAuthentikTestServer(t, authentikTestHandlers{
policyBindingRetrieve: func(w http.ResponseWriter, _ *http.Request) {
http.NotFound(w, nil)
},
})
t.Cleanup(server.Close)
ctrl, ctx, cancel := newTestController(t, pb, server.URL)
t.Cleanup(cancel)
err := ctrl.syncHandler(ctx, cache.ObjectName{Namespace: pb.Namespace, Name: pb.Name})
if err != nil {
t.Fatalf("syncHandler() error = %v", err)
}
got := getPolicyBinding(t, ctrl, pb.Namespace, pb.Name)
if got.Status.PK != "" {
t.Fatalf("status.pk = %q, want empty after policy binding not found", got.Status.PK)
}
}
func TestController_syncHandler_delete(t *testing.T) {
now := metav1.Now()
pb := testPolicyBinding()
pb.Status.PK = "42"
pb.DeletionTimestamp = &now
pb.Finalizers = []string{DeleteAuthentikPolicyBindingFinalizer}
var destroyCalled bool
server := newAuthentikTestServer(t, authentikTestHandlers{
policyBindingDestroy: func(w http.ResponseWriter, r *http.Request) {
destroyCalled = true
if r.Method != http.MethodDelete {
t.Errorf("destroy method = %s, want DELETE", r.Method)
}
w.WriteHeader(http.StatusNoContent)
},
})
t.Cleanup(server.Close)
ctrl, ctx, cancel := newTestController(t, pb, server.URL)
t.Cleanup(cancel)
err := ctrl.syncHandler(ctx, cache.ObjectName{Namespace: pb.Namespace, Name: pb.Name})
if err != nil {
t.Fatalf("syncHandler() error = %v", err)
}
if !destroyCalled {
t.Fatal("expected Authentik destroy call")
}
got := getPolicyBinding(t, ctrl, pb.Namespace, pb.Name)
if slices.Contains(got.Finalizers, DeleteAuthentikPolicyBindingFinalizer) {
t.Fatalf("finalizers = %v, want finalizer removed", got.Finalizers)
}
}
func TestController_syncHandler_delete_policyBindingAlreadyGone(t *testing.T) {
now := metav1.Now()
pb := testPolicyBinding()
pb.Status.PK = "42"
pb.DeletionTimestamp = &now
pb.Finalizers = []string{DeleteAuthentikPolicyBindingFinalizer}
server := newAuthentikTestServer(t, authentikTestHandlers{
policyBindingDestroy: func(w http.ResponseWriter, _ *http.Request) {
http.NotFound(w, nil)
},
})
t.Cleanup(server.Close)
ctrl, ctx, cancel := newTestController(t, pb, server.URL)
t.Cleanup(cancel)
err := ctrl.syncHandler(ctx, cache.ObjectName{Namespace: pb.Namespace, Name: pb.Name})
if err != nil {
t.Fatalf("syncHandler() error = %v", err)
}
got := getPolicyBinding(t, ctrl, pb.Namespace, pb.Name)
if slices.Contains(got.Finalizers, DeleteAuthentikPolicyBindingFinalizer) {
t.Fatalf("finalizers = %v, want finalizer removed after 404", got.Finalizers)
}
}
func TestController_syncHandler_notFound(t *testing.T) {
server := newAuthentikTestServer(t, authentikTestHandlers{})
t.Cleanup(server.Close)
ctrl, ctx, cancel := newTestController(t, nil, server.URL)
t.Cleanup(cancel)
err := ctrl.syncHandler(ctx, cache.ObjectName{Namespace: "default", Name: "missing"})
if err != nil {
t.Fatalf("syncHandler() error = %v, want nil for missing object", err)
}
}
// --- test helpers ---
func testPolicyBinding() *v1alpha1.PolicyBinding {
return &v1alpha1.PolicyBinding{
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Kind: "PolicyBinding",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test-pb",
Namespace: "default",
},
Spec: v1alpha1.PolicyBindingSpec{
Group: "14ab813f-a7f9-481b-9b08-781953ae9ebf",
Target: "8dd85627-9c48-49c2-8afc-d73dd122ffc2",
Order: 1,
},
}
}
func newTestController(t *testing.T, pb *v1alpha1.PolicyBinding, authentikURL string) (*PolicyBindingController, context.Context, context.CancelFunc) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
ctrl, _, stop := newTestControllerWithContext(t, ctx, pb, authentikURL)
return ctrl, ctx, func() {
cancel()
stop()
}
}
func newTestControllerWithContext(t *testing.T, ctx context.Context, pb *v1alpha1.PolicyBinding, authentikURL string) (*PolicyBindingController, context.Context, func()) {
t.Helper()
authentikClient := newAuthentikAPIClientForTest(t, authentikURL)
var objects []runtime.Object
if pb != nil {
objects = append(objects, pb)
}
policyBindingClient := operatorfake.NewSimpleClientset(objects...)
informerFactory := operatorinformers.NewSharedInformerFactory(policyBindingClient, 0)
policyBindingInformer := informerFactory.PolicyBinding().V1alpha1().PolicyBindings()
ctrl := NewController(ctx, fake.NewClientset(), policyBindingClient, authentikClient, policyBindingInformer)
informerFactory.Start(ctx.Done())
for informerType, synced := range informerFactory.WaitForCacheSync(ctx.Done()) {
if !synced {
t.Fatalf("informer %v failed to sync", informerType)
}
}
return ctrl, ctx, func() {}
}
func newAuthentikAPIClientForTest(t *testing.T, serverURL string) *authentikapi.APIClient {
t.Helper()
u, err := url.Parse(serverURL)
if err != nil {
t.Fatalf("parse server URL: %v", err)
}
cfg := authentikapi.NewConfiguration()
cfg.Scheme = u.Scheme
cfg.Host = u.Host
return authentikapi.NewAPIClient(cfg)
}
type authentikTestHandlers struct {
policyBindingCreate http.HandlerFunc
policyBindingRetrieve http.HandlerFunc
policyBindingPartialUpdate http.HandlerFunc
policyBindingDestroy http.HandlerFunc
}
func newAuthentikTestServer(t *testing.T, handlers authentikTestHandlers) *httptest.Server {
t.Helper()
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
switch {
case path == "/api/v3/policies/bindings/" && r.Method == http.MethodPost:
if handlers.policyBindingCreate != nil {
handlers.policyBindingCreate(w, r)
return
}
http.NotFound(w, r)
case strings.HasPrefix(path, "/api/v3/policies/bindings/") && strings.HasSuffix(path, "/"):
idPath := strings.TrimPrefix(path, "/api/v3/policies/bindings/")
if idPath == "" {
http.NotFound(w, r)
return
}
switch r.Method {
case http.MethodGet:
if handlers.policyBindingRetrieve != nil {
handlers.policyBindingRetrieve(w, r)
return
}
http.NotFound(w, r)
case http.MethodPatch:
if handlers.policyBindingPartialUpdate != nil {
handlers.policyBindingPartialUpdate(w, r)
return
}
http.NotFound(w, r)
case http.MethodDelete:
if handlers.policyBindingDestroy != nil {
handlers.policyBindingDestroy(w, r)
return
}
http.NotFound(w, r)
default:
http.Error(w, "unexpected method on policy binding instance", http.StatusMethodNotAllowed)
}
default:
http.NotFound(w, r)
}
})
return httptest.NewServer(handler)
}
func writeJSON(t *testing.T, w http.ResponseWriter, status int, body any) {
t.Helper()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
if err := json.NewEncoder(w).Encode(body); err != nil {
t.Fatalf("write JSON response: %v", err)
}
}
func getPolicyBinding(t *testing.T, ctrl *PolicyBindingController, namespace, name string) *v1alpha1.PolicyBinding {
t.Helper()
got, err := ctrl.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(namespace).Get(
context.Background(), name, metav1.GetOptions{},
)
if err != nil {
t.Fatalf("get PolicyBinding: %v", err)
}
return got
}
+73 -23
View File
@@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -39,7 +40,6 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/proxyprovider/v1alpha1" v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/proxyprovider/v1alpha1"
controllers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/controllers"
clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned" clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned"
operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme" operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme"
informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/proxyprovider/v1alpha1" informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/proxyprovider/v1alpha1"
@@ -62,14 +62,16 @@ const (
DeleteAuthentikProxyProviderFinalizer = "proxyprovider.t000-n.de/delete-authentik-proxyprovider" DeleteAuthentikProxyProviderFinalizer = "proxyprovider.t000-n.de/delete-authentik-proxyprovider"
) )
type ProxyProviderController struct { type Controller struct {
kubeclientset kubernetes.Interface kubeclientset kubernetes.Interface
proxyProviderClientset clientset.Interface proxyProviderClientset clientset.Interface
authentik *authentikapi.APIClient authentik *authentikapi.APIClient
proxyLister listers.ProxyProviderLister proxyLister listers.ProxyProviderLister
proxySynced cache.InformerSynced
controller *controllers.Controller workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
recorder record.EventRecorder
} }
func NewController( func NewController(
@@ -78,7 +80,7 @@ func NewController(
proxyProviderClientset clientset.Interface, proxyProviderClientset clientset.Interface,
authentik *authentikapi.APIClient, authentik *authentikapi.APIClient,
proxyInformer informers.ProxyProviderInformer, proxyInformer informers.ProxyProviderInformer,
) *ProxyProviderController { ) *Controller {
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme)) utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme))
@@ -93,36 +95,75 @@ func NewController(
&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)}, &workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
) )
c := &ProxyProviderController{ c := &Controller{
kubeclientset: kubeclientset, kubeclientset: kubeclientset,
proxyProviderClientset: proxyProviderClientset, proxyProviderClientset: proxyProviderClientset,
authentik: authentik, authentik: authentik,
proxyLister: proxyInformer.Lister(), proxyLister: proxyInformer.Lister(),
proxySynced: proxyInformer.Informer().HasSynced,
workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder: recorder,
} }
c.controller = controllers.NewController(
ctx,
workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder,
proxyInformer.Informer().HasSynced,
c.syncHandler,
)
logger.Info("Setting up event handlers") logger.Info("Setting up event handlers")
proxyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ proxyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.controller.Enqueue, AddFunc: c.enqueueProxyProvider,
UpdateFunc: func(_, newObj interface{}) { UpdateFunc: func(_, newObj interface{}) {
c.controller.Enqueue(newObj) c.enqueueProxyProvider(newObj)
}, },
}) })
return c return c
} }
func (c *ProxyProviderController) Run(ctx context.Context, workers int) error { func (c *Controller) Run(ctx context.Context, workers int) error {
return c.controller.Run(ctx, workers) defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting ProxyProvider controller")
logger.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.proxySynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
logger.Info("Starting workers", "count", workers)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
logger.Info("Started workers")
<-ctx.Done()
logger.Info("Shutting down workers")
return nil
} }
func (c *ProxyProviderController) syncHandler(ctx context.Context, objectRef cache.ObjectName) error { func (c *Controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
objRef, shutdown := c.workqueue.Get()
logger := klog.FromContext(ctx)
if shutdown {
return false
}
defer c.workqueue.Done(objRef)
err := c.syncHandler(ctx, objRef)
if err == nil {
c.workqueue.Forget(objRef)
logger.Info("Successfully synced", "objectName", objRef)
return true
}
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
c.workqueue.AddRateLimited(objRef)
return true
}
func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef) logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef)
pp, err := c.proxyLister.ProxyProviders(objectRef.Namespace).Get(objectRef.Name) pp, err := c.proxyLister.ProxyProviders(objectRef.Namespace).Get(objectRef.Name)
@@ -155,12 +196,12 @@ func (c *ProxyProviderController) syncHandler(ctx context.Context, objectRef cac
return c.reconcileUpdate(ctx, pp) return c.reconcileUpdate(ctx, pp)
} }
func (c *ProxyProviderController) ensureFinalizers(ctx context.Context, pp *v1alpha1.ProxyProvider) error { func (c *Controller) ensureFinalizers(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
pp.ObjectMeta.Finalizers = append(pp.ObjectMeta.Finalizers, DeleteAuthentikProxyProviderFinalizer) pp.ObjectMeta.Finalizers = append(pp.ObjectMeta.Finalizers, DeleteAuthentikProxyProviderFinalizer)
return c.updateProxyProvider(ctx, pp) return c.updateProxyProvider(ctx, pp)
} }
func (c *ProxyProviderController) reconcileDelete(ctx context.Context, pp *v1alpha1.ProxyProvider) error { func (c *Controller) reconcileDelete(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
pk, err := strconv.ParseInt(pp.Status.PK, 10, 32) pk, err := strconv.ParseInt(pp.Status.PK, 10, 32)
if err != nil { if err != nil {
return fmt.Errorf("error parsing PK: %v", err) return fmt.Errorf("error parsing PK: %v", err)
@@ -178,7 +219,7 @@ func (c *ProxyProviderController) reconcileDelete(ctx context.Context, pp *v1alp
return c.updateProxyProvider(ctx, pp) return c.updateProxyProvider(ctx, pp)
} }
func (c *ProxyProviderController) reconcileUpdate(ctx context.Context, pp *v1alpha1.ProxyProvider) error { func (c *Controller) reconcileUpdate(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
// We retrieve the existing PP from the API by slug. // We retrieve the existing PP from the API by slug.
pk, err := strconv.ParseInt(pp.Status.PK, 10, 32) pk, err := strconv.ParseInt(pp.Status.PK, 10, 32)
if err != nil { if err != nil {
@@ -212,7 +253,7 @@ func (c *ProxyProviderController) reconcileUpdate(ctx context.Context, pp *v1alp
return c.updateProxyProviderStatus(ctx, pp) return c.updateProxyProviderStatus(ctx, pp)
} }
func (c *ProxyProviderController) reconcileCreate(ctx context.Context, pp *v1alpha1.ProxyProvider) error { func (c *Controller) reconcileCreate(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
proxyProviderRequest := &authentikapi.ProxyProviderRequest{ proxyProviderRequest := &authentikapi.ProxyProviderRequest{
Name: pp.Spec.Name, Name: pp.Spec.Name,
AuthorizationFlow: pp.Spec.AuthorizationFlow, AuthorizationFlow: pp.Spec.AuthorizationFlow,
@@ -229,14 +270,23 @@ func (c *ProxyProviderController) reconcileCreate(ctx context.Context, pp *v1alp
return c.updateProxyProviderStatus(ctx, pp) return c.updateProxyProviderStatus(ctx, pp)
} }
func (c *ProxyProviderController) updateProxyProviderStatus(ctx context.Context, pp *v1alpha1.ProxyProvider) error { func (c *Controller) enqueueProxyProvider(obj interface{}) {
objectRef, err := cache.ObjectToName(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(objectRef)
}
func (c *Controller) updateProxyProviderStatus(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
ppCopy := pp.DeepCopy() ppCopy := pp.DeepCopy()
_, err := c.proxyProviderClientset.ProxyproviderV1alpha1().ProxyProviders(ppCopy.Namespace).UpdateStatus(ctx, ppCopy, metav1.UpdateOptions{FieldManager: FieldManager}) _, err := c.proxyProviderClientset.ProxyproviderV1alpha1().ProxyProviders(ppCopy.Namespace).UpdateStatus(ctx, ppCopy, metav1.UpdateOptions{FieldManager: FieldManager})
return err return err
} }
// Update metadata, spec, etc. of the ProxyProvider object. // Update metadata, spec, etc. of the ProxyProvider object.
func (c *ProxyProviderController) updateProxyProvider(ctx context.Context, pp *v1alpha1.ProxyProvider) error { func (c *Controller) updateProxyProvider(ctx context.Context, pp *v1alpha1.ProxyProvider) error {
ppCopy := pp.DeepCopy() ppCopy := pp.DeepCopy()
_, err := c.proxyProviderClientset.ProxyproviderV1alpha1().ProxyProviders(ppCopy.Namespace).Update(ctx, ppCopy, metav1.UpdateOptions{FieldManager: FieldManager}) _, err := c.proxyProviderClientset.ProxyproviderV1alpha1().ProxyProviders(ppCopy.Namespace).Update(ctx, ppCopy, metav1.UpdateOptions{FieldManager: FieldManager})
if err != nil { if err != nil {
@@ -218,6 +218,20 @@ func TestController_syncHandler_invalidPK(t *testing.T) {
} }
} }
func TestController_enqueueProxyProvider(t *testing.T) {
server := newAuthentikTestServer(t, authentikTestHandlers{})
t.Cleanup(server.Close)
ctrl, _, cancel := newTestController(t, testProxyProvider(), server.URL)
t.Cleanup(cancel)
ctrl.enqueueProxyProvider(testProxyProvider())
if ctrl.workqueue.Len() != 1 {
t.Fatalf("workqueue length = %d, want 1", ctrl.workqueue.Len())
}
}
// --- test helpers --- // --- test helpers ---
func testProxyProvider() *v1alpha1.ProxyProvider { func testProxyProvider() *v1alpha1.ProxyProvider {
@@ -239,7 +253,7 @@ func testProxyProvider() *v1alpha1.ProxyProvider {
} }
} }
func newTestController(t *testing.T, pp *v1alpha1.ProxyProvider, authentikURL string) (*ProxyProviderController, context.Context, context.CancelFunc) { func newTestController(t *testing.T, pp *v1alpha1.ProxyProvider, authentikURL string) (*Controller, context.Context, context.CancelFunc) {
t.Helper() t.Helper()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
ctrl, _, stop := newTestControllerWithContext(t, ctx, pp, authentikURL) ctrl, _, stop := newTestControllerWithContext(t, ctx, pp, authentikURL)
@@ -249,7 +263,7 @@ func newTestController(t *testing.T, pp *v1alpha1.ProxyProvider, authentikURL st
} }
} }
func newTestControllerWithContext(t *testing.T, ctx context.Context, pp *v1alpha1.ProxyProvider, authentikURL string) (*ProxyProviderController, context.Context, func()) { func newTestControllerWithContext(t *testing.T, ctx context.Context, pp *v1alpha1.ProxyProvider, authentikURL string) (*Controller, context.Context, func()) {
t.Helper() t.Helper()
authentikClient := newAuthentikAPIClientForTest(t, authentikURL) authentikClient := newAuthentikAPIClientForTest(t, authentikURL)
@@ -358,7 +372,7 @@ func writeJSON(t *testing.T, w http.ResponseWriter, status int, body any) {
} }
} }
func getProxyProvider(t *testing.T, ctrl *ProxyProviderController, namespace, name string) *v1alpha1.ProxyProvider { func getProxyProvider(t *testing.T, ctrl *Controller, namespace, name string) *v1alpha1.ProxyProvider {
t.Helper() t.Helper()
got, err := ctrl.proxyProviderClientset.ProxyproviderV1alpha1().ProxyProviders(namespace).Get( got, err := ctrl.proxyProviderClientset.ProxyproviderV1alpha1().ProxyProviders(namespace).Get(
@@ -363,6 +363,7 @@ func schema_pkg_apis_policybinding_v1alpha1_PolicyBindingSpec(ref common.Referen
}, },
"user": { "user": {
SchemaProps: spec.SchemaProps{ SchemaProps: spec.SchemaProps{
Default: 0,
Type: []string{"integer"}, Type: []string{"integer"},
Format: "int32", Format: "int32",
}, },
@@ -382,7 +383,7 @@ func schema_pkg_apis_policybinding_v1alpha1_PolicyBindingSpec(ref common.Referen
}, },
}, },
}, },
Required: []string{"target", "order"}, Required: []string{"user", "target", "order"},
}, },
}, },
} }