Files
authentik-kubernetes-operator/pkg/controllers/policybinding/controller.go
T
t.behrendt 4367bc78bc
CI / image check (pull_request) Successful in 58s
CI / install-dependencies (pull_request) Successful in 7m20s
CI / check format (pull_request) Successful in 25s
CI / test (pull_request) Successful in 25s
CI / build check (pull_request) Successful in 1m39s
CI / check lint (pull_request) Successful in 12m48s
feat: add bare policy binding controller
2026-05-17 19:38:29 +02:00

288 lines
10 KiB
Go

/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package policybinding
import (
"context"
"fmt"
"net/http"
"slices"
"strconv"
"time"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/policybinding/v1alpha1"
clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned"
operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme"
informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/policybinding/v1alpha1"
listers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/listers/policybinding/v1alpha1"
authentikapi "goauthentik.io/api/v3"
)
const controllerAgentName = "policybinding-controller"
const (
SuccessSynced = "Synced"
ErrResourceExists = "ErrResourceExists"
MessageResourceExists = "Resource %q already exists and is not managed by PolicyBinding"
MessageResourceSynced = "PolicyBinding synced successfully"
FieldManager = controllerAgentName
)
// Finalizers
const (
DeleteAuthentikPolicyBindingFinalizer = "policybinding.t000-n.de/delete-authentik-policybinding"
)
type Controller struct {
kubeclientset kubernetes.Interface
policyBindingClientset clientset.Interface
authentik *authentikapi.APIClient
policyBindingListener listers.PolicyBindingLister
policyBindingSynced cache.InformerSynced
workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
recorder record.EventRecorder
}
func NewController(
ctx context.Context,
kubeclientset kubernetes.Interface,
policyBindingClientset clientset.Interface,
authentik *authentikapi.APIClient,
policyBindingInformer informers.PolicyBindingInformer,
) *Controller {
logger := klog.FromContext(ctx)
utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme))
logger.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](5*time.Millisecond, 1000*time.Second),
&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
)
c := &Controller{
kubeclientset: kubeclientset,
policyBindingClientset: policyBindingClientset,
authentik: authentik,
policyBindingListener: policyBindingInformer.Lister(),
policyBindingSynced: policyBindingInformer.Informer().HasSynced,
workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder: recorder,
}
logger.Info("Setting up event handlers")
policyBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueuePolicyBinding,
UpdateFunc: func(_, newObj interface{}) {
c.enqueuePolicyBinding(newObj)
},
})
return c
}
func (c *Controller) Run(ctx context.Context, workers int) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting PolicyBinding controller")
logger.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.policyBindingSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
logger.Info("Starting workers", "count", workers)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
logger.Info("Started workers")
<-ctx.Done()
logger.Info("Shutting down workers")
return nil
}
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
objRef, shutdown := c.workqueue.Get()
logger := klog.FromContext(ctx)
if shutdown {
return false
}
defer c.workqueue.Done(objRef)
err := c.syncHandler(ctx, objRef)
if err == nil {
c.workqueue.Forget(objRef)
logger.Info("Successfully synced", "objectName", objRef)
return true
}
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
c.workqueue.AddRateLimited(objRef)
return true
}
func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef)
pb, err := c.policyBindingListener.PolicyBindings(objectRef.Namespace).Get(objectRef.Name)
if err != nil {
if errors.IsNotFound(err) {
logger.V(4).Info("PolicyBinding no longer exists")
return nil
}
return err
}
logger.V(4).Info("sync PolicyBinding", "name", pb.Name)
if !pb.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Reconciling deletion of PolicyBinding", "name", pb.Name)
return c.reconcileDelete(ctx, pb)
}
if pb.Status.PK == "" {
logger.Info("Reconciling creation of PolicyBinding", "name", pb.Name)
return c.reconcileCreate(ctx, pb)
}
// Check if all finalizers are present. If not, we add them. Same pattern as above, just needs a helper function to check for presence of a finalizer.
if !slices.Contains(pb.ObjectMeta.Finalizers, DeleteAuthentikPolicyBindingFinalizer) {
logger.Info("Ensuring finalizers are present", "name", pb.Name)
return c.ensureFinalizers(ctx, pb)
}
logger.Info("Reconciling update of PolicyBinding", "name", pb.Name)
return c.reconcileUpdate(ctx, pb)
}
func (c *Controller) ensureFinalizers(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
pb.ObjectMeta.Finalizers = append(pb.ObjectMeta.Finalizers, DeleteAuthentikPolicyBindingFinalizer)
return c.updatePolicyBinding(ctx, pb)
}
func (c *Controller) reconcileDelete(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
pk, err := strconv.ParseInt(pb.Status.PK, 10, 32)
if err != nil {
return fmt.Errorf("error parsing PK: %v", err)
}
r, err := c.authentik.ProvidersApi.ProvidersProxyDestroy(ctx, int32(pk)).Execute()
if err != nil {
// This handles an edge-case, where when the ProxyProvider on Authentik has already been deleted, but the finalizer is still present. We just remove the finalizer and return.
if r != nil && r.StatusCode != http.StatusNotFound {
return fmt.Errorf("error when calling `ProvidersAPI.ProvidersProxyDestroy`: %w with response %v", err, r)
}
}
pb.ObjectMeta.Finalizers = slices.Delete(pb.ObjectMeta.Finalizers, slices.Index(pb.ObjectMeta.Finalizers, DeleteAuthentikPolicyBindingFinalizer), 1)
return c.updatePolicyBinding(ctx, pb)
}
func (c *Controller) reconcileUpdate(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
_, r, err := c.authentik.PoliciesApi.PoliciesBindingsRetrieve(ctx, pb.Status.PK).Execute()
if err != nil {
if r != nil && r.StatusCode == http.StatusNotFound {
// This handles an edge-case, where when the PolicyBinding on Authentik has been deleted, e.g. by mistake. We just remove the PK and return.
// During the next reconciliation, the PolicyBinding will be re-created.
pb.Status.PK = ""
return c.updatePolicyBindingStatus(ctx, pb)
}
return fmt.Errorf("error retrieving existing PolicyBinding: %v with response %v", err, r)
}
patchedPolicyBindingRequest := &authentikapi.PatchedPolicyBindingRequest{
Policy: *authentikapi.NewNullableString(&pb.Spec.Policy),
Group: *authentikapi.NewNullableString(&pb.Spec.Group),
User: *authentikapi.NewNullableInt32(&pb.Spec.User),
Target: &pb.Spec.Target,
Order: &pb.Spec.Order,
}
resp, r, err := c.authentik.PoliciesApi.PoliciesBindingsPartialUpdate(ctx, pb.Status.PK).PatchedPolicyBindingRequest(*patchedPolicyBindingRequest).Execute()
if err != nil {
return fmt.Errorf("error when calling `ProvidersAPI.ProvidersProxyPartialUpdate`: %w with response %v", err, r)
}
pb.Status.PK = resp.Pk
return c.updatePolicyBindingStatus(ctx, pb)
}
func (c *Controller) reconcileCreate(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
policyBindingRequest := &authentikapi.PolicyBindingRequest{
Policy: *authentikapi.NewNullableString(&pb.Spec.Policy),
Group: *authentikapi.NewNullableString(&pb.Spec.Group),
User: *authentikapi.NewNullableInt32(&pb.Spec.User),
Target: pb.Spec.Target,
Order: pb.Spec.Order,
}
resp, r, err := c.authentik.PoliciesApi.PoliciesBindingsCreate(ctx).PolicyBindingRequest(*policyBindingRequest).Execute()
if err != nil {
return fmt.Errorf("error when calling `PoliciesAPI.PoliciesBindingsCreate`: %w with response %v", err, r)
}
pb.Status.PK = resp.Pk
return c.updatePolicyBindingStatus(ctx, pb)
}
func (c *Controller) enqueuePolicyBinding(obj interface{}) {
objectRef, err := cache.ObjectToName(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
c.workqueue.Add(objectRef)
}
func (c *Controller) updatePolicyBindingStatus(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
pbCopy := pb.DeepCopy()
_, err := c.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(pbCopy.Namespace).UpdateStatus(ctx, pbCopy, metav1.UpdateOptions{FieldManager: FieldManager})
return err
}
// Update metadata, spec, etc. of the PolicyBinding object.
func (c *Controller) updatePolicyBinding(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
pbCopy := pb.DeepCopy()
_, err := c.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(pbCopy.Namespace).Update(ctx, pbCopy, metav1.UpdateOptions{FieldManager: FieldManager})
return err
}