diff --git a/processor.go b/processor.go index d423680..ef1195a 100644 --- a/processor.go +++ b/processor.go @@ -89,15 +89,27 @@ func (tp *LogSamplerProcessor) Capabilities() consumer.Capabilities { } /** - * For each trace, record the trace id in a buffer. + * Every trace's trace id that is processed by the processor is being remembered. */ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - traceId := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID() + for i := range td.ResourceSpans().Len() { + resourceSpans := td.ResourceSpans().At(i) - if !traceId.IsEmpty() { - tp.traceIdTtlMap.Add(hex.EncodeToString(traceId[:])) + for j := range resourceSpans.ScopeSpans().Len() { + scopeSpans := resourceSpans.ScopeSpans().At(j) - tp.logger.Debug("Trace added to buffer", zap.String("traceId", hex.EncodeToString(traceId[:]))) + for k := range scopeSpans.Spans().Len() { + span := scopeSpans.Spans().At(k) + traceId := span.TraceID() + + if !traceId.IsEmpty() { + traceIdStr := hex.EncodeToString(traceId[:]) + tp.traceIdTtlMap.Add(traceIdStr) + + tp.logger.Debug("Trace added to buffer", zap.String("traceId", traceIdStr)) + } + } + } } return tp.nextTracesConsumer.ConsumeTraces(ctx, td) diff --git a/processor_test.go b/processor_test.go index 0a2fbac..e7ccda7 100644 --- a/processor_test.go +++ b/processor_test.go @@ -2,6 +2,7 @@ package tracebasedlogsampler import ( "context" + "encoding/hex" "testing" "time" @@ -128,6 +129,47 @@ func TestConsumeTraces_AddsToBuffer(t *testing.T) { mockLog.AssertCalled(t, "ConsumeLogs", mock.Anything) } +func TestConsumeTraces_AddsBatchedTracesToBuffer(t *testing.T) { + traceID1 := [16]byte{0xde, 0xad, 0xbe, 0xef} + traceID2 := [16]byte{0xba, 0xad, 0xf0, 0x0d} + traceID3 := [16]byte{0xca, 0xfe, 0xba, 0xbe} + + mockTrace := new(mockTraceConsumer) + mockTrace.On("ConsumeTraces", mock.Anything).Return(nil) + + mockLog := new(mockLogConsumer) + mockLog.On("ConsumeLogs", mock.Anything).Return(nil) + + cfg := &Config{ + BufferDurationTraces: "1s", + BufferDurationLogs: "300ms", + } + + processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "addsBatchedTracesToBuffer"), zap.NewNop(), mockTrace, mockLog, cfg) + defer processor.Shutdown(context.Background()) + + td := ptrace.NewTraces() + + resourceSpans1 := td.ResourceSpans().AppendEmpty() + scopeSpans1 := resourceSpans1.ScopeSpans().AppendEmpty() + span1 := scopeSpans1.Spans().AppendEmpty() + span1.SetTraceID(traceID1) + span2 := scopeSpans1.Spans().AppendEmpty() + span2.SetTraceID(traceID2) + + resourceSpans2 := td.ResourceSpans().AppendEmpty() + scopeSpans2 := resourceSpans2.ScopeSpans().AppendEmpty() + span3 := scopeSpans2.Spans().AppendEmpty() + span3.SetTraceID(traceID3) + + err := processor.ConsumeTraces(context.Background(), td) + assert.NoError(t, err) + + assert.True(t, processor.traceIdTtlMap.Exists(hex.EncodeToString(traceID1[:]))) + assert.True(t, processor.traceIdTtlMap.Exists(hex.EncodeToString(traceID2[:]))) + assert.True(t, processor.traceIdTtlMap.Exists(hex.EncodeToString(traceID3[:]))) +} + func TestConsumeLogs_LogIsDroppedIfNoTraceAppears(t *testing.T) { traceID := [16]byte{0xaa, 0xbb, 0xcc}