268 lines
8.3 KiB
Go
268 lines
8.3 KiB
Go
package tracebasedlogsampler
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"sync"
|
|
"time"
|
|
|
|
traceIdTtlMap "gitea.t000-n.de/t.behrendt/tracebasedlogsampler/internals/traceIdTtlMap"
|
|
|
|
"go.opentelemetry.io/collector/component"
|
|
"go.opentelemetry.io/collector/consumer"
|
|
"go.opentelemetry.io/collector/pdata/plog"
|
|
"go.opentelemetry.io/collector/pdata/ptrace"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/metric"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
var (
|
|
meter = otel.GetMeterProvider().Meter("tracebasedlogsampler")
|
|
|
|
tracesProcessedCounter, _ = meter.Int64Counter(
|
|
"otelcol_processor_tracebasedlogsampler_traces_processed",
|
|
metric.WithDescription("Number of traces processed"),
|
|
metric.WithUnit("1"),
|
|
)
|
|
|
|
emptyTraceIdsCounter, _ = meter.Int64Counter(
|
|
"otelcol_processor_tracebasedlogsampler_traces_empty_trace_ids",
|
|
metric.WithDescription("Number of spans with empty trace IDs"),
|
|
metric.WithUnit("1"),
|
|
)
|
|
|
|
traceProcessingTimeHistogram, _ = meter.Int64Histogram(
|
|
"otelcol_processor_tracebasedlogsampler_traces_processing_time",
|
|
metric.WithDescription("Time spent processing traces"),
|
|
metric.WithUnit("ms"),
|
|
)
|
|
|
|
logsProcessedCounter, _ = meter.Int64Counter(
|
|
"otelcol_processor_tracebasedlogsampler_logs_processed",
|
|
metric.WithDescription("Number of log batches processed"),
|
|
metric.WithUnit("1"),
|
|
)
|
|
|
|
logsForwardedCounter, _ = meter.Int64Counter(
|
|
"otelcol_processor_tracebasedlogsampler_logs_forwarded",
|
|
metric.WithDescription("Number of log batches forwarded"),
|
|
metric.WithUnit("1"),
|
|
)
|
|
|
|
logsDiscardedCounter, _ = meter.Int64Counter(
|
|
"otelcol_processor_tracebasedlogsampler_logs_discarded",
|
|
metric.WithDescription("Number of log batches discarded"),
|
|
metric.WithUnit("1"),
|
|
)
|
|
|
|
logProcessingTimeHistogram, _ = meter.Int64Histogram(
|
|
"otelcol_processor_tracebasedlogsampler_logs_processing_time",
|
|
metric.WithDescription("Time spent processing logs"),
|
|
metric.WithUnit("ms"),
|
|
)
|
|
|
|
traceAgeAtLogCheckHistogram, _ = meter.Int64Histogram(
|
|
"otelcol_processor_tracebasedlogsampler_logs_trace_age_at_log_check",
|
|
metric.WithDescription("Age of trace when checking logs after buffer expiration"),
|
|
metric.WithUnit("ms"),
|
|
)
|
|
)
|
|
|
|
type LogSamplerProcessor struct {
|
|
host component.Host
|
|
cancel context.CancelFunc
|
|
logger *zap.Logger
|
|
nextTracesConsumer consumer.Traces
|
|
nextLogsConsumer consumer.Logs
|
|
config *Config
|
|
|
|
traceIdTtlMap *traceIdTtlMap.TTLMap
|
|
}
|
|
|
|
var logSampleProcessorLock = sync.Mutex{}
|
|
var logSampleProcessor = make(map[component.ID]*LogSamplerProcessor)
|
|
|
|
/**
|
|
* Creates a new LogSamplerProcessor as a singleton.
|
|
*/
|
|
func NewLogSamplerProcessorSingleton(id component.ID, logger *zap.Logger, nextTracesConsumer consumer.Traces, nextLogsConsumer consumer.Logs, cfg *Config) *LogSamplerProcessor {
|
|
maxTraceTtl, _ := time.ParseDuration(cfg.BufferDurationTraces)
|
|
|
|
logSampleProcessorLock.Lock()
|
|
defer logSampleProcessorLock.Unlock()
|
|
|
|
if existing, ok := logSampleProcessor[id]; ok {
|
|
if nextTracesConsumer != nil {
|
|
existing.nextTracesConsumer = nextTracesConsumer
|
|
}
|
|
|
|
if nextLogsConsumer != nil {
|
|
existing.nextLogsConsumer = nextLogsConsumer
|
|
}
|
|
|
|
return existing
|
|
}
|
|
|
|
p := &LogSamplerProcessor{
|
|
logger: logger,
|
|
nextTracesConsumer: nextTracesConsumer,
|
|
nextLogsConsumer: nextLogsConsumer,
|
|
config: cfg,
|
|
traceIdTtlMap: traceIdTtlMap.New(1000, int(maxTraceTtl.Seconds())),
|
|
}
|
|
logSampleProcessor[id] = p
|
|
|
|
return p
|
|
}
|
|
|
|
func (tp *LogSamplerProcessor) Start(ctx context.Context, host component.Host) error {
|
|
tp.host = host
|
|
ctx = context.Background()
|
|
_, tp.cancel = context.WithCancel(ctx)
|
|
|
|
tp.logger.Debug("LogSamplerProcessor started")
|
|
|
|
return nil
|
|
}
|
|
|
|
func (tp *LogSamplerProcessor) Shutdown(ctx context.Context) error {
|
|
if tp.cancel != nil {
|
|
tp.cancel()
|
|
}
|
|
|
|
tp.traceIdTtlMap.Stop()
|
|
|
|
tp.logger.Debug("LogSamplerProcessor stopped")
|
|
|
|
return nil
|
|
}
|
|
|
|
func (tp *LogSamplerProcessor) Capabilities() consumer.Capabilities {
|
|
return consumer.Capabilities{}
|
|
}
|
|
|
|
/**
|
|
* Every trace's trace id that is processed by the processor is being remembered.
|
|
*/
|
|
func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
|
|
startTime := time.Now()
|
|
tracesProcessedCounter.Add(ctx, 1)
|
|
|
|
for i := range td.ResourceSpans().Len() {
|
|
resourceSpans := td.ResourceSpans().At(i)
|
|
|
|
for j := range resourceSpans.ScopeSpans().Len() {
|
|
scopeSpans := resourceSpans.ScopeSpans().At(j)
|
|
|
|
for k := range scopeSpans.Spans().Len() {
|
|
span := scopeSpans.Spans().At(k)
|
|
traceId := span.TraceID()
|
|
|
|
if !traceId.IsEmpty() {
|
|
traceIdStr := hex.EncodeToString(traceId[:])
|
|
tp.traceIdTtlMap.Add(traceIdStr)
|
|
|
|
tp.logger.Debug("Trace added to buffer", zap.String("traceId", traceIdStr))
|
|
} else {
|
|
emptyTraceIdsCounter.Add(ctx, 1)
|
|
tp.logger.Warn("Empty trace ID encountered")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
traceProcessingTimeHistogram.Record(ctx, time.Since(startTime).Milliseconds())
|
|
|
|
return tp.nextTracesConsumer.ConsumeTraces(ctx, td)
|
|
}
|
|
|
|
/**
|
|
* Upon receiving logs, check if each log's trace id matches any trace id in the buffer.
|
|
* If it doesn't, put the log in the buffer for the configured amount of time.
|
|
* If it does, forward the log.
|
|
*
|
|
* After the buffer expires, check again if the log's trace id matches any trace id in the buffer.
|
|
* If it does, forward the log.
|
|
* If not, discard the log.
|
|
*/
|
|
func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
|
|
startTime := time.Now()
|
|
logsProcessedCounter.Add(ctx, 1)
|
|
|
|
// Rough estimation of the number of unique trace ids in the logs as the number of logs itself.
|
|
// It may be an option to make this configurable or use runtime metrics to zero in on the right size.
|
|
logsByTraceId := make(map[string]plog.Logs, logs.LogRecordCount())
|
|
|
|
for i := range logs.ResourceLogs().Len() {
|
|
resourceLogs := logs.ResourceLogs().At(i)
|
|
for j := range resourceLogs.ScopeLogs().Len() {
|
|
scopeLogs := resourceLogs.ScopeLogs().At(j)
|
|
for k := range scopeLogs.LogRecords().Len() {
|
|
logRecord := scopeLogs.LogRecords().At(k)
|
|
traceId := logRecord.TraceID()
|
|
|
|
if !traceId.IsEmpty() {
|
|
traceIdStr := hex.EncodeToString(traceId[:])
|
|
|
|
batch, exists := logsByTraceId[traceIdStr]
|
|
if !exists {
|
|
batch = plog.NewLogs()
|
|
logsByTraceId[traceIdStr] = batch
|
|
}
|
|
|
|
batchResourceLogs := batch.ResourceLogs().AppendEmpty()
|
|
resourceLogs.Resource().CopyTo(batchResourceLogs.Resource())
|
|
batchScopeLogs := batchResourceLogs.ScopeLogs().AppendEmpty()
|
|
scopeLogs.Scope().CopyTo(batchScopeLogs.Scope())
|
|
newRecord := batchScopeLogs.LogRecords().AppendEmpty()
|
|
logRecord.CopyTo(newRecord)
|
|
} else {
|
|
emptyTraceIdsCounter.Add(ctx, 1)
|
|
tp.logger.Warn("Log has no trace id", zap.Any("log", logs))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for traceIdStr, batch := range logsByTraceId {
|
|
ttl, ok := tp.traceIdTtlMap.Get(traceIdStr)
|
|
if ok && ttl > 0 {
|
|
// The tll value is the deletion time. So the traceId was initially stored at the ttl time - buffer duration.
|
|
bufferDuration, _ := time.ParseDuration(tp.config.BufferDurationTraces)
|
|
traceAge := time.Since(time.Unix(ttl, 0).Add(-bufferDuration))
|
|
traceAgeAtLogCheckHistogram.Record(ctx, traceAge.Milliseconds())
|
|
|
|
tp.logger.Debug("Logs forwarded directly", zap.String("traceId", traceIdStr))
|
|
logsForwardedCounter.Add(ctx, 1)
|
|
tp.nextLogsConsumer.ConsumeLogs(ctx, batch)
|
|
continue
|
|
}
|
|
|
|
go func(ctx context.Context, traceIdStr string, batch plog.Logs) {
|
|
tp.logger.Debug("Logs added to buffer", zap.String("traceId", traceIdStr))
|
|
duration, _ := time.ParseDuration(tp.config.BufferDurationLogs)
|
|
time.Sleep(duration)
|
|
|
|
ttl, ok := tp.traceIdTtlMap.Get(traceIdStr)
|
|
if ok && ttl > 0 {
|
|
// The tll value is the deletion time. So the traceId was initially stored at the ttl time - buffer duration.
|
|
bufferDuration, _ := time.ParseDuration(tp.config.BufferDurationTraces)
|
|
traceAge := time.Since(time.Unix(ttl, 0).Add(-bufferDuration))
|
|
|
|
tp.logger.Debug("Logs forwarded after buffer expiration", zap.String("traceId", traceIdStr))
|
|
logsForwardedCounter.Add(ctx, 1)
|
|
traceAgeAtLogCheckHistogram.Record(ctx, traceAge.Milliseconds())
|
|
tp.nextLogsConsumer.ConsumeLogs(ctx, batch)
|
|
} else {
|
|
tp.logger.Debug("Logs discarded", zap.String("traceId", traceIdStr))
|
|
logsDiscardedCounter.Add(ctx, 1)
|
|
}
|
|
}(ctx, traceIdStr, batch)
|
|
}
|
|
|
|
logProcessingTimeHistogram.Record(ctx, time.Since(startTime).Milliseconds())
|
|
|
|
return nil
|
|
}
|