Skip to content

Commit

Permalink
Merge pull request #67 from BenB196/staging
Browse files Browse the repository at this point in the history
Merge staging to master and bump to version 0.1.9
  • Loading branch information
BenB196 authored Oct 19, 2019
2 parents 9cc358d + 457c4ce commit ac640a5
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 50 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.8
0.1.9
2 changes: 1 addition & 1 deletion elasticsearch/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func CreateLogstashClient(logstashURL string) (net.Conn,error) {
}

d := net.Dialer{
Timeout: 30 * time.Second,
Timeout: 5 * time.Minute,
}

connection, err := d.Dial("tcp", tcpAddr.String())
Expand Down
12 changes: 12 additions & 0 deletions eventOutput/fileHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,12 @@ func WriteInProgressQueries(query config.FFSQuery, inProgressQueries *[]InProgre
return errors.New("error: flushing file: " + fileName + " " + err.Error())
}

err = file.Sync()

if err != nil {
return errors.New("error: syncing file: " + fileName + " " + err.Error())
}

return nil
}

Expand Down Expand Up @@ -438,6 +444,12 @@ func WriteLastCompletedQuery(query config.FFSQuery, lastCompletedQuery InProgres
return errors.New("error: flushing file: " + fileName + " " + err.Error())
}

err = file.Sync()

if err != nil {
return errors.New("error: syncing file: " + fileName + " " + err.Error())
}

return nil
}

Expand Down
72 changes: 24 additions & 48 deletions ffsEvent/ffsEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/olivere/elastic/v7"
"log"
"reflect"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -86,29 +85,6 @@ func FFSQuery (configuration config.Config, query config.FFSQuery) {
}
}()

//Write in progress queries every 100 milliseconds to file
inProgressQueryWriteTimeTicker := time.NewTicker(100 * time.Millisecond)
go func() {
var oldInProgressQueries []eventOutput.InProgressQuery
oldInProgressQueries = inProgressQueries
for {
select {
case <- inProgressQueryWriteTimeTicker.C:
if !reflect.DeepEqual(oldInProgressQueries,inProgressQueries) {
oldInProgressQueries = inProgressQueries
err := eventOutput.WriteInProgressQueries(query, &inProgressQueries)

if err != nil {
panic(err)
}
}
case <- quit:
inProgressQueryWriteTimeTicker.Stop()
return
}
}
}()

//Init elastic client if output type == elastic
var elasticClient *elastic.Client
var ctx context.Context
Expand Down Expand Up @@ -148,28 +124,6 @@ func FFSQuery (configuration config.Config, query config.FFSQuery) {
}()
}

//Write last completed query every 100 milliseconds to file
lastCompletedQueryWriteTimeTicker := time.NewTicker(100 * time.Millisecond)
go func() {
var oldLastCompletedQuery eventOutput.InProgressQuery
oldLastCompletedQuery = lastCompletedQuery
for {
select {
case <-lastCompletedQueryWriteTimeTicker.C:
if oldLastCompletedQuery != lastCompletedQuery {
oldLastCompletedQuery = lastCompletedQuery
err := eventOutput.WriteLastCompletedQuery(query, lastCompletedQuery)
if err != nil {
panic(err)
}
}
case <- quit:
lastCompletedQueryWriteTimeTicker.Stop()
return
}
}
}()

//Handle setting the initial ON_OR_BEFORE and ON_OR_AFTER depending on the saved lastCompletedQuery
if lastCompletedQuery != (eventOutput.InProgressQuery{}) {
//TODO handle setting correct times
Expand Down Expand Up @@ -228,18 +182,28 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg

if !cleanUpQuery && !retryQuery {
*inProgressQueries = append(*inProgressQueries,inProgressQuery)

//Write in progress queries to file
err := eventOutput.WriteInProgressQueries(query, inProgressQueries)

if err != nil {
panic(err)
}
}

fileEvents, err := ffs.GetFileEvents(authData,configuration.FFSURI, query.Query)

if err != nil {
log.Println("error getting file events for ffs query: " + query.Name)
//check if recoverable errors are thrown
if strings.Contains(err.Error(),"Error with gathering file events POST: 500 Internal Server Error") || (strings.Contains(err.Error(),"stream error: stream ID") && (strings.Contains(err.Error(),"INTERNAL_ERROR") || strings.Contains(err.Error(),"PROTOCOL_ERROR"))) || strings.Contains(err.Error(),"read: connection reset by peer") || strings.Contains(err.Error(),"POST: 400 Bad Request") || strings.Contains(err.Error(),"unexpected EOF") {
if strings.Contains(err.Error(),"Error with gathering file events POST: 500 Internal Server Error") || (strings.Contains(err.Error(),"stream error: stream ID") && (strings.Contains(err.Error(),"INTERNAL_ERROR") || strings.Contains(err.Error(),"PROTOCOL_ERROR"))) || strings.Contains(err.Error(),"read: connection reset by peer") || strings.Contains(err.Error(),"POST: 400 Bad Request") || strings.Contains(err.Error(),"unexpected EOF") || strings.Contains(err.Error(),"POST: 504 Gateway Timeout") {
//allow for 10 retries before killing to save resource overload.
log.Println("Attempting to recover from error: " + err.Error() + ". Retry number: " + strconv.Itoa(retryCount))
if retryCount <= 10 {
retryCount++
queryInterval, _ := time.ParseDuration(query.Interval)
//sleep before retry to reduce chance of hitting max queries per minute
time.Sleep(queryInterval)
retryCount = retryCount + 1
queryFetcher(query, inProgressQueries, authData, configuration, lastCompletedQuery, maxTime, cleanUpQuery, client, ctx, quit, retryCount, true)
return
} else {
Expand Down Expand Up @@ -1063,6 +1027,11 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
//Check if this query is the newest completed query, if it is, set last completed query to query times
if lastCompletedQuery.OnOrBefore.Sub(inProgressQuery.OnOrAfter) <= 0 {
*lastCompletedQuery = inProgressQuery

err := eventOutput.WriteLastCompletedQuery(query, inProgressQuery)
if err != nil {
panic(err)
}
}

//Remove from in progress query slice
Expand All @@ -1075,6 +1044,13 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
}
*inProgressQueries = tempInProgress

//Write in progress queries to file
err = eventOutput.WriteInProgressQueries(query, inProgressQueries)

if err != nil {
panic(err)
}

promMetrics.IncrementEventsProcessed(len(ffsEvents))
promMetrics.DecreaseInProgressQueries()
}
Expand Down

0 comments on commit ac640a5

Please sign in to comment.