From ca8c2ce9c0a4349c6848a04f66d84d807459b882 Mon Sep 17 00:00:00 2001 From: Jan Notka Date: Wed, 27 Jan 2021 15:39:17 +0100 Subject: [PATCH 1/2] Put pipeline_id into ThreadContext after entering flush_thread --- lib/logstash/outputs/google_bigquery.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/logstash/outputs/google_bigquery.rb b/lib/logstash/outputs/google_bigquery.rb index 1adc2c4..f9afea2 100644 --- a/lib/logstash/outputs/google_bigquery.rb +++ b/lib/logstash/outputs/google_bigquery.rb @@ -9,6 +9,8 @@ require 'fileutils' require 'concurrent' +java_import org.apache.logging.log4j.ThreadContext + # # === Summary # @@ -275,7 +277,9 @@ def write_to_errors_file(messages, table) end def init_batcher_flush_thread + pipeline_id = ThreadContext.get('pipeline.id') @flush_thread = Thread.new do + ThreadContext.put('pipeline.id', pipeline_id) until stopping? Stud.stoppable_sleep(@flush_interval_secs) { stopping? } From 294778710040c6242ad1d5d95c2f98345022df4b Mon Sep 17 00:00:00 2001 From: Jan Notka Date: Wed, 27 Jan 2021 15:42:03 +0100 Subject: [PATCH 2/2] implement retry functionality --- lib/logstash/outputs/google_bigquery.rb | 40 +++++++++++++++++++++++-- spec/outputs/google_bigquery_spec.rb | 22 ++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/google_bigquery.rb b/lib/logstash/outputs/google_bigquery.rb index f9afea2..484f118 100644 --- a/lib/logstash/outputs/google_bigquery.rb +++ b/lib/logstash/outputs/google_bigquery.rb @@ -167,6 +167,12 @@ class LogStash::Outputs::GoogleBigQuery < LogStash::Outputs::Base # which causes the entire request to fail if any invalid rows exist. config :skip_invalid_rows, validate: :boolean, default: false + # Retry appending data if the append request failed. If a partial failue was detected, retry only the filed rows, otherwise retry with the full batch. + config :max_tries, validate: :number, default: 1 + + # Wait this amount of seconds before attempting a retry. + config :retry_delay, validate: :number, default: nil + # The following configuration options still exist to alert users that are using them config :uploader_interval_secs, validate: :number, deprecated: 'No longer used.' config :deleter_interval_secs, validate: :number, deprecated: 'No longer used.' @@ -238,9 +244,39 @@ def publish(messages) table = get_table_name @logger.info("Publishing #{messages.length} messages to #{table}") - create_table_if_not_exists table + begin + try_count ||= 0 + try_count += 1 + + if @retry_delay and try_count > 1 + sleep(@retry_delay) + end + + create_table_if_not_exists table + + failed_rows = @bq_client.append(@dataset, table, messages, @ignore_unknown_values, @skip_invalid_rows) + raise "failed rows" unless failed_rows.empty? + rescue => e + if try_count < @max_tries + if e.to_s == "failed rows" + @logger.warn "#{failed_rows.count} failed rows detected, will retry, remaining tries: #{@max_tries - try_count}." + messages = failed_rows + retry + end + @logger.warn "Cought exception, remaining tries: #{@max_tries - try_count}.", :exception => e + retry + else + if e.to_s != 'failed rows' + @logger.warn 'Giving up' + raise + end + end + ensure + if try_count > 1 + @logger.warn "Publish succeeded at try #{try_count}/#{@max_tries}" + end + end - failed_rows = @bq_client.append(@dataset, table, messages, @ignore_unknown_values, @skip_invalid_rows) write_to_errors_file(failed_rows, table) unless failed_rows.empty? rescue StandardError => e @logger.error 'Error uploading data.', :exception => e diff --git a/spec/outputs/google_bigquery_spec.rb b/spec/outputs/google_bigquery_spec.rb index b5d957c..fe492c9 100644 --- a/spec/outputs/google_bigquery_spec.rb +++ b/spec/outputs/google_bigquery_spec.rb @@ -72,6 +72,28 @@ expect(subject).not_to receive(:create_table_if_not_exists) end + context 'if retry is configured' do + let(:config) { { 'project_id' => 'project', 'dataset' => 'dataset', 'csv_schema' => 'path:STRING,status:INTEGER,score:FLOAT', 'max_tries' => 2, 'retry_delay' => 1 } } + + it 'tries again if insert threw an exception' do + allow(subject).to receive(:create_table_if_not_exists).and_return(nil) + allow(bq_client).to receive(:append).and_raise('expected insert error') + expect(bq_client).to receive(:append).twice + expect(subject).to receive(:sleep).once + + subject.publish ['{"foo":"bar"}'] + end + + it 'tries again on failed insert' do + allow(subject).to receive(:create_table_if_not_exists).and_return(nil) + allow(bq_client).to receive(:append).and_return([0]) + expect(bq_client).to receive(:append).twice + expect(subject).to receive(:sleep).once + + subject.publish ['{"foo":"bar"}'] + end + end + it 'creates a table if it does not exist' do allow(subject).to receive(:create_table_if_not_exists).and_return(nil) allow(bq_client).to receive(:append).and_return([])