From 6f655581fec2d665309026a076ae3364bcba4f07 Mon Sep 17 00:00:00 2001
From: BenB196
Date: Tue, 15 Oct 2019 19:25:43 -0400
Subject: [PATCH 1/6] Corrected an issue where the elastic standardized output
 put exposure in the wrong place

Fixed issue #56
---
 eventOutput/fileHandler.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/eventOutput/fileHandler.go b/eventOutput/fileHandler.go
index caaecf6..0c5d1c6 100644
--- a/eventOutput/fileHandler.go
+++ b/eventOutput/fileHandler.go
@@ -31,6 +31,7 @@ type ElasticFileEvent struct {
 	File            *File           `json:"file"`
 	Device          *Device         `json:"device"`
 	Cloud           *Cloud          `json:"cloud"`
+	Exposure        []string        `json:"exposure,omitempty"`
 	Process         *Process        `json:"process"`
 	RemovableMedia  *RemovableMedia `json:"removable_media,omitempty"`
 	SyncDestination string          `json:"sync_destination,omitempty"`
@@ -80,7 +81,6 @@ type Cloud struct {
 	CloudDriveId         string   `json:"drive_id,omitempty"`
 	DetectionSourceAlias string   `json:"detection_source_alias,omitempty"`
 	FileId               string   `json:"file_id,omitempty"`
-	Exposure             []string `json:"exposure,omitempty"`
 }

 type Process struct {

From d4e586071333dc41486cc4912e1f5172e405639f Mon Sep 17 00:00:00 2001
From: BenB196
Date: Tue, 15 Oct 2019 19:29:54 -0400
Subject: [PATCH 2/6] Fixed an issue with the correction for issue #56

---
 ffsEvent/ffsEvent.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ffsEvent/ffsEvent.go b/ffsEvent/ffsEvent.go
index ef0014e..406a3f3 100644
--- a/ffsEvent/ffsEvent.go
+++ b/ffsEvent/ffsEvent.go
@@ -409,7 +409,6 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 		CloudDriveId:         ffsEvent.CloudDriveId,
 		DetectionSourceAlias: ffsEvent.DetectionSourceAlias,
 		FileId:               ffsEvent.FileId,
-		Exposure:             ffsEvent.Exposure,
 	}

 	process := eventOutput.Process{
@@ -434,6 +433,7 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 		File:            &file,
 		Device:          &device,
 		Cloud:           &cloud,
+		Exposure:        ffsEvent.Exposure,
 		Process:         &process,
 		RemovableMedia:  &removableMedia,
 		SyncDestination: ffsEvent.SyncDestination,
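Taken together, patches 1 and 2 move `exposure` from the `cloud` sub-object to the top level of the standardized event. A minimal sketch of the resulting JSON shape, using trimmed-down, hypothetical stand-ins for the eventOutput types (field values are illustrative):

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed-down stand-ins for the real eventOutput types.
type Cloud struct {
	FileId string `json:"file_id,omitempty"`
}

type ElasticFileEvent struct {
	Cloud    *Cloud   `json:"cloud"`
	Exposure []string `json:"exposure,omitempty"` // moved up out of Cloud
}

func main() {
	b, _ := json.Marshal(ElasticFileEvent{
		Cloud:    &Cloud{FileId: "abc"},
		Exposure: []string{"CloudStorage"},
	})
	fmt.Println(string(b))
	// {"cloud":{"file_id":"abc"},"exposure":["CloudStorage"]}
	// Before the patch, exposure nested inside the "cloud" object.
}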
From 8f86afc33eeab66deb92d0ff71208b5a5ba1286f Mon Sep 17 00:00:00 2001
From: BenB196
Date: Tue, 15 Oct 2019 19:53:37 -0400
Subject: [PATCH 3/6] Fixed an issue where OneDrive events were not being
 logged

This issue was related to having ip-api support enabled and events that
did not have a public IP address.

Fixed issue #57
---
 eventOutput/fileHandler.go |   2 +-
 ffsEvent/ffsEvent.go       | 126 +++++++++++++++++++------------------
 2 files changed, 67 insertions(+), 61 deletions(-)

diff --git a/eventOutput/fileHandler.go b/eventOutput/fileHandler.go
index 0c5d1c6..153a0cd 100644
--- a/eventOutput/fileHandler.go
+++ b/eventOutput/fileHandler.go
@@ -16,7 +16,7 @@ import (

 type FFSEvent struct {
 	ffs.FileEvent
-	ip_api.Location `json:",omitempty"`
+	*ip_api.Location `json:",omitempty"`
 	*GeoPoint `json:"geoPoint,omitempty"`
 }
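The key change above is embedding `ip_api.Location` by pointer. A value-embedded struct can never be nil, so every event previously carried a (possibly zero-valued) location; a pointer embed lets events without a lookup result carry nil, and encoding/json skips fields promoted through a nil embedded pointer. A minimal sketch with a stand-in Location type (not the real ip_api package, which has many more fields):

package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for ip_api.Location.
type Location struct {
	City string `json:"city"`
}

type ByValue struct {
	Location // always marshaled, even when zero-valued
}

type ByPointer struct {
	*Location // promoted fields are skipped when the pointer is nil
}

func main() {
	v, _ := json.Marshal(ByValue{})
	p, _ := json.Marshal(ByPointer{})
	fmt.Println(string(v)) // {"city":""}
	fmt.Println(string(p)) // {}
}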
diff --git a/ffsEvent/ffsEvent.go b/ffsEvent/ffsEvent.go
index 406a3f3..69c93d0 100644
--- a/ffsEvent/ffsEvent.go
+++ b/ffsEvent/ffsEvent.go
@@ -232,6 +232,8 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 	fileEvents, err := ffs.GetFileEvents(authData,configuration.FFSURI, query.Query)

+	log.Println(fileEvents)
+
 	if err != nil {
 		log.Println("error getting file events for ffs query: " + query.Name)
 		//check if recoverable errors are thrown
@@ -315,7 +317,7 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 	for _, event := range fileEvents {
 		if event.PublicIpAddress != "" {
 			if len(locationMap) == 0 {
-				ffsEvents = append(ffsEvents,eventOutput.FFSEvent{FileEvent: event})
+				ffsEvents = append(ffsEvents,eventOutput.FFSEvent{FileEvent: event, Location: nil, GeoPoint: nil})
 			} else if location, ok := locationMap[event.PublicIpAddress]; ok {
 				//nil this as it is not needed, we already have event.publicIpAddress
 				location.Query = ""
@@ -323,12 +325,14 @@
 					Lat: location.Lat,
 					Lon: location.Lon,
 				}
-				ffsEvents = append(ffsEvents,eventOutput.FFSEvent{FileEvent: event, Location: location, GeoPoint: &geoPoint})
+				ffsEvents = append(ffsEvents,eventOutput.FFSEvent{FileEvent: event, Location: &location, GeoPoint: &geoPoint})
 			} else {
 				b, _ := json.Marshal(event)
 				log.Println("error getting location for fileEvent: " + string(b))
 				panic("Unable to find location which should exist.")
 			}
+		} else {
+			ffsEvents = append(ffsEvents,eventOutput.FFSEvent{FileEvent: event, Location: nil, GeoPoint: nil})
 		}
 		defer eventWg.Done()
 	}
@@ -442,68 +446,70 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 		var elasticFFSEvent eventOutput.ElasticFFSEvent
 		var geoPoint eventOutput.GeoPoint
 		var geoip eventOutput.Geoip
-		if ffsEvent.Location.Lat != 0 && ffsEvent.Location.Lon != 0 {
-			geoPoint = eventOutput.GeoPoint{
-				Lat: ffsEvent.Location.Lat,
-				Lon: ffsEvent.Location.Lon,
-			}
+		if ffsEvent.Location != nil {
+			if ffsEvent.Location.Lat != 0 && ffsEvent.Location.Lon != 0 {
+				geoPoint = eventOutput.GeoPoint{
+					Lat: ffsEvent.Location.Lat,
+					Lon: ffsEvent.Location.Lon,
+				}

-			geoip = eventOutput.Geoip{
-				Status: ffsEvent.Location.Status,
-				Message: ffsEvent.Location.Message,
-				Continent: ffsEvent.Location.Continent,
-				ContinentCode: ffsEvent.Location.ContinentCode,
-				Country: ffsEvent.Location.Country,
-				CountryCode: ffsEvent.Location.CountryCode,
-				Region: ffsEvent.Location.Region,
-				RegionName: ffsEvent.Location.RegionName,
-				City: ffsEvent.Location.City,
-				District: ffsEvent.Location.District,
-				ZIP: ffsEvent.Location.ZIP,
-				Lat: ffsEvent.Location.Lat,
-				Lon: ffsEvent.Location.Lon,
-				Timezone: ffsEvent.Location.Timezone,
-				Currency: ffsEvent.Location.Currency,
-				ISP: ffsEvent.Location.ISP,
-				Org: ffsEvent.Location.Org,
-				AS: ffsEvent.Location.AS,
-				ASName: ffsEvent.Location.ASName,
-				Reverse: ffsEvent.Location.Reverse,
-				Mobile: ffsEvent.Location.Mobile,
-				Proxy: ffsEvent.Location.Proxy,
-				Query: ffsEvent.Location.Query,
-				GeoPoint: &geoPoint,
-			}
-		} else {
-			geoip = eventOutput.Geoip{
-				Status: ffsEvent.Location.Status,
-				Message: ffsEvent.Location.Message,
-				Continent: ffsEvent.Location.Continent,
-				ContinentCode: ffsEvent.Location.ContinentCode,
-				Country: ffsEvent.Location.Country,
-				CountryCode: ffsEvent.Location.CountryCode,
-				Region: ffsEvent.Location.Region,
-				RegionName: ffsEvent.Location.RegionName,
-				City: ffsEvent.Location.City,
-				District: ffsEvent.Location.District,
-				ZIP: ffsEvent.Location.ZIP,
-				Lat: ffsEvent.Location.Lat,
-				Lon: ffsEvent.Location.Lon,
-				Timezone: ffsEvent.Location.Timezone,
-				Currency: ffsEvent.Location.Currency,
-				ISP: ffsEvent.Location.ISP,
-				Org: ffsEvent.Location.Org,
-				AS: ffsEvent.Location.AS,
-				ASName: ffsEvent.Location.ASName,
-				Reverse: ffsEvent.Location.Reverse,
-				Mobile: ffsEvent.Location.Mobile,
-				Proxy: ffsEvent.Location.Proxy,
-				Query: ffsEvent.Location.Query,
-				GeoPoint: nil,
+				geoip = eventOutput.Geoip{
+					Status: ffsEvent.Location.Status,
+					Message: ffsEvent.Location.Message,
+					Continent: ffsEvent.Location.Continent,
+					ContinentCode: ffsEvent.Location.ContinentCode,
+					Country: ffsEvent.Location.Country,
+					CountryCode: ffsEvent.Location.CountryCode,
+					Region: ffsEvent.Location.Region,
+					RegionName: ffsEvent.Location.RegionName,
+					City: ffsEvent.Location.City,
+					District: ffsEvent.Location.District,
+					ZIP: ffsEvent.Location.ZIP,
+					Lat: ffsEvent.Location.Lat,
+					Lon: ffsEvent.Location.Lon,
+					Timezone: ffsEvent.Location.Timezone,
+					Currency: ffsEvent.Location.Currency,
+					ISP: ffsEvent.Location.ISP,
+					Org: ffsEvent.Location.Org,
+					AS: ffsEvent.Location.AS,
+					ASName: ffsEvent.Location.ASName,
+					Reverse: ffsEvent.Location.Reverse,
+					Mobile: ffsEvent.Location.Mobile,
+					Proxy: ffsEvent.Location.Proxy,
+					Query: ffsEvent.Location.Query,
+					GeoPoint: &geoPoint,
+				}
+			} else {
+				geoip = eventOutput.Geoip{
+					Status: ffsEvent.Location.Status,
+					Message: ffsEvent.Location.Message,
+					Continent: ffsEvent.Location.Continent,
+					ContinentCode: ffsEvent.Location.ContinentCode,
+					Country: ffsEvent.Location.Country,
+					CountryCode: ffsEvent.Location.CountryCode,
+					Region: ffsEvent.Location.Region,
+					RegionName: ffsEvent.Location.RegionName,
+					City: ffsEvent.Location.City,
+					District: ffsEvent.Location.District,
+					ZIP: ffsEvent.Location.ZIP,
+					Lat: ffsEvent.Location.Lat,
+					Lon: ffsEvent.Location.Lon,
+					Timezone: ffsEvent.Location.Timezone,
+					Currency: ffsEvent.Location.Currency,
+					ISP: ffsEvent.Location.ISP,
+					Org: ffsEvent.Location.Org,
+					AS: ffsEvent.Location.AS,
+					ASName: ffsEvent.Location.ASName,
+					Reverse: ffsEvent.Location.Reverse,
+					Mobile: ffsEvent.Location.Mobile,
+					Proxy: ffsEvent.Location.Proxy,
+					Query: ffsEvent.Location.Query,
+					GeoPoint: nil,
+				}
 			}
 		}

-		if ffsEvent.Location.Status == "" {
+		if ffsEvent.Location != nil && ffsEvent.Location.Status == "" {
 			elasticFFSEvent = eventOutput.ElasticFFSEvent{
 				FileEvent: elasticFileEvent,
 				Geoip: nil,
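The ffsEvent.go change above is the actual fix for #57: previously only events with a non-empty PublicIpAddress were ever appended, so OneDrive (and any other) events without a public IP were silently dropped when ip-api was enabled. The new else branch passes them through with a nil location, and the remap guards on nil before dereferencing. The shape of the fix, sketched with simplified types and a hypothetical classify helper (not code from the patch):

package main

import "fmt"

// Simplified stand-ins; the real types live in ffs, ip_api, and eventOutput.
type FileEvent struct{ PublicIpAddress string }
type Location struct{ City string }
type FFSEvent struct {
	FileEvent
	*Location
}

// classify distills the append logic above into one function.
func classify(event FileEvent, locations map[string]Location) FFSEvent {
	if event.PublicIpAddress == "" {
		// The fix: events with no public IP (e.g. some OneDrive events)
		// are now emitted with a nil location instead of being dropped.
		return FFSEvent{FileEvent: event}
	}
	if loc, ok := locations[event.PublicIpAddress]; ok {
		return FFSEvent{FileEvent: event, Location: &loc}
	}
	return FFSEvent{FileEvent: event} // lookup miss; the real code panics here
}

func main() {
	out := classify(FileEvent{}, map[string]Location{})
	fmt.Println(out.Location == nil) // true: event survives with no geo data
}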
From df09fd0537756fd8d49b76de0919ba5b4dd41bdb Mon Sep 17 00:00:00 2001
From: BenB196
Date: Tue, 15 Oct 2019 20:35:33 -0400
Subject: [PATCH 4/6] Added support for outputting a semi-elastic standardized
 format

Resolved #58
---
 README.md                  |   4 +-
 config/configReader.go     |   7 +-
 eventOutput/fileHandler.go |  67 ++++++++-
 ffsEvent/ffsEvent.go       | 272 ++++++++++++++++++++++++++++++++-----
 4 files changed, 311 insertions(+), 39 deletions(-)

diff --git a/README.md b/README.md
index b09b299..0478126 100644
--- a/README.md
+++ b/README.md
@@ -133,7 +133,7 @@ Currently, only JSON formatted configuration files are accepted, in the future Y
     "logstash": { #Logstash output
       "logstashURL": "192.168.1.105:8080" #Address of logstash
     }
-    "esStandardized": true, #esStandardized This allows for the output to be formatted in an elastic standardized output
+    "esStandardized": "", #esStandardized allows the output to be formatted as standard Crashplan FFS (""), Semi Elastic Standard ("half"), or full Elastic Standard ("full")
     "validIpAddressesOnly": true #Setting this to true makes the private IP Addresses valid. By default Crashplan FFS provides invalid private IP addresses.
   },
   {
@@ -211,7 +211,7 @@ If you are using the elastic output type there are a few important things to und
 1. bestCompression
 1. refreshInterval
 1. aliases
-1. If you use the esStandardized output, there is currently no build in template for this. Therefore you need to provide an index template on the elasticsearch side.
+1. If you use the esStandardized output (half or full), there is currently no built-in template for this. Therefore, you need to provide an index template on the elasticsearch side.
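For illustration, the relevant fragment of a query configuration under the new scheme might read as follows (all surrounding fields elided; mirrors the README's own comment style, values are examples only):

    "esStandardized": "half", #"" = standard Crashplan FFS output, "half" = flattened file_event fields plus a geoip object, "full" = fully nested event/file/device/cloud structure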
diff --git a/config/configReader.go b/config/configReader.go
index c01531d..aebf922 100644
--- a/config/configReader.go
+++ b/config/configReader.go
@@ -35,7 +35,7 @@ type FFSQuery struct {
 	IPAPI                IPAPI         `json:"ip-api,omitempty"`
 	Elasticsearch        Elasticsearch `json:"elasticsearch,omitempty"`
 	Logstash             Logstash      `json:"logstash,omitempty"`
-	EsStandardized       bool          `json:"esStandardized,omitempty"`
+	EsStandardized       string        `json:"esStandardized,omitempty"`
 	ValidIpAddressesOnly bool          `json:"validIpAddressesOnly"`
 }

@@ -425,6 +425,11 @@ func validateConfigJson(fileBytes []byte) (Config, error) {
 		}
 	}

+	//validate esStandardized
+	if query.EsStandardized != "" && !strings.EqualFold(query.EsStandardized,"full") && !strings.EqualFold(query.EsStandardized,"half") {
+		return config, errors.New("unknown value for esStandardized, values can either be full, half, or \"\"")
+	}
+
 	//Validate ip-api
 	if query.IPAPI != (IPAPI{}) && query.IPAPI.Enabled {

diff --git a/eventOutput/fileHandler.go b/eventOutput/fileHandler.go
index 153a0cd..0f29b97 100644
--- a/eventOutput/fileHandler.go
+++ b/eventOutput/fileHandler.go
@@ -20,19 +20,70 @@ type FFSEvent struct {
 	*GeoPoint `json:"geoPoint,omitempty"`
 }

+type SemiElasticFFSEvent struct {
+	FileEvent SemiElasticFileEvent `json:"file_event"`
+	Geoip     *Geoip               `json:"geoip,omitempty"`
+}
+
+type SemiElasticFileEvent struct {
+	EventId string `json:"event_id"`
+	EventType string `json:"event_type"`
+	EventTimestamp *time.Time `json:"event_timestamp,omitempty"`
+	InsertionTimestamp *time.Time `json:"insertion_timestamp,omitempty"`
+	FilePath string `json:"file_path,omitempty"`
+	FileName string `json:"file_name"`
+	FileType string `json:"file_type,omitempty"`
+	FileCategory string `json:"file_category,omitempty"`
+	FileSize *int `json:"file_size"`
+	FileOwner []string `json:"file_owner,omitempty"` //Array of owners
+	Md5Checksum string `json:"md5_checksum,omitempty"`
+	Sha256Checksum string `json:"sha256_checksum,omitempty"`
+	CreatedTimestamp *time.Time `json:"created_timestamp,omitempty"`
+	ModifyTimestamp *time.Time `json:"modify_timestamp,omitempty"`
+	DeviceUsername string `json:"device_username,omitempty"`
+	DeviceUid string `json:"device_uid,omitempty"`
+	UserUid string `json:"user_uid,omitempty"`
+	OsHostname string `json:"os_hostname,omitempty"`
+	DomainName string `json:"domain_name,omitempty"`
+	PublicIpAddress string `json:"public_ip_address,omitempty"`
+	PrivateIpAddresses []string `json:"private_ip_addresses,omitempty"` //Array of IP address strings
+	Actor string `json:"actor,omitempty"`
+	DirectoryId []string `json:"directory_id,omitempty"` //An array of something, I am not sure
+	Source string `json:"source,omitempty"`
+	Url string `json:"url,omitempty"`
+	Shared string `json:"shared,omitempty"`
+	SharedWith []string `json:"shared_with,omitempty"` //An array of strings (Mainly Email Addresses)
+	SharingTypeAdded []string `json:"sharing_type_added,omitempty"`
+	CloudDriveId string `json:"cloud_drive_id,omitempty"`
+	DetectionSourceAlias string `json:"detection_source_alias,omitempty"`
+	FileId string `json:"file_id,omitempty"`
+	Exposure []string `json:"exposure,omitempty"`
+	ProcessOwner string `json:"process_owner,omitempty"`
+	ProcessName string `json:"process_name,omitempty"`
+	RemovableMediaVendor string `json:"removable_media_vendor,omitempty"`
+	RemovableMediaName string `json:"removable_media_name,omitempty"`
+	RemovableMediaSerialNumber string `json:"removable_media_serial_number,omitempty"`
+	RemovableMediaCapacity *int `json:"removable_media_capacity,omitempty"`
+	RemovableMediaBusType string `json:"removable_media_bus_type,omitempty"`
+	RemovableMediaMediaName string `json:"removable_media_media_name,omitempty"`
+	RemovableMediaVolumeName string `json:"removable_media_volume_name,omitempty"`
+	RemovableMediaPartitionId string `json:"removable_media_partition_id,omitempty"`
+	SyncDestination string `json:"sync_destination,omitempty"`
+}
+
 type ElasticFFSEvent struct {
 	FileEvent ElasticFileEvent `json:"file_event"`
-	Geoip     *Geoip           `json:"geoip"`
+	Geoip     *Geoip           `json:"geoip,omitempty"`
 }

 type ElasticFileEvent struct {
 	Event     *Event     `json:"event,omitempty"`
 	Insertion *Insertion `json:"insertion,omitempty"`
-	File      *File      `json:"file"`
-	Device    *Device    `json:"device"`
-	Cloud     *Cloud     `json:"cloud"`
+	File      *File      `json:"file,omitempty"`
+	Device    *Device    `json:"device,omitempty"`
+	Cloud     *Cloud     `json:"cloud,omitempty"`
 	Exposure  []string   `json:"exposure,omitempty"`
-	Process   *Process   `json:"process"`
+	Process   *Process   `json:"process,omitempty"`
 	RemovableMedia  *RemovableMedia `json:"removable_media,omitempty"`
 	SyncDestination string          `json:"sync_destination,omitempty"`
 }
@@ -195,6 +246,12 @@ func WriteEvents (ffsEvents interface{}, query config.FFSQuery) error {
 		return errors.New("error: flushing file: " + fileName + " " + err.Error())
 	}

+	err = file.Sync()
+
+	if err != nil {
+		return errors.New("error: syncing file: " + fileName + " " + err.Error())
+	}
+
 	return nil
 }
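The added file.Sync() complements the existing Flush(): flushing a bufio.Writer only drains the in-process buffer into the OS page cache through the file descriptor, while (*os.File).Sync issues fsync and forces the data to stable storage. A minimal sketch of the pattern, assuming a bufio.Writer over an os.File as in WriteEvents (file name and data are illustrative):

package main

import (
	"bufio"
	"log"
	"os"
)

func writeDurably(path string, data []byte) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	w := bufio.NewWriter(f)
	if _, err := w.Write(data); err != nil {
		return err
	}
	if err := w.Flush(); err != nil { // drain the in-process buffer
		return err
	}
	return f.Sync() // fsync: push the OS cache to disk
}

func main() {
	if err := writeDurably("events.json", []byte("{}\n")); err != nil {
		log.Fatal(err)
	}
}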
diff --git a/ffsEvent/ffsEvent.go b/ffsEvent/ffsEvent.go
index 69c93d0..4241cfb 100644
--- a/ffsEvent/ffsEvent.go
+++ b/ffsEvent/ffsEvent.go
@@ -232,8 +232,6 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 	fileEvents, err := ffs.GetFileEvents(authData,configuration.FFSURI, query.Query)

-	log.Println(fileEvents)
-
 	if err != nil {
 		log.Println("error getting file events for ffs query: " + query.Name)
 		//check if recoverable errors are thrown
@@ -366,7 +364,8 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 	//remap ffsEvents to ElasticFFSEvent
 	var elasticFFSEvents []eventOutput.ElasticFFSEvent
-	if query.EsStandardized {
+	var semiElasticFFSEvents []eventOutput.SemiElasticFFSEvent
+	if query.EsStandardized != "" && strings.EqualFold(query.EsStandardized,"full") {
 		var remapWg sync.WaitGroup
 		remapWg.Add(len(ffsEvents))
 		go func() {
@@ -525,14 +525,150 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 			}
 		}()
 		remapWg.Wait()
+	} else if query.EsStandardized != "" && strings.EqualFold(query.EsStandardized,"half") {
+		var remapWg sync.WaitGroup
+		remapWg.Add(len(ffsEvents))
+		go func() {
+			for _, ffsEvent := range ffsEvents {
+				semiElasticFileEvent := eventOutput.SemiElasticFileEvent{
+					EventId: ffsEvent.EventId,
+					EventType: ffsEvent.EventType,
+					EventTimestamp: ffsEvent.EventTimestamp,
+					InsertionTimestamp: ffsEvent.InsertionTimestamp,
+					FilePath: ffsEvent.FilePath,
+					FileName: ffsEvent.FileName,
+					FileType: ffsEvent.FileType,
+					FileCategory: ffsEvent.FileCategory,
+					FileSize: ffsEvent.FileSize,
+					FileOwner: ffsEvent.FileOwner,
+					Md5Checksum: ffsEvent.Md5Checksum,
+					Sha256Checksum: ffsEvent.Sha256Checksum,
+					CreatedTimestamp: ffsEvent.CreatedTimestamp,
+					ModifyTimestamp: ffsEvent.ModifyTimestamp,
+					DeviceUsername: ffsEvent.DeviceUsername,
+					DeviceUid: ffsEvent.DeviceUid,
+					UserUid: ffsEvent.UserUid,
+					OsHostname: ffsEvent.OsHostname,
+					DomainName: ffsEvent.DomainName,
+					PublicIpAddress: ffsEvent.PublicIpAddress,
+					PrivateIpAddresses: ffsEvent.PrivateIpAddresses,
+					Actor: ffsEvent.Actor,
+					DirectoryId: ffsEvent.DirectoryId,
+					Source: ffsEvent.Source,
+					Url: ffsEvent.Url,
+					Shared: ffsEvent.Shared,
+					SharedWith: ffsEvent.SharedWith,
+					SharingTypeAdded: ffsEvent.SharingTypeAdded,
+					CloudDriveId: ffsEvent.CloudDriveId,
+					DetectionSourceAlias: ffsEvent.DetectionSourceAlias,
+					FileId: ffsEvent.FileId,
+					Exposure: ffsEvent.Exposure,
+					ProcessOwner: ffsEvent.ProcessOwner,
+					ProcessName: ffsEvent.ProcessName,
+					RemovableMediaVendor: ffsEvent.RemovableMediaVendor,
+					RemovableMediaName: ffsEvent.RemovableMediaName,
+					RemovableMediaSerialNumber: ffsEvent.RemovableMediaSerialNumber,
+					RemovableMediaCapacity: ffsEvent.RemovableMediaCapacity,
+					RemovableMediaBusType: ffsEvent.RemovableMediaBusType,
+					RemovableMediaMediaName: ffsEvent.RemovableMediaMediaName,
+					RemovableMediaVolumeName: ffsEvent.RemovableMediaVolumeName,
+					RemovableMediaPartitionId: ffsEvent.RemovableMediaPartitionId,
+					SyncDestination: ffsEvent.SyncDestination,
+				}
+
+				var semiElasticFFSEvent eventOutput.SemiElasticFFSEvent
+				var geoPoint eventOutput.GeoPoint
+				var geoip eventOutput.Geoip
+				if ffsEvent.Location != nil {
+					if ffsEvent.Location.Lat != 0 && ffsEvent.Location.Lon != 0 {
+						geoPoint = eventOutput.GeoPoint{
+							Lat: ffsEvent.Location.Lat,
+							Lon: ffsEvent.Location.Lon,
+						}
+
+						geoip = eventOutput.Geoip{
+							Status: ffsEvent.Location.Status,
+							Message: ffsEvent.Location.Message,
+							Continent: ffsEvent.Location.Continent,
+							ContinentCode: ffsEvent.Location.ContinentCode,
+							Country: ffsEvent.Location.Country,
+							CountryCode: ffsEvent.Location.CountryCode,
+							Region: ffsEvent.Location.Region,
+							RegionName: ffsEvent.Location.RegionName,
+							City: ffsEvent.Location.City,
+							District: ffsEvent.Location.District,
+							ZIP: ffsEvent.Location.ZIP,
+							Lat: ffsEvent.Location.Lat,
+							Lon: ffsEvent.Location.Lon,
+							Timezone: ffsEvent.Location.Timezone,
+							Currency: ffsEvent.Location.Currency,
+							ISP: ffsEvent.Location.ISP,
+							Org: ffsEvent.Location.Org,
+							AS: ffsEvent.Location.AS,
+							ASName: ffsEvent.Location.ASName,
+							Reverse: ffsEvent.Location.Reverse,
+							Mobile: ffsEvent.Location.Mobile,
+							Proxy: ffsEvent.Location.Proxy,
+							Query: ffsEvent.Location.Query,
+							GeoPoint: &geoPoint,
+						}
+					} else {
+						geoip = eventOutput.Geoip{
+							Status: ffsEvent.Location.Status,
+							Message: ffsEvent.Location.Message,
+							Continent: ffsEvent.Location.Continent,
+							ContinentCode: ffsEvent.Location.ContinentCode,
+							Country: ffsEvent.Location.Country,
+							CountryCode: ffsEvent.Location.CountryCode,
+							Region: ffsEvent.Location.Region,
+							RegionName: ffsEvent.Location.RegionName,
+							City: ffsEvent.Location.City,
+							District: ffsEvent.Location.District,
+							ZIP: ffsEvent.Location.ZIP,
+							Lat: ffsEvent.Location.Lat,
+							Lon: ffsEvent.Location.Lon,
+							Timezone: ffsEvent.Location.Timezone,
+							Currency: ffsEvent.Location.Currency,
+							ISP: ffsEvent.Location.ISP,
+							Org: ffsEvent.Location.Org,
+							AS: ffsEvent.Location.AS,
+							ASName: ffsEvent.Location.ASName,
+							Reverse: ffsEvent.Location.Reverse,
+							Mobile: ffsEvent.Location.Mobile,
+							Proxy: ffsEvent.Location.Proxy,
+							Query: ffsEvent.Location.Query,
+							GeoPoint: nil,
+						}
+					}
+				}
+
+				if ffsEvent.Location != nil && ffsEvent.Location.Status == "" {
+					semiElasticFFSEvent = eventOutput.SemiElasticFFSEvent{
+						FileEvent: semiElasticFileEvent,
+						Geoip: nil,
+					}
+				} else {
+					semiElasticFFSEvent = eventOutput.SemiElasticFFSEvent{
+						FileEvent: semiElasticFileEvent,
+						Geoip: &geoip,
+					}
+				}
+
+				semiElasticFFSEvents = append(semiElasticFFSEvents, semiElasticFFSEvent)
+				remapWg.Done()
+			}
+		}()
+		remapWg.Wait()
 	}

 	switch query.OutputType {
 	case "file":
-		if query.EsStandardized {
-			err = eventOutput.WriteEvents(elasticFFSEvents, query)
-		} else {
+		if query.EsStandardized == "" {
 			err = eventOutput.WriteEvents(ffsEvents, query)
+		} else if query.EsStandardized == "full" {
+			err = eventOutput.WriteEvents(elasticFFSEvents, query)
+		} else if query.EsStandardized == "half" {
+			err = eventOutput.WriteEvents(semiElasticFFSEvents, query)
 		}

 		if err != nil {
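An observation on the remap blocks above: with this patch the Geoip literal is written out four times (full and half paths, each with and without a GeoPoint), differing only in the GeoPoint field. A shared helper could collapse them; this is an editorial sketch against simplified, hypothetical stand-in types, not code from the patch:

package main

import "fmt"

// Simplified stand-ins for the ip_api and eventOutput types.
type Location struct {
	Status   string
	City     string
	Lat, Lon float64
}

type GeoPoint struct{ Lat, Lon float64 }

type Geoip struct {
	Status   string
	City     string
	Lat, Lon float64
	GeoPoint *GeoPoint
	// ...remaining fields copied one-for-one in the real mapping...
}

// locationToGeoip centralizes the four near-identical literals; only the
// GeoPoint field actually varies between them.
func locationToGeoip(loc *Location) *Geoip {
	if loc == nil {
		return nil // no ip-api lookup for this event
	}
	g := &Geoip{Status: loc.Status, City: loc.City, Lat: loc.Lat, Lon: loc.Lon}
	if loc.Lat != 0 && loc.Lon != 0 {
		g.GeoPoint = &GeoPoint{Lat: loc.Lat, Lon: loc.Lon}
	}
	return g
}

func main() {
	fmt.Println(locationToGeoip(nil))                                  // <nil>
	fmt.Println(locationToGeoip(&Location{Status: "success", City: "Boston", Lat: 42.36, Lon: -71.06}) != nil) // true
}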
@@ -585,7 +720,16 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 					panic("elasticsearch index creation failed for: " + indexName)
 				}
 			}
-			if query.EsStandardized {
+			if query.EsStandardized == "" {
+				elasticWg.Add(len(ffsEvents))
+				go func() {
+					for _, ffsEvent := range ffsEvents {
+						r := elastic.NewBulkIndexRequest().Index(indexName).Doc(ffsEvent)
+						processor.Add(r)
+						elasticWg.Done()
+					}
+				}()
+			} else if query.EsStandardized == "full" {
 				elasticWg.Add(len(elasticFFSEvents))
 				go func() {
 					for _, elasticFileEvent := range elasticFFSEvents {
@@ -594,11 +738,11 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 						elasticWg.Done()
 					}
 				}()
-			} else {
-				elasticWg.Add(len(ffsEvents))
+			} else if query.EsStandardized == "half" {
+				elasticWg.Add(len(semiElasticFFSEvents))
 				go func() {
-					for _, ffsEvent := range ffsEvents {
-						r := elastic.NewBulkIndexRequest().Index(indexName).Doc(ffsEvent)
+					for _, elasticFileEvent := range semiElasticFFSEvents {
+						r := elastic.NewBulkIndexRequest().Index(indexName).Doc(elasticFileEvent)
 						processor.Add(r)
 						elasticWg.Done()
 					}
 				}()
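One consistency point worth noting across these dispatch sites: the config validation and the remap condition accept "full"/"half" case-insensitively via strings.EqualFold, but the output switches above compare with ==. A configured value like "Full" would therefore pass validation, be remapped, and then silently match no output branch. Normalizing once at config load would keep the two in agreement; a sketch with a hypothetical helper, not code from the patch:

package main

import (
	"fmt"
	"strings"
)

// normalizeEsStandardized lower-cases the setting once so that the
// subsequent == "full" / == "half" comparisons behave the same way as
// the strings.EqualFold checks used during validation.
func normalizeEsStandardized(v string) string {
	return strings.ToLower(strings.TrimSpace(v))
}

func main() {
	fmt.Println(normalizeEsStandardized("Full") == "full") // true
	fmt.Println(normalizeEsStandardized(" HALF ") == "half") // true
}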
@@ -618,10 +762,29 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 			//create map of indexes required
 			var requiredIndexTimestamps = map[time.Time]interface{}{}
 			var requiredIndexMutex = sync.RWMutex{}
-			if query.EsStandardized {
+			if query.EsStandardized == "" {
+				elasticWg.Add(len(ffsEvents))
+				go func() {
+					for _, ffsEvent := range ffsEvents {
+						var indexTime time.Time
+						if query.Elasticsearch.IndexTimeGen == "insertionTimestamp" {
+							indexTime, _ = time.Parse(query.Elasticsearch.IndexTimeAppend,ffsEvent.EventTimestamp.Format(query.Elasticsearch.IndexTimeAppend))
+						} else {
+							indexTime, _ = time.Parse(query.Elasticsearch.IndexTimeAppend,ffsEvent.EventTimestamp.Format(query.Elasticsearch.IndexTimeAppend))
+						}
+
+						requiredIndexMutex.RLock()
+						if _, found := requiredIndexTimestamps[indexTime]; !found {
+							requiredIndexTimestamps[indexTime] = nil
+						}
+						requiredIndexMutex.RUnlock()
+						elasticWg.Done()
+					}
+				}()
+			} else if query.EsStandardized == "full" {
 				elasticWg.Add(len(elasticFFSEvents))
 				go func() {
-					for _, elasticFileEvent :=range elasticFFSEvents {
+					for _, elasticFileEvent := range elasticFFSEvents {
 						var indexTime time.Time
 						if query.Elasticsearch.IndexTimeGen == "insertionTimestamp" {
 							indexTime, _ = time.Parse(query.Elasticsearch.IndexTimeAppend,elasticFileEvent.FileEvent.Event.EventTimestamp.Format(query.Elasticsearch.IndexTimeAppend))
@@ -637,15 +800,15 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 						elasticWg.Done()
 					}
 				}()
-			} else {
-				elasticWg.Add(len(ffsEvents))
+			} else if query.EsStandardized == "half" {
+				elasticWg.Add(len(semiElasticFFSEvents))
 				go func() {
-					for _, ffsEvent :=range ffsEvents {
+					for _, elasticFileEvent := range semiElasticFFSEvents {
 						var indexTime time.Time
 						if query.Elasticsearch.IndexTimeGen == "insertionTimestamp" {
-							indexTime, _ = time.Parse(query.Elasticsearch.IndexTimeAppend,ffsEvent.EventTimestamp.Format(query.Elasticsearch.IndexTimeAppend))
+							indexTime, _ = time.Parse(query.Elasticsearch.IndexTimeAppend,elasticFileEvent.FileEvent.EventTimestamp.Format(query.Elasticsearch.IndexTimeAppend))
 						} else {
-							indexTime, _ = time.Parse(query.Elasticsearch.IndexTimeAppend,ffsEvent.EventTimestamp.Format(query.Elasticsearch.IndexTimeAppend))
+							indexTime, _ = time.Parse(query.Elasticsearch.IndexTimeAppend,elasticFileEvent.FileEvent.EventTimestamp.Format(query.Elasticsearch.IndexTimeAppend))
 						}

 						requiredIndexMutex.RLock()
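Two details worth flagging in the index-time bookkeeping above: in the new esStandardized == "" block both arms of the IndexTimeGen conditional parse ffsEvent.EventTimestamp (the insertionTimestamp arm presumably intends InsertionTimestamp), and the map insert happens under RLock, although writing to a map guarded by a sync.RWMutex requires the write lock. A corrected sketch of the pattern, with hypothetical simplified types (the layout string stands in for query.Elasticsearch.IndexTimeAppend):

package main

import (
	"fmt"
	"sync"
	"time"
)

type event struct {
	EventTimestamp     time.Time
	InsertionTimestamp time.Time
}

func main() {
	const layout = "2006-01-02" // stand-in for IndexTimeAppend
	events := []event{{EventTimestamp: time.Now(), InsertionTimestamp: time.Now()}}

	required := map[time.Time]struct{}{}
	var mu sync.Mutex // a plain Mutex suffices; every access here writes

	useInsertion := true // stand-in for IndexTimeGen == "insertionTimestamp"
	var wg sync.WaitGroup
	for _, e := range events {
		wg.Add(1)
		go func(e event) {
			defer wg.Done()
			ts := e.EventTimestamp
			if useInsertion {
				ts = e.InsertionTimestamp // the branch the patch leaves identical
			}
			// Round-trip through the layout to truncate to index granularity,
			// mirroring the time.Parse(Format(...)) idiom in the patch.
			t, _ := time.Parse(layout, ts.Format(layout))
			mu.Lock() // full lock: map writes are not safe under RLock
			required[t] = struct{}{}
			mu.Unlock()
		}(e)
	}
	wg.Wait()
	fmt.Println(len(required))
}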
@@ -703,7 +866,23 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 			elasticWg.Wait()

 			//build bulk request
-			if query.EsStandardized {
+			if query.EsStandardized == "" {
+				elasticWg.Add(len(ffsEvents))
+				go func() {
+					for _, ffsEvent := range ffsEvents {
+						var indexTime time.Time
+						if query.Elasticsearch.IndexTimeGen == "insertionTimestamp" {
+							indexTime, _ = time.Parse(query.Elasticsearch.IndexTimeAppend,ffsEvent.InsertionTimestamp.Format(query.Elasticsearch.IndexTimeAppend))
+						} else {
+							indexTime, _ = time.Parse(query.Elasticsearch.IndexTimeAppend,ffsEvent.EventTimestamp.Format(query.Elasticsearch.IndexTimeAppend))
+						}
+						indexName := elasticsearch.BuildIndexNameWithTime(query.Elasticsearch,indexTime)
+						r := elastic.NewBulkIndexRequest().Index(indexName).Doc(ffsEvent)
+						processor.Add(r)
+						elasticWg.Done()
+					}
+				}()
+			} else if query.EsStandardized == "full" {
 				elasticWg.Add(len(elasticFFSEvents))
 				go func() {
 					for _, elasticFileEvent := range elasticFFSEvents {
@@ -719,18 +898,18 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 						elasticWg.Done()
 					}
 				}()
-			} else {
-				elasticWg.Add(len(ffsEvents))
+			} else if query.EsStandardized == "half" {
+				elasticWg.Add(len(semiElasticFFSEvents))
 				go func() {
-					for _, ffsEvent := range ffsEvents {
+					for _, elasticFileEvent := range semiElasticFFSEvents {
 						var indexTime time.Time
 						if query.Elasticsearch.IndexTimeGen == "insertionTimestamp" {
-							indexTime, _ = time.Parse(query.Elasticsearch.IndexTimeAppend,ffsEvent.InsertionTimestamp.Format(query.Elasticsearch.IndexTimeAppend))
+							indexTime, _ = time.Parse(query.Elasticsearch.IndexTimeAppend,elasticFileEvent.FileEvent.InsertionTimestamp.Format(query.Elasticsearch.IndexTimeAppend))
 						} else {
-							indexTime, _ = time.Parse(query.Elasticsearch.IndexTimeAppend,ffsEvent.EventTimestamp.Format(query.Elasticsearch.IndexTimeAppend))
+							indexTime, _ = time.Parse(query.Elasticsearch.IndexTimeAppend,elasticFileEvent.FileEvent.EventTimestamp.Format(query.Elasticsearch.IndexTimeAppend))
 						}
 						indexName := elasticsearch.BuildIndexNameWithTime(query.Elasticsearch,indexTime)
-						r := elastic.NewBulkIndexRequest().Index(indexName).Doc(ffsEvent)
+						r := elastic.NewBulkIndexRequest().Index(indexName).Doc(elasticFileEvent)
 						processor.Add(r)
 						elasticWg.Done()
 					}
 				}()
@@ -767,7 +946,38 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 		writer := bufio.NewWriter(conn)

-		if query.EsStandardized {
+		if query.EsStandardized == "" {
+			logstashWg.Add(len(ffsEvents))
+			go func() {
+				for _, ffsEvent := range ffsEvents {
+					event, err := json.Marshal(ffsEvent)
+
+					if err != nil {
+						//TODO handle error
+						log.Println("error marshaling ffs event")
+						log.Println(ffsEvent)
+						panic(err)
+					}
+
+					_, err = writer.Write(event)
+
+					if err != nil {
+						//TODO handle error
+						log.Println("error writing ffs event")
+						log.Println(string(event))
+						panic(err)
+					}
+					_, err = writer.Write([]byte("\n"))
+					if err != nil {
+						//TODO handle error
+						log.Println("error writing ffs event")
+						log.Println(string(event))
+						panic(err)
+					}
+					logstashWg.Done()
+				}
+			}()
+		} else if query.EsStandardized == "full" {
 			logstashWg.Add(len(elasticFFSEvents))
 			go func() {
 				for _, elasticFileEvent := range elasticFFSEvents {
@@ -798,16 +1008,16 @@ func queryFetcher(query config.FFSQuery, inProgressQueries *[]eventOutput.InProg
 					logstashWg.Done()
 				}
 			}()
-		} else {
-			logstashWg.Add(len(ffsEvents))
+		} else if query.EsStandardized == "half" {
+			logstashWg.Add(len(semiElasticFFSEvents))
 			go func() {
-				for _, ffsEvent := range ffsEvents {
-					event, err := json.Marshal(ffsEvent)
+				for _, elasticFileEvent := range semiElasticFFSEvents {
+					event, err := json.Marshal(elasticFileEvent)

 					if err != nil {
 						//TODO handle error
 						log.Println("error marshaling ffs event")
-						log.Println(ffsEvent)
+						log.Println(elasticFileEvent)
 						panic(err)
 					}

From d17c64de5a777c765811cd217f38546c39d4da10 Mon Sep 17 00:00:00 2001
From: BenB196
Date: Tue, 15 Oct 2019 20:54:57 -0400
Subject: [PATCH 5/6] Changed how logstash writing works

Resolved issue #60
---
 elasticsearch/logstash.go | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/elasticsearch/logstash.go b/elasticsearch/logstash.go
index 6c4766e..101417c 100644
--- a/elasticsearch/logstash.go
+++ b/elasticsearch/logstash.go
@@ -2,6 +2,7 @@ package elasticsearch

 import (
 	"net"
+	"time"
 )

 func CreateLogstashClient(logstashURL string) (net.Conn,error) {
@@ -10,13 +11,12 @@ func CreateLogstashClient(logstashURL string) (net.Conn,error) {
 	if err != nil {
 		return nil, err
 	}
-
-	connection, err := net.DialTCP("tcp",nil,tcpAddr)
-
-	if err != nil {
-		return nil, err
+
+	d := net.Dialer{
+		Timeout: 30 * time.Second,
 	}
-	err = connection.SetWriteBuffer(100000)
+
+	connection, err := d.Dial("tcp", tcpAddr.String())

 	if err != nil {
 		return nil, err

From faf4205be2fdf3b95713d7944d3b88f67cf11d16 Mon Sep 17 00:00:00 2001
From: BenB196
Date: Tue, 15 Oct 2019 20:56:08 -0400
Subject: [PATCH 6/6] Bumped version

---
 VERSION | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/VERSION b/VERSION
index a1e1395..84aa3a7 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.1.7
\ No newline at end of file
+0.1.8
\ No newline at end of file
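A closing note on the PATCH 5 dialer change: net.Dialer.Dial returns the plain net.Conn interface, which is why the SetWriteBuffer call had to go; that method exists only on the concrete *net.TCPConn. If the write-buffer tuning were still wanted alongside the new connect timeout, a type assertion would recover it. A sketch under those assumptions, not part of the patch (the address is illustrative):

package main

import (
	"log"
	"net"
	"time"
)

func dialLogstash(addr string) (net.Conn, error) {
	d := net.Dialer{Timeout: 30 * time.Second} // fail fast if logstash is unreachable
	conn, err := d.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	// Optional: assert back to *net.TCPConn to regain buffer tuning.
	if tcp, ok := conn.(*net.TCPConn); ok {
		if err := tcp.SetWriteBuffer(100000); err != nil {
			conn.Close()
			return nil, err
		}
	}
	return conn, nil
}

func main() {
	conn, err := dialLogstash("127.0.0.1:8080")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}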