
[Draft] Add support for exponential growing inference#1987

Open
jgreer013 wants to merge 5 commits into main from jgreer013/exponential-inference

Conversation

@jgreer013 (Contributor) commented Oct 13, 2025

Description

(Screenshot attached: 2025-10-13 at 2:13 PM)

Related issues

Fixes # (issue)

Before submitting

  • This PR only changes documentation. (You can ignore the following checks in that case)
  • Did you read the Pull Request guidelines in the contributor guide?
  • Did you link the issue(s) related to this PR in the section above?
  • Did you add / update tests where needed?

Reviewers

At least one review from a member of oumi-ai/oumi-staff is required.

@gitar-bot (bot) commented Jan 30, 2026

Code Review ⚠️ Changes requested 0 resolved / 4 findings

Re-reviewed the exponential scaling PR. All 4 previous findings remain unresolved: a fire-and-forget task without exception handling, an unbounded RPM history, an unbounded good-concurrency stack, and a race condition on the burst-backoff flag.

⚠️ Bug: Fire-and-forget task may cause unhandled exceptions

📄 src/oumi/inference/adaptive_concurrency_controller.py

The asyncio.create_task(self._immediate_burst_backoff()) call creates an unmanaged task. If _immediate_burst_backoff() raises an exception, it will be silently ignored (or logged as "Task exception was never retrieved" depending on Python version and settings).

Impact: Errors during burst backoff won't be visible to the caller or properly logged, making debugging production issues difficult.

Suggested fix: any of the following:

  1. Store the task reference and handle exceptions explicitly
  2. Add exception handling within _immediate_burst_backoff() (partially done, but could still raise during _try_backoff())
  3. Use asyncio.ensure_future() with a done callback to log exceptions
# Option 1: Store the task and log any exception via a done callback
def _on_backoff_done(t: asyncio.Task) -> None:
    if not t.cancelled() and t.exception() is not None:
        logger.error("Burst backoff failed: %s", t.exception())

task = asyncio.create_task(self._immediate_burst_backoff())
task.add_done_callback(_on_backoff_done)

# Option 2: More robust error handling in _immediate_burst_backoff
async def _immediate_burst_backoff(self):
    try:
        logger.info("Executing immediate burst backoff...")
        await self._try_backoff()
    except Exception as e:
        logger.error("Burst backoff failed: %s", e)
    finally:
        self._burst_backoff_in_progress = False
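To make the done-callback pattern concrete, here is a minimal self-contained sketch (standalone code, not the controller's; `failing_backoff`, `log_exception`, and `captured` are illustrative names) showing how the callback surfaces an exception that a fire-and-forget task would otherwise swallow:

```python
import asyncio

captured: list[str] = []

async def failing_backoff() -> None:
    # stand-in for _immediate_burst_backoff(); raises to simulate a failure
    raise RuntimeError("backoff failed")

def log_exception(task: asyncio.Task) -> None:
    # done callback: retrieve the exception so it is not silently dropped
    if not task.cancelled() and task.exception() is not None:
        captured.append(str(task.exception()))

async def main() -> None:
    task = asyncio.create_task(failing_backoff())
    task.add_done_callback(log_exception)
    # holding a reference also keeps the task from being garbage-collected
    await asyncio.gather(task, return_exceptions=True)

asyncio.run(main())
print(captured)  # ['backoff failed'] — the failure is visible, not dropped
```

Without the callback (and without awaiting the task), the same failure would only surface as a "Task exception was never retrieved" warning at best.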
⚠️ Edge Case: RPM history unbounded, may cause memory growth

📄 src/oumi/inference/adaptive_concurrency_controller.py

The _rpm_history list grows unbounded as successful adjustments are recorded. In long-running processes, this could cause memory to grow indefinitely.

self._rpm_history: list[tuple[float, int, float]] = []
# ...
self._rpm_history.append((current_rpm, self._current_concurrency, current_politeness))

Impact: Memory leak in long-running inference jobs that may run for hours or days.

Suggested fix: Use a deque with a maximum length or periodically prune old entries:

# Option 1: Use deque with maxlen
self._rpm_history: deque[tuple[float, int, float]] = deque(maxlen=100)

# Option 2: Keep only the best few entries
# When appending, also prune if needed:
if len(self._rpm_history) > 100:
    # Keep only top 10 by RPM
    self._rpm_history = sorted(self._rpm_history, key=lambda x: x[0], reverse=True)[:10]
⚠️ Bug: Race condition: `_burst_backoff_in_progress` accessed outside the lock

📄 src/oumi/inference/adaptive_concurrency_controller.py

The flag _burst_backoff_in_progress is checked and set inside record_error() while holding _outcome_lock, but it's also cleared in _immediate_burst_backoff() without any lock. This could cause race conditions where:

  1. Thread A sets _burst_backoff_in_progress = True and creates the task
  2. The task clears _burst_backoff_in_progress = False
  3. Thread B sees False and creates a duplicate burst backoff

Impact: Potential duplicate burst backoffs or skipped backoffs due to unsynchronized flag access.

Suggested fix: Either:

  1. Use asyncio.Lock to protect the flag access in _immediate_burst_backoff()
  2. Or rely on the time-based rate limiting (last_burst_backoff_time) as the primary guard and document that _burst_backoff_in_progress is best-effort
async def _immediate_burst_backoff(self):
    try:
        logger.info("Executing immediate burst backoff...")
        await self._try_backoff()
    finally:
        async with self._outcome_lock:  # Use same lock
            self._burst_backoff_in_progress = False
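A minimal standalone sketch of option 1 (the `Controller` class below is illustrative, not the PR's actual class): guarding both the check-and-set and the clear with the same `asyncio.Lock` means concurrent attempts cannot start duplicate backoffs:

```python
import asyncio

class Controller:
    # minimal stand-in: the flag is only ever touched under _outcome_lock
    def __init__(self) -> None:
        self._outcome_lock = asyncio.Lock()
        self._burst_backoff_in_progress = False
        self.backoffs_started = 0

    async def maybe_backoff(self) -> None:
        async with self._outcome_lock:
            if self._burst_backoff_in_progress:
                return  # a backoff is already running; skip the duplicate
            self._burst_backoff_in_progress = True
            self.backoffs_started += 1
        try:
            await asyncio.sleep(0)  # stand-in for _try_backoff()
        finally:
            async with self._outcome_lock:  # clear under the same lock
                self._burst_backoff_in_progress = False

async def main() -> Controller:
    c = Controller()
    # fire several concurrent attempts; losers of the race skip
    await asyncio.gather(*(c.maybe_backoff() for _ in range(5)))
    return c

c = asyncio.run(main())
print(c.backoffs_started)  # 1 — only one backoff ran despite five attempts
```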
💡 Edge Case: `_good_concurrency_stack` unbounded, could grow indefinitely

📄 src/oumi/inference/adaptive_concurrency_controller.py

Similar to _rpm_history, the _good_concurrency_stack list can grow without bound as the system explores new concurrency levels:

self._good_concurrency_stack: list[int] = []
# ...
self._good_concurrency_stack.append(self._current_concurrency)

Impact: Minor memory concern, though the growth rate is likely much slower than RPM history since it only grows during successful warmups.

Suggested fix: Add a maximum depth, keeping only the most recent good values:

MAX_GOOD_STACK_DEPTH = 20
# When appending:
self._good_concurrency_stack.append(self._current_concurrency)
if len(self._good_concurrency_stack) > MAX_GOOD_STACK_DEPTH:
    self._good_concurrency_stack = self._good_concurrency_stack[-MAX_GOOD_STACK_DEPTH:]
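The slicing trim above can be sketched standalone (a cap of 3 only for the demo; the suggestion uses 20) to show that only the most recent good values survive:

```python
MAX_GOOD_STACK_DEPTH = 3  # demo cap; the suggested value is 20
good_concurrency_stack: list[int] = []

for level in [1, 2, 4, 8, 16]:  # illustrative warmup levels
    good_concurrency_stack.append(level)
    if len(good_concurrency_stack) > MAX_GOOD_STACK_DEPTH:
        # keep only the most recent entries
        good_concurrency_stack = good_concurrency_stack[-MAX_GOOD_STACK_DEPTH:]

print(good_concurrency_stack)  # [4, 8, 16]
```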
