Files
t.behrendt 26bd576690
CD / Create tag (push) Successful in 11s
CD / Build and push (amd64) (push) Successful in 1m32s
CD / Create manifest (push) Successful in 7s
feat: vertical slice application -> provider -> binding (#4)
Co-authored-by: Timo Behrendt <t.behrendt@t00n.de>
Co-committed-by: Timo Behrendt <t.behrendt@t00n.de>
2026-05-25 17:14:35 +02:00

246 lines
9.2 KiB
Go

/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package policybinding
import (
"context"
"fmt"
"net/http"
"slices"
"time"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/internal/baseController"
v1alpha1 "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/apis/policybinding/v1alpha1"
clientset "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned"
operatorscheme "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/clientset/versioned/scheme"
informers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/informers/externalversions/policybinding/v1alpha1"
listers "gitea.t000-n.de/t.behrendt/authentik-kubernetes-operator/pkg/generated/listers/policybinding/v1alpha1"
authentikapi "goauthentik.io/api/v3"
)
const controllerAgentName = "policybinding-controller"
const (
SuccessSynced = "Synced"
ErrResourceExists = "ErrResourceExists"
MessageResourceExists = "Resource %q already exists and is not managed by PolicyBinding"
MessageResourceSynced = "PolicyBinding synced successfully"
FieldManager = controllerAgentName
)
// Finalizers
const (
DeleteAuthentikPolicyBindingFinalizer = "policybinding.t000-n.de/delete-authentik-policybinding"
)
type PolicyBindingController struct {
kubeclientset kubernetes.Interface
policyBindingClientset clientset.Interface
authentik *authentikapi.APIClient
policyBindingListener listers.PolicyBindingLister
controller *baseController.Controller
}
func NewController(
ctx context.Context,
kubeclientset kubernetes.Interface,
policyBindingClientset clientset.Interface,
authentik *authentikapi.APIClient,
policyBindingInformer informers.PolicyBindingInformer,
) *PolicyBindingController {
logger := klog.FromContext(ctx)
utilruntime.Must(operatorscheme.AddToScheme(scheme.Scheme))
logger.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](5*time.Millisecond, 1000*time.Second),
&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
)
c := &PolicyBindingController{
kubeclientset: kubeclientset,
policyBindingClientset: policyBindingClientset,
authentik: authentik,
policyBindingListener: policyBindingInformer.Lister(),
}
c.controller = baseController.NewController(
ctx,
workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder,
policyBindingInformer.Informer().HasSynced,
c.syncHandler,
)
logger.Info("Setting up event handlers")
policyBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.controller.Enqueue,
UpdateFunc: func(_, newObj interface{}) {
c.controller.Enqueue(newObj)
},
})
return c
}
func (c *PolicyBindingController) Run(ctx context.Context, workers int) error {
return c.controller.Run(ctx, workers)
}
func (c *PolicyBindingController) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef)
pb, err := c.policyBindingListener.PolicyBindings(objectRef.Namespace).Get(objectRef.Name)
if err != nil {
if errors.IsNotFound(err) {
logger.V(4).Info("PolicyBinding no longer exists")
return nil
}
return err
}
logger.V(4).Info("sync PolicyBinding", "name", pb.Name)
if !pb.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Reconciling deletion of PolicyBinding", "name", pb.Name)
return c.reconcileDelete(ctx, pb)
}
if pb.Status.PK == "" {
logger.Info("Reconciling creation of PolicyBinding", "name", pb.Name)
return c.reconcileCreate(ctx, pb)
}
// Check if all finalizers are present. If not, we add them. Same pattern as above, just needs a helper function to check for presence of a finalizer.
if !slices.Contains(pb.ObjectMeta.Finalizers, DeleteAuthentikPolicyBindingFinalizer) {
logger.Info("Ensuring finalizers are present", "name", pb.Name)
return c.ensureFinalizers(ctx, pb)
}
logger.Info("Reconciling update of PolicyBinding", "name", pb.Name)
return c.reconcileUpdate(ctx, pb)
}
func (c *PolicyBindingController) ensureFinalizers(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
pb.ObjectMeta.Finalizers = append(pb.ObjectMeta.Finalizers, DeleteAuthentikPolicyBindingFinalizer)
return c.updatePolicyBinding(ctx, pb)
}
func (c *PolicyBindingController) reconcileDelete(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
r, err := c.authentik.PoliciesApi.PoliciesBindingsDestroy(ctx, pb.Status.PK).Execute()
if err != nil {
// This handles an edge-case, where when the PolicyBinding on Authentik has already been deleted, but the finalizer is still present. We just remove the finalizer and return.
if r != nil && r.StatusCode != http.StatusNotFound {
return fmt.Errorf("error when calling `PoliciesAPI.PoliciesBindingsDestroy`: %w with response %v", err, r)
}
}
pb.ObjectMeta.Finalizers = slices.Delete(pb.ObjectMeta.Finalizers, slices.Index(pb.ObjectMeta.Finalizers, DeleteAuthentikPolicyBindingFinalizer), 1)
return c.updatePolicyBinding(ctx, pb)
}
func (c *PolicyBindingController) reconcileUpdate(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
_, r, err := c.authentik.PoliciesApi.PoliciesBindingsRetrieve(ctx, pb.Status.PK).Execute()
if err != nil {
if r != nil && r.StatusCode == http.StatusNotFound {
// This handles an edge-case, where when the PolicyBinding on Authentik has been deleted, e.g. by mistake. We just remove the PK and return.
// During the next reconciliation, the PolicyBinding will be re-created.
pb.Status.PK = ""
return c.updatePolicyBindingStatus(ctx, pb)
}
return fmt.Errorf("error retrieving existing PolicyBinding: %v with response %v", err, r)
}
patchedPolicyBindingRequest := &authentikapi.PatchedPolicyBindingRequest{
Target: &pb.Spec.Target,
Order: &pb.Spec.Order,
}
if pb.Spec.Policy != "" {
patchedPolicyBindingRequest.SetPolicy(pb.Spec.Policy)
}
if pb.Spec.Group != "" {
patchedPolicyBindingRequest.SetGroup(pb.Spec.Group)
}
if pb.Spec.User != 0 {
patchedPolicyBindingRequest.SetUser(pb.Spec.User)
}
resp, r, err := c.authentik.PoliciesApi.PoliciesBindingsPartialUpdate(ctx, pb.Status.PK).PatchedPolicyBindingRequest(*patchedPolicyBindingRequest).Execute()
if err != nil {
return fmt.Errorf("error when calling `PoliciesAPI.PoliciesBindingsPartialUpdate`: %w with response %v", err, r)
}
pb.Status.PK = resp.Pk
return c.updatePolicyBindingStatus(ctx, pb)
}
func (c *PolicyBindingController) reconcileCreate(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
policyBindingRequest := &authentikapi.PolicyBindingRequest{
Target: pb.Spec.Target,
Order: pb.Spec.Order,
}
if pb.Spec.Policy != "" {
policyBindingRequest.SetPolicy(pb.Spec.Policy)
}
if pb.Spec.Group != "" {
policyBindingRequest.SetGroup(pb.Spec.Group)
}
if pb.Spec.User != 0 {
policyBindingRequest.SetUser(pb.Spec.User)
}
resp, r, err := c.authentik.PoliciesApi.PoliciesBindingsCreate(ctx).PolicyBindingRequest(*policyBindingRequest).Execute()
if err != nil {
return fmt.Errorf("error when calling `PoliciesAPI.PoliciesBindingsCreate`: %w with response %v", err, r)
}
pb.Status.PK = resp.Pk
return c.updatePolicyBindingStatus(ctx, pb)
}
func (c *PolicyBindingController) updatePolicyBindingStatus(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
pbCopy := pb.DeepCopy()
_, err := c.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(pbCopy.Namespace).UpdateStatus(ctx, pbCopy, metav1.UpdateOptions{FieldManager: FieldManager})
return err
}
// Update metadata, spec, etc. of the PolicyBinding object.
func (c *PolicyBindingController) updatePolicyBinding(ctx context.Context, pb *v1alpha1.PolicyBinding) error {
pbCopy := pb.DeepCopy()
_, err := c.policyBindingClientset.PolicyBindingV1alpha1().PolicyBindings(pbCopy.Namespace).Update(ctx, pbCopy, metav1.UpdateOptions{FieldManager: FieldManager})
return err
}