-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add GCP Cloud PubSub reporter #159
Changes from 21 commits
058ff83
109b9d7
092e941
624e689
f9914a5
c0594a6
692e01b
c67a949
88ecaf9
d38ff1e
bb39103
537d8e6
e6cd074
19469f3
a3fb7f9
a85b23e
0cf6684
5744523
5514ba7
67ccd68
386fed9
5273cb5
37ee3bd
30282f4
6a742ef
38a56c7
99542a8
6cf1224
ad9151f
5606781
89d56c9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,3 +24,4 @@ _testmain.go | |
*.prof | ||
|
||
.idea | ||
vendor |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,25 +1,31 @@ | ||
module github.com/openzipkin/zipkin-go | ||
|
||
require ( | ||
cloud.google.com/go v0.38.0 | ||
github.com/Shopify/sarama v1.19.0 | ||
github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/eapache/go-resiliency v1.1.0 // indirect | ||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect | ||
github.com/eapache/queue v1.1.0 // indirect | ||
github.com/envoyproxy/go-control-plane v0.6.9 // indirect | ||
github.com/gogo/googleapis v1.1.0 // indirect | ||
github.com/gogo/protobuf v1.2.0 | ||
github.com/golang/protobuf v1.2.0 | ||
github.com/golang/protobuf v1.3.1 | ||
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect | ||
github.com/googleapis/gax-go v2.0.2+incompatible // indirect | ||
github.com/gorilla/context v1.1.1 // indirect | ||
github.com/gorilla/mux v1.6.2 | ||
github.com/lyft/protoc-gen-validate v0.0.13 // indirect | ||
github.com/onsi/ginkgo v1.7.0 | ||
github.com/onsi/gomega v1.4.3 | ||
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1 // indirect | ||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect | ||
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 | ||
golang.org/x/net v0.0.0-20190311183353-d8887717615a | ||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f // indirect | ||
google.golang.org/grpc v1.20.0 | ||
go.opencensus.io v0.22.0 // indirect | ||
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c | ||
google.golang.org/api v0.7.0 // indirect | ||
google.golang.org/grpc v1.20.1 | ||
) | ||
|
||
go 1.12 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
package gcppubsub | ||
|
||
import ( | ||
"cloud.google.com/go/pubsub" | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"github.com/openzipkin/zipkin-go/model" | ||
"github.com/openzipkin/zipkin-go/reporter" | ||
"log" | ||
"os" | ||
) | ||
|
||
const defaultPubSubTopic = "defaultTopic" | ||
|
||
var resultMsg = make(chan reporterResult) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just out of curiosity, why this is not part of the struct that entirely uses it? Also, could we rename it into |
||
|
||
// Reporter implements Reporter by publishing spans to a GCP gcppubsub. | ||
type Reporter struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am curious about this being exported. Can't this be private? |
||
logger *log.Logger | ||
topic *pubsub.Topic | ||
client *pubsub.Client | ||
} | ||
|
||
type reporterResult struct { | ||
ctx context.Context | ||
result pubsub.PublishResult | ||
} | ||
|
||
// ReporterOption sets a parameter for the reporter | ||
type ReporterOption func(c *Reporter) | ||
|
||
// Send send span to topic | ||
func (r *Reporter) Send(s model.SpanModel) { | ||
// Zipkin expects the message to be wrapped in an array | ||
ss := []model.SpanModel{s} | ||
m, err := json.Marshal(ss) | ||
if err != nil { | ||
r.logger.Printf("failed when marshalling the span: %s\n", err.Error()) | ||
return | ||
} | ||
r.publish(m) | ||
} | ||
|
||
// Close releases any resources held by the client (pubsub client publisher and subscriber connections). | ||
func (r *Reporter) Close() error { | ||
close(resultMsg) | ||
return r.client.Close() | ||
} | ||
|
||
// Logger sets the logger used to report errors in the collection | ||
// process. | ||
func Logger(logger *log.Logger) ReporterOption { | ||
return func(c *Reporter) { | ||
c.logger = logger | ||
} | ||
} | ||
|
||
// Client sets the client used to produce to gcppubsub. | ||
func Client(client *pubsub.Client) ReporterOption { | ||
return func(c *Reporter) { | ||
c.client = client | ||
} | ||
} | ||
|
||
// Topic sets the gcppubsub topic to attach the reporter producer on. | ||
func Topic(t *pubsub.Topic) ReporterOption { | ||
return func(c *Reporter) { | ||
c.topic = t | ||
} | ||
} | ||
|
||
// NewReporter returns a new gcppubsub-backed Reporter. address should be a slice of | ||
// TCP endpoints of the form "host:port". | ||
func NewReporter(options ...ReporterOption) (reporter.Reporter, error) { | ||
r := &Reporter{ | ||
logger: log.New(os.Stderr, "", log.LstdFlags), | ||
} | ||
|
||
for _, option := range options { | ||
option(r) | ||
} | ||
|
||
if r.client == nil { | ||
err := errors.New("cannot create pubsub reporter without valid client") | ||
return nil, err | ||
} | ||
if r.topic == nil { | ||
t := r.client.Topic(defaultPubSubTopic) | ||
r.topic = t | ||
} | ||
go r.checkResult() | ||
return r, nil | ||
} | ||
|
||
func (r *Reporter) publish(msg []byte) { | ||
ctx := context.Background() | ||
|
||
result := r.topic.Publish(ctx, &pubsub.Message{ | ||
Data: msg, | ||
}) | ||
resultMsg <- reporterResult{ctx, *result} | ||
} | ||
|
||
func (r *Reporter) checkResult() { | ||
for n := range resultMsg { | ||
_, err := n.result.Get(n.ctx) | ||
if err != nil { | ||
r.logger.Printf("Error sending message: %s\n", err.Error()) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
package gcppubsub | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"github.com/openzipkin/zipkin-go/model" | ||
"os" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"cloud.google.com/go/pubsub" | ||
) | ||
|
||
var topicID string | ||
|
||
var once sync.Once // guards cleanup related operations in setup. | ||
KeisukeYamashita marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
func setup(topicID string) *pubsub.Client { | ||
ctx := context.Background() | ||
proj := os.Getenv("GOOGLE_CLOUD_PROJECT") | ||
fmt.Printf("GCP Project: %s\n", proj) | ||
|
||
client, err := pubsub.NewClient(ctx, proj) | ||
if err != nil { | ||
fmt.Printf("failed to create client: %s\n", topicID) | ||
KeisukeYamashita marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return nil | ||
} | ||
|
||
_, err = client.CreateTopic(ctx, topicID) | ||
if err != nil { | ||
fmt.Printf("failed to create topic: %v", err) | ||
return nil | ||
} | ||
fmt.Printf("Topic created: %s\n", topicID) | ||
return client | ||
} | ||
|
||
func TestPublish(t *testing.T) { | ||
tcs := map[string]struct { | ||
topicID string | ||
}{ | ||
"with test-topic": { | ||
topicID: "test-topic", | ||
}, | ||
"with default topic": { | ||
topicID: defaultPubSubTopic, | ||
}, | ||
} | ||
|
||
for n, tc := range tcs { | ||
t.Run(n, func(t *testing.T) { | ||
c := setup(tc.topicID) | ||
if c != nil { | ||
KeisukeYamashita marked this conversation as resolved.
Show resolved
Hide resolved
|
||
top := c.Topic(topicID) | ||
reporter, err := NewReporter(Client(c), Topic(top)) | ||
if err != nil { | ||
t.Fatalf("failed creating reporter: %v", err) | ||
} | ||
span := makeNewSpan("avg1", 124, 457, 0, true) | ||
reporter.Send(*span) | ||
|
||
// Cleanup resources from the previous failed tests. | ||
once.Do(func() { | ||
ctx := context.Background() | ||
topic := c.Topic(topicID) | ||
_, err := topic.Exists(ctx) | ||
if err != nil { | ||
t.Fatalf("failed to check if topic exists: %v", err) | ||
} | ||
|
||
if err := topic.Delete(ctx); err != nil { | ||
fmt.Printf("failed to cleanup the topic (%q): %v", topicID, err) | ||
} | ||
}) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestErrorNotProjEnv(t *testing.T) { | ||
reporter, err := NewReporter() | ||
if reporter != nil { | ||
t.Fatal("Reporter should be null when initiated without client") | ||
} | ||
if err == nil { | ||
t.Fatal("NewReporter should return an error when initiated without client") | ||
} | ||
if err.Error() != "cannot create pubsub reporter without valid client" { | ||
t.Fatal("NewReporter should return cannot create pubsub reporter without valid client error") | ||
} | ||
} | ||
|
||
func makeNewSpan(methodName string, traceID, spanID, parentSpanID uint64, debug bool) *model.SpanModel { | ||
timestamp := time.Now() | ||
var parentID = new(model.ID) | ||
if parentSpanID != 0 { | ||
*parentID = model.ID(parentSpanID) | ||
} | ||
|
||
return &model.SpanModel{ | ||
SpanContext: model.SpanContext{ | ||
TraceID: model.TraceID{Low: traceID}, | ||
ID: model.ID(spanID), | ||
ParentID: parentID, | ||
Debug: debug, | ||
}, | ||
Name: methodName, | ||
Timestamp: timestamp, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see many dependencies being added, did you try
go mod tidy
?