Codebase list matrix-synapse / debian/1.30.0-1 synctl
debian/1.30.0-1

Tree @debian/1.30.0-1 (Download .tar.gz)

synctl @debian/1.30.0-1raw · history · blame

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import collections
import errno
import glob
import os
import os.path
import signal
import subprocess
import sys
import time

import yaml

from synapse.config import find_config_files

SYNAPSE = [sys.executable, "-m", "synapse.app.homeserver"]

GREEN = "\x1b[1;32m"
YELLOW = "\x1b[1;33m"
RED = "\x1b[1;31m"
NORMAL = "\x1b[m"


def pid_running(pid):
    try:
        os.kill(pid, 0)
        return True
    except OSError as err:
        if err.errno == errno.EPERM:
            return True
        return False


def write(message, colour=NORMAL, stream=sys.stdout):
    # Lets check if we're writing to a TTY before colouring
    should_colour = False
    try:
        should_colour = stream.isatty()
    except AttributeError:
        # Just in case `isatty` isn't defined on everything. The python
        # docs are incredibly vague.
        pass

    if not should_colour:
        stream.write(message + "\n")
    else:
        stream.write(colour + message + NORMAL + "\n")


def abort(message, colour=RED, stream=sys.stderr):
    write(message, colour, stream)
    sys.exit(1)


def start(configfile: str, daemonize: bool = True) -> bool:
    """Attempts to start synapse.
    Args:
        configfile: path to a yaml synapse config file
        daemonize: whether to daemonize synapse or keep it attached to the current
            session

    Returns:
        True if the process started successfully
        False if there was an error starting the process

        If deamonize is False it will only return once synapse exits.
    """

    write("Starting ...")
    args = SYNAPSE

    if daemonize:
        args.extend(["--daemonize", "-c", configfile])
    else:
        args.extend(["-c", configfile])

    try:
        subprocess.check_call(args)
        write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN)
        return True
    except subprocess.CalledProcessError as e:
        write(
            "error starting (exit code: %d); see above for logs" % e.returncode,
            colour=RED,
        )
        return False


def start_worker(app: str, configfile: str, worker_configfile: str) -> bool:
    """Attempts to start a synapse worker.
    Args:
        app: name of the worker's appservice
        configfile: path to a yaml synapse config file
        worker_configfile: path to worker specific yaml synapse file

    Returns:
        True if the process started successfully
        False if there was an error starting the process
    """

    args = [
        sys.executable,
        "-m",
        app,
        "-c",
        configfile,
        "-c",
        worker_configfile,
        "--daemonize",
    ]

    try:
        subprocess.check_call(args)
        write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
        return True
    except subprocess.CalledProcessError as e:
        write(
            "error starting %s(%r) (exit code: %d); see above for logs"
            % (app, worker_configfile, e.returncode),
            colour=RED,
        )
        return False


def stop(pidfile: str, app: str) -> bool:
    """Attempts to kill a synapse worker from the pidfile.
    Args:
        pidfile: path to file containing worker's pid
        app: name of the worker's appservice

    Returns:
        True if the process stopped successfully
        False if process was already stopped or an error occured
    """

    if os.path.exists(pidfile):
        pid = int(open(pidfile).read())
        try:
            os.kill(pid, signal.SIGTERM)
            write("stopped %s" % (app,), colour=GREEN)
            return True
        except OSError as err:
            if err.errno == errno.ESRCH:
                write("%s not running" % (app,), colour=YELLOW)
            elif err.errno == errno.EPERM:
                abort("Cannot stop %s: Operation not permitted" % (app,))
            else:
                abort("Cannot stop %s: Unknown error" % (app,))
            return False
    else:
        write(
            "No running worker of %s found (from %s)\nThe process might be managed by another controller (e.g. systemd)"
            % (app, pidfile),
            colour=YELLOW,
        )
    return False


Worker = collections.namedtuple(
    "Worker", ["app", "configfile", "pidfile", "cache_factor", "cache_factors"]
)


