2 Commits

Author SHA1 Message Date
d38a5e7607 add basic internal metrics
All checks were successful
CI / test (push) Successful in 22s
2025-05-19 21:19:32 +02:00
00944d01ad add Get method to traceIdTtlMap 2025-05-19 21:19:24 +02:00
5 changed files with 106 additions and 61 deletions

View File

@@ -1,31 +0,0 @@
name: CI & Release
on:
push:
branches:
- main
jobs:
ci:
uses: ./.gitea/workflows/ci.yaml
release:
needs: ci
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: actions/setup-node@v4
with:
node-version: 22
- run: npm install -g standard-version
- run: |
git config --global user.name "Release Bot"
git config --global user.email "release-bot@t00n.de"
- run: |
standard-version --no-verify --skip-changelog
tag=$(git describe --tags --abbrev=0)
git push origin "$tag"
env:
GITHUB_TOKEN: ${{ secrets.GITEA_TOKEN }}

View File

@@ -4,7 +4,6 @@ on:
push:
branches-ignore:
- main
workflow_call:
jobs:
test:

View File

@@ -1,11 +0,0 @@
# Changelog
All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.
## 1.1.0 (2025-05-20)
### Features
* refactor processor singleton to multi singleton ([#5](https://gitea.t000-n.de/t.behrendt/tracebasedlogsampler/issues/5)) ([73f2d9d](https://gitea.t000-n.de/t.behrendt/tracebasedlogsampler/commit/73f2d9d460e4f476d973965b264dabf4e16bb8b7))
* support batches ([#8](https://gitea.t000-n.de/t.behrendt/tracebasedlogsampler/issues/8)) ([ce7d18e](https://gitea.t000-n.de/t.behrendt/tracebasedlogsampler/commit/ce7d18e8804267745851c06ebe400d9c7a56930d))

View File

@@ -90,19 +90,7 @@ func (m *TTLMap) Add(key string) {
* Removes the trace id from the map if it has expired.
*/
func (m *TTLMap) Exists(key string) bool {
m.mu.RLock()
value, ok := m.m[key]
m.mu.RUnlock()
if ok {
if value < time.Now().Unix() {
m.mu.Lock()
delete(m.m, key)
m.mu.Unlock()
return false
}
}
_, ok := m.Get(key)
return ok
}
@@ -116,6 +104,26 @@ func (m *TTLMap) insertEntry(key string, value int64) {
m.mu.Unlock()
}
/**
* Gets an entry from the map.
*/
func (m *TTLMap) Get(key string) (int64, bool) {
m.mu.RLock()
value, ok := m.m[key]
m.mu.RUnlock()
if ok {
if value < time.Now().Unix() {
m.mu.Lock()
delete(m.m, key)
m.mu.Unlock()
return 0, false
}
}
return value, ok
}
/**
* Gets an entry from the map.
* Only used for testing.
@@ -124,7 +132,6 @@ func (m *TTLMap) getEntry(key string) (int64, bool) {
m.mu.RLock()
value, ok := m.m[key]
m.mu.RUnlock()
return value, ok
}

View File

@@ -12,9 +12,63 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)
var (
meter = otel.GetMeterProvider().Meter("tracebasedlogsampler")
tracesProcessedCounter, _ = meter.Int64Counter(
"otelcol_processor_tracebasedlogsampler_traces_processed",
metric.WithDescription("Number of traces processed"),
metric.WithUnit("1"),
)
emptyTraceIdsCounter, _ = meter.Int64Counter(
"otelcol_processor_tracebasedlogsampler_traces_empty_trace_ids",
metric.WithDescription("Number of spans with empty trace IDs"),
metric.WithUnit("1"),
)
traceProcessingTimeHistogram, _ = meter.Int64Histogram(
"otelcol_processor_tracebasedlogsampler_traces_processing_time",
metric.WithDescription("Time spent processing traces"),
metric.WithUnit("ms"),
)
logsProcessedCounter, _ = meter.Int64Counter(
"otelcol_processor_tracebasedlogsampler_logs_processed",
metric.WithDescription("Number of log batches processed"),
metric.WithUnit("1"),
)
logsForwardedCounter, _ = meter.Int64Counter(
"otelcol_processor_tracebasedlogsampler_logs_forwarded",
metric.WithDescription("Number of log batches forwarded"),
metric.WithUnit("1"),
)
logsDiscardedCounter, _ = meter.Int64Counter(
"otelcol_processor_tracebasedlogsampler_logs_discarded",
metric.WithDescription("Number of log batches discarded"),
metric.WithUnit("1"),
)
logProcessingTimeHistogram, _ = meter.Int64Histogram(
"otelcol_processor_tracebasedlogsampler_logs_processing_time",
metric.WithDescription("Time spent processing logs"),
metric.WithUnit("ms"),
)
traceAgeAtLogCheckHistogram, _ = meter.Int64Histogram(
"otelcol_processor_tracebasedlogsampler_logs_trace_age_at_log_check",
metric.WithDescription("Age of trace when checking logs after buffer expiration"),
metric.WithUnit("ms"),
)
)
type LogSamplerProcessor struct {
host component.Host
cancel context.CancelFunc
@@ -92,6 +146,9 @@ func (tp *LogSamplerProcessor) Capabilities() consumer.Capabilities {
* Every trace's trace id that is processed by the processor is being remembered.
*/
func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
startTime := time.Now()
tracesProcessedCounter.Add(ctx, 1)
for i := range td.ResourceSpans().Len() {
resourceSpans := td.ResourceSpans().At(i)
@@ -107,11 +164,16 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac
tp.traceIdTtlMap.Add(traceIdStr)
tp.logger.Debug("Trace added to buffer", zap.String("traceId", traceIdStr))
} else {
emptyTraceIdsCounter.Add(ctx, 1)
tp.logger.Warn("Empty trace ID encountered")
}
}
}
}
traceProcessingTimeHistogram.Record(ctx, time.Since(startTime).Milliseconds())
return tp.nextTracesConsumer.ConsumeTraces(ctx, td)
}
@@ -125,6 +187,9 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac
* If not, discard the log.
*/
func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
startTime := time.Now()
logsProcessedCounter.Add(ctx, 1)
// Rough estimation of the number of unique trace ids in the logs as the number of logs itself.
// It may be an option to make this configurable or use runtime metrics to zero in on the right size.
logsByTraceId := make(map[string]plog.Logs, logs.LogRecordCount())
@@ -153,6 +218,7 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs)
newRecord := batchScopeLogs.LogRecords().AppendEmpty()
logRecord.CopyTo(newRecord)
} else {
emptyTraceIdsCounter.Add(ctx, 1)
tp.logger.Warn("Log has no trace id", zap.Any("log", logs))
}
}
@@ -160,9 +226,15 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs)
}
for traceIdStr, batch := range logsByTraceId {
exists := tp.traceIdTtlMap.Exists(traceIdStr)
if exists {
ttl, ok := tp.traceIdTtlMap.Get(traceIdStr)
if ok && ttl > 0 {
// The ttl value is the deletion time. So the traceId was initially stored at the ttl time minus the buffer duration.
bufferDuration, _ := time.ParseDuration(tp.config.BufferDurationTraces)
traceAge := time.Since(time.Unix(ttl, 0).Add(-bufferDuration))
traceAgeAtLogCheckHistogram.Record(ctx, traceAge.Milliseconds())
tp.logger.Debug("Logs forwarded directly", zap.String("traceId", traceIdStr))
logsForwardedCounter.Add(ctx, 1)
tp.nextLogsConsumer.ConsumeLogs(ctx, batch)
continue
}
@@ -172,15 +244,24 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs)
duration, _ := time.ParseDuration(tp.config.BufferDurationLogs)
time.Sleep(duration)
exists := tp.traceIdTtlMap.Exists(traceIdStr)
if exists {
ttl, ok := tp.traceIdTtlMap.Get(traceIdStr)
if ok && ttl > 0 {
// The ttl value is the deletion time. So the traceId was initially stored at the ttl time minus the buffer duration.
bufferDuration, _ := time.ParseDuration(tp.config.BufferDurationTraces)
traceAge := time.Since(time.Unix(ttl, 0).Add(-bufferDuration))
tp.logger.Debug("Logs forwarded after buffer expiration", zap.String("traceId", traceIdStr))
logsForwardedCounter.Add(ctx, 1)
traceAgeAtLogCheckHistogram.Record(ctx, traceAge.Milliseconds())
tp.nextLogsConsumer.ConsumeLogs(ctx, batch)
} else {
tp.logger.Debug("Logs discarded", zap.String("traceId", traceIdStr))
logsDiscardedCounter.Add(ctx, 1)
}
}(ctx, traceIdStr, batch)
}
logProcessingTimeHistogram.Record(ctx, time.Since(startTime).Milliseconds())
return nil
}