feat: refactor processor singleton to multi singleton #5
@@ -38,7 +38,7 @@ func createTracesProcessor(ctx context.Context, set processor.Settings, cfg comp
|
||||
logger := set.Logger
|
||||
tpCfg := cfg.(*Config)
|
||||
|
||||
traceProc := NewLogSamplerProcessorSingleton(logger, nextConsumer, nil, tpCfg)
|
||||
traceProc := NewLogSamplerProcessorSingleton(set.ID, logger, nextConsumer, nil, tpCfg)
|
||||
|
||||
return traceProc, nil
|
||||
}
|
||||
@@ -47,7 +47,7 @@ func createLogsProcessor(ctx context.Context, set processor.Settings, cfg compon
|
||||
logger := set.Logger
|
||||
lpCfg := cfg.(*Config)
|
||||
|
||||
logProc := NewLogSamplerProcessorSingleton(logger, nil, nextConsumer, lpCfg)
|
||||
logProc := NewLogSamplerProcessorSingleton(set.ID, logger, nil, nextConsumer, lpCfg)
|
||||
|
||||
return logProc, nil
|
||||
}
|
||||
|
||||
32
processor.go
32
processor.go
@@ -27,38 +27,39 @@ type LogSamplerProcessor struct {
|
||||
}
|
||||
|
||||
var logSampleProcessorLock = sync.Mutex{}
|
||||
var logSampleProcessor *LogSamplerProcessor
|
||||
var logSampleProcessor = make(map[component.ID]*LogSamplerProcessor)
|
||||
|
||||
/**
|
||||
* Creates a new LogSamplerProcessor as a singleton.
|
||||
*/
|
||||
func NewLogSamplerProcessorSingleton(logger *zap.Logger, nextTracesConsumer consumer.Traces, nextLogsConsumer consumer.Logs, cfg *Config) *LogSamplerProcessor {
|
||||
func NewLogSamplerProcessorSingleton(id component.ID, logger *zap.Logger, nextTracesConsumer consumer.Traces, nextLogsConsumer consumer.Logs, cfg *Config) *LogSamplerProcessor {
|
||||
maxTraceTtl, _ := time.ParseDuration(cfg.BufferDurationTraces)
|
||||
|
||||
logSampleProcessorLock.Lock()
|
||||
defer logSampleProcessorLock.Unlock()
|
||||
|
||||
if logSampleProcessor != nil {
|
||||
if existing, ok := logSampleProcessor[id]; ok {
|
||||
if nextTracesConsumer != nil {
|
||||
logSampleProcessor.nextTracesConsumer = nextTracesConsumer
|
||||
existing.nextTracesConsumer = nextTracesConsumer
|
||||
}
|
||||
|
||||
if nextLogsConsumer != nil {
|
||||
logSampleProcessor.nextLogsConsumer = nextLogsConsumer
|
||||
existing.nextLogsConsumer = nextLogsConsumer
|
||||
}
|
||||
|
||||
} else {
|
||||
logSampleProcessor = &LogSamplerProcessor{
|
||||
logger: logger,
|
||||
nextTracesConsumer: nextTracesConsumer,
|
||||
nextLogsConsumer: nextLogsConsumer,
|
||||
config: cfg,
|
||||
// TODO: Pass size from config as well.
|
||||
traceIdTtlMap: traceIdTtlMap.New(1000, int(maxTraceTtl.Seconds())),
|
||||
}
|
||||
return existing
|
||||
}
|
||||
|
||||
return logSampleProcessor
|
||||
p := &LogSamplerProcessor{
|
||||
logger: logger,
|
||||
nextTracesConsumer: nextTracesConsumer,
|
||||
nextLogsConsumer: nextLogsConsumer,
|
||||
config: cfg,
|
||||
traceIdTtlMap: traceIdTtlMap.New(1000, int(maxTraceTtl.Seconds())),
|
||||
}
|
||||
logSampleProcessor[id] = p
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (tp *LogSamplerProcessor) Start(ctx context.Context, host component.Host) error {
|
||||
@@ -124,7 +125,6 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, log plog.Logs) e
|
||||
go func(ctx context.Context, log plog.Logs) {
|
||||
tp.logger.Debug("Log added to buffer", zap.String("traceId", hex.EncodeToString(traceId[:])))
|
||||
|
||||
// TODO: Find a better place for the parsed config, instead of using the non-parsed strings.
|
||||
duration, _ := time.ParseDuration(tp.config.BufferDurationLogs)
|
||||
time.Sleep(duration)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user