Source code for valkey.backoff
import random
from abc import ABC, abstractmethod
# Maximum backoff between each retry in seconds
DEFAULT_CAP = 0.512
# Minimum backoff between each retry in seconds
DEFAULT_BASE = 0.008
[docs]
class AbstractBackoff(ABC):
"""Backoff interface"""
[docs]
def reset(self):
"""
Reset internal state before an operation.
`reset` is called once at the beginning of
every call to `Retry.call_with_retry`
"""
pass
[docs]
@abstractmethod
def compute(self, failures: int) -> float:
"""Compute backoff in seconds upon failure"""
pass
[docs]
class ConstantBackoff(AbstractBackoff):
"""Constant backoff upon failure"""
def __init__(self, backoff: float) -> None:
"""`backoff`: backoff time in seconds"""
self._backoff = backoff
[docs]
def compute(self, failures: int) -> float:
return self._backoff
[docs]
class NoBackoff(ConstantBackoff):
"""No backoff upon failure"""
def __init__(self) -> None:
super().__init__(0)
[docs]
class ExponentialBackoff(AbstractBackoff):
"""Exponential backoff upon failure"""
def __init__(self, cap: float = DEFAULT_CAP, base: float = DEFAULT_BASE):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
"""
self._cap = cap
self._base = base
[docs]
def compute(self, failures: int) -> float:
return min(self._cap, self._base * 2**failures)
[docs]
class FullJitterBackoff(AbstractBackoff):
"""Full jitter backoff upon failure"""
def __init__(self, cap: float = DEFAULT_CAP, base: float = DEFAULT_BASE) -> None:
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
"""
self._cap = cap
self._base = base
[docs]
def compute(self, failures: int) -> float:
return random.uniform(0, min(self._cap, self._base * 2**failures))
[docs]
class EqualJitterBackoff(AbstractBackoff):
"""Equal jitter backoff upon failure"""
def __init__(self, cap: float = DEFAULT_CAP, base: float = DEFAULT_BASE) -> None:
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
"""
self._cap = cap
self._base = base
[docs]
def compute(self, failures: int) -> float:
temp = min(self._cap, self._base * 2**failures) / 2
return temp + random.uniform(0, temp)
def default_backoff():
return EqualJitterBackoff()