Source code for lighthouse.haproxy.balancer

import collections
import logging
import threading
import time

import six

from lighthouse.balancer import Balancer

from .config import HAProxyConfig
from .control import HAProxyControl
from .stanzas.stanza import Stanza
from .stanzas.proxy import ProxyStanza
from .stanzas.stats import StatsStanza


MIN_TIME_BETWEEN_RESTARTS = 2  # seconds

logger = logging.getLogger(__name__)


[docs]class HAProxy(Balancer): """ The HAProxy balancer class. Leverages the HAProxy control, config and stanza-related classes in order to keep the HAProxy config file in sync with the services and nodes discovered. """ name = "haproxy" def __init__(self, *args, **kwargs): super(HAProxy, self).__init__(*args, **kwargs) self.last_restart = 0 self.restart_required = True self.restart_interval = MIN_TIME_BETWEEN_RESTARTS self.restart_lock = threading.RLock() self.haproxy_config_path = None self.config_file = None self.control = None @classmethod
[docs] def validate_dependencies(cls): """ The HAProxy Balancer doesn't use any specific python libraries so there are no extra dependencies to check for. """ return True
@classmethod
[docs] def validate_config(cls, config): """ Validates that a config file path and a control socket file path and pid file path are all present in the HAProxy config. """ if "config_file" not in config: raise ValueError("No config file path given") if "socket_file" not in config: raise ValueError("No control socket path given") if "pid_file" not in config: raise ValueError("No PID file path given") if "stats" in config and "port" not in config["stats"]: raise ValueError("Stats interface defined, but no port given") if "proxies" in config: cls.validate_proxies_config(config["proxies"]) return config
@classmethod
[docs] def validate_proxies_config(cls, proxies): """ Specific config validation method for the "proxies" portion of a config. Checks that each proxy defines a port and a list of `upstreams`, and that each upstream entry has a host and port defined. """ for name, proxy in six.iteritems(proxies): if "port" not in proxy: raise ValueError("No port defined for proxy %s" % name) if "upstreams" not in proxy: raise ValueError( "No upstreams defined for proxy %s" % name ) for upstream in proxy["upstreams"]: if "host" not in upstream: raise ValueError( "No host defined for upstream in proxy %s" % name ) if "port" not in upstream: raise ValueError( "No port defined for upstream in proxy %s" % name )
[docs] def apply_config(self, config): """ Constructs HAProxyConfig and HAProxyControl instances based on the contents of the config. This is mostly a matter of constructing the configuration stanzas. """ self.haproxy_config_path = config["config_file"] global_stanza = Stanza("global") global_stanza.add_lines(config.get("global", [])) global_stanza.add_lines([ "stats socket %s mode 600 level admin" % config["socket_file"], "stats timeout 2m" ]) defaults_stanza = Stanza("defaults") defaults_stanza.add_lines(config.get("defaults", [])) proxy_stanzas = [ ProxyStanza( name, proxy["port"], proxy["upstreams"], proxy.get("options", []), proxy.get("bind_address") ) for name, proxy in six.iteritems(config.get("proxies", {})) ] stats_stanza = None if "stats" in config: stats_stanza = StatsStanza( config["stats"]["port"], config["stats"].get("uri", "/") ) for timeout in ("client", "connect", "server"): if timeout in config["stats"].get("timeouts", {}): stats_stanza.add_line( "timeout %s %d" % ( timeout, config["stats"]["timeouts"][timeout] ) ) self.config_file = HAProxyConfig( global_stanza, defaults_stanza, proxy_stanzas=proxy_stanzas, stats_stanza=stats_stanza, meta_clusters=config.get("meta_clusters", {}), bind_address=config.get("bind_address") ) self.control = HAProxyControl( config["config_file"], config["socket_file"], config["pid_file"], )
[docs] def sync_file(self, clusters): """ Generates new HAProxy config file content and writes it to the file at `haproxy_config_path`. If a restart is not necessary the nodes configured in HAProxy will be synced on the fly. If a restart *is* necessary, one will be triggered. """ logger.info("Updating HAProxy config file.") if not self.restart_required: self.sync_nodes(clusters) version = self.control.get_version() with open(self.haproxy_config_path, "w") as f: f.write(self.config_file.generate(clusters, version=version)) if self.restart_required: with self.restart_lock: self.restart()
[docs] def restart(self): """ Tells the HAProxy control object to restart the process. If it's been fewer than `restart_interval` seconds since the previous restart, it will wait until the interval has passed. This staves off situations where the process is constantly restarting, as it is possible to drop packets for a short interval while doing so. """ delay = (self.last_restart - time.time()) + self.restart_interval if delay > 0: time.sleep(delay) self.control.restart() self.last_restart = time.time() self.restart_required = False
[docs] def sync_nodes(self, clusters): """ Syncs the enabled/disabled status of nodes existing in HAProxy based on the given clusters. This is used to inform HAProxy of up/down nodes without necessarily doing a restart of the process. """ logger.info("Syncing HAProxy backends.") current_nodes, enabled_nodes = self.get_current_nodes(clusters) for cluster_name, nodes in six.iteritems(current_nodes): for node in nodes: if node["svname"] in enabled_nodes[cluster_name]: command = self.control.enable_node else: command = self.control.disable_node try: response = command(cluster_name, node["svname"]) except Exception: logger.exception("Error when enabling/disabling node") self.restart_required = True else: if response: logger.error( "Socket command for %s node %s failed: %s", cluster_name, node["svname"], response ) self.restart_required = True return logger.info("HAProxy nodes/servers synced.")
[docs] def get_current_nodes(self, clusters): """ Returns two dictionaries, the current nodes and the enabled nodes. The current_nodes dictionary is keyed off of the cluster name and values are a list of nodes known to HAProxy. The enabled_nodes dictionary is also keyed off of the cluster name and values are list of *enabled* nodes, i.e. the same values as current_nodes but limited to servers currently taking traffic. """ current_nodes = self.control.get_active_nodes() enabled_nodes = collections.defaultdict(list) for cluster in clusters: if not cluster.nodes: continue if cluster.name not in current_nodes: logger.debug( "New cluster '%s' added, restart required.", cluster.name ) self.restart_required = True for node in cluster.nodes: if node.name not in [ current_node["svname"] for current_node in current_nodes.get(cluster.name, []) ]: logger.debug( "New node added to cluster '%s', restart required.", cluster.name ) self.restart_required = True enabled_nodes[cluster.name].append(node.name) return current_nodes, enabled_nodes