Compare commits
2 Commits
feat-add-i
...
v1.1.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ec27c88bba | ||
| a459994a24 |
31
.gitea/workflows/cd.yaml
Normal file
31
.gitea/workflows/cd.yaml
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
name: CI & Release
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- main
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
ci:
|
||||||
|
uses: ./.gitea/workflows/ci.yaml
|
||||||
|
|
||||||
|
release:
|
||||||
|
needs: ci
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
with:
|
||||||
|
fetch-depth: 0
|
||||||
|
- uses: actions/setup-node@v4
|
||||||
|
with:
|
||||||
|
node-version: 22
|
||||||
|
- run: npm install -g standard-version
|
||||||
|
- run: |
|
||||||
|
git config --global user.name "Release Bot"
|
||||||
|
git config --global user.email "release-bot@t00n.de"
|
||||||
|
- run: |
|
||||||
|
standard-version --no-verify --skip-changelog
|
||||||
|
tag=$(git describe --tags --abbrev=0)
|
||||||
|
git push origin "$tag"
|
||||||
|
env:
|
||||||
|
GITHUB_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||||
@@ -4,6 +4,7 @@ on:
|
|||||||
push:
|
push:
|
||||||
branches-ignore:
|
branches-ignore:
|
||||||
- main
|
- main
|
||||||
|
workflow_call:
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
test:
|
test:
|
||||||
|
|||||||
11
CHANGELOG.md
Normal file
11
CHANGELOG.md
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
# Changelog
|
||||||
|
|
||||||
|
All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.
|
||||||
|
|
||||||
|
## 1.1.0 (2025-05-20)
|
||||||
|
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
* refactor processor singleton to multi singleton ([#5](https://gitea.t000-n.de/t.behrendt/tracebasedlogsampler/issues/5)) ([73f2d9d](https://gitea.t000-n.de/t.behrendt/tracebasedlogsampler/commit/73f2d9d460e4f476d973965b264dabf4e16bb8b7))
|
||||||
|
* support batches ([#8](https://gitea.t000-n.de/t.behrendt/tracebasedlogsampler/issues/8)) ([ce7d18e](https://gitea.t000-n.de/t.behrendt/tracebasedlogsampler/commit/ce7d18e8804267745851c06ebe400d9c7a56930d))
|
||||||
@@ -90,7 +90,19 @@ func (m *TTLMap) Add(key string) {
|
|||||||
* Removes the trace id from the map if it has expired.
|
* Removes the trace id from the map if it has expired.
|
||||||
*/
|
*/
|
||||||
func (m *TTLMap) Exists(key string) bool {
|
func (m *TTLMap) Exists(key string) bool {
|
||||||
_, ok := m.Get(key)
|
m.mu.RLock()
|
||||||
|
value, ok := m.m[key]
|
||||||
|
m.mu.RUnlock()
|
||||||
|
|
||||||
|
if ok {
|
||||||
|
if value < time.Now().Unix() {
|
||||||
|
m.mu.Lock()
|
||||||
|
delete(m.m, key)
|
||||||
|
m.mu.Unlock()
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -104,26 +116,6 @@ func (m *TTLMap) insertEntry(key string, value int64) {
|
|||||||
m.mu.Unlock()
|
m.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets an entry from the map.
|
|
||||||
*/
|
|
||||||
func (m *TTLMap) Get(key string) (int64, bool) {
|
|
||||||
m.mu.RLock()
|
|
||||||
value, ok := m.m[key]
|
|
||||||
m.mu.RUnlock()
|
|
||||||
|
|
||||||
if ok {
|
|
||||||
if value < time.Now().Unix() {
|
|
||||||
m.mu.Lock()
|
|
||||||
delete(m.m, key)
|
|
||||||
m.mu.Unlock()
|
|
||||||
return 0, false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return value, ok
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets an entry from the map.
|
* Gets an entry from the map.
|
||||||
* Only used for testing.
|
* Only used for testing.
|
||||||
@@ -132,6 +124,7 @@ func (m *TTLMap) getEntry(key string) (int64, bool) {
|
|||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
value, ok := m.m[key]
|
value, ok := m.m[key]
|
||||||
m.mu.RUnlock()
|
m.mu.RUnlock()
|
||||||
|
|
||||||
return value, ok
|
return value, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
89
processor.go
89
processor.go
@@ -12,63 +12,9 @@ import (
|
|||||||
"go.opentelemetry.io/collector/consumer"
|
"go.opentelemetry.io/collector/consumer"
|
||||||
"go.opentelemetry.io/collector/pdata/plog"
|
"go.opentelemetry.io/collector/pdata/plog"
|
||||||
"go.opentelemetry.io/collector/pdata/ptrace"
|
"go.opentelemetry.io/collector/pdata/ptrace"
|
||||||
"go.opentelemetry.io/otel"
|
|
||||||
"go.opentelemetry.io/otel/metric"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
meter = otel.GetMeterProvider().Meter("tracebasedlogsampler")
|
|
||||||
|
|
||||||
tracesProcessedCounter, _ = meter.Int64Counter(
|
|
||||||
"otelcol_processor_tracebasedlogsampler_traces_processed",
|
|
||||||
metric.WithDescription("Number of traces processed"),
|
|
||||||
metric.WithUnit("1"),
|
|
||||||
)
|
|
||||||
|
|
||||||
emptyTraceIdsCounter, _ = meter.Int64Counter(
|
|
||||||
"otelcol_processor_tracebasedlogsampler_traces_empty_trace_ids",
|
|
||||||
metric.WithDescription("Number of spans with empty trace IDs"),
|
|
||||||
metric.WithUnit("1"),
|
|
||||||
)
|
|
||||||
|
|
||||||
traceProcessingTimeHistogram, _ = meter.Int64Histogram(
|
|
||||||
"otelcol_processor_tracebasedlogsampler_traces_processing_time",
|
|
||||||
metric.WithDescription("Time spent processing traces"),
|
|
||||||
metric.WithUnit("ms"),
|
|
||||||
)
|
|
||||||
|
|
||||||
logsProcessedCounter, _ = meter.Int64Counter(
|
|
||||||
"otelcol_processor_tracebasedlogsampler_logs_processed",
|
|
||||||
metric.WithDescription("Number of log batches processed"),
|
|
||||||
metric.WithUnit("1"),
|
|
||||||
)
|
|
||||||
|
|
||||||
logsForwardedCounter, _ = meter.Int64Counter(
|
|
||||||
"otelcol_processor_tracebasedlogsampler_logs_forwarded",
|
|
||||||
metric.WithDescription("Number of log batches forwarded"),
|
|
||||||
metric.WithUnit("1"),
|
|
||||||
)
|
|
||||||
|
|
||||||
logsDiscardedCounter, _ = meter.Int64Counter(
|
|
||||||
"otelcol_processor_tracebasedlogsampler_logs_discarded",
|
|
||||||
metric.WithDescription("Number of log batches discarded"),
|
|
||||||
metric.WithUnit("1"),
|
|
||||||
)
|
|
||||||
|
|
||||||
logProcessingTimeHistogram, _ = meter.Int64Histogram(
|
|
||||||
"otelcol_processor_tracebasedlogsampler_logs_processing_time",
|
|
||||||
metric.WithDescription("Time spent processing logs"),
|
|
||||||
metric.WithUnit("ms"),
|
|
||||||
)
|
|
||||||
|
|
||||||
traceAgeAtLogCheckHistogram, _ = meter.Int64Histogram(
|
|
||||||
"otelcol_processor_tracebasedlogsampler_logs_trace_age_at_log_check",
|
|
||||||
metric.WithDescription("Age of trace when checking logs after buffer expiration"),
|
|
||||||
metric.WithUnit("ms"),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
type LogSamplerProcessor struct {
|
type LogSamplerProcessor struct {
|
||||||
host component.Host
|
host component.Host
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
@@ -146,9 +92,6 @@ func (tp *LogSamplerProcessor) Capabilities() consumer.Capabilities {
|
|||||||
* Every trace's trace id that is processed by the processor is being remembered.
|
* Every trace's trace id that is processed by the processor is being remembered.
|
||||||
*/
|
*/
|
||||||
func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
|
func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
|
||||||
startTime := time.Now()
|
|
||||||
tracesProcessedCounter.Add(ctx, 1)
|
|
||||||
|
|
||||||
for i := range td.ResourceSpans().Len() {
|
for i := range td.ResourceSpans().Len() {
|
||||||
resourceSpans := td.ResourceSpans().At(i)
|
resourceSpans := td.ResourceSpans().At(i)
|
||||||
|
|
||||||
@@ -164,16 +107,11 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac
|
|||||||
tp.traceIdTtlMap.Add(traceIdStr)
|
tp.traceIdTtlMap.Add(traceIdStr)
|
||||||
|
|
||||||
tp.logger.Debug("Trace added to buffer", zap.String("traceId", traceIdStr))
|
tp.logger.Debug("Trace added to buffer", zap.String("traceId", traceIdStr))
|
||||||
} else {
|
|
||||||
emptyTraceIdsCounter.Add(ctx, 1)
|
|
||||||
tp.logger.Warn("Empty trace ID encountered")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
traceProcessingTimeHistogram.Record(ctx, time.Since(startTime).Milliseconds())
|
|
||||||
|
|
||||||
return tp.nextTracesConsumer.ConsumeTraces(ctx, td)
|
return tp.nextTracesConsumer.ConsumeTraces(ctx, td)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -187,9 +125,6 @@ func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Trac
|
|||||||
* If not, discard the log.
|
* If not, discard the log.
|
||||||
*/
|
*/
|
||||||
func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
|
func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
|
||||||
startTime := time.Now()
|
|
||||||
logsProcessedCounter.Add(ctx, 1)
|
|
||||||
|
|
||||||
// Rough estimation of the number of unique trace ids in the logs as the number of logs itself.
|
// Rough estimation of the number of unique trace ids in the logs as the number of logs itself.
|
||||||
// It may be an option to make this configurable or use runtime metrics to zero in on the right size.
|
// It may be an option to make this configurable or use runtime metrics to zero in on the right size.
|
||||||
logsByTraceId := make(map[string]plog.Logs, logs.LogRecordCount())
|
logsByTraceId := make(map[string]plog.Logs, logs.LogRecordCount())
|
||||||
@@ -218,7 +153,6 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs)
|
|||||||
newRecord := batchScopeLogs.LogRecords().AppendEmpty()
|
newRecord := batchScopeLogs.LogRecords().AppendEmpty()
|
||||||
logRecord.CopyTo(newRecord)
|
logRecord.CopyTo(newRecord)
|
||||||
} else {
|
} else {
|
||||||
emptyTraceIdsCounter.Add(ctx, 1)
|
|
||||||
tp.logger.Warn("Log has no trace id", zap.Any("log", logs))
|
tp.logger.Warn("Log has no trace id", zap.Any("log", logs))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -226,15 +160,9 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs)
|
|||||||
}
|
}
|
||||||
|
|
||||||
for traceIdStr, batch := range logsByTraceId {
|
for traceIdStr, batch := range logsByTraceId {
|
||||||
ttl, ok := tp.traceIdTtlMap.Get(traceIdStr)
|
exists := tp.traceIdTtlMap.Exists(traceIdStr)
|
||||||
if ok && ttl > 0 {
|
if exists {
|
||||||
// The tll value is the deletion time. So the traceId was initially stored at the ttl time - buffer duration.
|
|
||||||
bufferDuration, _ := time.ParseDuration(tp.config.BufferDurationTraces)
|
|
||||||
traceAge := time.Since(time.Unix(ttl, 0).Add(-bufferDuration))
|
|
||||||
traceAgeAtLogCheckHistogram.Record(ctx, traceAge.Milliseconds())
|
|
||||||
|
|
||||||
tp.logger.Debug("Logs forwarded directly", zap.String("traceId", traceIdStr))
|
tp.logger.Debug("Logs forwarded directly", zap.String("traceId", traceIdStr))
|
||||||
logsForwardedCounter.Add(ctx, 1)
|
|
||||||
tp.nextLogsConsumer.ConsumeLogs(ctx, batch)
|
tp.nextLogsConsumer.ConsumeLogs(ctx, batch)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -244,24 +172,15 @@ func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, logs plog.Logs)
|
|||||||
duration, _ := time.ParseDuration(tp.config.BufferDurationLogs)
|
duration, _ := time.ParseDuration(tp.config.BufferDurationLogs)
|
||||||
time.Sleep(duration)
|
time.Sleep(duration)
|
||||||
|
|
||||||
ttl, ok := tp.traceIdTtlMap.Get(traceIdStr)
|
exists := tp.traceIdTtlMap.Exists(traceIdStr)
|
||||||
if ok && ttl > 0 {
|
if exists {
|
||||||
// The tll value is the deletion time. So the traceId was initially stored at the ttl time - buffer duration.
|
|
||||||
bufferDuration, _ := time.ParseDuration(tp.config.BufferDurationTraces)
|
|
||||||
traceAge := time.Since(time.Unix(ttl, 0).Add(-bufferDuration))
|
|
||||||
|
|
||||||
tp.logger.Debug("Logs forwarded after buffer expiration", zap.String("traceId", traceIdStr))
|
tp.logger.Debug("Logs forwarded after buffer expiration", zap.String("traceId", traceIdStr))
|
||||||
logsForwardedCounter.Add(ctx, 1)
|
|
||||||
traceAgeAtLogCheckHistogram.Record(ctx, traceAge.Milliseconds())
|
|
||||||
tp.nextLogsConsumer.ConsumeLogs(ctx, batch)
|
tp.nextLogsConsumer.ConsumeLogs(ctx, batch)
|
||||||
} else {
|
} else {
|
||||||
tp.logger.Debug("Logs discarded", zap.String("traceId", traceIdStr))
|
tp.logger.Debug("Logs discarded", zap.String("traceId", traceIdStr))
|
||||||
logsDiscardedCounter.Add(ctx, 1)
|
|
||||||
}
|
}
|
||||||
}(ctx, traceIdStr, batch)
|
}(ctx, traceIdStr, batch)
|
||||||
}
|
}
|
||||||
|
|
||||||
logProcessingTimeHistogram.Record(ctx, time.Since(startTime).Milliseconds())
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user