Journal refactoring stage 4
pyromaniac committed Jul 20, 2017
1 parent b7fe263 commit ea53687
Showing 24 changed files with 467 additions and 281 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -19,3 +19,4 @@ tmp
.rvmrc
_site
.sass-cache
file::memory:?cache=shared
2 changes: 1 addition & 1 deletion .rubocop_todo.yml
Expand Up @@ -33,7 +33,7 @@ Metrics/MethodLength:
# Offense count: 1
# Configuration parameters: CountComments.
Metrics/ModuleLength:
Max: 177
Max: 180

# Offense count: 14
Metrics/PerceivedComplexity:
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -4,6 +4,8 @@

* Changed behavior of `Chewy::Index.index_name`, it doesn't cache the values anymore.

* Journal interfaces, related code and rake tasks were completely refactored and are not compatible with the previous version.

## Changes

* Parallel import and rake tasks.
Expand Down
17 changes: 12 additions & 5 deletions README.md
Expand Up @@ -50,6 +50,7 @@ Chewy is an ODM and wrapper for [the official Elasticsearch client](https://gith
* [chewy:sync](#chewysync)
* [chewy:deploy](#chewydeploy)
* [Parallelizing rake tasks](#parallelizing-rake-tasks)
* [chewy:journal](#chewyjournal)
* [Rspec integration](#rspec-integration)
* [Minitest integration](#minitest-integration)
* [TODO a.k.a coming soon](#todo-aka-coming-soon)
Expand Down Expand Up @@ -573,12 +574,9 @@ class CityIndex
end
```

You may be wondering why do you need it? The answer is simple: Not to lose the data.
Imagine that:
You reset your index in Zero Downtime manner (to separate index), and meantime somebody keeps updating the data frequently (to old index). So all these actions will be written to the journal index and you'll be able to apply them after index reset with `Chewy::Journal::Apply.since(1.hour.ago.to_i)`.
You may be wondering why you need it. The answer is simple: not to lose the data.

For index reset journaling is turned off even if you set `journal: true` in `config/chewy.yml` or in `default_import_options`.
You can change it only if you pass `journal: true` parameter explicitly to `#import`.
Imagine that you reset your index in a zero-downtime manner (to a separate index), and in the meantime somebody keeps updating the data frequently (in the old index). All these actions will be written to the journal index, and you'll be able to apply them after the index reset using the `Chewy::Journal` interface.

### Types access

Expand Down Expand Up @@ -1004,6 +1002,15 @@ rake chewy:parallel:sync[4,-users]
rake chewy:parallel:deploy[4] # performs parallel upgrade and parallel sync afterwards
```
#### `chewy:journal`
This namespace contains two tasks for journal manipulation: `chewy:journal:apply` and `chewy:journal:clean`. Both take a time as the first argument (optional for clean) and a list of indexes/types, exactly like the tasks above. The time can be in any format parsable by ActiveSupport.
```bash
rake chewy:journal:apply["$(date -v-1H -u +%FT%TZ)"] # apply journaled changes for the past hour
rake chewy:journal:apply["$(date -v-1H -u +%FT%TZ)",users] # apply journaled changes for the past hour on UsersIndex only
```
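Note that `date -v-1H` is the BSD/macOS form; GNU `date` does not accept `-v`. As a portable alternative, the same kind of timestamp can be produced from Ruby. This is a minimal sketch: `journal_since` is a hypothetical helper name used for illustration, not part of Chewy.

```ruby
# Hypothetical helper (not part of Chewy): returns a UTC ISO 8601 timestamp
# for `hours` hours before `now`, in the same shape as `date -u +%FT%TZ`.
def journal_since(hours, now: Time.now)
  # getutc returns a new Time in UTC and leaves the caller's object untouched.
  (now.getutc - hours * 3600).strftime('%FT%TZ')
end

puts journal_since(1)
```

The resulting string can be passed as the first argument of `chewy:journal:apply` or `chewy:journal:clean`.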
### Rspec integration
Just add `require 'chewy/rspec'` to your spec_helper.rb and you will get additional features: See [update_index.rb](lib/chewy/rspec/update_index.rb) for more details.
Expand Down
44 changes: 29 additions & 15 deletions lib/chewy/index/actions.rb
Expand Up @@ -150,30 +150,37 @@ def #{method} options = {}
METHOD
end

# Deletes, creates and imports data to the index.
# Returns import result
# Deletes, creates and imports data to the index. Returns the
# import result. If index name suffix is passed as the first
# argument - performs zero-downtime index resetting.
#
# UsersIndex.reset!
#
# If index name suffix passed as the first argument - performs
# zero-downtime index resetting (described here:
# http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/).
# It also applies journal if anything was journaled during the
# reset.
#
# @example
# UsersIndex.reset!
# UsersIndex.reset! Time.now.to_i
#
def reset!(suffix = nil, journal: false, **options)
result = if suffix.present? && (indexes = self.indexes).present?
time = Time.now

# @see http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime
# @param suffix [String] a suffix for the newly created index
# @param apply_journal [true, false] if true, journal is applied after the import is completed
# @param journal [true, false] journaling is switched off for import during reset by default
# @param import_options [Hash] options, passed to the import call
# @return [true, false] false in case of errors
def reset!(suffix = nil, apply_journal: true, journal: false, **import_options)
result = if suffix.present?
start_time = Time.now
indexes = self.indexes
create! suffix, alias: false

general_name = index_name
suffixed_name = index_name(suffix: suffix)

optimize_index_settings suffixed_name
result = import options.merge(suffix: suffix, journal: journal, refresh: !Chewy.reset_disable_refresh_interval)
result = import import_options.merge(suffix: suffix, journal: journal, refresh: !Chewy.reset_disable_refresh_interval)
original_index_settings suffixed_name

delete if indexes.blank?
client.indices.update_aliases body: {actions: [
*indexes.map do |index|
{remove: {index: index, alias: general_name}}
Expand All @@ -182,17 +189,24 @@ def reset!(suffix = nil, journal: false, **options)
]}
client.indices.delete index: indexes if indexes.present?

Chewy::Journal.new(self).apply(time)
self.journal.apply(start_time, **import_options) if apply_journal
result
else
purge! suffix
import options.merge(journal: journal)
purge!
import import_options.merge(journal: journal)
end

specification.lock!
result
end

# A {Chewy::Journal} instance for the particular index
#
# @return [Chewy::Journal] journal instance
def journal
@journal ||= Chewy::Journal.new(self)
end

private

def optimize_index_settings(index_name)
Expand Down
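The alias switch in `reset!` can be pictured as building one `update_aliases` body: each old index loses the general alias and the freshly imported suffixed index gains it. The following is a sketch under assumptions: the `add` action is collapsed in the diff above, so it is assumed here to be the standard Elasticsearch alias action, and `alias_swap_body` is a hypothetical helper name that does not exist in Chewy.

```ruby
# Hypothetical helper (not part of Chewy): builds the request body for an
# atomic alias swap.
# ASSUMPTION: the `add` action is not visible in the diff; the standard
# Elasticsearch alias action shape is used here.
def alias_swap_body(old_indexes, suffixed_name, general_name)
  {actions: [
    # Detach the alias from every index that carried it before the reset.
    *old_indexes.map { |index| {remove: {index: index, alias: general_name}} },
    # Attach the alias to the newly built index.
    {add: {index: suffixed_name, alias: general_name}}
  ]}
end

body = alias_swap_body(['users_1500000000'], 'users_1500550000', 'users')
puts body.inspect
```

Because all actions travel in a single request, readers of the alias should never observe a moment when it points at no index.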
58 changes: 23 additions & 35 deletions lib/chewy/journal.rb
Expand Up @@ -7,30 +7,33 @@ module Chewy
# journal.clean
#
class Journal
# @param only [Array<String, Chewy::Index, Chewy::Type>] indexes/types or even string references to perform actions on.
# @param only [Array<String, Chewy::Index, Chewy::Type>] indexes/types or even string references to perform actions on
def initialize(*only)
@only = only
end

# Applies all changes that were done since some moment to the specified
# indexes/types.
# Applies all changes that were done since the specified time to the
# specified indexes/types.
#
# @param since_time [Time, DateTime] timestamp from which changes will be applied
# @param retries [Integer] maximum number of attempts to make journal empty. By default is set to 10
def apply(since_time, retries: 10)
previous_entries = []
stage = 0
while stage < retries
# @param retries [Integer] maximum number of attempts to make journal empty, 10 by default
# @return [Integer] the amount of journal entries found
def apply(since_time, retries: 10, **import_options)
stage = 1
since_time -= 1
count = 0
while stage <= retries
entries = Chewy::Stash::Journal.entries(since_time, only: @only).to_a.presence or break
count += entries.size
groups = reference_groups(entries)
ActiveSupport::Notifications.instrument 'apply_journal.chewy', stage: stage, groups: groups
groups.each do |type, references|
type.import(references, import_options.merge(journal: false))
end
stage += 1
previous_entries.select { |entry| entry.created_at.to_i >= since_time }
entries = group(Chewy::Stash::Journal.entries(since_time, only: @only))
entries = subtract(entries, previous_entries)
break if entries.empty?
ActiveSupport::Notifications.instrument 'apply_journal.chewy', stage: stage
entries.each { |entry| entry.type.import(entry.references, journal: false) }
since_time = recent_timestamp(entries)
previous_entries = entries
since_time = entries.map(&:created_at).max
end
count
end

# Cleans journal for the specified indexes/types.
Expand All @@ -43,25 +46,10 @@ def clean(until_time = nil)

private

def group(entries)
entries.group_by(&:derivable_type_name).map do |_, grouped_entries|
grouped_entries.reduce(:merge)
end
end

def subtract(from, what)
return from if what.empty?
from.map do |from_entry|
ids = from_entry.references
what.each do |what_entry|
ids -= what_entry.references if from_entry.derivable_type_name == what_entry.derivable_type_name
end
from_entry.class.new(from_entry.attributes.merge('references' => ids.map(&:to_json))) if ids.present?
end.compact
end

def recent_timestamp(entries)
entries.map { |entry| entry.created_at.to_i }.max
def reference_groups(entries)
entries.group_by(&:type).map do |type, grouped_entries|
[type, grouped_entries.map(&:references).inject(:|)]
end.to_h
end
end
end
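The new `apply` loop and `reference_groups` can be tried out without Elasticsearch. The sketch below replaces `Chewy::Stash::Journal.entries` with an in-memory array and assumes that the real query selects entries created strictly after `since_time`, which would explain the `since_time -= 1` adjustment. `Entry`, `apply_journal` and the `store` argument are hypothetical stand-ins, and times are plain integers.

```ruby
# Hypothetical stand-in for a journal entry.
Entry = Struct.new(:type, :references, :created_at)

# Same logic as Chewy::Journal#reference_groups: group entries by type and
# union their references so each document is imported once per stage.
def reference_groups(entries)
  entries.group_by(&:type).map do |type, grouped_entries|
    [type, grouped_entries.map(&:references).inject(:|)]
  end.to_h
end

# Simplified version of the apply loop over an in-memory store.
# ASSUMPTION: entries are matched with a strict "created after" comparison.
def apply_journal(store, since_time, retries: 10)
  stage = 1
  since_time -= 1 # make the first lookup include entries created at since_time
  count = 0
  imported = Hash.new { |hash, key| hash[key] = [] }
  while stage <= retries
    entries = store.select { |entry| entry.created_at > since_time }
    break if entries.empty?
    count += entries.size
    reference_groups(entries).each { |type, refs| imported[type] |= refs }
    stage += 1
    # Next stage only looks at entries newer than the newest one just applied.
    since_time = entries.map(&:created_at).max
  end
  [count, imported]
end

store = [
  Entry.new(:city, [1, 2], 100),
  Entry.new(:city, [2, 3], 101),
  Entry.new(:country, [7], 100)
]
count, imported = apply_journal(store, 100)
puts "#{count} entries, imported: #{imported}"
```

In the real method each stage can pick up entries that were journaled while the previous stage was importing, which is why the loop repeats until the journal is empty or `retries` is exhausted.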
86 changes: 69 additions & 17 deletions lib/chewy/rake_helper.rb
Expand Up @@ -16,11 +16,13 @@ module RakeHelper
end

JOURNAL_CALLBACK = lambda do |output, _, _, _, _, payload| # rubocop:disable Metrics/ParameterLists
output.puts "Applying journal. Stage #{payload[:stage]}"
count = payload[:groups].values.map(&:size).sum
targets = payload[:groups].keys.sort_by(&:derivable_name)
output.puts " Applying journal to #{targets}, #{count} entries, stage #{payload[:stage]}"
end

class << self
# Performs zero downtime reindexing of all documents for the specified indexes
# Performs zero-downtime reindexing of all documents for the specified indexes
#
# @example
# Chewy::RakeHelper.reset # resets everything
Expand All @@ -32,6 +34,7 @@ class << self
# @param only [Array<Chewy::Index, String>, Chewy::Index, String] index or indexes to reset; if nothing is passed - uses all the indexes defined in the app
# @param except [Array<Chewy::Index, String>, Chewy::Index, String] index or indexes to exclude from processing
# @param parallel [true, Integer, Hash] any acceptable parallel options for import
# @param output [IO] output io for logging
# @return [Array<Chewy::Index>] indexes that were reset
def reset(only: nil, except: nil, parallel: nil, output: STDOUT)
subscribed_task_stats(output) do
Expand All @@ -41,7 +44,7 @@ def reset(only: nil, except: nil, parallel: nil, output: STDOUT)
end
end

# Performs zero downtime reindexing of all documents for the specified
# Performs zero-downtime reindexing of all documents for the specified
# indexes only if a particular index specification was changed.
#
# @example
Expand All @@ -54,6 +57,7 @@ def reset(only: nil, except: nil, parallel: nil, output: STDOUT)
# @param only [Array<Chewy::Index, String>, Chewy::Index, String] index or indexes to reset; if nothing is passed - uses all the indexes defined in the app
# @param except [Array<Chewy::Index, String>, Chewy::Index, String] index or indexes to exclude from processing
# @param parallel [true, Integer, Hash] any acceptable parallel options for import
# @param output [IO] output io for logging
# @return [Array<Chewy::Index>] indexes that were actually reset
def upgrade(only: nil, except: nil, parallel: nil, output: STDOUT)
subscribed_task_stats(output) do
Expand Down Expand Up @@ -91,6 +95,7 @@ def upgrade(only: nil, except: nil, parallel: nil, output: STDOUT)
# @param only [Array<Chewy::Index, Chewy::Type, String>, Chewy::Index, Chewy::Type, String] indexes or types to update; if nothing is passed - uses all the types defined in the app
# @param except [Array<Chewy::Index, Chewy::Type, String>, Chewy::Index, Chewy::Type, String] indexes or types to exclude from processing
# @param parallel [true, Integer, Hash] any acceptable parallel options for import
# @param output [IO] output io for logging
# @return [Array<Chewy::Type>] types that were actually updated
def update(only: nil, except: nil, parallel: nil, output: STDOUT)
subscribed_task_stats(output) do
Expand Down Expand Up @@ -118,6 +123,7 @@ def update(only: nil, except: nil, parallel: nil, output: STDOUT)
# @param only [Array<Chewy::Index, Chewy::Type, String>, Chewy::Index, Chewy::Type, String] indexes or types to synchronize; if nothing is passed - uses all the types defined in the app
# @param except [Array<Chewy::Index, Chewy::Type, String>, Chewy::Index, Chewy::Type, String] indexes or types to exclude from processing
# @param parallel [true, Integer, Hash] any acceptable parallel options for sync
# @param output [IO] output io for logging
# @return [Array<Chewy::Type>] types that were actually updated
def sync(only: nil, except: nil, parallel: nil, output: STDOUT)
subscribed_task_stats(output) do
Expand All @@ -140,6 +146,55 @@ def sync(only: nil, except: nil, parallel: nil, output: STDOUT)
end
end

# Applies changes that were done after the specified time for the
# specified indexes/types or all of them.
#
# @example
# Chewy::RakeHelper.journal_apply(time: 1.minute.ago) # applies entries created for the last minute
# Chewy::RakeHelper.journal_apply(time: 1.minute.ago, only: 'places') # applies only PlacesIndex::City and PlacesIndex::Country entries created for the last minute
# Chewy::RakeHelper.journal_apply(time: 1.minute.ago, only: 'places#city') # applies PlacesIndex::City entries created for the last minute only
# Chewy::RakeHelper.journal_apply(time: 1.minute.ago, except: PlacesIndex::Country) # applies everything, but PlacesIndex::Country entries created for the last minute
# Chewy::RakeHelper.journal_apply(time: 1.minute.ago, only: 'places', except: 'places#country') # applies PlacesIndex::City entries created for the last minute only
#
# @param time [Time, DateTime] use only journal entries created after this time
# @param only [Array<Chewy::Index, Chewy::Type, String>, Chewy::Index, Chewy::Type, String] indexes or types to apply the journal for; if nothing is passed - uses all the types defined in the app
# @param except [Array<Chewy::Index, Chewy::Type, String>, Chewy::Index, Chewy::Type, String] indexes or types to exclude from processing
# @param output [IO] output io for logging
# @return [Array<Chewy::Type>] types that were actually updated
def journal_apply(time: nil, only: nil, except: nil, output: STDOUT)
raise ArgumentError, 'Please specify the time to start with' unless time
subscribed_task_stats(output) do
output.puts "Applying journal entries created after #{time}"
count = Chewy::Journal.new(types_from(only: only, except: except)).apply(time)
output.puts 'No journal entries were created after the specified time' if count.zero?
end
end

# Removes journal records created before the specified timestamp for
# the specified indexes/types or all of them.
#
# @example
# Chewy::RakeHelper.journal_clean # cleans everything
# Chewy::RakeHelper.journal_clean(time: 1.minute.ago) # leaves only entries created for the last minute
# Chewy::RakeHelper.journal_clean(only: 'places') # cleans only PlacesIndex::City and PlacesIndex::Country entries
# Chewy::RakeHelper.journal_clean(only: 'places#city') # cleans PlacesIndex::City entries only
# Chewy::RakeHelper.journal_clean(except: PlacesIndex::Country) # cleans everything, but PlacesIndex::Country entries
# Chewy::RakeHelper.journal_clean(only: 'places', except: 'places#country') # cleans PlacesIndex::City entries only
#
# @param time [Time, DateTime] clean all the journal entries created before this time
# @param only [Array<Chewy::Index, Chewy::Type, String>, Chewy::Index, Chewy::Type, String] indexes or types to clean the journal for; if nothing is passed - uses all the types defined in the app
# @param except [Array<Chewy::Index, Chewy::Type, String>, Chewy::Index, Chewy::Type, String] indexes or types to exclude from processing
# @param output [IO] output io for logging
# @return [Array<Chewy::Type>] types that were actually updated
def journal_clean(time: nil, only: nil, except: nil, output: STDOUT)
subscribed_task_stats(output) do
output.puts "Cleaning journal entries created before #{time}" if time
response = Chewy::Journal.new(types_from(only: only, except: except)).clean(time)
count = response['deleted'] || response['_indices']['_all']['deleted']
output.puts "Cleaned up #{count} journal entries"
end
end

# Eager loads and returns all the indexes defined in the application
# except the Chewy::Stash.
#
Expand All @@ -149,24 +204,15 @@ def all_indexes
Chewy::Index.descendants - [Chewy::Stash]
end

def human_duration(seconds)
[[60, :s], [60, :m], [24, :h]].map do |amount, unit|
if seconds > 0
seconds, n = seconds.divmod(amount)
"#{n.to_i}#{unit}"
end
end.compact.reverse.join(' ')
def normalize_indexes(*identifiers)
identifiers.flatten(1).map { |identifier| normalize_index(identifier) }
end

def normalize_index(identifier)
return identifier if identifier.is_a?(Class) && identifier < Chewy::Index
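# Strip a trailing "index" before appending "Index", so 'users_index' and 'users' both resolve to UsersIndex.
"#{identifier.to_s.gsub(/index\z/i, '').camelize}Index".constantize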
end

def normalize_indexes(*identifiers)
identifiers.flatten(1).map { |identifier| normalize_index(identifier) }
end

def subscribed_task_stats(output = STDOUT)
start = Time.now
ActiveSupport::Notifications.subscribed(JOURNAL_CALLBACK.curry[output], 'apply_journal.chewy') do
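The `JOURNAL_CALLBACK.curry[output]` expression above binds the output IO to the first lambda parameter, leaving a callable that takes the five arguments a notification subscriber receives (name, start, finish, id, payload). The sketch below reproduces this in plain Ruby without ActiveSupport. `FakeType` is a hypothetical stand-in for a Chewy type, and the message joins target names instead of interpolating the objects, which differs slightly from the original.

```ruby
require 'stringio'

# Hypothetical stand-in for a Chewy type; only `derivable_name` is used.
FakeType = Struct.new(:derivable_name)

# Same parameter layout as JOURNAL_CALLBACK: output first, then the five
# subscriber arguments.
callback = lambda do |output, _name, _start, _finish, _id, payload|
  count = payload[:groups].values.map(&:size).sum
  targets = payload[:groups].keys.map(&:derivable_name).sort
  output.puts "Applying journal to #{targets.join(', ')}, #{count} entries, stage #{payload[:stage]}"
end

output = StringIO.new
# Binds `output`; the result still expects the remaining five arguments.
subscriber = callback.curry[output]

groups = {FakeType.new('places#city') => [1, 2], FakeType.new('places#country') => [7]}
subscriber.call('apply_journal.chewy', Time.now, Time.now, 'id', {stage: 1, groups: groups})
puts output.string
```

Currying keeps the callback a constant while letting each rake helper call decide where the log lines go.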
Expand Down Expand Up @@ -236,12 +282,18 @@ def normalize_types(*identifiers)
end

def normalize_type(identifier)
return identifier if identifier.is_a?(Class) && identifier < Chewy::Type
return identifier.types if identifier.is_a?(Class) && identifier < Chewy::Index

Chewy.derive_types(identifier)
end

def human_duration(seconds)
[[60, :s], [60, :m], [24, :h]].map do |amount, unit|
if seconds > 0
seconds, n = seconds.divmod(amount)
"#{n.to_i}#{unit}"
end
end.compact.reverse.join(' ')
end

def reset_one(index, output, parallel: false)
output.puts "Resetting #{index}"
index.reset!((Time.now.to_f * 1000).round, parallel: parallel)
Expand Down
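The relocated `human_duration` helper is self-contained, so its behaviour is easy to check in isolation. The method body below is copied from the diff; only the sample calls are added.

```ruby
# Formats a number of seconds as hours, minutes and seconds by repeatedly
# splitting off the smallest unit with divmod.
def human_duration(seconds)
  [[60, :s], [60, :m], [24, :h]].map do |amount, unit|
    if seconds > 0
      seconds, n = seconds.divmod(amount)
      "#{n.to_i}#{unit}"
    end
  end.compact.reverse.join(' ')
end

puts human_duration(3725)
puts human_duration(59)
```

Two properties follow from the loop: a zero duration yields an empty string, and because the last divisor is 24 with no day unit, durations of a day or longer wrap around (25 hours is reported as `1h 0m 0s`).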