|
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
|
|
|
package transformprocessor |
|
|
|
import ( |
|
"fmt" |
|
"strings" |
|
"testing" |
|
|
|
"go.opentelemetry.io/collector/consumer/consumertest" |
|
"go.opentelemetry.io/collector/pdata/pcommon" |
|
"go.opentelemetry.io/collector/pdata/plog" |
|
"go.opentelemetry.io/collector/processor" |
|
"go.opentelemetry.io/collector/processor/processortest" |
|
|
|
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" |
|
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metadata" |
|
) |
|
|
|
const (
	// benchStmtCount is the number of OTTL statements generated per
	// processor by buildAttrStmts/buildBodyStmts, and also the number of
	// src_N entries buildLogs writes into the innermost map.
	benchStmtCount = 15

	// benchPoolSize is the number of pre-built plog.Logs payloads each
	// sub-benchmark allocates and then cycles through (i % benchPoolSize).
	benchPoolSize = 32
)
|
|
|
func createLogsProcessor(b *testing.B, stmts []string) processor.Logs { |
|
b.Helper() |
|
factory := NewFactory() |
|
cfg := factory.CreateDefaultConfig() |
|
oCfg := cfg.(*Config) |
|
oCfg.LogStatements = []common.ContextStatements{ |
|
{Context: "log", Statements: stmts}, |
|
} |
|
sink := new(consumertest.LogsSink) |
|
p, err := factory.CreateLogs(b.Context(), processortest.NewNopSettings(metadata.Type), oCfg, sink) |
|
if err != nil { |
|
b.Fatalf("failed to create processor: %v", err) |
|
} |
|
return p |
|
} |
|
|
|
// BenchmarkAttributesVsBodyMap runs 15 fixed transform statements through |
|
// the transform processor factory, varying two dimensions: |
|
// |
|
// - Map size: 10, 100, 1000 keys in the target map |
|
// - Nesting depth: 1 (flat), 5, and 10 levels deep |
|
// |
|
// Each combination is tested twice: once with data in log record |
|
// attributes and once with data in the body map. |
|
func BenchmarkAttributesVsBodyMap(b *testing.B) { |
|
mapSizes := []int{10, 100, 1000} |
|
nestDepths := []int{1, 5, 10} |
|
|
|
for _, size := range mapSizes { |
|
for _, depth := range nestDepths { |
|
attrProc := createLogsProcessor(b, buildAttrStmts(depth)) |
|
bodyProc := createLogsProcessor(b, buildBodyStmts(depth)) |
|
|
|
ctx := b.Context() |
|
|
|
b.Run(fmt.Sprintf("attributes/keys=%d/depth=%d", size, depth), func(b *testing.B) { |
|
pool := make([]plog.Logs, benchPoolSize) |
|
for i := range pool { |
|
pool[i] = buildLogs(size, depth, false) |
|
} |
|
b.ReportAllocs() |
|
b.ResetTimer() |
|
for i := 0; b.Loop(); i++ { |
|
if err := attrProc.ConsumeLogs(ctx, pool[i%benchPoolSize]); err != nil { |
|
b.Fatalf("ConsumeLogs failed: %v", err) |
|
} |
|
} |
|
}) |
|
|
|
b.Run(fmt.Sprintf("body_map/keys=%d/depth=%d", size, depth), func(b *testing.B) { |
|
pool := make([]plog.Logs, benchPoolSize) |
|
for i := range pool { |
|
pool[i] = buildLogs(size, depth, true) |
|
} |
|
b.ReportAllocs() |
|
b.ResetTimer() |
|
for i := 0; b.Loop(); i++ { |
|
if err := bodyProc.ConsumeLogs(ctx, pool[i%benchPoolSize]); err != nil { |
|
b.Fatalf("ConsumeLogs failed: %v", err) |
|
} |
|
} |
|
}) |
|
} |
|
} |
|
} |
|
|
|
// BenchmarkMaxBodyOverhead is designed to maximize the observable overhead |
|
// of body map access vs attributes by: |
|
// |
|
// - Using a tiny map (no padding) so map scan time is negligible |
|
// and the per-access type-switch overhead is proportionally largest. |
|
// - Using 50 simple set() statements to maximize the number of |
|
// body.Type() checks per iteration. |
|
// - Using replace_all_matches on the whole map, which exercises the |
|
// whole-field getter (GetValue type-switch for body) and whole-field |
|
// setter (SetValue 14-case switch for body vs SetMap 2-case switch |
|
// for attributes). |
|
func BenchmarkMaxBodyOverhead(b *testing.B) { |
|
ctx := b.Context() |
|
|
|
// --- 50 simple set() statements: maximum type-check ratio --- |
|
const manyStmts = 50 |
|
attrSimpleStmts := make([]string, manyStmts) |
|
bodySimpleStmts := make([]string, manyStmts) |
|
for i := range manyStmts { |
|
attrSimpleStmts[i] = fmt.Sprintf(`set(attributes["dst_%[1]d"], attributes["src_%[1]d"])`, i) |
|
bodySimpleStmts[i] = fmt.Sprintf(`set(body["dst_%[1]d"], body["src_%[1]d"])`, i) |
|
} |
|
|
|
attrSimpleProc := createLogsProcessor(b, attrSimpleStmts) |
|
bodySimpleProc := createLogsProcessor(b, bodySimpleStmts) |
|
|
|
b.Run("simple_set_x50/attributes", func(b *testing.B) { |
|
pool := make([]plog.Logs, benchPoolSize) |
|
for i := range pool { |
|
pool[i] = buildMinimalLogs(manyStmts, false) |
|
} |
|
b.ReportAllocs() |
|
b.ResetTimer() |
|
for i := 0; b.Loop(); i++ { |
|
if err := attrSimpleProc.ConsumeLogs(ctx, pool[i%benchPoolSize]); err != nil { |
|
b.Fatal(err) |
|
} |
|
} |
|
}) |
|
b.Run("simple_set_x50/body_map", func(b *testing.B) { |
|
pool := make([]plog.Logs, benchPoolSize) |
|
for i := range pool { |
|
pool[i] = buildMinimalLogs(manyStmts, true) |
|
} |
|
b.ReportAllocs() |
|
b.ResetTimer() |
|
for i := 0; b.Loop(); i++ { |
|
if err := bodySimpleProc.ConsumeLogs(ctx, pool[i%benchPoolSize]); err != nil { |
|
b.Fatal(err) |
|
} |
|
} |
|
}) |
|
|
|
// --- replace_all_matches on whole map --- |
|
for _, mapSize := range []int{10, 100, 1000} { |
|
attrReplaceStmts := make([]string, 15) |
|
bodyReplaceStmts := make([]string, 15) |
|
for i := range 15 { |
|
attrReplaceStmts[i] = fmt.Sprintf(`replace_all_matches(attributes, "val_%d_*", "replaced_%d")`, i, i) |
|
bodyReplaceStmts[i] = fmt.Sprintf(`replace_all_matches(body, "val_%d_*", "replaced_%d")`, i, i) |
|
} |
|
|
|
attrReplaceProc := createLogsProcessor(b, attrReplaceStmts) |
|
bodyReplaceProc := createLogsProcessor(b, bodyReplaceStmts) |
|
|
|
b.Run(fmt.Sprintf("replace_all_matches_x15/keys=%d/attributes", mapSize), func(b *testing.B) { |
|
pool := make([]plog.Logs, benchPoolSize) |
|
for i := range pool { |
|
pool[i] = buildReplaceMatchLogs(mapSize, false) |
|
} |
|
b.ReportAllocs() |
|
b.ResetTimer() |
|
for i := 0; b.Loop(); i++ { |
|
if err := attrReplaceProc.ConsumeLogs(ctx, pool[i%benchPoolSize]); err != nil { |
|
b.Fatal(err) |
|
} |
|
} |
|
}) |
|
b.Run(fmt.Sprintf("replace_all_matches_x15/keys=%d/body_map", mapSize), func(b *testing.B) { |
|
pool := make([]plog.Logs, benchPoolSize) |
|
for i := range pool { |
|
pool[i] = buildReplaceMatchLogs(mapSize, true) |
|
} |
|
b.ReportAllocs() |
|
b.ResetTimer() |
|
for i := 0; b.Loop(); i++ { |
|
if err := bodyReplaceProc.ConsumeLogs(ctx, pool[i%benchPoolSize]); err != nil { |
|
b.Fatal(err) |
|
} |
|
} |
|
}) |
|
} |
|
} |
|
|
|
// ---------------------------------------------------------------------------
// Statement builders
// ---------------------------------------------------------------------------
|
|
|
func nestedPath(root string, depth int, leaf string) string { |
|
var sb strings.Builder |
|
sb.WriteString(root) |
|
for i := range depth - 1 { |
|
sb.WriteString(fmt.Sprintf(`["n_%d"]`, i)) |
|
} |
|
sb.WriteString(fmt.Sprintf(`["%s"]`, leaf)) |
|
return sb.String() |
|
} |
|
|
|
func buildAttrStmts(depth int) []string { |
|
stmts := make([]string, 0, benchStmtCount) |
|
for i := range benchStmtCount { |
|
src := nestedPath("attributes", depth, fmt.Sprintf("src_%d", i)) |
|
dst := nestedPath("attributes", depth, fmt.Sprintf("dst_%d", i)) |
|
switch i % 5 { |
|
case 0: |
|
stmts = append(stmts, fmt.Sprintf(`set(%s, %s)`, dst, src)) |
|
case 1: |
|
stmts = append(stmts, fmt.Sprintf(`set(%s, ToLowerCase(%s))`, dst, src)) |
|
case 2: |
|
stmts = append(stmts, fmt.Sprintf(`set(%s, Len(%s))`, dst, src)) |
|
case 3: |
|
stmts = append(stmts, fmt.Sprintf(`set(%s, %s) where severity_number >= 9`, dst, src)) |
|
default: |
|
stmts = append(stmts, fmt.Sprintf(`set(%s, Concat([%s, "suffix"], "-"))`, dst, src)) |
|
} |
|
} |
|
return stmts |
|
} |
|
|
|
func buildBodyStmts(depth int) []string { |
|
stmts := make([]string, 0, benchStmtCount) |
|
for i := range benchStmtCount { |
|
src := nestedPath("body", depth, fmt.Sprintf("src_%d", i)) |
|
dst := nestedPath("body", depth, fmt.Sprintf("dst_%d", i)) |
|
switch i % 5 { |
|
case 0: |
|
stmts = append(stmts, fmt.Sprintf(`set(%s, %s)`, dst, src)) |
|
case 1: |
|
stmts = append(stmts, fmt.Sprintf(`set(%s, ToLowerCase(%s))`, dst, src)) |
|
case 2: |
|
stmts = append(stmts, fmt.Sprintf(`set(%s, Len(%s))`, dst, src)) |
|
case 3: |
|
stmts = append(stmts, fmt.Sprintf(`set(%s, %s) where severity_number >= 9`, dst, src)) |
|
default: |
|
stmts = append(stmts, fmt.Sprintf(`set(%s, Concat([%s, "suffix"], "-"))`, dst, src)) |
|
} |
|
} |
|
return stmts |
|
} |
|
|
|
// ---------------------------------------------------------------------------
// Log data builders
// ---------------------------------------------------------------------------
|
|
|
func buildLogs(mapSize, depth int, useBody bool) plog.Logs { |
|
logs := plog.NewLogs() |
|
rl := logs.ResourceLogs().AppendEmpty() |
|
populateBenchResource(rl.Resource()) |
|
sl := rl.ScopeLogs().AppendEmpty() |
|
sl.Scope().SetName("bench-scope") |
|
lr := sl.LogRecords().AppendEmpty() |
|
populateBenchLogRecord(lr) |
|
|
|
var target pcommon.Map |
|
if useBody { |
|
target = lr.Body().SetEmptyMap() |
|
} else { |
|
lr.Body().SetStr("plain log body") |
|
target = lr.Attributes() |
|
} |
|
|
|
for i := range mapSize { |
|
target.PutStr(fmt.Sprintf("pad_%d", i), fmt.Sprintf("padding_value_%d", i)) |
|
} |
|
|
|
leaf := target |
|
for i := range depth - 1 { |
|
leaf = leaf.PutEmptyMap(fmt.Sprintf("n_%d", i)) |
|
} |
|
for i := range benchStmtCount { |
|
leaf.PutStr(fmt.Sprintf("src_%d", i), fmt.Sprintf("Value_%d_realistic_data", i)) |
|
} |
|
|
|
return logs |
|
} |
|
|
|
func buildMinimalLogs(srcCount int, useBody bool) plog.Logs { |
|
logs := plog.NewLogs() |
|
rl := logs.ResourceLogs().AppendEmpty() |
|
populateBenchResource(rl.Resource()) |
|
sl := rl.ScopeLogs().AppendEmpty() |
|
lr := sl.LogRecords().AppendEmpty() |
|
populateBenchLogRecord(lr) |
|
|
|
var target pcommon.Map |
|
if useBody { |
|
target = lr.Body().SetEmptyMap() |
|
} else { |
|
lr.Body().SetStr("plain") |
|
target = lr.Attributes() |
|
} |
|
for i := range srcCount { |
|
target.PutStr(fmt.Sprintf("src_%d", i), fmt.Sprintf("Value_%d", i)) |
|
} |
|
return logs |
|
} |
|
|
|
func buildReplaceMatchLogs(mapSize int, useBody bool) plog.Logs { |
|
logs := plog.NewLogs() |
|
rl := logs.ResourceLogs().AppendEmpty() |
|
populateBenchResource(rl.Resource()) |
|
sl := rl.ScopeLogs().AppendEmpty() |
|
lr := sl.LogRecords().AppendEmpty() |
|
populateBenchLogRecord(lr) |
|
|
|
var target pcommon.Map |
|
if useBody { |
|
target = lr.Body().SetEmptyMap() |
|
} else { |
|
lr.Body().SetStr("plain") |
|
target = lr.Attributes() |
|
} |
|
for i := range mapSize { |
|
target.PutStr(fmt.Sprintf("key_%d", i), fmt.Sprintf("val_%d_data_%d", i%15, i)) |
|
} |
|
return logs |
|
} |
|
|
|
func populateBenchResource(r pcommon.Resource) { |
|
a := r.Attributes() |
|
a.PutStr("service.name", "checkout-service") |
|
a.PutStr("service.version", "2.14.3") |
|
a.PutStr("service.namespace", "ecommerce") |
|
a.PutStr("deployment.environment", "production") |
|
a.PutStr("cloud.provider", "aws") |
|
a.PutStr("cloud.region", "us-east-1") |
|
a.PutStr("cloud.availability_zone", "us-east-1b") |
|
a.PutStr("host.name", "ip-10-0-42-117.ec2.internal") |
|
a.PutStr("host.id", "i-0abcdef1234567890") |
|
a.PutStr("host.type", "m5.xlarge") |
|
a.PutStr("container.id", "a1b2c3d4e5f6") |
|
a.PutStr("k8s.namespace.name", "checkout") |
|
a.PutStr("k8s.pod.name", "checkout-service-7b9f4c6d8-x2k9m") |
|
a.PutStr("k8s.node.name", "ip-10-0-42-117") |
|
} |
|
|
|
func populateBenchLogRecord(lr plog.LogRecord) { |
|
lr.SetTimestamp(pcommon.Timestamp(1710000000000000000)) |
|
lr.SetObservedTimestamp(pcommon.Timestamp(1710000000100000000)) |
|
lr.SetSeverityNumber(plog.SeverityNumberError) |
|
lr.SetSeverityText("ERROR") |
|
lr.SetTraceID(pcommon.TraceID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) |
|
lr.SetSpanID(pcommon.SpanID{1, 2, 3, 4, 5, 6, 7, 8}) |
|
} |