Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion ruby/lib/ci/queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class Configuration
attr_accessor :requeue_tolerance, :namespace, :failing_test, :statsd_endpoint
attr_accessor :max_test_duration, :max_test_duration_percentile, :track_test_duration
attr_accessor :max_test_failed, :redis_ttl, :warnings_file, :debug_log, :max_missed_heartbeat_seconds
attr_writer :heartbeat_max_test_duration
attr_accessor :lazy_load, :lazy_load_stream_batch_size
attr_writer :lazy_load_streaming_timeout
attr_accessor :lazy_load_test_helpers
Expand Down Expand Up @@ -57,7 +58,7 @@ def initialize(
grind_count: nil, max_duration: nil, failure_file: nil, max_test_duration: nil,
max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil,
queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil,
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil,
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil, heartbeat_max_test_duration: nil,
lazy_load: false, lazy_load_stream_batch_size: nil, lazy_load_streaming_timeout: nil, lazy_load_test_helpers: nil,
skip_stale_tests: false)
@build_id = build_id
Expand Down Expand Up @@ -86,6 +87,7 @@ def initialize(
@warnings_file = warnings_file
@debug_log = debug_log
@max_missed_heartbeat_seconds = max_missed_heartbeat_seconds
@heartbeat_max_test_duration = heartbeat_max_test_duration
@lazy_load = lazy_load
@lazy_load_stream_batch_size = lazy_load_stream_batch_size || 5_000
@lazy_load_streaming_timeout = lazy_load_streaming_timeout
Expand Down Expand Up @@ -153,6 +155,10 @@ def inactive_workers_timeout
@inactive_workers_timeout || timeout
end

def heartbeat_max_test_duration
@heartbeat_max_test_duration || (timeout * 10 if max_missed_heartbeat_seconds)
end

def max_consecutive_failures=(max)
if max
@circuit_breakers << CircuitBreaker::MaxConsecutiveFailures.new(max_consecutive_failures: max)
Expand Down
22 changes: 19 additions & 3 deletions ruby/lib/ci/queue/redis/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def reconnect_attempts
def with_heartbeat(id, lease: nil)
if heartbeat_enabled?
ensure_heartbeat_thread_alive!
heartbeat_state.set(:tick, id, lease)
heartbeat_state.set(:tick, id, lease, Process.clock_gettime(Process::CLOCK_MONOTONIC))
end

yield
Expand Down Expand Up @@ -386,16 +386,32 @@ def heartbeat
Thread.current.name = "CI::Queue#heartbeat"
Thread.current.abort_on_exception = true

capped = false

loop do
command = heartbeat_state.wait(1) # waits for max 1 second but wakes up immediately if we receive a command

case command&.first
when :tick
# command = [:tick, entry_id, lease_id]
next if capped

max_duration = config.heartbeat_max_test_duration
if max_duration
# command = [:tick, entry_id, lease_id, started_at]
# Use the absolute start time from when with_heartbeat was called so that
# the elapsed calculation is not skewed by heartbeat thread startup delay.
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - command[3]
if elapsed >= max_duration
capped = true
next
end
end

# command = [:tick, entry_id, lease_id, started_at]
heartbeat_process.tick!(command[1], command[2])
when :reset
# Test finished, stop ticking until next test starts
nil
capped = false
when :stop
break
end
Expand Down
10 changes: 10 additions & 0 deletions ruby/lib/minitest/queue/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,16 @@ def parser
queue_config.max_missed_heartbeat_seconds = time || 30
end

help = <<~EOS
Maximum duration in seconds that the heartbeat will tick for a single test.
If a test runs longer than this, the heartbeat stops and the test entry becomes
eligible for reclamation by another worker.
Defaults to timeout * 10 when heartbeat is enabled.
EOS
opts.on("--heartbeat-max-test-duration SECONDS", Float, help) do |seconds|
queue_config.heartbeat_max_test_duration = seconds
end


opts.on("-v", "--verbose", "Verbose. Show progress processing files.") do
self.verbose = true
Expand Down
14 changes: 14 additions & 0 deletions ruby/test/ci/queue/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -200,5 +200,19 @@ def test_new_lazy_load_test_helpers_env
assert_equal ["test/test_helper.rb", "test/support/helper.rb"], config.lazy_load_test_helper_paths
end

def test_heartbeat_max_test_duration_defaults
# defaults to timeout*10 when heartbeat is enabled
config = Configuration.new(timeout: 5, max_missed_heartbeat_seconds: 1)
assert_equal 50, config.heartbeat_max_test_duration

# nil when heartbeat is disabled (no max_missed_heartbeat_seconds)
config = Configuration.new(timeout: 5)
assert_nil config.heartbeat_max_test_duration

# explicit value overrides the default
config = Configuration.new(timeout: 5, max_missed_heartbeat_seconds: 1, heartbeat_max_test_duration: 3)
assert_equal 3, config.heartbeat_max_test_duration
end

end
end
70 changes: 70 additions & 0 deletions ruby/test/ci/queue/redis_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,76 @@ def test_heartbeat_only_checks_lease
assert_nil result
end

def test_heartbeat_max_test_duration_stops_heartbeat
queue = worker(1, max_missed_heartbeat_seconds: 2, heartbeat_max_test_duration: 1, tests: [TEST_LIST.first], build_id: 'hb-cap')
queue.boot_heartbeat_process!

entry = nil
lease = nil
queue.poll do |test|
entry = test.queue_entry
lease = queue.lease_for(entry)

# Score should be updating while heartbeat ticks
queue.with_heartbeat(entry, lease: lease) do
sleep 0.5
score_while_ticking = @redis.zscore(queue.send(:key, 'running'), entry)
refute_nil score_while_ticking, "Entry should be in running set while heartbeat ticks"

# Sleep past the heartbeat cap (1s) + extra buffer
sleep 1.5

# After cap, score should have stopped updating.
# The entry should now be stale enough for reserve_lost to reclaim.
score_after_cap = @redis.zscore(queue.send(:key, 'running'), entry)
# Score should be frozen (not updated for >1s since cap at ~1s)
assert score_after_cap < CI::Queue.time_now.to_f - 1, "Score should be stale after heartbeat cap"
end

queue.acknowledge(entry)
end

refute_nil entry, "Test should have been reserved"
ensure
queue&.stop_heartbeat!
end

def test_heartbeat_cap_resets_between_tests
# Two slow tests; cap fires after 1s so the first one goes stale.
# After the first test finishes, :reset is sent and capped becomes false,
# so the heartbeat should resume ticking for the second test.
tests = TEST_LIST.first(2)
queue = worker(1, max_missed_heartbeat_seconds: 3, heartbeat_max_test_duration: 1, tests: tests, build_id: 'hb-reset')
queue.boot_heartbeat_process!

polled = []
queue.poll do |test|
entry = test.queue_entry
lease = queue.lease_for(entry)
polled << entry

queue.with_heartbeat(entry, lease: lease) do
if polled.size == 1
# Sleep past cap for first test — heartbeat stops ticking
sleep 2
score = @redis.zscore(queue.send(:key, 'running'), entry)
assert score < CI::Queue.time_now.to_f - 1, "First test score should be stale after cap"
else
# For second test, sleep briefly then verify score is fresh — reset worked
sleep 0.5
score = @redis.zscore(queue.send(:key, 'running'), entry)
assert score >= CI::Queue.time_now.to_f - 2, "Second test score should be fresh after cap reset"
end
end

queue.acknowledge(entry)
end

assert_equal 2, polled.size, "Both tests should have been polled"
ensure
queue&.stop_heartbeat!
end

def test_resolve_entry_falls_back_to_resolver
queue = worker(1, populate: false)
queue.instance_variable_set(:@index, { 'ATest#test_foo' => :ok })
Expand Down
23 changes: 23 additions & 0 deletions ruby/test/fixtures/test/consecutive_capped_tests.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true
require 'test_helper'

CI::Queue::Redis.max_sleep_time = 0.05

# Fixture for test_heartbeat_cap_resets_between_tests.
#
# test_alpha fires the heartbeat cap (sleep 2 > cap 1s) but finishes before going stale
# (sleep 2 < cap 1 + heartbeat 2 = 3s). This sets capped=true in the heartbeat thread.
# After test_alpha, :reset is sent and capped should be false.
#
# test_beta sleeps in the range (heartbeat=2, heartbeat+cap=3):
# - Without reset: no ticks, stale at t_B + 2s, finishes at t_B + 2.5s → STOLEN
# - With reset: ticks until cap at t_B + 1s, stale at t_B + 3s, finishes at t_B + 2.5s → NOT stolen
class ConsecutiveCappedTests < Minitest::Test
def test_alpha
sleep 2
end

def test_beta
sleep 2.5
end
end
16 changes: 16 additions & 0 deletions ruby/test/fixtures/test/two_lost_tests.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true
require 'test_helper'

CI::Queue::Redis.max_sleep_time = 0.05

class TwoLostTests < Minitest::Test

def test_alpha
sleep 3
end

def test_beta
sleep 3
end

end
Loading
Loading