Skip to content

Commit

Permalink
Merge pull request #25 from BenB196/staging
Browse files Browse the repository at this point in the history
Merge staging to master for v0.0.8
  • Loading branch information
BenB196 authored Sep 26, 2019
2 parents 6c37c46 + 5896db6 commit f712fde
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 19 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.7
0.0.8
43 changes: 25 additions & 18 deletions ffsEvent/ffsEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ func FFSQuery (configuration config.Config, query config.FFSQuery) {
panic(err)
}

//Handle getting API AuthTokens every 55 minutes
authTimeTicker := time.NewTicker(55 * time.Minute)

//Get initial authData
authData, err := ffs.GetAuthData(configuration.AuthURI,query.Username,query.Password)

Expand All @@ -64,8 +61,12 @@ func FFSQuery (configuration config.Config, query config.FFSQuery) {
panic(err)
}

//Make quit chan to close go routines
quit := make(chan struct{})

//Init goroutine for getting authData ever 55 minutes
wgQuery.Add(1)
//Handle getting API AuthTokens every 55 minutes
authTimeTicker := time.NewTicker(55 * time.Minute)
go func() {
for {
select {
Expand All @@ -76,14 +77,15 @@ func FFSQuery (configuration config.Config, query config.FFSQuery) {
log.Println("error with getting authentication data for ffs query: " + query.Name)
panic(err)
}
case <- quit:
authTimeTicker.Stop()
return
}
defer wgQuery.Done()
}
}()

//Write in progress queries every 100 milliseconds to file
inProgressQueryWriteTimeTicker := time.NewTicker(100 * time.Millisecond)
wgQuery.Add(1)
go func() {
var oldInProgressQueries []eventOutput.InProgressQuery
oldInProgressQueries = inProgressQueries
Expand All @@ -99,8 +101,10 @@ func FFSQuery (configuration config.Config, query config.FFSQuery) {
panic(err)
}
}
case <- quit:
inProgressQueryWriteTimeTicker.Stop()
return
}
defer wgQuery.Done()
}
}()

Expand Down Expand Up @@ -135,34 +139,33 @@ func FFSQuery (configuration config.Config, query config.FFSQuery) {

//Handle old in progress queries that never completed when programmed died
if len(inProgressQueries) > 0 {
wgQuery.Add(1)
go func() {
for _, inProgressQuery := range inProgressQueries {
query = setOnOrBeforeAndAfter(query,inProgressQuery.OnOrBefore,inProgressQuery.OnOrAfter)
queryFetcher(query, &inProgressQueries, authData, configuration, &lastCompletedQuery, maxTime, nil, true, elasticClient, ctx)
defer wgQuery.Done()
queryFetcher(query, &inProgressQueries, authData, configuration, &lastCompletedQuery, maxTime, true, elasticClient, ctx, nil)
}
}()
}

//Write last completed query every 100 milliseconds to file
lastCompletedQueryWriteTimeTicker := time.NewTicker(100 * time.Millisecond)
wgQuery.Add(1)
go func() {
var oldLastCompletedQuery eventOutput.InProgressQuery
oldLastCompletedQuery = lastCompletedQuery
for {
select {
case <- lastCompletedQueryWriteTimeTicker.C:
case <-lastCompletedQueryWriteTimeTicker.C:
if oldLastCompletedQuery != lastCompletedQuery {
oldLastCompletedQuery = lastCompletedQuery
err := eventOutput.WriteLastCompletedQuery(query, lastCompletedQuery)
if err != nil {
panic(err)
}
}
case <- quit:
lastCompletedQueryWriteTimeTicker.Stop()
return
}
defer wgQuery.Done()
}
}()

Expand All @@ -178,15 +181,19 @@ func FFSQuery (configuration config.Config, query config.FFSQuery) {
for {
select {
case <- queryIntervalTimeTicker.C:
go queryFetcher(query, &inProgressQueries, authData, configuration, &lastCompletedQuery, maxTime, queryIntervalTimeTicker,false, elasticClient, ctx)
go queryFetcher(query, &inProgressQueries, authData, configuration, &lastCompletedQuery, maxTime,false, elasticClient, ctx, quit)
case <- quit:
queryIntervalTimeTicker.Stop()
wgQuery.Done()
return
}
defer wgQuery.Done()
}
}()
wgQuery.Wait()
return
}

func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProgressQuery, authData ffs.AuthData, configuration config.Config, lastCompletedQuery *eventOutput.InProgressQuery, maxTime time.Time, queryIntervalTimeTicker *time.Ticker, cleanUpQuery bool, client *elastic.Client, ctx context.Context) {
func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProgressQuery, authData ffs.AuthData, configuration config.Config, lastCompletedQuery *eventOutput.InProgressQuery, maxTime time.Time, cleanUpQuery bool, client *elastic.Client, ctx context.Context, quit chan<- struct{}) {
var done bool
var err error
//Increment time
Expand All @@ -200,8 +207,8 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg

//Stop the goroutine if the max time is past
if done {
if queryIntervalTimeTicker != nil {
queryIntervalTimeTicker.Stop()
if quit != nil {
close(quit)
}
return
}
Expand Down

0 comments on commit f712fde

Please sign in to comment.