From 73f2d9d460e4f476d973965b264dabf4e16bb8b7 Mon Sep 17 00:00:00 2001 From: Timo Behrendt Date: Mon, 5 May 2025 20:06:51 +0200 Subject: [PATCH] feat: refactor processor singleton to multi singleton (#5) Reviewed-on: https://gitea.t000-n.de/t.behrendt/tracebasedlogsampler/pulls/5 Co-authored-by: Timo Behrendt Co-committed-by: Timo Behrendt --- factory.go | 4 ++-- processor.go | 32 ++++++++++++++++---------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/factory.go b/factory.go index 97fb3a3..0c122d5 100644 --- a/factory.go +++ b/factory.go @@ -38,7 +38,7 @@ func createTracesProcessor(ctx context.Context, set processor.Settings, cfg comp logger := set.Logger tpCfg := cfg.(*Config) - traceProc := NewLogSamplerProcessorSingleton(logger, nextConsumer, nil, tpCfg) + traceProc := NewLogSamplerProcessorSingleton(set.ID, logger, nextConsumer, nil, tpCfg) return traceProc, nil } @@ -47,7 +47,7 @@ func createLogsProcessor(ctx context.Context, set processor.Settings, cfg compon logger := set.Logger lpCfg := cfg.(*Config) - logProc := NewLogSamplerProcessorSingleton(logger, nil, nextConsumer, lpCfg) + logProc := NewLogSamplerProcessorSingleton(set.ID, logger, nil, nextConsumer, lpCfg) return logProc, nil } diff --git a/processor.go b/processor.go index 08dbf5c..d423680 100644 --- a/processor.go +++ b/processor.go @@ -27,38 +27,39 @@ type LogSamplerProcessor struct { } var logSampleProcessorLock = sync.Mutex{} -var logSampleProcessor *LogSamplerProcessor +var logSampleProcessor = make(map[component.ID]*LogSamplerProcessor) /** * Creates a new LogSamplerProcessor as a singleton. */ -func NewLogSamplerProcessorSingleton(logger *zap.Logger, nextTracesConsumer consumer.Traces, nextLogsConsumer consumer.Logs, cfg *Config) *LogSamplerProcessor { +func NewLogSamplerProcessorSingleton(id component.ID, logger *zap.Logger, nextTracesConsumer consumer.Traces, nextLogsConsumer consumer.Logs, cfg *Config) *LogSamplerProcessor { maxTraceTtl, _ := time.ParseDuration(cfg.BufferDurationTraces) logSampleProcessorLock.Lock() defer logSampleProcessorLock.Unlock() - if logSampleProcessor != nil { + if existing, ok := logSampleProcessor[id]; ok { if nextTracesConsumer != nil { - logSampleProcessor.nextTracesConsumer = nextTracesConsumer + existing.nextTracesConsumer = nextTracesConsumer } if nextLogsConsumer != nil { - logSampleProcessor.nextLogsConsumer = nextLogsConsumer + existing.nextLogsConsumer = nextLogsConsumer } - } else { - logSampleProcessor = &LogSamplerProcessor{ - logger: logger, - nextTracesConsumer: nextTracesConsumer, - nextLogsConsumer: nextLogsConsumer, - config: cfg, - // TODO: Pass size from config as well. - traceIdTtlMap: traceIdTtlMap.New(1000, int(maxTraceTtl.Seconds())), - } + return existing } - return logSampleProcessor + p := &LogSamplerProcessor{ + logger: logger, + nextTracesConsumer: nextTracesConsumer, + nextLogsConsumer: nextLogsConsumer, + config: cfg, + traceIdTtlMap: traceIdTtlMap.New(1000, int(maxTraceTtl.Seconds())), + } + logSampleProcessor[id] = p + + return p } func (tp *LogSamplerProcessor) Start(ctx context.Context, host component.Host) error { @@ -124,7 +125,6 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, log plog.Logs) e go func(ctx context.Context, log plog.Logs) { tp.logger.Debug("Log added to buffer", zap.String("traceId", hex.EncodeToString(traceId[:]))) - // TODO: Find a better place for the parsed config, instead of using the non-parsed strings. duration, _ := time.ParseDuration(tp.config.BufferDurationLogs) time.Sleep(duration)