|
|
|
|
@@ -12,9 +12,63 @@ import (
|
|
|
|
|
"go.opentelemetry.io/collector/consumer"
|
|
|
|
|
"go.opentelemetry.io/collector/pdata/plog"
|
|
|
|
|
"go.opentelemetry.io/collector/pdata/ptrace"
|
|
|
|
|
"go.opentelemetry.io/otel"
|
|
|
|
|
"go.opentelemetry.io/otel/metric"
|
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
meter = otel.GetMeterProvider().Meter("tracebasedlogsampler")
|
|
|
|
|
|
|
|
|
|
tracesProcessedCounter, _ = meter.Int64Counter(
|
|
|
|
|
"otelcol_processor_tracebasedlogsampler_traces_processed",
|
|
|
|
|
metric.WithDescription("Number of traces processed"),
|
|
|
|
|
metric.WithUnit("1"),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
emptyTraceIdsCounter, _ = meter.Int64Counter(
|
|
|
|
|
"otelcol_processor_tracebasedlogsampler_traces_empty_trace_ids",
|
|
|
|
|
metric.WithDescription("Number of spans with empty trace IDs"),
|
|
|
|
|
metric.WithUnit("1"),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
traceProcessingTimeHistogram, _ = meter.Int64Histogram(
|
|
|
|
|
"otelcol_processor_tracebasedlogsampler_traces_processing_time",
|
|
|
|
|
metric.WithDescription("Time spent processing traces"),
|
|
|
|
|
metric.WithUnit("ms"),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
logsProcessedCounter, _ = meter.Int64Counter(
|
|
|
|
|
"otelcol_processor_tracebasedlogsampler_logs_processed",
|
|
|
|
|
metric.WithDescription("Number of log batches processed"),
|
|
|
|
|
metric.WithUnit("1"),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
logsForwardedCounter, _ = meter.Int64Counter(
|
|
|
|
|
"otelcol_processor_tracebasedlogsampler_logs_forwarded",
|
|
|
|
|
metric.WithDescription("Number of log batches forwarded"),
|
|
|
|
|
metric.WithUnit("1"),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
logsDiscardedCounter, _ = meter.Int64Counter(
|
|
|
|
|
"otelcol_processor_tracebasedlogsampler_logs_discarded",
|
|
|
|
|
metric.WithDescription("Number of log batches discarded"),
|
|
|
|
|
metric.WithUnit("1"),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
logProcessingTimeHistogram, _ = meter.Int64Histogram(
|
|
|
|
|
"otelcol_processor_tracebasedlogsampler_logs_processing_time",
|
|
|
|
|
metric.WithDescription("Time spent processing logs"),
|
|
|
|
|
metric.WithUnit("ms"),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
traceAgeAtLogCheckHistogram, _ = meter.Int64Histogram(
|
|
|
|
|
"otelcol_processor_tracebasedlogsampler_logs_trace_age_at_log_check",
|
|
|
|
|
metric.WithDescription("Age of trace when checking logs after buffer expiration"),
|
|
|
|
|
metric.WithUnit("ms"),
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type LogSamplerProcessor struct {
|
|
|
|
|
host component.Host
|
|
|
|
|
cancel context.CancelFunc
|
|
|
|
|
@@ -92,6 +146,9 @@ func (tp *LogSamplerProcessor) Capabilities() consumer.Capabilities {
|
|
|
|
|
* Every trace's trace id that is processed by the processor is being remembered.
|
|
|
|
|
*/
|
|
|
|
|
func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
|
|
|
|
|
startTime := time.Now()
|
|
|
|
|
tracesProcessedCounter.Add(ctx, 1)
|
|
|
|
|
|
|
|
|
|
for i := range td.ResourceSpans().Len() {
|
|
|
|
|
resourceSpans := td.ResourceSpans().At(i)
|
|
|
|
|
|
|
|
|
|
@@ -107,11 +164,16 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac
|
|
|
|
|
tp.traceIdTtlMap.Add(traceIdStr)
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("Trace added to buffer", zap.String("traceId", traceIdStr))
|
|
|
|
|
} else {
|
|
|
|
|
emptyTraceIdsCounter.Add(ctx, 1)
|
|
|
|
|
tp.logger.Warn("Empty trace ID encountered")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
traceProcessingTimeHistogram.Record(ctx, time.Since(startTime).Milliseconds())
|
|
|
|
|
|
|
|
|
|
return tp.nextTracesConsumer.ConsumeTraces(ctx, td)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -125,6 +187,9 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac
|
|
|
|
|
* If not, discard the log.
|
|
|
|
|
*/
|
|
|
|
|
func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
|
|
|
|
|
startTime := time.Now()
|
|
|
|
|
logsProcessedCounter.Add(ctx, 1)
|
|
|
|
|
|
|
|
|
|
// Rough estimation of the number of unique trace ids in the logs as the number of logs itself.
|
|
|
|
|
// It may be an option to make this configurable or use runtime metrics to zero in on the right size.
|
|
|
|
|
logsByTraceId := make(map[string]plog.Logs, logs.LogRecordCount())
|
|
|
|
|
@@ -153,6 +218,7 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs)
|
|
|
|
|
newRecord := batchScopeLogs.LogRecords().AppendEmpty()
|
|
|
|
|
logRecord.CopyTo(newRecord)
|
|
|
|
|
} else {
|
|
|
|
|
emptyTraceIdsCounter.Add(ctx, 1)
|
|
|
|
|
tp.logger.Warn("Log has no trace id", zap.Any("log", logs))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -160,9 +226,15 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for traceIdStr, batch := range logsByTraceId {
|
|
|
|
|
exists := tp.traceIdTtlMap.Exists(traceIdStr)
|
|
|
|
|
if exists {
|
|
|
|
|
ttl, ok := tp.traceIdTtlMap.Get(traceIdStr)
|
|
|
|
|
if ok && ttl > 0 {
|
|
|
|
|
// The ttl value is the deletion time, so the trace ID was initially stored at the ttl time minus the buffer duration.
|
|
|
|
|
bufferDuration, _ := time.ParseDuration(tp.config.BufferDurationTraces)
|
|
|
|
|
traceAge := time.Since(time.Unix(ttl, 0).Add(-bufferDuration))
|
|
|
|
|
traceAgeAtLogCheckHistogram.Record(ctx, traceAge.Milliseconds())
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("Logs forwarded directly", zap.String("traceId", traceIdStr))
|
|
|
|
|
logsForwardedCounter.Add(ctx, 1)
|
|
|
|
|
tp.nextLogsConsumer.ConsumeLogs(ctx, batch)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
@@ -172,15 +244,24 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs)
|
|
|
|
|
duration, _ := time.ParseDuration(tp.config.BufferDurationLogs)
|
|
|
|
|
time.Sleep(duration)
|
|
|
|
|
|
|
|
|
|
exists := tp.traceIdTtlMap.Exists(traceIdStr)
|
|
|
|
|
if exists {
|
|
|
|
|
ttl, ok := tp.traceIdTtlMap.Get(traceIdStr)
|
|
|
|
|
if ok && ttl > 0 {
|
|
|
|
|
// The ttl value is the deletion time, so the trace ID was initially stored at the ttl time minus the buffer duration.
|
|
|
|
|
bufferDuration, _ := time.ParseDuration(tp.config.BufferDurationTraces)
|
|
|
|
|
traceAge := time.Since(time.Unix(ttl, 0).Add(-bufferDuration))
|
|
|
|
|
|
|
|
|
|
tp.logger.Debug("Logs forwarded after buffer expiration", zap.String("traceId", traceIdStr))
|
|
|
|
|
logsForwardedCounter.Add(ctx, 1)
|
|
|
|
|
traceAgeAtLogCheckHistogram.Record(ctx, traceAge.Milliseconds())
|
|
|
|
|
tp.nextLogsConsumer.ConsumeLogs(ctx, batch)
|
|
|
|
|
} else {
|
|
|
|
|
tp.logger.Debug("Logs discarded", zap.String("traceId", traceIdStr))
|
|
|
|
|
logsDiscardedCounter.Add(ctx, 1)
|
|
|
|
|
}
|
|
|
|
|
}(ctx, traceIdStr, batch)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logProcessingTimeHistogram.Record(ctx, time.Since(startTime).Milliseconds())
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|