feat: support batched logs
All checks were successful
CI / test (push) Successful in 19s

This commit is contained in:
2025-05-06 19:55:30 +02:00
parent f74dcd4ed3
commit 27ba521d0d
2 changed files with 255 additions and 17 deletions

View File

@@ -116,7 +116,7 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac
} }
/** /**
* Upon receiving a log, check if the log's trace id matches any trace id in the buffer. * Upon receiving logs, check if each log's trace id matches any trace id in the buffer.
* If it doesn't, put the log in the buffer for the configured amount of time. * If it doesn't, put the log in the buffer for the configured amount of time.
* If it does, forward the log. * If it does, forward the log.
* *
@@ -124,32 +124,62 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac
* If it does, forward the log. * If it does, forward the log.
* If not, discard the log. * If not, discard the log.
*/ */
func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, log plog.Logs) error { func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
traceId := log.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).TraceID() // Rough estimation of the number of unique trace ids in the logs as the number of logs itself.
// It may be an option to make this configurable or use runtime metrics to zero in on the right size.
logsByTraceId := make(map[string]plog.Logs, logs.LogRecordCount())
if !traceId.IsEmpty() { for i := range logs.ResourceLogs().Len() {
exists := tp.traceIdTtlMap.Exists(hex.EncodeToString(traceId[:])) resourceLogs := logs.ResourceLogs().At(i)
for j := range resourceLogs.ScopeLogs().Len() {
scopeLogs := resourceLogs.ScopeLogs().At(j)
for k := range scopeLogs.LogRecords().Len() {
logRecord := scopeLogs.LogRecords().At(k)
traceId := logRecord.TraceID()
if !traceId.IsEmpty() {
traceIdStr := hex.EncodeToString(traceId[:])
batch, exists := logsByTraceId[traceIdStr]
if !exists {
batch = plog.NewLogs()
logsByTraceId[traceIdStr] = batch
}
batchResourceLogs := batch.ResourceLogs().AppendEmpty()
resourceLogs.Resource().CopyTo(batchResourceLogs.Resource())
batchScopeLogs := batchResourceLogs.ScopeLogs().AppendEmpty()
scopeLogs.Scope().CopyTo(batchScopeLogs.Scope())
newRecord := batchScopeLogs.LogRecords().AppendEmpty()
logRecord.CopyTo(newRecord)
} else {
tp.logger.Warn("Log has no trace id", zap.Any("log", logs))
}
}
}
}
for traceIdStr, batch := range logsByTraceId {
exists := tp.traceIdTtlMap.Exists(traceIdStr)
if exists { if exists {
tp.logger.Debug("Log forwarded directly", zap.String("traceId", hex.EncodeToString(traceId[:]))) tp.logger.Debug("Logs forwarded directly", zap.String("traceId", traceIdStr))
return tp.nextLogsConsumer.ConsumeLogs(ctx, log) tp.nextLogsConsumer.ConsumeLogs(ctx, batch)
continue
} }
go func(ctx context.Context, log plog.Logs) { go func(ctx context.Context, traceIdStr string, batch plog.Logs) {
tp.logger.Debug("Log added to buffer", zap.String("traceId", hex.EncodeToString(traceId[:]))) tp.logger.Debug("Logs added to buffer", zap.String("traceId", traceIdStr))
duration, _ := time.ParseDuration(tp.config.BufferDurationLogs) duration, _ := time.ParseDuration(tp.config.BufferDurationLogs)
time.Sleep(duration) time.Sleep(duration)
exists := tp.traceIdTtlMap.Exists(hex.EncodeToString(traceId[:])) exists := tp.traceIdTtlMap.Exists(traceIdStr)
if exists { if exists {
tp.logger.Debug("Log forwarded after buffer expiration", zap.String("traceId", hex.EncodeToString(traceId[:]))) tp.logger.Debug("Logs forwarded after buffer expiration", zap.String("traceId", traceIdStr))
tp.nextLogsConsumer.ConsumeLogs(ctx, log) tp.nextLogsConsumer.ConsumeLogs(ctx, batch)
} else { } else {
tp.logger.Debug("Log discarded", zap.String("traceId", hex.EncodeToString(traceId[:]))) tp.logger.Debug("Logs discarded", zap.String("traceId", traceIdStr))
} }
}(ctx, log) }(ctx, traceIdStr, batch)
} else {
tp.logger.Warn("Log has no trace id", zap.Any("log", log))
} }
return nil return nil

View File

