Skip to content

Commit 71c3364

Browse files
committed
internalstorage: add trace
Signed-off-by: scyda <[email protected]>
1 parent e343a04 commit 71c3364

File tree

5 files changed

+209
-4
lines changed

5 files changed

+209
-4
lines changed

go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ require (
1919
github.com/spf13/cobra v1.8.1
2020
github.com/spf13/pflag v1.0.5
2121
github.com/stretchr/testify v1.9.0
22+
go.opentelemetry.io/otel v1.28.0
23+
go.opentelemetry.io/otel/trace v1.28.0
2224
go.uber.org/atomic v1.10.0
2325
gopkg.in/natefinch/lumberjack.v2 v2.2.1
2426
gorm.io/datatypes v1.0.7
@@ -126,12 +128,10 @@ require (
126128
go.etcd.io/etcd/client/v3 v3.5.14 // indirect
127129
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 // indirect
128130
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
129-
go.opentelemetry.io/otel v1.28.0 // indirect
130131
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
131132
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 // indirect
132133
go.opentelemetry.io/otel/metric v1.28.0 // indirect
133134
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
134-
go.opentelemetry.io/otel/trace v1.28.0 // indirect
135135
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
136136
go.uber.org/multierr v1.11.0 // indirect
137137
go.uber.org/zap v1.26.0 // indirect

pkg/storage/internalstorage/collectionresource_storage.go

+7
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@ import (
66
"net/url"
77
"strconv"
88
"strings"
9+
"time"
910

11+
"go.opentelemetry.io/otel/attribute"
1012
"gorm.io/gorm"
1113
apierrors "k8s.io/apimachinery/pkg/api/errors"
1214
"k8s.io/apimachinery/pkg/runtime"
1315
"k8s.io/apimachinery/pkg/runtime/schema"
16+
"k8s.io/component-base/tracing"
1417

1518
internal "github.com/clusterpedia-io/api/clusterpedia"
1619
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
@@ -98,6 +101,9 @@ func (s *CollectionResourceStorage) query(ctx context.Context, opts *internal.Li
98101
}
99102

100103
func (s *CollectionResourceStorage) Get(ctx context.Context, opts *internal.ListOptions) (*internal.CollectionResource, error) {
104+
ctx, span := tracing.Start(ctx, "GetCollectionResource from internalstorage")
105+
defer span.End(500 * time.Millisecond)
106+
101107
query, list, err := s.query(ctx, opts)
102108
if err != nil {
103109
return nil, err
@@ -117,6 +123,7 @@ func (s *CollectionResourceStorage) Get(ctx context.Context, opts *internal.List
117123
Items: make([]runtime.Object, 0, len(items)),
118124
}
119125

126+
span.AddEvent("About to convert objects", attribute.Int("count", len(items)))
120127
gvrs := make(map[schema.GroupVersionResource]struct{})
121128
for _, resource := range items {
122129
obj, err := resource.ConvertToUnstructured()

pkg/storage/internalstorage/register.go

+4
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) {
8888
}
8989
}
9090

91+
if err := db.Use(NewGormTrace(false)); err != nil {
92+
return nil, err
93+
}
94+
9195
sqlDB, err := db.DB()
9296
if err != nil {
9397
return nil, err

pkg/storage/internalstorage/resource_storage.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77
"fmt"
88
"reflect"
99
"strconv"
10+
"time"
1011

12+
"go.opentelemetry.io/otel/attribute"
1113
"gorm.io/datatypes"
1214
"gorm.io/gorm"
1315
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -21,6 +23,7 @@ import (
2123
"k8s.io/apimachinery/pkg/watch"
2224
genericstorage "k8s.io/apiserver/pkg/storage"
2325
"k8s.io/client-go/tools/cache"
26+
"k8s.io/component-base/tracing"
2427

2528
internal "github.com/clusterpedia-io/api/clusterpedia"
2629
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
@@ -172,11 +175,17 @@ func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namesp
172175
}
173176

174177
func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
178+
ctx, span := tracing.Start(ctx, "Get from internalstorage",
179+
attribute.String("storage resource", s.config.StorageResource.String()),
180+
attribute.String("target type", fmt.Sprintf("%T", into)),
181+
)
182+
175183
var objects [][]byte
176184
if result := s.genGetObjectQuery(ctx, cluster, namespace, name).First(&objects); result.Error != nil {
177185
return InterpretResourceDBError(cluster, namespace+"/"+name, result.Error)
178186
}
179187

188+
span.AddEvent("About to decode object")
180189
obj, _, err := s.config.Codec.Decode(objects[0], nil, into)
181190
if err != nil {
182191
return err
@@ -193,12 +202,19 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna
193202
result = &ResourceMetadataList{}
194203
}
195204

196-
query := s.db.WithContext(ctx).Model(&Resource{}).Where(s.gvrKeyMap())
197-
offset, amount, query, err := applyListOptionsToResourceQuery(s.db, query, opts)
205+
db := s.db.WithContext(ctx)
206+
query := db.Model(&Resource{}).Where(s.gvrKeyMap())
207+
offset, amount, query, err := applyListOptionsToResourceQuery(db, query, opts)
198208
return offset, amount, query, result, err
199209
}
200210

201211
func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, opts *internal.ListOptions) error {
212+
ctx, span := tracing.Start(ctx, "List from internalstorage",
213+
attribute.String("storage resource", s.config.StorageResource.String()),
214+
attribute.String("target type", fmt.Sprintf("%T", listObject)),
215+
)
216+
defer span.End(500 * time.Millisecond)
217+
202218
offset, amount, query, result, err := s.genListObjectsQuery(ctx, opts)
203219
if err != nil {
204220
return err
@@ -231,6 +247,8 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o
231247
return nil
232248
}
233249

250+
span.AddEvent("About to convert objects", attribute.Int("count", len(objects)))
251+
234252
if unstructuredList, ok := listObject.(*unstructured.UnstructuredList); ok {
235253
unstructuredList.Items = make([]unstructured.Unstructured, 0, len(objects))
236254
for _, object := range objects {
+176
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package internalstorage
2+
3+
/*
4+
Ref from https://github.com/go-gorm/opentelemetry/tree/master/[email protected]
5+
*/
6+
7+
import (
8+
"database/sql"
9+
"database/sql/driver"
10+
"fmt"
11+
"io"
12+
"regexp"
13+
"strings"
14+
15+
"go.opentelemetry.io/otel/attribute"
16+
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
17+
"go.opentelemetry.io/otel/trace"
18+
"gorm.io/gorm"
19+
)
20+
21+
var (
22+
firstWordRegex = regexp.MustCompile(`^\w+`)
23+
cCommentRegex = regexp.MustCompile(`(?is)/\*.*?\*/`)
24+
lineCommentRegex = regexp.MustCompile(`(?im)(?:--|#).*?$`)
25+
sqlPrefixRegex = regexp.MustCompile(`^[\s;]*`)
26+
27+
dbRowsAffected = attribute.Key("db.rows_affected")
28+
)
29+
30+
type tracePlugin struct {
31+
attrs []attribute.KeyValue
32+
excludeQueryVars bool
33+
}
34+
35+
func NewGormTrace(excludeQueryVars bool) gorm.Plugin {
36+
return &tracePlugin{
37+
excludeQueryVars: excludeQueryVars,
38+
}
39+
}
40+
41+
func (p tracePlugin) Name() string {
42+
return "gorm:oteltracing"
43+
}
44+
45+
type gormHookFunc func(tx *gorm.DB)
46+
47+
type gormRegister interface {
48+
Register(name string, fn func(*gorm.DB)) error
49+
}
50+
51+
func (p tracePlugin) Initialize(db *gorm.DB) (err error) {
52+
cb := db.Callback()
53+
hooks := []struct {
54+
callback gormRegister
55+
hook gormHookFunc
56+
name string
57+
}{
58+
{cb.Query().Before("gorm:query"), p.before("gorm.Query"), "before:select"},
59+
{cb.Query().After("gorm:query"), p.after("gorm.Query"), "after:select"},
60+
61+
/*
62+
{cb.Create().Before("gorm:create"), p.before("gorm.Create"), "before:create"},
63+
{cb.Create().After("gorm:create"), p.after("gorm.Create"), "after:create"},
64+
65+
{cb.Delete().Before("gorm:delete"), p.before("gorm.Delete"), "before:delete"},
66+
{cb.Delete().After("gorm:delete"), p.after("gorm.Delete"), "after:delete"},
67+
68+
{cb.Update().Before("gorm:update"), p.before("gorm.Update"), "before:update"},
69+
{cb.Update().After("gorm:update"), p.after("gorm.Update"), "after:update"},
70+
71+
{cb.Row().Before("gorm:row"), p.before("gorm.Row"), "before:row"},
72+
{cb.Row().After("gorm:row"), p.after("gorm.Row"), "after:row"},
73+
74+
{cb.Raw().Before("gorm:raw"), p.before("gorm.Raw"), "before:raw"},
75+
{cb.Raw().After("gorm:raw"), p.after("gorm.Raw"), "after:raw"},
76+
*/
77+
}
78+
79+
var firstErr error
80+
for _, h := range hooks {
81+
if err := h.callback.Register("otel:"+h.name, h.hook); err != nil && firstErr == nil {
82+
firstErr = fmt.Errorf("callback register %s failed: %w", h.name, err)
83+
}
84+
}
85+
86+
return firstErr
87+
}
88+
89+
func (p *tracePlugin) before(operate string) gormHookFunc {
90+
return func(tx *gorm.DB) {
91+
span := trace.SpanFromContext(tx.Statement.Context)
92+
if !span.IsRecording() {
93+
return
94+
}
95+
span.AddEvent("About " + operate)
96+
}
97+
}
98+
99+
func (p *tracePlugin) after(operate string) gormHookFunc {
100+
return func(tx *gorm.DB) {
101+
span := trace.SpanFromContext(tx.Statement.Context)
102+
if !span.IsRecording() {
103+
return
104+
}
105+
106+
attrs := make([]attribute.KeyValue, 0, len(p.attrs)+4)
107+
attrs = append(attrs, p.attrs...)
108+
109+
if sys := dbSystem(tx); sys.Valid() {
110+
attrs = append(attrs, sys)
111+
}
112+
113+
vars := tx.Statement.Vars
114+
115+
var query string
116+
if p.excludeQueryVars {
117+
query = tx.Statement.SQL.String()
118+
} else {
119+
query = tx.Dialector.Explain(tx.Statement.SQL.String(), vars...)
120+
}
121+
122+
formatQuery := p.formatQuery(query)
123+
attrs = append(attrs, semconv.DBStatementKey.String(formatQuery))
124+
attrs = append(attrs, semconv.DBOperationKey.String(dbOperation(formatQuery)))
125+
if tx.Statement.Table != "" {
126+
attrs = append(attrs, semconv.DBSQLTableKey.String(tx.Statement.Table))
127+
}
128+
if tx.Statement.RowsAffected != -1 {
129+
attrs = append(attrs, dbRowsAffected.Int64(tx.Statement.RowsAffected))
130+
}
131+
132+
span.AddEvent(fmt.Sprintf("%s succeeded", operate), trace.WithAttributes(attrs...))
133+
switch tx.Error {
134+
case nil,
135+
gorm.ErrRecordNotFound,
136+
driver.ErrSkip,
137+
io.EOF, // end of rows iterator
138+
sql.ErrNoRows:
139+
// ignore
140+
default:
141+
span.RecordError(tx.Error)
142+
}
143+
}
144+
}
145+
146+
func (p *tracePlugin) formatQuery(query string) string {
147+
return query
148+
}
149+
150+
func dbSystem(tx *gorm.DB) attribute.KeyValue {
151+
switch tx.Dialector.Name() {
152+
case "mysql":
153+
return semconv.DBSystemMySQL
154+
case "mssql":
155+
return semconv.DBSystemMSSQL
156+
case "postgres", "postgresql":
157+
return semconv.DBSystemPostgreSQL
158+
case "sqlite":
159+
return semconv.DBSystemSqlite
160+
case "sqlserver":
161+
return semconv.DBSystemKey.String("sqlserver")
162+
case "clickhouse":
163+
return semconv.DBSystemKey.String("clickhouse")
164+
case "spanner":
165+
return semconv.DBSystemKey.String("spanner")
166+
default:
167+
return attribute.KeyValue{}
168+
}
169+
}
170+
171+
func dbOperation(query string) string {
172+
s := cCommentRegex.ReplaceAllString(query, "")
173+
s = lineCommentRegex.ReplaceAllString(s, "")
174+
s = sqlPrefixRegex.ReplaceAllString(s, "")
175+
return strings.ToLower(firstWordRegex.FindString(s))
176+
}

0 commit comments

Comments
 (0)