From d38a5e7607ebe1a35ec054a401411bc1bbf90a2f Mon Sep 17 00:00:00 2001 From: Timo Behrendt Date: Mon, 19 May 2025 21:19:32 +0200 Subject: [PATCH] add basic internal metrics --- processor.go | 89 +++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 85 insertions(+), 4 deletions(-) diff --git a/processor.go b/processor.go index 18cfaa2..ff358fd 100644 --- a/processor.go +++ b/processor.go @@ -12,9 +12,63 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" ) +var ( + meter = otel.GetMeterProvider().Meter("tracebasedlogsampler") + + tracesProcessedCounter, _ = meter.Int64Counter( + "otelcol_processor_tracebasedlogsampler_traces_processed", + metric.WithDescription("Number of traces processed"), + metric.WithUnit("1"), + ) + + emptyTraceIdsCounter, _ = meter.Int64Counter( + "otelcol_processor_tracebasedlogsampler_traces_empty_trace_ids", + metric.WithDescription("Number of spans with empty trace IDs"), + metric.WithUnit("1"), + ) + + traceProcessingTimeHistogram, _ = meter.Int64Histogram( + "otelcol_processor_tracebasedlogsampler_traces_processing_time", + metric.WithDescription("Time spent processing traces"), + metric.WithUnit("ms"), + ) + + logsProcessedCounter, _ = meter.Int64Counter( + "otelcol_processor_tracebasedlogsampler_logs_processed", + metric.WithDescription("Number of log batches processed"), + metric.WithUnit("1"), + ) + + logsForwardedCounter, _ = meter.Int64Counter( + "otelcol_processor_tracebasedlogsampler_logs_forwarded", + metric.WithDescription("Number of log batches forwarded"), + metric.WithUnit("1"), + ) + + logsDiscardedCounter, _ = meter.Int64Counter( + "otelcol_processor_tracebasedlogsampler_logs_discarded", + metric.WithDescription("Number of log batches discarded"), + metric.WithUnit("1"), + ) + + logProcessingTimeHistogram, _ = meter.Int64Histogram( + 
"otelcol_processor_tracebasedlogsampler_logs_processing_time", + metric.WithDescription("Time spent processing logs"), + metric.WithUnit("ms"), + ) + + traceAgeAtLogCheckHistogram, _ = meter.Int64Histogram( + "otelcol_processor_tracebasedlogsampler_logs_trace_age_at_log_check", + metric.WithDescription("Age of trace when checking logs after buffer expiration"), + metric.WithUnit("ms"), + ) +) + type LogSamplerProcessor struct { host component.Host cancel context.CancelFunc @@ -92,6 +146,9 @@ func (tp *LogSamplerProcessor) Capabilities() consumer.Capabilities { * Every trace's trace id that is processed by the processor is being remembered. */ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + startTime := time.Now() + tracesProcessedCounter.Add(ctx, 1) + for i := range td.ResourceSpans().Len() { resourceSpans := td.ResourceSpans().At(i) @@ -107,11 +164,16 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac tp.traceIdTtlMap.Add(traceIdStr) tp.logger.Debug("Trace added to buffer", zap.String("traceId", traceIdStr)) + } else { + emptyTraceIdsCounter.Add(ctx, 1) + tp.logger.Warn("Empty trace ID encountered") } } } } + traceProcessingTimeHistogram.Record(ctx, time.Since(startTime).Milliseconds()) + return tp.nextTracesConsumer.ConsumeTraces(ctx, td) } @@ -125,6 +187,9 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac * If not, discard the log. */ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) error { + startTime := time.Now() + logsProcessedCounter.Add(ctx, 1) + // Rough estimation of the number of unique trace ids in the logs as the number of logs itself. // It may be an option to make this configurable or use runtime metrics to zero in on the right size. 
logsByTraceId := make(map[string]plog.Logs, logs.LogRecordCount()) @@ -153,6 +218,7 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) newRecord := batchScopeLogs.LogRecords().AppendEmpty() logRecord.CopyTo(newRecord) } else { + emptyTraceIdsCounter.Add(ctx, 1) tp.logger.Warn("Log has no trace id", zap.Any("log", logs)) } } @@ -160,9 +226,15 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) } for traceIdStr, batch := range logsByTraceId { - exists := tp.traceIdTtlMap.Exists(traceIdStr) - if exists { + ttl, ok := tp.traceIdTtlMap.Get(traceIdStr) + if ok && ttl > 0 { + // The ttl value is the deletion time. So the traceId was initially stored at the ttl time - buffer duration. + bufferDuration, _ := time.ParseDuration(tp.config.BufferDurationTraces) + traceAge := time.Since(time.Unix(ttl, 0).Add(-bufferDuration)) + traceAgeAtLogCheckHistogram.Record(ctx, traceAge.Milliseconds()) + tp.logger.Debug("Logs forwarded directly", zap.String("traceId", traceIdStr)) + logsForwardedCounter.Add(ctx, 1) tp.nextLogsConsumer.ConsumeLogs(ctx, batch) continue } @@ -172,15 +244,24 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) duration, _ := time.ParseDuration(tp.config.BufferDurationLogs) time.Sleep(duration) - exists := tp.traceIdTtlMap.Exists(traceIdStr) - if exists { + ttl, ok := tp.traceIdTtlMap.Get(traceIdStr) + if ok && ttl > 0 { + // The ttl value is the deletion time. So the traceId was initially stored at the ttl time - buffer duration. 
+ bufferDuration, _ := time.ParseDuration(tp.config.BufferDurationTraces) + traceAge := time.Since(time.Unix(ttl, 0).Add(-bufferDuration)) + tp.logger.Debug("Logs forwarded after buffer expiration", zap.String("traceId", traceIdStr)) + logsForwardedCounter.Add(ctx, 1) + traceAgeAtLogCheckHistogram.Record(ctx, traceAge.Milliseconds()) tp.nextLogsConsumer.ConsumeLogs(ctx, batch) } else { tp.logger.Debug("Logs discarded", zap.String("traceId", traceIdStr)) + logsDiscardedCounter.Add(ctx, 1) } }(ctx, traceIdStr, batch) } + logProcessingTimeHistogram.Record(ctx, time.Since(startTime).Milliseconds()) + return nil }