Skip to content

Commit

Permalink
Merge pull request #29 from lovoo/remove-logs
Browse files Browse the repository at this point in the history
removed noisy logs
  • Loading branch information
frairon authored Jun 1, 2022
2 parents 546936a + 07f4300 commit 84dd928
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 33 deletions.
7 changes: 3 additions & 4 deletions bbq/bbq.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ func (s *saver) Save() (map[string]bigquery.Value, string, error) {
values := convertToMap(s.msgToSave)
return values, "", nil
}

func convertToMap(item interface{}) map[string]bigquery.Value {
return convertToMapReflect(reflect.ValueOf(item).Elem())
}

func convertToMapReflect(value reflect.Value) map[string]bigquery.Value {
values := make(map[string]bigquery.Value, value.NumField())

Expand Down Expand Up @@ -123,7 +125,6 @@ func NewBbq(gcpproject string, datesetName string, tables []*TableOptions, metri
}

dataset := client.Dataset(datesetName)
log.Printf("BigQuery client created successfully. Writing to %s dataset", datesetName)

m := newMetrics(metricsNamespace)

Expand All @@ -150,7 +151,6 @@ func NewBbq(gcpproject string, datesetName string, tables []*TableOptions, metri
stopUploaders: stop,
uploadersWg: &wg,
}, nil

}

