26bd576690
Co-authored-by: Timo Behrendt <t.behrendt@t00n.de> Co-committed-by: Timo Behrendt <t.behrendt@t00n.de>
95 lines
2.3 KiB
Go
95 lines
2.3 KiB
Go
package baseController
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/tools/record"
|
|
"k8s.io/client-go/util/workqueue"
|
|
"k8s.io/klog/v2"
|
|
)
|
|
|
|
// SyncHandler is the callback signature used to reconcile a single object.
// It receives the request context and the name of the object to sync and
// returns an error when the sync did not succeed.
type SyncHandler func(ctx context.Context, objRef cache.ObjectName) error
|
|
|
|
type Controller struct {
|
|
workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
|
|
recorder record.EventRecorder
|
|
synced cache.InformerSynced
|
|
syncHandler SyncHandler
|
|
}
|
|
|
|
func NewController(
|
|
ctx context.Context,
|
|
workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName],
|
|
recorder record.EventRecorder,
|
|
synced cache.InformerSynced,
|
|
syncHandler SyncHandler,
|
|
) *Controller {
|
|
return &Controller{
|
|
workqueue: workqueue,
|
|
recorder: recorder,
|
|
synced: synced,
|
|
syncHandler: syncHandler,
|
|
}
|
|
}
|
|
|
|
func (c *Controller) Run(ctx context.Context, workers int) error {
|
|
defer utilruntime.HandleCrash()
|
|
defer c.workqueue.ShutDown()
|
|
logger := klog.FromContext(ctx)
|
|
|
|
logger.Info("Starting PolicyBinding controller")
|
|
|
|
logger.Info("Waiting for informer caches to sync")
|
|
if ok := cache.WaitForCacheSync(ctx.Done(), c.synced); !ok {
|
|
return fmt.Errorf("failed to wait for caches to sync")
|
|
}
|
|
|
|
logger.Info("Starting workers", "count", workers)
|
|
for i := 0; i < workers; i++ {
|
|
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
|
|
}
|
|
|
|
logger.Info("Started workers")
|
|
<-ctx.Done()
|
|
logger.Info("Shutting down workers")
|
|
return nil
|
|
}
|
|
|
|
func (c *Controller) runWorker(ctx context.Context) {
|
|
for c.processNextWorkItem(ctx) {
|
|
}
|
|
}
|
|
|
|
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
|
|
objRef, shutdown := c.workqueue.Get()
|
|
logger := klog.FromContext(ctx)
|
|
if shutdown {
|
|
return false
|
|
}
|
|
defer c.workqueue.Done(objRef)
|
|
|
|
err := c.syncHandler(ctx, objRef)
|
|
if err == nil {
|
|
c.workqueue.Forget(objRef)
|
|
logger.Info("Successfully synced", "objectName", objRef)
|
|
return true
|
|
}
|
|
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
|
|
c.workqueue.AddRateLimited(objRef)
|
|
return true
|
|
}
|
|
|
|
func (c *Controller) Enqueue(obj interface{}) {
|
|
objectRef, err := cache.ObjectToName(obj)
|
|
if err != nil {
|
|
utilruntime.HandleError(err)
|
|
return
|
|
}
|
|
c.workqueue.Add(objectRef)
|
|
}
|