commit f5fbad64d3f8269915914150acc95602ef4b4632 Author: Timo Behrendt Date: Tue Apr 29 18:15:33 2025 +0200 initial commit diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f6b52e2 --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +test: + go test -cover -race ./... + +benchmark: + go test -bench=. -benchtime=30s -benchmem ./... diff --git a/README.md b/README.md new file mode 100644 index 0000000..247ec24 --- /dev/null +++ b/README.md @@ -0,0 +1,62 @@ +# Trace-based Log Sampling + +This processor is used to sample logs based on the sampling decision of the trace they correlate to. + +## How It Works + +When a trace is sampled, the processor caches its `traceId`. +Logs are then filtered: + +- If a log references a known sampled `traceId`, it is beimg forwarded. +- If a log references an unknown or unsampled `traceId`, it is buffered for a certain amount of time. After the buffer time expires, the `traceId` is checked again. If it exists, the log is forwarded. If not, it is discarded. + +## Configuration + +| Field | Type | Default | Description | +| ---------------------- | -------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| buffer_duration_traces | duration | 180s | The duration for which traceIds are being remembered. The timer starts when the first trace or span of one traceId is received. | +| buffer_duration_logs | duration | 90s | The duration for which logs are being buffered for, before being re-evaluated. If your pipeline includes e.g. a tailbasedsampler processor, set this to above it's collection time. This ensures that logs "wait" until the traces have been processed | + +### Example Configuration + +The followinh config is an example configuration for the processor. It is configured to buffer traceIds for `180 seconds` and logs for `90 seconds`. + +Note that both a traces and logs pipeline is required and both have to use the same instance of the processor. + +```yaml +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + +exporters: + otlp: + endpoint: 0.0.0.0:4317 + +processors: + logtracesampler: + buffer_duration_traces: 180s + buffer_duration_logs: 90s + +service: + pipelines: + traces: + receivers: [otlp] + processors: [logtracesampler] + exporters: [otlp] + + logs: + receivers: [otlp] + processors: [logtracesampler] + exporters: [otlp] +``` + +## Building + +When building a custom collector you can add this processor to the mainfest like the following (refer to [Building a custom collector](https://opentelemetry.io/docs/collector/custom-collector/) for more information): + +```yaml +processors: + - gomod: gitea.t000-n.de/t.behrendt/tracebasedlogsampler v0.0.0 +``` diff --git a/config.go b/config.go new file mode 100644 index 0000000..783a619 --- /dev/null +++ b/config.go @@ -0,0 +1,28 @@ +package tracebasedlogsampler + +import ( + "fmt" + "time" +) + +type Config struct { + // How long to buffer trace ids. Dictates how long to wait for logs to arrive. + BufferDurationTraces string `mapstructure:"buffer_duration_traces"` + + // How long to buffer logs, before checking if a trace id exists in the trace id buffer. + BufferDurationLogs string `mapstructure:"buffer_duration_logs"` +} + +func (cfg *Config) Validate() error { + bufferDurationTraces, _ := time.ParseDuration(cfg.BufferDurationTraces) + if bufferDurationTraces.Minutes() <= 0 { + return fmt.Errorf("buffer_duration_traces must be greater than 0") + } + + bufferDurationLogs, _ := time.ParseDuration(cfg.BufferDurationLogs) + if bufferDurationLogs <= 0 { + return fmt.Errorf("buffer_duration_logs must be greater than 0") + } + + return nil +} diff --git a/config_test.go b/config_test.go new file mode 100644 index 0000000..83c7144 --- /dev/null +++ b/config_test.go @@ -0,0 +1,48 @@ +package tracebasedlogsampler + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap/confmaptest" +) + +func TestLoadConfig(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "sample_config.yaml")) + assert.NoError(t, err) + + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + sub, err := cm.Sub(component.NewIDWithName(typeStr, "").String()) + assert.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + + assert.Equal(t, &Config{ + BufferDurationTraces: "180s", + BufferDurationLogs: "90s", + }, cfg) +} + +func TestValidate(t *testing.T) { + cfg := &Config{ + BufferDurationTraces: "0", + BufferDurationLogs: "10s", + } + assert.Error(t, cfg.Validate()) + + cfg = &Config{ + BufferDurationTraces: "10s", + BufferDurationLogs: "0", + } + assert.Error(t, cfg.Validate()) + + cfg = &Config{ + BufferDurationTraces: "10s", + BufferDurationLogs: "10s", + } + assert.NoError(t, cfg.Validate()) +} diff --git a/factory.go b/factory.go new file mode 100644 index 0000000..97fb3a3 --- /dev/null +++ b/factory.go @@ -0,0 +1,53 @@ +package tracebasedlogsampler + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/processor" +) + +var ( + typeStr = component.MustNewType("tracebasedlogsampler") +) + +const ( + defaultTraceBufferDuration = 180 * time.Second + defaultLogBufferDuration = 90 * time.Second +) + +func NewFactory() processor.Factory { + return processor.NewFactory( + typeStr, + createDefaultConfig, + processor.WithTraces(createTracesProcessor, component.StabilityLevelDevelopment), + processor.WithLogs(createLogsProcessor, component.StabilityLevelDevelopment), + ) +} + +func createDefaultConfig() component.Config { + return &Config{ + BufferDurationTraces: defaultTraceBufferDuration.String(), + BufferDurationLogs: defaultLogBufferDuration.String(), + } +} + +func createTracesProcessor(ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) { + logger := set.Logger + tpCfg := cfg.(*Config) + + traceProc := NewLogSamplerProcessorSingleton(logger, nextConsumer, nil, tpCfg) + + return traceProc, nil +} + +func createLogsProcessor(ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Logs) (processor.Logs, error) { + logger := set.Logger + lpCfg := cfg.(*Config) + + logProc := NewLogSamplerProcessorSingleton(logger, nil, nextConsumer, lpCfg) + + return logProc, nil +} diff --git a/factory_test.go b/factory_test.go new file mode 100644 index 0000000..a415cb4 --- /dev/null +++ b/factory_test.go @@ -0,0 +1,66 @@ +package tracebasedlogsampler + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/processor/processortest" +) + +func TestCreateDefaultConfig(t *testing.T) { + cfg := createDefaultConfig() + + assert.NotNil(t, cfg) + assert.NoError(t, componenttest.CheckConfigStruct(cfg)) +} + +func TestCreateTracesProcessor(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "sample_config.yaml")) + require.NoError(t, err) + + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + sub, err := cm.Sub(component.NewIDWithName(typeStr, "").String()) + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + + params := processortest.NewNopSettings(typeStr) + tp, err := factory.CreateTraces(context.Background(), params, cfg, consumertest.NewNop()) + assert.NotNil(t, tp) + assert.NoError(t, err, "cannot create trace processor") + + // this will cause the processor to properly initialize, so that we can later shutdown and + // have all the go routines cleanly shut down + assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, tp.Shutdown(context.Background())) +} + +func TestCreateLogsProcessor(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "sample_config.yaml")) + require.NoError(t, err) + + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + sub, err := cm.Sub(component.NewIDWithName(typeStr, "").String()) + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + + params := processortest.NewNopSettings(typeStr) + tp, err := factory.CreateLogs(context.Background(), params, cfg, consumertest.NewNop()) + assert.NotNil(t, tp) + assert.NoError(t, err, "cannot create log processor") + + // this will cause the processor to properly initialize, so that we can later shutdown and + // have all the go routines cleanly shut down + assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, tp.Shutdown(context.Background())) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..b193b39 --- /dev/null +++ b/go.mod @@ -0,0 +1,63 @@ +module gitea.t000-n.de/t.behrendt/tracebasedlogsampler + +go 1.24.2 + +require ( + github.com/stretchr/testify v1.10.0 + go.uber.org/zap v1.27.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/knadh/koanf/maps v0.1.2 // indirect + github.com/knadh/koanf/providers/confmap v1.0.0 // indirect + github.com/knadh/koanf/v2 v2.2.0 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/collector/component/componentstatus v0.125.0 // indirect + go.opentelemetry.io/collector/consumer/xconsumer v0.125.0 // indirect + go.opentelemetry.io/collector/featuregate v1.31.0 // indirect + go.opentelemetry.io/collector/internal/telemetry v0.125.0 // indirect + go.opentelemetry.io/collector/pdata/pprofile v0.125.0 // indirect + go.opentelemetry.io/collector/pdata/testdata v0.125.0 // indirect + go.opentelemetry.io/collector/pipeline v0.125.0 // indirect + go.opentelemetry.io/collector/processor/xprocessor v0.125.0 // indirect + go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 // indirect + go.opentelemetry.io/otel v1.35.0 // indirect + go.opentelemetry.io/otel/log v0.11.0 // indirect + go.opentelemetry.io/otel/metric v1.35.0 // indirect + go.opentelemetry.io/otel/sdk v1.35.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.35.0 // indirect + go.opentelemetry.io/otel/trace v1.35.0 // indirect + golang.org/x/net v0.39.0 // indirect + golang.org/x/sys v0.32.0 // indirect + golang.org/x/text v0.24.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect + google.golang.org/grpc v1.72.0 // indirect + google.golang.org/protobuf v1.36.6 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect +) + +require ( + go.opentelemetry.io/collector/component v1.31.0 + go.opentelemetry.io/collector/component/componenttest v0.125.0 + go.opentelemetry.io/collector/confmap v1.31.0 + go.opentelemetry.io/collector/consumer v1.31.0 + go.opentelemetry.io/collector/consumer/consumertest v0.125.0 + go.opentelemetry.io/collector/pdata v1.31.0 + go.opentelemetry.io/collector/processor v1.31.0 + go.opentelemetry.io/collector/processor/processortest v0.125.0 + go.uber.org/multierr v1.11.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..0c3aeaa --- /dev/null +++ b/go.sum @@ -0,0 +1,153 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= +github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/knadh/koanf/maps v0.1.2 h1:RBfmAW5CnZT+PJ1CVc1QSJKf4Xu9kxfQgYVQSu8hpbo= +github.com/knadh/koanf/maps v0.1.2/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/providers/confmap v1.0.0 h1:mHKLJTE7iXEys6deO5p6olAiZdG5zwp8Aebir+/EaRE= +github.com/knadh/koanf/providers/confmap v1.0.0/go.mod h1:txHYHiI2hAtF0/0sCmcuol4IDcuQbKTybiB1nOcUo1A= +github.com/knadh/koanf/v2 v2.2.0 h1:FZFwd9bUjpb8DyCWARUBy5ovuhDs1lI87dOEn2K8UVU= +github.com/knadh/koanf/v2 v2.2.0/go.mod h1:PSFru3ufQgTsI7IF+95rf9s8XA1+aHxKuO/W+dPoHEY= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/collector/component v1.31.0 h1:9LzU8X1RhV3h8/QsAoTX23aFUfoJ3EUc9O/vK+hFpSI= +go.opentelemetry.io/collector/component v1.31.0/go.mod h1:JbZl/KywXJxpUXPbt96qlEXJSym1zQ2hauMxYMuvlxM= +go.opentelemetry.io/collector/component/componentstatus v0.125.0 h1:zlxGQZYd9kknRZSjRpOYW5SBjl0a5zYFYRPbreobXoU= +go.opentelemetry.io/collector/component/componentstatus v0.125.0/go.mod h1:bHXc2W8bqqo9adOvCgvhcO7pYzJOSpyV4cuQ1wiIl04= +go.opentelemetry.io/collector/component/componenttest v0.125.0 h1:E2mpnMQbkMpYoZ3Q8pHx4kod7kedjwRs1xqDpzCe/84= +go.opentelemetry.io/collector/component/componenttest v0.125.0/go.mod h1:pQtsE1u/SPZdTphP5BZP64XbjXSq6wc+mDut5Ws/JDI= +go.opentelemetry.io/collector/confmap v1.31.0 h1:+AW5VJc1rCtgEyGd+1J5uSNw/kVZ98+lKO/pqXEwVvU= +go.opentelemetry.io/collector/confmap v1.31.0/go.mod h1:TdutQlIoHDPXcZ2xZ0QWGRkSFC8oTKO61zTx569dvrY= +go.opentelemetry.io/collector/consumer v1.31.0 h1:L+y66ywxLHnAxnUxv0JDwUf5bFj53kMxCCyEfRKlM7s= +go.opentelemetry.io/collector/consumer v1.31.0/go.mod h1:rPsqy5ni+c6xNMUkOChleZYO/nInVY6eaBNZ1FmWJVk= +go.opentelemetry.io/collector/consumer/consumertest v0.125.0 h1:TUkxomGS4DAtjBvcWQd2UY4FDLLEKMQD6iOIDUr/5dM= +go.opentelemetry.io/collector/consumer/consumertest v0.125.0/go.mod h1:vkHf3y85cFLDHARO/cTREVjLjOPAV+cQg7lkC44DWOY= +go.opentelemetry.io/collector/consumer/xconsumer v0.125.0 h1:oTreUlk1KpMSWwuHFnstW+orrjGTyvs2xd3o/Dpy+hI= +go.opentelemetry.io/collector/consumer/xconsumer v0.125.0/go.mod h1:FX0G37r0W+wXRgxxFtwEJ4rlsCB+p0cIaxtU3C4hskw= +go.opentelemetry.io/collector/featuregate v1.31.0 h1:20q7plPQZwmAiaYAa6l1m/i2qDITZuWlhjr4EkmeQls= +go.opentelemetry.io/collector/featuregate v1.31.0/go.mod h1:Y/KsHbvREENKvvN9RlpiWk/IGBK+CATBYzIIpU7nccc= +go.opentelemetry.io/collector/internal/telemetry v0.125.0 h1:6lcGOxw3dAg7LfXTKdN8ZjR+l7KvzLdEiPMhhLwG4r4= +go.opentelemetry.io/collector/internal/telemetry v0.125.0/go.mod h1:5GyFslLqjZgq1DZTtFiluxYhhXrCofHgOOOybodDPGE= +go.opentelemetry.io/collector/pdata v1.31.0 h1:P5WuLr1l2JcIvr6Dw2hl01ltp2ZafPnC4Isv+BLTBqU= +go.opentelemetry.io/collector/pdata v1.31.0/go.mod h1:m41io9nWpy7aCm/uD1L9QcKiZwOP0ldj83JEA34dmlk= +go.opentelemetry.io/collector/pdata/pprofile v0.125.0 h1:Qqlx8w1HpiYZ9RQqjmMQIysI0cHNO1nh3E/fCTeFysA= +go.opentelemetry.io/collector/pdata/pprofile v0.125.0/go.mod h1:p/yK023VxAp8hm27/1G5DPTcMIpnJy3cHGAFUQZGyaQ= +go.opentelemetry.io/collector/pdata/testdata v0.125.0 h1:due1Hl0EEVRVwfCkiamRy5E8lS6yalv0lo8Zl/SJtGw= +go.opentelemetry.io/collector/pdata/testdata v0.125.0/go.mod h1:1GpEWlgdMrd+fWsBk37ZC2YmOP5YU3gFQ4rWuCu9g24= +go.opentelemetry.io/collector/pipeline v0.125.0 h1:oitBgcAFqntDB4ihQJUHJSQ8IHqKFpPkaTVbTYdIUzM= +go.opentelemetry.io/collector/pipeline v0.125.0/go.mod h1:TO02zju/K6E+oFIOdi372Wk0MXd+Szy72zcTsFQwXl4= +go.opentelemetry.io/collector/processor v1.31.0 h1:+u7sBUpnCBsHYoALp4hfr9VEjLHHYa4uKENGITe0K9Q= +go.opentelemetry.io/collector/processor v1.31.0/go.mod h1:5hDYJ7/hTdfd2tF2Rj5Hs6+mfyFz2O7CaPzVvW1qHQc= +go.opentelemetry.io/collector/processor/processortest v0.125.0 h1:ZVAN4iZPDcWhpzKqnuok2NIuS5hwGVVQUOWkJFR12tA= +go.opentelemetry.io/collector/processor/processortest v0.125.0/go.mod h1:VAw0IRG35cWTBjBtreXeXJEgqkRegfjrH/EuLhNX2+I= +go.opentelemetry.io/collector/processor/xprocessor v0.125.0 h1:VWYPMW1VmDq6xB7M5SYjBpQCCIq3MhQ3W++wU47QpZM= +go.opentelemetry.io/collector/processor/xprocessor v0.125.0/go.mod h1:bCxUyFVlksANg8wjYZqWVsRB33lkLQ294rTrju/IZiM= +go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 h1:ojdSRDvjrnm30beHOmwsSvLpoRF40MlwNCA+Oo93kXU= +go.opentelemetry.io/contrib/bridges/otelzap v0.10.0/go.mod h1:oTTm4g7NEtHSV2i/0FeVdPaPgUIZPfQkFbq0vbzqnv0= +go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= +go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel/log v0.11.0 h1:c24Hrlk5WJ8JWcwbQxdBqxZdOK7PcP/LFtOtwpDTe3Y= +go.opentelemetry.io/otel/log v0.11.0/go.mod h1:U/sxQ83FPmT29trrifhQg+Zj2lo1/IPN1PF6RTFqdwc= +go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= +go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= +go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= +go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= +go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= +go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= +go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= +go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= +golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ= +google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM= +google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= +sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/internals/traceIdTtlMap/traceIdTtlMap.go b/internals/traceIdTtlMap/traceIdTtlMap.go new file mode 100644 index 0000000..565b9d4 --- /dev/null +++ b/internals/traceIdTtlMap/traceIdTtlMap.go @@ -0,0 +1,141 @@ +package traceIdTtlMap + +import ( + "sync" + "time" +) + +type TTLMap struct { + m map[string]int64 + mu sync.RWMutex + maxTtl int64 + stopCh chan struct{} + stopOnce sync.Once +} + +/** + * Creates a new TTLMap with the given maximum TTL. + * The TTLMap will automatically remove expired trace ids from the map, every second. + * Map is thread-safe. + */ +func New(initSize int, maxTtl int) (m *TTLMap) { + m = &TTLMap{ + m: make(map[string]int64, initSize), + maxTtl: int64(maxTtl), + stopCh: make(chan struct{}), + } + + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + var expiredKeys []string + now := time.Now().Unix() + + m.mu.RLock() + for key, value := range m.m { + if value < now { + expiredKeys = append(expiredKeys, key) + } + } + m.mu.RUnlock() + + for _, key := range expiredKeys { + m.mu.Lock() + delete(m.m, key) + m.mu.Unlock() + } + case <-m.stopCh: + return + } + } + }() + + return +} + +/** + * Stops all go routines. + * Should be called when the TTLMap is no longer needed. + */ +func (m *TTLMap) Stop() { + m.stopOnce.Do(func() { + close(m.stopCh) + }) +} + +/** + * Adds a trace id to the map. + * When providing the same trace id twice, the second invocation will be ignored. + */ +func (m *TTLMap) Add(key string) { + m.mu.Lock() + defer m.mu.Unlock() + + _, ok := m.m[key] + if ok { + return + } + + m.m[key] = time.Now().Unix() + m.maxTtl +} + +/** + * Checks if a trace id exists in the map. + * Returns true if the trace id exists and has not expired. + * Returns false if the trace id does not exist or has expired. + * Removes the trace id from the map if it has expired. + */ +func (m *TTLMap) Exists(key string) bool { + m.mu.RLock() + value, ok := m.m[key] + m.mu.RUnlock() + + if ok { + if value < time.Now().Unix() { + m.mu.Lock() + delete(m.m, key) + m.mu.Unlock() + return false + } + } + + return ok +} + +/** + * Inserts a new entry into the map. + * Only used for testing. + */ +func (m *TTLMap) insertEntry(key string, value int64) { + m.mu.Lock() + m.m[key] = value + m.mu.Unlock() +} + +/** + * Gets an entry from the map. + * Only used for testing. + */ +func (m *TTLMap) getEntry(key string) (int64, bool) { + m.mu.RLock() + value, ok := m.m[key] + m.mu.RUnlock() + + return value, ok +} + +/** + * Gets the size of the map. + * Only used for testing. + */ +func (m *TTLMap) getSize() int { + m.mu.RLock() + size := len(m.m) + m.mu.RUnlock() + + return size +} diff --git a/internals/traceIdTtlMap/traceIdTtlMap_benchmark_test.go b/internals/traceIdTtlMap/traceIdTtlMap_benchmark_test.go new file mode 100644 index 0000000..dcc21f2 --- /dev/null +++ b/internals/traceIdTtlMap/traceIdTtlMap_benchmark_test.go @@ -0,0 +1,43 @@ +package traceIdTtlMap + +import ( + "strconv" + "testing" +) + +func BenchmarkTTLMap_AddExists(b *testing.B) { + m := New(1000, 10) + defer m.Stop() + + b.Run("Add", func(b *testing.B) { + for i := 0; b.Loop(); i++ { + m.Add(strconv.Itoa(i)) + } + }) + + for i := range 1000 { + m.Add(strconv.Itoa(i)) + } + + b.Run("Exists", func(b *testing.B) { + for i := 0; i < b.N; i++ { + key := strconv.Itoa(i % 1000) + m.Exists(key) + } + }) +} + +func BenchmarkTTLMap_Concurrent(b *testing.B) { + m := New(1000, 10) + defer m.Stop() + + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := strconv.Itoa(i % 1000) + m.Add(key) + m.Exists(key) + i++ + } + }) +} diff --git a/internals/traceIdTtlMap/traceIdTtlMap_test.go b/internals/traceIdTtlMap/traceIdTtlMap_test.go new file mode 100644 index 0000000..40a0853 --- /dev/null +++ b/internals/traceIdTtlMap/traceIdTtlMap_test.go @@ -0,0 +1,113 @@ +package traceIdTtlMap + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNew(t *testing.T) { + m := New(10, 10) + defer m.Stop() + + assert.Equal(t, 0, m.getSize()) +} + +func TestNew_Cleanup(t *testing.T) { + m := New(10, 10) + defer m.Stop() + + m.insertEntry("4bf92f3577b34da6a3ce929d0e0e4736", time.Now().Unix()-10) + assert.Equal(t, 1, m.getSize()) + + // Inserted entry should be deleted after >1 second + time.Sleep(time.Second * 2) + assert.Equal(t, 0, m.getSize()) +} + +func TestAdd(t *testing.T) { + m := New(10, 10) + defer m.Stop() + + m.Add("4bf92f3577b34da6a3ce929d0e0e4736") + // Intentionally adding the same trace id twice, should not add it again + m.Add("4bf92f3577b34da6a3ce929d0e0e4736") + m.Add("d0240fe9f68b48e687d25c185d4c17c5") + + assert.Equal(t, 2, m.getSize()) + + _, ok := m.getEntry("4bf92f3577b34da6a3ce929d0e0e4736") + assert.True(t, ok) + _, ok = m.getEntry("d0240fe9f68b48e687d25c185d4c17c5") + assert.True(t, ok) +} + +func TestAdd_ResetTtl(t *testing.T) { + m := New(10, 10) + defer m.Stop() + + m.Add("4bf92f3577b34da6a3ce929d0e0e4736") + insertTime, _ := m.getEntry("4bf92f3577b34da6a3ce929d0e0e4736") + + time.Sleep(time.Second) + + m.Add("4bf92f3577b34da6a3ce929d0e0e4736") + updatedTime, _ := m.getEntry("4bf92f3577b34da6a3ce929d0e0e4736") + + // Delete time of the second entry should remain the same. + assert.Equal(t, updatedTime, insertTime) +} + +func TestExists(t *testing.T) { + m := New(10, 10) + defer m.Stop() + + m.insertEntry("4bf92f3577b34da6a3ce929d0e0e4736", time.Now().Unix()+10) + + // Existing and valid trace + assert.True(t, m.Exists("4bf92f3577b34da6a3ce929d0e0e4736")) + // Non existing trace + assert.False(t, m.Exists("d0240fe9f68b48e687d25c185d4c17c5")) +} + +func TestExists_ExpiredTrace(t *testing.T) { + m := New(10, 10) + defer m.Stop() + + m.insertEntry("4bf92f3577b34da6a3ce929d0e0e4736", time.Now().Unix()-10) + + // Existing and but invalid trace + assert.False(t, m.Exists("4bf92f3577b34da6a3ce929d0e0e4736")) +} + +func TestAddExists_Concurrent(t *testing.T) { + m := New(10, 10) + defer m.Stop() + + var wg sync.WaitGroup + keys := []string{"k1", "k2", "k3", "k4", "k5", "k6", "k7", "k8", "k9", "k10"} + + for i := range 100 { + wg.Add(2) + + go func(i int) { + defer wg.Done() + m.Add(keys[i%len(keys)]) + }(i) + + go func(i int) { + defer wg.Done() + _ = m.Exists(keys[i%len(keys)]) + }(i) + } + + wg.Wait() +} + +func TestStop(t *testing.T) { + m := New(10, 10) + m.Stop() + m.Stop() +} diff --git a/processor.go b/processor.go new file mode 100644 index 0000000..08dbf5c --- /dev/null +++ b/processor.go @@ -0,0 +1,144 @@ +package tracebasedlogsampler + +import ( + "context" + "encoding/hex" + "sync" + "time" + + traceIdTtlMap "gitea.t000-n.de/t.behrendt/tracebasedlogsampler/internals/traceIdTtlMap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" +) + +type LogSamplerProcessor struct { + host component.Host + cancel context.CancelFunc + logger *zap.Logger + nextTracesConsumer consumer.Traces + nextLogsConsumer consumer.Logs + config *Config + + traceIdTtlMap *traceIdTtlMap.TTLMap +} + +var logSampleProcessorLock = sync.Mutex{} +var logSampleProcessor *LogSamplerProcessor + +/** + * Creates a new LogSamplerProcessor as a singleton. + */ +func NewLogSamplerProcessorSingleton(logger *zap.Logger, nextTracesConsumer consumer.Traces, nextLogsConsumer consumer.Logs, cfg *Config) *LogSamplerProcessor { + maxTraceTtl, _ := time.ParseDuration(cfg.BufferDurationTraces) + + logSampleProcessorLock.Lock() + defer logSampleProcessorLock.Unlock() + + if logSampleProcessor != nil { + if nextTracesConsumer != nil { + logSampleProcessor.nextTracesConsumer = nextTracesConsumer + } + + if nextLogsConsumer != nil { + logSampleProcessor.nextLogsConsumer = nextLogsConsumer + } + + } else { + logSampleProcessor = &LogSamplerProcessor{ + logger: logger, + nextTracesConsumer: nextTracesConsumer, + nextLogsConsumer: nextLogsConsumer, + config: cfg, + // TODO: Pass size from config as well. + traceIdTtlMap: traceIdTtlMap.New(1000, int(maxTraceTtl.Seconds())), + } + } + + return logSampleProcessor +} + +func (tp *LogSamplerProcessor) Start(ctx context.Context, host component.Host) error { + tp.host = host + ctx = context.Background() + _, tp.cancel = context.WithCancel(ctx) + + tp.logger.Debug("LogSamplerProcessor started") + + return nil +} + +func (tp *LogSamplerProcessor) Shutdown(ctx context.Context) error { + if tp.cancel != nil { + tp.cancel() + } + + tp.traceIdTtlMap.Stop() + + tp.logger.Debug("LogSamplerProcessor stopped") + + return nil +} + +func (tp *LogSamplerProcessor) Capabilities() consumer.Capabilities { + return consumer.Capabilities{} +} + +/** + * For each trace, record the trace id in a buffer. + */ +func (tp *LogSamplerProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + traceId := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID() + + if !traceId.IsEmpty() { + tp.traceIdTtlMap.Add(hex.EncodeToString(traceId[:])) + + tp.logger.Debug("Trace added to buffer", zap.String("traceId", hex.EncodeToString(traceId[:]))) + } + + return tp.nextTracesConsumer.ConsumeTraces(ctx, td) +} + +/** + * Upon receiving a log, check if the log's trace id matches any trace id in the buffer. + * If it doesn't, put the log in the buffer for the configured amount of time. + * If it does, forward the log. + * + * After the buffer expires, check again if the log's trace id matches any trace id in the buffer. + * If it does, forward the log. + * If not, discard the log. + */ +func (tp *LogSamplerProcessor) ConsumeLogs(ctx context.Context, log plog.Logs) error { + traceId := log.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).TraceID() + + if !traceId.IsEmpty() { + exists := tp.traceIdTtlMap.Exists(hex.EncodeToString(traceId[:])) + if exists { + tp.logger.Debug("Log forwarded directly", zap.String("traceId", hex.EncodeToString(traceId[:]))) + return tp.nextLogsConsumer.ConsumeLogs(ctx, log) + } + + go func(ctx context.Context, log plog.Logs) { + tp.logger.Debug("Log added to buffer", zap.String("traceId", hex.EncodeToString(traceId[:]))) + + // TODO: Find a better place for the parsed config, instead of using the non-parsed strings. + duration, _ := time.ParseDuration(tp.config.BufferDurationLogs) + time.Sleep(duration) + + exists := tp.traceIdTtlMap.Exists(hex.EncodeToString(traceId[:])) + if exists { + tp.logger.Debug("Log forwarded after buffer expiration", zap.String("traceId", hex.EncodeToString(traceId[:]))) + tp.nextLogsConsumer.ConsumeLogs(ctx, log) + } else { + tp.logger.Debug("Log discarded", zap.String("traceId", hex.EncodeToString(traceId[:]))) + } + }(ctx, log) + } else { + tp.logger.Warn("Log has no trace id", zap.Any("log", log)) + } + + return nil +} diff --git a/renovate.json b/renovate.json new file mode 100644 index 0000000..d73c1cb --- /dev/null +++ b/renovate.json @@ -0,0 +1,9 @@ +{ + "$schema": "https://docs.renovatebot.com/renovate-schema.json", + "packageRules": [ + { + "matchUpdateTypes": ["patch", "minor"], + "groupName": "devDependencies (non-major)" + } + ] +} diff --git a/testdata/sample_config.yaml b/testdata/sample_config.yaml new file mode 100644 index 0000000..9e18d34 --- /dev/null +++ b/testdata/sample_config.yaml @@ -0,0 +1,3 @@ +tracebasedlogsampler: + buffer_duration_traces: 180s + buffer_duration_logs: 90s