diff --git a/internals/traceIdTtlMap/traceIdTtlMap.go b/internals/traceIdTtlMap/traceIdTtlMap.go index 565b9d4..cdc817b 100644 --- a/internals/traceIdTtlMap/traceIdTtlMap.go +++ b/internals/traceIdTtlMap/traceIdTtlMap.go @@ -90,19 +90,7 @@ func (m *TTLMap) Add(key string) { * Removes the trace id from the map if it has expired. */ func (m *TTLMap) Exists(key string) bool { - m.mu.RLock() - value, ok := m.m[key] - m.mu.RUnlock() - - if ok { - if value < time.Now().Unix() { - m.mu.Lock() - delete(m.m, key) - m.mu.Unlock() - return false - } - } - + _, ok := m.Get(key) return ok } @@ -116,6 +104,26 @@ func (m *TTLMap) insertEntry(key string, value int64) { m.mu.Unlock() } +/** + * Gets an entry from the map. + */ +func (m *TTLMap) Get(key string) (int64, bool) { + m.mu.RLock() + value, ok := m.m[key] + m.mu.RUnlock() + + if ok { + if value < time.Now().Unix() { + m.mu.Lock() + delete(m.m, key) + m.mu.Unlock() + return 0, false + } + } + + return value, ok +} + /** * Gets an entry from the map. * Only used for testing. @@ -124,7 +132,6 @@ func (m *TTLMap) getEntry(key string) (int64, bool) { m.mu.RLock() value, ok := m.m[key] m.mu.RUnlock() - return value, ok } diff --git a/processor.go b/processor.go index 18cfaa2..ff358fd 100644 --- a/processor.go +++ b/processor.go @@ -12,9 +12,63 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" ) +var ( + meter = otel.GetMeterProvider().Meter("tracebasedlogsampler") + + tracesProcessedCounter, _ = meter.Int64Counter( + "otelcol_processor_tracebasedlogsampler_traces_processed", + metric.WithDescription("Number of traces processed"), + metric.WithUnit("1"), + ) + + emptyTraceIdsCounter, _ = meter.Int64Counter( + "otelcol_processor_tracebasedlogsampler_traces_empty_trace_ids", + metric.WithDescription("Number of spans with empty trace IDs"), + metric.WithUnit("1"), + ) + + traceProcessingTimeHistogram, _ = meter.Int64Histogram( + "otelcol_processor_tracebasedlogsampler_traces_processing_time", + metric.WithDescription("Time spent processing traces"), + metric.WithUnit("ms"), + ) + + logsProcessedCounter, _ = meter.Int64Counter( + "otelcol_processor_tracebasedlogsampler_logs_processed", + metric.WithDescription("Number of log batches processed"), + metric.WithUnit("1"), + ) + + logsForwardedCounter, _ = meter.Int64Counter( + "otelcol_processor_tracebasedlogsampler_logs_forwarded", + metric.WithDescription("Number of log batches forwarded"), + metric.WithUnit("1"), + ) + + logsDiscardedCounter, _ = meter.Int64Counter( + "otelcol_processor_tracebasedlogsampler_logs_discarded", + metric.WithDescription("Number of log batches discarded"), + metric.WithUnit("1"), + ) + + logProcessingTimeHistogram, _ = meter.Int64Histogram( + "otelcol_processor_tracebasedlogsampler_logs_processing_time", + metric.WithDescription("Time spent processing logs"), + metric.WithUnit("ms"), + ) + + traceAgeAtLogCheckHistogram, _ = meter.Int64Histogram( + "otelcol_processor_tracebasedlogsampler_logs_trace_age_at_log_check", + metric.WithDescription("Age of trace when checking logs after buffer expiration"), + metric.WithUnit("ms"), + ) +) + type LogSamplerProcessor struct { host component.Host cancel context.CancelFunc @@ -92,6 +146,9 @@ func (tp *LogSamplerProcessor) Capabilities() consumer.Capabilities { * Every trace's trace id that is processed by the processor is being remembered. */ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + startTime := time.Now() + tracesProcessedCounter.Add(ctx, 1) + for i := range td.ResourceSpans().Len() { resourceSpans := td.ResourceSpans().At(i) @@ -107,11 +164,16 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac tp.traceIdTtlMap.Add(traceIdStr) tp.logger.Debug("Trace added to buffer", zap.String("traceId", traceIdStr)) + } else { + emptyTraceIdsCounter.Add(ctx, 1) + tp.logger.Warn("Empty trace ID encountered") } } } } + traceProcessingTimeHistogram.Record(ctx, time.Since(startTime).Milliseconds()) + return tp.nextTracesConsumer.ConsumeTraces(ctx, td) } @@ -125,6 +187,9 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac * If not, discard the log. */ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) error { + startTime := time.Now() + logsProcessedCounter.Add(ctx, 1) + // Rough estimation of the number of unique trace ids in the logs as the number of logs itself. // It may be an option to make this configurable or use runtime metrics to zero in on the right size. logsByTraceId := make(map[string]plog.Logs, logs.LogRecordCount()) @@ -153,6 +218,7 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) newRecord := batchScopeLogs.LogRecords().AppendEmpty() logRecord.CopyTo(newRecord) } else { + emptyTraceIdsCounter.Add(ctx, 1) tp.logger.Warn("Log has no trace id", zap.Any("log", logs)) } } @@ -160,9 +226,15 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) } for traceIdStr, batch := range logsByTraceId { - exists := tp.traceIdTtlMap.Exists(traceIdStr) - if exists { + ttl, ok := tp.traceIdTtlMap.Get(traceIdStr) + if ok && ttl > 0 { + // The tll value is the deletion time. So the traceId was initially stored at the ttl time - buffer duration. + bufferDuration, _ := time.ParseDuration(tp.config.BufferDurationTraces) + traceAge := time.Since(time.Unix(ttl, 0).Add(-bufferDuration)) + traceAgeAtLogCheckHistogram.Record(ctx, traceAge.Milliseconds()) + tp.logger.Debug("Logs forwarded directly", zap.String("traceId", traceIdStr)) + logsForwardedCounter.Add(ctx, 1) tp.nextLogsConsumer.ConsumeLogs(ctx, batch) continue } @@ -172,15 +244,24 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) duration, _ := time.ParseDuration(tp.config.BufferDurationLogs) time.Sleep(duration) - exists := tp.traceIdTtlMap.Exists(traceIdStr) - if exists { + ttl, ok := tp.traceIdTtlMap.Get(traceIdStr) + if ok && ttl > 0 { + // The tll value is the deletion time. So the traceId was initially stored at the ttl time - buffer duration. + bufferDuration, _ := time.ParseDuration(tp.config.BufferDurationTraces) + traceAge := time.Since(time.Unix(ttl, 0).Add(-bufferDuration)) + tp.logger.Debug("Logs forwarded after buffer expiration", zap.String("traceId", traceIdStr)) + logsForwardedCounter.Add(ctx, 1) + traceAgeAtLogCheckHistogram.Record(ctx, traceAge.Milliseconds()) tp.nextLogsConsumer.ConsumeLogs(ctx, batch) } else { tp.logger.Debug("Logs discarded", zap.String("traceId", traceIdStr)) + logsDiscardedCounter.Add(ctx, 1) } }(ctx, traceIdStr, batch) } + logProcessingTimeHistogram.Record(ctx, time.Since(startTime).Milliseconds()) + return nil }