feat: support batches #7
20
processor.go
20
processor.go
@@ -89,15 +89,27 @@ func (tp *LogSamplerProcessor) Capabilities() consumer.Capabilities {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For each trace, record the trace id in a buffer.
|
* Every trace's trace id that is processed by the processor is being remembered.
|
||||||
*/
|
*/
|
||||||
func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
|
func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
|
||||||
traceId := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
|
for i := range td.ResourceSpans().Len() {
|
||||||
|
resourceSpans := td.ResourceSpans().At(i)
|
||||||
|
|
||||||
|
for j := range resourceSpans.ScopeSpans().Len() {
|
||||||
|
scopeSpans := resourceSpans.ScopeSpans().At(j)
|
||||||
|
|
||||||
|
for k := range scopeSpans.Spans().Len() {
|
||||||
|
span := scopeSpans.Spans().At(k)
|
||||||
|
traceId := span.TraceID()
|
||||||
|
|
||||||
if !traceId.IsEmpty() {
|
if !traceId.IsEmpty() {
|
||||||
tp.traceIdTtlMap.Add(hex.EncodeToString(traceId[:]))
|
traceIdStr := hex.EncodeToString(traceId[:])
|
||||||
|
tp.traceIdTtlMap.Add(traceIdStr)
|
||||||
|
|
||||||
tp.logger.Debug("Trace added to buffer", zap.String("traceId", hex.EncodeToString(traceId[:])))
|
tp.logger.Debug("Trace added to buffer", zap.String("traceId", traceIdStr))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return tp.nextTracesConsumer.ConsumeTraces(ctx, td)
|
return tp.nextTracesConsumer.ConsumeTraces(ctx, td)
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package tracebasedlogsampler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -128,6 +129,47 @@ func TestConsumeTraces_AddsToBuffer(t *testing.T) {
|
|||||||
mockLog.AssertCalled(t, "ConsumeLogs", mock.Anything)
|
mockLog.AssertCalled(t, "ConsumeLogs", mock.Anything)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConsumeTraces_AddsBatchedTracesToBuffer(t *testing.T) {
|
||||||
|
traceID1 := [16]byte{0xde, 0xad, 0xbe, 0xef}
|
||||||
|
traceID2 := [16]byte{0xba, 0xad, 0xf0, 0x0d}
|
||||||
|
traceID3 := [16]byte{0xca, 0xfe, 0xba, 0xbe}
|
||||||
|
|
||||||
|
mockTrace := new(mockTraceConsumer)
|
||||||
|
mockTrace.On("ConsumeTraces", mock.Anything).Return(nil)
|
||||||
|
|
||||||
|
mockLog := new(mockLogConsumer)
|
||||||
|
mockLog.On("ConsumeLogs", mock.Anything).Return(nil)
|
||||||
|
|
||||||
|
cfg := &Config{
|
||||||
|
BufferDurationTraces: "1s",
|
||||||
|
BufferDurationLogs: "300ms",
|
||||||
|
}
|
||||||
|
|
||||||
|
processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "addsBatchedTracesToBuffer"), zap.NewNop(), mockTrace, mockLog, cfg)
|
||||||
|
defer processor.Shutdown(context.Background())
|
||||||
|
|
||||||
|
td := ptrace.NewTraces()
|
||||||
|
|
||||||
|
resourceSpans1 := td.ResourceSpans().AppendEmpty()
|
||||||
|
scopeSpans1 := resourceSpans1.ScopeSpans().AppendEmpty()
|
||||||
|
span1 := scopeSpans1.Spans().AppendEmpty()
|
||||||
|
span1.SetTraceID(traceID1)
|
||||||
|
span2 := scopeSpans1.Spans().AppendEmpty()
|
||||||
|
span2.SetTraceID(traceID2)
|
||||||
|
|
||||||
|
resourceSpans2 := td.ResourceSpans().AppendEmpty()
|
||||||
|
scopeSpans2 := resourceSpans2.ScopeSpans().AppendEmpty()
|
||||||
|
span3 := scopeSpans2.Spans().AppendEmpty()
|
||||||
|
span3.SetTraceID(traceID3)
|
||||||
|
|
||||||
|
err := processor.ConsumeTraces(context.Background(), td)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.True(t, processor.traceIdTtlMap.Exists(hex.EncodeToString(traceID1[:])))
|
||||||
|
assert.True(t, processor.traceIdTtlMap.Exists(hex.EncodeToString(traceID2[:])))
|
||||||
|
assert.True(t, processor.traceIdTtlMap.Exists(hex.EncodeToString(traceID3[:])))
|
||||||
|
}
|
||||||
|
|
||||||
func TestConsumeLogs_LogIsDroppedIfNoTraceAppears(t *testing.T) {
|
func TestConsumeLogs_LogIsDroppedIfNoTraceAppears(t *testing.T) {
|
||||||
traceID := [16]byte{0xaa, 0xbb, 0xcc}
|
traceID := [16]byte{0xaa, 0xbb, 0xcc}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user