@@ -3,6 +3,7 @@ package tracebasedlogsampler
import ( import (
"context" "context"
"encoding/hex" "encoding/hex"
"fmt"
"testing" "testing"
"time" "time"
@@ -170,6 +171,213 @@ func TestConsumeTraces_AddsBatchedTracesToBuffer(t *testing.T) {
assert.True(t, processor.traceIdTtlMap.Exists(hex.EncodeToString(traceID3[:]))) assert.True(t, processor.traceIdTtlMap.Exists(hex.EncodeToString(traceID3[:])))
} }
func TestConsumeLogs_BatchesLogsWithSameTraceId(t *testing.T) {
traceID := [16]byte{0xde, 0xad, 0xbe, 0xef}
mockTrace := new(mockTraceConsumer)
mockTrace.On("ConsumeTraces", mock.Anything).Return(nil)
mockLog := new(mockLogConsumer)
mockLog.On("ConsumeLogs", mock.Anything).Return(nil)
cfg := &Config{
BufferDurationTraces: "1s",
BufferDurationLogs: "300ms",
}
processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "batchesLogsWithSameTraceId"), zap.NewNop(), mockTrace, mockLog, cfg)
defer processor.Shutdown(context.Background())
logs := plog.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
for i := 0; i < 3; i++ {
logRecord := scopeLogs.LogRecords().AppendEmpty()
logRecord.SetTraceID(traceID)
logRecord.Body().SetStr(fmt.Sprintf("log %d", i))
}
td := generateTestTrace(traceID)
err := processor.ConsumeTraces(context.Background(), td)
assert.NoError(t, err)
err = processor.ConsumeLogs(context.Background(), logs)
assert.NoError(t, err)
mockLog.AssertNumberOfCalls(t, "ConsumeLogs", 1)
forwardedLogs := mockLog.logs[0]
assert.Equal(t, 3, forwardedLogs.LogRecordCount())
}
func TestConsumeLogs_BatchesLogsWithDifferentTraceIds(t *testing.T) {
traceID1 := [16]byte{0xde, 0xad, 0xbe, 0xef}
traceID2 := [16]byte{0xba, 0xad, 0xf0, 0x0d}
mockTrace := new(mockTraceConsumer)
mockTrace.On("ConsumeTraces", mock.Anything).Return(nil)
mockLog := new(mockLogConsumer)
mockLog.On("ConsumeLogs", mock.Anything).Return(nil)
cfg := &Config{
BufferDurationTraces: "1s",
BufferDurationLogs: "300ms",
}
processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "batchesLogsWithDifferentTraceIds"), zap.NewNop(), mockTrace, mockLog, cfg)
defer processor.Shutdown(context.Background())
logs := plog.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
for i := range 2 {
logRecord := scopeLogs.LogRecords().AppendEmpty()
logRecord.SetTraceID(traceID1)
logRecord.Body().SetStr(fmt.Sprintf("log1_%d", i))
}
logRecord := scopeLogs.LogRecords().AppendEmpty()
logRecord.SetTraceID(traceID2)
logRecord.Body().SetStr("log2_0")
td1 := generateTestTrace(traceID1)
err := processor.ConsumeTraces(context.Background(), td1)
assert.NoError(t, err)
td2 := generateTestTrace(traceID2)
err = processor.ConsumeTraces(context.Background(), td2)
assert.NoError(t, err)
err = processor.ConsumeLogs(context.Background(), logs)
assert.NoError(t, err)
mockLog.AssertNumberOfCalls(t, "ConsumeLogs", 2)
}
func TestConsumeLogs_PreservesIntegrity(t *testing.T) {
/*
Input structure:
resource1 (service1)
└── scope1
├── log1 (traceId1)
└── log2 (traceId2)
resource2 (service2)
└── scope1
└── log3 (traceId1)
Expected output structure (batched by traceId):
traceId1 batch:
resource1 (service1)
└── scope1
└── log1
resource2 (service2)
└── scope1
└── log3
traceId2 batch:
resource1 (service1)
└── scope1
└── log2
*/
traceID1 := [16]byte{0xde, 0xad, 0xbe, 0xef}
traceID2 := [16]byte{0xba, 0xad, 0xf0, 0x0d}
mockTrace := new(mockTraceConsumer)
mockTrace.On("ConsumeTraces", mock.Anything).Return(nil)
mockLog := new(mockLogConsumer)
mockLog.On("ConsumeLogs", mock.Anything).Return(nil)
cfg := &Config{
BufferDurationTraces: "1s",
BufferDurationLogs: "300ms",
}
processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "preservesIntegrity"), zap.NewNop(), mockTrace, mockLog, cfg)
defer processor.Shutdown(context.Background())
logs := plog.NewLogs()
resourceLogs1 := logs.ResourceLogs().AppendEmpty()
resourceLogs1.Resource().Attributes().PutStr("service.name", "service1")
scopeLogs1 := resourceLogs1.ScopeLogs().AppendEmpty()
scopeLogs1.Scope().SetName("scope1")
log1 := scopeLogs1.LogRecords().AppendEmpty()
log1.SetTraceID(traceID1)
log1.Body().SetStr("log1")
log2 := scopeLogs1.LogRecords().AppendEmpty()
log2.SetTraceID(traceID2)
log2.Body().SetStr("log2")
resourceLogs2 := logs.ResourceLogs().AppendEmpty()
resourceLogs2.Resource().Attributes().PutStr("service.name", "service2")
scopeLogs2 := resourceLogs2.ScopeLogs().AppendEmpty()
scopeLogs2.Scope().SetName("scope1")
log3 := scopeLogs2.LogRecords().AppendEmpty()
log3.SetTraceID(traceID1)
log3.Body().SetStr("log3")
for _, traceID := range [][16]byte{traceID1, traceID2} {
td := generateTestTrace(traceID)
err := processor.ConsumeTraces(context.Background(), td)
assert.NoError(t, err)
}
err := processor.ConsumeLogs(context.Background(), logs)
assert.NoError(t, err)
mockLog.AssertNumberOfCalls(t, "ConsumeLogs", 2)
var traceID1Batch, traceID2Batch plog.Logs
for _, batch := range mockLog.logs {
if batch.LogRecordCount() > 0 {
record := batch.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
traceID := record.TraceID()
if hex.EncodeToString(traceID[:]) == hex.EncodeToString(traceID1[:]) {
traceID1Batch = batch
} else {
traceID2Batch = batch
}
}
}
assert.Equal(t, 2, traceID1Batch.LogRecordCount(), "traceID1 batch should have 2 logs")
assert.Equal(t, 2, traceID1Batch.ResourceLogs().Len(), "traceID1 batch should have 2 resources")
resourceLogs1 = traceID1Batch.ResourceLogs().At(0)
val1, exists := resourceLogs1.Resource().Attributes().Get("service.name")
assert.True(t, exists, "service.name attribute should exist for first resource")
assert.Equal(t, "service1", val1.Str(), "first resource should have service1")
scopeLogs1 = resourceLogs1.ScopeLogs().At(0)
assert.Equal(t, "scope1", scopeLogs1.Scope().Name(), "first scope name should be preserved")
assert.Equal(t, "log1", scopeLogs1.LogRecords().At(0).Body().Str(), "first log body should be preserved")
resourceLogs2 = traceID1Batch.ResourceLogs().At(1)
val2, exists := resourceLogs2.Resource().Attributes().Get("service.name")
assert.True(t, exists, "service.name attribute should exist for second resource")
assert.Equal(t, "service2", val2.Str(), "second resource should have service2")
scopeLogs2 = resourceLogs2.ScopeLogs().At(0)
assert.Equal(t, "scope1", scopeLogs2.Scope().Name(), "second scope name should be preserved")
assert.Equal(t, "log3", scopeLogs2.LogRecords().At(0).Body().Str(), "second log body should be preserved")
assert.Equal(t, 1, traceID2Batch.LogRecordCount(), "traceID2 batch should have 1 log")
assert.Equal(t, 1, traceID2Batch.ResourceLogs().Len(), "traceID2 batch should have 1 resource")
resourceLogs := traceID2Batch.ResourceLogs().At(0)
val, exists := resourceLogs.Resource().Attributes().Get("service.name")
assert.True(t, exists, "service.name attribute should exist")
assert.Equal(t, "service1", val.Str(), "should have service1")
scopeLogs := resourceLogs.ScopeLogs().At(0)
assert.Equal(t, "scope1", scopeLogs.Scope().Name(), "scope name should be preserved")
assert.Equal(t, "log2", scopeLogs.LogRecords().At(0).Body().Str(), "log body should be preserved")
}
func TestConsumeLogs_LogIsDroppedIfNoTraceAppears(t *testing.T) { func TestConsumeLogs_LogIsDroppedIfNoTraceAppears(t *testing.T) {
traceID := [16]byte{0xaa, 0xbb, 0xcc} traceID := [16]byte{0xaa, 0xbb, 0xcc}