Stream Processing made easy
go get -v -u github.com/gojekfarm/ziggurat/cmd/...
go install github.com/gojekfarm/ziggurat/cmd/...
ziggurat new <app_name>
package main
import (
"context"
"github.com/gojekfarm/ziggurat/mw/event"
"github.com/gojekfarm/ziggurat/mw/prometheus"
"github.com/gojekfarm/ziggurat/mw/statsd"
"github.com/gojekfarm/ziggurat"
"github.com/gojekfarm/ziggurat/kafka"
"github.com/gojekfarm/ziggurat/logger"
"github.com/gojekfarm/ziggurat/router"
)
func main() {
var zig ziggurat.Ziggurat
jsonLogger := logger.NewJSONLogger(logger.LevelInfo)
ctx := context.Background()
kafkaStreams := kafka.Streams{
StreamConfig: kafka.StreamConfig{
{
BootstrapServers: "localhost:9092",
OriginTopics: "plain-text-log",
ConsumerGroupID: "plain_text_consumer",
ConsumerCount: 1,
RouteGroup: "plain-text-log",
},
{
BootstrapServers: "localhost:9092",
OriginTopics: "json-log",
ConsumerGroupID: "json_consumer",
ConsumerCount: 1,
RouteGroup: "json-log",
},
},
Logger: jsonLogger,
}
r := router.New()
r.HandleFunc("plain-text-log", func(ctx context.Context, event *ziggurat.Event) error {
return nil
})
r.HandleFunc("json-log", func(ctx context.Context, event *ziggurat.Event) error {
return ziggurat.Retry
})
handler := r.Compose(event.Logger(jsonLogger))
if runErr := zig.Run(ctx, &kafkaStreams, handler); runErr != nil {
jsonLogger.Error("could not start streams", runErr)
}
}
- There are 2 main interfaces that your structs can implement to plug into the Ziggurat library
package ziggurat
import "context"
type Streamer interface {
Stream(ctx context.Context, handler Handler) error
}
// Any type can implement the Streamer interface to stream data from any source
package ziggurat
import "context"
type Handler interface {
Handle(ctx context.Context, event *Event) error
}
// The Handler interface is very similar to the http.Handler interface
// The default router shipped with Ziggurat also implements the Handler interface