From 2cab0c424e3d7fde15aa1912e010ab4c92b48bd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 29 May 2024 11:45:18 +0100 Subject: [PATCH 01/27] Add cursor to follow new data on an index through a tracked field --- lib/logstash/inputs/elasticsearch.rb | 49 ++++++++++++++++- .../inputs/elasticsearch/aggregation.rb | 18 ++++--- .../inputs/elasticsearch/cursor_tracker.rb | 54 +++++++++++++++++++ .../inputs/elasticsearch/paginated_search.rb | 10 ++-- 4 files changed, 118 insertions(+), 13 deletions(-) create mode 100644 lib/logstash/inputs/elasticsearch/cursor_tracker.rb diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index c9d8b552..b47c3f37 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -73,6 +73,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base require 'logstash/inputs/elasticsearch/paginated_search' require 'logstash/inputs/elasticsearch/aggregation' + require 'logstash/inputs/elasticsearch/cursor_tracker' include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck @@ -124,6 +125,22 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base # by this pipeline input. config :slices, :validate => :number + # Enable tracking the value of a given field to be used as a cursor + # TODO: main concerns + # * schedule overlap needs to be disabled (hardcoded as enabled) + # * using anything other than _event.timestamp easily leads to data loss + # * the first "synchronization run can take a long time" + # * checkpointing is only safe to do after each run (not per document) + config :tracking_field, :validate => :string + + # Define the initial seed value of the tracking_field + config :tracking_field_seed, :validate => :string + + # The location of where the tracking field value will be stored + # The value is persisted after each scheduled run (and not per result) + # If it's not set it defaults to '${path.data}/plugins/inputs/elasticsearch/last_run_value' + config :last_run_metadata_path, :validate => :string + # If set, include Elasticsearch document information such as index, type, and # the id in the event. # @@ -250,6 +267,10 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base # exactly once. config :schedule, :validate => :string + # Allow scheduled runs to overlap (enabled by default). Setting to false will + # only start a new scheduled run after the previous one completes. 
+ config :schedule_overlap, :validate => :string + # If set, the _source of each hit will be added nested under the target instead of at the top-level config :target, :validate => :field_reference @@ -328,18 +349,30 @@ def register setup_query_executor + setup_cursor_tracker + @client end def run(output_queue) if @schedule - scheduler.cron(@schedule) { @query_executor.do_run(output_queue) } + scheduler.cron(@schedule, :overlap => @schedule_overlap) do + @query_executor.do_run(output_queue, get_query_object()) + @cursor_tracker.checkpoint_cursor + end scheduler.join else - @query_executor.do_run(output_queue) + @query_executor.do_run(output_queue, get_query_object()) + @cursor_tracker.checkpoint_cursor end end + def get_query_object + injected_query = @cursor_tracker.inject_cursor(@query) + @logger.debug("new query is #{injected_query}") + query_object = LogStash::Json.load(injected_query) + end + ## # This can be called externally from the query_executor public @@ -347,6 +380,7 @@ def push_hit(hit, output_queue, root_field = '_source') event = event_from_hit(hit, root_field) decorate(event) output_queue << event + @cursor_tracker.record_last_value(event) end def event_from_hit(hit, root_field) @@ -654,6 +688,17 @@ def get_transport_client_class ::Elastic::Transport::Transport::HTTP::Manticore end + def setup_cursor_tracker + if @tracking_field + @tracking_field_seed ||= Time.now.utc.iso8601 + @cursor_tracker = CursorTracker.new(last_run_metadata_path: @last_run_metadata_path, + tracking_field: @tracking_field, + tracking_field_seed: @tracking_field_seed) + else + @cursor_tracker = NoopCursorTracker.new + end + end + module URIOrEmptyValidator ## # @override to provide :uri_or_empty validator diff --git a/lib/logstash/inputs/elasticsearch/aggregation.rb b/lib/logstash/inputs/elasticsearch/aggregation.rb index e74a4357..b8f43897 100644 --- a/lib/logstash/inputs/elasticsearch/aggregation.rb +++ b/lib/logstash/inputs/elasticsearch/aggregation.rb @@ -13,13 +13,7 @@ def initialize(client, plugin) @plugin_params = plugin.params @size = @plugin_params["size"] - @query = @plugin_params["query"] @retries = @plugin_params["retries"] - @agg_options = { - :index => @plugin_params["index"], - :size => 0 - }.merge(:body => @query) - @plugin = plugin end @@ -33,10 +27,18 @@ def retryable(job_name, &block) false end - def do_run(output_queue) + def aggregation_options(query_object) + { + :index => @index, + :size => 0, + :body => query_object + } + end + + def do_run(output_queue, query_object) logger.info("Aggregation starting") r = retryable(AGGREGATION_JOB) do - @client.search(@agg_options) + @client.search(aggregation_options(query_object)) end @plugin.push_hit(r, output_queue, 'aggregations') if r end diff --git a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb new file mode 100644 index 00000000..33a9411b --- /dev/null +++ b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb @@ -0,0 +1,54 @@ +require 'fileutils' + +module LogStash; module Inputs; class Elasticsearch + class NoopCursorTracker + include LogStash::Util::Loggable + def checkpoint_cursor; end + + def converge_last_value; end + + def record_last_value(event); end + + def inject_cursor(query_json); return query_json; end + end + + class CursorTracker + include LogStash::Util::Loggable + + attr_reader :last_value + + def initialize(last_run_metadata_path:, tracking_field:, tracking_field_seed:) + @last_run_metadata_path = last_run_metadata_path + @last_run_metadata_path ||= 
::File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "elasticsearch", "last_run_value") + FileUtils.mkdir_p ::File.dirname(@last_run_metadata_path) + @last_value_hashmap = Java::java.util.concurrent.ConcurrentHashMap.new + @last_value = IO.read(@last_run_metadata_path) rescue nil || tracking_field_seed + @tracking_field = tracking_field + logger.info "Starting value for cursor field \"#{@tracking_field}\": #{@last_value}" + end + + def checkpoint_cursor + converge_last_value + IO.write(@last_run_metadata_path, @last_value) + @last_value_hashmap.clear + end + + def converge_last_value + return if @last_value_hashmap.empty? + new_last_value = @last_value_hashmap.reduceValues(1, lambda { |v1, v2| Time.parse(v1) < Time.parse(v2) ? v2 : v1 }) + return if new_last_value == @last_value + @last_value = new_last_value + logger.info "New cursor value for field \"#{@tracking_field}\" is: #{new_last_value}" + end + + def record_last_value(event) + value = event.get(@tracking_field) + logger.trace? && logger.trace("storing last_value if #{@tracking_field} for #{Thread.current.object_id}: #{value}") + @last_value_hashmap.put(Thread.current.object_id, value) + end + + def inject_cursor(query_json) + query_json.gsub(":last_value", @last_value) + end + end +end; end; end diff --git a/lib/logstash/inputs/elasticsearch/paginated_search.rb b/lib/logstash/inputs/elasticsearch/paginated_search.rb index 2e8236bc..cb1cbdc7 100644 --- a/lib/logstash/inputs/elasticsearch/paginated_search.rb +++ b/lib/logstash/inputs/elasticsearch/paginated_search.rb @@ -21,10 +21,14 @@ def initialize(client, plugin) @pipeline_id = plugin.pipeline_id end - def do_run(output_queue) - return retryable_search(output_queue) if @slices.nil? || @slices <= 1 + def do_run(output_queue, query) + @query = query - retryable_slice_search(output_queue) + if @slices.nil? || @slices <= 1 + retryable_search(output_queue) + else + retryable_slice_search(output_queue) + end end def retryable(job_name, &block) From 1526e9fa7981a82eccd4521497d8bcf9dada2eff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 29 May 2024 12:51:56 +0100 Subject: [PATCH 02/27] Update lib/logstash/inputs/elasticsearch/cursor_tracker.rb --- lib/logstash/inputs/elasticsearch/cursor_tracker.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb index 33a9411b..1ad7b69f 100644 --- a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb +++ b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb @@ -35,6 +35,7 @@ def checkpoint_cursor def converge_last_value return if @last_value_hashmap.empty? + # TODO this implicitly assumes that the way to converge the value among slices is to pick the highest and we can't assume that new_last_value = @last_value_hashmap.reduceValues(1, lambda { |v1, v2| Time.parse(v1) < Time.parse(v2) ? 
v2 : v1 }) return if new_last_value == @last_value @last_value = new_last_value From 96d229413634c2531ffabd4b34dd0c0a3476c394 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Thu, 6 Mar 2025 10:45:42 +0000 Subject: [PATCH 03/27] Update lib/logstash/inputs/elasticsearch/cursor_tracker.rb Co-authored-by: Joel Andritsch --- lib/logstash/inputs/elasticsearch/cursor_tracker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb index 1ad7b69f..055ab227 100644 --- a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb +++ b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb @@ -49,7 +49,7 @@ def record_last_value(event) end def inject_cursor(query_json) - query_json.gsub(":last_value", @last_value) + query_json.gsub(":last_value", @last_value.to_s) end end end; end; end From 64bba492ed9baf4657c80ebe3c0e3322d3994dd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Fri, 14 Mar 2025 10:26:49 +0000 Subject: [PATCH 04/27] add :present injection to safeguard against PIT right-edge inconsistencies --- lib/logstash/inputs/elasticsearch.rb | 3 +-- lib/logstash/inputs/elasticsearch/cursor_tracker.rb | 9 ++++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index b47c3f37..327bf7fa 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -127,7 +127,6 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base # Enable tracking the value of a given field to be used as a cursor # TODO: main concerns - # * schedule overlap needs to be disabled (hardcoded as enabled) # * using anything other than _event.timestamp easily leads to data loss # * the first "synchronization run can take a long time" # * checkpointing is only safe to do after each run (not per document) @@ -269,7 +268,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base # Allow scheduled runs to overlap (enabled by default). Setting to false will # only start a new scheduled run after the previous one completes. - config :schedule_overlap, :validate => :string + config :schedule_overlap, :validate => :boolean # If set, the _source of each hit will be added nested under the target instead of at the top-level config :target, :validate => :field_reference diff --git a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb index 055ab227..00c77fb0 100644 --- a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb +++ b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb @@ -35,8 +35,8 @@ def checkpoint_cursor def converge_last_value return if @last_value_hashmap.empty? - # TODO this implicitly assumes that the way to converge the value among slices is to pick the highest and we can't assume that - new_last_value = @last_value_hashmap.reduceValues(1, lambda { |v1, v2| Time.parse(v1) < Time.parse(v2) ? v2 : v1 }) + new_last_value = @last_value_hashmap.reduceValues(1000, lambda { |v1, v2| Java::java.time.Instant.parse(v1).isBefore(Java::java.time.Instant.parse(v2)) ? v2 : v1 }) + logger.trace? && logger.trace("converge_last_value: got #{@last_value_hashmap.values.inspect}. 
won: #{new_last_value}") return if new_last_value == @last_value @last_value = new_last_value logger.info "New cursor value for field \"#{@tracking_field}\" is: #{new_last_value}" @@ -49,7 +49,10 @@ def record_last_value(event) end def inject_cursor(query_json) - query_json.gsub(":last_value", @last_value.to_s) + # ":present" means "now - 30s" to avoid grabbing partially visible data in the PIT + result = query_json.gsub(":last_value", @last_value.to_s).gsub(":present", Java::java.time.Instant.now.minusSeconds(30).to_s) + logger.debug("inject_cursor: injected values for ':last_value' and ':present'", :query => result) + result end end end; end; end From 50a19b99c954ff1704513d96ec43ee22f5f2f282 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Fri, 14 Mar 2025 13:31:59 +0000 Subject: [PATCH 05/27] revert incorrect changes in aggs --- .../inputs/elasticsearch/aggregation.rb | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/lib/logstash/inputs/elasticsearch/aggregation.rb b/lib/logstash/inputs/elasticsearch/aggregation.rb index b8f43897..e74a4357 100644 --- a/lib/logstash/inputs/elasticsearch/aggregation.rb +++ b/lib/logstash/inputs/elasticsearch/aggregation.rb @@ -13,7 +13,13 @@ def initialize(client, plugin) @plugin_params = plugin.params @size = @plugin_params["size"] + @query = @plugin_params["query"] @retries = @plugin_params["retries"] + @agg_options = { + :index => @plugin_params["index"], + :size => 0 + }.merge(:body => @query) + @plugin = plugin end @@ -27,18 +33,10 @@ def retryable(job_name, &block) false end - def aggregation_options(query_object) - { - :index => @index, - :size => 0, - :body => query_object - } - end - - def do_run(output_queue, query_object) + def do_run(output_queue) logger.info("Aggregation starting") r = retryable(AGGREGATION_JOB) do - @client.search(aggregation_options(query_object)) + @client.search(@agg_options) end @plugin.push_hit(r, output_queue, 'aggregations') if r end From 5df729100a18517d2ed0bb41c821e3c4e1dfa1f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Fri, 14 Mar 2025 14:32:02 +0000 Subject: [PATCH 06/27] Revert "revert incorrect changes in aggs" This reverts commit baf70654840a4dd742147b14a3173fa75190ea9a. 
--- .../inputs/elasticsearch/aggregation.rb | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/logstash/inputs/elasticsearch/aggregation.rb b/lib/logstash/inputs/elasticsearch/aggregation.rb index e74a4357..b8f43897 100644 --- a/lib/logstash/inputs/elasticsearch/aggregation.rb +++ b/lib/logstash/inputs/elasticsearch/aggregation.rb @@ -13,13 +13,7 @@ def initialize(client, plugin) @plugin_params = plugin.params @size = @plugin_params["size"] - @query = @plugin_params["query"] @retries = @plugin_params["retries"] - @agg_options = { - :index => @plugin_params["index"], - :size => 0 - }.merge(:body => @query) - @plugin = plugin end @@ -33,10 +27,18 @@ def retryable(job_name, &block) false end - def do_run(output_queue) + def aggregation_options(query_object) + { + :index => @index, + :size => 0, + :body => query_object + } + end + + def do_run(output_queue, query_object) logger.info("Aggregation starting") r = retryable(AGGREGATION_JOB) do - @client.search(@agg_options) + @client.search(aggregation_options(query_object)) end @plugin.push_hit(r, output_queue, 'aggregations') if r end From 2b922c539de1ff0d19bdac6d2e4f1d9df2533e3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Fri, 14 Mar 2025 14:53:57 +0000 Subject: [PATCH 07/27] fix tests --- lib/logstash/inputs/elasticsearch/aggregation.rb | 1 + spec/inputs/elasticsearch_spec.rb | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/logstash/inputs/elasticsearch/aggregation.rb b/lib/logstash/inputs/elasticsearch/aggregation.rb index b8f43897..91c12443 100644 --- a/lib/logstash/inputs/elasticsearch/aggregation.rb +++ b/lib/logstash/inputs/elasticsearch/aggregation.rb @@ -12,6 +12,7 @@ def initialize(client, plugin) @client = client @plugin_params = plugin.params + @index = @plugin_params["index"] @size = @plugin_params["size"] @retries = @plugin_params["retries"] @plugin = plugin diff --git a/spec/inputs/elasticsearch_spec.rb b/spec/inputs/elasticsearch_spec.rb index 5067400e..0c07992c 100644 --- a/spec/inputs/elasticsearch_spec.rb +++ b/spec/inputs/elasticsearch_spec.rb @@ -1165,7 +1165,7 @@ def wait_receive_request context "when there's an exception" do before(:each) do - allow(client).to receive(:search).and_raise RuntimeError + allow(client).to receive(:search).and_raise RuntimeError.new("test exception") end it 'produces no events' do plugin.run queue @@ -1310,6 +1310,10 @@ def wait_receive_request let(:mock_queue) { double('queue', :<< => nil) } + before(:each) do + plugin.send(:setup_cursor_tracker) + end + it 'pushes a generated event to the queue' do plugin.send(:push_hit, hit, mock_queue) expect(mock_queue).to have_received(:<<) do |event| From 887463fb0fb090056da8e4bd4b437bacdc360315 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Fri, 14 Mar 2025 16:25:00 +0000 Subject: [PATCH 08/27] add docs --- docs/index.asciidoc | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index f17c92e3..ee39d45b 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -126,12 +126,14 @@ Please check out <> for details. | <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No | <> |<>, one of `["hits","aggregations"]`|No | <> | <>|No | <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>, one of `["auto", "search_after", "scroll"]`|No | <> |<>|No @@ -151,6 +153,8 @@ Please check out <> for details. 
| <> |<>, one of `["full", "none"]`|No
| <> | <>|No
| <> | {logstash-ref}/field-references-deepdive.html[field reference] | No
+| <> |<>|No
+| <> |<>|No
| <> | <>|No
| <> |<>|No
|=======================================================================
@@ -330,6 +334,15 @@ Check out {ref}/api-conventions.html#api-multi-index[Multi Indices
documentation] in the Elasticsearch documentation for info on referencing
multiple indices.

+[id="plugins-{type}s-{plugin}-last_run_metadata_path"]
+===== `last_run_metadata_path`
+
+ * Value type is <>
+ * There is no default value for this setting.
+
+The path to store the last observed value of the tracking field, when used.
+By default this file is stored as `<path.data>/plugins/inputs/elasticsearch/.last_run`.
+
[id="plugins-{type}s-{plugin}-password"]
===== `password`

@@ -410,6 +423,19 @@ for example: "* * * * *" (execute query every minute, on the minute)
There is no schedule by default. If no schedule is given, then the statement is run
exactly once.

+[id="plugins-{type}s-{plugin}-schedule_overlap"]
+===== `schedule_overlap`
+
+ * Value type is <>
+ * Default value is `true`
+
+Whether to allow queuing of a scheduled run if a run is occurring.
+While this is ideal for ensuring a new run happens immediatelly after the previous on finishes if there
+is a lot of work to do, but given the queue is unbounded it may lead to an out of memory over long periods of time
+if the queue grows continuously.
+
+When in doubt, set `schedule_overlap` to false (it may become the default value in the future).
+
[id="plugins-{type}s-{plugin}-scroll"]
===== `scroll`

@@ -622,6 +648,25 @@ When the `target` is set to a field reference, the `_source` of the hit is place
This option can be useful to avoid populating unknown fields when a downstream schema such as ECS is enforced.
It is also possible to target an entry in the event's metadata, which will be available during event processing but not exported to your outputs (e.g., `target \=> "[@metadata][_source]"`).

+[id="plugins-{type}s-{plugin}-tracking_field"]
+===== `tracking_field`
+
+* Value type is <>
+* There is no default value for this setting.
+
+Which field from the last event of a previous run will be used as a cursor value for the following run.
+The value of this field is injected into each query if the query uses the placeholder `:last_value`.
+For the first query after a pipeline is started, the value used is either read from <> file,
+or taken from <> setting.
+
+[id="plugins-{type}s-{plugin}-tracking_field_seed"]
+===== `tracking_field_seed`
+
+* Value type is <>
+* There is no default value for this setting.
+
+The starting value for the <> if there is no <> already.
+For a nanosecond timestamp based field, a suggested seed value something like `1980-01-01T23:59:59.999999999Z`.
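+
+For example, a minimal configuration combining the two tracking settings could look like
+this (an illustrative sketch; the field name is an assumption and must match a field that
+exists in your documents):
+
+[source, ruby]
+    input {
+      elasticsearch {
+        tracking_field => "[event][ingested]"
+        tracking_field_seed => "1980-01-01T23:59:59.999999999Z"
+      }
+    }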
[id="plugins-{type}s-{plugin}-user"] ===== `user` From 30bed1be04b6e853b206314e7e96fedd8c25f84f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Mon, 17 Mar 2025 10:21:48 +0000 Subject: [PATCH 09/27] document the use case for tailing an index --- docs/index.asciidoc | 115 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index ee39d45b..190ddbd7 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -49,6 +49,119 @@ This would create an Elasticsearch query with the following format: }' +[id="plugins-{type}s-{plugin}-cursor"] +==== Tracking a field's value across runs + +It is sometimes desirable to track the value of a particular field between two jobs: +* avoid re-processing the entire result set of a long query after an unplanned restart +* only grab new data from an index instead of processing the entire set on each job + +For this, the Elasticsearch input plugin provides the <> and <> options. +When <> is set, the plugin will record the value of that field for the last document retrieved in a run into +a file (location defaults to <>. + +The user can then inject this value in the query using the placeholder `:last_value`. The value will be injected into the query +before execution, and the updated after the query completes, assuming new data was found. + +The plugin also offers another placeholder called `:present` used to inject the nano-second based + +This feature works best when: +* the query sorts by the tracking field +* the field type has enough resolution so that two events are unlikely to have the same value for the field + +A suggestion is to use a tracking field that has nanosecond second precision, like +https://www.elastic.co/guide/en/elasticsearch/reference/current/date_nanos.html[date nanoseconds] field type. + +A good use case for this feature is to track new data in an index, which can be achieved by: + +1. create ingest pipeline that adds Elasticsearch's `_ingest.timestamp` field to the documents as `event.ingested`: + +[source, json] + PUT _ingest/pipeline/my-pipeline + { + "processors": [ + { + "script": { + "lang": "painless", + "source": "ctx.putIfAbsent(\"event\", [:]); ctx.event.ingested = metadata().now.format(DateTimeFormatter.ISO_INSTANT);" + } + } + ] + } + + +2. create an index mapping where the tracking field is of date nanosecond type and invokes the defined pipeline: + +[source, json] + PUT /_template/my_template + { + "index_patterns": ["test-*"], + "settings": { + "index.default_pipeline": "my-pipeline", + }, + "mappings": { + "properties": { + "event": { + "properties": { + "ingested": { + "type": "date_nanos", + "format": "strict_date_optional_time_nanos" + } + } + } + } + } + } + +3. define a query that looks at all data of the indices, sorted by the tracking field, and with a range filter since the last value seen until present: + +[source,json] +{ + "query": { + "range": { + "event.ingested": { + "gt": ":last_value", + "lt": ":present" + } + } + }, + "sort": [ + { + "event.ingested": { + "order": "asc", + "format": "strict_date_optional_time_nanos", + "numeric_type": "date_nanos" + } + } + ] +} + +4. configure the Elasticsearch input to query the indices with the query defined above, every minute, and track the `event.ingested` field: + +[source, ruby] + input { + elasticsearch { + id => tail_test_index + hosts => [ 'https://..'] + api_key => '....' 
+        index => 'test-*'
+        query => '{ "query": { "range": { "event.ingested": { "gt": ":last_value", "lt": ":present"}}}, "sort": [ { "event.ingested": {"order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type" : "date_nanos" } } ] }'
+        tracking_field => "[event][ingested]"
+        # set a seed value to a value known to be older than any value of `event.ingested`
+        tracking_field_seed => "1980-01-01T23:59:59.999999999Z"
+        slices => 5 # optional use of slices to speed data processing, should be less than number of primary shards
+        schedule => '* * * * *' # every minute
+        schedule_overlap => false # don't accumulate jobs if one takes longer than 1 minute
+      }
+    }
+
+With this setup, as new documents are indexed in a `test-*` index, the next scheduled run will:
+
+1. select all new documents since the last observed value of the tracking field;
+2. use PIT+search_after to paginate through all the data;
+3. update the value of the field at the end of the pagination.
+
+[id="plugins-{type}s-{plugin}-scheduling"]
 ==== Scheduling
 
 Input from this plugin can be scheduled to run periodically according to a specific

From 1a307db90579aa5149747946d4bced61966e63c6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20Duarte?=
Date: Mon, 17 Mar 2025 14:06:41 +0000
Subject: [PATCH 10/27] minor fixes

---
 docs/index.asciidoc                          |  4 +++-
 lib/logstash/inputs/elasticsearch.rb         | 19 ++++++++++---------
 .../inputs/elasticsearch/cursor_tracker.rb   | 11 -----------
 .../inputs/elasticsearch/paginated_search.rb |  7 ++-----
 4 files changed, 15 insertions(+), 26 deletions(-)

diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index 190ddbd7..584e7888 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -454,7 +454,9 @@ referencing multiple indices.
 * There is no default value for this setting.
 
 The path to store the last observed value of the tracking field, when used.
-By default this file is stored as `<path.data>/plugins/inputs/elasticsearch/.last_run`.
+By default this file is stored as `<path.data>/plugins/inputs/elasticsearch/last_run_value`.
+
+This setting should point to a file, not a directory, and Logstash must have read+write access to this file.
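+
+For example, to pin the cursor file to an explicit location (the path below is
+hypothetical and shown only for illustration):
+
+[source, ruby]
+    input {
+      elasticsearch {
+        tracking_field => "[event][ingested]"
+        last_run_metadata_path => "/var/lib/logstash/es_tail_last_run_value"
+      }
+    }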
[id="plugins-{type}s-{plugin}-password"] ===== `password` diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index 327bf7fa..c706ebab 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -379,7 +379,11 @@ def push_hit(hit, output_queue, root_field = '_source') event = event_from_hit(hit, root_field) decorate(event) output_queue << event - @cursor_tracker.record_last_value(event) + record_last_value(event) + end + + def record_last_value(event) + @cursor_tracker.record_last_value(event) if @tracking_field end def event_from_hit(hit, root_field) @@ -688,14 +692,11 @@ def get_transport_client_class end def setup_cursor_tracker - if @tracking_field - @tracking_field_seed ||= Time.now.utc.iso8601 - @cursor_tracker = CursorTracker.new(last_run_metadata_path: @last_run_metadata_path, - tracking_field: @tracking_field, - tracking_field_seed: @tracking_field_seed) - else - @cursor_tracker = NoopCursorTracker.new - end + return unless @tracking_field + @tracking_field_seed ||= Time.now.utc.iso8601 + @cursor_tracker = CursorTracker.new(last_run_metadata_path: @last_run_metadata_path, + tracking_field: @tracking_field, + tracking_field_seed: @tracking_field_seed) end module URIOrEmptyValidator diff --git a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb index 00c77fb0..172cd460 100644 --- a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb +++ b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb @@ -1,17 +1,6 @@ require 'fileutils' module LogStash; module Inputs; class Elasticsearch - class NoopCursorTracker - include LogStash::Util::Loggable - def checkpoint_cursor; end - - def converge_last_value; end - - def record_last_value(event); end - - def inject_cursor(query_json); return query_json; end - end - class CursorTracker include LogStash::Util::Loggable diff --git a/lib/logstash/inputs/elasticsearch/paginated_search.rb b/lib/logstash/inputs/elasticsearch/paginated_search.rb index cb1cbdc7..551a3f0a 100644 --- a/lib/logstash/inputs/elasticsearch/paginated_search.rb +++ b/lib/logstash/inputs/elasticsearch/paginated_search.rb @@ -24,11 +24,8 @@ def initialize(client, plugin) def do_run(output_queue, query) @query = query - if @slices.nil? || @slices <= 1 - retryable_search(output_queue) - else - retryable_slice_search(output_queue) - end + return retryable_search(output_queue) if @slices.nil? || @slices <= 1 + retryable_slice_search(output_queue) end def retryable(job_name, &block) From ba1dff2b91057d1d544c651330fd35e273bdf40c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Mon, 17 Mar 2025 14:23:22 +0000 Subject: [PATCH 11/27] fix docs references --- docs/index.asciidoc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 584e7888..c09c2aa7 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -56,9 +56,9 @@ It is sometimes desirable to track the value of a particular field between two j * avoid re-processing the entire result set of a long query after an unplanned restart * only grab new data from an index instead of processing the entire set on each job -For this, the Elasticsearch input plugin provides the <> and <> options. -When <> is set, the plugin will record the value of that field for the last document retrieved in a run into -a file (location defaults to <>. +For this, the Elasticsearch input plugin provides the <> and <> options. 
+When <> is set, the plugin will record the value of that field for the last document retrieved in a run into +a file (location defaults to <>). The user can then inject this value in the query using the placeholder `:last_value`. The value will be injected into the query before execution, and the updated after the query completes, assuming new data was found. @@ -771,8 +771,8 @@ It is also possible to target an entry in the event's metadata, which will be av Which field from the last event of a previous run will be used a cursor value for the following run. The value of this field is injected into each query if the query uses the placeholder `:last_value`. -For the first query after a pipeline is started, the value used is either read from <> file, -or taken from <> setting. +For the first query after a pipeline is started, the value used is either read from <> file, +or taken from <> setting. Note: The tracking value is updated only after the PIT+search_after run completes, it won't update during the search_after pagination. This is to allow use of slices. @@ -782,7 +782,7 @@ Note: The tracking value is updated only after the PIT+search_after run complete * Value type is <> * There is no default value for this setting. -The starting value for the <> if there is no <> already. +The starting value for the <> if there is no <> already. For a nanosecond timestamp based field, a suggested seed value something like `1980-01-01T23:59:59.999999999Z`. [id="plugins-{type}s-{plugin}-user"] From d598f37f109475559768e8cd0b58b781c1c22b0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Mon, 17 Mar 2025 23:18:14 +0000 Subject: [PATCH 12/27] cursor checkpoint after each page --- lib/logstash/inputs/elasticsearch.rb | 27 +++++++++++-------- .../inputs/elasticsearch/cursor_tracker.rb | 23 +++++++++++----- .../inputs/elasticsearch/paginated_search.rb | 9 +++++++ 3 files changed, 41 insertions(+), 18 deletions(-) diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index c706ebab..66e08ebc 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -357,12 +357,10 @@ def run(output_queue) if @schedule scheduler.cron(@schedule, :overlap => @schedule_overlap) do @query_executor.do_run(output_queue, get_query_object()) - @cursor_tracker.checkpoint_cursor end scheduler.join else @query_executor.do_run(output_queue, get_query_object()) - @cursor_tracker.checkpoint_cursor end end @@ -677,7 +675,22 @@ def setup_query_executor end end - def get_transport_client_class + def setup_cursor_tracker + return unless @tracking_field + return unless @query_executor.is_a?(LogStash::Inputs::Elasticsearch::SearchAfter) + + if @resolved_search_api != "search_after" || @response_type != "hits" + raise ConfigurationError.new("The `tracking_field` feature can only be used with `search_after` non-aggregation queries") + end + + @tracking_field_seed ||= Time.now.utc.iso8601 + @cursor_tracker = CursorTracker.new(last_run_metadata_path: @last_run_metadata_path, + tracking_field: @tracking_field, + tracking_field_seed: @tracking_field_seed) + @query_executor.cursor_tracker = @cursor_tracker + end + + def get_transport_client_class # LS-core includes `elasticsearch` gem. The gem is composed of two separate gems: `elasticsearch-api` and `elasticsearch-transport` # And now `elasticsearch-transport` is old, instead we have `elastic-transport`. 
# LS-core updated `elasticsearch` > 8: https://github.com/elastic/logstash/pull/17161 @@ -691,14 +704,6 @@ def get_transport_client_class ::Elastic::Transport::Transport::HTTP::Manticore end - def setup_cursor_tracker - return unless @tracking_field - @tracking_field_seed ||= Time.now.utc.iso8601 - @cursor_tracker = CursorTracker.new(last_run_metadata_path: @last_run_metadata_path, - tracking_field: @tracking_field, - tracking_field_seed: @tracking_field_seed) - end - module URIOrEmptyValidator ## # @override to provide :uri_or_empty validator diff --git a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb index 172cd460..fca74a76 100644 --- a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb +++ b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb @@ -14,18 +14,27 @@ def initialize(last_run_metadata_path:, tracking_field:, tracking_field_seed:) @last_value = IO.read(@last_run_metadata_path) rescue nil || tracking_field_seed @tracking_field = tracking_field logger.info "Starting value for cursor field \"#{@tracking_field}\": #{@last_value}" + @mutex = Mutex.new end - def checkpoint_cursor - converge_last_value - IO.write(@last_run_metadata_path, @last_value) - @last_value_hashmap.clear + def checkpoint_cursor(intermediate: true) + @mutex.synchronize do + if intermediate + # in intermediate checkpoints pick the smallest + converge_last_value {|v1, v2| v1 < v2 ? v1 : v2} + else + # in the last search of a PIT choose the largest + converge_last_value {|v1, v2| v1 > v2 ? v1 : v2} + @last_value_hashmap.clear + end + IO.write(@last_run_metadata_path, @last_value) + end end - def converge_last_value + def converge_last_value(&block) return if @last_value_hashmap.empty? - new_last_value = @last_value_hashmap.reduceValues(1000, lambda { |v1, v2| Java::java.time.Instant.parse(v1).isBefore(Java::java.time.Instant.parse(v2)) ? v2 : v1 }) - logger.trace? && logger.trace("converge_last_value: got #{@last_value_hashmap.values.inspect}. won: #{new_last_value}") + new_last_value = @last_value_hashmap.reduceValues(1000, &block) + logger.debug? && logger.debug("converge_last_value: got #{@last_value_hashmap.values.inspect}. 
won: #{new_last_value}") return if new_last_value == @last_value @last_value = new_last_value logger.info "New cursor value for field \"#{@tracking_field}\" is: #{new_last_value}" diff --git a/lib/logstash/inputs/elasticsearch/paginated_search.rb b/lib/logstash/inputs/elasticsearch/paginated_search.rb index 551a3f0a..dd66b2c0 100644 --- a/lib/logstash/inputs/elasticsearch/paginated_search.rb +++ b/lib/logstash/inputs/elasticsearch/paginated_search.rb @@ -123,6 +123,13 @@ class SearchAfter < PaginatedSearch PIT_JOB = "create point in time (PIT)" SEARCH_AFTER_JOB = "search_after paginated search" + attr_accessor :cursor_tracker + + def do_run(output_queue, query) + super(output_queue, query) + @cursor_tracker.checkpoint_cursor(intermediate: false) if @cursor_tracker + end + def pit?(id) !!id&.is_a?(String) end @@ -193,6 +200,8 @@ def search(output_queue:, slice_id: nil, pit_id:) end end + @cursor_tracker.checkpoint_cursor(intermediate: true) if @cursor_tracker + logger.info("Query completed", log_details) end From 2c4af0bbb8a38bf600f8fc742a3a2d2b274d5373 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Tue, 18 Mar 2025 07:42:34 +0000 Subject: [PATCH 13/27] Update lib/logstash/inputs/elasticsearch.rb --- lib/logstash/inputs/elasticsearch.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index 66e08ebc..91406eae 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -365,6 +365,7 @@ def run(output_queue) end def get_query_object + return @query unless @cursor_tracker injected_query = @cursor_tracker.inject_cursor(@query) @logger.debug("new query is #{injected_query}") query_object = LogStash::Json.load(injected_query) From d44c9781a3a0a3ee1c6be1d17bcea7cf7d179cb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Tue, 18 Mar 2025 08:22:02 +0000 Subject: [PATCH 14/27] fix get_query_object --- lib/logstash/inputs/elasticsearch.rb | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index 91406eae..da8a7fa5 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -365,10 +365,13 @@ def run(output_queue) end def get_query_object - return @query unless @cursor_tracker - injected_query = @cursor_tracker.inject_cursor(@query) - @logger.debug("new query is #{injected_query}") - query_object = LogStash::Json.load(injected_query) + if @cursor_tracker + query = @cursor_tracker.inject_cursor(@query) + @logger.debug("new query is #{injected_query}") + else + query = @query + end + LogStash::Json.load(query) end ## From ceabf2e106f1e9d3aa7ff3bdba494df5a8ec28c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Tue, 18 Mar 2025 08:25:08 +0000 Subject: [PATCH 15/27] Update lib/logstash/inputs/elasticsearch.rb --- lib/logstash/inputs/elasticsearch.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index da8a7fa5..ab8db1fd 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -129,7 +129,6 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base # TODO: main concerns # * using anything other than _event.timestamp easily leads to data loss # * the first "synchronization run can take a long time" - # * checkpointing is only safe to do after each run (not per document) config 
:tracking_field, :validate => :string # Define the initial seed value of the tracking_field From 3c32b91e0414c3228107e3e4fcdd9102ef016aa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Tue, 18 Mar 2025 15:39:36 +0000 Subject: [PATCH 16/27] fix log entry --- lib/logstash/inputs/elasticsearch.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index ab8db1fd..91aa34a3 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -366,7 +366,7 @@ def run(output_queue) def get_query_object if @cursor_tracker query = @cursor_tracker.inject_cursor(@query) - @logger.debug("new query is #{injected_query}") + @logger.debug("new query is #{query}") else query = @query end From 92362df8e1c77552f94c7cc2f6b7aa174a8d971a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Tue, 18 Mar 2025 17:00:30 +0000 Subject: [PATCH 17/27] add cursor unit testing --- .../inputs/elasticsearch/cursor_tracker.rb | 6 +- spec/inputs/cursor_tracker_spec.rb | 70 +++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) create mode 100644 spec/inputs/cursor_tracker_spec.rb diff --git a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb index fca74a76..e0e7b48e 100644 --- a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb +++ b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb @@ -48,9 +48,13 @@ def record_last_value(event) def inject_cursor(query_json) # ":present" means "now - 30s" to avoid grabbing partially visible data in the PIT - result = query_json.gsub(":last_value", @last_value.to_s).gsub(":present", Java::java.time.Instant.now.minusSeconds(30).to_s) + result = query_json.gsub(":last_value", @last_value.to_s).gsub(":present", now_in_nanos) logger.debug("inject_cursor: injected values for ':last_value' and ':present'", :query => result) result end + + def now_in_nanos + Java::java.time.Instant.now.minusSeconds(30).to_s + end end end; end; end diff --git a/spec/inputs/cursor_tracker_spec.rb b/spec/inputs/cursor_tracker_spec.rb new file mode 100644 index 00000000..f88a1f9d --- /dev/null +++ b/spec/inputs/cursor_tracker_spec.rb @@ -0,0 +1,70 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/devutils/rspec/shared_examples" +require "logstash/inputs/elasticsearch/cursor_tracker" + +describe LogStash::Inputs::Elasticsearch::CursorTracker do + + let(:last_run_metadata_path) { Tempfile.new('cursor_tracker_testing').path } + let(:tracking_field_seed) { "1980-01-01T23:59:59.999999999Z" } + let(:options) do + { + :last_run_metadata_path => last_run_metadata_path, + :tracking_field => "my_field", + :tracking_field_seed => tracking_field_seed + } + end + + subject { described_class.new(**options) } + + it "creating a class works" do + expect(subject).to be_a described_class + end + + describe "checkpoint_cursor" do + before(:each) do + [ + Thread.new(subject) {|subject| subject.record_last_value(LogStash::Event.new("my_field" => "2025-01-03T23:59:59.999999999Z")) }, + Thread.new(subject) {|subject| subject.record_last_value(LogStash::Event.new("my_field" => "2025-01-01T23:59:59.999999999Z")) }, + Thread.new(subject) {|subject| subject.record_last_value(LogStash::Event.new("my_field" => "2025-01-02T23:59:59.999999999Z")) }, + ].each(&:join) + end + context "when doing intermediate checkpoint" do + before(:each) { subject.checkpoint_cursor(intermediate: true) } + 
it "persists the smallest value" do + expect(IO.read(last_run_metadata_path)).to eq("2025-01-01T23:59:59.999999999Z") + end + end + context "when doing non-intermediate checkpoint" do + before(:each) { subject.checkpoint_cursor(intermediate: false) } + it "persists the largest value" do + expect(IO.read(last_run_metadata_path)).to eq("2025-01-03T23:59:59.999999999Z") + end + end + end + + describe "inject_cursor" do + let(:new_value) { "2025-01-03T23:59:59.999999999Z" } + let(:fake_now) { "2026-09-19T23:59:59.999999999Z" } + + let(:query) do + %q[ + { "query": { "range": { "event.ingested": { "gt": :last_value, "lt": :present}}}, "sort": [ { "event.ingested": {"order": "asc", "format": "strict_date_optional_time_nanos", "numeric_type" : "date_nanos" } } ] } + ] + end + + before(:each) do + subject.record_last_value(LogStash::Event.new("my_field" => new_value)) + subject.checkpoint_cursor(intermediate: false) + allow(subject).to receive(:now_in_nanos).and_return(fake_now) + end + + it "injects the value of the cursor into json query if it contains :last_value" do + expect(subject.inject_cursor(query)).to match(/#{new_value}/) + end + + it "injects current time into json query if it contains :present" do + expect(subject.inject_cursor(query)).to match(/#{fake_now}/) + end + end +end From eed4295d49a67f1d99d1267d5a769d41d40bd443 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Tue, 18 Mar 2025 19:10:16 +0000 Subject: [PATCH 18/27] fix testing --- spec/inputs/cursor_tracker_spec.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/spec/inputs/cursor_tracker_spec.rb b/spec/inputs/cursor_tracker_spec.rb index f88a1f9d..3c1d0b9e 100644 --- a/spec/inputs/cursor_tracker_spec.rb +++ b/spec/inputs/cursor_tracker_spec.rb @@ -1,6 +1,7 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" require "logstash/devutils/rspec/shared_examples" +require "logstash/inputs/elasticsearch" require "logstash/inputs/elasticsearch/cursor_tracker" describe LogStash::Inputs::Elasticsearch::CursorTracker do From f45f271641b5ea032a6049793269dff20ef9e545 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 19 Mar 2025 10:45:27 +0000 Subject: [PATCH 19/27] allow test to work with es-ruby 7 and 8 --- spec/inputs/integration/elasticsearch_spec.rb | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/spec/inputs/integration/elasticsearch_spec.rb b/spec/inputs/integration/elasticsearch_spec.rb index 2e153495..a7c45bd5 100644 --- a/spec/inputs/integration/elasticsearch_spec.rb +++ b/spec/inputs/integration/elasticsearch_spec.rb @@ -76,6 +76,14 @@ shared_examples 'secured_elasticsearch' do it_behaves_like 'an elasticsearch index plugin' + let(:unauth_exception_class) do + begin + Elasticsearch::Transport::Transport::Errors::Unauthorized + rescue + Elastic::Transport::Transport::Errors::Unauthorized + end + end + context "incorrect auth credentials" do let(:config) do @@ -85,7 +93,7 @@ let(:queue) { [] } it "fails to run the plugin" do - expect { plugin.register }.to raise_error Elasticsearch::Transport::Transport::Errors::Unauthorized + expect { plugin.register }.to raise_error unauth_exception_class end end end From 5bb7a4f9fe89d770218d564272092f74da143459 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 19 Mar 2025 13:53:02 +0000 Subject: [PATCH 20/27] Fix ordered list formatting in documentation --- docs/index.asciidoc | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/docs/index.asciidoc 
b/docs/index.asciidoc
index c09c2aa7..d83a8f82 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -74,7 +74,7 @@ https://www.elastic.co/guide/en/elasticsearch/reference/current/date_nanos.html[
 
 A good use case for this feature is to track new data in an index, which can be achieved by:
 
-1. create an ingest pipeline that adds Elasticsearch's `_ingest.timestamp` field to the documents as `event.ingested`:
+. create an ingest pipeline that adds Elasticsearch's `_ingest.timestamp` field to the documents as `event.ingested`:
 
 [source, json]
     PUT _ingest/pipeline/my-pipeline
     {
@@ -89,8 +89,8 @@ A good use case for this feature is to track new data in an index, which can be
     ]
     }
 
-
-2. create an index mapping where the tracking field is of date nanosecond type and invokes the defined pipeline:
+[start=2]
+. create an index mapping where the tracking field is of date nanosecond type and invokes the defined pipeline:
 
 [source, json]
     PUT /_template/my_template
     {
@@ -113,7 +113,8 @@ A good use case for this feature is to track new data in an index, which can be
     }
     }
 
-3. define a query that looks at all data of the indices, sorted by the tracking field, and with a range filter since the last value seen until present:
+[start=3]
+. define a query that looks at all data of the indices, sorted by the tracking field, and with a range filter since the last value seen until present:
 
 [source,json]
 {
@@ -136,7 +137,8 @@ A good use case for this feature is to track new data in an index, which can be
   ]
 }
 
-4. configure the Elasticsearch input to query the indices with the query defined above, every minute, and track the `event.ingested` field:
+[start=4]
+. configure the Elasticsearch input to query the indices with the query defined above, every minute, and track the `event.ingested` field:
 
 [source, ruby]
     input {
@@ -157,9 +159,9 @@ With this setup, as new documents are indexed in a `test-*` index, the next schedu
 
-1. select all new documents since the last observed value of the tracking field;
-2. use PIT+search_after to paginate through all the data;
-3. update the value of the field at the end of the pagination.
+. select all new documents since the last observed value of the tracking field;
+. use PIT+search_after to paginate through all the data;
+. update the value of the field at the end of the pagination.
 
 [id="plugins-{type}s-{plugin}-scheduling"]
 ==== Scheduling

From 6217204c737c715fc0cb517e99e86076f73b6881 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20Duarte?=
Date: Wed, 19 Mar 2025 16:16:26 +0000
Subject: [PATCH 21/27] store last_run_value in a location namespaced by pipeline id

---
 docs/index.asciidoc                        |  6 +++---
 lib/logstash/inputs/elasticsearch.rb       | 14 +++++++++++---
 .../inputs/elasticsearch/cursor_tracker.rb |  2 --
 3 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index d83a8f82..de539784 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -63,12 +63,12 @@ a file (location defaults to <>
 The user can then inject this value in the query using the placeholder `:last_value`. The value will be injected into the query
 before execution, and then updated after the query completes, assuming new data was found.
-The plugin also offers another placeholder called `:present` used to inject the nano-second based
-
 This feature works best when:
 * the query sorts by the tracking field
 * the field type has enough resolution so that two events are unlikely to have the same value for the field
 
+The plugin also offers another placeholder called `:present` used to inject the nanosecond-based value of "now-30s".
+
 A suggestion is to use a tracking field that has nanosecond precision, like
 https://www.elastic.co/guide/en/elasticsearch/reference/current/date_nanos.html[date nanoseconds] field type.
@@ -456,7 +456,7 @@ referencing multiple indices.
 * There is no default value for this setting.
 
 The path to store the last observed value of the tracking field, when used.
-By default this file is stored as `<path.data>/plugins/inputs/elasticsearch/last_run_value`.
+By default this file is stored as `<path.data>/plugins/inputs/elasticsearch/<pipeline_id>/last_run_value`.
 
 This setting should point to a file, not a directory, and Logstash must have read+write access to this file.

diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb
index 91aa34a3..3630712c 100644
--- a/lib/logstash/inputs/elasticsearch.rb
+++ b/lib/logstash/inputs/elasticsearch.rb
@@ -136,7 +136,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
 
   # The location of where the tracking field value will be stored
   # The value is persisted after each scheduled run (and not per result)
-  # If it's not set it defaults to '${path.data}/plugins/inputs/elasticsearch/last_run_value'
+  # If it's not set it defaults to '${path.data}/plugins/inputs/elasticsearch/<pipeline_id>/last_run_value'
   config :last_run_metadata_path, :validate => :string
 
   # If set, include Elasticsearch document information such as index, type, and
@@ -687,13 +687,21 @@ def setup_cursor_tracker
     end
 
     @tracking_field_seed ||= Time.now.utc.iso8601
-    @cursor_tracker = CursorTracker.new(last_run_metadata_path: @last_run_metadata_path,
+    @cursor_tracker = CursorTracker.new(last_run_metadata_path: last_run_metadata_path,
                                         tracking_field: @tracking_field,
                                         tracking_field_seed: @tracking_field_seed)
     @query_executor.cursor_tracker = @cursor_tracker
   end
 
+  def last_run_metadata_path
+    return @last_run_metadata_path if @last_run_metadata_path
+
+    last_run_metadata_path = ::File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "elasticsearch", pipeline_id, "last_run_value")
+    FileUtils.mkdir_p ::File.dirname(last_run_metadata_path)
+    last_run_metadata_path
+  end
+
   def get_transport_client_class
     # LS-core includes `elasticsearch` gem. The gem is composed of two separate gems: `elasticsearch-api` and `elasticsearch-transport`
     # And now `elasticsearch-transport` is old, instead we have `elastic-transport`.
# LS-core updated `elasticsearch` > 8: https://github.com/elastic/logstash/pull/17161 diff --git a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb index e0e7b48e..24386ac9 100644 --- a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb +++ b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb @@ -8,8 +8,6 @@ class CursorTracker def initialize(last_run_metadata_path:, tracking_field:, tracking_field_seed:) @last_run_metadata_path = last_run_metadata_path - @last_run_metadata_path ||= ::File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "elasticsearch", "last_run_value") - FileUtils.mkdir_p ::File.dirname(@last_run_metadata_path) @last_value_hashmap = Java::java.util.concurrent.ConcurrentHashMap.new @last_value = IO.read(@last_run_metadata_path) rescue nil || tracking_field_seed @tracking_field = tracking_field From b3d81c992cfd76a0b2112c39eb56e8451b1acc5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 19 Mar 2025 16:48:55 +0000 Subject: [PATCH 22/27] minor test tweaks --- spec/inputs/cursor_tracker_spec.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/spec/inputs/cursor_tracker_spec.rb b/spec/inputs/cursor_tracker_spec.rb index 3c1d0b9e..ea86fd86 100644 --- a/spec/inputs/cursor_tracker_spec.rb +++ b/spec/inputs/cursor_tracker_spec.rb @@ -24,6 +24,7 @@ describe "checkpoint_cursor" do before(:each) do + subject.checkpoint_cursor(intermediate: false) # store seed value [ Thread.new(subject) {|subject| subject.record_last_value(LogStash::Event.new("my_field" => "2025-01-03T23:59:59.999999999Z")) }, Thread.new(subject) {|subject| subject.record_last_value(LogStash::Event.new("my_field" => "2025-01-01T23:59:59.999999999Z")) }, @@ -31,14 +32,14 @@ ].each(&:join) end context "when doing intermediate checkpoint" do - before(:each) { subject.checkpoint_cursor(intermediate: true) } it "persists the smallest value" do + subject.checkpoint_cursor(intermediate: true) expect(IO.read(last_run_metadata_path)).to eq("2025-01-01T23:59:59.999999999Z") end end context "when doing non-intermediate checkpoint" do - before(:each) { subject.checkpoint_cursor(intermediate: false) } it "persists the largest value" do + subject.checkpoint_cursor(intermediate: false) expect(IO.read(last_run_metadata_path)).to eq("2025-01-03T23:59:59.999999999Z") end end From 713d528fced82ac38f8924a3191c6665228e0aab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 19 Mar 2025 17:25:31 +0000 Subject: [PATCH 23/27] [skip ci] Update docs/index.asciidoc --- docs/index.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index de539784..86566537 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -776,7 +776,7 @@ The value of this field is injected into each query if the query uses the placeh For the first query after a pipeline is started, the value used is either read from <> file, or taken from <> setting. -Note: The tracking value is updated only after the PIT+search_after run completes, it won't update during the search_after pagination. This is to allow use of slices. +Note: The tracking value is updated after each page is read and at the end of each Point in Time. In case of a crash the last saved value will be used so some duplication of data can occur. For this reason the use of unique document IDs for each event is recommended in the downstream destination. 
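+
+One way to obtain such unique IDs is to reuse the source document IDs via the `docinfo`
+options and set them as the destination document ID (an illustrative sketch; the
+`docinfo_target` path and the output snippet are assumptions for the example):
+
+[source, ruby]
+    input {
+      elasticsearch {
+        docinfo => true
+        docinfo_target => "[@metadata][doc]"
+        # ... query, tracking_field and schedule settings as above ...
+      }
+    }
+    output {
+      elasticsearch {
+        document_id => "%{[@metadata][doc][_id]}"
+      }
+    }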
[id="plugins-{type}s-{plugin}-tracking_field_seed"] ===== `tracking_field_seed` From 006213855fe120e4d723ba9aa336510ea32dc0ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Thu, 20 Mar 2025 10:44:49 +0000 Subject: [PATCH 24/27] default seed value to nanosecond epoch --- docs/index.asciidoc | 5 +++-- lib/logstash/inputs/elasticsearch.rb | 5 ++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 86566537..c4f9c3be 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -782,10 +782,11 @@ Note: The tracking value is updated after each page is read and at the end of ea ===== `tracking_field_seed` * Value type is <> -* There is no default value for this setting. +* Default value is `"1970-01-01T00:00:00.000000000Z"` The starting value for the <> if there is no <> already. -For a nanosecond timestamp based field, a suggested seed value something like `1980-01-01T23:59:59.999999999Z`. +This field defaults to the nanosecond precision ISO8601 representation of `epoch`, or "1970-01-01T00:00:00.000000000Z", given nano-second precision timestamps are the +most reliable data format to use for this feature. [id="plugins-{type}s-{plugin}-user"] ===== `user` diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index 3630712c..564acc6a 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -126,13 +126,13 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base config :slices, :validate => :number # Enable tracking the value of a given field to be used as a cursor - # TODO: main concerns + # Main concerns: # * using anything other than _event.timestamp easily leads to data loss # * the first "synchronization run can take a long time" config :tracking_field, :validate => :string # Define the initial seed value of the tracking_field - config :tracking_field_seed, :validate => :string + config :tracking_field_seed, :validate => :string, :default => "1970-01-01T00:00:00.000000000Z" # The location of where the tracking field value will be stored # The value is persisted after each scheduled run (and not per result) @@ -686,7 +686,6 @@ def setup_cursor_tracker raise ConfigurationError.new("The `tracking_field` feature can only be used with `search_after` non-aggregation queries") end - @tracking_field_seed ||= Time.now.utc.iso8601 @cursor_tracker = CursorTracker.new(last_run_metadata_path: last_run_metadata_path, tracking_field: @tracking_field, tracking_field_seed: @tracking_field_seed) From 7eb964ed0b2efbf9dd102db611ffe25547b7c855 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Thu, 20 Mar 2025 10:48:43 +0000 Subject: [PATCH 25/27] improve name of now_in_nanos to now_minus_30s --- lib/logstash/inputs/elasticsearch/cursor_tracker.rb | 4 ++-- spec/inputs/cursor_tracker_spec.rb | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb index 24386ac9..d43b1fd8 100644 --- a/lib/logstash/inputs/elasticsearch/cursor_tracker.rb +++ b/lib/logstash/inputs/elasticsearch/cursor_tracker.rb @@ -46,12 +46,12 @@ def record_last_value(event) def inject_cursor(query_json) # ":present" means "now - 30s" to avoid grabbing partially visible data in the PIT - result = query_json.gsub(":last_value", @last_value.to_s).gsub(":present", now_in_nanos) + result = query_json.gsub(":last_value", @last_value.to_s).gsub(":present", 
       logger.debug("inject_cursor: injected values for ':last_value' and ':present'", :query => result)
       result
     end

-    def now_in_nanos
+    def now_minus_30s
       Java::java.time.Instant.now.minusSeconds(30).to_s
     end
   end

diff --git a/spec/inputs/cursor_tracker_spec.rb b/spec/inputs/cursor_tracker_spec.rb
index ea86fd86..291d6c61 100644
--- a/spec/inputs/cursor_tracker_spec.rb
+++ b/spec/inputs/cursor_tracker_spec.rb
@@ -58,7 +58,7 @@
     before(:each) do
       subject.record_last_value(LogStash::Event.new("my_field" => new_value))
       subject.checkpoint_cursor(intermediate: false)
-      allow(subject).to receive(:now_in_nanos).and_return(fake_now)
+      allow(subject).to receive(:now_minus_30s).and_return(fake_now)
     end

     it "injects the value of the cursor into json query if it contains :last_value" do

From 7141ad992790e098e7898fab638e14b98d901609 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20Duarte?=
Date: Thu, 20 Mar 2025 10:49:03 +0000
Subject: [PATCH 26/27] Apply suggestions from code review

Co-authored-by: Rob Bavey
---
 docs/index.asciidoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index c4f9c3be..8a611c3d 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -547,7 +547,7 @@ exactly once.
 * Default value is `true`

 Whether to allow queuing of a scheduled run if a run is occurring.
-While this is ideal for ensuring a new run happens immediatelly after the previous on finishes if there
+While this is ideal for ensuring a new run happens immediately after the previous one finishes if there
 is a lot of work to do, the queue is unbounded and may lead to an out-of-memory condition over long
 periods of time if it grows continuously.

From 5ab5546caeb10c3e731fb4b2a392b526670dc662 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20Duarte?=
Date: Thu, 20 Mar 2025 15:37:25 +0000
Subject: [PATCH 27/27] de-emphasize the cursor docs section

---
 docs/index.asciidoc | 111 ++++++++++++++++++++++----------------------
 1 file changed, 56 insertions(+), 55 deletions(-)

diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index 8a611c3d..905d632d 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -48,10 +48,66 @@ This would create an Elasticsearch query with the following format:
       "sort": [ "_doc" ]
     }'

+[id="plugins-{type}s-{plugin}-scheduling"]
+==== Scheduling
+
+Input from this plugin can be scheduled to run periodically according to a specific
+schedule. This scheduling syntax is powered by https://github.com/jmettraux/rufus-scheduler[rufus-scheduler].
+The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support).
+
+Examples:
+
+|==========================================================
+| `* 5 * 1-3 *` | will execute every minute of 5am every day of January through March.
+| `0 * * * *` | will execute on the 0th minute of every hour every day.
+| `0 6 * * * America/Chicago` | will execute at 6:00am (UTC/GMT -5) every day.
+|==========================================================
+
+
+Further documentation describing this syntax can be found
+https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here].
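+
+For example, to run the query on the 0th minute of every hour (a minimal sketch: the `hosts` and `query` values are placeholders):
+
+[source,ruby]
+----
+input {
+  elasticsearch {
+    hosts    => ["localhost:9200"]                  # placeholder cluster address
+    query    => '{ "query": { "match_all": {} } }'  # placeholder query
+    schedule => "0 * * * *"                         # cron-like, see the table above
+  }
+}
+----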
+
+
+[id="plugins-{type}s-{plugin}-auth"]
+==== Authentication
+
+Authentication to a secure Elasticsearch cluster is possible using _one_ of the following options:
+
+* <<plugins-{type}s-{plugin}-user>> AND <<plugins-{type}s-{plugin}-password>>
+* <<plugins-{type}s-{plugin}-cloud_auth>>
+* <<plugins-{type}s-{plugin}-api_key>>
+
+[id="plugins-{type}s-{plugin}-autz"]
+==== Authorization
+
+Authorization to a secure Elasticsearch cluster requires `read` permission at index level and `monitoring` permissions at cluster level.
+The `monitoring` permission at cluster level is necessary to perform periodic connectivity checks.
+
+[id="plugins-{type}s-{plugin}-ecs"]
+==== Compatibility with the Elastic Common Schema (ECS)
+
+When ECS compatibility is disabled, `docinfo_target` uses the `"@metadata"` field as a default; with ECS enabled, the plugin
+uses the naming convention `"[@metadata][input][elasticsearch]"` as a default target for placing document information.
+
+The plugin logs a warning when ECS is enabled and `target` isn't set.
+
+TIP: Set the `target` option to avoid potential schema conflicts.
+
+[id="plugins-{type}s-{plugin}-failure-handling"]
+==== Failure handling
+
+When this input plugin cannot create a structured `Event` from a hit result, it will instead create an `Event` that is tagged with `_elasticsearch_input_failure` whose `[event][original]` is a JSON-encoded string representation of the entire hit.
+
+Common causes are:
+
+ - When the hit result contains top-level fields that are {logstash-ref}/processing.html#reserved-fields[reserved in Logstash] but do not have the expected shape. Use the <<plugins-{type}s-{plugin}-target>> directive to avoid conflicts with the top-level namespace.
+ - When <<plugins-{type}s-{plugin}-docinfo>> is enabled and the docinfo fields cannot be merged into the hit result. Combine <<plugins-{type}s-{plugin}-target>> and <<plugins-{type}s-{plugin}-docinfo_target>> to avoid conflict.
+
 [id="plugins-{type}s-{plugin}-cursor"]
 ==== Tracking a field's value across runs

+NOTE: experimental:[] `tracking_field` and related settings are experimental and subject to change in the future.
+
 It is sometimes desirable to track the value of a particular field between two jobs:
 * avoid re-processing the entire result set of a long query after an unplanned restart
 * only grab new data from an index instead of processing the entire set on each job
@@ -163,61 +219,6 @@ With this setup, as new documents are indexed an `test-*` index, the next schedu
 . use PIT+search_after to paginate through all the data;
 . update the value of the field at the end of the pagination.

-[id="plugins-{type}s-{plugin}-scheduling"]
-==== Scheduling
-
-Input from this plugin can be scheduled to run periodically according to a specific
-schedule. This scheduling syntax is powered by https://github.com/jmettraux/rufus-scheduler[rufus-scheduler].
-The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support ).
-
-Examples:
-
-|==========================================================
-| `* 5 * 1-3 *` | will execute every minute of 5am every day of January through March.
-| `0 * * * *` | will execute on the 0th minute of every hour every day.
-| `0 6 * * * America/Chicago` | will execute at 6:00am (UTC/GMT -5) every day.
-|==========================================================
-
-
-Further documentation describing this syntax can be found
-https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here].
-
-
-[id="plugins-{type}s-{plugin}-auth"]
-==== Authentication
-
-Authentication to a secure Elasticsearch cluster is possible using _one_ of the following options:
-
-* <<plugins-{type}s-{plugin}-user>> AND <<plugins-{type}s-{plugin}-password>>
-* <<plugins-{type}s-{plugin}-cloud_auth>>
-* <<plugins-{type}s-{plugin}-api_key>>
-
-[id="plugins-{type}s-{plugin}-autz"]
-==== Authorization
-
-Authorization to a secure Elasticsearch cluster requires `read` permission at index level and `monitoring` permissions at cluster level.
-The `monitoring` permission at cluster level is necessary to perform periodic connectivity checks.
-
-[id="plugins-{type}s-{plugin}-ecs"]
-==== Compatibility with the Elastic Common Schema (ECS)
-
-When ECS compatibility is disabled, `docinfo_target` uses the `"@metadata"` field as a default, with ECS enabled the plugin
-uses a naming convention `"[@metadata][input][elasticsearch]"` as a default target for placing document information.
-
-The plugin logs a warning when ECS is enabled and `target` isn't set.
-
-TIP: Set the `target` option to avoid potential schema conflicts.
-
-[id="plugins-{type}s-{plugin}-failure-handling"]
-==== Failure handling
-
-When this input plugin cannot create a structured `Event` from a hit result, it will instead create an `Event` that is tagged with `_elasticsearch_input_failure` whose `[event][original]` is a JSON-encoded string representation of the entire hit.
-
-Common causes are:
-
- - When the hit result contains top-level fields that are {logstash-ref}/processing.html#reserved-fields[reserved in Logstash] but do not have the expected shape. Use the <<plugins-{type}s-{plugin}-target>> directive to avoid conflicts with the top-level namespace.
- - When <<plugins-{type}s-{plugin}-docinfo>> is enabled and the docinfo fields cannot be merged into the hit result. Combine <<plugins-{type}s-{plugin}-target>> and <<plugins-{type}s-{plugin}-docinfo_target>> to avoid conflict.
-
 [id="plugins-{type}s-{plugin}-options"]
 ==== Elasticsearch Input configuration options