Source code for valkey.sentinel

import random
import weakref
from typing import Optional

from valkey.client import Valkey
from valkey.commands import SentinelCommands
from valkey.connection import Connection, ConnectionPool, SSLConnection
from valkey.exceptions import (
    ConnectionError,
    ReadOnlyError,
    ResponseError,
    TimeoutError,
)
from valkey.utils import str_if_bytes


class MasterNotFoundError(ConnectionError):
    pass


class SlaveNotFoundError(ConnectionError):
    pass


class SentinelManagedConnection(Connection):
    def __init__(self, **kwargs):
        self.connection_pool = kwargs.pop("connection_pool")
        super().__init__(**kwargs)

    def __repr__(self):
        pool = self.connection_pool
        s = (
            f"<{type(self).__module__}.{type(self).__name__}"
            f"(service={pool.service_name}%s)>"
        )
        if self.host:
            host_info = f",host={self.host},port={self.port}"
            s = s % host_info
        return s

    def connect_to(self, address):
        self.host, self.port = address
        super().connect()
        if self.connection_pool.check_connection:
            self.send_command("PING")
            if str_if_bytes(self.read_response()) != "PONG":
                raise ConnectionError("PING failed")

    def _connect_retry(self):
        if self._sock:
            return  # already connected
        if self.connection_pool.is_master:
            self.connect_to(self.connection_pool.get_master_address())
        else:
            for slave in self.connection_pool.rotate_slaves():
                try:
                    return self.connect_to(slave)
                except ConnectionError:
                    continue
            raise SlaveNotFoundError  # Never be here

    def connect(self):
        return self.retry.call_with_retry(self._connect_retry, lambda error: None)

    def read_response(
        self,
        disable_decoding=False,
        *,
        disconnect_on_error: Optional[bool] = False,
        push_request: Optional[bool] = False,
    ):
        try:
            return super().read_response(
                disable_decoding=disable_decoding,
                disconnect_on_error=disconnect_on_error,
                push_request=push_request,
            )
        except ReadOnlyError:
            if self.connection_pool.is_master:
                # When talking to a master, a ReadOnlyError when likely
                # indicates that the previous master that we're still connected
                # to has been demoted to a slave and there's a new master.
                # calling disconnect will force the connection to re-query
                # sentinel during the next connect() attempt.
                self.disconnect()
                raise ConnectionError("The previous master is now a slave")
            raise


class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
    pass


class SentinelConnectionPoolProxy:
    def __init__(
        self,
        connection_pool,
        is_master,
        check_connection,
        service_name,
        sentinel_manager,
    ):
        self.connection_pool_ref = weakref.ref(connection_pool)
        self.is_master = is_master
        self.check_connection = check_connection
        self.service_name = service_name
        self.sentinel_manager = sentinel_manager
        self.reset()

    def reset(self):
        self.master_address = None
        self.slave_rr_counter = None

    def get_master_address(self):
        master_address = self.sentinel_manager.discover_master(self.service_name)
        if self.is_master and self.master_address != master_address:
            self.master_address = master_address
            # disconnect any idle connections so that they reconnect
            # to the new master the next time that they are used.
            connection_pool = self.connection_pool_ref()
            if connection_pool is not None:
                connection_pool.disconnect(inuse_connections=False)
        return master_address

    def rotate_slaves(self):
        slaves = self.sentinel_manager.discover_slaves(self.service_name)
        if slaves:
            if self.slave_rr_counter is None:
                self.slave_rr_counter = random.randint(0, len(slaves) - 1)
            for _ in range(len(slaves)):
                self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
                slave = slaves[self.slave_rr_counter]
                yield slave
        # Fallback to the master connection
        try:
            yield self.get_master_address()
        except MasterNotFoundError:
            pass
        raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")


