Use separate workers to process imports, retry failures (#5207)
This commit is contained in:
		
							parent
							
								
									c743b5e1fd
								
							
						
					
					
						commit
						cdd5ef691b
					
				
							
								
								
									
										25
									
								
								app/workers/import/relationship_worker.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										25
									
								
								app/workers/import/relationship_worker.rb
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,25 @@
 | 
			
		||||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class Import::RelationshipWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'pull', retry: 8, dead: false
 | 
			
		||||
 | 
			
		||||
  def perform(account_id, target_account_uri, relationship)
 | 
			
		||||
    from_account   = Account.find(account_id)
 | 
			
		||||
    target_account = ResolveRemoteAccountService.new.call(target_account_uri)
 | 
			
		||||
 | 
			
		||||
    return if target_account.nil?
 | 
			
		||||
 | 
			
		||||
    case relationship
 | 
			
		||||
    when 'follow'
 | 
			
		||||
      FollowService.new.call(from_account, target_account.acct)
 | 
			
		||||
    when 'block'
 | 
			
		||||
      BlockService.new.call(from_account, target_account)
 | 
			
		||||
    when 'mute'
 | 
			
		||||
      MuteService.new.call(from_account, target_account)
 | 
			
		||||
    end
 | 
			
		||||
  rescue ActiveRecord::RecordNotFound
 | 
			
		||||
    true
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
@ -12,13 +12,8 @@ class ImportWorker
 | 
			
		||||
  def perform(import_id)
 | 
			
		||||
    @import = Import.find(import_id)
 | 
			
		||||
 | 
			
		||||
    case @import.type
 | 
			
		||||
    when 'blocking'
 | 
			
		||||
      process_blocks
 | 
			
		||||
    when 'following'
 | 
			
		||||
      process_follows
 | 
			
		||||
    when 'muting'
 | 
			
		||||
      process_mutes
 | 
			
		||||
    Import::RelationshipWorker.push_bulk(import_rows) do |row|
 | 
			
		||||
      [@import.account_id, row.first, relationship_type]
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    @import.destroy
 | 
			
		||||
@ -26,49 +21,22 @@ class ImportWorker
 | 
			
		||||
 | 
			
		||||
  private
 | 
			
		||||
 | 
			
		||||
  def from_account
 | 
			
		||||
    @import.account
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def import_contents
 | 
			
		||||
    Paperclip.io_adapters.for(@import.data).read
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def relationship_type
 | 
			
		||||
    case @import.type
 | 
			
		||||
    when 'following'
 | 
			
		||||
      'follow'
 | 
			
		||||
    when 'blocking'
 | 
			
		||||
      'block'
 | 
			
		||||
    when 'muting'
 | 
			
		||||
      'mute'
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def import_rows
 | 
			
		||||
    CSV.new(import_contents).reject(&:blank?)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def process_mutes
 | 
			
		||||
    import_rows.each do |row|
 | 
			
		||||
      begin
 | 
			
		||||
        target_account = ResolveRemoteAccountService.new.call(row.first)
 | 
			
		||||
        next if target_account.nil?
 | 
			
		||||
        MuteService.new.call(from_account, target_account)
 | 
			
		||||
      rescue Mastodon::UnexpectedResponseError, HTTP::Error, OpenSSL::SSL::SSLError
 | 
			
		||||
        next
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def process_blocks
 | 
			
		||||
    import_rows.each do |row|
 | 
			
		||||
      begin
 | 
			
		||||
        target_account = ResolveRemoteAccountService.new.call(row.first)
 | 
			
		||||
        next if target_account.nil?
 | 
			
		||||
        BlockService.new.call(from_account, target_account)
 | 
			
		||||
      rescue Mastodon::UnexpectedResponseError, HTTP::Error, OpenSSL::SSL::SSLError
 | 
			
		||||
        next
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def process_follows
 | 
			
		||||
    import_rows.each do |row|
 | 
			
		||||
      begin
 | 
			
		||||
        FollowService.new.call(from_account, row.first)
 | 
			
		||||
      rescue Mastodon::NotPermittedError, ActiveRecord::RecordNotFound, Mastodon::UnexpectedResponseError, HTTP::Error, OpenSSL::SSL::SSLError
 | 
			
		||||
        next
 | 
			
		||||
      end
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user