Change automatic post deletion thresholds and load detection (#24614)
parent 73e9555994
commit 5340ab622a
@@ -7,30 +7,28 @@ class Scheduler::AccountsStatusesCleanupScheduler
   # This limit is mostly to be nice to the fediverse at large and not
   # generate too much traffic.
   # This also helps limiting the running time of the scheduler itself.
-  MAX_BUDGET = 300
+  MAX_BUDGET = 150

-  # This is an attempt to spread the load across remote servers, as
-  # spreading deletions across diverse accounts is likely to spread
-  # the deletion across diverse followers. It also helps each individual
-  # user see some effect sooner.
+  # This is an attempt to spread the load across instances, as various
+  # accounts are likely to have various followers.
   PER_ACCOUNT_BUDGET = 5

   # This is an attempt to limit the workload generated by status removal
-  # jobs to something the particular server can handle.
-  PER_THREAD_BUDGET = 5
+  # jobs to something the particular instance can handle.
+  PER_THREAD_BUDGET = 6

-  # These are latency limits on various queues above which a server is
-  # considered to be under load, causing the auto-deletion to be entirely
-  # skipped for that run.
-  LOAD_LATENCY_THRESHOLDS = {
-    default: 5,
-    push: 10,
-    # The `pull` queue has lower priority jobs, and it's unlikely that
-    # pushing deletes would cause much issues with this queue if it didn't
-    # cause issues with `default` and `push`. Yet, do not enqueue deletes
-    # if the instance is lagging behind too much.
-    pull: 5.minutes.to_i,
-  }.freeze
+  # Those avoid loading an instance that is already under load
+  MAX_DEFAULT_SIZE = 200
+  MAX_DEFAULT_LATENCY = 5
+  MAX_PUSH_SIZE = 500
+  MAX_PUSH_LATENCY = 10
+
+  # 'pull' queue has lower priority jobs, and it's unlikely that pushing
+  # deletes would cause much issues with this queue if it didn't cause issues
+  # with default and push. Yet, do not enqueue deletes if the instance is
+  # lagging behind too much.
+  MAX_PULL_SIZE = 10_000
+  MAX_PULL_LATENCY = 5.minutes.to_i

   sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i
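For scale, here is a rough worked example of how these constants bound a single run, following the `[PER_THREAD_BUDGET * threads, MAX_BUDGET].min` computation in `compute_budget` further down in the diff; the thread counts are made up for illustration:

# Hypothetical deployment: 2 Sidekiq processes serving the 'push' queue, 25 threads each.
threads = 2 * 25 # => 50, as compute_budget would count them via Sidekiq::ProcessSet

[5 * threads, 300].min # => 250 deletions allowed per run (PER_THREAD_BUDGET = 5, MAX_BUDGET = 300 variant)
[6 * threads, 150].min # => 150 deletions allowed per run (PER_THREAD_BUDGET = 6, MAX_BUDGET = 150 variant)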
@@ -38,37 +36,17 @@ class Scheduler::AccountsStatusesCleanupScheduler
     return if under_load?

     budget = compute_budget
-
-    # If the budget allows it, we want to consider all accounts with enabled
-    # auto cleanup at least once.
-    #
-    # We start from `first_policy_id` (the last processed id in the previous
-    # run) and process each policy until we loop to `first_policy_id`,
-    # recording into `affected_policies` any policy that caused posts to be
-    # deleted.
-    #
-    # After that, we set `full_iteration` to `false` and continue looping on
-    # policies from `affected_policies`.
-    first_policy_id = last_processed_id || 0
-    first_iteration = true
-    full_iteration = true
-    affected_policies = []
+    first_policy_id = last_processed_id

     loop do
       num_processed_accounts = 0

-      scope = cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
+      scope = AccountStatusesCleanupPolicy.where(enabled: true)
+      scope = scope.where(Account.arel_table[:id].gt(first_policy_id)) if first_policy_id.present?
       scope.find_each(order: :asc) do |policy|
         num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
+        num_processed_accounts += 1 unless num_deleted.zero?
         budget -= num_deleted

-        unless num_deleted.zero?
-          num_processed_accounts += 1
-          affected_policies << policy.id if full_iteration
-        end
-
-        full_iteration = false if !first_iteration && policy.id >= first_policy_id
-
         if budget.zero?
           save_last_processed_id(policy.id)
           break
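A small self-contained sketch of the per-account cap applied inside the loop above; the numbers are hypothetical and the sketch assumes each account actually uses its full allowance:

budget             = 12
per_account_budget = 5
allocations        = []

until budget.zero?
  share = [budget, per_account_budget].min # what a single AccountStatusesCleanupService call may delete
  allocations << share
  budget -= share
end

allocations # => [5, 5, 2]: three accounts share the remaining budget instead of one account draining it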
@@ -77,55 +55,36 @@ class Scheduler::AccountsStatusesCleanupScheduler

       # The idea here is to loop through all policies at least once until the budget is exhausted
       # and start back after the last processed account otherwise
-      break if budget.zero? || (num_processed_accounts.zero? && !full_iteration)
-
-      full_iteration = false unless first_iteration
-      first_iteration = false
+      break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?)
+
+      first_policy_id = nil
     end
   end

   def compute_budget
     # Each post deletion is a `RemovalWorker` job (on `default` queue), each
     # potentially spawning many `ActivityPub::DeliveryWorker` jobs (on the `push` queue).
-    threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.pluck('concurrency').sum
+    threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum
     [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
   end

   def under_load?
-    LOAD_LATENCY_THRESHOLDS.any? { |queue, max_latency| queue_under_load?(queue, max_latency) }
+    queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) || queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) || queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY)
   end

   private

-  def cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
-    scope = AccountStatusesCleanupPolicy.where(enabled: true)
-
-    if full_iteration
-      # If we are doing a full iteration, examine all policies we have not examined yet
-      if first_iteration
-        scope.where(id: first_policy_id...)
-      else
-        scope.where(id: ..first_policy_id).or(scope.where(id: affected_policies))
-      end
-    else
-      # Otherwise, examine only policies that previously yielded posts to delete
-      scope.where(id: affected_policies)
-    end
-  end
-
-  def queue_under_load?(name, max_latency)
-    Sidekiq::Queue.new(name).latency > max_latency
+  def queue_under_load?(name, max_size, max_latency)
+    queue = Sidekiq::Queue.new(name)
+    queue.size > max_size || queue.latency > max_latency
   end

   def last_processed_id
-    redis.get('account_statuses_cleanup_scheduler:last_policy_id')&.to_i
+    redis.get('account_statuses_cleanup_scheduler:last_account_id')
   end

   def save_last_processed_id(id)
     if id.nil?
-      redis.del('account_statuses_cleanup_scheduler:last_policy_id')
+      redis.del('account_statuses_cleanup_scheduler:last_account_id')
     else
-      redis.set('account_statuses_cleanup_scheduler:last_policy_id', id, ex: 1.hour.seconds)
+      redis.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds)
     end
   end
 end
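To see which of the two `under_load?` variants would currently skip a run on a given deployment, a check along these lines can be run from a console with Sidekiq loaded; the thresholds are copied from the constants above, and the snippet itself is not part of the commit:

require 'sidekiq/api'

# Latency-only variant (LOAD_LATENCY_THRESHOLDS).
latency_thresholds = { 'default' => 5, 'push' => 10, 'pull' => 5 * 60 }
latency_overloaded = latency_thresholds.any? { |name, max| Sidekiq::Queue.new(name).latency > max }

# Size-plus-latency variant (MAX_*_SIZE / MAX_*_LATENCY).
size_thresholds = { 'default' => [200, 5], 'push' => [500, 10], 'pull' => [10_000, 5 * 60] }
size_overloaded = size_thresholds.any? do |name, (max_size, max_latency)|
  queue = Sidekiq::Queue.new(name)
  queue.size > max_size || queue.latency > max_latency
end

puts "latency-only check:     #{latency_overloaded ? 'under load' : 'ok'}"
puts "size and latency check: #{size_overloaded ? 'under load' : 'ok'}"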