[docs] class SentinelConnectionPool(ConnectionPool): """ Sentinel backed connection pool. If ``check_connection`` flag is set to True, SentinelManagedConnection sends a PING command right after establishing the connection. """ def __init__(self, service_name, sentinel_manager, **kwargs): kwargs["connection_class"] = kwargs.get( "connection_class", ( SentinelManagedSSLConnection if kwargs.pop("ssl", False) else SentinelManagedConnection ), ) self.is_master = kwargs.pop("is_master", True) self.check_connection = kwargs.pop("check_connection", False) self.proxy = SentinelConnectionPoolProxy( connection_pool=self, is_master=self.is_master, check_connection=self.check_connection, service_name=service_name, sentinel_manager=sentinel_manager, ) super().__init__(**kwargs) self.connection_kwargs["connection_pool"] = self.proxy self.service_name = service_name self.sentinel_manager = sentinel_manager def __repr__(self): role = "master" if self.is_master else "slave" return ( f"<{type(self).__module__}.{type(self).__name__}" f"(service={self.service_name}({role}))>" ) def reset(self): super().reset() self.proxy.reset() @property def master_address(self): return self.proxy.master_address def owns_connection(self, connection): check = not self.is_master or ( self.is_master and self.master_address == (connection.host, connection.port) ) parent = super() return check and parent.owns_connection(connection) def get_master_address(self): return self.proxy.get_master_address()
[docs] def rotate_slaves(self): "Round-robin slave balancer" return self.proxy.rotate_slaves()
[docs] class Sentinel(SentinelCommands): """ Valkey Sentinel cluster client >>> from valkey.sentinel import Sentinel >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1) >>> master = sentinel.master_for('mymaster', socket_timeout=0.1) >>> master.set('foo', 'bar') >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1) >>> slave.get('foo') b'bar' ``sentinels`` is a list of sentinel nodes. Each node is represented by a pair (hostname, port). ``min_other_sentinels`` defined a minimum number of peers for a sentinel. When querying a sentinel, if it doesn't meet this threshold, responses from that sentinel won't be considered valid. ``sentinel_kwargs`` is a dictionary of connection arguments used when connecting to sentinel instances. Any argument that can be passed to a normal Valkey connection can be specified here. If ``sentinel_kwargs`` is not specified, any socket_timeout and socket_keepalive options specified in ``connection_kwargs`` will be used. ``connection_kwargs`` are keyword arguments that will be used when establishing a connection to a Valkey server. """ def __init__( self, sentinels, min_other_sentinels=0, sentinel_kwargs=None, **connection_kwargs, ): # if sentinel_kwargs isn't defined, use the socket_* options from # connection_kwargs if sentinel_kwargs is None: sentinel_kwargs = { k: v for k, v in connection_kwargs.items() if k.startswith("socket_") } self.sentinel_kwargs = sentinel_kwargs self.sentinels = [ Valkey(hostname, port, **self.sentinel_kwargs) for hostname, port in sentinels ] self.min_other_sentinels = min_other_sentinels self.connection_kwargs = connection_kwargs
[docs] def execute_command(self, *args, **kwargs): """ Execute Sentinel command in sentinel nodes. once - If set to True, then execute the resulting command on a single node at random, rather than across the entire sentinel cluster. """ kwargs.pop("keys", None) # the keys are used only for client side caching once = bool(kwargs.get("once", False)) if "once" in kwargs.keys(): kwargs.pop("once") if once: random.choice(self.sentinels).execute_command(*args, **kwargs) else: for sentinel in self.sentinels: sentinel.execute_command(*args, **kwargs) return True
def __repr__(self): sentinel_addresses = [] for sentinel in self.sentinels: sentinel_addresses.append( "{host}:{port}".format_map(sentinel.connection_pool.connection_kwargs) ) return ( f"<{type(self).__module__}.{type(self).__name__}" f'(sentinels=[{",".join(sentinel_addresses)}])>' ) def check_master_state(self, state, service_name): if not state["is_master"] or state["is_sdown"] or state["is_odown"]: return False # Check if our sentinel doesn't see other nodes if state["num-other-sentinels"] < self.min_other_sentinels: return False return True
[docs] def discover_master(self, service_name): """ Asks sentinel servers for the Valkey master's address corresponding to the service labeled ``service_name``. Returns a pair (address, port) or raises MasterNotFoundError if no master is found. """ collected_errors = list() for sentinel_no, sentinel in enumerate(self.sentinels): try: masters = sentinel.sentinel_masters() except (ConnectionError, TimeoutError) as e: collected_errors.append(f"{sentinel} - {e!r}") continue state = masters.get(service_name) if state and self.check_master_state(state, service_name): # Put this sentinel at the top of the list self.sentinels[0], self.sentinels[sentinel_no] = ( sentinel, self.sentinels[0], ) return state["ip"], state["port"] error_info = "" if len(collected_errors) > 0: error_info = f" : {', '.join(collected_errors)}" raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")
[docs] def filter_slaves(self, slaves): "Remove slaves that are in an ODOWN or SDOWN state" slaves_alive = [] for slave in slaves: if slave["is_odown"] or slave["is_sdown"]: continue slaves_alive.append((slave["ip"], slave["port"])) return slaves_alive
[docs] def discover_slaves(self, service_name): "Returns a list of alive slaves for service ``service_name``" for sentinel in self.sentinels: try: slaves = sentinel.sentinel_slaves(service_name) except (ConnectionError, ResponseError, TimeoutError): continue slaves = self.filter_slaves(slaves) if slaves: return slaves return []
[docs] def master_for( self, service_name, valkey_class=Valkey, connection_pool_class=SentinelConnectionPool, **kwargs, ): """ Returns a valkey client instance for the ``service_name`` master. A :py:class:`~valkey.sentinel.SentinelConnectionPool` class is used to retrieve the master's address before establishing a new connection. NOTE: If the master's address has changed, any cached connections to the old master are closed. By default clients will be a :py:class:`~valkey.Valkey` instance. Specify a different class to the ``valkey_class`` argument if you desire something different. The ``connection_pool_class`` specifies the connection pool to use. The :py:class:`~valkey.sentinel.SentinelConnectionPool` will be used by default. All other keyword arguments are merged with any connection_kwargs passed to this class and passed to the connection pool as keyword arguments to be used to initialize Valkey connections. """ kwargs["is_master"] = True connection_kwargs = dict(self.connection_kwargs) connection_kwargs.update(kwargs) return valkey_class.from_pool( connection_pool_class(service_name, self, **connection_kwargs) )
[docs] def slave_for( self, service_name, valkey_class=Valkey, connection_pool_class=SentinelConnectionPool, **kwargs, ): """ Returns valkey client instance for the ``service_name`` slave(s). A SentinelConnectionPool class is used to retrieve the slave's address before establishing a new connection. By default clients will be a :py:class:`~valkey.Valkey` instance. Specify a different class to the ``valkey_class`` argument if you desire something different. The ``connection_pool_class`` specifies the connection pool to use. The SentinelConnectionPool will be used by default. All other keyword arguments are merged with any connection_kwargs passed to this class and passed to the connection pool as keyword arguments to be used to initialize Valkey connections. """ kwargs["is_master"] = False connection_kwargs = dict(self.connection_kwargs) connection_kwargs.update(kwargs) return valkey_class.from_pool( connection_pool_class(service_name, self, **connection_kwargs) )