Compare commits
3 Commits
bf09c0e32e
...
feat-add-i
| Author | SHA1 | Date | |
|---|---|---|---|
| d38a5e7607 | |||
| 00944d01ad | |||
| ce7d18e880 |
@@ -90,19 +90,7 @@ func (m *TTLMap) Add(key string) {
|
|||||||
* Removes the trace id from the map if it has expired.
|
* Removes the trace id from the map if it has expired.
|
||||||
*/
|
*/
|
||||||
func (m *TTLMap) Exists(key string) bool {
|
func (m *TTLMap) Exists(key string) bool {
|
||||||
m.mu.RLock()
|
_, ok := m.Get(key)
|
||||||
value, ok := m.m[key]
|
|
||||||
m.mu.RUnlock()
|
|
||||||
|
|
||||||
if ok {
|
|
||||||
if value < time.Now().Unix() {
|
|
||||||
m.mu.Lock()
|
|
||||||
delete(m.m, key)
|
|
||||||
m.mu.Unlock()
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -116,6 +104,26 @@ func (m *TTLMap) insertEntry(key string, value int64) {
|
|||||||
m.mu.Unlock()
|
m.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets an entry from the map.
|
||||||
|
*/
|
||||||
|
func (m *TTLMap) Get(key string) (int64, bool) {
|
||||||
|
m.mu.RLock()
|
||||||
|
value, ok := m.m[key]
|
||||||
|
m.mu.RUnlock()
|
||||||
|
|
||||||
|
if ok {
|
||||||
|
if value < time.Now().Unix() {
|
||||||
|
m.mu.Lock()
|
||||||
|
delete(m.m, key)
|
||||||
|
m.mu.Unlock()
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return value, ok
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets an entry from the map.
|
* Gets an entry from the map.
|
||||||
* Only used for testing.
|
* Only used for testing.
|
||||||
@@ -124,7 +132,6 @@ func (m *TTLMap) getEntry(key string) (int64, bool) {
|
|||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
value, ok := m.m[key]
|
value, ok := m.m[key]
|
||||||
m.mu.RUnlock()
|
m.mu.RUnlock()
|
||||||
|
|
||||||
return value, ok
|
return value, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
171
processor.go
171
processor.go
@@ -12,9 +12,63 @@ import (
|
|||||||
"go.opentelemetry.io/collector/consumer"
|
"go.opentelemetry.io/collector/consumer"
|
||||||
"go.opentelemetry.io/collector/pdata/plog"
|
"go.opentelemetry.io/collector/pdata/plog"
|
||||||
"go.opentelemetry.io/collector/pdata/ptrace"
|
"go.opentelemetry.io/collector/pdata/ptrace"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/metric"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
meter = otel.GetMeterProvider().Meter("tracebasedlogsampler")
|
||||||
|
|
||||||
|
tracesProcessedCounter, _ = meter.Int64Counter(
|
||||||
|
"otelcol_processor_tracebasedlogsampler_traces_processed",
|
||||||
|
metric.WithDescription("Number of traces processed"),
|
||||||
|
metric.WithUnit("1"),
|
||||||
|
)
|
||||||
|
|
||||||
|
emptyTraceIdsCounter, _ = meter.Int64Counter(
|
||||||
|
"otelcol_processor_tracebasedlogsampler_traces_empty_trace_ids",
|
||||||
|
metric.WithDescription("Number of spans with empty trace IDs"),
|
||||||
|
metric.WithUnit("1"),
|
||||||
|
)
|
||||||
|
|
||||||
|
traceProcessingTimeHistogram, _ = meter.Int64Histogram(
|
||||||
|
"otelcol_processor_tracebasedlogsampler_traces_processing_time",
|
||||||
|
metric.WithDescription("Time spent processing traces"),
|
||||||
|
metric.WithUnit("ms"),
|
||||||
|
)
|
||||||
|
|
||||||
|
logsProcessedCounter, _ = meter.Int64Counter(
|
||||||
|
"otelcol_processor_tracebasedlogsampler_logs_processed",
|
||||||
|
metric.WithDescription("Number of log batches processed"),
|
||||||
|
metric.WithUnit("1"),
|
||||||
|
)
|
||||||
|
|
||||||
|
logsForwardedCounter, _ = meter.Int64Counter(
|
||||||
|
"otelcol_processor_tracebasedlogsampler_logs_forwarded",
|
||||||
|
metric.WithDescription("Number of log batches forwarded"),
|
||||||
|
metric.WithUnit("1"),
|
||||||
|
)
|
||||||
|
|
||||||
|
logsDiscardedCounter, _ = meter.Int64Counter(
|
||||||
|
"otelcol_processor_tracebasedlogsampler_logs_discarded",
|
||||||
|
metric.WithDescription("Number of log batches discarded"),
|
||||||
|
metric.WithUnit("1"),
|
||||||
|
)
|
||||||
|
|
||||||
|
logProcessingTimeHistogram, _ = meter.Int64Histogram(
|
||||||
|
"otelcol_processor_tracebasedlogsampler_logs_processing_time",
|
||||||
|
metric.WithDescription("Time spent processing logs"),
|
||||||
|
metric.WithUnit("ms"),
|
||||||
|
)
|
||||||
|
|
||||||
|
traceAgeAtLogCheckHistogram, _ = meter.Int64Histogram(
|
||||||
|
"otelcol_processor_tracebasedlogsampler_logs_trace_age_at_log_check",
|
||||||
|
metric.WithDescription("Age of trace when checking logs after buffer expiration"),
|
||||||
|
metric.WithUnit("ms"),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
type LogSamplerProcessor struct {
|
type LogSamplerProcessor struct {
|
||||||
host component.Host
|
host component.Host
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
@@ -89,22 +143,42 @@ func (tp *LogSamplerProcessor) Capabilities() consumer.Capabilities {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For each trace, record the trace id in a buffer.
|
* Every trace's trace id that is processed by the processor is being remembered.
|
||||||
*/
|
*/
|
||||||
func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
|
func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
|
||||||
traceId := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
|
startTime := time.Now()
|
||||||
|
tracesProcessedCounter.Add(ctx, 1)
|
||||||
|
|
||||||
if !traceId.IsEmpty() {
|
for i := range td.ResourceSpans().Len() {
|
||||||
tp.traceIdTtlMap.Add(hex.EncodeToString(traceId[:]))
|
resourceSpans := td.ResourceSpans().At(i)
|
||||||
|
|
||||||
tp.logger.Debug("Trace added to buffer", zap.String("traceId", hex.EncodeToString(traceId[:])))
|
for j := range resourceSpans.ScopeSpans().Len() {
|
||||||
|
scopeSpans := resourceSpans.ScopeSpans().At(j)
|
||||||
|
|
||||||
|
for k := range scopeSpans.Spans().Len() {
|
||||||
|
span := scopeSpans.Spans().At(k)
|
||||||
|
traceId := span.TraceID()
|
||||||
|
|
||||||
|
if !traceId.IsEmpty() {
|
||||||
|
traceIdStr := hex.EncodeToString(traceId[:])
|
||||||
|
tp.traceIdTtlMap.Add(traceIdStr)
|
||||||
|
|
||||||
|
tp.logger.Debug("Trace added to buffer", zap.String("traceId", traceIdStr))
|
||||||
|
} else {
|
||||||
|
emptyTraceIdsCounter.Add(ctx, 1)
|
||||||
|
tp.logger.Warn("Empty trace ID encountered")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
traceProcessingTimeHistogram.Record(ctx, time.Since(startTime).Milliseconds())
|
||||||
|
|
||||||
return tp.nextTracesConsumer.ConsumeTraces(ctx, td)
|
return tp.nextTracesConsumer.ConsumeTraces(ctx, td)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Upon receiving a log, check if the log's trace id matches any trace id in the buffer.
|
* Upon receiving logs, check if each log's trace id matches any trace id in the buffer.
|
||||||
* If it doesn't, put the log in the buffer for the configured amount of time.
|
* If it doesn't, put the log in the buffer for the configured amount of time.
|
||||||
* If it does, forward the log.
|
* If it does, forward the log.
|
||||||
*
|
*
|
||||||
@@ -112,33 +186,82 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac
|
|||||||
* If it does, forward the log.
|
* If it does, forward the log.
|
||||||
* If not, discard the log.
|
* If not, discard the log.
|
||||||
*/
|
*/
|
||||||
func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, log plog.Logs) error {
|
func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
|
||||||
traceId := log.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).TraceID()
|
startTime := time.Now()
|
||||||
|
logsProcessedCounter.Add(ctx, 1)
|
||||||
|
|
||||||
if !traceId.IsEmpty() {
|
// Rough estimation of the number of unique trace ids in the logs as the number of logs itself.
|
||||||
exists := tp.traceIdTtlMap.Exists(hex.EncodeToString(traceId[:]))
|
// It may be an option to make this configurable or use runtime metrics to zero in on the right size.
|
||||||
if exists {
|
logsByTraceId := make(map[string]plog.Logs, logs.LogRecordCount())
|
||||||
tp.logger.Debug("Log forwarded directly", zap.String("traceId", hex.EncodeToString(traceId[:])))
|
|
||||||
return tp.nextLogsConsumer.ConsumeLogs(ctx, log)
|
for i := range logs.ResourceLogs().Len() {
|
||||||
|
resourceLogs := logs.ResourceLogs().At(i)
|
||||||
|
for j := range resourceLogs.ScopeLogs().Len() {
|
||||||
|
scopeLogs := resourceLogs.ScopeLogs().At(j)
|
||||||
|
for k := range scopeLogs.LogRecords().Len() {
|
||||||
|
logRecord := scopeLogs.LogRecords().At(k)
|
||||||
|
traceId := logRecord.TraceID()
|
||||||
|
|
||||||
|
if !traceId.IsEmpty() {
|
||||||
|
traceIdStr := hex.EncodeToString(traceId[:])
|
||||||
|
|
||||||
|
batch, exists := logsByTraceId[traceIdStr]
|
||||||
|
if !exists {
|
||||||
|
batch = plog.NewLogs()
|
||||||
|
logsByTraceId[traceIdStr] = batch
|
||||||
|
}
|
||||||
|
|
||||||
|
batchResourceLogs := batch.ResourceLogs().AppendEmpty()
|
||||||
|
resourceLogs.Resource().CopyTo(batchResourceLogs.Resource())
|
||||||
|
batchScopeLogs := batchResourceLogs.ScopeLogs().AppendEmpty()
|
||||||
|
scopeLogs.Scope().CopyTo(batchScopeLogs.Scope())
|
||||||
|
newRecord := batchScopeLogs.LogRecords().AppendEmpty()
|
||||||
|
logRecord.CopyTo(newRecord)
|
||||||
|
} else {
|
||||||
|
emptyTraceIdsCounter.Add(ctx, 1)
|
||||||
|
tp.logger.Warn("Log has no trace id", zap.Any("log", logs))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for traceIdStr, batch := range logsByTraceId {
|
||||||
|
ttl, ok := tp.traceIdTtlMap.Get(traceIdStr)
|
||||||
|
if ok && ttl > 0 {
|
||||||
|
// The tll value is the deletion time. So the traceId was initially stored at the ttl time - buffer duration.
|
||||||
|
bufferDuration, _ := time.ParseDuration(tp.config.BufferDurationTraces)
|
||||||
|
traceAge := time.Since(time.Unix(ttl, 0).Add(-bufferDuration))
|
||||||
|
traceAgeAtLogCheckHistogram.Record(ctx, traceAge.Milliseconds())
|
||||||
|
|
||||||
|
tp.logger.Debug("Logs forwarded directly", zap.String("traceId", traceIdStr))
|
||||||
|
logsForwardedCounter.Add(ctx, 1)
|
||||||
|
tp.nextLogsConsumer.ConsumeLogs(ctx, batch)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
go func(ctx context.Context, log plog.Logs) {
|
go func(ctx context.Context, traceIdStr string, batch plog.Logs) {
|
||||||
tp.logger.Debug("Log added to buffer", zap.String("traceId", hex.EncodeToString(traceId[:])))
|
tp.logger.Debug("Logs added to buffer", zap.String("traceId", traceIdStr))
|
||||||
|
|
||||||
duration, _ := time.ParseDuration(tp.config.BufferDurationLogs)
|
duration, _ := time.ParseDuration(tp.config.BufferDurationLogs)
|
||||||
time.Sleep(duration)
|
time.Sleep(duration)
|
||||||
|
|
||||||
exists := tp.traceIdTtlMap.Exists(hex.EncodeToString(traceId[:]))
|
ttl, ok := tp.traceIdTtlMap.Get(traceIdStr)
|
||||||
if exists {
|
if ok && ttl > 0 {
|
||||||
tp.logger.Debug("Log forwarded after buffer expiration", zap.String("traceId", hex.EncodeToString(traceId[:])))
|
// The tll value is the deletion time. So the traceId was initially stored at the ttl time - buffer duration.
|
||||||
tp.nextLogsConsumer.ConsumeLogs(ctx, log)
|
bufferDuration, _ := time.ParseDuration(tp.config.BufferDurationTraces)
|
||||||
|
traceAge := time.Since(time.Unix(ttl, 0).Add(-bufferDuration))
|
||||||
|
|
||||||
|
tp.logger.Debug("Logs forwarded after buffer expiration", zap.String("traceId", traceIdStr))
|
||||||
|
logsForwardedCounter.Add(ctx, 1)
|
||||||
|
traceAgeAtLogCheckHistogram.Record(ctx, traceAge.Milliseconds())
|
||||||
|
tp.nextLogsConsumer.ConsumeLogs(ctx, batch)
|
||||||
} else {
|
} else {
|
||||||
tp.logger.Debug("Log discarded", zap.String("traceId", hex.EncodeToString(traceId[:])))
|
tp.logger.Debug("Logs discarded", zap.String("traceId", traceIdStr))
|
||||||
|
logsDiscardedCounter.Add(ctx, 1)
|
||||||
}
|
}
|
||||||
}(ctx, log)
|
}(ctx, traceIdStr, batch)
|
||||||
} else {
|
|
||||||
tp.logger.Warn("Log has no trace id", zap.Any("log", log))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logProcessingTimeHistogram.Record(ctx, time.Since(startTime).Milliseconds())
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ package tracebasedlogsampler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -128,6 +130,254 @@ func TestConsumeTraces_AddsToBuffer(t *testing.T) {
|
|||||||
mockLog.AssertCalled(t, "ConsumeLogs", mock.Anything)
|
mockLog.AssertCalled(t, "ConsumeLogs", mock.Anything)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConsumeTraces_AddsBatchedTracesToBuffer(t *testing.T) {
|
||||||
|
traceID1 := [16]byte{0xde, 0xad, 0xbe, 0xef}
|
||||||
|
traceID2 := [16]byte{0xba, 0xad, 0xf0, 0x0d}
|
||||||
|
traceID3 := [16]byte{0xca, 0xfe, 0xba, 0xbe}
|
||||||
|
|
||||||
|
mockTrace := new(mockTraceConsumer)
|
||||||
|
mockTrace.On("ConsumeTraces", mock.Anything).Return(nil)
|
||||||
|
|
||||||
|
mockLog := new(mockLogConsumer)
|
||||||
|
mockLog.On("ConsumeLogs", mock.Anything).Return(nil)
|
||||||
|
|
||||||
|
cfg := &Config{
|
||||||
|
BufferDurationTraces: "1s",
|
||||||
|
BufferDurationLogs: "300ms",
|
||||||
|
}
|
||||||
|
|
||||||
|
processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "addsBatchedTracesToBuffer"), zap.NewNop(), mockTrace, mockLog, cfg)
|
||||||
|
defer processor.Shutdown(context.Background())
|
||||||
|
|
||||||
|
td := ptrace.NewTraces()
|
||||||
|
|
||||||
|
resourceSpans1 := td.ResourceSpans().AppendEmpty()
|
||||||
|
scopeSpans1 := resourceSpans1.ScopeSpans().AppendEmpty()
|
||||||
|
span1 := scopeSpans1.Spans().AppendEmpty()
|
||||||
|
span1.SetTraceID(traceID1)
|
||||||
|
span2 := scopeSpans1.Spans().AppendEmpty()
|
||||||
|
span2.SetTraceID(traceID2)
|
||||||
|
|
||||||
|
resourceSpans2 := td.ResourceSpans().AppendEmpty()
|
||||||
|
scopeSpans2 := resourceSpans2.ScopeSpans().AppendEmpty()
|
||||||
|
span3 := scopeSpans2.Spans().AppendEmpty()
|
||||||
|
span3.SetTraceID(traceID3)
|
||||||
|
|
||||||
|
err := processor.ConsumeTraces(context.Background(), td)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.True(t, processor.traceIdTtlMap.Exists(hex.EncodeToString(traceID1[:])))
|
||||||
|
assert.True(t, processor.traceIdTtlMap.Exists(hex.EncodeToString(traceID2[:])))
|
||||||
|
assert.True(t, processor.traceIdTtlMap.Exists(hex.EncodeToString(traceID3[:])))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConsumeLogs_BatchesLogsWithSameTraceId(t *testing.T) {
|
||||||
|
traceID := [16]byte{0xde, 0xad, 0xbe, 0xef}
|
||||||
|
|
||||||
|
mockTrace := new(mockTraceConsumer)
|
||||||
|
mockTrace.On("ConsumeTraces", mock.Anything).Return(nil)
|
||||||
|
|
||||||
|
mockLog := new(mockLogConsumer)
|
||||||
|
mockLog.On("ConsumeLogs", mock.Anything).Return(nil)
|
||||||
|
|
||||||
|
cfg := &Config{
|
||||||
|
BufferDurationTraces: "1s",
|
||||||
|
BufferDurationLogs: "300ms",
|
||||||
|
}
|
||||||
|
|
||||||
|
processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "batchesLogsWithSameTraceId"), zap.NewNop(), mockTrace, mockLog, cfg)
|
||||||
|
defer processor.Shutdown(context.Background())
|
||||||
|
|
||||||
|
logs := plog.NewLogs()
|
||||||
|
resourceLogs := logs.ResourceLogs().AppendEmpty()
|
||||||
|
scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
|
||||||
|
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
logRecord := scopeLogs.LogRecords().AppendEmpty()
|
||||||
|
logRecord.SetTraceID(traceID)
|
||||||
|
logRecord.Body().SetStr(fmt.Sprintf("log %d", i))
|
||||||
|
}
|
||||||
|
|
||||||
|
td := generateTestTrace(traceID)
|
||||||
|
err := processor.ConsumeTraces(context.Background(), td)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = processor.ConsumeLogs(context.Background(), logs)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
mockLog.AssertNumberOfCalls(t, "ConsumeLogs", 1)
|
||||||
|
forwardedLogs := mockLog.logs[0]
|
||||||
|
assert.Equal(t, 3, forwardedLogs.LogRecordCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConsumeLogs_BatchesLogsWithDifferentTraceIds(t *testing.T) {
|
||||||
|
traceID1 := [16]byte{0xde, 0xad, 0xbe, 0xef}
|
||||||
|
traceID2 := [16]byte{0xba, 0xad, 0xf0, 0x0d}
|
||||||
|
|
||||||
|
mockTrace := new(mockTraceConsumer)
|
||||||
|
mockTrace.On("ConsumeTraces", mock.Anything).Return(nil)
|
||||||
|
|
||||||
|
mockLog := new(mockLogConsumer)
|
||||||
|
mockLog.On("ConsumeLogs", mock.Anything).Return(nil)
|
||||||
|
|
||||||
|
cfg := &Config{
|
||||||
|
BufferDurationTraces: "1s",
|
||||||
|
BufferDurationLogs: "300ms",
|
||||||
|
}
|
||||||
|
|
||||||
|
processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "batchesLogsWithDifferentTraceIds"), zap.NewNop(), mockTrace, mockLog, cfg)
|
||||||
|
defer processor.Shutdown(context.Background())
|
||||||
|
|
||||||
|
logs := plog.NewLogs()
|
||||||
|
resourceLogs := logs.ResourceLogs().AppendEmpty()
|
||||||
|
scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
|
||||||
|
|
||||||
|
for i := range 2 {
|
||||||
|
logRecord := scopeLogs.LogRecords().AppendEmpty()
|
||||||
|
logRecord.SetTraceID(traceID1)
|
||||||
|
logRecord.Body().SetStr(fmt.Sprintf("log1_%d", i))
|
||||||
|
}
|
||||||
|
|
||||||
|
logRecord := scopeLogs.LogRecords().AppendEmpty()
|
||||||
|
logRecord.SetTraceID(traceID2)
|
||||||
|
logRecord.Body().SetStr("log2_0")
|
||||||
|
|
||||||
|
td1 := generateTestTrace(traceID1)
|
||||||
|
err := processor.ConsumeTraces(context.Background(), td1)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
td2 := generateTestTrace(traceID2)
|
||||||
|
err = processor.ConsumeTraces(context.Background(), td2)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = processor.ConsumeLogs(context.Background(), logs)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
mockLog.AssertNumberOfCalls(t, "ConsumeLogs", 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConsumeLogs_PreservesIntegrity(t *testing.T) {
|
||||||
|
/*
|
||||||
|
Input structure:
|
||||||
|
resource1 (service1)
|
||||||
|
└── scope1
|
||||||
|
├── log1 (traceId1)
|
||||||
|
└── log2 (traceId2)
|
||||||
|
resource2 (service2)
|
||||||
|
└── scope1
|
||||||
|
└── log3 (traceId1)
|
||||||
|
|
||||||
|
Expected output structure (batched by traceId):
|
||||||
|
traceId1 batch:
|
||||||
|
resource1 (service1)
|
||||||
|
└── scope1
|
||||||
|
└── log1
|
||||||
|
resource2 (service2)
|
||||||
|
└── scope1
|
||||||
|
└── log3
|
||||||
|
traceId2 batch:
|
||||||
|
resource1 (service1)
|
||||||
|
└── scope1
|
||||||
|
└── log2
|
||||||
|
*/
|
||||||
|
|
||||||
|
traceID1 := [16]byte{0xde, 0xad, 0xbe, 0xef}
|
||||||
|
traceID2 := [16]byte{0xba, 0xad, 0xf0, 0x0d}
|
||||||
|
|
||||||
|
mockTrace := new(mockTraceConsumer)
|
||||||
|
mockTrace.On("ConsumeTraces", mock.Anything).Return(nil)
|
||||||
|
|
||||||
|
mockLog := new(mockLogConsumer)
|
||||||
|
mockLog.On("ConsumeLogs", mock.Anything).Return(nil)
|
||||||
|
|
||||||
|
cfg := &Config{
|
||||||
|
BufferDurationTraces: "1s",
|
||||||
|
BufferDurationLogs: "300ms",
|
||||||
|
}
|
||||||
|
|
||||||
|
processor := NewLogSamplerProcessorSingleton(component.NewIDWithName(componentType, "preservesIntegrity"), zap.NewNop(), mockTrace, mockLog, cfg)
|
||||||
|
defer processor.Shutdown(context.Background())
|
||||||
|
|
||||||
|
logs := plog.NewLogs()
|
||||||
|
|
||||||
|
resourceLogs1 := logs.ResourceLogs().AppendEmpty()
|
||||||
|
resourceLogs1.Resource().Attributes().PutStr("service.name", "service1")
|
||||||
|
scopeLogs1 := resourceLogs1.ScopeLogs().AppendEmpty()
|
||||||
|
scopeLogs1.Scope().SetName("scope1")
|
||||||
|
|
||||||
|
log1 := scopeLogs1.LogRecords().AppendEmpty()
|
||||||
|
log1.SetTraceID(traceID1)
|
||||||
|
log1.Body().SetStr("log1")
|
||||||
|
log2 := scopeLogs1.LogRecords().AppendEmpty()
|
||||||
|
log2.SetTraceID(traceID2)
|
||||||
|
log2.Body().SetStr("log2")
|
||||||
|
|
||||||
|
resourceLogs2 := logs.ResourceLogs().AppendEmpty()
|
||||||
|
resourceLogs2.Resource().Attributes().PutStr("service.name", "service2")
|
||||||
|
scopeLogs2 := resourceLogs2.ScopeLogs().AppendEmpty()
|
||||||
|
scopeLogs2.Scope().SetName("scope1")
|
||||||
|
log3 := scopeLogs2.LogRecords().AppendEmpty()
|
||||||
|
log3.SetTraceID(traceID1)
|
||||||
|
log3.Body().SetStr("log3")
|
||||||
|
|
||||||
|
for _, traceID := range [][16]byte{traceID1, traceID2} {
|
||||||
|
td := generateTestTrace(traceID)
|
||||||
|
err := processor.ConsumeTraces(context.Background(), td)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := processor.ConsumeLogs(context.Background(), logs)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
mockLog.AssertNumberOfCalls(t, "ConsumeLogs", 2)
|
||||||
|
|
||||||
|
var traceID1Batch, traceID2Batch plog.Logs
|
||||||
|
for _, batch := range mockLog.logs {
|
||||||
|
if batch.LogRecordCount() > 0 {
|
||||||
|
record := batch.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
|
||||||
|
traceID := record.TraceID()
|
||||||
|
if hex.EncodeToString(traceID[:]) == hex.EncodeToString(traceID1[:]) {
|
||||||
|
traceID1Batch = batch
|
||||||
|
} else {
|
||||||
|
traceID2Batch = batch
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, 2, traceID1Batch.LogRecordCount(), "traceID1 batch should have 2 logs")
|
||||||
|
assert.Equal(t, 2, traceID1Batch.ResourceLogs().Len(), "traceID1 batch should have 2 resources")
|
||||||
|
|
||||||
|
resourceLogs1 = traceID1Batch.ResourceLogs().At(0)
|
||||||
|
val1, exists := resourceLogs1.Resource().Attributes().Get("service.name")
|
||||||
|
assert.True(t, exists, "service.name attribute should exist for first resource")
|
||||||
|
assert.Equal(t, "service1", val1.Str(), "first resource should have service1")
|
||||||
|
|
||||||
|
scopeLogs1 = resourceLogs1.ScopeLogs().At(0)
|
||||||
|
assert.Equal(t, "scope1", scopeLogs1.Scope().Name(), "first scope name should be preserved")
|
||||||
|
assert.Equal(t, "log1", scopeLogs1.LogRecords().At(0).Body().Str(), "first log body should be preserved")
|
||||||
|
|
||||||
|
resourceLogs2 = traceID1Batch.ResourceLogs().At(1)
|
||||||
|
val2, exists := resourceLogs2.Resource().Attributes().Get("service.name")
|
||||||
|
assert.True(t, exists, "service.name attribute should exist for second resource")
|
||||||
|
assert.Equal(t, "service2", val2.Str(), "second resource should have service2")
|
||||||
|
|
||||||
|
scopeLogs2 = resourceLogs2.ScopeLogs().At(0)
|
||||||
|
assert.Equal(t, "scope1", scopeLogs2.Scope().Name(), "second scope name should be preserved")
|
||||||
|
assert.Equal(t, "log3", scopeLogs2.LogRecords().At(0).Body().Str(), "second log body should be preserved")
|
||||||
|
|
||||||
|
assert.Equal(t, 1, traceID2Batch.LogRecordCount(), "traceID2 batch should have 1 log")
|
||||||
|
assert.Equal(t, 1, traceID2Batch.ResourceLogs().Len(), "traceID2 batch should have 1 resource")
|
||||||
|
|
||||||
|
resourceLogs := traceID2Batch.ResourceLogs().At(0)
|
||||||
|
val, exists := resourceLogs.Resource().Attributes().Get("service.name")
|
||||||
|
assert.True(t, exists, "service.name attribute should exist")
|
||||||
|
assert.Equal(t, "service1", val.Str(), "should have service1")
|
||||||
|
|
||||||
|
scopeLogs := resourceLogs.ScopeLogs().At(0)
|
||||||
|
assert.Equal(t, "scope1", scopeLogs.Scope().Name(), "scope name should be preserved")
|
||||||
|
assert.Equal(t, "log2", scopeLogs.LogRecords().At(0).Body().Str(), "log body should be preserved")
|
||||||
|
}
|
||||||
|
|
||||||
func TestConsumeLogs_LogIsDroppedIfNoTraceAppears(t *testing.T) {
|
func TestConsumeLogs_LogIsDroppedIfNoTraceAppears(t *testing.T) {
|
||||||
traceID := [16]byte{0xaa, 0xbb, 0xcc}
|
traceID := [16]byte{0xaa, 0xbb, 0xcc}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user