// Stop drains the batches in the bbq-uploaders and blocks until they're done
Expand All @@ -167,7 +167,6 @@ func (b *Bbq) Stop(timeout time.Duration) {

select {
case <-done:
log.Printf("BBQ: uploaders fully drained. BBQ is shutting down cleanly")
case <-time.After(timeout):
log.Printf("Timeout shutting down bbq, we will lose some data. Sorry.")
}
Expand Down Expand Up @@ -223,7 +222,7 @@ func createOrUpdateTable(ctx context.Context, dataset *bigquery.Dataset, name st
schema, err = inferSchema(tableOptions.Obj)
}

//Set all the required fields to false
// Set all the required fields to false
setRequiredFalse(schema)
if err != nil {
return fmt.Errorf("error infering schema: %v", err)
Expand Down
12 changes: 3 additions & 9 deletions scheduler/scheduler_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func TestScheduler_Integration(t *testing.T) {
t.Errorf("Error creating order emitter: %v", err)
}

log.Printf("waiting for processors to get up")
// need some time to start the processors
time.Sleep(10 * time.Second)

Expand Down Expand Up @@ -144,7 +143,6 @@ func TestScheduler_Integration(t *testing.T) {
}

func startScheduler(ctx context.Context, errg *multierr.ErrGroup, prefix string) *Scheduler {

sched := CreateScheduler(NewConfig(), []time.Duration{
1 * time.Second,
50 * time.Millisecond,
Expand Down Expand Up @@ -179,14 +177,12 @@ type receiverProc struct {
}

func createReceiverProc(ctx context.Context, t *testing.T, errg *multierr.ErrGroup) *receiverProc {

rp := &receiverProc{
received: make(map[string]string),
}
proc, err := goka.NewProcessor([]string{*broker}, goka.DefineGroup(
"receiver-proc",
goka.Input(targetTopic, new(codec.String), func(ctx goka.Context, msg interface{}) {
// log.Printf("receiving %s, %v", ctx.Key(), msg)
rp.m.Lock()
defer rp.m.Unlock()
rp.received[ctx.Key()] = msg.(string)
Expand All @@ -204,11 +200,9 @@ func createReceiverProc(ctx context.Context, t *testing.T, errg *multierr.ErrGro

return rp
}
func (rp *receiverProc) checkValue(t *testing.T, timeout time.Duration, key string, value string) {

var (
waitTime = 10 * time.Millisecond
)
func (rp *receiverProc) checkValue(t *testing.T, timeout time.Duration, key string, value string) {
waitTime := 10 * time.Millisecond
start := time.Now()
for {
time.Sleep(waitTime)
Expand All @@ -235,7 +229,7 @@ func (rp *receiverProc) clear() {
}

func randKey() string {
var target = make([]byte, 4)
target := make([]byte, 4)
rand.Read(target)

return hex.EncodeToString(target)
Expand Down
20 changes: 2 additions & 18 deletions telly/inserter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func (t *Telly) offsetKey() string {
}

func (t *Telly) readOffsetDoc(ctx context.Context) (*offsetDoc, error) {

cursor, err := t.metaTable().Get(t.offsetKey()).Run(t.rsess)
if err != nil {
return nil, fmt.Errorf("error reading stats:%v", err)
Expand All @@ -54,12 +53,10 @@ func (t *Telly) readOffsetDoc(ctx context.Context) (*offsetDoc, error) {
_, err = t.metaTable().Insert(&newOffsets, rethinkdb.InsertOpts{
Conflict: "replace",
}).RunWrite(t.rsess)
log.Printf("creating new ones. %v#", newOffsets)
return &newOffsets, err
}

func (t *Telly) runInserter(ctx context.Context, input <-chan *sarama.ConsumerMessage) error {

offsets, err := t.readOffsetDoc(ctx)
if err != nil {
return fmt.Errorf("error reading stats: %v", err)
Expand All @@ -68,31 +65,19 @@ func (t *Telly) runInserter(ctx context.Context, input <-chan *sarama.ConsumerMe
committer := time.NewTicker(maxBatch)
defer committer.Stop()

var (
batch = make([]*sarama.ConsumerMessage, 0, t.opts.inserterBatchSize)
start = time.Now()
)
batch := make([]*sarama.ConsumerMessage, 0, t.opts.inserterBatchSize)

var totalItemsCommitted int
commit := func() {
now := time.Now()

ctx, cancel := context.WithTimeout(context.Background(), commitTimeout)
defer cancel()

newItems, updatedItems, deletedItems, err := t.insertBatch(ctx, offsets, batch)
newItems, updatedItems, _, err := t.insertBatch(ctx, offsets, batch)
if err != nil {
log.Printf("error committing batch: %v", err)
}
totalItemsCommitted += newItems + updatedItems

defer func() {
// TODO: add metrics instead of logging every batch
if false {
log.Printf("commit took %v for %d new, %d updated, %d deleted items (total %d) (%.0f items/s)", time.Since(now), newItems, updatedItems, deletedItems, totalItemsCommitted, float64(totalItemsCommitted)/time.Since(start).Seconds())
}
}()

committer.Reset(maxBatch)
// reset batch
batch = make([]*sarama.ConsumerMessage, 0, t.opts.inserterBatchSize)
Expand Down Expand Up @@ -121,7 +106,6 @@ func (t *Telly) runInserter(ctx context.Context, input <-chan *sarama.ConsumerMe
}

func (t *Telly) insertBatch(ctx context.Context, offsets *offsetDoc, batch []*sarama.ConsumerMessage) (int, int, int, error) {

// ignore empty batch
if len(batch) == 0 {
return 0, 0, 0, nil
Expand Down
2 changes: 0 additions & 2 deletions telly/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,11 @@ func (t *Telly) offsetForPartition(offsets *offsetDoc, partition int32, tmgr gok
}

func (t *Telly) readPartition(ctx context.Context, part int32, cons sarama.Consumer, offset int64, inputMsgChan chan<- *sarama.ConsumerMessage) error {

partCons, err := cons.ConsumePartition(t.opts.topic, part, offset)
if err != nil {
return fmt.Errorf("error creating partition consumer %s/%d: %v", t.opts.topic, part, err)
}
defer func() {
log.Printf("reading partition done")
if err := partCons.Close(); err != nil {
log.Printf("error closing partition consumer %s/%d: %v", t.opts.topic, part, err)
}
Expand Down

0 comments on commit 84dd928

Please sign in to comment.