Skip to content

Commit

Permalink
Attempt to change how in progress queries and last completed queries …
Browse files Browse the repository at this point in the history
…are writen to disk

Resovles issue #59
And hopefully fixes issue #38
  • Loading branch information
BenB196 committed Oct 19, 2019
1 parent 3d6110a commit c888612
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 46 deletions.
12 changes: 12 additions & 0 deletions eventOutput/fileHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,12 @@ func WriteInProgressQueries(query config.FFSQuery, inProgressQueries *[]InProgre
return errors.New("error: flushing file: " + fileName + " " + err.Error())
}

err = file.Sync()

if err != nil {
return errors.New("error: syncing file: " + fileName + " " + err.Error())
}

return nil
}

Expand Down Expand Up @@ -438,6 +444,12 @@ func WriteLastCompletedQuery(query config.FFSQuery, lastCompletedQuery InProgres
return errors.New("error: flushing file: " + fileName + " " + err.Error())
}

err = file.Sync()

if err != nil {
return errors.New("error: syncing file: " + fileName + " " + err.Error())
}

return nil
}

Expand Down
65 changes: 19 additions & 46 deletions ffsEvent/ffsEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/olivere/elastic/v7"
"log"
"reflect"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -86,29 +85,6 @@ func FFSQuery (configuration config.Config, query config.FFSQuery) {
}
}()

//Write in progress queries every 100 milliseconds to file
inProgressQueryWriteTimeTicker := time.NewTicker(100 * time.Millisecond)
go func() {
var oldInProgressQueries []eventOutput.InProgressQuery
oldInProgressQueries = inProgressQueries
for {
select {
case <- inProgressQueryWriteTimeTicker.C:
if !reflect.DeepEqual(oldInProgressQueries,inProgressQueries) {
oldInProgressQueries = inProgressQueries
err := eventOutput.WriteInProgressQueries(query, &inProgressQueries)

if err != nil {
panic(err)
}
}
case <- quit:
inProgressQueryWriteTimeTicker.Stop()
return
}
}
}()

//Init elastic client if output type == elastic
var elasticClient *elastic.Client
var ctx context.Context
Expand Down Expand Up @@ -148,28 +124,6 @@ func FFSQuery (configuration config.Config, query config.FFSQuery) {
}()
}

//Write last completed query every 100 milliseconds to file
lastCompletedQueryWriteTimeTicker := time.NewTicker(100 * time.Millisecond)
go func() {
var oldLastCompletedQuery eventOutput.InProgressQuery
oldLastCompletedQuery = lastCompletedQuery
for {
select {
case <-lastCompletedQueryWriteTimeTicker.C:
if oldLastCompletedQuery != lastCompletedQuery {
oldLastCompletedQuery = lastCompletedQuery
err := eventOutput.WriteLastCompletedQuery(query, lastCompletedQuery)
if err != nil {
panic(err)
}
}
case <- quit:
lastCompletedQueryWriteTimeTicker.Stop()
return
}
}
}()

//Handle setting the initial ON_OR_BEFORE and ON_OR_AFTER depending on the saved lastCompletedQuery
if lastCompletedQuery != (eventOutput.InProgressQuery{}) {
//TODO handle setting correct times
Expand Down Expand Up @@ -228,6 +182,13 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg

if !cleanUpQuery && !retryQuery {
*inProgressQueries = append(*inProgressQueries,inProgressQuery)

//Write in progress queries to file
err := eventOutput.WriteInProgressQueries(query, inProgressQueries)

if err != nil {
panic(err)
}
}

fileEvents, err := ffs.GetFileEvents(authData,configuration.FFSURI, query.Query)
Expand Down Expand Up @@ -1066,6 +1027,11 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
//Check if this query is the newest completed query, if it is, set last completed query to query times
if lastCompletedQuery.OnOrBefore.Sub(inProgressQuery.OnOrAfter) <= 0 {
*lastCompletedQuery = inProgressQuery

err := eventOutput.WriteLastCompletedQuery(query, inProgressQuery)
if err != nil {
panic(err)
}
}

//Remove from in progress query slice
Expand All @@ -1078,6 +1044,13 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
}
*inProgressQueries = tempInProgress

//Write in progress queries to file
err = eventOutput.WriteInProgressQueries(query, inProgressQueries)

if err != nil {
panic(err)
}

promMetrics.IncrementEventsProcessed(len(ffsEvents))
promMetrics.DecreaseInProgressQueries()
}
Expand Down

0 comments on commit c888612

Please sign in to comment.