-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Google Cloud Pub Sub reporter #142
base: master
Are you sure you want to change the base?
Changes from 12 commits
058ff83
109b9d7
092e941
624e689
f9914a5
c0594a6
692e01b
c67a949
88ecaf9
d38ff1e
bb39103
537d8e6
e6cd074
19469f3
a3fb7f9
a85b23e
0cf6684
5744523
53eecf3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,3 +24,4 @@ _testmain.go | |
*.prof | ||
|
||
.idea | ||
vendor |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,25 +1,31 @@ | ||
module github.com/openzipkin/zipkin-go | ||
|
||
require ( | ||
cloud.google.com/go v0.38.0 | ||
github.com/Shopify/sarama v1.19.0 | ||
github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/eapache/go-resiliency v1.1.0 // indirect | ||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect | ||
github.com/eapache/queue v1.1.0 // indirect | ||
github.com/envoyproxy/go-control-plane v0.6.9 // indirect | ||
github.com/gogo/googleapis v1.1.0 // indirect | ||
github.com/gogo/protobuf v1.2.0 | ||
github.com/golang/protobuf v1.2.0 | ||
github.com/golang/protobuf v1.3.1 | ||
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect | ||
github.com/googleapis/gax-go v2.0.2+incompatible // indirect | ||
github.com/gorilla/context v1.1.1 // indirect | ||
github.com/gorilla/mux v1.6.2 | ||
github.com/lyft/protoc-gen-validate v0.0.13 // indirect | ||
github.com/onsi/ginkgo v1.7.0 | ||
github.com/onsi/gomega v1.4.3 | ||
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1 // indirect | ||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect | ||
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 | ||
golang.org/x/net v0.0.0-20190311183353-d8887717615a | ||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f // indirect | ||
google.golang.org/grpc v1.20.0 | ||
go.opencensus.io v0.22.0 // indirect | ||
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c | ||
google.golang.org/api v0.7.0 // indirect | ||
google.golang.org/grpc v1.20.1 | ||
) | ||
|
||
go 1.12 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
package gcppubsub | ||
|
||
import ( | ||
"cloud.google.com/go/pubsub" | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"github.com/openzipkin/zipkin-go/model" | ||
"github.com/openzipkin/zipkin-go/reporter" | ||
"log" | ||
"os" | ||
) | ||
|
||
const defaultPubSubTopic = "defaultTopic" | ||
|
||
// Reporter implements Reporter by publishing spans to a GCP gcppubsub. | ||
type Reporter struct { | ||
logger *log.Logger | ||
topic string | ||
client *pubsub.Client | ||
} | ||
|
||
// ReporterOption sets a parameter for the reporter | ||
type ReporterOption func(c *Reporter) | ||
|
||
// Send send span to topic | ||
func (r *Reporter) Send(s model.SpanModel) { | ||
// Zipkin expects the message to be wrapped in an array | ||
ss := []model.SpanModel{s} | ||
m, err := json.Marshal(ss) | ||
if err != nil { | ||
r.logger.Printf("failed when marshalling the span: %s\n", err.Error()) | ||
return | ||
} | ||
r.publish(m) | ||
} | ||
|
||
// Close releases any resources held by the client (pubsub client publisher and subscriber connections). | ||
func (r *Reporter) Close() error { | ||
return r.client.Close() | ||
} | ||
|
||
// Logger sets the logger used to report errors in the collection | ||
// process. | ||
func Logger(logger *log.Logger) ReporterOption { | ||
return func(c *Reporter) { | ||
c.logger = logger | ||
} | ||
} | ||
|
||
// Client sets the client used to produce to gcppubsub. | ||
func Client(client *pubsub.Client) ReporterOption { | ||
return func(c *Reporter) { | ||
c.client = client | ||
} | ||
} | ||
|
||
// Topic sets the gcppubsub topic to attach the reporter producer on. | ||
func Topic(t string) ReporterOption { | ||
return func(c *Reporter) { | ||
c.topic = t | ||
} | ||
} | ||
|
||
// NewReporter returns a new gcppubsub-backed Reporter. address should be a slice of | ||
// TCP endpoints of the form "host:port". | ||
func NewReporter(options ...ReporterOption) (reporter.Reporter, error) { | ||
r := &Reporter{ | ||
logger: log.New(os.Stderr, "", log.LstdFlags), | ||
topic: defaultPubSubTopic, | ||
} | ||
|
||
for _, option := range options { | ||
option(r) | ||
} | ||
if r.client == nil { | ||
ctx := context.Background() | ||
proj := os.Getenv("GOOGLE_CLOUD_PROJECT") | ||
javierviera marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if proj == "" { | ||
err := errors.New("GOOGLE_CLOUD_PROJECT environment variable must be set. Traces wont be sent to gcppubsub") | ||
return nil, err | ||
} | ||
client, err := pubsub.NewClient(ctx, proj) | ||
if err != nil { | ||
log.Fatalf("Could not create gcppubsub Client: %v", err) | ||
return nil, err | ||
} | ||
r.client = client | ||
} | ||
|
||
return r, nil | ||
} | ||
|
||
func (r *Reporter) publish(msg []byte) { | ||
ctx := context.Background() | ||
t := r.client.Topic(r.topic) | ||
javierviera marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
result := t.Publish(ctx, &pubsub.Message{ | ||
// data must be a ByteString | ||
javierviera marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Data: msg, | ||
}) | ||
go func() { | ||
javierviera marked this conversation as resolved.
Show resolved
Hide resolved
|
||
_, err := result.Get(ctx) | ||
if err != nil { | ||
r.logger.Printf("Error sending message: %s\n", err.Error()) | ||
} | ||
}() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
package gcppubsub | ||
javierviera marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"github.com/openzipkin/zipkin-go/model" | ||
"os" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"cloud.google.com/go/pubsub" | ||
) | ||
|
||
var topicID string | ||
|
||
var once sync.Once // guards cleanup related operations in setup. | ||
|
||
func setup() *pubsub.Client { | ||
ctx := context.Background() | ||
proj := os.Getenv("GOOGLE_CLOUD_PROJECT") | ||
fmt.Printf("GCP Project: %s\n", proj) | ||
topicID = "test-topic" | ||
|
||
client, err := pubsub.NewClient(ctx, proj) | ||
if err != nil { | ||
fmt.Printf("failed to create client: %s\n", topicID) | ||
return nil | ||
} | ||
|
||
_, err = client.CreateTopic(ctx, topicID) | ||
if err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the best here is that instead of returning a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was like that originally. But the problem is that it would always fail in a CI server or everywhere where GOOGLE_CLOUD_PROJECT is not configured. As the test topic is created in an exiting GCP project... We we test the error but cannot test the success setup... |
||
fmt.Printf("failed to create topic: %v", err) | ||
return nil | ||
} | ||
fmt.Printf("Topic created: %s\n", topicID) | ||
return client | ||
} | ||
|
||
func TestPublish(t *testing.T) { | ||
javierviera marked this conversation as resolved.
Show resolved
Hide resolved
|
||
c := setup() | ||
if c != nil { | ||
reporter, err := NewReporter(Client(c), Topic(topicID)) | ||
if err != nil { | ||
t.Fatalf("failed creating reporter: %v", err) | ||
} | ||
span := makeNewSpan("avg1", 124, 457, 0, true) | ||
reporter.Send(*span) | ||
|
||
// Cleanup resources from the previous failed tests. | ||
once.Do(func() { | ||
ctx := context.Background() | ||
topic := c.Topic(topicID) | ||
ok, err := topic.Exists(ctx) | ||
if err != nil { | ||
fmt.Printf("failed to check if topic exists: %v", err) | ||
javierviera marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
if !ok { | ||
javierviera marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return | ||
} | ||
if err := topic.Delete(ctx); err != nil { | ||
fmt.Printf("failed to cleanup the topic (%q): %v", topicID, err) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestErrorNotProjEnv(t *testing.T) { | ||
reporter, err := NewReporter(Topic(topicID)) | ||
if reporter != nil { | ||
t.Fatal("Reporter should be null when initiated without client") | ||
} | ||
if err == nil { | ||
t.Fatal("NewReporter should return an error when initiated without client") | ||
} | ||
if err.Error() != "GOOGLE_CLOUD_PROJECT environment variable must be set. Traces wont be sent to gcppubsub" { | ||
t.Fatal("NewReporter should return GOOGLE_CLOUD_PROJECT environment variable must be set error when initiated without client") | ||
} | ||
} | ||
|
||
func makeNewSpan(methodName string, traceID, spanID, parentSpanID uint64, debug bool) *model.SpanModel { | ||
timestamp := time.Now() | ||
var parentID = new(model.ID) | ||
if parentSpanID != 0 { | ||
*parentID = model.ID(parentSpanID) | ||
} | ||
|
||
return &model.SpanModel{ | ||
SpanContext: model.SpanContext{ | ||
TraceID: model.TraceID{Low: traceID}, | ||
ID: model.ID(spanID), | ||
ParentID: parentID, | ||
Debug: debug, | ||
}, | ||
Name: methodName, | ||
Timestamp: timestamp, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am curious about the tradeoff for reporting one span each time of piling up them up to a point and then do the reporting. I would prefer to not to do a http call on every span cc @basvanbeek
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can for symmetry reasons with other reporters (and maybe optimization of message encoding/decoding). For request/response calls it is not needed as pubsub handles sending underneath and batches.... see https://godoc.org/cloud.google.com/go/pubsub