From 27ba521d0d726cb9aa2c947f3c0416235eb93fee Mon Sep 17 00:00:00 2001 From: Timo Behrendt Date: Tue, 6 May 2025 19:55:30 +0200 Subject: [PATCH] feat: support batched logs --- processor.go | 64 ++++++++++---- processor_test.go | 208 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 255 insertions(+), 17 deletions(-) diff --git a/processor.go b/processor.go index ef1195a..18cfaa2 100644 --- a/processor.go +++ b/processor.go @@ -116,7 +116,7 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac } /** - * Upon receiving a log, check if the log's trace id matches any trace id in the buffer. + * Upon receiving logs, check if each log's trace id matches any trace id in the buffer. * If it doesn't, put the log in the buffer for the configured amount of time. * If it does, forward the log. * @@ -124,32 +124,62 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac * If it does, forward the log. * If not, discard the log. */ -func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, log plog.Logs) error { - traceId := log.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).TraceID() +func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) error { + // Size the map by the total number of log records, a rough upper bound on the number of unique trace ids. + // It may be an option to make this configurable or use runtime metrics to zero in on the right size. 
+ logsByTraceId := make(map[string]plog.Logs, logs.LogRecordCount()) - if !traceId.IsEmpty() { - exists := tp.traceIdTtlMap.Exists(hex.EncodeToString(traceId[:])) + for i := range logs.ResourceLogs().Len() { + resourceLogs := logs.ResourceLogs().At(i) + for j := range resourceLogs.ScopeLogs().Len() { + scopeLogs := resourceLogs.ScopeLogs().At(j) + for k := range scopeLogs.LogRecords().Len() { + logRecord := scopeLogs.LogRecords().At(k) + traceId := logRecord.TraceID() + + if !traceId.IsEmpty() { + traceIdStr := hex.EncodeToString(traceId[:]) + + batch, exists := logsByTraceId[traceIdStr] + if !exists { + batch = plog.NewLogs() + logsByTraceId[traceIdStr] = batch + } + + batchResourceLogs := batch.ResourceLogs().AppendEmpty() + resourceLogs.Resource().CopyTo(batchResourceLogs.Resource()) + batchScopeLogs := batchResourceLogs.ScopeLogs().AppendEmpty() + scopeLogs.Scope().CopyTo(batchScopeLogs.Scope()) + newRecord := batchScopeLogs.LogRecords().AppendEmpty() + logRecord.CopyTo(newRecord) + } else { + tp.logger.Warn("Log has no trace id", zap.Any("log", logs)) + } + } + } + } + + for traceIdStr, batch := range logsByTraceId { + exists := tp.traceIdTtlMap.Exists(traceIdStr) if exists { - tp.logger.Debug("Log forwarded directly", zap.String("traceId", hex.EncodeToString(traceId[:]))) - return tp.nextLogsConsumer.ConsumeLogs(ctx, log) + tp.logger.Debug("Logs forwarded directly", zap.String("traceId", traceIdStr)) + tp.nextLogsConsumer.ConsumeLogs(ctx, batch) + continue } - go func(ctx context.Context, log plog.Logs) { - tp.logger.Debug("Log added to buffer", zap.String("traceId", hex.EncodeToString(traceId[:]))) - + go func(ctx context.Context, traceIdStr string, batch plog.Logs) { + tp.logger.Debug("Logs added to buffer", zap.String("traceId", traceIdStr)) duration, _ := time.ParseDuration(tp.config.BufferDurationLogs) time.Sleep(duration) - exists := tp.traceIdTtlMap.Exists(hex.EncodeToString(traceId[:])) + exists := tp.traceIdTtlMap.Exists(traceIdStr) if exists { - 
tp.logger.Debug("Log forwarded after buffer expiration", zap.String("traceId", hex.EncodeToString(traceId[:]))) - tp.nextLogsConsumer.ConsumeLogs(ctx, log) + tp.logger.Debug("Logs forwarded after buffer expiration", zap.String("traceId", traceIdStr)) + tp.nextLogsConsumer.ConsumeLogs(ctx, batch) } else { - tp.logger.Debug("Log discarded", zap.String("traceId", hex.EncodeToString(traceId[:]))) + tp.logger.Debug("Logs discarded", zap.String("traceId", traceIdStr)) } - }(ctx, log) - } else { - tp.logger.Warn("Log has no trace id", zap.Any("log", log)) + }(ctx, traceIdStr, batch) } return nil diff --git a/processor_test.go b/processor_test.go index e7ccda7..002df27 100644 --- a/processor_test.go +++ b/processor_test.go @@ -3,6 +3,7 @@ package tracebasedlogsampler import ( "context" "encoding/hex" + "fmt" "testing" "time" @@ -170,6 +171,213 @@ func TestConsumeTraces_AddsBatchedTracesToBuffer(t *testing.T) { assert.True(t, processor.traceIdTtlMap.Exists(hex.EncodeToString(traceID3[:]))) } +func TestConsumeLogs_BatchesLogsWithSameTraceId(t *testing.T) { + traceID := [16]byte{0xde, 0xad, 0xbe, 0xef} + + mockTrace := new(mockTraceConsumer) + mockTrace.On("ConsumeTraces", mock.Anything).Return(nil) + + mockLog := new(mockLogConsumer) + mockLog.On("ConsumeLogs", mock.Anything).Return(nil) + + cfg := &Config{ + BufferDurationTraces: "1s", + BufferDurationLogs: "300ms", + } + + processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "batchesLogsWithSameTraceId"), zap.NewNop(), mockTrace, mockLog, cfg) + defer processor.Shutdown(context.Background()) + + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + + for i := 0; i < 3; i++ { + logRecord := scopeLogs.LogRecords().AppendEmpty() + logRecord.SetTraceID(traceID) + logRecord.Body().SetStr(fmt.Sprintf("log %d", i)) + } + + td := generateTestTrace(traceID) + err := processor.ConsumeTraces(context.Background(), td) + 
assert.NoError(t, err) + + err = processor.ConsumeLogs(context.Background(), logs) + assert.NoError(t, err) + + mockLog.AssertNumberOfCalls(t, "ConsumeLogs", 1) + forwardedLogs := mockLog.logs[0] + assert.Equal(t, 3, forwardedLogs.LogRecordCount()) +} + +func TestConsumeLogs_BatchesLogsWithDifferentTraceIds(t *testing.T) { + traceID1 := [16]byte{0xde, 0xad, 0xbe, 0xef} + traceID2 := [16]byte{0xba, 0xad, 0xf0, 0x0d} + + mockTrace := new(mockTraceConsumer) + mockTrace.On("ConsumeTraces", mock.Anything).Return(nil) + + mockLog := new(mockLogConsumer) + mockLog.On("ConsumeLogs", mock.Anything).Return(nil) + + cfg := &Config{ + BufferDurationTraces: "1s", + BufferDurationLogs: "300ms", + } + + processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "batchesLogsWithDifferentTraceIds"), zap.NewNop(), mockTrace, mockLog, cfg) + defer processor.Shutdown(context.Background()) + + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + + for i := range 2 { + logRecord := scopeLogs.LogRecords().AppendEmpty() + logRecord.SetTraceID(traceID1) + logRecord.Body().SetStr(fmt.Sprintf("log1_%d", i)) + } + + logRecord := scopeLogs.LogRecords().AppendEmpty() + logRecord.SetTraceID(traceID2) + logRecord.Body().SetStr("log2_0") + + td1 := generateTestTrace(traceID1) + err := processor.ConsumeTraces(context.Background(), td1) + assert.NoError(t, err) + + td2 := generateTestTrace(traceID2) + err = processor.ConsumeTraces(context.Background(), td2) + assert.NoError(t, err) + + err = processor.ConsumeLogs(context.Background(), logs) + assert.NoError(t, err) + + mockLog.AssertNumberOfCalls(t, "ConsumeLogs", 2) +} + +func TestConsumeLogs_PreservesIntegrity(t *testing.T) { + /* + Input structure: + resource1 (service1) + └── scope1 + ├── log1 (traceId1) + └── log2 (traceId2) + resource2 (service2) + └── scope1 + └── log3 (traceId1) + + Expected output structure (batched by traceId): + traceId1 
batch: + resource1 (service1) + └── scope1 + └── log1 + resource2 (service2) + └── scope1 + └── log3 + traceId2 batch: + resource1 (service1) + └── scope1 + └── log2 + */ + + traceID1 := [16]byte{0xde, 0xad, 0xbe, 0xef} + traceID2 := [16]byte{0xba, 0xad, 0xf0, 0x0d} + + mockTrace := new(mockTraceConsumer) + mockTrace.On("ConsumeTraces", mock.Anything).Return(nil) + + mockLog := new(mockLogConsumer) + mockLog.On("ConsumeLogs", mock.Anything).Return(nil) + + cfg := &Config{ + BufferDurationTraces: "1s", + BufferDurationLogs: "300ms", + } + + processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "preservesIntegrity"), zap.NewNop(), mockTrace, mockLog, cfg) + defer processor.Shutdown(context.Background()) + + logs := plog.NewLogs() + + resourceLogs1 := logs.ResourceLogs().AppendEmpty() + resourceLogs1.Resource().Attributes().PutStr("service.name", "service1") + scopeLogs1 := resourceLogs1.ScopeLogs().AppendEmpty() + scopeLogs1.Scope().SetName("scope1") + + log1 := scopeLogs1.LogRecords().AppendEmpty() + log1.SetTraceID(traceID1) + log1.Body().SetStr("log1") + log2 := scopeLogs1.LogRecords().AppendEmpty() + log2.SetTraceID(traceID2) + log2.Body().SetStr("log2") + + resourceLogs2 := logs.ResourceLogs().AppendEmpty() + resourceLogs2.Resource().Attributes().PutStr("service.name", "service2") + scopeLogs2 := resourceLogs2.ScopeLogs().AppendEmpty() + scopeLogs2.Scope().SetName("scope1") + log3 := scopeLogs2.LogRecords().AppendEmpty() + log3.SetTraceID(traceID1) + log3.Body().SetStr("log3") + + for _, traceID := range [][16]byte{traceID1, traceID2} { + td := generateTestTrace(traceID) + err := processor.ConsumeTraces(context.Background(), td) + assert.NoError(t, err) + } + + err := processor.ConsumeLogs(context.Background(), logs) + assert.NoError(t, err) + + mockLog.AssertNumberOfCalls(t, "ConsumeLogs", 2) + + var traceID1Batch, traceID2Batch plog.Logs + for _, batch := range mockLog.logs { + if batch.LogRecordCount() > 0 { + record := 
batch.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + traceID := record.TraceID() + if hex.EncodeToString(traceID[:]) == hex.EncodeToString(traceID1[:]) { + traceID1Batch = batch + } else { + traceID2Batch = batch + } + } + } + + assert.Equal(t, 2, traceID1Batch.LogRecordCount(), "traceID1 batch should have 2 logs") + assert.Equal(t, 2, traceID1Batch.ResourceLogs().Len(), "traceID1 batch should have 2 resources") + + resourceLogs1 = traceID1Batch.ResourceLogs().At(0) + val1, exists := resourceLogs1.Resource().Attributes().Get("service.name") + assert.True(t, exists, "service.name attribute should exist for first resource") + assert.Equal(t, "service1", val1.Str(), "first resource should have service1") + + scopeLogs1 = resourceLogs1.ScopeLogs().At(0) + assert.Equal(t, "scope1", scopeLogs1.Scope().Name(), "first scope name should be preserved") + assert.Equal(t, "log1", scopeLogs1.LogRecords().At(0).Body().Str(), "first log body should be preserved") + + resourceLogs2 = traceID1Batch.ResourceLogs().At(1) + val2, exists := resourceLogs2.Resource().Attributes().Get("service.name") + assert.True(t, exists, "service.name attribute should exist for second resource") + assert.Equal(t, "service2", val2.Str(), "second resource should have service2") + + scopeLogs2 = resourceLogs2.ScopeLogs().At(0) + assert.Equal(t, "scope1", scopeLogs2.Scope().Name(), "second scope name should be preserved") + assert.Equal(t, "log3", scopeLogs2.LogRecords().At(0).Body().Str(), "second log body should be preserved") + + assert.Equal(t, 1, traceID2Batch.LogRecordCount(), "traceID2 batch should have 1 log") + assert.Equal(t, 1, traceID2Batch.ResourceLogs().Len(), "traceID2 batch should have 1 resource") + + resourceLogs := traceID2Batch.ResourceLogs().At(0) + val, exists := resourceLogs.Resource().Attributes().Get("service.name") + assert.True(t, exists, "service.name attribute should exist") + assert.Equal(t, "service1", val.Str(), "should have service1") + + scopeLogs := 
resourceLogs.ScopeLogs().At(0) + assert.Equal(t, "scope1", scopeLogs.Scope().Name(), "scope name should be preserved") + assert.Equal(t, "log2", scopeLogs.LogRecords().At(0).Body().Str(), "log body should be preserved") +} + func TestConsumeLogs_LogIsDroppedIfNoTraceAppears(t *testing.T) { traceID := [16]byte{0xaa, 0xbb, 0xcc}