// AI generated tests and not yet reviewed. package controllers import ( "context" "errors" "sync/atomic" "testing" "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ) func newTestController(t *testing.T, synced cache.InformerSynced, syncHandler SyncHandler) (*Controller, workqueue.TypedRateLimitingInterface[cache.ObjectName]) { t.Helper() ratelimiter := workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](time.Millisecond, time.Second) q := workqueue.NewTypedRateLimitingQueue(ratelimiter) t.Cleanup(q.ShutDown) if synced == nil { synced = func() bool { return true } } ctrl := NewController( context.Background(), q, record.NewFakeRecorder(10), synced, syncHandler, ) return ctrl, q } func TestController_processNextWorkItem_success(t *testing.T) { objRef := cache.ObjectName{Namespace: "default", Name: "test"} var syncedRef cache.ObjectName ctrl, q := newTestController(t, nil, func(_ context.Context, ref cache.ObjectName) error { syncedRef = ref return nil }) q.Add(objRef) if !ctrl.processNextWorkItem(context.Background()) { t.Fatal("processNextWorkItem() = false, want true") } if syncedRef != objRef { t.Fatalf("syncHandler object = %+v, want %+v", syncedRef, objRef) } if q.Len() != 0 { t.Fatalf("queue length = %d, want 0 after successful sync", q.Len()) } if q.NumRequeues(objRef) != 0 { t.Fatalf("requeues = %d, want 0 after successful sync", q.NumRequeues(objRef)) } } func TestController_processNextWorkItem_syncError(t *testing.T) { objRef := cache.ObjectName{Namespace: "default", Name: "test"} syncErr := errors.New("sync failed") ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error { return syncErr }) q.Add(objRef) if !ctrl.processNextWorkItem(context.Background()) { t.Fatal("processNextWorkItem() = false, want true") } if q.NumRequeues(objRef) != 1 { t.Fatalf("requeues = %d, want 1 after sync error", q.NumRequeues(objRef)) } } func TestController_processNextWorkItem_shutdown(t *testing.T) { ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error { return nil }) q.ShutDown() if ctrl.processNextWorkItem(context.Background()) { t.Fatal("processNextWorkItem() = true, want false on shutdown") } } func TestController_Enqueue(t *testing.T) { ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error { return nil }) obj := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: "default", Name: "test", }, } ctrl.Enqueue(obj) if q.Len() != 1 { t.Fatalf("queue length = %d, want 1 after Enqueue", q.Len()) } } func TestController_Enqueue_invalidObject(t *testing.T) { ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error { return nil }) ctrl.Enqueue("not-a-kubernetes-object") if q.Len() != 0 { t.Fatalf("queue length = %d, want 0 for invalid object", q.Len()) } } func TestController_Run_cacheSyncFails(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() ctrl, _ := newTestController(t, func() bool { return false }, func(context.Context, cache.ObjectName) error { return nil }) go func() { time.Sleep(10 * time.Millisecond) cancel() }() err := ctrl.Run(ctx, 1) if err == nil { t.Fatal("Run() error = nil, want cache sync failure") } } func TestController_Run_shutsDownOnCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) ctrl, _ := newTestController(t, nil, func(context.Context, cache.ObjectName) error { return nil }) errCh := make(chan error, 1) go func() { errCh <- ctrl.Run(ctx, 1) }() time.Sleep(50 * time.Millisecond) cancel() select { case err := <-errCh: if err != nil { t.Fatalf("Run() error = %v, want nil on context cancel", err) } case <-time.After(2 * time.Second): t.Fatal("Run() did not return after context cancellation") } } func TestController_runWorker_processesQueuedItem(t *testing.T) { objRef := cache.ObjectName{Namespace: "default", Name: "test"} var calls atomic.Int32 ctrl, q := newTestController(t, nil, func(context.Context, cache.ObjectName) error { calls.Add(1) return nil }) q.Add(objRef) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go ctrl.runWorker(ctx) deadline := time.Now().Add(2 * time.Second) for time.Now().Before(deadline) { if calls.Load() == 1 && q.Len() == 0 { cancel() return } time.Sleep(5 * time.Millisecond) } cancel() t.Fatalf("runWorker did not process queued item: calls=%d queueLen=%d", calls.Load(), q.Len()) }