From bf09c0e32e6d06d38bbb74042f6076ff6bcdc58a Mon Sep 17 00:00:00 2001 From: Timo Behrendt Date: Mon, 5 May 2025 20:26:34 +0200 Subject: [PATCH] test: processor (#6) Reviewed-on: https://gitea.t000-n.de/t.behrendt/tracebasedlogsampler/pulls/6 Co-authored-by: Timo Behrendt Co-committed-by: Timo Behrendt --- go.mod | 1 + go.sum | 2 + processor_test.go | 187 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 190 insertions(+) create mode 100644 processor_test.go diff --git a/go.mod b/go.mod index b193b39..81803f7 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/component/componentstatus v0.125.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.125.0 // indirect diff --git a/go.sum b/go.sum index 0c3aeaa..13b4fa8 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= diff --git a/processor_test.go b/processor_test.go new file mode 100644 index 0000000..0a2fbac --- /dev/null +++ b/processor_test.go @@ -0,0 +1,187 @@ +package tracebasedlogsampler + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" +) + +type mockTraceConsumer struct { + mock.Mock +} + +func (m *mockTraceConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{} +} + +func (m *mockTraceConsumer) ConsumeTraces(_ context.Context, td ptrace.Traces) error { + args := m.Called(td) + return args.Error(0) +} + +type mockLogConsumer struct { + mock.Mock + logs []plog.Logs +} + +func (m *mockLogConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{} +} + +func (m *mockLogConsumer) ConsumeLogs(_ context.Context, l plog.Logs) error { + m.Called(l) + m.logs = append(m.logs, l) + return nil +} + +func generateTestTrace(traceID [16]byte) ptrace.Traces { + td := ptrace.NewTraces() + span := td.ResourceSpans().AppendEmpty(). + ScopeSpans().AppendEmpty(). + Spans().AppendEmpty() + span.SetTraceID(traceID) + return td +} + +func generateTestLog(traceID [16]byte) plog.Logs { + logs := plog.NewLogs() + record := logs.ResourceLogs().AppendEmpty(). + ScopeLogs().AppendEmpty(). + LogRecords().AppendEmpty() + record.SetTraceID(traceID) + return logs +} + +var componentType = component.MustNewType("logbasedtracesampler") + +func TestNewLogSamplerProcessorSingleton_CreatesDistinctForDifferentNames(t *testing.T) { + cfg := &Config{ + BufferDurationTraces: "1s", + BufferDurationLogs: "1s", + } + logger := zap.NewNop() + + idA := component.NewIDWithName(componentType, "a") + idB := component.NewIDWithName(componentType, "b") + + p1 := NewLogSamplerProcessorSingleton(idA, logger, new(mockTraceConsumer), new(mockLogConsumer), cfg) + p2 := NewLogSamplerProcessorSingleton(idB, logger, new(mockTraceConsumer), new(mockLogConsumer), cfg) + + assert.NotSame(t, p1, p2, "different instance names should yield different processors") +} + +func TestNewLogSamplerProcessorSingleton_SameNameReturnsSameWithUpdatedConsumers(t *testing.T) { + cfg := &Config{ + BufferDurationTraces: "1s", + BufferDurationLogs: "1s", + } + logger := zap.NewNop() + + id := component.NewIDWithName(componentType, "foo") + + consumer := new(mockTraceConsumer) + p1 := NewLogSamplerProcessorSingleton(id, logger, consumer, nil, cfg) + + logs := new(mockLogConsumer) + p2 := NewLogSamplerProcessorSingleton(id, logger, nil, logs, cfg) + + assert.Same(t, p1, p2, "same instance name should return same processor") + assert.Equal(t, logs, p2.nextLogsConsumer, "traces consumer should update on re-create") + assert.Equal(t, consumer, p2.nextTracesConsumer, "logs consumer should remain as first set when nil passed") +} + +func TestConsumeTraces_AddsToBuffer(t *testing.T) { + traceID := [16]byte{0xde, 0xad, 0xbe, 0xef} + + mockTrace := new(mockTraceConsumer) + mockTrace.On("ConsumeTraces", mock.Anything).Return(nil) + + mockLog := new(mockLogConsumer) + mockLog.On("ConsumeLogs", mock.Anything).Return(nil) + + cfg := &Config{ + BufferDurationTraces: "1s", + BufferDurationLogs: "300ms", + } + + processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "addsToBuffer"), zap.NewNop(), mockTrace, mockLog, cfg) + defer processor.Shutdown(context.Background()) + + td := generateTestTrace(traceID) + err := processor.ConsumeTraces(context.Background(), td) + assert.NoError(t, err) + + log := generateTestLog(traceID) + err = processor.ConsumeLogs(context.Background(), log) + assert.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + mockLog.AssertCalled(t, "ConsumeLogs", mock.Anything) +} + +func TestConsumeLogs_LogIsDroppedIfNoTraceAppears(t *testing.T) { + traceID := [16]byte{0xaa, 0xbb, 0xcc} + + mockTrace := new(mockTraceConsumer) + mockTrace.On("ConsumeTraces", mock.Anything).Return(nil) + + mockLog := new(mockLogConsumer) + mockLog.On("ConsumeLogs", mock.Anything).Return(nil) + + cfg := &Config{ + BufferDurationTraces: "200ms", + BufferDurationLogs: "200ms", + } + + processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "logIsDroppedIfNoTraceAppears"), zap.NewNop(), mockTrace, mockLog, cfg) + defer processor.Shutdown(context.Background()) + + log := generateTestLog(traceID) + err := processor.ConsumeLogs(context.Background(), log) + assert.NoError(t, err) + + time.Sleep(300 * time.Millisecond) + + mockLog.AssertNotCalled(t, "ConsumeLogs", mock.Anything) +} + +func TestConsumeLogs_LogIsForwardedAfterSleep(t *testing.T) { + traceID := [16]byte{0xba, 0xad, 0xf0, 0x0d} + + mockTrace := new(mockTraceConsumer) + mockTrace.On("ConsumeTraces", mock.Anything).Return(nil) + + mockLog := new(mockLogConsumer) + mockLog.On("ConsumeLogs", mock.Anything, mock.Anything).Return(nil) + + cfg := &Config{ + BufferDurationTraces: "30s", + BufferDurationLogs: "50ms", + } + + processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "logIsForwardedAfterSleep"), zap.NewNop(), mockTrace, mockLog, cfg) + defer processor.Shutdown(context.Background()) + + logs := generateTestLog(traceID) + err := processor.ConsumeLogs(context.Background(), logs) + assert.NoError(t, err) + + td := generateTestTrace(traceID) + err = processor.ConsumeTraces(context.Background(), td) + assert.NoError(t, err) + + mockLog.AssertNotCalled(t, "ConsumeLogs", mock.Anything, mock.Anything) + + time.Sleep(100 * time.Millisecond) + + mockLog.AssertNumberOfCalls(t, "ConsumeLogs", 1) +}