// Package baseController implements a work-queue driven controller loop:
// object names are enqueued, and worker goroutines hand each name to a
// caller-supplied SyncHandler, requeuing names whose sync returns an error.
package baseController

import (
	"context"
	"fmt"
	"time"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
)

// SyncHandler reconciles the object identified by objRef.
//
// A nil return makes the controller forget the item's rate-limiting history;
// a non-nil error is reported and the item is requeued with rate limiting.
type SyncHandler func(ctx context.Context, objRef cache.ObjectName) error

// Controller pulls object names off a rate-limited work queue and passes each
// one to its SyncHandler.
type Controller struct {
	// workqueue holds the object names waiting to be synced.
	workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
	// recorder is stored at construction time; nothing in this file uses it.
	recorder record.EventRecorder
	// synced is the check Run waits on before starting any worker.
	synced cache.InformerSynced
	// syncHandler is invoked once for every item taken from the queue.
	syncHandler SyncHandler
}

// NewController assembles a Controller from its collaborators.
//
// ctx is accepted but not used by the constructor. The caller supplies the
// work queue; Run shuts it down when it returns.
func NewController(
	ctx context.Context,
	workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName],
	recorder record.EventRecorder,
	synced cache.InformerSynced,
	syncHandler SyncHandler,
) *Controller {
	return &Controller{
		workqueue:   workqueue,
		recorder:    recorder,
		synced:      synced,
		syncHandler: syncHandler,
	}
}

// Run waits for the informer cache to sync, starts the requested number of
// worker goroutines, and then blocks until ctx is cancelled.
//
// It returns an error only when the cache sync wait fails. The work queue is
// shut down when Run returns. Run does not wait for the worker goroutines to
// exit before returning.
func (c *Controller) Run(ctx context.Context, workers int) error {
	// Deferred calls run last-in-first-out: the queue is shut down first,
	// then the crash handler gets a chance to observe a panic.
	defer utilruntime.HandleCrashWithContext(ctx)
	defer c.workqueue.ShutDown()

	logger := klog.FromContext(ctx)
	logger.Info("Starting PolicyBinding controller")

	logger.Info("Waiting for informer caches to sync")
	if ok := cache.WaitForCacheSync(ctx.Done(), c.synced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	logger.Info("Starting workers", "count", workers)
	for i := 0; i < workers; i++ {
		// runWorker is re-invoked on a one second period until ctx is done.
		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
	}
	logger.Info("Started workers")

	<-ctx.Done()
	logger.Info("Shutting down workers")
	return nil
}

// runWorker processes queue items one after another until
// processNextWorkItem reports that the queue has been shut down.
func (c *Controller) runWorker(ctx context.Context) {
	for c.processNextWorkItem(ctx) {
	}
}

// processNextWorkItem takes one item from the queue and runs the sync handler
// on it. It returns false only when the queue has been shut down; in every
// other case, including a failed sync, it returns true so the worker keeps
// going.
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
	objRef, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}
	// Every item obtained from Get is marked Done once handling finishes,
	// whether the sync succeeded or failed.
	defer c.workqueue.Done(objRef)

	if err := c.syncHandler(ctx, objRef); err != nil {
		utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
		// Put the item back so it is retried after a rate-limited delay.
		c.workqueue.AddRateLimited(objRef)
		return true
	}

	// Success: clear the item's rate-limiting history.
	c.workqueue.Forget(objRef)
	klog.FromContext(ctx).Info("Successfully synced", "objectName", objRef)
	return true
}

// Enqueue converts obj to its object name and adds that name to the work
// queue. If the conversion fails, the error is reported through
// utilruntime.HandleError and nothing is enqueued.
func (c *Controller) Enqueue(obj any) {
	objectRef, err := cache.ObjectToName(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.Add(objectRef)
}