def main():

    parser = argparse.ArgumentParser()

    parser.add_argument(
        "action",
        choices=["start", "stop", "restart"],
        help="whether to start, stop or restart the synapse",
    )
    parser.add_argument(
        "configfile",
        nargs="?",
        default="homeserver.yaml",
        help="the homeserver config file. Defaults to homeserver.yaml. May also be"
        " a directory with *.yaml files",
    )
    parser.add_argument(
        "-w", "--worker", metavar="WORKERCONFIG", help="start or stop a single worker"
    )
    parser.add_argument(
        "-a",
        "--all-processes",
        metavar="WORKERCONFIGDIR",
        help="start or stop all the workers in the given directory"
        " and the main synapse process",
    )
    parser.add_argument(
        "--no-daemonize",
        action="store_false",
        dest="daemonize",
        help="Run synapse in the foreground for debugging. "
        "Will work only if the daemonize option is not set in the config.",
    )

    options = parser.parse_args()

    if options.worker and options.all_processes:
        write('Cannot use "--worker" with "--all-processes"', stream=sys.stderr)
        sys.exit(1)
    if not options.daemonize and options.all_processes:
        write('Cannot use "--no-daemonize" with "--all-processes"', stream=sys.stderr)
        sys.exit(1)

    configfile = options.configfile

    if not os.path.exists(configfile):
        write(
            "No config file found\n"
            "To generate a config file, run '%s -c %s --generate-config"
            " --server-name=<server name> --report-stats=<yes/no>'\n"
            % (" ".join(SYNAPSE), options.configfile),
            stream=sys.stderr,
        )
        sys.exit(1)

    config_files = find_config_files([configfile])
    config = {}
    for config_file in config_files:
        with open(config_file) as file_stream:
            yaml_config = yaml.safe_load(file_stream)
        if yaml_config is not None:
            config.update(yaml_config)

    pidfile = config["pid_file"]
    cache_factor = config.get("synctl_cache_factor")
    start_stop_synapse = True

    if cache_factor:
        os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)

    cache_factors = config.get("synctl_cache_factors", {})
    for cache_name, factor in cache_factors.items():
        os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)

    worker_configfiles = []
    if options.worker:
        start_stop_synapse = False
        worker_configfile = options.worker
        if not os.path.exists(worker_configfile):
            write(
                "No worker config found at %r" % (worker_configfile,), stream=sys.stderr
            )
            sys.exit(1)
        worker_configfiles.append(worker_configfile)

    if options.all_processes:
        # To start the main synapse with -a you need to add a worker file
        # with worker_app == "synapse.app.homeserver"
        start_stop_synapse = False
        worker_configdir = options.all_processes
        if not os.path.isdir(worker_configdir):
            write(
                "No worker config directory found at %r" % (worker_configdir,),
                stream=sys.stderr,
            )
            sys.exit(1)
        worker_configfiles.extend(
            sorted(glob.glob(os.path.join(worker_configdir, "*.yaml")))
        )

    workers = []
    for worker_configfile in worker_configfiles:
        with open(worker_configfile) as stream:
            worker_config = yaml.safe_load(stream)
        worker_app = worker_config["worker_app"]
        if worker_app == "synapse.app.homeserver":
            # We need to special case all of this to pick up options that may
            # be set in the main config file or in this worker config file.
            worker_pidfile = worker_config.get("pid_file") or pidfile
            worker_cache_factor = (
                worker_config.get("synctl_cache_factor") or cache_factor
            )
            worker_cache_factors = (
                worker_config.get("synctl_cache_factors") or cache_factors
            )
            # The master process doesn't support using worker_* config.
            for key in worker_config:
                if key == "worker_app":  # But we allow worker_app
                    continue
                assert not key.startswith(
                    "worker_"
                ), "Main process cannot use worker_* config"
        else:
            worker_pidfile = worker_config["worker_pid_file"]
            worker_cache_factor = worker_config.get("synctl_cache_factor")
            worker_cache_factors = worker_config.get("synctl_cache_factors", {})
        workers.append(
            Worker(
                worker_app,
                worker_configfile,
                worker_pidfile,
                worker_cache_factor,
                worker_cache_factors,
            )
        )

    action = options.action

    if action == "stop" or action == "restart":
        has_stopped = True
        for worker in workers:
            if not stop(worker.pidfile, worker.app):
                # A worker could not be stopped.
                has_stopped = False

        if start_stop_synapse:
            if not stop(pidfile, "synapse.app.homeserver"):
                has_stopped = False
        if not has_stopped and action == "stop":
            sys.exit(1)

    # Wait for synapse to actually shutdown before starting it again
    if action == "restart":
        running_pids = []
        if start_stop_synapse and os.path.exists(pidfile):
            running_pids.append(int(open(pidfile).read()))
        for worker in workers:
            if os.path.exists(worker.pidfile):
                running_pids.append(int(open(worker.pidfile).read()))
        if len(running_pids) > 0:
            write("Waiting for process to exit before restarting...")
            for running_pid in running_pids:
                while pid_running(running_pid):
                    time.sleep(0.2)
            write("All processes exited; now restarting...")

    if action == "start" or action == "restart":
        error = False
        if start_stop_synapse:
            # Check if synapse is already running
            if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())):
                abort("synapse.app.homeserver already running")

            if not start(configfile, bool(options.daemonize)):
                error = True

        for worker in workers:
            env = os.environ.copy()

            # Skip starting a worker if its already running
            if os.path.exists(worker.pidfile) and pid_running(
                int(open(worker.pidfile).read())
            ):
                print(worker.app + " already running")
                continue

            if worker.cache_factor:
                os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)

            for cache_name, factor in worker.cache_factors.items():
                os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)

            if not start_worker(worker.app, configfile, worker.configfile):
                error = True

            # Reset env back to the original
            os.environ.clear()
            os.environ.update(env)

        if error:
            exit(1)


if __name__ == "__main__":
    main()