From 5340ab622ab6f3e00e650c9420f520a46d9c42f7 Mon Sep 17 00:00:00 2001 From: Claire Date: Fri, 21 Apr 2023 18:14:19 +0200 Subject: [PATCH] Change automatic post deletion thresholds and load detection (#24614) --- .../accounts_statuses_cleanup_scheduler.rb | 103 ++++++------------ 1 file changed, 31 insertions(+), 72 deletions(-) diff --git a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb index a2ab31cc5d..d245f6bbdc 100644 --- a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb +++ b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb @@ -7,30 +7,28 @@ class Scheduler::AccountsStatusesCleanupScheduler # This limit is mostly to be nice to the fediverse at large and not # generate too much traffic. # This also helps limiting the running time of the scheduler itself. - MAX_BUDGET = 300 + MAX_BUDGET = 150 - # This is an attempt to spread the load across remote servers, as - # spreading deletions across diverse accounts is likely to spread - # the deletion across diverse followers. It also helps each individual - # user see some effect sooner. + # This is an attempt to spread the load across instances, as various + # accounts are likely to have various followers. PER_ACCOUNT_BUDGET = 5 # This is an attempt to limit the workload generated by status removal - # jobs to something the particular server can handle. - PER_THREAD_BUDGET = 5 + # jobs to something the particular instance can handle. + PER_THREAD_BUDGET = 6 - # These are latency limits on various queues above which a server is - # considered to be under load, causing the auto-deletion to be entirely - # skipped for that run. - LOAD_LATENCY_THRESHOLDS = { - default: 5, - push: 10, - # The `pull` queue has lower priority jobs, and it's unlikely that - # pushing deletes would cause much issues with this queue if it didn't - # cause issues with `default` and `push`.
Yet, do not enqueue deletes - # if the instance is lagging behind too much. - pull: 5.minutes.to_i, - }.freeze + # Those avoid loading an instance that is already under load + MAX_DEFAULT_SIZE = 200 + MAX_DEFAULT_LATENCY = 5 + MAX_PUSH_SIZE = 500 + MAX_PUSH_LATENCY = 10 + + # 'pull' queue has lower priority jobs, and it's unlikely that pushing + # deletes would cause much issues with this queue if it didn't cause issues + # with default and push. Yet, do not enqueue deletes if the instance is + # lagging behind too much. + MAX_PULL_SIZE = 10_000 + MAX_PULL_LATENCY = 5.minutes.to_i sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i @@ -38,37 +36,17 @@ class Scheduler::AccountsStatusesCleanupScheduler return if under_load? budget = compute_budget - - # If the budget allows it, we want to consider all accounts with enabled - # auto cleanup at least once. - # - # We start from `first_policy_id` (the last processed id in the previous - # run) and process each policy until we loop to `first_policy_id`, - # recording into `affected_policies` any policy that caused posts to be - # deleted. - # - # After that, we set `full_iteration` to `false` and continue looping on - # policies from `affected_policies`. - first_policy_id = last_processed_id || 0 - first_iteration = true - full_iteration = true - affected_policies = [] + first_policy_id = last_processed_id loop do num_processed_accounts = 0 - scope = cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration) + scope = AccountStatusesCleanupPolicy.where(enabled: true) + scope.where(Account.arel_table[:id].gt(first_policy_id)) if first_policy_id.present? scope.find_each(order: :asc) do |policy| num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min) + num_processed_accounts += 1 unless num_deleted.zero? budget -= num_deleted - - unless num_deleted.zero?
- num_processed_accounts += 1 - affected_policies << policy.id if full_iteration - end - - full_iteration = false if !first_iteration && policy.id >= first_policy_id - if budget.zero? save_last_processed_id(policy.id) break @@ -77,55 +55,36 @@ class Scheduler::AccountsStatusesCleanupScheduler # The idea here is to loop through all policies at least once until the budget is exhausted # and start back after the last processed account otherwise - break if budget.zero? || (num_processed_accounts.zero? && !full_iteration) - - full_iteration = false unless first_iteration - first_iteration = false + break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?) + first_policy_id = nil end end def compute_budget - # Each post deletion is a `RemovalWorker` job (on `default` queue), each - # potentially spawning many `ActivityPub::DeliveryWorker` jobs (on the `push` queue). - threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.pluck('concurrency').sum + threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum [PER_THREAD_BUDGET * threads, MAX_BUDGET].min end def under_load? - LOAD_LATENCY_THRESHOLDS.any? { |queue, max_latency| queue_under_load?(queue, max_latency) } + queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) || queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) || queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY) end private - def cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration) - scope = AccountStatusesCleanupPolicy.where(enabled: true) - - if full_iteration - # If we are doing a full iteration, examine all policies we have not examined yet - if first_iteration - scope.where(id: first_policy_id...)
- else - scope.where(id: ..first_policy_id).or(scope.where(id: affected_policies)) - end - else - # Otherwise, examine only policies that previously yielded posts to delete - scope.where(id: affected_policies) - end - end - - def queue_under_load?(name, max_latency) - Sidekiq::Queue.new(name).latency > max_latency + def queue_under_load?(name, max_size, max_latency) + queue = Sidekiq::Queue.new(name) + queue.size > max_size || queue.latency > max_latency end def last_processed_id - redis.get('account_statuses_cleanup_scheduler:last_policy_id')&.to_i + redis.get('account_statuses_cleanup_scheduler:last_account_id') end def save_last_processed_id(id) if id.nil? - redis.del('account_statuses_cleanup_scheduler:last_policy_id') + redis.del('account_statuses_cleanup_scheduler:last_account_id') else - redis.set('account_statuses_cleanup_scheduler:last_policy_id', id, ex: 1.hour.seconds) + redis.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds) end end end