Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 37 additions & 31 deletions lib/strategy/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,29 +114,33 @@ def process
def process_table progress
index = 0

source_table_limited.each do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
source_table.transaction do
source_table_limited.each do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
end
progress.show index
end
progress.show index
end
end

def process_table_in_batches progress
logger.info "Processing table #{@name} records in batch size of #{@batch_size}"
index = 0

source_table_limited.find_each(:batch_size => @batch_size) do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
source_table.transaction do
source_table_limited.find_each(:batch_size => @batch_size) do |record|
index += 1
begin
process_record_if index, record
rescue => exception
@errors.log_error record, exception
end
progress.show index
end
progress.show index
end
end

Expand All @@ -146,25 +150,27 @@ def process_table_in_threads progress
index = 0
threads = []

source_table.find_in_batches(batch_size: @batch_size) do |records|
until threads.count(&:alive?) <= @thread_num
thr = threads.delete_at 0
thr.join
progress.show index
end
source_table.transaction do
source_table.find_in_batches(batch_size: @batch_size) do |records|
until threads.count(&:alive?) <= @thread_num
thr = threads.delete_at 0
thr.join
progress.show index
end

thr = Thread.new {
records.each do |record|
begin
process_record_if index, record
index += 1
rescue => exception
puts exception.inspect
@errors.log_error record, exception
thr = Thread.new {
records.each do |record|
begin
process_record_if index, record
index += 1
rescue => exception
puts exception.inspect
@errors.log_error record, exception
end
end
end
}
threads << thr
}
threads << thr
end
end

until threads.empty?
Expand Down