feat: refactor processor singleton to multi singleton (#5)

Reviewed-on: #5
Co-authored-by: Timo Behrendt <t.behrendt@t00n.de>
Co-committed-by: Timo Behrendt <t.behrendt@t00n.de>
This commit was merged in pull request #5.
This commit is contained in:
2025-05-05 20:06:51 +02:00
committed by t.behrendt
parent 4a8a5df6fd
commit 73f2d9d460
2 changed files with 18 additions and 18 deletions

View File

@@ -38,7 +38,7 @@ func createTracesProcessor(ctx context.Context, set processor.Settings, cfg comp
logger := set.Logger logger := set.Logger
tpCfg := cfg.(*Config) tpCfg := cfg.(*Config)
traceProc := NewLogSamplerProcessorSingleton(logger, nextConsumer, nil, tpCfg) traceProc := NewLogSamplerProcessorSingleton(set.ID, logger, nextConsumer, nil, tpCfg)
return traceProc, nil return traceProc, nil
} }
@@ -47,7 +47,7 @@ func createLogsProcessor(ctx context.Context, set processor.Settings, cfg compon
logger := set.Logger logger := set.Logger
lpCfg := cfg.(*Config) lpCfg := cfg.(*Config)
logProc := NewLogSamplerProcessorSingleton(logger, nil, nextConsumer, lpCfg) logProc := NewLogSamplerProcessorSingleton(set.ID, logger, nil, nextConsumer, lpCfg)
return logProc, nil return logProc, nil
} }

View File

@@ -27,38 +27,39 @@ type LogSamplerProcessor struct {
} }
var logSampleProcessorLock = sync.Mutex{} var logSampleProcessorLock = sync.Mutex{}
var logSampleProcessor *LogSamplerProcessor var logSampleProcessor = make(map[component.ID]*LogSamplerProcessor)
/** /**
* Creates a new LogSamplerProcessor as a singleton. * Creates a new LogSamplerProcessor as a singleton.
*/ */
func NewLogSamplerProcessorSingleton(logger *zap.Logger, nextTracesConsumer consumer.Traces, nextLogsConsumer consumer.Logs, cfg *Config) *LogSamplerProcessor { func NewLogSamplerProcessorSingleton(id component.ID, logger *zap.Logger, nextTracesConsumer consumer.Traces, nextLogsConsumer consumer.Logs, cfg *Config) *LogSamplerProcessor {
maxTraceTtl, _ := time.ParseDuration(cfg.BufferDurationTraces) maxTraceTtl, _ := time.ParseDuration(cfg.BufferDurationTraces)
logSampleProcessorLock.Lock() logSampleProcessorLock.Lock()
defer logSampleProcessorLock.Unlock() defer logSampleProcessorLock.Unlock()
if logSampleProcessor != nil { if existing, ok := logSampleProcessor[id]; ok {
if nextTracesConsumer != nil { if nextTracesConsumer != nil {
logSampleProcessor.nextTracesConsumer = nextTracesConsumer existing.nextTracesConsumer = nextTracesConsumer
} }
if nextLogsConsumer != nil { if nextLogsConsumer != nil {
logSampleProcessor.nextLogsConsumer = nextLogsConsumer existing.nextLogsConsumer = nextLogsConsumer
} }
} else { return existing
logSampleProcessor = &LogSamplerProcessor{
logger: logger,
nextTracesConsumer: nextTracesConsumer,
nextLogsConsumer: nextLogsConsumer,
config: cfg,
// TODO: Pass size from config as well.
traceIdTtlMap: traceIdTtlMap.New(1000, int(maxTraceTtl.Seconds())),
}
} }
return logSampleProcessor p := &LogSamplerProcessor{
logger: logger,
nextTracesConsumer: nextTracesConsumer,
nextLogsConsumer: nextLogsConsumer,
config: cfg,
traceIdTtlMap: traceIdTtlMap.New(1000, int(maxTraceTtl.Seconds())),
}
logSampleProcessor[id] = p
return p
} }
func (tp *LogSamplerProcessor) Start(ctx context.Context, host component.Host) error { func (tp *LogSamplerProcessor) Start(ctx context.Context, host component.Host) error {
@@ -124,7 +125,6 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, log plog.Logs) e
go func(ctx context.Context, log plog.Logs) { go func(ctx context.Context, log plog.Logs) {
tp.logger.Debug("Log added to buffer", zap.String("traceId", hex.EncodeToString(traceId[:]))) tp.logger.Debug("Log added to buffer", zap.String("traceId", hex.EncodeToString(traceId[:])))
// TODO: Find a better place for the parsed config, instead of using the non-parsed strings.
duration, _ := time.ParseDuration(tp.config.BufferDurationLogs) duration, _ := time.ParseDuration(tp.config.BufferDurationLogs)
time.Sleep(duration) time.Sleep(duration)