Codebase list matrix-synapse / upstream/0.29.0
New upstream version 0.29.0 Erik Johnston 5 years ago
119 changed file(s) with 2585 addition(s) and 1240 deletion(s).
0 Dockerfile
1 .travis.yml
2 .gitignore
3 demo/etc
4 tox.ini
3131 demo/etc
3232
3333 uploads
34 cache
3435
3536 .idea/
3637 media_store/
00 sudo: false
11 language: python
2 python: 2.7
32
43 # tell travis to cache ~/.cache/pip
54 cache: pip
65
7 env:
8 - TOX_ENV=packaging
9 - TOX_ENV=pep8
10 - TOX_ENV=py27
6 matrix:
7 include:
8 - python: 2.7
9 env: TOX_ENV=packaging
10
11 - python: 2.7
12 env: TOX_ENV=pep8
13
14 - python: 2.7
15 env: TOX_ENV=py27
16
17 - python: 3.6
18 env: TOX_ENV=py36
1119
1220 install:
1321 - pip install tox
5959
6060 Christoph Witzany <christoph at web.crofting.com>
6161 * Add LDAP support for authentication
62
63 Pierre Jaury <pierre at jaury.eu>
64 * Docker packaging
0 Changes in synapse v0.29.0 (2018-05-16)
1 ===========================================
2
3
4 Changes in synapse v0.29.0-rc1 (2018-05-14)
5 ===========================================
6
7 Notable changes include a Dockerfile for running Synapse (thanks to @kaiyou!) and a
8 fix for a spec compliance bug in the Client-Server API, plus further preparation for
9 the Python 3 migration.
10
11 Potentially breaking change:
12
13 * Make Client-Server API return 401 for invalid token (PR #3161).
14
15 This changes the Client-Server API to return a 401 error code instead of 403
16 when the access token is unrecognised. This is the behaviour required by the
17 specification, but some clients may be relying on the old, incorrect
18 behaviour.
19
20 Thanks to @NotAFile for fixing this.
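
For example, a request made with an unrecognised access token might now receive a
response along the lines of the following (illustrative)::

    HTTP/1.1 401 Unauthorized

    {"errcode": "M_UNKNOWN_TOKEN", "error": "Unrecognised access token."}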
21
22 Features:
23
24 * Add a Dockerfile for synapse (PR #2846) Thanks to @kaiyou!
25
26 Changes - General:
27
28 * nuke-room-from-db.sh: added postgresql option and help (PR #2337) Thanks to @rubo77!
29 * Part user from rooms on account deactivate (PR #3201)
30 * Make 'unexpected logging context' into warnings (PR #3007)
31 * Set Server header in SynapseRequest (PR #3208)
32 * remove duplicates from groups tables (PR #3129)
33 * Improve exception handling for background processes (PR #3138)
34 * Add missing consumeErrors to improve exception handling (PR #3139)
35 * reraise exceptions more carefully (PR #3142)
36 * Remove redundant call to preserve_fn (PR #3143)
37 * Trap exceptions thrown within run_in_background (PR #3144)
38
39 Changes - Refactors:
40
41 * Refactor /context to reuse pagination storage functions (PR #3193)
42 * Refactor recent events func to use pagination func (PR #3195)
43 * Refactor pagination DB API to return concrete type (PR #3196)
44 * Refactor get_recent_events_for_room return type (PR #3198)
45 * Refactor sync APIs to reuse pagination API (PR #3199)
46 * Remove unused code path from member change DB func (PR #3200)
47 * Refactor request handling wrappers (PR #3203)
48 * transaction_id, destination defined twice (PR #3209) Thanks to @damir-manapov!
49 * Refactor event storage to prepare for changes in state calculations (PR #3141)
50 * Set Server header in SynapseRequest (PR #3208)
51 * Use deferred.addTimeout instead of time_bound_deferred (PR #3127, #3178)
52 * Use run_in_background in preference to preserve_fn (PR #3140)
53
54 Changes - Python 3 migration:
55
56 * Construct HMAC as bytes on py3 (PR #3156) Thanks to @NotAFile!
57 * run config tests on py3 (PR #3159) Thanks to @NotAFile!
58 * Open certificate files as bytes (PR #3084) Thanks to @NotAFile!
59 * Open config file in non-bytes mode (PR #3085) Thanks to @NotAFile!
60 * Make event properties raise AttributeError instead (PR #3102) Thanks to @NotAFile!
61 * Use six.moves.urlparse (PR #3108) Thanks to @NotAFile!
62 * Add py3 tests to tox with folders that work (PR #3145) Thanks to @NotAFile!
63 * Don't yield in list comprehensions (PR #3150) Thanks to @NotAFile!
64 * Move more xrange to six (PR #3151) Thanks to @NotAFile!
65 * make imports local (PR #3152) Thanks to @NotAFile!
66 * move httplib import to six (PR #3153) Thanks to @NotAFile!
67 * Replace stringIO imports with six (PR #3154, #3168) Thanks to @NotAFile!
68 * more bytes strings (PR #3155) Thanks to @NotAFile!
69
70 Bug Fixes:
71
72 * synapse fails to start under Twisted >= 18.4 (PR #3157)
73 * Fix a class of logcontext leaks (PR #3170)
74 * Fix a couple of logcontext leaks in unit tests (PR #3172)
75 * Fix logcontext leak in media repo (PR #3174)
76 * Escape label values in prometheus metrics (PR #3175, #3186)
77 * Fix 'Unhandled Error' logs with Twisted 18.4 (PR #3182) Thanks to @Half-Shot!
78 * Fix logcontext leaks in rate limiter (PR #3183)
79 * notifications: Convert next_token to string according to the spec (PR #3190) Thanks to @mujx!
80 * nuke-room-from-db.sh: fix deletion from search table (PR #3194) Thanks to @rubo77!
81 * add guard for None on purge_history api (PR #3160) Thanks to @krombel!
82
083 Changes in synapse v0.28.1 (2018-05-01)
184 =======================================
285
386 SECURITY UPDATE
487
588 * Clamp the allowed values of event depth received over federation to be
6 [0, 2**63 - 1]. This mitigates an attack where malicious events
7 injected with depth = 2**63 - 1 render rooms unusable. Depth is used to
89 [0, 2^63 - 1]. This mitigates an attack where malicious events
90 injected with depth = 2^63 - 1 render rooms unusable. Depth is used to
891 determine the cosmetic ordering of events within a room, and so the ordering
992 of events in such a room will default to using stream_ordering rather than depth
1093 (topological_ordering).
1194
12 This is a temporary solution to mitigate abuse in the wild, whilst a long solution
95 This is a temporary solution to mitigate abuse in the wild, whilst a long term solution
1396 is being implemented to improve how the depth parameter is used.
1497
1598 Full details at
16 https://docs.google.com/document/d/1I3fi2S-XnpO45qrpCsowZv8P8dHcNZ4fsBsbOW7KABI/edit#
99 https://docs.google.com/document/d/1I3fi2S-XnpO45qrpCsowZv8P8dHcNZ4fsBsbOW7KABI
17100
18101 * Pin Twisted to <18.4 until we stop using the private _OpenSSLECCurve API.
19102
32115
33116 Minor performance improvement to federation sending and bug fixes.
34117
35 (Note: This release does not include state resolutions discussed in matrix live)
118 (Note: This release does not include the delta state resolution implementation discussed in matrix live)
119
36120
37121 Features:
38122
0 FROM docker.io/python:2-alpine3.7
1
2 RUN apk add --no-cache --virtual .nacl_deps su-exec build-base libffi-dev zlib-dev libressl-dev libjpeg-turbo-dev linux-headers postgresql-dev
3
4 COPY . /synapse
5
6 # A wheel cache may be provided in ./cache for faster builds
7 RUN cd /synapse \
8 && pip install --upgrade pip setuptools psycopg2 \
9 && mkdir -p /synapse/cache \
10 && pip install -f /synapse/cache --upgrade --process-dependency-links . \
11 && mv /synapse/contrib/docker/start.py /synapse/contrib/docker/conf / \
12 && rm -rf setup.py setup.cfg synapse
13
14 VOLUME ["/data"]
15
16 EXPOSE 8008/tcp 8448/tcp
17
18 ENTRYPOINT ["/start.py"]
2424 exclude jenkins.sh
2525 exclude jenkins*.sh
2626 exclude jenkins*
27 exclude Dockerfile
28 exclude .dockerignore
2729 recursive-exclude jenkins *.sh
2830
2931 prune .github
0 # Synapse Docker
1
2 This Docker image will run Synapse as a single process. It does not provide a
3 database server or a TURN server; you should run these separately.
4
5 If you run a Postgres server, simply place it in the same Compose project or
6 set the proper environment variables, and the image will automatically
7 use that server.
8
9 ## Build
10
11 Build the docker image with the `docker build` command from the root of the synapse repository.
12
13 ```
14 docker build -t docker.io/matrixdotorg/synapse .
15 ```
16
17 The `-t` option sets the image tag. Official images are tagged `matrixdotorg/synapse:<version>` where `<version>` is the same as the release tag in the synapse git repository.
18
19 If you have a local Python wheel cache available, copy the relevant packages into the ``cache/`` directory at the root of the project.
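
For example, one possible way to pre-populate the cache (the exact command is
illustrative and assumes a reasonably recent pip) is:

```
pip wheel --wheel-dir cache/ .
```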
20
21 ## Run
22
23 This image is designed to run either with an automatically generated configuration
24 file or with a custom configuration that requires manual editing.
25
26 ### Automated configuration
27
28 It is recommended that you use Docker Compose to run your containers, including
29 this image and a Postgres server. A sample ``docker-compose.yml`` is provided,
30 including example labels for reverse proxying and other artifacts.
31
32 Read the section about environment variables and set at least the mandatory variables,
33 then run the server:
34
35 ```
36 docker-compose up -d
37 ```
38
39 ### Manual configuration
40
41 A sample ``docker-compose.yml`` is provided, including example labels for
42 reverse proxying and other artifacts.
43
44 To use a manual configuration, specify a ``SYNAPSE_CONFIG_PATH``, preferably
45 pointing to a persistent path. To generate a fresh ``homeserver.yaml``, simply run:
46
47 ```
48 docker-compose run --rm -e SYNAPSE_SERVER_NAME=my.matrix.host synapse generate
49 ```
50
51 Then, customize your configuration and run the server:
52
53 ```
54 docker-compose up -d
55 ```
56
57 ### Without Compose
58
59 If you do not wish to use Compose, you may still run this image using plain
60 Docker commands. Note that the following is just a guideline and you may need
61 to add parameters to the docker run command to account for the network situation
62 with your postgres database.
63
64 ```
65 docker run \
66 -d \
67 --name synapse \
68 -v ${DATA_PATH}:/data \
69 -e SYNAPSE_SERVER_NAME=my.matrix.host \
70 -e SYNAPSE_REPORT_STATS=yes \
71 docker.io/matrixdotorg/synapse:latest
72 ```
73
74 ## Volumes
75
76 The image expects a single volume, located at ``/data``, that will hold:
77
78 * temporary files during uploads;
79 * uploaded media and thumbnails;
80 * the SQLite database if you do not configure postgres;
81 * the appservices configuration.
82
83 You are free to use separate volumes depending on storage endpoints at your
84 disposal. For instance, ``/data/media`` could be stored on large but slow
85 HDD storage, while other files could be stored on high performance
86 endpoints.
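
For example, with plain Docker you could mount the media store from a separate
host path (the paths below are purely illustrative):

```
docker run \
    -d \
    --name synapse \
    -v ${DATA_PATH}:/data \
    -v /path/to/large_hdd:/data/media \
    -e SYNAPSE_SERVER_NAME=my.matrix.host \
    -e SYNAPSE_REPORT_STATS=yes \
    docker.io/matrixdotorg/synapse:latest
```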
87
88 To set up an application service, simply create an ``appservices``
89 directory in the data volume and write the application service YAML
90 configuration file there. Multiple application services are supported.
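
For example, a registration file such as ``/data/appservices/my-bridge.yaml``
might look like the following (all identifiers and tokens are illustrative):

```
id: my-bridge
url: "http://my-bridge:9000"
as_token: "<long random string>"
hs_token: "<long random string>"
sender_localpart: my-bridge
namespaces:
  users:
    - exclusive: true
      regex: "@_mybridge_.*"
  aliases: []
  rooms: []
```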
91
92 ## Environment
93
94 Unless you specify a custom path for the configuration file, a very generic
95 file will be generated, based on the following environment settings.
96 These are a good starting point for setting up your own deployment.
97
98 Global settings:
99
100 * ``UID``, the user id Synapse will run as [default 991]
101 * ``GID``, the group id Synapse will run as [default 991]
102 * ``SYNAPSE_CONFIG_PATH``, path to a custom config file
103
104 If ``SYNAPSE_CONFIG_PATH`` is set, you should generate a configuration file and
105 then customize it manually. No other environment variable is required.
106
107 Otherwise, a dynamic configuration file will be used. The following environment
108 variables are available for configuration:
109
110 * ``SYNAPSE_SERVER_NAME`` (mandatory), the public hostname of this server.
111 * ``SYNAPSE_REPORT_STATS`` (mandatory, ``yes`` or ``no``), enable anonymous
112 statistics reporting back to the Matrix project, which helps us to get funding.
113 * ``SYNAPSE_MACAROON_SECRET_KEY`` (mandatory), secret for signing access tokens
114 to the server; set this to a proper random key.
115 * ``SYNAPSE_NO_TLS``, set this variable to disable TLS in Synapse (use this if
116 you run your own TLS-capable reverse proxy).
117 * ``SYNAPSE_ENABLE_REGISTRATION``, set this variable to enable registration on
118 the Synapse instance.
119 * ``SYNAPSE_ALLOW_GUEST``, set this variable to allow guests to join this server.
120 * ``SYNAPSE_EVENT_CACHE_SIZE``, the event cache size [default `10K`].
121 * ``SYNAPSE_CACHE_FACTOR``, the cache factor [default `0.5`].
122 * ``SYNAPSE_RECAPTCHA_PUBLIC_KEY``, set this variable to the recaptcha public
123 key in order to enable recaptcha upon registration.
124 * ``SYNAPSE_RECAPTCHA_PRIVATE_KEY``, set this variable to the recaptcha private
125 key in order to enable recaptcha upon registration.
126 * ``SYNAPSE_TURN_URIS``, set this variable to the comma-separated list of TURN
127 URIs to enable TURN for this homeserver.
128 * ``SYNAPSE_TURN_SECRET``, set this to the TURN shared secret if required.
129
130 Shared secrets, which will be initialized to random values if not set:
131
132 * ``SYNAPSE_REGISTRATION_SHARED_SECRET``, secret for registering users if
133 registration is disabled.
134
135 Database specific values (will use SQLite if not set):
136
137 * `POSTGRES_DB` - The database name for the synapse postgres database. [default: `synapse`]
138 * `POSTGRES_HOST` - The host of the postgres database if you wish to use postgresql instead of sqlite3. [default: `db` which is useful when using a container on the same docker network in a compose file where the postgres service is called `db`]
139 * `POSTGRES_PASSWORD` - The password for the synapse postgres database. **If this is set then postgres will be used instead of sqlite3.** [default: none] **NOTE**: You are highly encouraged to use postgresql! Please use the compose file to make it easier to deploy.
140 * `POSTGRES_USER` - The user for the synapse postgres database. [default: `matrix`]
141
142 Mail server specific values (emails will not be sent if not set):
143
144 * ``SYNAPSE_SMTP_HOST``, hostname of the mail server.
145 * ``SYNAPSE_SMTP_PORT``, TCP port for accessing the mail server [default ``25``].
146 * ``SYNAPSE_SMTP_USER``, username for authenticating against the mail server, if any.
147 * ``SYNAPSE_SMTP_PASSWORD``, password for authenticating against the mail server, if any.
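
For example, mail settings could be passed through the ``environment`` section of
the provided ``docker-compose.yml`` (all values are illustrative):

```
environment:
  - SYNAPSE_SMTP_HOST=smtp.example.com
  - SYNAPSE_SMTP_PORT=587
  - SYNAPSE_SMTP_USER=synapse
  - SYNAPSE_SMTP_PASSWORD=secret
```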
0 # vim:ft=yaml
1
2 ## TLS ##
3
4 tls_certificate_path: "/data/{{ SYNAPSE_SERVER_NAME }}.tls.crt"
5 tls_private_key_path: "/data/{{ SYNAPSE_SERVER_NAME }}.tls.key"
6 tls_dh_params_path: "/data/{{ SYNAPSE_SERVER_NAME }}.tls.dh"
7 no_tls: {{ "True" if SYNAPSE_NO_TLS else "False" }}
8 tls_fingerprints: []
9
10 ## Server ##
11
12 server_name: "{{ SYNAPSE_SERVER_NAME }}"
13 pid_file: /homeserver.pid
14 web_client: False
15 soft_file_limit: 0
16
17 ## Ports ##
18
19 listeners:
20 {% if not SYNAPSE_NO_TLS %}
21 -
22 port: 8448
23 bind_addresses: ['0.0.0.0']
24 type: http
25 tls: true
26 x_forwarded: false
27 resources:
28 - names: [client]
29 compress: true
30 - names: [federation] # Federation APIs
31 compress: false
32 {% endif %}
33
34 - port: 8008
35 tls: false
36 bind_addresses: ['0.0.0.0']
37 type: http
38 x_forwarded: false
39
40 resources:
41 - names: [client]
42 compress: true
43 - names: [federation]
44 compress: false
45
46 ## Database ##
47
48 {% if POSTGRES_PASSWORD %}
49 database:
50 name: "psycopg2"
51 args:
52 user: "{{ POSTGRES_USER or "synapse" }}"
53 password: "{{ POSTGRES_PASSWORD }}"
54 database: "{{ POSTGRES_DB or "synapse" }}"
55 host: "{{ POSTGRES_HOST or "db" }}"
56 port: "{{ POSTGRES_PORT or "5432" }}"
57 cp_min: 5
58 cp_max: 10
59 {% else %}
60 database:
61 name: "sqlite3"
62 args:
63 database: "/data/homeserver.db"
64 {% endif %}
65
66 ## Performance ##
67
68 event_cache_size: "{{ SYNAPSE_EVENT_CACHE_SIZE or "10K" }}"
69 verbose: 0
70 log_file: "/data/homeserver.log"
71 log_config: "/compiled/log.config"
72
73 ## Ratelimiting ##
74
75 rc_messages_per_second: 0.2
76 rc_message_burst_count: 10.0
77 federation_rc_window_size: 1000
78 federation_rc_sleep_limit: 10
79 federation_rc_sleep_delay: 500
80 federation_rc_reject_limit: 50
81 federation_rc_concurrent: 3
82
83 ## Files ##
84
85 media_store_path: "/data/media"
86 uploads_path: "/data/uploads"
87 max_upload_size: "10M"
88 max_image_pixels: "32M"
89 dynamic_thumbnails: false
90
91 # List of thumbnail sizes to precalculate when an image is uploaded.
92 thumbnail_sizes:
93 - width: 32
94 height: 32
95 method: crop
96 - width: 96
97 height: 96
98 method: crop
99 - width: 320
100 height: 240
101 method: scale
102 - width: 640
103 height: 480
104 method: scale
105 - width: 800
106 height: 600
107 method: scale
108
109 url_preview_enabled: False
110 max_spider_size: "10M"
111
112 ## Captcha ##
113
114 {% if SYNAPSE_RECAPTCHA_PUBLIC_KEY %}
115 recaptcha_public_key: "{{ SYNAPSE_RECAPTCHA_PUBLIC_KEY }}"
116 recaptcha_private_key: "{{ SYNAPSE_RECAPTCHA_PRIVATE_KEY }}"
117 enable_registration_captcha: True
118 recaptcha_siteverify_api: "https://www.google.com/recaptcha/api/siteverify"
119 {% else %}
120 recaptcha_public_key: "YOUR_PUBLIC_KEY"
121 recaptcha_private_key: "YOUR_PRIVATE_KEY"
122 enable_registration_captcha: False
123 recaptcha_siteverify_api: "https://www.google.com/recaptcha/api/siteverify"
124 {% endif %}
125
126 ## Turn ##
127
128 {% if SYNAPSE_TURN_URIS %}
129 turn_uris:
130 {% for uri in SYNAPSE_TURN_URIS.split(',') %} - "{{ uri }}"
131 {% endfor %}
132 turn_shared_secret: "{{ SYNAPSE_TURN_SECRET }}"
133 turn_user_lifetime: "1h"
134 turn_allow_guests: True
135 {% else %}
136 turn_uris: []
137 turn_shared_secret: "YOUR_SHARED_SECRET"
138 turn_user_lifetime: "1h"
139 turn_allow_guests: True
140 {% endif %}
141
142 ## Registration ##
143
144 enable_registration: {{ "True" if SYNAPSE_ENABLE_REGISTRATION else "False" }}
145 registration_shared_secret: "{{ SYNAPSE_REGISTRATION_SHARED_SECRET }}"
146 bcrypt_rounds: 12
147 allow_guest_access: {{ "True" if SYNAPSE_ALLOW_GUEST else "False" }}
148 enable_group_creation: true
149
150 # The list of identity servers trusted to verify third party
151 # identifiers by this server.
152 trusted_third_party_id_servers:
153 - matrix.org
154 - vector.im
155 - riot.im
156
157 ## Metrics ###
158
159 {% if SYNAPSE_REPORT_STATS.lower() == "yes" %}
160 enable_metrics: True
161 report_stats: True
162 {% else %}
163 enable_metrics: False
164 report_stats: False
165 {% endif %}
166
167 ## API Configuration ##
168
169 room_invite_state_types:
170 - "m.room.join_rules"
171 - "m.room.canonical_alias"
172 - "m.room.avatar"
173 - "m.room.name"
174
175 {% if SYNAPSE_APPSERVICES %}
176 app_service_config_files:
177 {% for appservice in SYNAPSE_APPSERVICES %} - "{{ appservice }}"
178 {% endfor %}
179 {% else %}
180 app_service_config_files: []
181 {% endif %}
182
183 macaroon_secret_key: "{{ SYNAPSE_MACAROON_SECRET_KEY }}"
184 expire_access_token: False
185
186 ## Signing Keys ##
187
188 signing_key_path: "/data/{{ SYNAPSE_SERVER_NAME }}.signing.key"
189 old_signing_keys: {}
190 key_refresh_interval: "1d" # 1 Day.
191
192 # The trusted servers to download signing keys from.
193 perspectives:
194 servers:
195 "matrix.org":
196 verify_keys:
197 "ed25519:auto":
198 key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
199
200 password_config:
201 enabled: true
202
203 {% if SYNAPSE_SMTP_HOST %}
204 email:
205 enable_notifs: false
206 smtp_host: "{{ SYNAPSE_SMTP_HOST }}"
207 smtp_port: {{ SYNAPSE_SMTP_PORT or "25" }}
208 smtp_user: "{{ SYNAPSE_SMTP_USER }}"
209 smtp_pass: "{{ SYNAPSE_SMTP_PASSWORD }}"
210 require_transport_security: False
211 notif_from: "{{ SYNAPSE_SMTP_FROM or "hostmaster@" + SYNAPSE_SERVER_NAME }}"
212 app_name: Matrix
213 template_dir: res/templates
214 notif_template_html: notif_mail.html
215 notif_template_text: notif_mail.txt
216 notif_for_new_users: True
217 riot_base_url: "https://{{ SYNAPSE_SERVER_NAME }}"
218 {% endif %}
0 version: 1
1
2 formatters:
3 precise:
4 format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
5
6 filters:
7 context:
8 (): synapse.util.logcontext.LoggingContextFilter
9 request: ""
10
11 handlers:
12 console:
13 class: logging.StreamHandler
14 formatter: precise
15 filters: [context]
16
17 loggers:
18 synapse:
19 level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
20
21 synapse.storage.SQL:
22 # beware: increasing this to DEBUG will make synapse log sensitive
23 # information such as access tokens.
24 level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
25
26 root:
27 level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }}
28 handlers: [console]
0 # This compose file is compatible with Compose itself; it might need some
1 # adjustments to run properly with docker stack.
2
3 version: '3'
4
5 services:
6
7 synapse:
8 image: docker.io/matrixdotorg/synapse:latest
9 # Since synapse does not retry connecting to the database, restart upon
10 # failure
11 restart: unless-stopped
12 # See the readme for a full documentation of the environment settings
13 environment:
14 - SYNAPSE_SERVER_NAME=my.matrix.host
15 - SYNAPSE_REPORT_STATS=no
16 - SYNAPSE_ENABLE_REGISTRATION=yes
17 - SYNAPSE_LOG_LEVEL=INFO
18 - POSTGRES_PASSWORD=changeme
19 volumes:
20 # You may either store all the files in a local folder
21 - ./files:/data
22 # .. or you may split this between different storage points
23 # - ./files:/data
24 # - /path/to/ssd:/data/uploads
25 # - /path/to/large_hdd:/data/media
26 depends_on:
27 - db
28 # In order to expose Synapse, remove one of the following; you might for
29 # instance expose the TLS port directly:
30 ports:
31 - 8448:8448/tcp
32 # ... or use a reverse proxy, here is an example for traefik:
33 labels:
34 - traefik.enable=true
35 - traefik.frontend.rule=Host:my.matrix.host
36 - traefik.port=8448
37
38 db:
39 image: docker.io/postgres:10-alpine
40 # Change that password, of course!
41 environment:
42 - POSTGRES_USER=synapse
43 - POSTGRES_PASSWORD=changeme
44 volumes:
45 # You may store the database tables in a local folder..
46 - ./schemas:/var/lib/postgresql/data
47 # .. or store them on some high performance storage for better results
48 # - /path/to/ssd/storage:/var/lib/postgresql/data
0 #!/usr/local/bin/python
1
2 import jinja2
3 import os
4 import sys
5 import subprocess
6 import glob
7
8 # Utility functions
9 convert = lambda src, dst, environ: open(dst, "w").write(jinja2.Template(open(src).read()).render(**environ))
10
11 def check_arguments(environ, args):
12 for argument in args:
13 if argument not in environ:
14 print("Environment variable %s is mandatory, exiting." % argument)
15 sys.exit(2)
16
17 def generate_secrets(environ, secrets):
18 for name, secret in secrets.items():
19 if secret not in environ:
20 filename = "/data/%s.%s.key" % (environ["SYNAPSE_SERVER_NAME"], name)
21 if os.path.exists(filename):
22 with open(filename) as handle: value = handle.read()
23 else:
24 print("Generating a random secret for {}".format(name))
25 value = os.urandom(32).encode("hex")
26 with open(filename, "w") as handle: handle.write(value)
27 environ[secret] = value
28
29 # Prepare the configuration
30 mode = sys.argv[1] if len(sys.argv) > 1 else None
31 environ = os.environ.copy()
32 ownership = "{}:{}".format(environ.get("UID", 991), environ.get("GID", 991))
33 args = ["python", "-m", "synapse.app.homeserver"]
34
35 # In generate mode, generate a configuration and any missing keys, then exit
36 if mode == "generate":
37 check_arguments(environ, ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS", "SYNAPSE_CONFIG_PATH"))
38 args += [
39 "--server-name", environ["SYNAPSE_SERVER_NAME"],
40 "--report-stats", environ["SYNAPSE_REPORT_STATS"],
41 "--config-path", environ["SYNAPSE_CONFIG_PATH"],
42 "--generate-config"
43 ]
44 os.execv("/usr/local/bin/python", args)
45
46 # In normal mode, generate missing keys if any, then run synapse
47 else:
48 # Parse the configuration file
49 if "SYNAPSE_CONFIG_PATH" in environ:
50 args += ["--config-path", environ["SYNAPSE_CONFIG_PATH"]]
51 else:
52 check_arguments(environ, ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS"))
53 generate_secrets(environ, {
54 "registration": "SYNAPSE_REGISTRATION_SHARED_SECRET",
55 "macaroon": "SYNAPSE_MACAROON_SECRET_KEY"
56 })
57 environ["SYNAPSE_APPSERVICES"] = glob.glob("/data/appservices/*.yaml")
58 if not os.path.exists("/compiled"): os.mkdir("/compiled")
59 convert("/conf/homeserver.yaml", "/compiled/homeserver.yaml", environ)
60 convert("/conf/log.config", "/compiled/log.config", environ)
61 subprocess.check_output(["chown", "-R", ownership, "/data"])
62 args += ["--config-path", "/compiled/homeserver.yaml"]
63 # Generate missing keys and start synapse
64 subprocess.check_output(args + ["--generate-keys"])
65 os.execv("/sbin/su-exec", ["su-exec", ownership] + args)
00 #! /bin/bash
1
2 set -eux
13
24 cd "`dirname $0`/.."
35
1315 tox -e py27 --notest -v
1416
1517 TOX_BIN=$TOX_DIR/py27/bin
16 $TOX_BIN/pip install setuptools
18
19 # cryptography 2.2 requires setuptools >= 18.5.
20 #
21 # older versions of virtualenv (?) give us a virtualenv with the same version
22 # of setuptools as is installed on the system python (and tox runs virtualenv
23 # under python3, so we get the version of setuptools that is installed on that).
24 #
25 # anyway, make sure that we have a recent enough setuptools.
26 $TOX_BIN/pip install 'setuptools>=18.5'
27
28 # we also need a semi-recent version of pip, because old ones fail to install
29 # the "enum34" dependency of cryptography.
30 $TOX_BIN/pip install 'pip>=10'
31
1732 { python synapse/python_dependencies.py
1833 echo lxml psycopg2
1934 } | xargs $TOX_BIN/pip install
55
66 ## Do not run it lightly.
77
8 set -e
9
10 if [ "$1" == "-h" ] || [ "$1" == "" ]; then
11 echo "Call with ROOM_ID as first option and then pipe it into the database. So for instance you might run"
12 echo " nuke-room-from-db.sh <room_id> | sqlite3 homeserver.db"
13 echo "or"
14 echo " nuke-room-from-db.sh <room_id> | psql --dbname=synapse"
15 exit
16 fi
17
818 ROOMID="$1"
919
10 sqlite3 homeserver.db <<EOF
20 cat <<EOF
1121 DELETE FROM event_forward_extremities WHERE room_id = '$ROOMID';
1222 DELETE FROM event_backward_extremities WHERE room_id = '$ROOMID';
1323 DELETE FROM event_edges WHERE room_id = '$ROOMID';
2838 DELETE FROM state_groups_state WHERE room_id = '$ROOMID';
2939 DELETE FROM receipts_graph WHERE room_id = '$ROOMID';
3040 DELETE FROM receipts_linearized WHERE room_id = '$ROOMID';
31 DELETE FROM event_search_content WHERE c1room_id = '$ROOMID';
41 DELETE FROM event_search WHERE room_id = '$ROOMID';
3242 DELETE FROM guest_access WHERE room_id = '$ROOMID';
3343 DELETE FROM history_visibility WHERE room_id = '$ROOMID';
3444 DELETE FROM room_tags WHERE room_id = '$ROOMID';
1515 """ This is a reference implementation of a Matrix home server.
1616 """
1717
18 __version__ = "0.28.1"
18 __version__ = "0.29.0"
3131 from synapse.server import HomeServer
3232 from synapse.storage.engines import create_engine
3333 from synapse.util.httpresourcetree import create_resource_tree
34 from synapse.util.logcontext import LoggingContext, preserve_fn
34 from synapse.util.logcontext import LoggingContext, run_in_background
3535 from synapse.util.manhole import manhole
3636 from synapse.util.versionstring import get_version_string
37 from twisted.internet import reactor
37 from twisted.internet import reactor, defer
3838 from twisted.web.resource import NoResource
3939
4040 logger = logging.getLogger("synapse.app.appservice")
7373 site_tag,
7474 listener_config,
7575 root_resource,
76 self.version_string,
7677 )
7778 )
7879
111112
112113 if stream_name == "events":
113114 max_stream_id = self.store.get_room_max_stream_ordering()
114 preserve_fn(
115 self.appservice_handler.notify_interested_services
116 )(max_stream_id)
115 run_in_background(self._notify_app_services, max_stream_id)
116
117 @defer.inlineCallbacks
118 def _notify_app_services(self, room_stream_id):
119 try:
120 yield self.appservice_handler.notify_interested_services(room_stream_id)
121 except Exception:
122 logger.exception("Error notifying application services of event")
117123
118124
119125 def start(config_options):
9797 site_tag,
9898 listener_config,
9999 root_resource,
100 self.version_string,
100101 )
101102 )
102103
113113 site_tag,
114114 listener_config,
115115 root_resource,
116 self.version_string,
116117 )
117118 )
118119
8686 site_tag,
8787 listener_config,
8888 root_resource,
89 self.version_string,
8990 )
9091 )
9192
3737 from synapse.storage.engines import create_engine
3838 from synapse.util.async import Linearizer
3939 from synapse.util.httpresourcetree import create_resource_tree
40 from synapse.util.logcontext import LoggingContext, preserve_fn
40 from synapse.util.logcontext import LoggingContext, run_in_background
4141 from synapse.util.manhole import manhole
4242 from synapse.util.versionstring import get_version_string
4343 from twisted.internet import defer, reactor
100100 site_tag,
101101 listener_config,
102102 root_resource,
103 self.version_string,
103104 )
104105 )
105106
228229 # presence, typing, etc.
229230 if stream_name == "federation":
230231 send_queue.process_rows_for_federation(self.federation_sender, rows)
231 preserve_fn(self.update_token)(token)
232 run_in_background(self.update_token, token)
232233
233234 # We also need to poke the federation sender when new events happen
234235 elif stream_name == "events":
236237
237238 @defer.inlineCallbacks
238239 def update_token(self, token):
239 self.federation_position = token
240
241 # We linearize here to ensure we don't have races updating the token
242 with (yield self._fed_position_linearizer.queue(None)):
243 if self._last_ack < self.federation_position:
244 yield self.store.update_federation_out_pos(
245 "federation", self.federation_position
246 )
247
248 # We ACK this token over replication so that the master can drop
249 # its in memory queues
250 self.replication_client.send_federation_ack(self.federation_position)
251 self._last_ack = self.federation_position
240 try:
241 self.federation_position = token
242
243 # We linearize here to ensure we don't have races updating the token
244 with (yield self._fed_position_linearizer.queue(None)):
245 if self._last_ack < self.federation_position:
246 yield self.store.update_federation_out_pos(
247 "federation", self.federation_position
248 )
249
250 # We ACK this token over replication so that the master can drop
251 # its in memory queues
252 self.replication_client.send_federation_ack(self.federation_position)
253 self._last_ack = self.federation_position
254 except Exception:
255 logger.exception("Error updating federation stream position")
252256
253257
254258 if __name__ == '__main__':
151151 site_tag,
152152 listener_config,
153153 root_resource,
154 self.version_string,
154155 )
155156 )
156157
139139 site_tag,
140140 listener_config,
141141 root_resource,
142 self.version_string,
142143 ),
143144 self.tls_server_context_factory,
144145 )
152153 site_tag,
153154 listener_config,
154155 root_resource,
156 self.version_string,
155157 )
156158 )
157159 logger.info("Synapse now listening on port %d", port)
9393 site_tag,
9494 listener_config,
9595 root_resource,
96 self.version_string,
9697 )
9798 )
9899
3232 from synapse.storage import DataStore
3333 from synapse.storage.engines import create_engine
3434 from synapse.util.httpresourcetree import create_resource_tree
35 from synapse.util.logcontext import LoggingContext, preserve_fn
35 from synapse.util.logcontext import LoggingContext, run_in_background
3636 from synapse.util.manhole import manhole
3737 from synapse.util.versionstring import get_version_string
3838 from twisted.internet import defer, reactor
103103 site_tag,
104104 listener_config,
105105 root_resource,
106 self.version_string,
106107 )
107108 )
108109
139140
140141 def on_rdata(self, stream_name, token, rows):
141142 super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
142 preserve_fn(self.poke_pushers)(stream_name, token, rows)
143 run_in_background(self.poke_pushers, stream_name, token, rows)
143144
144145 @defer.inlineCallbacks
145146 def poke_pushers(self, stream_name, token, rows):
146 if stream_name == "pushers":
147 for row in rows:
148 if row.deleted:
149 yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
150 else:
151 yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
152 elif stream_name == "events":
153 yield self.pusher_pool.on_new_notifications(
154 token, token,
155 )
156 elif stream_name == "receipts":
157 yield self.pusher_pool.on_new_receipts(
158 token, token, set(row.room_id for row in rows)
159 )
147 try:
148 if stream_name == "pushers":
149 for row in rows:
150 if row.deleted:
151 yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
152 else:
153 yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
154 elif stream_name == "events":
155 yield self.pusher_pool.on_new_notifications(
156 token, token,
157 )
158 elif stream_name == "receipts":
159 yield self.pusher_pool.on_new_receipts(
160 token, token, set(row.room_id for row in rows)
161 )
162 except Exception:
163 logger.exception("Error poking pushers")
160164
161165 def stop_pusher(self, user_id, app_id, pushkey):
162166 key = "%s:%s" % (app_id, pushkey)
5050 from synapse.storage.presence import UserPresenceState
5151 from synapse.storage.roommember import RoomMemberStore
5252 from synapse.util.httpresourcetree import create_resource_tree
53 from synapse.util.logcontext import LoggingContext, preserve_fn
53 from synapse.util.logcontext import LoggingContext, run_in_background
5454 from synapse.util.manhole import manhole
5555 from synapse.util.stringutils import random_string
5656 from synapse.util.versionstring import get_version_string
280280 site_tag,
281281 listener_config,
282282 root_resource,
283 self.version_string,
283284 )
284285 )
285286
326327
327328 def on_rdata(self, stream_name, token, rows):
328329 super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
329
330 preserve_fn(self.process_and_notify)(stream_name, token, rows)
330 run_in_background(self.process_and_notify, stream_name, token, rows)
331331
332332 def get_streams_to_replicate(self):
333333 args = super(SyncReplicationHandler, self).get_streams_to_replicate()
339339
340340 @defer.inlineCallbacks
341341 def process_and_notify(self, stream_name, token, rows):
342 if stream_name == "events":
343 # We shouldn't get multiple rows per token for events stream, so
344 # we don't need to optimise this for multiple rows.
345 for row in rows:
346 event = yield self.store.get_event(row.event_id)
347 extra_users = ()
348 if event.type == EventTypes.Member:
349 extra_users = (event.state_key,)
350 max_token = self.store.get_room_max_stream_ordering()
351 self.notifier.on_new_room_event(
352 event, token, max_token, extra_users
353 )
354 elif stream_name == "push_rules":
355 self.notifier.on_new_event(
356 "push_rules_key", token, users=[row.user_id for row in rows],
357 )
358 elif stream_name in ("account_data", "tag_account_data",):
359 self.notifier.on_new_event(
360 "account_data_key", token, users=[row.user_id for row in rows],
361 )
362 elif stream_name == "receipts":
363 self.notifier.on_new_event(
364 "receipt_key", token, rooms=[row.room_id for row in rows],
365 )
366 elif stream_name == "typing":
367 self.typing_handler.process_replication_rows(token, rows)
368 self.notifier.on_new_event(
369 "typing_key", token, rooms=[row.room_id for row in rows],
370 )
371 elif stream_name == "to_device":
372 entities = [row.entity for row in rows if row.entity.startswith("@")]
373 if entities:
374 self.notifier.on_new_event(
375 "to_device_key", token, users=entities,
376 )
377 elif stream_name == "device_lists":
378 all_room_ids = set()
379 for row in rows:
380 room_ids = yield self.store.get_rooms_for_user(row.user_id)
381 all_room_ids.update(room_ids)
382 self.notifier.on_new_event(
383 "device_list_key", token, rooms=all_room_ids,
384 )
385 elif stream_name == "presence":
386 yield self.presence_handler.process_replication_rows(token, rows)
387 elif stream_name == "receipts":
388 self.notifier.on_new_event(
389 "groups_key", token, users=[row.user_id for row in rows],
390 )
342 try:
343 if stream_name == "events":
344 # We shouldn't get multiple rows per token for events stream, so
345 # we don't need to optimise this for multiple rows.
346 for row in rows:
347 event = yield self.store.get_event(row.event_id)
348 extra_users = ()
349 if event.type == EventTypes.Member:
350 extra_users = (event.state_key,)
351 max_token = self.store.get_room_max_stream_ordering()
352 self.notifier.on_new_room_event(
353 event, token, max_token, extra_users
354 )
355 elif stream_name == "push_rules":
356 self.notifier.on_new_event(
357 "push_rules_key", token, users=[row.user_id for row in rows],
358 )
359 elif stream_name in ("account_data", "tag_account_data",):
360 self.notifier.on_new_event(
361 "account_data_key", token, users=[row.user_id for row in rows],
362 )
363 elif stream_name == "receipts":
364 self.notifier.on_new_event(
365 "receipt_key", token, rooms=[row.room_id for row in rows],
366 )
367 elif stream_name == "typing":
368 self.typing_handler.process_replication_rows(token, rows)
369 self.notifier.on_new_event(
370 "typing_key", token, rooms=[row.room_id for row in rows],
371 )
372 elif stream_name == "to_device":
373 entities = [row.entity for row in rows if row.entity.startswith("@")]
374 if entities:
375 self.notifier.on_new_event(
376 "to_device_key", token, users=entities,
377 )
378 elif stream_name == "device_lists":
379 all_room_ids = set()
380 for row in rows:
381 room_ids = yield self.store.get_rooms_for_user(row.user_id)
382 all_room_ids.update(room_ids)
383 self.notifier.on_new_event(
384 "device_list_key", token, rooms=all_room_ids,
385 )
386 elif stream_name == "presence":
387 yield self.presence_handler.process_replication_rows(token, rows)
388 elif stream_name == "receipts":
389 self.notifier.on_new_event(
390 "groups_key", token, users=[row.user_id for row in rows],
391 )
392 except Exception:
393 logger.exception("Error processing replication")
391394
392395
393396 def start(config_options):
3838 from synapse.storage.user_directory import UserDirectoryStore
3939 from synapse.util.caches.stream_change_cache import StreamChangeCache
4040 from synapse.util.httpresourcetree import create_resource_tree
41 from synapse.util.logcontext import LoggingContext, preserve_fn
41 from synapse.util.logcontext import LoggingContext, run_in_background
4242 from synapse.util.manhole import manhole
4343 from synapse.util.versionstring import get_version_string
44 from twisted.internet import reactor
44 from twisted.internet import reactor, defer
4545 from twisted.web.resource import NoResource
4646
4747 logger = logging.getLogger("synapse.app.user_dir")
125125 site_tag,
126126 listener_config,
127127 root_resource,
128 self.version_string,
128129 )
129130 )
130131
163164 stream_name, token, rows
164165 )
165166 if stream_name == "current_state_deltas":
166 preserve_fn(self.user_directory.notify_new_event)()
167 run_in_background(self._notify_directory)
168
169 @defer.inlineCallbacks
170 def _notify_directory(self):
171 try:
172 yield self.user_directory.notify_new_event()
173 except Exception:
174 logger.exception("Error notifying user directory of state update")
167175
168176
169177 def start(config_options):
5050 from twisted.internet import defer
5151
5252 from synapse.appservice import ApplicationServiceState
53 from synapse.util.logcontext import preserve_fn
53 from synapse.util.logcontext import run_in_background
5454 from synapse.util.metrics import Measure
5555
5656 import logging
105105 def enqueue(self, service, event):
106106 # if this service isn't being sent something
107107 self.queued_events.setdefault(service.id, []).append(event)
108 preserve_fn(self._send_request)(service)
108 run_in_background(self._send_request, service)
109109
110110 @defer.inlineCallbacks
111111 def _send_request(self, service):
151151 if sent:
152152 yield txn.complete(self.store)
153153 else:
154 preserve_fn(self._start_recoverer)(service)
155 except Exception as e:
156 logger.exception(e)
157 preserve_fn(self._start_recoverer)(service)
154 run_in_background(self._start_recoverer, service)
155 except Exception:
156 logger.exception("Error creating appservice transaction")
157 run_in_background(self._start_recoverer, service)
158158
159159 @defer.inlineCallbacks
160160 def on_recovered(self, recoverer):
175175
176176 @defer.inlineCallbacks
177177 def _start_recoverer(self, service):
178 yield self.store.set_appservice_state(
179 service,
180 ApplicationServiceState.DOWN
181 )
182 logger.info(
183 "Application service falling behind. Starting recoverer. AS ID %s",
184 service.id
185 )
186 recoverer = self.recoverer_fn(service, self.on_recovered)
187 self.add_recoverers([recoverer])
188 recoverer.recover()
178 try:
179 yield self.store.set_appservice_state(
180 service,
181 ApplicationServiceState.DOWN
182 )
183 logger.info(
184 "Application service falling behind. Starting recoverer. AS ID %s",
185 service.id
186 )
187 recoverer = self.recoverer_fn(service, self.on_recovered)
188 self.add_recoverers([recoverer])
189 recoverer.recover()
190 except Exception:
191 logger.exception("Error starting AS recoverer")
189192
190193 @defer.inlineCallbacks
191194 def _is_service_up(self, service):
280280 )
281281 if not cls.path_exists(config_dir_path):
282282 os.makedirs(config_dir_path)
283 with open(config_path, "wb") as config_file:
284 config_bytes, config = obj.generate_config(
283 with open(config_path, "w") as config_file:
284 config_str, config = obj.generate_config(
285285 config_dir_path=config_dir_path,
286286 server_name=server_name,
287287 report_stats=(config_args.report_stats == "yes"),
288288 is_generating_file=True
289289 )
290290 obj.invoke_all("generate_files", config)
291 config_file.write(config_bytes)
291 config_file.write(config_str)
292292 print((
293293 "A config file has been generated in %r for server name"
294294 " %r with corresponding SSL keys and self-signed"
1616 from synapse.appservice import ApplicationService
1717 from synapse.types import UserID
1818
19 import urllib
2019 import yaml
2120 import logging
2221
2322 from six import string_types
23 from six.moves.urllib import parse as urlparse
2424
2525 logger = logging.getLogger(__name__)
2626
104104 )
105105
106106 localpart = as_info["sender_localpart"]
107 if urllib.quote(localpart) != localpart:
107 if urlparse.quote(localpart) != localpart:
108108 raise ValueError(
109109 "sender_localpart needs characters which are not URL encoded."
110110 )
116116 log_config = config.get("log_config")
117117 if log_config and not os.path.exists(log_config):
118118 log_file = self.abspath("homeserver.log")
119 with open(log_config, "wb") as log_config_file:
119 with open(log_config, "w") as log_config_file:
120120 log_config_file.write(
121121 DEFAULT_LOG_CONFIG.substitute(log_file=log_file)
122122 )
132132 tls_dh_params_path = config["tls_dh_params_path"]
133133
134134 if not self.path_exists(tls_private_key_path):
135 with open(tls_private_key_path, "w") as private_key_file:
135 with open(tls_private_key_path, "wb") as private_key_file:
136136 tls_private_key = crypto.PKey()
137137 tls_private_key.generate_key(crypto.TYPE_RSA, 2048)
138138 private_key_pem = crypto.dump_privatekey(
147147 )
148148
149149 if not self.path_exists(tls_certificate_path):
150 with open(tls_certificate_path, "w") as certificate_file:
150 with open(tls_certificate_path, "wb") as certificate_file:
151151 cert = crypto.X509()
152152 subject = cert.get_subject()
153153 subject.CN = config["server_name"]
1212 # limitations under the License.
1313
1414 from twisted.internet import ssl
15 from OpenSSL import SSL
16 from twisted.internet._sslverify import _OpenSSLECCurve, _defaultCurveName
15 from OpenSSL import SSL, crypto
16 from twisted.internet._sslverify import _defaultCurveName
1717
1818 import logging
1919
3131 @staticmethod
3232 def configure_context(context, config):
3333 try:
34 _ecCurve = _OpenSSLECCurve(_defaultCurveName)
35 _ecCurve.addECKeyToContext(context)
34 _ecCurve = crypto.get_elliptic_curve(_defaultCurveName)
35 context.set_tmp_ecdh(_ecCurve)
36
3637 except Exception:
3738 logger.exception("Failed to enable elliptic curve for TLS")
3839 context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
1818 from synapse.util import unwrapFirstError, logcontext
1919 from synapse.util.logcontext import (
2020 PreserveLoggingContext,
21 preserve_fn
21 preserve_fn,
22 run_in_background,
2223 )
2324 from synapse.util.metrics import Measure
2425
126127
127128 verify_requests.append(verify_request)
128129
129 preserve_fn(self._start_key_lookups)(verify_requests)
130 run_in_background(self._start_key_lookups, verify_requests)
130131
131132 # Pass those keys to handle_key_deferred so that the json object
132133 # signatures can be verified
145146 verify_requests (List[VerifyKeyRequest]):
146147 """
147148
148 # create a deferred for each server we're going to look up the keys
149 # for; we'll resolve them once we have completed our lookups.
150 # These will be passed into wait_for_previous_lookups to block
151 # any other lookups until we have finished.
152 # The deferreds are called with no logcontext.
153 server_to_deferred = {
154 rq.server_name: defer.Deferred()
155 for rq in verify_requests
156 }
157
158 # We want to wait for any previous lookups to complete before
159 # proceeding.
160 yield self.wait_for_previous_lookups(
161 [rq.server_name for rq in verify_requests],
162 server_to_deferred,
163 )
164
165 # Actually start fetching keys.
166 self._get_server_verify_keys(verify_requests)
167
168 # When we've finished fetching all the keys for a given server_name,
169 # resolve the deferred passed to `wait_for_previous_lookups` so that
170 # any lookups waiting will proceed.
171 #
172 # map from server name to a set of request ids
173 server_to_request_ids = {}
174
175 for verify_request in verify_requests:
176 server_name = verify_request.server_name
177 request_id = id(verify_request)
178 server_to_request_ids.setdefault(server_name, set()).add(request_id)
179
180 def remove_deferreds(res, verify_request):
181 server_name = verify_request.server_name
182 request_id = id(verify_request)
183 server_to_request_ids[server_name].discard(request_id)
184 if not server_to_request_ids[server_name]:
185 d = server_to_deferred.pop(server_name, None)
186 if d:
187 d.callback(None)
188 return res
189
190 for verify_request in verify_requests:
191 verify_request.deferred.addBoth(
192 remove_deferreds, verify_request,
149 try:
150 # create a deferred for each server we're going to look up the keys
151 # for; we'll resolve them once we have completed our lookups.
152 # These will be passed into wait_for_previous_lookups to block
153 # any other lookups until we have finished.
154 # The deferreds are called with no logcontext.
155 server_to_deferred = {
156 rq.server_name: defer.Deferred()
157 for rq in verify_requests
158 }
159
160 # We want to wait for any previous lookups to complete before
161 # proceeding.
162 yield self.wait_for_previous_lookups(
163 [rq.server_name for rq in verify_requests],
164 server_to_deferred,
193165 )
166
167 # Actually start fetching keys.
168 self._get_server_verify_keys(verify_requests)
169
170 # When we've finished fetching all the keys for a given server_name,
171 # resolve the deferred passed to `wait_for_previous_lookups` so that
172 # any lookups waiting will proceed.
173 #
174 # map from server name to a set of request ids
175 server_to_request_ids = {}
176
177 for verify_request in verify_requests:
178 server_name = verify_request.server_name
179 request_id = id(verify_request)
180 server_to_request_ids.setdefault(server_name, set()).add(request_id)
181
182 def remove_deferreds(res, verify_request):
183 server_name = verify_request.server_name
184 request_id = id(verify_request)
185 server_to_request_ids[server_name].discard(request_id)
186 if not server_to_request_ids[server_name]:
187 d = server_to_deferred.pop(server_name, None)
188 if d:
189 d.callback(None)
190 return res
191
192 for verify_request in verify_requests:
193 verify_request.deferred.addBoth(
194 remove_deferreds, verify_request,
195 )
196 except Exception:
197 logger.exception("Error starting key lookups")
194198
195199 @defer.inlineCallbacks
196200 def wait_for_previous_lookups(self, server_names, server_to_deferred):
312316 if not verify_request.deferred.called:
313317 verify_request.deferred.errback(err)
314318
315 preserve_fn(do_iterations)().addErrback(on_err)
319 run_in_background(do_iterations).addErrback(on_err)
316320
317321 @defer.inlineCallbacks
318322 def get_keys_from_store(self, server_name_and_key_ids):
328332 """
329333 res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
330334 [
331 preserve_fn(self.store.get_server_verify_keys)(
332 server_name, key_ids
335 run_in_background(
336 self.store.get_server_verify_keys,
337 server_name, key_ids,
333338 ).addCallback(lambda ks, server: (server, ks), server_name)
334339 for server_name, key_ids in server_name_and_key_ids
335340 ],
357362
358363 results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
359364 [
360 preserve_fn(get_key)(p_name, p_keys)
365 run_in_background(get_key, p_name, p_keys)
361366 for p_name, p_keys in self.perspective_servers.items()
362367 ],
363368 consumeErrors=True,
397402
398403 results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
399404 [
400 preserve_fn(get_key)(server_name, key_ids)
405 run_in_background(get_key, server_name, key_ids)
401406 for server_name, key_ids in server_name_and_key_ids
402407 ],
403408 consumeErrors=True,
480485
481486 yield logcontext.make_deferred_yieldable(defer.gatherResults(
482487 [
483 preserve_fn(self.store_keys)(
488 run_in_background(
489 self.store_keys,
484490 server_name=server_name,
485491 from_server=perspective_name,
486492 verify_keys=response_keys,
538544
539545 yield logcontext.make_deferred_yieldable(defer.gatherResults(
540546 [
541 preserve_fn(self.store_keys)(
547 run_in_background(
548 self.store_keys,
542549 server_name=key_server_name,
543550 from_server=server_name,
544551 verify_keys=verify_keys,
614621
615622 yield logcontext.make_deferred_yieldable(defer.gatherResults(
616623 [
617 preserve_fn(self.store.store_server_keys_json)(
624 run_in_background(
625 self.store.store_server_keys_json,
618626 server_name=server_name,
619627 key_id=key_id,
620628 from_server=server_name,
715723 # TODO(markjh): Store whether the keys have expired.
716724 return logcontext.make_deferred_yieldable(defer.gatherResults(
717725 [
718 preserve_fn(self.store.store_server_verify_key)(
726 run_in_background(
727 self.store.store_server_verify_key,
719728 server_name, server_name, key.time_added, key
720729 )
721730 for key_id, key in verify_keys.items()
4646
4747
4848 def _event_dict_property(key):
49 # We want to be able to use hasattr with the event dict properties.
50 # However, (on python3) hasattr expects AttributeError to be raised. Hence,
51 # we need to transform the KeyError into an AttributeError
4952 def getter(self):
50 return self._event_dict[key]
53 try:
54 return self._event_dict[key]
55 except KeyError:
56 raise AttributeError(key)
5157
5258 def setter(self, v):
53 self._event_dict[key] = v
59 try:
60 self._event_dict[key] = v
61 except KeyError:
62 raise AttributeError(key)
5463
5564 def delete(self):
56 del self._event_dict[key]
65 try:
66 del self._event_dict[key]
67 except KeyError:
68 raise AttributeError(key)
5769
5870 return property(
5971 getter,
1818 import logging
1919 import random
2020
21 from six.moves import range
22
2123 from twisted.internet import defer
2224
2325 from synapse.api.constants import Membership
3234 import synapse.metrics
3335 from synapse.util import logcontext, unwrapFirstError
3436 from synapse.util.caches.expiringcache import ExpiringCache
35 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
37 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
3638 from synapse.util.logutils import log_function
3739 from synapse.util.retryutils import NotRetryingDestination
3840
412414
413415 batch_size = 20
414416 missing_events = list(missing_events)
415 for i in xrange(0, len(missing_events), batch_size):
417 for i in range(0, len(missing_events), batch_size):
416418 batch = set(missing_events[i:i + batch_size])
417419
418420 deferreds = [
419 preserve_fn(self.get_pdu)(
421 run_in_background(
422 self.get_pdu,
420423 destinations=random_server_list(),
421424 event_id=e_id,
422425 )
322322 break
323323
324324 yield self._process_presence_inner(states_map.values())
325 except Exception:
326 logger.exception("Error sending presence states to servers")
325327 finally:
326328 self._processing_pending_presence = False
327329
2424 )
2525 from synapse.util.ratelimitutils import FederationRateLimiter
2626 from synapse.util.versionstring import get_version_string
27 from synapse.util.logcontext import preserve_fn
27 from synapse.util.logcontext import run_in_background
2828 from synapse.types import ThirdPartyInstanceID, get_domain_from_id
2929
3030 import functools
151151 # alive
152152 retry_timings = yield self.store.get_destination_retry_timings(origin)
153153 if retry_timings and retry_timings["retry_last_ts"]:
154 run_in_background(self._reset_retry_timings, origin)
155
156 defer.returnValue(origin)
157
158 @defer.inlineCallbacks
159 def _reset_retry_timings(self, origin):
160 try:
154161 logger.info("Marking origin %r as up", origin)
155 preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0)
156
157 defer.returnValue(origin)
162 yield self.store.set_destination_retry_timings(origin, 0, 0)
163 except Exception:
164 logger.exception("Error resetting retry timings on %s", origin)
158165
159166
160167 class BaseFederationServlet(object):
7373 "previous_ids",
7474 "pdus",
7575 "edus",
76 "transaction_id",
77 "destination",
7876 "pdu_failures",
7977 ]
8078
4141
4242 from synapse.api.errors import SynapseError
4343 from synapse.types import get_domain_from_id
44 from synapse.util.logcontext import preserve_fn
44 from synapse.util.logcontext import run_in_background
4545
4646 from signedjson.sign import sign_json
4747
164164
165165 @defer.inlineCallbacks
166166 def _renew_attestation(group_id, user_id):
167 if not self.is_mine_id(group_id):
168 destination = get_domain_from_id(group_id)
169 elif not self.is_mine_id(user_id):
170 destination = get_domain_from_id(user_id)
171 else:
172 logger.warn(
173 "Incorrectly trying to do attestations for user: %r in %r",
174 user_id, group_id,
167 try:
168 if not self.is_mine_id(group_id):
169 destination = get_domain_from_id(group_id)
170 elif not self.is_mine_id(user_id):
171 destination = get_domain_from_id(user_id)
172 else:
173 logger.warn(
174 "Incorrectly trying to do attestations for user: %r in %r",
175 user_id, group_id,
176 )
177 yield self.store.remove_attestation_renewal(group_id, user_id)
178 return
179
180 attestation = self.attestations.create_attestation(group_id, user_id)
181
182 yield self.transport_client.renew_group_attestation(
183 destination, group_id, user_id,
184 content={"attestation": attestation},
175185 )
176 yield self.store.remove_attestation_renewal(group_id, user_id)
177 return
178186
179 attestation = self.attestations.create_attestation(group_id, user_id)
180
181 yield self.transport_client.renew_group_attestation(
182 destination, group_id, user_id,
183 content={"attestation": attestation},
184 )
185
186 yield self.store.update_attestation_renewal(
187 group_id, user_id, attestation
188 )
187 yield self.store.update_attestation_renewal(
188 group_id, user_id, attestation
189 )
190 except Exception:
191 logger.exception("Error renewing attestation of %r in %r",
192 user_id, group_id)
189193
190194 for row in rows:
191195 group_id = row["group_id"]
192196 user_id = row["user_id"]
193197
194 preserve_fn(_renew_attestation)(group_id, user_id)
198 run_in_background(_renew_attestation, group_id, user_id)
1818 from synapse.api.constants import EventTypes
1919 from synapse.util.metrics import Measure
2020 from synapse.util.logcontext import (
21 make_deferred_yieldable, preserve_fn, run_in_background,
21 make_deferred_yieldable, run_in_background,
2222 )
2323
2424 import logging
110110
111111 # Fork off pushes to these services
112112 for service in services:
113 preserve_fn(self.scheduler.submit_event_for_as)(
114 service, event
115 )
113 self.scheduler.submit_event_for_as(service, event)
116114
117115 @defer.inlineCallbacks
118116 def handle_room_events(events):
197195 services = yield self._get_services_for_3pn(protocol)
198196
199197 results = yield make_deferred_yieldable(defer.DeferredList([
200 preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
198 run_in_background(
199 self.appservice_api.query_3pe,
200 service, kind, protocol, fields,
201 )
201202 for service in services
202203 ], consumeErrors=True))
203204
258259 event based on the service regex.
259260 """
260261 services = self.store.get_app_services()
261 interested_list = [
262 s for s in services if (
263 yield s.is_interested(event, self.store)
264 )
265 ]
262
263 # we can't use a list comprehension here: on Python 3 the body of a list
264 # comprehension runs in its own function scope, so yielding inside it no
265 # longer suspends the surrounding generator.
266 interested_list = []
267 for s in services:
268 if (yield s.is_interested(event, self.store)):
269 interested_list.append(s)
270
266271 defer.returnValue(interested_list)
267272
268273 def _get_services_for_user(self, user_id):
00 # -*- coding: utf-8 -*-
1 # Copyright 2017 New Vector Ltd
1 # Copyright 2017, 2018 New Vector Ltd
22 #
33 # Licensed under the Apache License, Version 2.0 (the "License");
44 # you may not use this file except in compliance with the License.
1111 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212 # See the License for the specific language governing permissions and
1313 # limitations under the License.
14 from twisted.internet import defer
14 from twisted.internet import defer, reactor
1515
1616 from ._base import BaseHandler
17 from synapse.types import UserID, create_requester
18 from synapse.util.logcontext import run_in_background
1719
1820 import logging
1921
2628 super(DeactivateAccountHandler, self).__init__(hs)
2729 self._auth_handler = hs.get_auth_handler()
2830 self._device_handler = hs.get_device_handler()
31 self._room_member_handler = hs.get_room_member_handler()
32
33 # Flag that indicates whether the process to part users from rooms is running
34 self._user_parter_running = False
35
36 # Start the user parter loop so it can resume parting users from rooms where
37 # it left off (if it has work left to do).
38 reactor.callWhenRunning(self._start_user_parting)
2939
3040 @defer.inlineCallbacks
3141 def deactivate_account(self, user_id):
4959
5060 yield self.store.user_delete_threepids(user_id)
5161 yield self.store.user_set_password_hash(user_id, None)
62
63 # Add the user to a table of users pending deactivation (i.e.
64 # removal from all the rooms they're a member of)
65 yield self.store.add_user_pending_deactivation(user_id)
66
67 # Now start the process that goes through that list and
68 # parts users from rooms (if it isn't already running)
69 self._start_user_parting()
70
71 def _start_user_parting(self):
72 """
73 Start the process that goes through the table of users
74 pending deactivation, if it isn't already running.
75
76 Returns:
77 None
78 """
79 if not self._user_parter_running:
80 run_in_background(self._user_parter_loop)
81
82 @defer.inlineCallbacks
83 def _user_parter_loop(self):
84 """Loop that parts deactivated users from rooms
85
86 Returns:
87 None
88 """
89 self._user_parter_running = True
90 logger.info("Starting user parter")
91 try:
92 while True:
93 user_id = yield self.store.get_user_pending_deactivation()
94 if user_id is None:
95 break
96 logger.info("User parter parting %r", user_id)
97 yield self._part_user(user_id)
98 yield self.store.del_user_pending_deactivation(user_id)
99 logger.info("User parter finished parting %r", user_id)
100 logger.info("User parter finished: stopping")
101 finally:
102 self._user_parter_running = False
103
104 @defer.inlineCallbacks
105 def _part_user(self, user_id):
106 """Causes the given user_id to leave all the rooms they're joined to
107
108 Returns:
109 None
110 """
111 user = UserID.from_string(user_id)
112
113 rooms_for_user = yield self.store.get_rooms_for_user(user_id)
114 for room_id in rooms_for_user:
115 logger.info("User parter parting %r from %r", user_id, room_id)
116 try:
117 yield self._room_member_handler.update_membership(
118 create_requester(user),
119 user,
120 room_id,
121 "leave",
122 ratelimit=False,
123 )
124 except Exception:
125 logger.exception(
126 "Failed to part user %r from room %r: ignoring and continuing",
127 user_id, room_id,
128 )
2323 SynapseError, CodeMessageException, FederationDeniedError,
2424 )
2525 from synapse.types import get_domain_from_id, UserID
26 from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
26 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
2727 from synapse.util.retryutils import NotRetryingDestination
2828
2929 logger = logging.getLogger(__name__)
138138 failures[destination] = _exception_to_failure(e)
139139
140140 yield make_deferred_yieldable(defer.gatherResults([
141 preserve_fn(do_remote_query)(destination)
141 run_in_background(do_remote_query, destination)
142142 for destination in remote_queries_not_in_cache
143 ]))
143 ], consumeErrors=True))
144144
145145 defer.returnValue({
146146 "device_keys": results, "failures": failures,
241241 failures[destination] = _exception_to_failure(e)
242242
243243 yield make_deferred_yieldable(defer.gatherResults([
244 preserve_fn(claim_client_keys)(destination)
244 run_in_background(claim_client_keys, destination)
245245 for destination in remote_queries
246 ]))
246 ], consumeErrors=True))
247247
248248 logger.info(
249249 "Claimed one-time-keys: %s",
1515
1616 """Contains handlers for federation events."""
1717
18 import httplib
1918 import itertools
2019 import logging
20 import sys
2121
2222 from signedjson.key import decode_verify_key_bytes
2323 from signedjson.sign import verify_signed_json
24 import six
25 from six.moves import http_client
2426 from twisted.internet import defer
2527 from unpaddedbase64 import decode_base64
2628
636638
637639 results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
638640 [
639 logcontext.preserve_fn(self.replication_layer.get_pdu)(
641 logcontext.run_in_background(
642 self.replication_layer.get_pdu,
640643 [dest],
641644 event_id,
642645 outlier=True,
886889 logger.warn("Rejecting event %s which has %i prev_events",
887890 ev.event_id, len(ev.prev_events))
888891 raise SynapseError(
889 httplib.BAD_REQUEST,
892 http_client.BAD_REQUEST,
890893 "Too many prev_events",
891894 )
892895
894897 logger.warn("Rejecting event %s which has %i auth_events",
895898 ev.event_id, len(ev.auth_events))
896899 raise SynapseError(
897 httplib.BAD_REQUEST,
900 http_client.BAD_REQUEST,
898901 "Too many auth_events",
899902 )
900903
10221025 # lots of requests for missing prev_events which we do actually
10231026 # have. Hence we fire off the deferred, but don't wait for it.
10241027
1025 logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
1028 logcontext.run_in_background(self._handle_queued_pdus, room_queue)
10261029
10271030 defer.returnValue(True)
10281031
15121515 backfilled=backfilled,
15131516 )
15141517 except: # noqa: E722, as we reraise the exception this is fine.
1515 # Ensure that we actually remove the entries in the push actions
1516 # staging area
1517 logcontext.preserve_fn(
1518 self.store.remove_push_actions_from_staging
1519 )(event.event_id)
1520 raise
1518 tp, value, tb = sys.exc_info()
1519
1520 logcontext.run_in_background(
1521 self.store.remove_push_actions_from_staging,
1522 event.event_id,
1523 )
1524
1525 six.reraise(tp, value, tb)
15211526
15221527 if not backfilled:
15231528 # this intentionally does not yield: we don't care about the result
15241529 # and don't need to wait for it.
1525 logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
1526 event_stream_id, max_stream_id
1530 logcontext.run_in_background(
1531 self.pusher_pool.on_new_notifications,
1532 event_stream_id, max_stream_id,
15271533 )
15281534
15291535 defer.returnValue((context, event_stream_id, max_stream_id))
15371543 """
15381544 contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
15391545 [
1540 logcontext.preserve_fn(self._prep_event)(
1546 logcontext.run_in_background(
1547 self._prep_event,
15411548 origin,
15421549 ev_info["event"],
15431550 state=ev_info.get("state"),
18661873
18671874 different_events = yield logcontext.make_deferred_yieldable(
18681875 defer.gatherResults([
1869 logcontext.preserve_fn(self.store.get_event)(
1876 logcontext.run_in_background(
1877 self.store.get_event,
18701878 d,
18711879 allow_none=True,
18721880 allow_rejected=False,
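The failure-path change above (and the matching one in the message handler later in this diff) follows a capture-then-reraise pattern: the exception info is grabbed before the background cleanup is started, so the original traceback is preserved when it is re-raised on both Python 2 and 3. A hedged sketch, with do_persist standing in for the real persistence call:

    import sys

    import six
    from synapse.util import logcontext

    def persist_with_cleanup(store, event, do_persist):
        try:
            return do_persist(event)
        except Exception:
            # capture the exception *before* kicking off the cleanup, so that
            # the original traceback is what gets re-raised
            tp, value, tb = sys.exc_info()
            logcontext.run_in_background(
                store.remove_push_actions_from_staging, event.event_id,
            )
            six.reraise(tp, value, tb)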
2626 from synapse.util import unwrapFirstError
2727 from synapse.util.async import concurrently_execute
2828 from synapse.util.caches.snapshot_cache import SnapshotCache
29 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
29 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
3030 from synapse.visibility import filter_events_for_client
3131
3232 from ._base import BaseHandler
165165 (messages, token), current_state = yield make_deferred_yieldable(
166166 defer.gatherResults(
167167 [
168 preserve_fn(self.store.get_recent_events_for_room)(
168 run_in_background(
169 self.store.get_recent_events_for_room,
169170 event.room_id,
170171 limit=limit,
171172 end_token=room_end_token,
179180 self.store, user_id, messages
180181 )
181182
182 start_token = now_token.copy_and_replace("room_key", token[0])
183 end_token = now_token.copy_and_replace("room_key", token[1])
183 start_token = now_token.copy_and_replace("room_key", token)
184 end_token = now_token.copy_and_replace("room_key", room_end_token)
184185 time_now = self.clock.time_msec()
185186
186187 d["messages"] = {
323324 self.store, user_id, messages, is_peeking=is_peeking
324325 )
325326
326 start_token = StreamToken.START.copy_and_replace("room_key", token[0])
327 end_token = StreamToken.START.copy_and_replace("room_key", token[1])
327 start_token = StreamToken.START.copy_and_replace("room_key", token)
328 end_token = StreamToken.START.copy_and_replace("room_key", stream_token)
328329
329330 time_now = self.clock.time_msec()
330331
390391
391392 presence, receipts, (messages, token) = yield defer.gatherResults(
392393 [
393 preserve_fn(get_presence)(),
394 preserve_fn(get_receipts)(),
395 preserve_fn(self.store.get_recent_events_for_room)(
394 run_in_background(get_presence),
395 run_in_background(get_receipts),
396 run_in_background(
397 self.store.get_recent_events_for_room,
396398 room_id,
397399 limit=limit,
398400 end_token=now_token.room_key,
405407 self.store, user_id, messages, is_peeking=is_peeking,
406408 )
407409
408 start_token = now_token.copy_and_replace("room_key", token[0])
409 end_token = now_token.copy_and_replace("room_key", token[1])
410 start_token = now_token.copy_and_replace("room_key", token)
411 end_token = now_token
410412
411413 time_now = self.clock.time_msec()
412414
1212 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313 # See the License for the specific language governing permissions and
1414 # limitations under the License.
15 import logging
16 import simplejson
17 import sys
18
19 from canonicaljson import encode_canonical_json
20 import six
1521 from twisted.internet import defer, reactor
1622 from twisted.python.failure import Failure
1723
2430 UserID, RoomAlias, RoomStreamToken,
2531 )
2632 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
27 from synapse.util.logcontext import preserve_fn, run_in_background
33 from synapse.util.logcontext import run_in_background
2834 from synapse.util.metrics import measure_func
2935 from synapse.util.frozenutils import frozendict_json_encoder
3036 from synapse.util.stringutils import random_string
3238 from synapse.replication.http.send_event import send_event_to_master
3339
3440 from ._base import BaseHandler
35
36 from canonicaljson import encode_canonical_json
37
38 import logging
39 import simplejson
4041
4142 logger = logging.getLogger(__name__)
4243
732733 except: # noqa: E722, as we reraise the exception this is fine.
733734 # Ensure that we actually remove the entries in the push actions
734735 # staging area, if we calculated them.
735 preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
736 raise
736 tp, value, tb = sys.exc_info()
737
738 run_in_background(
739 self.store.remove_push_actions_from_staging,
740 event.event_id,
741 )
742
743 six.reraise(tp, value, tb)
737744
738745 @defer.inlineCallbacks
739746 def persist_and_notify_client_event(
853860
854861 # this intentionally does not yield: we don't care about the result
855862 # and don't need to wait for it.
856 preserve_fn(self.pusher_pool.on_new_notifications)(
863 run_in_background(
864 self.pusher_pool.on_new_notifications,
857865 event_stream_id, max_stream_id
858866 )
859867
860868 @defer.inlineCallbacks
861869 def _notify():
862870 yield run_on_reactor()
863 self.notifier.on_new_room_event(
864 event, event_stream_id, max_stream_id,
865 extra_users=extra_users
866 )
867
868 preserve_fn(_notify)()
871 try:
872 self.notifier.on_new_room_event(
873 event, event_stream_id, max_stream_id,
874 extra_users=extra_users
875 )
876 except Exception:
877 logger.exception("Error notifying about new room event")
878
879 run_in_background(_notify)
869880
870881 if event.type == EventTypes.Message:
871 presence = self.hs.get_presence_handler()
872882 # We don't want to block sending messages on any presence code. This
873883 # matters as sometimes presence code can take a while.
874 preserve_fn(presence.bump_presence_active_time)(requester.user)
884 run_in_background(self._bump_active_time, requester.user)
885
886 @defer.inlineCallbacks
887 def _bump_active_time(self, user):
888 try:
889 presence = self.hs.get_presence_handler()
890 yield presence.bump_presence_active_time(user)
891 except Exception:
892 logger.exception("Error bumping presence active time")
3030
3131 from synapse.util.caches.descriptors import cachedInlineCallbacks
3232 from synapse.util.async import Linearizer
33 from synapse.util.logcontext import preserve_fn
33 from synapse.util.logcontext import run_in_background
3434 from synapse.util.logutils import log_function
3535 from synapse.util.metrics import Measure
3636 from synapse.util.wheel_timer import WheelTimer
254254 logger.info("Finished _persist_unpersisted_changes")
255255
256256 @defer.inlineCallbacks
257 def _update_states_and_catch_exception(self, new_states):
258 try:
259 res = yield self._update_states(new_states)
260 defer.returnValue(res)
261 except Exception:
262 logger.exception("Error updating presence")
263
264 @defer.inlineCallbacks
257265 def _update_states(self, new_states):
258266 """Updates presence of users. Sets the appropriate timeouts. Pokes
259267 the notifier and federation if and only if the changed presence state
363371 now=now,
364372 )
365373
366 preserve_fn(self._update_states)(changes)
374 run_in_background(self._update_states_and_catch_exception, changes)
367375 except Exception:
368376 logger.exception("Exception in _handle_timeouts loop")
369377
421429
422430 @defer.inlineCallbacks
423431 def _end():
424 if affect_presence:
432 try:
425433 self.user_to_num_current_syncs[user_id] -= 1
426434
427435 prev_state = yield self.current_state_for_user(user_id)
428436 yield self._update_states([prev_state.copy_and_replace(
429437 last_user_sync_ts=self.clock.time_msec(),
430438 )])
439 except Exception:
440 logger.exception("Error updating presence after sync")
431441
432442 @contextmanager
433443 def _user_syncing():
434444 try:
435445 yield
436446 finally:
437 preserve_fn(_end)()
447 if affect_presence:
448 run_in_background(_end)
438449
439450 defer.returnValue(_user_syncing())
440451
134134 """Given a list of receipts, works out which remote servers should be
135135 poked and pokes them.
136136 """
137 # TODO: Some of this stuff should be coalesced.
138 for receipt in receipts:
139 room_id = receipt["room_id"]
140 receipt_type = receipt["receipt_type"]
141 user_id = receipt["user_id"]
142 event_ids = receipt["event_ids"]
143 data = receipt["data"]
144
145 users = yield self.state.get_current_user_in_room(room_id)
146 remotedomains = set(get_domain_from_id(u) for u in users)
147 remotedomains = remotedomains.copy()
148 remotedomains.discard(self.server_name)
149
150 logger.debug("Sending receipt to: %r", remotedomains)
151
152 for domain in remotedomains:
153 self.federation.send_edu(
154 destination=domain,
155 edu_type="m.receipt",
156 content={
157 room_id: {
158 receipt_type: {
159 user_id: {
160 "event_ids": event_ids,
161 "data": data,
137 try:
138 # TODO: Some of this stuff should be coalesced.
139 for receipt in receipts:
140 room_id = receipt["room_id"]
141 receipt_type = receipt["receipt_type"]
142 user_id = receipt["user_id"]
143 event_ids = receipt["event_ids"]
144 data = receipt["data"]
145
146 users = yield self.state.get_current_user_in_room(room_id)
147 remotedomains = set(get_domain_from_id(u) for u in users)
148 remotedomains = remotedomains.copy()
149 remotedomains.discard(self.server_name)
150
151 logger.debug("Sending receipt to: %r", remotedomains)
152
153 for domain in remotedomains:
154 self.federation.send_edu(
155 destination=domain,
156 edu_type="m.receipt",
157 content={
158 room_id: {
159 receipt_type: {
160 user_id: {
161 "event_ids": event_ids,
162 "data": data,
163 }
162164 }
163 }
165 },
164166 },
165 },
166 key=(room_id, receipt_type, user_id),
167 )
167 key=(room_id, receipt_type, user_id),
168 )
169 except Exception:
170 logger.exception("Error pushing receipts to remote servers")
168171
169172 @defer.inlineCallbacks
170173 def get_receipts_for_room(self, room_id, to_key):
1313 # limitations under the License.
1414
1515 from twisted.internet import defer
16
17 from six.moves import range
1618
1719 from ._base import BaseHandler
1820
199201 step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
200202
201203 chunk = []
202 for i in xrange(0, len(rooms_to_scan), step):
204 for i in range(0, len(rooms_to_scan), step):
203205 batch = rooms_to_scan[i:i + step]
204206 logger.info("Processing %i rooms for result", len(batch))
205207 yield concurrently_execute(
353353 since_key = since_token.room_key
354354
355355 while limited and len(recents) < timeline_limit and max_repeat:
356 events, end_key = yield self.store.get_room_events_stream_for_room(
357 room_id,
358 limit=load_limit + 1,
359 from_key=since_key,
360 to_key=end_key,
361 )
356 # If we have a since_key then we are trying to get any events
357 # that have happened since `since_key` up to `end_key`, so we
358 # can just use `get_room_events_stream_for_room`.
359 # Otherwise, we want to return the last N events in the room
360 # in topological ordering.
361 if since_key:
362 events, end_key = yield self.store.get_room_events_stream_for_room(
363 room_id,
364 limit=load_limit + 1,
365 from_key=since_key,
366 to_key=end_key,
367 )
368 else:
369 events, end_key = yield self.store.get_recent_events_for_room(
370 room_id,
371 limit=load_limit + 1,
372 end_token=end_key,
373 )
362374 loaded_recents = sync_config.filter_collection.filter_room_timeline(
363375 events
364376 )
428440 Returns:
429441 A Deferred map from ((type, state_key)->Event)
430442 """
431 last_events, token = yield self.store.get_recent_events_for_room(
443 last_events, _ = yield self.store.get_recent_events_for_room(
432444 room_id, end_token=stream_position.room_key, limit=1,
433445 )
434446
1515 from twisted.internet import defer
1616
1717 from synapse.api.errors import SynapseError, AuthError
18 from synapse.util.logcontext import preserve_fn
18 from synapse.util.logcontext import run_in_background
1919 from synapse.util.metrics import Measure
2020 from synapse.util.wheel_timer import WheelTimer
2121 from synapse.types import UserID, get_domain_from_id
9696 if self.hs.is_mine_id(member.user_id):
9797 last_fed_poke = self._member_last_federation_poke.get(member, None)
9898 if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
99 preserve_fn(self._push_remote)(
99 run_in_background(
100 self._push_remote,
100101 member=member,
101102 typing=True
102103 )
195196 def _push_update(self, member, typing):
196197 if self.hs.is_mine_id(member.user_id):
197198 # Only send updates for changes to our own users.
198 preserve_fn(self._push_remote)(member, typing)
199 run_in_background(self._push_remote, member, typing)
199200
200201 self._push_update_local(
201202 member=member,
204205
205206 @defer.inlineCallbacks
206207 def _push_remote(self, member, typing):
207 users = yield self.state.get_current_user_in_room(member.room_id)
208 self._member_last_federation_poke[member] = self.clock.time_msec()
209
210 now = self.clock.time_msec()
211 self.wheel_timer.insert(
212 now=now,
213 obj=member,
214 then=now + FEDERATION_PING_INTERVAL,
215 )
216
217 for domain in set(get_domain_from_id(u) for u in users):
218 if domain != self.server_name:
219 self.federation.send_edu(
220 destination=domain,
221 edu_type="m.typing",
222 content={
223 "room_id": member.room_id,
224 "user_id": member.user_id,
225 "typing": typing,
226 },
227 key=member,
228 )
208 try:
209 users = yield self.state.get_current_user_in_room(member.room_id)
210 self._member_last_federation_poke[member] = self.clock.time_msec()
211
212 now = self.clock.time_msec()
213 self.wheel_timer.insert(
214 now=now,
215 obj=member,
216 then=now + FEDERATION_PING_INTERVAL,
217 )
218
219 for domain in set(get_domain_from_id(u) for u in users):
220 if domain != self.server_name:
221 self.federation.send_edu(
222 destination=domain,
223 edu_type="m.typing",
224 content={
225 "room_id": member.room_id,
226 "user_id": member.user_id,
227 "typing": typing,
228 },
229 key=member,
230 )
231 except Exception:
232 logger.exception("Error pushing typing notif to remotes")
229233
230234 @defer.inlineCallbacks
231235 def _recv_edu(self, origin, content):
00 # -*- coding: utf-8 -*-
11 # Copyright 2014-2016 OpenMarket Ltd
2 # Copyright 2018 New Vector Ltd
23 #
34 # Licensed under the Apache License, Version 2.0 (the "License");
45 # you may not use this file except in compliance with the License.
1112 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1213 # See the License for the specific language governing permissions and
1314 # limitations under the License.
15 from twisted.internet.defer import CancelledError
16 from twisted.python import failure
17
18 from synapse.api.errors import SynapseError
19
20
21 class RequestTimedOutError(SynapseError):
22 """Exception representing timeout of an outbound request"""
23 def __init__(self):
24 super(RequestTimedOutError, self).__init__(504, "Timed out")
25
26
27 def cancelled_to_request_timed_out_error(value, timeout):
28 """Turns CancelledErrors into RequestTimedOutErrors.
29
30 For use with async.add_timeout_to_deferred
31 """
32 if isinstance(value, failure.Failure):
33 value.trap(CancelledError)
34 raise RequestTimedOutError()
35 return value
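This new helper is only useful in combination with add_timeout_to_deferred, as the client changes later in this diff show: the timeout cancels the pending deferred, and the resulting CancelledError is translated into a 504 RequestTimedOutError. A condensed sketch of that usage (fetch_with_timeout and agent are illustrative; agent is assumed to be a twisted.web.client.Agent):

    from twisted.internet import defer

    from synapse.http import cancelled_to_request_timed_out_error
    from synapse.util.async import add_timeout_to_deferred
    from synapse.util.logcontext import make_deferred_yieldable

    @defer.inlineCallbacks
    def fetch_with_timeout(agent, method, uri):
        request_deferred = agent.request(method, uri)
        # cancel the request after 60s; the CancelledError is converted into
        # a RequestTimedOutError by the helper above
        add_timeout_to_deferred(
            request_deferred, 60, cancelled_to_request_timed_out_error,
        )
        response = yield make_deferred_yieldable(request_deferred)
        defer.returnValue(response)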
1212 # See the License for the specific language governing permissions and
1313 # limitations under the License.
1414
15 from synapse.http.server import wrap_request_handler
15 from synapse.http.server import wrap_json_request_handler
1616 from twisted.web.resource import Resource
1717 from twisted.web.server import NOT_DONE_YET
1818
4141 Resource.__init__(self)
4242 self._handler = handler
4343
44 # these are required by the request_handler wrapper
45 self.version_string = hs.version_string
44 # required by the request_handler wrapper
4645 self.clock = hs.get_clock()
4746
4847 def render(self, request):
4948 self._async_render(request)
5049 return NOT_DONE_YET
5150
52 @wrap_request_handler
51 @wrap_json_request_handler
5352 def _async_render(self, request):
5453 return self._handler(request)
00 # -*- coding: utf-8 -*-
11 # Copyright 2014-2016 OpenMarket Ltd
2 # Copyright 2018 New Vector Ltd
23 #
34 # Licensed under the Apache License, Version 2.0 (the "License");
45 # you may not use this file except in compliance with the License.
1718 from synapse.api.errors import (
1819 CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
1920 )
21 from synapse.http import cancelled_to_request_timed_out_error
22 from synapse.util.async import add_timeout_to_deferred
2023 from synapse.util.caches import CACHE_SIZE_FACTOR
2124 from synapse.util.logcontext import make_deferred_yieldable
22 from synapse.util import logcontext
2325 import synapse.metrics
2426 from synapse.http.endpoint import SpiderEndpoint
2527
3739 from twisted.web.http_headers import Headers
3840 from twisted.web._newclient import ResponseDone
3941
40 from StringIO import StringIO
42 from six import StringIO
4143
4244 import simplejson as json
4345 import logging
9496 # counters to it
9597 outgoing_requests_counter.inc(method)
9698
97 def send_request():
99 logger.info("Sending request %s %s", method, uri)
100
101 try:
98102 request_deferred = self.agent.request(
99103 method, uri, *args, **kwargs
100104 )
101
102 return self.clock.time_bound_deferred(
105 add_timeout_to_deferred(
103106 request_deferred,
104 time_out=60,
105 )
106
107 logger.info("Sending request %s %s", method, uri)
108
109 try:
110 with logcontext.PreserveLoggingContext():
111 response = yield send_request()
107 60, cancelled_to_request_timed_out_error,
108 )
109 response = yield make_deferred_yieldable(request_deferred)
112110
113111 incoming_responses_counter.inc(method, response.code)
114112 logger.info(
508506 reactor,
509507 SpiderEndpointFactory(hs)
510508 )
511 ), [('gzip', GzipDecoder)]
509 ), [(b'gzip', GzipDecoder)]
512510 )
513511 # We could look like Chrome:
514512 # self.user_agent = ("Mozilla/5.0 (%s) (KHTML, like Gecko)
114114 if time.time() - self.last_request >= 2.5 * 60:
115115 self.abort()
116116 # Abort the underlying TLS connection. The abort() method calls
117 # loseConnection() on the underlying TLS connection which tries to
117 # loseConnection() on the TLS connection which tries to
118118 # shutdown the connection cleanly. We call abortConnection()
119 # since that will promptly close the underlying TCP connection.
120 self.transport.abortConnection()
119 # since that will promptly close the TLS connection.
120 #
121 # In Twisted >18.4, the TLS connection will be None if it has closed
122 # which will make abortConnection() throw. Check that the TLS connection
123 # is not None before trying to close it.
124 if self.transport.getHandle() is not None:
125 self.transport.abortConnection()
121126
122127 def request(self, request):
123128 self.last_request = time.time()
285290 if (len(answers) == 1
286291 and answers[0].type == dns.SRV
287292 and answers[0].payload
288 and answers[0].payload.target == dns.Name('.')):
293 and answers[0].payload.target == dns.Name(b'.')):
289294 raise ConnectError("Service %s unavailable" % service_name)
290295
291296 for answer in answers:
00 # -*- coding: utf-8 -*-
11 # Copyright 2014-2016 OpenMarket Ltd
2 # Copyright 2018 New Vector Ltd
23 #
34 # Licensed under the Apache License, Version 2.0 (the "License");
45 # you may not use this file except in compliance with the License.
1112 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1213 # See the License for the specific language governing permissions and
1314 # limitations under the License.
14 import synapse.util.retryutils
1515 from twisted.internet import defer, reactor, protocol
1616 from twisted.internet.error import DNSLookupError
1717 from twisted.web.client import readBody, HTTPConnectionPool, Agent
1818 from twisted.web.http_headers import Headers
1919 from twisted.web._newclient import ResponseDone
2020
21 from synapse.http import cancelled_to_request_timed_out_error
2122 from synapse.http.endpoint import matrix_federation_endpoint
22 from synapse.util.async import sleep
23 import synapse.metrics
24 from synapse.util.async import sleep, add_timeout_to_deferred
2325 from synapse.util import logcontext
24 import synapse.metrics
26 from synapse.util.logcontext import make_deferred_yieldable
27 import synapse.util.retryutils
2528
2629 from canonicaljson import encode_canonical_json
2730
3740 import random
3841 import sys
3942 import urllib
40 import urlparse
41
43 from six.moves.urllib import parse as urlparse
4244
4345 logger = logging.getLogger(__name__)
4446 outbound_logger = logging.getLogger("synapse.http.outbound")
183185 producer = body_callback(method, http_url_bytes, headers_dict)
184186
185187 try:
186 def send_request():
187 request_deferred = self.agent.request(
188 method,
189 url_bytes,
190 Headers(headers_dict),
191 producer
192 )
193
194 return self.clock.time_bound_deferred(
195 request_deferred,
196 time_out=timeout / 1000. if timeout else 60,
197 )
198
199 with logcontext.PreserveLoggingContext():
200 response = yield send_request()
188 request_deferred = self.agent.request(
189 method,
190 url_bytes,
191 Headers(headers_dict),
192 producer
193 )
194 add_timeout_to_deferred(
195 request_deferred,
196 timeout / 1000. if timeout else 60,
197 cancelled_to_request_timed_out_error,
198 )
199 response = yield make_deferred_yieldable(
200 request_deferred,
201 )
201202
202203 log_result = "%d %s" % (response.code, response.phrase,)
203204 break
0 # -*- coding: utf-8 -*-
1 # Copyright 2014-2016 OpenMarket Ltd
2 # Copyright 2018 New Vector Ltd
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import logging
17
18 import synapse.metrics
19 from synapse.util.logcontext import LoggingContext
20
21 logger = logging.getLogger(__name__)
22
23 metrics = synapse.metrics.get_metrics_for("synapse.http.server")
24
25 # total number of responses served, split by method/servlet/tag
26 response_count = metrics.register_counter(
27 "response_count",
28 labels=["method", "servlet", "tag"],
29 alternative_names=(
30 # the following are all deprecated aliases for the same metric
31 metrics.name_prefix + x for x in (
32 "_requests",
33 "_response_time:count",
34 "_response_ru_utime:count",
35 "_response_ru_stime:count",
36 "_response_db_txn_count:count",
37 "_response_db_txn_duration:count",
38 )
39 )
40 )
41
42 requests_counter = metrics.register_counter(
43 "requests_received",
44 labels=["method", "servlet", ],
45 )
46
47 outgoing_responses_counter = metrics.register_counter(
48 "responses",
49 labels=["method", "code"],
50 )
51
52 response_timer = metrics.register_counter(
53 "response_time_seconds",
54 labels=["method", "servlet", "tag"],
55 alternative_names=(
56 metrics.name_prefix + "_response_time:total",
57 ),
58 )
59
60 response_ru_utime = metrics.register_counter(
61 "response_ru_utime_seconds", labels=["method", "servlet", "tag"],
62 alternative_names=(
63 metrics.name_prefix + "_response_ru_utime:total",
64 ),
65 )
66
67 response_ru_stime = metrics.register_counter(
68 "response_ru_stime_seconds", labels=["method", "servlet", "tag"],
69 alternative_names=(
70 metrics.name_prefix + "_response_ru_stime:total",
71 ),
72 )
73
74 response_db_txn_count = metrics.register_counter(
75 "response_db_txn_count", labels=["method", "servlet", "tag"],
76 alternative_names=(
77 metrics.name_prefix + "_response_db_txn_count:total",
78 ),
79 )
80
81 # seconds spent waiting for db txns, excluding scheduling time, when processing
82 # this request
83 response_db_txn_duration = metrics.register_counter(
84 "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"],
85 alternative_names=(
86 metrics.name_prefix + "_response_db_txn_duration:total",
87 ),
88 )
89
90 # seconds spent waiting for a db connection, when processing this request
91 response_db_sched_duration = metrics.register_counter(
92 "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
93 )
94
95 # size in bytes of the response written
96 response_size = metrics.register_counter(
97 "response_size", labels=["method", "servlet", "tag"]
98 )
99
100
101 class RequestMetrics(object):
102 def start(self, time_msec, name):
103 self.start = time_msec
104 self.start_context = LoggingContext.current_context()
105 self.name = name
106
107 def stop(self, time_msec, request):
108 context = LoggingContext.current_context()
109
110 tag = ""
111 if context:
112 tag = context.tag
113
114 if context != self.start_context:
115 logger.warn(
116 "Context have unexpectedly changed %r, %r",
117 context, self.start_context
118 )
119 return
120
121 outgoing_responses_counter.inc(request.method, str(request.code))
122
123 response_count.inc(request.method, self.name, tag)
124
125 response_timer.inc_by(
126 time_msec - self.start, request.method,
127 self.name, tag
128 )
129
130 ru_utime, ru_stime = context.get_resource_usage()
131
132 response_ru_utime.inc_by(
133 ru_utime, request.method, self.name, tag
134 )
135 response_ru_stime.inc_by(
136 ru_stime, request.method, self.name, tag
137 )
138 response_db_txn_count.inc_by(
139 context.db_txn_count, request.method, self.name, tag
140 )
141 response_db_txn_duration.inc_by(
142 context.db_txn_duration_ms / 1000., request.method, self.name, tag
143 )
144 response_db_sched_duration.inc_by(
145 context.db_sched_duration_ms / 1000., request.method, self.name, tag
146 )
147
148 response_size.inc_by(request.sentLength, request.method, self.name, tag)
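The new RequestMetrics class is driven from SynapseRequest in site.py later in this diff: start() is called when the request begins processing and stop() once the response has been written, both from within the request's LoggingContext so that the CPU and DB usage recorded on the context can be attributed to the servlet. A condensed sketch of that lifecycle (handle_request and do_the_work are illustrative; request is assumed to be a finished twisted Request):

    import time

    from synapse.http.request_metrics import RequestMetrics
    from synapse.util.logcontext import LoggingContext

    def handle_request(request, servlet_name):
        with LoggingContext("GET-1") as context:
            metrics = RequestMetrics()
            metrics.start(int(time.time() * 1000), name=servlet_name)
            try:
                do_the_work(request)  # hypothetical handler
            finally:
                # records response code, wallclock time, ru_utime/ru_stime and
                # DB transaction counters against (method, servlet, tag)
                metrics.stop(int(time.time() * 1000), request)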
1717 from synapse.api.errors import (
1818 cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes
1919 )
20 from synapse.http.request_metrics import (
21 requests_counter,
22 )
2023 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
2124 from synapse.util.caches import intern_dict
2225 from synapse.util.metrics import Measure
4043
4144 logger = logging.getLogger(__name__)
4245
43 metrics = synapse.metrics.get_metrics_for(__name__)
44
45 # total number of responses served, split by method/servlet/tag
46 response_count = metrics.register_counter(
47 "response_count",
48 labels=["method", "servlet", "tag"],
49 alternative_names=(
50 # the following are all deprecated aliases for the same metric
51 metrics.name_prefix + x for x in (
52 "_requests",
53 "_response_time:count",
54 "_response_ru_utime:count",
55 "_response_ru_stime:count",
56 "_response_db_txn_count:count",
57 "_response_db_txn_duration:count",
58 )
59 )
60 )
61
62 requests_counter = metrics.register_counter(
63 "requests_received",
64 labels=["method", "servlet", ],
65 )
66
67 outgoing_responses_counter = metrics.register_counter(
68 "responses",
69 labels=["method", "code"],
70 )
71
72 response_timer = metrics.register_counter(
73 "response_time_seconds",
74 labels=["method", "servlet", "tag"],
75 alternative_names=(
76 metrics.name_prefix + "_response_time:total",
77 ),
78 )
79
80 response_ru_utime = metrics.register_counter(
81 "response_ru_utime_seconds", labels=["method", "servlet", "tag"],
82 alternative_names=(
83 metrics.name_prefix + "_response_ru_utime:total",
84 ),
85 )
86
87 response_ru_stime = metrics.register_counter(
88 "response_ru_stime_seconds", labels=["method", "servlet", "tag"],
89 alternative_names=(
90 metrics.name_prefix + "_response_ru_stime:total",
91 ),
92 )
93
94 response_db_txn_count = metrics.register_counter(
95 "response_db_txn_count", labels=["method", "servlet", "tag"],
96 alternative_names=(
97 metrics.name_prefix + "_response_db_txn_count:total",
98 ),
99 )
100
101 # seconds spent waiting for db txns, excluding scheduling time, when processing
102 # this request
103 response_db_txn_duration = metrics.register_counter(
104 "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"],
105 alternative_names=(
106 metrics.name_prefix + "_response_db_txn_duration:total",
107 ),
108 )
109
110 # seconds spent waiting for a db connection, when processing this request
111 response_db_sched_duration = metrics.register_counter(
112 "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
113 )
114
115 # size in bytes of the response written
116 response_size = metrics.register_counter(
117 "response_size", labels=["method", "servlet", "tag"]
118 )
119
120 _next_request_id = 0
121
122
123 def request_handler(include_metrics=False):
124 """Decorator for ``wrap_request_handler``"""
125 return lambda request_handler: wrap_request_handler(request_handler, include_metrics)
126
127
128 def wrap_request_handler(request_handler, include_metrics=False):
129 """Wraps a method that acts as a request handler with the necessary logging
130 and exception handling.
131
132 The method must have a signature of "handle_foo(self, request)". The
133 argument "self" must have "version_string" and "clock" attributes. The
134 argument "request" must be a twisted HTTP request.
135
136 The method must return a deferred. If the deferred succeeds we assume that
46
47 def wrap_json_request_handler(h):
48 """Wraps a request handler method with exception handling.
49
50 Also adds logging as per wrap_request_handler_with_logging.
51
52 The handler method must have a signature of "handle_foo(self, request)",
53 where "self" must have a "clock" attribute (and "request" must be a
54 SynapseRequest).
55
56 The handler must return a deferred. If the deferred succeeds we assume that
13757 a response has been sent. If the deferred fails with a SynapseError we use
13858 it to send a JSON response with the appropriate HTTP response code. If the
13959 deferred fails with any other type of error we send a 500 response.
140
141 We insert a unique request-id into the logging context for this request and
142 log the response and duration for this request.
14360 """
14461
14562 @defer.inlineCallbacks
14663 def wrapped_request_handler(self, request):
147 global _next_request_id
148 request_id = "%s-%s" % (request.method, _next_request_id)
149 _next_request_id += 1
150
64 try:
65 yield h(self, request)
66 except CodeMessageException as e:
67 code = e.code
68 if isinstance(e, SynapseError):
69 logger.info(
70 "%s SynapseError: %s - %s", request, code, e.msg
71 )
72 else:
73 logger.exception(e)
74 respond_with_json(
75 request, code, cs_exception(e), send_cors=True,
76 pretty_print=_request_user_agent_is_curl(request),
77 )
78
79 except Exception:
80 # failure.Failure() fishes the original Failure out
81 # of our stack, and thus gives us a sensible stack
82 # trace.
83 f = failure.Failure()
84 logger.error(
85 "Failed handle request via %r: %r: %s",
86 h,
87 request,
88 f.getTraceback().rstrip(),
89 )
90 respond_with_json(
91 request,
92 500,
93 {
94 "error": "Internal server error",
95 "errcode": Codes.UNKNOWN,
96 },
97 send_cors=True,
98 pretty_print=_request_user_agent_is_curl(request),
99 )
100
101 return wrap_request_handler_with_logging(wrapped_request_handler)
102
103
104 def wrap_request_handler_with_logging(h):
105 """Wraps a request handler to provide logging and metrics
106
107 The handler method must have a signature of "handle_foo(self, request)",
108 where "self" must have a "clock" attribute (and "request" must be a
109 SynapseRequest).
110
111 As well as calling `request.processing` (which will log the response and
112 duration for this request), the wrapped request handler will insert the
113 request id into the logging context.
114 """
115 @defer.inlineCallbacks
116 def wrapped_request_handler(self, request):
117 """
118 Args:
119 self:
120 request (synapse.http.site.SynapseRequest):
121 """
122
123 request_id = request.get_request_id()
151124 with LoggingContext(request_id) as request_context:
125 request_context.request = request_id
152126 with Measure(self.clock, "wrapped_request_handler"):
153 request_metrics = RequestMetrics()
154127 # we start the request metrics timer here with an initial stab
155128 # at the servlet name. For most requests that name will be
156129 # JsonResource (or a subclass), and JsonResource._async_render
157130 # will update it once it picks a servlet.
158131 servlet_name = self.__class__.__name__
159 request_metrics.start(self.clock, name=servlet_name)
160
161 request_context.request = request_id
162 with request.processing():
163 try:
164 with PreserveLoggingContext(request_context):
165 if include_metrics:
166 yield request_handler(self, request, request_metrics)
167 else:
168 requests_counter.inc(request.method, servlet_name)
169 yield request_handler(self, request)
170 except CodeMessageException as e:
171 code = e.code
172 if isinstance(e, SynapseError):
173 logger.info(
174 "%s SynapseError: %s - %s", request, code, e.msg
175 )
176 else:
177 logger.exception(e)
178 outgoing_responses_counter.inc(request.method, str(code))
179 respond_with_json(
180 request, code, cs_exception(e), send_cors=True,
181 pretty_print=_request_user_agent_is_curl(request),
182 version_string=self.version_string,
183 )
184 except Exception:
185 # failure.Failure() fishes the original Failure out
186 # of our stack, and thus gives us a sensible stack
187 # trace.
188 f = failure.Failure()
189 logger.error(
190 "Failed handle request %s.%s on %r: %r: %s",
191 request_handler.__module__,
192 request_handler.__name__,
193 self,
194 request,
195 f.getTraceback().rstrip(),
196 )
197 respond_with_json(
198 request,
199 500,
200 {
201 "error": "Internal server error",
202 "errcode": Codes.UNKNOWN,
203 },
204 send_cors=True,
205 pretty_print=_request_user_agent_is_curl(request),
206 version_string=self.version_string,
207 )
208 finally:
209 try:
210 request_metrics.stop(
211 self.clock, request
212 )
213 except Exception as e:
214 logger.warn("Failed to stop metrics: %r", e)
132 with request.processing(servlet_name):
133 with PreserveLoggingContext(request_context):
134 d = h(self, request)
135
136 # record the arrival of the request *after*
137 # dispatching to the handler, so that the handler
138 # can update the servlet name in the request
139 # metrics
140 requests_counter.inc(request.method,
141 request.request_metrics.name)
142 yield d
215143 return wrapped_request_handler
216144
217145
261189 self.canonical_json = canonical_json
262190 self.clock = hs.get_clock()
263191 self.path_regexs = {}
264 self.version_string = hs.version_string
265192 self.hs = hs
266193
267194 def register_paths(self, method, path_patterns, callback):
277204 self._async_render(request)
278205 return server.NOT_DONE_YET
279206
280 # Disable metric reporting because _async_render does its own metrics.
281 # It does its own metric reporting because _async_render dispatches to
282 # a callback and it's the class name of that callback we want to report
283 # against rather than the JsonResource itself.
284 @request_handler(include_metrics=True)
207 @wrap_json_request_handler
285208 @defer.inlineCallbacks
286 def _async_render(self, request, request_metrics):
209 def _async_render(self, request):
287210 """ This gets called from render() every time someone sends us a request.
288211 This checks if anyone has registered a callback for that method and
289212 path.
295218 servlet_classname = servlet_instance.__class__.__name__
296219 else:
297220 servlet_classname = "%r" % callback
298
299 request_metrics.name = servlet_classname
300 requests_counter.inc(request.method, servlet_classname)
221 request.request_metrics.name = servlet_classname
301222
302223 # Now trigger the callback. If it returns a response, we send it
303224 # here. If it throws an exception, that is handled by the wrapper
344265
345266 def _send_response(self, request, code, response_json_object,
346267 response_code_message=None):
347 outgoing_responses_counter.inc(request.method, str(code))
348
349268 # TODO: Only enable CORS for the requests that need it.
350269 respond_with_json(
351270 request, code, response_json_object,
352271 send_cors=True,
353272 response_code_message=response_code_message,
354273 pretty_print=_request_user_agent_is_curl(request),
355 version_string=self.version_string,
356274 canonical_json=self.canonical_json,
357275 )
358276
383301 request (twisted.web.http.Request):
384302 """
385303 raise UnrecognizedRequestError()
386
387
388 class RequestMetrics(object):
389 def start(self, clock, name):
390 self.start = clock.time_msec()
391 self.start_context = LoggingContext.current_context()
392 self.name = name
393
394 def stop(self, clock, request):
395 context = LoggingContext.current_context()
396
397 tag = ""
398 if context:
399 tag = context.tag
400
401 if context != self.start_context:
402 logger.warn(
403 "Context have unexpectedly changed %r, %r",
404 context, self.start_context
405 )
406 return
407
408 response_count.inc(request.method, self.name, tag)
409
410 response_timer.inc_by(
411 clock.time_msec() - self.start, request.method,
412 self.name, tag
413 )
414
415 ru_utime, ru_stime = context.get_resource_usage()
416
417 response_ru_utime.inc_by(
418 ru_utime, request.method, self.name, tag
419 )
420 response_ru_stime.inc_by(
421 ru_stime, request.method, self.name, tag
422 )
423 response_db_txn_count.inc_by(
424 context.db_txn_count, request.method, self.name, tag
425 )
426 response_db_txn_duration.inc_by(
427 context.db_txn_duration_ms / 1000., request.method, self.name, tag
428 )
429 response_db_sched_duration.inc_by(
430 context.db_sched_duration_ms / 1000., request.method, self.name, tag
431 )
432
433 response_size.inc_by(request.sentLength, request.method, self.name, tag)
434304
435305
436306 class RootRedirect(resource.Resource):
451321
452322 def respond_with_json(request, code, json_object, send_cors=False,
453323 response_code_message=None, pretty_print=False,
454 version_string="", canonical_json=True):
324 canonical_json=True):
455325 # could alternatively use request.notifyFinish() and flip a flag when
456326 # the Deferred fires, but since the flag is RIGHT THERE it seems like
457327 # a waste.
473343 request, code, json_bytes,
474344 send_cors=send_cors,
475345 response_code_message=response_code_message,
476 version_string=version_string
477346 )
478347
479348
480349 def respond_with_json_bytes(request, code, json_bytes, send_cors=False,
481 version_string="", response_code_message=None):
350 response_code_message=None):
482351 """Sends encoded JSON in response to the given request.
483352
484353 Args:
492361
493362 request.setResponseCode(code, message=response_code_message)
494363 request.setHeader(b"Content-Type", b"application/json")
495 request.setHeader(b"Server", version_string)
496364 request.setHeader(b"Content-Length", b"%d" % (len(json_bytes),))
497365 request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate")
498366
545413 b"User-Agent", default=[]
546414 )
547415 for user_agent in user_agents:
548 if "curl" in user_agent:
416 if b"curl" in user_agent:
549417 return True
550418 return False
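Resources that handle requests asynchronously are now expected to decorate their render method with wrap_json_request_handler, as AdditionalResource and JsonResource do above; the wrapper supplies the logging context, metrics and JSON error responses. A minimal sketch of such a resource (PingResource is illustrative; it assumes it is served by SynapseSite, so that request is a SynapseRequest, and that hs is the HomeServer):

    from twisted.internet import defer
    from twisted.web.resource import Resource
    from twisted.web.server import NOT_DONE_YET

    from synapse.http.server import respond_with_json, wrap_json_request_handler

    class PingResource(Resource):
        isLeaf = True

        def __init__(self, hs):
            Resource.__init__(self)
            self.clock = hs.get_clock()  # required by the wrapper

        def render_GET(self, request):
            self._async_render(request)
            return NOT_DONE_YET

        @wrap_json_request_handler
        @defer.inlineCallbacks
        def _async_render(self, request):
            yield defer.succeed(None)  # stand-in for real async work
            respond_with_json(request, 200, {"pong": True}, send_cors=True)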
1111 # See the License for the specific language governing permissions and
1212 # limitations under the License.
1313
14 from synapse.util.logcontext import LoggingContext
15 from twisted.web.server import Site, Request
16
1714 import contextlib
1815 import logging
1916 import re
2017 import time
2118
19 from twisted.web.server import Site, Request
20
21 from synapse.http.request_metrics import RequestMetrics
22 from synapse.util.logcontext import LoggingContext
23
24 logger = logging.getLogger(__name__)
25
2226 ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
2327
28 _next_request_seq = 0
29
2430
2531 class SynapseRequest(Request):
32 """Class which encapsulates an HTTP request to synapse.
33
34 All of the requests processed in synapse are of this type.
35
36 It extends twisted's twisted.web.server.Request, and adds:
37 * Unique request ID
38 * Redaction of access_token query-params in __repr__
39 * Logging at start and end
40 * Metrics to record CPU, wallclock and DB time by endpoint.
41
42 It provides a method `processing` which should be called by the Resource
43 which is handling the request, and returns a context manager.
44
45 """
2646 def __init__(self, site, *args, **kw):
2747 Request.__init__(self, *args, **kw)
2848 self.site = site
2949 self.authenticated_entity = None
3050 self.start_time = 0
51
52 global _next_request_seq
53 self.request_seq = _next_request_seq
54 _next_request_seq += 1
3155
3256 def __repr__(self):
3357 # We overwrite this so that we don't log ``access_token``
4064 self.site.site_tag,
4165 )
4266
67 def get_request_id(self):
68 return "%s-%i" % (self.method, self.request_seq)
69
4370 def get_redacted_uri(self):
4471 return ACCESS_TOKEN_RE.sub(
4572 br'\1<redacted>\3',
4976 def get_user_agent(self):
5077 return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
5178
52 def started_processing(self):
79 def render(self, resrc):
80 # override the Server header which is set by twisted
81 self.setHeader("Server", self.site.server_version_string)
82 return Request.render(self, resrc)
83
84 def _started_processing(self, servlet_name):
85 self.start_time = int(time.time() * 1000)
86 self.request_metrics = RequestMetrics()
87 self.request_metrics.start(self.start_time, name=servlet_name)
88
5389 self.site.access_logger.info(
5490 "%s - %s - Received request: %s %s",
5591 self.getClientIP(),
5793 self.method,
5894 self.get_redacted_uri()
5995 )
60 self.start_time = int(time.time() * 1000)
61
62 def finished_processing(self):
63
96
97 def _finished_processing(self):
6498 try:
6599 context = LoggingContext.current_context()
66100 ru_utime, ru_stime = context.get_resource_usage()
71105 ru_utime, ru_stime = (0, 0)
72106 db_txn_count, db_txn_duration_ms = (0, 0)
73107
108 end_time = int(time.time() * 1000)
109
74110 self.site.access_logger.info(
75111 "%s - %s - {%s}"
76112 " Processed request: %dms (%dms, %dms) (%dms/%dms/%d)"
78114 self.getClientIP(),
79115 self.site.site_tag,
80116 self.authenticated_entity,
81 int(time.time() * 1000) - self.start_time,
117 end_time - self.start_time,
82118 int(ru_utime * 1000),
83119 int(ru_stime * 1000),
84120 db_sched_duration_ms,
92128 self.get_user_agent(),
93129 )
94130
131 try:
132 self.request_metrics.stop(end_time, self)
133 except Exception as e:
134 logger.warn("Failed to stop metrics: %r", e)
135
95136 @contextlib.contextmanager
96 def processing(self):
97 self.started_processing()
137 def processing(self, servlet_name):
138 """Record the fact that we are processing this request.
139
140 Returns a context manager; the correct way to use this is:
141
142 @defer.inlineCallbacks
143 def handle_request(request):
144 with request.processing("FooServlet"):
145 yield really_handle_the_request()
146
147 This will log the request's arrival. Once the context manager is
148 closed, the completion of the request will be logged, and the various
149 metrics will be updated.
150
151 Args:
152 servlet_name (str): the name of the servlet which will be
153 processing this request. This is used in the metrics.
154
155 It is possible to update this afterwards by updating
156 self.request_metrics.name.
157 """
158 # TODO: we should probably just move this into render() and finish(),
159 # to save having to call a separate method.
160 self._started_processing(servlet_name)
98161 yield
99 self.finished_processing()
162 self._finished_processing()
100163
101164
102165 class XForwardedForRequest(SynapseRequest):
134197 Subclass of a twisted http Site that does access logging with python's
135198 standard logging
136199 """
137 def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs):
200 def __init__(self, logger_name, site_tag, config, resource,
201 server_version_string, *args, **kwargs):
138202 Site.__init__(self, resource, *args, **kwargs)
139203
140204 self.site_tag = site_tag
142206 proxied = config.get("x_forwarded", False)
143207 self.requestFactory = SynapseRequestFactory(self, proxied)
144208 self.access_logger = logging.getLogger(logger_name)
209 self.server_version_string = server_version_string
145210
146211 def log(self, request):
147212 pass
1515
1616 from itertools import chain
1717 import logging
18 import re
1819
1920 logger = logging.getLogger(__name__)
2021
5556 return not len(self.labels)
5657
5758 def _render_labelvalue(self, value):
58 # TODO: escape backslashes, quotes and newlines
59 return '"%s"' % (value)
59 return '"%s"' % (_escape_label_value(value),)
6060
6161 def _render_key(self, values):
6262 if self.is_scalar():
7070 """Render this metric for a single set of labels
7171
7272 Args:
73 label_values (list[str]): values for each of the labels
73 label_values (list[object]): values for each of the labels,
74 (which get stringified).
7475 value: value of the metric at with these labels
7576
7677 Returns:
298299 "process_psutil_rss:total %d" % sum_rss,
299300 "process_psutil_rss:count %d" % len_rss,
300301 ]
302
303
304 def _escape_character(m):
305 """Replaces a single character with its escape sequence.
306
307 Args:
308 m (re.MatchObject): A match object whose first group is the single
309 character to replace
310
311 Returns:
312 str
313 """
314 c = m.group(1)
315 if c == "\\":
316 return "\\\\"
317 elif c == "\"":
318 return "\\\""
319 elif c == "\n":
320 return "\\n"
321 return c
322
323
324 def _escape_label_value(value):
325 """Takes a label value and escapes quotes, newlines and backslashes
326 """
327 return re.sub(r"([\n\"\\])", _escape_character, str(value))
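A worked example of the escaping performed by the function above (strings shown as Python literals): each backslash, double quote and newline in the label value is prefixed with a backslash, so the rendered Prometheus line remains parseable.

    value = 'say "hi"\n\\o/'
    assert _escape_label_value(value) == 'say \\"hi\\"\\n\\\\o/'
    # rendered by _render_labelvalue as: "say \"hi\"\n\\o/"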
1313 # limitations under the License.
1414
1515 from twisted.internet import defer
16
1617 from synapse.api.constants import EventTypes, Membership
1718 from synapse.api.errors import AuthError
1819 from synapse.handlers.presence import format_user_presence_state
1920
20 from synapse.util import DeferredTimedOutError
2121 from synapse.util.logutils import log_function
22 from synapse.util.async import ObservableDeferred
23 from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
22 from synapse.util.async import (
23 ObservableDeferred, add_timeout_to_deferred,
24 DeferredTimeoutError,
25 )
26 from synapse.util.logcontext import PreserveLoggingContext, run_in_background
2427 from synapse.util.metrics import Measure
2528 from synapse.types import StreamToken
2629 from synapse.visibility import filter_events_for_client
250253 def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
251254 """Notify any user streams that are interested in this room event"""
252255 # poke any interested application service.
253 preserve_fn(self.appservice_handler.notify_interested_services)(
254 room_stream_id
255 )
256 run_in_background(self._notify_app_services, room_stream_id)
256257
257258 if self.federation_sender:
258259 self.federation_sender.notify_new_events(room_stream_id)
265266 users=extra_users,
266267 rooms=[event.room_id],
267268 )
269
270 @defer.inlineCallbacks
271 def _notify_app_services(self, room_stream_id):
272 try:
273 yield self.appservice_handler.notify_interested_services(room_stream_id)
274 except Exception:
275 logger.exception("Error notifying application services of event")
268276
269277 def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
270278 """ Used to inform listeners that something has happend event wise.
330338 # Now we wait for the _NotifierUserStream to be told there
331339 # is a new token.
332340 listener = user_stream.new_listener(prev_token)
341 add_timeout_to_deferred(
342 listener.deferred,
343 (end_time - now) / 1000.,
344 )
333345 with PreserveLoggingContext():
334 yield self.clock.time_bound_deferred(
335 listener.deferred,
336 time_out=(end_time - now) / 1000.
337 )
346 yield listener.deferred
338347
339348 current_token = user_stream.current_token
340349
345354 # Update the prev_token to the current_token since nothing
346355 # has happened between the old prev_token and the current_token
347356 prev_token = current_token
348 except DeferredTimedOutError:
357 except DeferredTimeoutError:
349358 break
350359 except defer.CancelledError:
351360 break
550559 if end_time <= now:
551560 break
552561
562 add_timeout_to_deferred(
563 listener.deferred.addTimeout,
564 (end_time - now) / 1000.,
565 )
553566 try:
554567 with PreserveLoggingContext():
555 yield self.clock.time_bound_deferred(
556 listener.deferred,
557 time_out=(end_time - now) / 1000.
558 )
559 except DeferredTimedOutError:
568 yield listener.deferred
569 except DeferredTimeoutError:
560570 break
561571 except defer.CancelledError:
562572 break
7676 @defer.inlineCallbacks
7777 def on_started(self):
7878 if self.mailer is not None:
79 self.throttle_params = yield self.store.get_throttle_params_by_room(
80 self.pusher_id
81 )
82 yield self._process()
79 try:
80 self.throttle_params = yield self.store.get_throttle_params_by_room(
81 self.pusher_id
82 )
83 yield self._process()
84 except Exception:
85 logger.exception("Error starting email pusher")
8386
8487 def on_stop(self):
8588 if self.timed_call:
1717 from twisted.internet import defer, reactor
1818 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
1919
20 import push_rule_evaluator
21 import push_tools
20 from . import push_rule_evaluator
21 from . import push_tools
2222 import synapse
2323 from synapse.push import PusherConfigException
2424 from synapse.util.logcontext import LoggingContext
9393
9494 @defer.inlineCallbacks
9595 def on_started(self):
96 yield self._process()
96 try:
97 yield self._process()
98 except Exception:
99 logger.exception("Error starting http pusher")
97100
98101 @defer.inlineCallbacks
99102 def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
1212 # See the License for the specific language governing permissions and
1313 # limitations under the License.
1414
15 from httppusher import HttpPusher
15 from .httppusher import HttpPusher
1616
1717 import logging
1818 logger = logging.getLogger(__name__)
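
The bare intra-package imports replaced in the hunks above stop working on Python 3, which treats `import push_tools` as an absolute import. A minimal stand-alone illustration of the same fix (the module names simply mirror the ones shown above):

    # Python 2 would fall back to the sibling module in the same package;
    # Python 3 will not, so the imports must be made explicitly package-relative.
    from . import push_rule_evaluator        # instead of: import push_rule_evaluator
    from .httppusher import HttpPusher       # instead of: from httppusher import HttpPusher
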
1313 # See the License for the specific language governing permissions and
1414 # limitations under the License.
1515
16 import logging
17
1618 from twisted.internet import defer
1719
18 from .pusher import PusherFactory
19 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
20 from synapse.push.pusher import PusherFactory
2021 from synapse.util.async import run_on_reactor
21
22 import logging
22 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
2323
2424 logger = logging.getLogger(__name__)
2525
136136 if u in self.pushers:
137137 for p in self.pushers[u].values():
138138 deferreds.append(
139 preserve_fn(p.on_new_notifications)(
140 min_stream_id, max_stream_id
139 run_in_background(
140 p.on_new_notifications,
141 min_stream_id, max_stream_id,
141142 )
142143 )
143144
144 yield make_deferred_yieldable(defer.gatherResults(deferreds))
145 yield make_deferred_yieldable(
146 defer.gatherResults(deferreds, consumeErrors=True),
147 )
145148 except Exception:
146149 logger.exception("Exception in pusher on_new_notifications")
147150
163166 if u in self.pushers:
164167 for p in self.pushers[u].values():
165168 deferreds.append(
166 preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
169 run_in_background(
170 p.on_new_receipts,
171 min_stream_id, max_stream_id,
172 )
167173 )
168174
169 yield make_deferred_yieldable(defer.gatherResults(deferreds))
175 yield make_deferred_yieldable(
176 defer.gatherResults(deferreds, consumeErrors=True),
177 )
170178 except Exception:
171179 logger.exception("Exception in pusher on_new_receipts")
172180
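
The two hunks above follow the same pattern, sketched below in isolation (a hedged sketch; `notify_all` and `pushers` are illustrative names, while `run_in_background` and `make_deferred_yieldable` are the synapse.util.logcontext helpers named in the diff): start each pusher callback in the background, then wait for the whole batch with consumeErrors=True so one failing pusher does not leave unhandled errors in the others.

    from twisted.internet import defer

    from synapse.util.logcontext import make_deferred_yieldable, run_in_background


    @defer.inlineCallbacks
    def notify_all(pushers, min_stream_id, max_stream_id):
        # Start every callback without waiting, preserving logging contexts.
        deferreds = [
            run_in_background(p.on_new_notifications, min_stream_id, max_stream_id)
            for p in pushers
        ]
        # Wait for the batch; consumeErrors stops gatherResults from emitting
        # "Unhandled error in Deferred" noise when an individual pusher fails.
        yield make_deferred_yieldable(
            defer.gatherResults(deferreds, consumeErrors=True),
        )
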
206214 if appid_pushkey in byuser:
207215 byuser[appid_pushkey].on_stop()
208216 byuser[appid_pushkey] = p
209 preserve_fn(p.on_started)()
217 run_in_background(p.on_started)
210218
211219 logger.info("Started pushers")
212220
3838 "signedjson>=1.0.0": ["signedjson>=1.0.0"],
3939 "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
4040 "service_identity>=1.0.0": ["service_identity>=1.0.0"],
41 "Twisted>=16.0.0": ["twisted>=16.0.0"],
4142
42 # we break under Twisted 18.4
43 # (https://github.com/matrix-org/synapse/issues/3135)
44 "Twisted>=16.0.0,<18.4": ["twisted>=16.0.0"],
43 # We use crypto.get_elliptic_curve which is only supported in >=0.15
44 "pyopenssl>=0.15": ["OpenSSL>=0.15"],
4545
46 "pyopenssl>=0.14": ["OpenSSL>=0.14"],
4746 "pyyaml": ["yaml"],
4847 "pyasn1": ["pyasn1"],
4948 "daemonize": ["daemonize"],
5252 from twisted.protocols.basic import LineOnlyReceiver
5353 from twisted.python.failure import Failure
5454
55 from commands import (
55 from .commands import (
5656 COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS,
5757 ErrorCommand, ServerCommand, RdataCommand, PositionCommand, PingCommand,
5858 NameCommand, ReplicateCommand, UserSyncCommand, SyncCommand,
5959 )
60 from streams import STREAMS_MAP
60 from .streams import STREAMS_MAP
6161
6262 from synapse.util.stringutils import random_string
6363 from synapse.metrics.metric import CounterMetric
1717 from twisted.internet import defer, reactor
1818 from twisted.internet.protocol import Factory
1919
20 from streams import STREAMS_MAP, FederationStream
21 from protocol import ServerReplicationStreamProtocol
20 from .streams import STREAMS_MAP, FederationStream
21 from .protocol import ServerReplicationStreamProtocol
2222
2323 from synapse.util.metrics import Measure, measure_func
2424
167167 yield self.store.find_first_stream_ordering_after_ts(ts)
168168 )
169169
170 (_, depth, _) = (
170 room_event_after_stream_ordering = (
171171 yield self.store.get_room_event_after_stream_ordering(
172172 room_id, stream_ordering,
173173 )
174174 )
175 if room_event_after_stream_ordering:
176 (_, depth, _) = room_event_after_stream_ordering
177 else:
178 logger.warn(
179 "[purge] purging events not possible: No event found "
180 "(received_ts %i => stream_ordering %i)",
181 ts, stream_ordering,
182 )
183 raise SynapseError(
184 404,
185 "there is no event to be purged",
186 errcode=Codes.NOT_FOUND,
187 )
175188 logger.info(
176189 "[purge] purging up to depth %i (received_ts %i => "
177190 "stream_ordering %i)",
5151 """A base Synapse REST Servlet for the client version 1 API.
5252 """
5353
54 # This subclass was presumably created to allow the auth for the v1
55 # protocol version to be different; however, that behaviour was removed,
56 # so the subclass may no longer be necessary.
57
5458 def __init__(self, hs):
5559 """
5660 Args:
5862 """
5963 self.hs = hs
6064 self.builder_factory = hs.get_event_builder_factory()
61 self.auth = hs.get_v1auth()
65 self.auth = hs.get_auth()
6266 self.txns = HttpTransactionCache(hs.get_clock())
2424
2525 import simplejson as json
2626 import urllib
27 import urlparse
27 from six.moves.urllib import parse as urlparse
2828
2929 import logging
3030 from saml2 import BINDING_HTTP_POST
149149 super(RestServlet, self).__init__()
150150 self.hs = hs
151151 self.notifier = hs.get_notifier()
152 self.auth = hs.get_v1auth()
152 self.auth = hs.get_auth()
153153 self.pusher_pool = self.hs.get_pusherpool()
154154
155155 @defer.inlineCallbacks
175175
176176 request.setResponseCode(200)
177177 request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
178 request.setHeader(b"Server", self.hs.version_string)
179178 request.setHeader(b"Content-Length", b"%d" % (
180179 len(PushersRemoveRestServlet.SUCCESS_HTML),
181180 ))
2828 from hashlib import sha1
2929 import hmac
3030 import logging
31
32 from six import string_types
3133
3234 logger = logging.getLogger(__name__)
3335
332334 def _do_shared_secret(self, request, register_json, session):
333335 yield run_on_reactor()
334336
335 if not isinstance(register_json.get("mac", None), basestring):
337 if not isinstance(register_json.get("mac", None), string_types):
336338 raise SynapseError(400, "Expected mac.")
337 if not isinstance(register_json.get("user", None), basestring):
339 if not isinstance(register_json.get("user", None), string_types):
338340 raise SynapseError(400, "Expected 'user' key.")
339 if not isinstance(register_json.get("password", None), basestring):
341 if not isinstance(register_json.get("password", None), string_types):
340342 raise SynapseError(400, "Expected 'password' key.")
341343
342344 if not self.hs.config.registration_shared_secret:
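
The `string_types` check above is the usual six idiom: it matches both `str` and `unicode` on Python 2 and plain `str` on Python 3. A tiny stand-alone illustration (the function name is made up for the example):

    from six import string_types

    def is_text(value):
        # True for u"foo" and "foo" on Python 2, and for "foo" on Python 3;
        # False for bytes on Python 3 and for non-string types everywhere.
        return isinstance(value, string_types)
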
357359 got_mac = str(register_json["mac"])
358360
359361 want_mac = hmac.new(
360 key=self.hs.config.registration_shared_secret,
362 key=self.hs.config.registration_shared_secret.encode(),
361363 digestmod=sha1,
362364 )
363365 want_mac.update(user)
364 want_mac.update("\x00")
366 want_mac.update(b"\x00")
365367 want_mac.update(password)
366 want_mac.update("\x00")
367 want_mac.update("admin" if admin else "notadmin")
368 want_mac.update(b"\x00")
369 want_mac.update(b"admin" if admin else b"notadmin")
368370 want_mac = want_mac.hexdigest()
369371
370372 if compare_digest(want_mac, got_mac):
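
For context, a sketch of how a provisioning script might compute the same shared-secret MAC so it matches what the server now builds byte-for-byte on both Python versions. The helper name and inputs are illustrative, not part of the diff; inputs are assumed to be text and encoded here.

    import hmac
    from hashlib import sha1

    def make_register_mac(shared_secret, user, password, admin=False):
        # Mirror the server-side construction: bytes key, NUL-separated
        # fields, hex digest.
        mac = hmac.new(key=shared_secret.encode(), digestmod=sha1)
        mac.update(user.encode())
        mac.update(b"\x00")
        mac.update(password.encode())
        mac.update(b"\x00")
        mac.update(b"admin" if admin else b"notadmin")
        return mac.hexdigest()
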
2727 parse_json_object_from_request, parse_string, parse_integer
2828 )
2929
30 from six.moves.urllib import parse as urlparse
31
3032 import logging
31 import urllib
3233 import simplejson as json
3334
3435 logger = logging.getLogger(__name__)
432433 as_client_event = "raw" not in request.args
433434 filter_bytes = request.args.get("filter", None)
434435 if filter_bytes:
435 filter_json = urllib.unquote(filter_bytes[-1]).decode("UTF-8")
436 filter_json = urlparse.unquote(filter_bytes[-1]).decode("UTF-8")
436437 event_filter = Filter(json.loads(filter_json))
437438 else:
438439 event_filter = None
717718 def on_PUT(self, request, room_id, user_id):
718719 requester = yield self.auth.get_user_by_req(request)
719720
720 room_id = urllib.unquote(room_id)
721 target_user = UserID.from_string(urllib.unquote(user_id))
721 room_id = urlparse.unquote(room_id)
722 target_user = UserID.from_string(urlparse.unquote(user_id))
722723
723724 content = parse_json_object_from_request(request)
724725
128128 html_bytes = html.encode("utf8")
129129 request.setResponseCode(200)
130130 request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
131 request.setHeader(b"Server", self.hs.version_string)
132131 request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
133132
134133 request.write(html_bytes)
174173 html_bytes = html.encode("utf8")
175174 request.setResponseCode(200)
176175 request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
177 request.setHeader(b"Server", self.hs.version_string)
178176 request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
179177
180178 request.write(html_bytes)
8787 pa["topological_ordering"], pa["stream_ordering"]
8888 )
8989 returned_push_actions.append(returned_pa)
90 next_token = pa["stream_ordering"]
90 next_token = str(pa["stream_ordering"])
9191
9292 defer.returnValue((200, {
9393 "notifications": returned_push_actions,
3434 from synapse.util.async import run_on_reactor
3535 from synapse.util.ratelimitutils import FederationRateLimiter
3636
37 from six import string_types
38
3739
3840 # We ought to be using hmac.compare_digest() but on older pythons it doesn't
3941 # exist. It's a _really minor_ security flaw to use plain string comparison
209211 # in sessions. Pull out the username/password provided to us.
210212 desired_password = None
211213 if 'password' in body:
212 if (not isinstance(body['password'], basestring) or
214 if (not isinstance(body['password'], string_types) or
213215 len(body['password']) > 512):
214216 raise SynapseError(400, "Invalid password")
215217 desired_password = body["password"]
216218
217219 desired_username = None
218220 if 'username' in body:
219 if (not isinstance(body['username'], basestring) or
221 if (not isinstance(body['username'], string_types) or
220222 len(body['username']) > 512):
221223 raise SynapseError(400, "Invalid username")
222224 desired_username = body['username']
242244
243245 access_token = get_access_token_from_request(request)
244246
245 if isinstance(desired_username, basestring):
247 if isinstance(desired_username, string_types):
246248 result = yield self._do_appservice_registration(
247249 desired_username, access_token, body
248250 )
463465 # includes the password and admin flag in the hashed text. Why are
464466 # these different?
465467 want_mac = hmac.new(
466 key=self.hs.config.registration_shared_secret,
468 key=self.hs.config.registration_shared_secret.encode(),
467469 msg=user,
468470 digestmod=sha1,
469471 ).hexdigest()
4848 """
4949
5050 def __init__(self, hs):
51 self.version_string = hs.version_string
5251 self.response_body = encode_canonical_json(
5352 self.response_json_object(hs.config)
5453 )
8382 def render_GET(self, request):
8483 return respond_with_json_bytes(
8584 request, 200, self.response_body,
86 version_string=self.version_string
8785 )
8886
8987 def getChild(self, name, request):
6262 isLeaf = True
6363
6464 def __init__(self, hs):
65 self.version_string = hs.version_string
6665 self.config = hs.config
6766 self.clock = hs.clock
6867 self.update_response_body(self.clock.time_msec())
114113 self.update_response_body(time_now)
115114 return respond_with_json_bytes(
116115 request, 200, self.response_body,
117 version_string=self.version_string
118116 )
1111 # See the License for the specific language governing permissions and
1212 # limitations under the License.
1313
14 from synapse.http.server import request_handler, respond_with_json_bytes
14 from synapse.http.server import (
15 respond_with_json_bytes, wrap_json_request_handler,
16 )
1517 from synapse.http.servlet import parse_integer, parse_json_object_from_request
1618 from synapse.api.errors import SynapseError, Codes
1719 from synapse.crypto.keyring import KeyLookupError
9092 def __init__(self, hs):
9193 self.keyring = hs.get_keyring()
9294 self.store = hs.get_datastore()
93 self.version_string = hs.version_string
9495 self.clock = hs.get_clock()
9596 self.federation_domain_whitelist = hs.config.federation_domain_whitelist
9697
9899 self.async_render_GET(request)
99100 return NOT_DONE_YET
100101
101 @request_handler()
102 @wrap_json_request_handler
102103 @defer.inlineCallbacks
103104 def async_render_GET(self, request):
104105 if len(request.postpath) == 1:
123124 self.async_render_POST(request)
124125 return NOT_DONE_YET
125126
126 @request_handler()
127 @wrap_json_request_handler
127128 @defer.inlineCallbacks
128129 def async_render_POST(self, request):
129130 content = parse_json_object_from_request(request)
239240
240241 respond_with_json_bytes(
241242 request, 200, result_io.getvalue(),
242 version_string=self.version_string
243 )
243 )
2727
2828 import logging
2929 import urllib
30 import urlparse
30 from six.moves.urllib import parse as urlparse
3131
3232 logger = logging.getLogger(__name__)
3333
142142 respond_404(request)
143143 return
144144
145 logger.debug("Responding to media request with responder %s", responder)
145146 add_file_headers(request, media_type, file_size, upload_name)
146147 with responder:
147148 yield responder.write_to_consumer(request)
1111 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212 # See the License for the specific language governing permissions and
1313 # limitations under the License.
14 import logging
15
16 from twisted.internet import defer
17 from twisted.web.resource import Resource
18 from twisted.web.server import NOT_DONE_YET
19
20 from synapse.http.server import (
21 set_cors_headers,
22 wrap_json_request_handler,
23 )
1424 import synapse.http.servlet
15
1625 from ._base import parse_media_id, respond_404
17 from twisted.web.resource import Resource
18 from synapse.http.server import request_handler, set_cors_headers
19
20 from twisted.web.server import NOT_DONE_YET
21 from twisted.internet import defer
22
23 import logging
2426
2527 logger = logging.getLogger(__name__)
2628
3436 self.media_repo = media_repo
3537 self.server_name = hs.hostname
3638
37 # Both of these are expected by @request_handler()
39 # this is expected by @wrap_json_request_handler
3840 self.clock = hs.get_clock()
39 self.version_string = hs.version_string
4041
4142 def render_GET(self, request):
4243 self._async_render_GET(request)
4344 return NOT_DONE_YET
4445
45 @request_handler()
46 @wrap_json_request_handler
4647 @defer.inlineCallbacks
4748 def _async_render_GET(self, request):
4849 set_cors_headers(request)
4646
4747 import cgi
4848 import logging
49 import urlparse
49 from six.moves.urllib import parse as urlparse
5050
5151 logger = logging.getLogger(__name__)
5252
254254 self.open_file = open_file
255255
256256 def write_to_consumer(self, consumer):
257 return FileSender().beginFileTransfer(self.open_file, consumer)
257 return make_deferred_yieldable(
258 FileSender().beginFileTransfer(self.open_file, consumer)
259 )
258260
259261 def __exit__(self, exc_type, exc_val, exc_tb):
260262 self.open_file.close()
3434 from synapse.api.errors import (
3535 SynapseError, Codes,
3636 )
37 from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
37 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
3838 from synapse.util.stringutils import random_string
3939 from synapse.util.caches.expiringcache import ExpiringCache
4040 from synapse.http.client import SpiderHttpClient
4141 from synapse.http.server import (
42 request_handler, respond_with_json_bytes,
42 respond_with_json_bytes,
4343 respond_with_json,
44 wrap_json_request_handler,
4445 )
4546 from synapse.util.async import ObservableDeferred
4647 from synapse.util.stringutils import is_ascii
5657
5758 self.auth = hs.get_auth()
5859 self.clock = hs.get_clock()
59 self.version_string = hs.version_string
6060 self.filepaths = media_repo.filepaths
6161 self.max_spider_size = hs.config.max_spider_size
6262 self.server_name = hs.hostname
8989 self._async_render_GET(request)
9090 return NOT_DONE_YET
9191
92 @request_handler()
92 @wrap_json_request_handler
9393 @defer.inlineCallbacks
9494 def _async_render_GET(self, request):
9595
143143 observable = self._cache.get(url)
144144
145145 if not observable:
146 download = preserve_fn(self._do_preview)(
146 download = run_in_background(
147 self._do_preview,
147148 url, requester.user, ts,
148149 )
149150 observable = ObservableDeferred(
1717 from .media_storage import FileResponder
1818
1919 from synapse.config._base import Config
20 from synapse.util.logcontext import preserve_fn
20 from synapse.util.logcontext import run_in_background
2121
2222 import logging
2323 import os
8686 return self.backend.store_file(path, file_info)
8787 else:
8888 # TODO: Handle errors.
89 preserve_fn(self.backend.store_file)(path, file_info)
89 def store():
90 try:
91 return self.backend.store_file(path, file_info)
92 except Exception:
93 logger.exception("Error storing file")
94 run_in_background(store)
9095 return defer.succeed(None)
9196
9297 def fetch(self, path, file_info):
1313 # limitations under the License.
1414
1515
16 import logging
17
18 from twisted.internet import defer
19 from twisted.web.resource import Resource
20 from twisted.web.server import NOT_DONE_YET
21
22 from synapse.http.server import (
23 set_cors_headers,
24 wrap_json_request_handler,
25 )
26 from synapse.http.servlet import parse_integer, parse_string
1627 from ._base import (
17 parse_media_id, respond_404, respond_with_file, FileInfo,
28 FileInfo, parse_media_id, respond_404, respond_with_file,
1829 respond_with_responder,
1930 )
20 from twisted.web.resource import Resource
21 from synapse.http.servlet import parse_string, parse_integer
22 from synapse.http.server import request_handler, set_cors_headers
23
24 from twisted.web.server import NOT_DONE_YET
25 from twisted.internet import defer
26
27 import logging
2831
2932 logger = logging.getLogger(__name__)
3033
4043 self.media_storage = media_storage
4144 self.dynamic_thumbnails = hs.config.dynamic_thumbnails
4245 self.server_name = hs.hostname
43 self.version_string = hs.version_string
4446 self.clock = hs.get_clock()
4547
4648 def render_GET(self, request):
4749 self._async_render_GET(request)
4850 return NOT_DONE_YET
4951
50 @request_handler()
52 @wrap_json_request_handler
5153 @defer.inlineCallbacks
5254 def _async_render_GET(self, request):
5355 set_cors_headers(request)
1212 # See the License for the specific language governing permissions and
1313 # limitations under the License.
1414
15 from synapse.http.server import respond_with_json, request_handler
15 import logging
16
17 from twisted.internet import defer
18 from twisted.web.resource import Resource
19 from twisted.web.server import NOT_DONE_YET
1620
1721 from synapse.api.errors import SynapseError
18
19 from twisted.web.server import NOT_DONE_YET
20 from twisted.internet import defer
21
22 from twisted.web.resource import Resource
23
24 import logging
22 from synapse.http.server import (
23 respond_with_json,
24 wrap_json_request_handler,
25 )
2526
2627 logger = logging.getLogger(__name__)
2728
3940 self.server_name = hs.hostname
4041 self.auth = hs.get_auth()
4142 self.max_upload_size = hs.config.max_upload_size
42 self.version_string = hs.version_string
4343 self.clock = hs.get_clock()
4444
4545 def render_POST(self, request):
5050 respond_with_json(request, 200, {}, send_cors=True)
5151 return NOT_DONE_YET
5252
53 @request_handler()
53 @wrap_json_request_handler
5454 @defer.inlineCallbacks
5555 def _async_render_POST(self, request):
5656 requester = yield self.auth.get_user_by_req(request)
8080 headers = request.requestHeaders
8181
8282 if headers.hasHeader("Content-Type"):
83 media_type = headers.getRawHeaders("Content-Type")[0]
83 media_type = headers.getRawHeaders(b"Content-Type")[0]
8484 else:
8585 raise SynapseError(
8686 msg="Upload request missing 'Content-Type'",
8787 code=400,
8888 )
8989
90 # if headers.hasHeader("Content-Disposition"):
91 # disposition = headers.getRawHeaders("Content-Disposition")[0]
90 # if headers.hasHeader(b"Content-Disposition"):
91 # disposition = headers.getRawHeaders(b"Content-Disposition")[0]
9292 # TODO(markjh): parse content-disposition
9393
9494 content_uri = yield self.media_repo.create_content(
104104 'federation_client',
105105 'federation_server',
106106 'handlers',
107 'v1auth',
108107 'auth',
109108 'state_handler',
110109 'state_resolution_handler',
224223 def build_simple_http_client(self):
225224 return SimpleHttpClient(self)
226225
227 def build_v1auth(self):
228 orf = Auth(self)
229 # Matrix spec makes no reference to what HTTP status code is returned,
230 # but the V1 API uses 403 where it means 401, and the webclient
231 # relies on this behaviour, so V1 gets its own copy of the auth
232 # with backwards compat behaviour.
233 orf.TOKEN_NOT_FOUND_HTTP_STATUS = 403
234 return orf
235
236226 def build_state_handler(self):
237227 return StateHandler(self)
238228
447447 "add_push_actions_to_staging", _add_push_actions_to_staging_txn
448448 )
449449
450 @defer.inlineCallbacks
450451 def remove_push_actions_from_staging(self, event_id):
451452 """Called if we failed to persist the event to ensure that stale push
452453 actions don't build up in the DB
455456 event_id (str)
456457 """
457458
458 return self._simple_delete(
459 table="event_push_actions_staging",
460 keyvalues={
461 "event_id": event_id,
462 },
463 desc="remove_push_actions_from_staging",
464 )
459 try:
460 res = yield self._simple_delete(
461 table="event_push_actions_staging",
462 keyvalues={
463 "event_id": event_id,
464 },
465 desc="remove_push_actions_from_staging",
466 )
467 defer.returnValue(res)
468 except Exception:
469 # this method is called from an exception handler, so propagating
470 # another exception here really isn't helpful - there's nothing
471 # the caller can do about it. Just log the exception and move on.
472 logger.exception(
473 "Error removing push actions after event persistence failure",
474 )
465475
466476 @defer.inlineCallbacks
467477 def _find_stream_orderings_for_times(self):
2020
2121 import simplejson as json
2222 from twisted.internet import defer
23
2423
2524 from synapse.storage.events_worker import EventsWorkerStore
2625 from synapse.util.async import ObservableDeferred
424423 )
425424 current_state = yield self._get_new_state_after_events(
426425 room_id,
427 ev_ctx_rm, new_latest_event_ids,
426 ev_ctx_rm,
427 latest_event_ids,
428 new_latest_event_ids,
428429 )
429430 if current_state is not None:
430431 current_state_for_room[room_id] = current_state
512513 defer.returnValue(new_latest_event_ids)
513514
514515 @defer.inlineCallbacks
515 def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids):
516 def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
517 new_latest_event_ids):
516518 """Calculate the current state dict after adding some new events to
517519 a room
518520
522524
523525 events_context (list[(EventBase, EventContext)]):
524526 events and contexts which are being added to the room
527
528 old_latest_event_ids (iterable[str]):
529 the old forward extremities for the room.
525530
526531 new_latest_event_ids (iterable[str]):
527532 the new forward extremities for the room.
533538 """
534539
535540 if not new_latest_event_ids:
536 defer.returnValue({})
541 return
537542
538543 # map from state_group to ((type, key) -> event_id) state map
539 state_groups = {}
540 missing_event_ids = []
541 was_updated = False
544 state_groups_map = {}
545 for ev, ctx in events_context:
546 if ctx.state_group is None:
547 # I don't think this can happen, but let's double-check
548 raise Exception(
549 "Context for new extremity event %s has no state "
550 "group" % (ev.event_id, ),
551 )
552
553 if ctx.state_group in state_groups_map:
554 continue
555
556 state_groups_map[ctx.state_group] = ctx.current_state_ids
557
558 # We need to map the event_ids to their state groups. First, let's
559 # check if the event is one we're persisting, in which case we can
560 # pull the state group from its context.
561 # Otherwise we need to pull the state group from the database.
562
563 # Set of events we need to fetch groups for. (We know none of the old
564 # extremities are going to be in events_context).
565 missing_event_ids = set(old_latest_event_ids)
566
567 event_id_to_state_group = {}
542568 for event_id in new_latest_event_ids:
543 # First search in the list of new events we're adding,
544 # and then use the current state from that
569 # First search in the list of new events we're adding.
545570 for ev, ctx in events_context:
546571 if event_id == ev.event_id:
547 if ctx.current_state_ids is None:
548 raise Exception("Unknown current state")
549
550 if ctx.state_group is None:
551 # I don't think this can happen, but let's double-check
552 raise Exception(
553 "Context for new extremity event %s has no state "
554 "group" % (event_id, ),
555 )
556
557 # If we've already seen the state group don't bother adding
558 # it to the state sets again
559 if ctx.state_group not in state_groups:
560 state_groups[ctx.state_group] = ctx.current_state_ids
561 if ctx.delta_ids or hasattr(ev, "state_key"):
562 was_updated = True
572 event_id_to_state_group[event_id] = ctx.state_group
563573 break
564574 else:
565575 # If we couldn't find it, then we'll need to pull
566576 # the state from the database
567 was_updated = True
568 missing_event_ids.append(event_id)
569
570 if not was_updated:
571 return
577 missing_event_ids.add(event_id)
572578
573579 if missing_event_ids:
574 # Now pull out the state for any missing events from DB
580 # Now pull out the state groups for any missing events from DB
575581 event_to_groups = yield self._get_state_group_for_events(
576582 missing_event_ids,
577583 )
578
579 groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys())
580
581 if groups:
582 group_to_state = yield self._get_state_for_groups(groups)
583 state_groups.update(group_to_state)
584
585 if len(state_groups) == 1:
584 event_id_to_state_group.update(event_to_groups)
585
586 # State groups of old_latest_event_ids
587 old_state_groups = set(
588 event_id_to_state_group[evid] for evid in old_latest_event_ids
589 )
590
591 # State groups of new_latest_event_ids
592 new_state_groups = set(
593 event_id_to_state_group[evid] for evid in new_latest_event_ids
594 )
595
596 # If the old and new groups are the same, then we don't need to do
597 # anything.
598 if old_state_groups == new_state_groups:
599 return
600
601 # Now that we have calculated new_state_groups we need to get
602 # their state IDs so we can resolve to a single state set.
603 missing_state = new_state_groups - set(state_groups_map)
604 if missing_state:
605 group_to_state = yield self._get_state_for_groups(missing_state)
606 state_groups_map.update(group_to_state)
607
608 if len(new_state_groups) == 1:
586609 # If there is only one state group, then we know what the current
587610 # state is.
588 defer.returnValue(state_groups.values()[0])
611 defer.returnValue(state_groups_map[new_state_groups.pop()])
612
613 # Ok, we need to defer to the state handler to resolve our state sets.
589614
590615 def get_events(ev_ids):
591616 return self.get_events(
592617 ev_ids, get_prev_content=False, check_redacted=False,
593618 )
619
620 state_groups = {
621 sg: state_groups_map[sg] for sg in new_state_groups
622 }
623
594624 events_map = {ev.event_id: ev for ev, _ in events_context}
595625 logger.debug("calling resolve_state_groups from preserve_events")
596626 res = yield self._state_resolution_handler.resolve_state_groups(
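
To make the short-circuit above concrete, a toy illustration with made-up event IDs and state group numbers: the forward extremities change, but the state groups they map to do not, so the expensive resolve_state_groups call is skipped.

    old_latest_event_ids = {"$a:example.org", "$b:example.org"}
    new_latest_event_ids = {"$b:example.org", "$c:example.org"}

    # Both extremity sets map onto the same single state group...
    event_id_to_state_group = {
        "$a:example.org": 7,
        "$b:example.org": 7,
        "$c:example.org": 7,
    }

    old_state_groups = {event_id_to_state_group[e] for e in old_latest_event_ids}
    new_state_groups = {event_id_to_state_group[e] for e in new_latest_event_ids}

    # ...so the room's current state cannot have changed and no state
    # resolution is needed.
    assert old_state_groups == new_state_groups == {7}
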
1919 from synapse.events.utils import prune_event
2020
2121 from synapse.util.logcontext import (
22 preserve_fn, PreserveLoggingContext, make_deferred_yieldable
22 PreserveLoggingContext, make_deferred_yieldable, run_in_background,
2323 )
2424 from synapse.util.metrics import Measure
2525 from synapse.api.errors import SynapseError
318318
319319 res = yield make_deferred_yieldable(defer.gatherResults(
320320 [
321 preserve_fn(self._get_event_from_row)(
321 run_in_background(
322 self._get_event_from_row,
322323 row["internal_metadata"], row["json"], row["redacts"],
323324 rejected_reason=row["rejects"],
324325 )
2020 from synapse.storage import background_updates
2121 from synapse.storage._base import SQLBaseStore
2222 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
23
24 from six.moves import range
2325
2426
2527 class RegistrationWorkerStore(SQLBaseStore):
468470 match = regex.search(user_id)
469471 if match:
470472 found.add(int(match.group(1)))
471 for i in xrange(len(found) + 1):
473 for i in range(len(found) + 1):
472474 if i not in found:
473475 return i
474476
523525 except self.database_engine.module.IntegrityError:
524526 ret = yield self.get_3pid_guest_access_token(medium, address)
525527 defer.returnValue(ret)
528
529 def add_user_pending_deactivation(self, user_id):
530 """
531 Adds a user to the table of users who need to be parted from all the rooms they're
532 in
533 """
534 return self._simple_insert(
535 "users_pending_deactivation",
536 values={
537 "user_id": user_id,
538 },
539 desc="add_user_pending_deactivation",
540 )
541
542 def del_user_pending_deactivation(self, user_id):
543 """
544 Removes the given user from the table of users who need to be parted from all the
545 rooms they're in, effectively marking that user as fully deactivated.
546 """
547 return self._simple_delete_one(
548 "users_pending_deactivation",
549 keyvalues={
550 "user_id": user_id,
551 },
552 desc="del_user_pending_deactivation",
553 )
554
555 def get_user_pending_deactivation(self):
556 """
557 Gets one user from the table of users waiting to be parted from all the rooms
558 they're in.
559 """
560 return self._simple_select_one_onecol(
561 "users_pending_deactivation",
562 keyvalues={},
563 retcol="user_id",
564 allow_none=True,
565 desc="get_users_pending_deactivation",
566 )
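
An illustrative worker loop built on the three storage functions above (a sketch, not part of the diff; `part_user` is an assumed callable that actually removes the user from their rooms): drain the table one user at a time, and only delete a row once the parting has finished, so a restart part-way through resumes where it left off.

    from twisted.internet import defer


    @defer.inlineCallbacks
    def part_pending_users(store, part_user):
        while True:
            user_id = yield store.get_user_pending_deactivation()
            if user_id is None:
                break
            yield part_user(user_id)
            yield store.del_user_pending_deactivation(user_id)
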
1212 # limitations under the License.
1313 import logging
1414 from synapse.config.appservice import load_appservices
15
16 from six.moves import range
1517
1618
1719 logger = logging.getLogger(__name__)
5759
5860 for as_id, user_ids in owned.items():
5961 n = 100
60 user_chunks = (user_ids[i:i + 100] for i in xrange(0, len(user_ids), n))
62 user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n))
6163 for chunk in user_chunks:
6264 cur.execute(
6365 database_engine.convert_param_style(
0 /* Copyright 2018 New Vector Ltd
1 *
2 * Licensed under the Apache License, Version 2.0 (the "License");
3 * you may not use this file except in compliance with the License.
4 * You may obtain a copy of the License at
5 *
6 * http://www.apache.org/licenses/LICENSE-2.0
7 *
8 * Unless required by applicable law or agreed to in writing, software
9 * distributed under the License is distributed on an "AS IS" BASIS,
10 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 * See the License for the specific language governing permissions and
12 * limitations under the License.
13 */
14
15 /*
16 * Store any accounts that have been requested to be deactivated.
17 * We part the account from all the rooms it's in when it's
18 * deactivated. This can take some time and synapse may be restarted
19 * before it completes, so store the user IDs here until the process
20 * is complete.
21 */
22 CREATE TABLE users_pending_deactivation (
23 user_id TEXT NOT NULL
24 );
0 # Copyright 2018 New Vector Ltd
1 #
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at
5 #
6 # http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 from synapse.storage.engines import PostgresEngine
15 from synapse.storage.prepare_database import get_statements
16
17 FIX_INDEXES = """
18 -- rebuild indexes as uniques
19 DROP INDEX groups_invites_g_idx;
20 CREATE UNIQUE INDEX group_invites_g_idx ON group_invites(group_id, user_id);
21 DROP INDEX groups_users_g_idx;
22 CREATE UNIQUE INDEX group_users_g_idx ON group_users(group_id, user_id);
23
24 -- rename other indexes to actually match their table names.
25 DROP INDEX groups_users_u_idx;
26 CREATE INDEX group_users_u_idx ON group_users(user_id);
27 DROP INDEX groups_invites_u_idx;
28 CREATE INDEX group_invites_u_idx ON group_invites(user_id);
29 DROP INDEX groups_rooms_g_idx;
30 CREATE UNIQUE INDEX group_rooms_g_idx ON group_rooms(group_id, room_id);
31 DROP INDEX groups_rooms_r_idx;
32 CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
33 """
34
35
36 def run_create(cur, database_engine, *args, **kwargs):
37 rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
38
39 # remove duplicates from group_users & group_invites tables
40 cur.execute("""
41 DELETE FROM group_users WHERE %s NOT IN (
42 SELECT min(%s) FROM group_users GROUP BY group_id, user_id
43 );
44 """ % (rowid, rowid))
45 cur.execute("""
46 DELETE FROM group_invites WHERE %s NOT IN (
47 SELECT min(%s) FROM group_invites GROUP BY group_id, user_id
48 );
49 """ % (rowid, rowid))
50
51 for statement in get_statements(FIX_INDEXES.splitlines()):
52 cur.execute(statement)
53
54
55 def run_upgrade(*args, **kwargs):
56 pass
3737 from synapse.storage._base import SQLBaseStore
3838 from synapse.storage.events import EventsWorkerStore
3939
40 from synapse.util.caches.descriptors import cached
4140 from synapse.types import RoomStreamToken
4241 from synapse.util.caches.stream_change_cache import StreamChangeCache
43 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
44 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
42 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
43 from synapse.storage.engines import PostgresEngine
4544
4645 import abc
4746 import logging
4847
48 from six.moves import range
49 from collections import namedtuple
50
4951
5052 logger = logging.getLogger(__name__)
5153
5557
5658 _STREAM_TOKEN = "stream"
5759 _TOPOLOGICAL_TOKEN = "topological"
60
61
62 # Used as return values for pagination APIs
63 _EventDictReturn = namedtuple("_EventDictReturn", (
64 "event_id", "topological_ordering", "stream_ordering",
65 ))
5866
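
A quick illustration of the new return type with made-up values: rows now come back as lightweight namedtuples, which is why the later hunks switch from r["event_id"] to r.event_id.

    row = _EventDictReturn(
        event_id="$evt:example.org",
        topological_ordering=None,   # None when the query only orders by stream
        stream_ordering=42,
    )
    assert row.event_id == "$evt:example.org"
    assert row.stream_ordering == 42
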
5967
6068 def lower_bound(token, engine, inclusive=False):
195203
196204 results = {}
197205 room_ids = list(room_ids)
198 for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
206 for rm_ids in (room_ids[i:i + 20] for i in range(0, len(room_ids), 20)):
199207 res = yield make_deferred_yieldable(defer.gatherResults([
200 preserve_fn(self.get_room_events_stream_for_room)(
208 run_in_background(
209 self.get_room_events_stream_for_room,
201210 room_id, from_key, to_key, limit, order=order,
202211 )
203212 for room_id in rm_ids
204 ]))
213 ], consumeErrors=True))
205214 results.update(dict(zip(rm_ids, res)))
206215
207216 defer.returnValue(results)
223232 @defer.inlineCallbacks
224233 def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
225234 order='DESC'):
226 # Note: If from_key is None then we return in topological order. This
227 # is because in that case we're using this as a "get the last few messages
228 # in a room" function, rather than "get new messages since last sync"
229 if from_key is not None:
230 from_id = RoomStreamToken.parse_stream_token(from_key).stream
231 else:
232 from_id = None
233 to_id = RoomStreamToken.parse_stream_token(to_key).stream
234
235
236 """Get new room events in stream ordering since `from_key`.
237
238 Args:
239 room_id (str)
240 from_key (str): Token from which no events are returned before
241 to_key (str): Token from which no events are returned after. (This
242 is typically the current stream token)
243 limit (int): Maximum number of events to return
244 order (str): Either "DESC" or "ASC". Determines which events are
245 returned when the result is limited. If "DESC" then the most
246 recent `limit` events are returned, otherwise returns the
247 oldest `limit` events.
248
249 Returns:
250 Deferred[tuple[list[FrozenEvent], str]]: Returns the list of
251 events (in ascending order) and the token from the start of
252 the chunk of events returned.
253 """
235254 if from_key == to_key:
236255 defer.returnValue(([], from_key))
237256
238 if from_id:
239 has_changed = yield self._events_stream_cache.has_entity_changed(
240 room_id, from_id
241 )
242
243 if not has_changed:
244 defer.returnValue(([], from_key))
257 from_id = RoomStreamToken.parse_stream_token(from_key).stream
258 to_id = RoomStreamToken.parse_stream_token(to_key).stream
259
260 has_changed = yield self._events_stream_cache.has_entity_changed(
261 room_id, from_id
262 )
263
264 if not has_changed:
265 defer.returnValue(([], from_key))
245266
246267 def f(txn):
247 if from_id is not None:
248 sql = (
249 "SELECT event_id, stream_ordering FROM events WHERE"
250 " room_id = ?"
251 " AND not outlier"
252 " AND stream_ordering > ? AND stream_ordering <= ?"
253 " ORDER BY stream_ordering %s LIMIT ?"
254 ) % (order,)
255 txn.execute(sql, (room_id, from_id, to_id, limit))
256 else:
257 sql = (
258 "SELECT event_id, stream_ordering FROM events WHERE"
259 " room_id = ?"
260 " AND not outlier"
261 " AND stream_ordering <= ?"
262 " ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?"
263 ) % (order, order,)
264 txn.execute(sql, (room_id, to_id, limit))
265
266 rows = self.cursor_to_dict(txn)
267
268 sql = (
269 "SELECT event_id, stream_ordering FROM events WHERE"
270 " room_id = ?"
271 " AND not outlier"
272 " AND stream_ordering > ? AND stream_ordering <= ?"
273 " ORDER BY stream_ordering %s LIMIT ?"
274 ) % (order,)
275 txn.execute(sql, (room_id, from_id, to_id, limit))
276
277 rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
268278 return rows
269279
270280 rows = yield self.runInteraction("get_room_events_stream_for_room", f)
271281
272282 ret = yield self._get_events(
273 [r["event_id"] for r in rows],
283 [r.event_id for r in rows],
274284 get_prev_content=True
275285 )
276286
280290 ret.reverse()
281291
282292 if rows:
283 key = "s%d" % min(r["stream_ordering"] for r in rows)
293 key = "s%d" % min(r.stream_ordering for r in rows)
284294 else:
285295 # Assume we didn't get anything because there was nothing to
286296 # get.
290300
291301 @defer.inlineCallbacks
292302 def get_membership_changes_for_user(self, user_id, from_key, to_key):
293 if from_key is not None:
294 from_id = RoomStreamToken.parse_stream_token(from_key).stream
295 else:
296 from_id = None
303 from_id = RoomStreamToken.parse_stream_token(from_key).stream
297304 to_id = RoomStreamToken.parse_stream_token(to_key).stream
298305
299306 if from_key == to_key:
307314 defer.returnValue([])
308315
309316 def f(txn):
310 if from_id is not None:
311 sql = (
312 "SELECT m.event_id, stream_ordering FROM events AS e,"
313 " room_memberships AS m"
314 " WHERE e.event_id = m.event_id"
315 " AND m.user_id = ?"
316 " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
317 " ORDER BY e.stream_ordering ASC"
318 )
319 txn.execute(sql, (user_id, from_id, to_id,))
320 else:
321 sql = (
322 "SELECT m.event_id, stream_ordering FROM events AS e,"
323 " room_memberships AS m"
324 " WHERE e.event_id = m.event_id"
325 " AND m.user_id = ?"
326 " AND stream_ordering <= ?"
327 " ORDER BY stream_ordering ASC"
328 )
329 txn.execute(sql, (user_id, to_id,))
330 rows = self.cursor_to_dict(txn)
317 sql = (
318 "SELECT m.event_id, stream_ordering FROM events AS e,"
319 " room_memberships AS m"
320 " WHERE e.event_id = m.event_id"
321 " AND m.user_id = ?"
322 " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
323 " ORDER BY e.stream_ordering ASC"
324 )
325 txn.execute(sql, (user_id, from_id, to_id,))
326
327 rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
331328
332329 return rows
333330
334331 rows = yield self.runInteraction("get_membership_changes_for_user", f)
335332
336333 ret = yield self._get_events(
337 [r["event_id"] for r in rows],
334 [r.event_id for r in rows],
338335 get_prev_content=True
339336 )
340337
343340 defer.returnValue(ret)
344341
345342 @defer.inlineCallbacks
346 def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
343 def get_recent_events_for_room(self, room_id, limit, end_token):
344 """Get the most recent events in the room in topological ordering.
345
346 Args:
347 room_id (str)
348 limit (int)
349 end_token (str): The stream token representing now.
350
351 Returns:
352 Deferred[tuple[list[FrozenEvent], str]]: Returns a list of
353 events and a token pointing to the start of the returned
354 events.
355 The events returned are in ascending order.
356 """
357
347358 rows, token = yield self.get_recent_event_ids_for_room(
348 room_id, limit, end_token, from_token
359 room_id, limit, end_token,
349360 )
350361
351362 logger.debug("stream before")
352363 events = yield self._get_events(
353 [r["event_id"] for r in rows],
364 [r.event_id for r in rows],
354365 get_prev_content=True
355366 )
356367 logger.debug("stream after")
359370
360371 defer.returnValue((events, token))
361372
362 @cached(num_args=4)
363 def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
364 end_token = RoomStreamToken.parse_stream_token(end_token)
365
366 if from_token is None:
367 sql = (
368 "SELECT stream_ordering, topological_ordering, event_id"
369 " FROM events"
370 " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?"
371 " ORDER BY topological_ordering DESC, stream_ordering DESC"
372 " LIMIT ?"
373 )
374 else:
375 from_token = RoomStreamToken.parse_stream_token(from_token)
376 sql = (
377 "SELECT stream_ordering, topological_ordering, event_id"
378 " FROM events"
379 " WHERE room_id = ? AND stream_ordering > ?"
380 " AND stream_ordering <= ? AND outlier = ?"
381 " ORDER BY topological_ordering DESC, stream_ordering DESC"
382 " LIMIT ?"
383 )
384
385 def get_recent_events_for_room_txn(txn):
386 if from_token is None:
387 txn.execute(sql, (room_id, end_token.stream, False, limit,))
388 else:
389 txn.execute(sql, (
390 room_id, from_token.stream, end_token.stream, False, limit
391 ))
392
393 rows = self.cursor_to_dict(txn)
394
395 rows.reverse() # As we selected with reverse ordering
396
397 if rows:
398 # Tokens are positions between events.
399 # This token points *after* the last event in the chunk.
400 # We need it to point to the event before it in the chunk
401 # since we are going backwards so we subtract one from the
402 # stream part.
403 topo = rows[0]["topological_ordering"]
404 toke = rows[0]["stream_ordering"] - 1
405 start_token = str(RoomStreamToken(topo, toke))
406
407 token = (start_token, str(end_token))
408 else:
409 token = (str(end_token), str(end_token))
410
411 return rows, token
412
413 return self.runInteraction(
414 "get_recent_events_for_room", get_recent_events_for_room_txn
415 )
373 @defer.inlineCallbacks
374 def get_recent_event_ids_for_room(self, room_id, limit, end_token):
375 """Get the most recent events in the room in topological ordering.
376
377 Args:
378 room_id (str)
379 limit (int)
380 end_token (str): The stream token representing now.
381
382 Returns:
383 Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of
384 _EventDictReturn and a token pointing to the start of the returned
385 events.
386 The events returned are in ascending order.
387 """
388 # Allow a zero limit here, and no-op.
389 if limit == 0:
390 defer.returnValue(([], end_token))
391
392 end_token = RoomStreamToken.parse(end_token)
393
394 rows, token = yield self.runInteraction(
395 "get_recent_event_ids_for_room", self._paginate_room_events_txn,
396 room_id, from_token=end_token, limit=limit,
397 )
398
399 # We want to return the results in ascending order.
400 rows.reverse()
401
402 defer.returnValue((rows, token))
416403
417404 def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
418405 """Gets details of the first event in a room at or after a stream ordering
516503
517504 @staticmethod
518505 def _set_before_and_after(events, rows, topo_order=True):
506 """Inserts ordering information to events' internal metadata from
507 the DB rows.
508
509 Args:
510 events (list[FrozenEvent])
511 rows (list[_EventDictReturn])
512 topo_order (bool): Whether the events were ordered topologically
513 or by stream ordering. If true then all rows should have a non
514 null topological_ordering.
515 """
519516 for event, row in zip(events, rows):
520 stream = row["stream_ordering"]
521 if topo_order:
522 topo = event.depth
517 stream = row.stream_ordering
518 if topo_order and row.topological_ordering:
519 topo = row.topological_ordering
523520 else:
524521 topo = None
525522 internal = event.internal_metadata
591588 retcols=["stream_ordering", "topological_ordering"],
592589 )
593590
594 token = RoomStreamToken(
591 # Paginating backwards includes the event at the token, but paginating
592 # forward doesn't.
593 before_token = RoomStreamToken(
594 results["topological_ordering"] - 1,
595 results["stream_ordering"],
596 )
597
598 after_token = RoomStreamToken(
595599 results["topological_ordering"],
596600 results["stream_ordering"],
597601 )
598602
599 if isinstance(self.database_engine, Sqlite3Engine):
600 # SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)``
601 # So we pass it to SQLite3 as the UNION ALL of the two queries.
602
603 query_before = (
604 "SELECT topological_ordering, stream_ordering, event_id FROM events"
605 " WHERE room_id = ? AND topological_ordering < ?"
606 " UNION ALL"
607 " SELECT topological_ordering, stream_ordering, event_id FROM events"
608 " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?"
609 " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
610 )
611 before_args = (
612 room_id, token.topological,
613 room_id, token.topological, token.stream,
614 before_limit,
615 )
616
617 query_after = (
618 "SELECT topological_ordering, stream_ordering, event_id FROM events"
619 " WHERE room_id = ? AND topological_ordering > ?"
620 " UNION ALL"
621 " SELECT topological_ordering, stream_ordering, event_id FROM events"
622 " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?"
623 " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
624 )
625 after_args = (
626 room_id, token.topological,
627 room_id, token.topological, token.stream,
628 after_limit,
629 )
630 else:
631 query_before = (
632 "SELECT topological_ordering, stream_ordering, event_id FROM events"
633 " WHERE room_id = ? AND %s"
634 " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
635 ) % (upper_bound(token, self.database_engine, inclusive=False),)
636
637 before_args = (room_id, before_limit)
638
639 query_after = (
640 "SELECT topological_ordering, stream_ordering, event_id FROM events"
641 " WHERE room_id = ? AND %s"
642 " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
643 ) % (lower_bound(token, self.database_engine, inclusive=False),)
644
645 after_args = (room_id, after_limit)
646
647 txn.execute(query_before, before_args)
648
649 rows = self.cursor_to_dict(txn)
650 events_before = [r["event_id"] for r in rows]
651
652 if rows:
653 start_token = str(RoomStreamToken(
654 rows[0]["topological_ordering"],
655 rows[0]["stream_ordering"] - 1,
656 ))
657 else:
658 start_token = str(RoomStreamToken(
659 token.topological,
660 token.stream - 1,
661 ))
662
663 txn.execute(query_after, after_args)
664
665 rows = self.cursor_to_dict(txn)
666 events_after = [r["event_id"] for r in rows]
667
668 if rows:
669 end_token = str(RoomStreamToken(
670 rows[-1]["topological_ordering"],
671 rows[-1]["stream_ordering"],
672 ))
673 else:
674 end_token = str(token)
603 rows, start_token = self._paginate_room_events_txn(
604 txn, room_id, before_token, direction='b', limit=before_limit,
605 )
606 events_before = [r.event_id for r in rows]
607
608 rows, end_token = self._paginate_room_events_txn(
609 txn, room_id, after_token, direction='f', limit=after_limit,
610 )
611 events_after = [r.event_id for r in rows]
675612
676613 return {
677614 "before": {
734671 def has_room_changed_since(self, room_id, stream_id):
735672 return self._events_stream_cache.has_entity_changed(room_id, stream_id)
736673
737
738 class StreamStore(StreamWorkerStore):
739 def get_room_max_stream_ordering(self):
740 return self._stream_id_gen.get_current_token()
741
742 def get_room_min_stream_ordering(self):
743 return self._backfill_id_gen.get_current_token()
744
745 @defer.inlineCallbacks
746 def paginate_room_events(self, room_id, from_key, to_key=None,
747 direction='b', limit=-1, event_filter=None):
674 def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None,
675 direction='b', limit=-1, event_filter=None):
676 """Returns list of events before or after a given token.
677
678 Args:
679 txn
680 room_id (str)
681 from_token (RoomStreamToken): The token used to stream from
682 to_token (RoomStreamToken|None): A token which, if given, limits the
683 results to only those before it
684 direction (char): Either 'b' or 'f' to indicate whether we are
685 paginating forwards or backwards from `from_token`.
686 limit (int): The maximum number of events to return. Zero or less
687 means no limit.
688 event_filter (Filter|None): If provided filters the events to
689 those that match the filter.
690
691 Returns:
692 Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
693 as a list of _EventDictReturn and a token that points to the end
694 of the result set.
695 """
748696 # Tokens really represent positions between elements, but we use
749697 # the convention of pointing to the event before the gap. Hence
750698 # we have a bit of asymmetry when it comes to equalities.
752700 if direction == 'b':
753701 order = "DESC"
754702 bounds = upper_bound(
755 RoomStreamToken.parse(from_key), self.database_engine
756 )
757 if to_key:
703 from_token, self.database_engine
704 )
705 if to_token:
758706 bounds = "%s AND %s" % (bounds, lower_bound(
759 RoomStreamToken.parse(to_key), self.database_engine
707 to_token, self.database_engine
760708 ))
761709 else:
762710 order = "ASC"
763711 bounds = lower_bound(
764 RoomStreamToken.parse(from_key), self.database_engine
765 )
766 if to_key:
712 from_token, self.database_engine
713 )
714 if to_token:
767715 bounds = "%s AND %s" % (bounds, upper_bound(
768 RoomStreamToken.parse(to_key), self.database_engine
716 to_token, self.database_engine
769717 ))
770718
771719 filter_clause, filter_args = filter_to_clause(event_filter)
781729 limit_str = ""
782730
783731 sql = (
784 "SELECT * FROM events"
732 "SELECT event_id, topological_ordering, stream_ordering"
733 " FROM events"
785734 " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
786735 " ORDER BY topological_ordering %(order)s,"
787736 " stream_ordering %(order)s %(limit)s"
791740 "limit": limit_str
792741 }
793742
794 def f(txn):
795 txn.execute(sql, args)
796
797 rows = self.cursor_to_dict(txn)
798
799 if rows:
800 topo = rows[-1]["topological_ordering"]
801 toke = rows[-1]["stream_ordering"]
802 if direction == 'b':
803 # Tokens are positions between events.
804 # This token points *after* the last event in the chunk.
805 # We need it to point to the event before it in the chunk
806 # when we are going backwards so we subtract one from the
807 # stream part.
808 toke -= 1
809 next_token = str(RoomStreamToken(topo, toke))
810 else:
811 # TODO (erikj): We should work out what to do here instead.
812 next_token = to_key if to_key else from_key
813
814 return rows, next_token,
815
816 rows, token = yield self.runInteraction("paginate_room_events", f)
743 txn.execute(sql, args)
744
745 rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
746
747 if rows:
748 topo = rows[-1].topological_ordering
749 toke = rows[-1].stream_ordering
750 if direction == 'b':
751 # Tokens are positions between events.
752 # This token points *after* the last event in the chunk.
753 # We need it to point to the event before it in the chunk
754 # when we are going backwards so we subtract one from the
755 # stream part.
756 toke -= 1
757 next_token = RoomStreamToken(topo, toke)
758 else:
759 # TODO (erikj): We should work out what to do here instead.
760 next_token = to_token if to_token else from_token
761
762 return rows, str(next_token),
763
764 @defer.inlineCallbacks
765 def paginate_room_events(self, room_id, from_key, to_key=None,
766 direction='b', limit=-1, event_filter=None):
767 """Returns list of events before or after a given token.
768
769 Args:
770 room_id (str)
771 from_key (str): The token used to stream from
772 to_key (str|None): A token which, if given, limits the results to
773 only those before it
774 direction (char): Either 'b' or 'f' to indicate whether we are
775 paginating forwards or backwards from `from_key`.
776 limit (int): The maximum number of events to return. Zero or less
777 means no limit.
778 event_filter (Filter|None): If provided filters the events to
779 those that match the filter.
780
781 Returns:
782 Deferred[tuple[list[FrozenEvent], str]]: Returns the list of events
783 and a token that points to the end of the result set.
785 """
786
787 from_key = RoomStreamToken.parse(from_key)
788 if to_key:
789 to_key = RoomStreamToken.parse(to_key)
790
791 rows, token = yield self.runInteraction(
792 "paginate_room_events", self._paginate_room_events_txn,
793 room_id, from_key, to_key, direction, limit, event_filter,
794 )
817795
818796 events = yield self._get_events(
819 [r["event_id"] for r in rows],
797 [r.event_id for r in rows],
820798 get_prev_content=True
821799 )
822800
823801 self._set_before_and_after(events, rows)
824802
825803 defer.returnValue((events, token))
804
805
806 class StreamStore(StreamWorkerStore):
807 def get_room_max_stream_ordering(self):
808 return self._stream_id_gen.get_current_token()
809
810 def get_room_min_stream_ordering(self):
811 return self._backfill_id_gen.get_current_token()
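
A hedged usage sketch of the refactored public API (the store, room and token values are placeholders): callers still pass string tokens, and the RoomStreamToken parsing now happens inside paginate_room_events before it defers to the shared txn helper.

    from twisted.internet import defer


    @defer.inlineCallbacks
    def fetch_previous_page(store, room_id, from_token, page_size=20):
        # Walk backwards ('b') from the given token; the returned token can be
        # fed straight back in to fetch the next (older) page.
        events, next_token = yield store.paginate_room_events(
            room_id, from_key=from_token, direction='b', limit=page_size,
        )
        defer.returnValue((events, next_token))
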
2121 import simplejson as json
2222 import logging
2323
24 from six.moves import range
25
2426 logger = logging.getLogger(__name__)
2527
2628
9799
98100 batch_size = 50
99101 results = []
100 for i in xrange(0, len(tag_ids), batch_size):
102 for i in range(0, len(tag_ids), batch_size):
101103 tags = yield self.runInteraction(
102104 "get_all_updated_tag_content",
103105 get_tag_content,
1212 # See the License for the specific language governing permissions and
1313 # limitations under the License.
1414
15 from synapse.api.errors import SynapseError
1615 from synapse.util.logcontext import PreserveLoggingContext
1716
1817 from twisted.internet import defer, reactor, task
2120 import logging
2221
2322 logger = logging.getLogger(__name__)
24
25
26 class DeferredTimedOutError(SynapseError):
27 def __init__(self):
28 super(DeferredTimedOutError, self).__init__(504, "Timed out")
2923
3024
3125 def unwrapFirstError(failure):
8478 except Exception:
8579 if not ignore_errs:
8680 raise
87
88 def time_bound_deferred(self, given_deferred, time_out):
89 if given_deferred.called:
90 return given_deferred
91
92 ret_deferred = defer.Deferred()
93
94 def timed_out_fn():
95 e = DeferredTimedOutError()
96
97 try:
98 ret_deferred.errback(e)
99 except Exception:
100 pass
101
102 try:
103 given_deferred.cancel()
104 except Exception:
105 pass
106
107 timer = None
108
109 def cancel(res):
110 try:
111 self.cancel_call_later(timer)
112 except Exception:
113 pass
114 return res
115
116 ret_deferred.addBoth(cancel)
117
118 def success(res):
119 try:
120 ret_deferred.callback(res)
121 except Exception:
122 pass
123
124 return res
125
126 def err(res):
127 try:
128 ret_deferred.errback(res)
129 except Exception:
130 pass
131
132 given_deferred.addCallbacks(callback=success, errback=err)
133
134 timer = self.call_later(time_out, timed_out_fn)
135
136 return ret_deferred
1414
1515
1616 from twisted.internet import defer, reactor
17 from twisted.internet.defer import CancelledError
18 from twisted.python import failure
1719
1820 from .logcontext import (
19 PreserveLoggingContext, make_deferred_yieldable, preserve_fn
21 PreserveLoggingContext, make_deferred_yieldable, run_in_background
2022 )
2123 from synapse.util import logcontext, unwrapFirstError
2224
2325 from contextlib import contextmanager
2426
2527 import logging
28
29 from six.moves import range
2630
2731 logger = logging.getLogger(__name__)
2832
155159 def _concurrently_execute_inner():
156160 try:
157161 while True:
158 yield func(it.next())
162 yield func(next(it))
159163 except StopIteration:
160164 pass
161165
162166 return logcontext.make_deferred_yieldable(defer.gatherResults([
163 preserve_fn(_concurrently_execute_inner)()
164 for _ in xrange(limit)
167 run_in_background(_concurrently_execute_inner)
168 for _ in range(limit)
165169 ], consumeErrors=True)).addErrback(unwrapFirstError)
166170
167171
391395 self.key_to_current_writer.pop(key)
392396
393397 defer.returnValue(_ctx_manager())
398
399
400 class DeferredTimeoutError(Exception):
401 """
402 This error is raised by default when a L{Deferred} times out.
403 """
404
405
406 def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
407 """
408 Add a timeout to a deferred by scheduling it to be cancelled after
409 timeout seconds.
410
411 This is essentially a backport of deferred.addTimeout, which was introduced
412 in twisted 16.5.
413
414 If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
415 unless a cancelable function was passed to its initialization or unless
416 a different on_timeout_cancel callable is provided.
417
418 Args:
419 deferred (defer.Deferred): deferred to be timed out
420 timeout (Number): seconds to time out after
421
422 on_timeout_cancel (callable): A callable which is called immediately
423 after the deferred times out, and not if this deferred is
424 otherwise cancelled before the timeout.
425
426 It takes an arbitrary value, which is the value of the deferred at
427 that exact point in time (probably a CancelledError Failure), and
428 the timeout.
429
430 The default callable (if none is provided) will translate a
431 CancelledError Failure into a DeferredTimeoutError.
432 """
433 timed_out = [False]
434
435 def time_it_out():
436 timed_out[0] = True
437 deferred.cancel()
438
439 delayed_call = reactor.callLater(timeout, time_it_out)
440
441 def convert_cancelled(value):
442 if timed_out[0]:
443 to_call = on_timeout_cancel or _cancelled_to_timed_out_error
444 return to_call(value, timeout)
445 return value
446
447 deferred.addBoth(convert_cancelled)
448
449 def cancel_timeout(result):
450 # stop the pending call to cancel the deferred if it's been fired
451 if delayed_call.active():
452 delayed_call.cancel()
453 return result
454
455 deferred.addBoth(cancel_timeout)
456
457
458 def _cancelled_to_timed_out_error(value, timeout):
459 if isinstance(value, failure.Failure):
460 value.trap(CancelledError)
461 raise DeferredTimeoutError(timeout, "Deferred")
462 return value
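
As a hedged usage sketch for add_timeout_to_deferred above (assuming, as the surrounding hunks suggest, that it and DeferredTimeoutError live in synapse.util.async; the ten-second limit and the fall-back-to-None behaviour are illustrative):

from synapse.util.async import add_timeout_to_deferred, DeferredTimeoutError

def with_timeout(slow_deferred, seconds=10):
    # errback slow_deferred with DeferredTimeoutError if it has not fired
    # within `seconds`
    add_timeout_to_deferred(slow_deferred, seconds)

    def on_timeout(failure):
        failure.trap(DeferredTimeoutError)
        return None    # swallow the timeout and fall back to a default value
    slow_deferred.addErrback(on_timeout)
    return slow_deferred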
1414
1515 from twisted.internet import threads, reactor
1616
17 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
17 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
1818
1919 from six.moves import queue
2020
6969
7070 self._producer = producer
7171 self.streaming = streaming
72 self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer)
72 self._finished_deferred = run_in_background(
73 threads.deferToThread, self._writer
74 )
7375 if not streaming:
7476 self._producer.resumeProducing()
7577
3939 # extra resources to existing nodes. See self._resource_id for the key.
4040 resource_mappings = {}
4141 for full_path, res in desired_tree.items():
42 # twisted requires all resources to be bytes
43 full_path = full_path.encode("utf-8")
44
4245 logger.info("Attaching %s to path %s", res, full_path)
4346 last_resource = root_resource
44 for path_seg in full_path.split('/')[1:-1]:
47 for path_seg in full_path.split(b'/')[1:-1]:
4548 if path_seg not in last_resource.listNames():
4649 # resource doesn't exist, so make a "dummy resource"
4750 child_resource = NoResource()
5659
5760 # ===========================
5861 # now attach the actual desired resource
59 last_path_seg = full_path.split('/')[-1]
62 last_path_seg = full_path.split(b'/')[-1]
6063
6164 # if there is already a resource here, thieve its children and
6265 # replace it
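
The site hunk above encodes the full path to bytes before splitting, because twisted keys its resource tree on byte strings. A minimal sketch of the same idea, stripped of the dummy-resource bookkeeping in the real function (the helper name is illustrative):

from twisted.web.resource import NoResource

def attach(root_resource, full_path, res):
    # twisted requires resource path segments to be bytes
    full_path = full_path.encode("utf-8")
    last = root_resource
    for seg in full_path.split(b'/')[1:-1]:
        if seg not in last.listNames():
            # create a placeholder for the intermediate segment
            last.putChild(seg, NoResource())
        last = last.children[seg]
    last.putChild(full_path.split(b'/')[-1], res)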
163163 current = self.set_current_context(self.previous_context)
164164 if current is not self:
165165 if current is self.sentinel:
166 logger.debug("Expected logging context %s has been lost", self)
166 logger.warn("Expected logging context %s has been lost", self)
167167 else:
168168 logger.warn(
169169 "Current logging context %s is not expected context %s",
278278 context = LoggingContext.set_current_context(self.current_context)
279279
280280 if context != self.new_context:
281 logger.debug(
281 logger.warn(
282282 "Unexpected logging context: %s is not %s",
283283 context, self.new_context,
284284 )
301301 def run_in_background(f, *args, **kwargs):
302302 """Calls a function, ensuring that the current context is restored after
303303 return from the function, and that the sentinel context is set once the
304 deferred returned by the funtion completes.
304 deferred returned by the function completes.
305305
306306 Useful for wrapping functions that return a deferred which you don't yield
307 on.
307 on (for instance because you want to pass it to deferred.gatherResults()).
308
309 Note that if you completely discard the result, you should make sure that
310 `f` doesn't raise any deferred exceptions, otherwise a scary-looking
311 CRITICAL error about an unhandled error will be logged without much
312 indication about where it came from.
308313 """
309314 current = LoggingContext.current_context()
310 res = f(*args, **kwargs)
311 if isinstance(res, defer.Deferred) and not res.called:
312 # The function will have reset the context before returning, so
313 # we need to restore it now.
314 LoggingContext.set_current_context(current)
315
316 # The original context will be restored when the deferred
317 # completes, but there is nothing waiting for it, so it will
318 # get leaked into the reactor or some other function which
319 # wasn't expecting it. We therefore need to reset the context
320 # here.
321 #
322 # (If this feels asymmetric, consider it this way: we are
323 # effectively forking a new thread of execution. We are
324 # probably currently within a ``with LoggingContext()`` block,
325 # which is supposed to have a single entry and exit point. But
326 # by spawning off another deferred, we are effectively
327 # adding a new exit point.)
328 res.addBoth(_set_context_cb, LoggingContext.sentinel)
315 try:
316 res = f(*args, **kwargs)
317 except: # noqa: E722
318 # the assumption here is that the caller doesn't want to be disturbed
319 # by synchronous exceptions, so let's turn them into Failures.
320 return defer.fail()
321
322 if not isinstance(res, defer.Deferred):
323 return res
324
325 if res.called and not res.paused:
326 # The function should have maintained the logcontext, so we can
327 # optimise out the messing about
328 return res
329
330 # The function may have reset the context before returning, so
331 # we need to restore it now.
332 ctx = LoggingContext.set_current_context(current)
333
334 # The original context will be restored when the deferred
335 # completes, but there is nothing waiting for it, so it will
336 # get leaked into the reactor or some other function which
337 # wasn't expecting it. We therefore need to reset the context
338 # here.
339 #
340 # (If this feels asymmetric, consider it this way: we are
341 # effectively forking a new thread of execution. We are
342 # probably currently within a ``with LoggingContext()`` block,
343 # which is supposed to have a single entry and exit point. But
344 # by spawning off another deferred, we are effectively
345 # adding a new exit point.)
346 res.addBoth(_set_context_cb, ctx)
329347 return res
330348
331349
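
As a hedged sketch of the fire-and-forget case the run_in_background docstring describes (the background task is illustrative):

from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.logcontext import run_in_background

@defer.inlineCallbacks
def update_cache():
    # illustrative background work returning a deferred nobody waits on
    yield sleep(0)

def handle_request():
    # kick off the update without yielding on it: run_in_background restores
    # our logcontext on return and resets to the sentinel when it completes
    run_in_background(update_cache)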
340358 returning a deferred. Then, when the deferred completes, restores the
341359 current logcontext before running callbacks/errbacks.
342360
343 (This is more-or-less the opposite operation to preserve_fn.)
361 (This is more-or-less the opposite operation to run_in_background.)
344362 """
345 if isinstance(deferred, defer.Deferred) and not deferred.called:
346 prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
347 deferred.addBoth(_set_context_cb, prev_context)
363 if not isinstance(deferred, defer.Deferred):
364 return deferred
365
366 if deferred.called and not deferred.paused:
367 # it looks like this deferred is ready to run any callbacks we give it
368 # immediately. We may as well optimise out the logcontext faffery.
369 return deferred
370
371 # ok, we can't be sure that a yield won't block, so let's reset the
372 # logcontext, and add a callback to the deferred to restore it.
373 prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
374 deferred.addBoth(_set_context_cb, prev_context)
348375 return deferred
349376
350377
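
And the complementary sketch for make_deferred_yieldable, wrapping a deferred produced by code that follows twisted's conventions rather than synapse's logcontext rules (the HTTP agent call is an assumption of the example):

from twisted.internet import defer
from synapse.util.logcontext import make_deferred_yieldable

@defer.inlineCallbacks
def fetch(agent, uri):
    # agent.request does not know about logcontexts, so wrap its deferred
    # before yielding on it
    response = yield make_deferred_yieldable(agent.request(b"GET", uri))
    defer.returnValue(response)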
1313 # limitations under the License.
1414
1515
16 import StringIO
16 from six import StringIO
1717 import logging
1818 import traceback
1919
3131 super(LogFormatter, self).__init__(*args, **kwargs)
3232
3333 def formatException(self, ei):
34 sio = StringIO.StringIO()
34 sio = StringIO()
3535 (typ, val, tb) = ei
3636
3737 # log the stack above the exception capture point if possible, but
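
The logformatter hunk above swaps the py2-only StringIO module for six's; a small sketch of the same buffer-based traceback formatting, simplified from what formatException does (the helper name is illustrative):

from six import StringIO
import traceback

def format_exc_info(ei):
    # six.StringIO gives a text buffer on both Python 2 and 3
    sio = StringIO()
    typ, val, tb = ei
    traceback.print_exception(typ, val, tb, None, sio)
    return sio.getvalue()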
1717 from synapse.api.errors import LimitExceededError
1818
1919 from synapse.util.async import sleep
20 from synapse.util.logcontext import preserve_fn
20 from synapse.util.logcontext import (
21 run_in_background, make_deferred_yieldable,
22 PreserveLoggingContext,
23 )
2124
2225 import collections
2326 import contextlib
149152 "Ratelimit [%s]: sleeping req",
150153 id(request_id),
151154 )
152 ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0)
155 ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0)
153156
154157 self.sleeping_requests.add(request_id)
155158
175178 return r
176179
177180 def on_err(r):
181 # XXX: why is this necessary? this is called before we start
182 # processing the request so why would the request be in
183 # current_processing?
178184 self.current_processing.discard(request_id)
179185 return r
180186
186192
187193 ret_defer.addCallbacks(on_start, on_err)
188194 ret_defer.addBoth(on_both)
189 return ret_defer
195 return make_deferred_yieldable(ret_defer)
190196
191197 def _on_exit(self, request_id):
192198 logger.debug(
196202 self.current_processing.discard(request_id)
197203 try:
198204 request_id, deferred = self.ready_request_queue.popitem()
205
206 # XXX: why do we do the following? the on_start callback above will
207 # do it for us.
199208 self.current_processing.add(request_id)
200 deferred.callback(None)
209
210 with PreserveLoggingContext():
211 deferred.callback(None)
201212 except KeyError:
202213 pass
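
The ratelimiter hunk above fires the queued deferred inside PreserveLoggingContext so the waiter's callbacks run under the waiter's own logcontext rather than the limiter's. A minimal sketch of that pattern (the queue is assumed to map request ids to queued deferreds):

from synapse.util.logcontext import PreserveLoggingContext

def release_next(ready_request_queue):
    # wake the next queued waiter without leaking our logcontext into it
    try:
        request_id, deferred = ready_request_queue.popitem()
    except KeyError:
        return
    with PreserveLoggingContext():
        deferred.callback(None)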
202202 )
203203 except Exception:
204204 logger.exception(
205 "Failed to store set_destination_retry_timings",
205 "Failed to store destination_retry_timings",
206206 )
207207
208208 # we deliberately do this in the background.
209 synapse.util.logcontext.preserve_fn(store_retry_timings)()
209 synapse.util.logcontext.run_in_background(store_retry_timings)
1414
1515 import random
1616 import string
17 from six.moves import range
1718
1819 _string_with_symbols = (
1920 string.digits + string.ascii_letters + ".,;:^&*-_+=#~@"
2122
2223
2324 def random_string(length):
24 return ''.join(random.choice(string.ascii_letters) for _ in xrange(length))
25 return ''.join(random.choice(string.ascii_letters) for _ in range(length))
2526
2627
2728 def random_string_with_symbols(length):
2829 return ''.join(
29 random.choice(_string_with_symbols) for _ in xrange(length)
30 random.choice(_string_with_symbols) for _ in range(length)
3031 )
3132
3233
1111 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212 # See the License for the specific language governing permissions and
1313 # limitations under the License.
14
15 from six.moves import range
1416
1517
1618 class _Entry(object):
6769 # Add empty entries between the end of the current list and when we want
6870 # to insert. This ensures there are no gaps.
6971 self.entries.extend(
70 _Entry(key) for key in xrange(last_key, then_key + 1)
72 _Entry(key) for key in range(last_key, then_key + 1)
7173 )
7274
7375 self.entries[-1].queue.append(obj)
1616 _ServiceQueuer, _TransactionController, _Recoverer
1717 )
1818 from twisted.internet import defer
19
20 from synapse.util.logcontext import make_deferred_yieldable
1921 from ..utils import MockClock
2022 from mock import Mock
2123 from tests import unittest
203205
204206 def test_send_single_event_with_queue(self):
205207 d = defer.Deferred()
206 self.txn_ctrl.send = Mock(return_value=d)
208 self.txn_ctrl.send = Mock(
209 side_effect=lambda x, y: make_deferred_yieldable(d),
210 )
207211 service = Mock(id=4)
208212 event = Mock(event_id="first")
209213 event2 = Mock(event_id="second")
234238 srv_2_event2 = Mock(event_id="srv2b")
235239
236240 send_return_list = [srv_1_defer, srv_2_defer]
237 self.txn_ctrl.send = Mock(side_effect=lambda x, y: send_return_list.pop(0))
241
242 def do_send(x, y):
243 return make_deferred_yieldable(send_return_list.pop(0))
244 self.txn_ctrl.send = Mock(side_effect=do_send)
238245
239246 # send events for different ASes and make sure they are sent
240247 self.queuer.enqueue(srv1, srv_1_event)
1515 from tests import unittest
1616
1717 from synapse.metrics.metric import (
18 CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
18 CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
19 _escape_label_value,
1920 )
2021
2122
170171 'cache:size{name="cache_name"} 1',
171172 'cache:evicted_size{name="cache_name"} 2',
172173 ])
174
175
176 class LabelValueEscapeTestCase(unittest.TestCase):
177 def test_simple(self):
178 string = "safjhsdlifhyskljfksdfh"
179 self.assertEqual(string, _escape_label_value(string))
180
181 def test_escape(self):
182 self.assertEqual(
183 "abc\\\"def\\nghi\\\\",
184 _escape_label_value("abc\"def\nghi\\"),
185 )
186
187 def test_sequence_of_escapes(self):
188 self.assertEqual(
189 "abc\\\"def\\nghi\\\\\\n",
190 _escape_label_value("abc\"def\nghi\\\n"),
191 )
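
The new label-escaping tests above pin down the behaviour: backslashes, double quotes and newlines in a label value must be escaped for the prometheus text format. A hedged sketch of an implementation that satisfies them (the real _escape_label_value in synapse.metrics.metric may be written differently):

def _escape_label_value(value):
    # escape backslashes first, then quotes and newlines, so that the
    # backslashes we introduce are not escaped a second time
    return (
        value.replace("\\", "\\\\")
             .replace("\"", "\\\"")
             .replace("\n", "\\n")
    )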
147147
148148 @defer.inlineCallbacks
149149 def test_stream_basic_permissions(self):
150 # invalid token, expect 403
150 # invalid token, expect 401
151 # note: this is in violation of the original v1 spec, which expected
152 # 403. However, since the v1 spec no longer exists and the v1
153 # implementation is now part of the r0 implementation, the newer
154 # behaviour is used instead to be consistent with the r0 spec.
155 # see issue #2602
151156 (code, response) = yield self.mock_resource.trigger_get(
152157 "/events?access_token=%s" % ("invalid" + self.token, )
153158 )
154 self.assertEquals(403, code, msg=str(response))
159 self.assertEquals(401, code, msg=str(response))
155160
156161 # valid token, expect content
157162 (code, response) = yield self.mock_resource.trigger_get(
5151 def _get_user_by_req(request=None, allow_guest=False):
5252 return synapse.types.create_requester(myid)
5353
54 hs.get_v1auth().get_user_by_req = _get_user_by_req
54 hs.get_auth().get_user_by_req = _get_user_by_req
5555
5656 profile.register_servlets(hs, self.mock_resource)
5757
2323 from synapse.types import UserID
2424
2525 import json
26 import urllib
26 from six.moves.urllib import parse as urlparse
2727
2828 from ....utils import MockHttpResource, setup_test_homeserver
2929 from .utils import RestTestCase
5959 "token_id": 1,
6060 "is_guest": False,
6161 }
62 hs.get_v1auth().get_user_by_access_token = get_user_by_access_token
62 hs.get_auth().get_user_by_access_token = get_user_by_access_token
6363
6464 def _insert_client_ip(*args, **kwargs):
6565 return defer.succeed(None)
6969
7070 synapse.rest.client.v1.room.register_servlets(hs, self.mock_resource)
7171
72 self.auth = hs.get_v1auth()
72 self.auth = hs.get_auth()
7373
7474 # create some rooms under the name rmcreator_id
7575 self.uncreated_rmid = "!aa:test"
424424 "token_id": 1,
425425 "is_guest": False,
426426 }
427 hs.get_v1auth().get_user_by_access_token = get_user_by_access_token
427 hs.get_auth().get_user_by_access_token = get_user_by_access_token
428428
429429 def _insert_client_ip(*args, **kwargs):
430430 return defer.succeed(None)
506506 "token_id": 1,
507507 "is_guest": False,
508508 }
509 hs.get_v1auth().get_user_by_access_token = get_user_by_access_token
509 hs.get_auth().get_user_by_access_token = get_user_by_access_token
510510
511511 def _insert_client_ip(*args, **kwargs):
512512 return defer.succeed(None)
596596 "is_guest": False,
597597 }
598598
599 hs.get_v1auth().get_user_by_access_token = get_user_by_access_token
599 hs.get_auth().get_user_by_access_token = get_user_by_access_token
600600
601601 def _insert_client_ip(*args, **kwargs):
602602 return defer.succeed(None)
710710 "token_id": 1,
711711 "is_guest": False,
712712 }
713 hs.get_v1auth().get_user_by_access_token = get_user_by_access_token
713 hs.get_auth().get_user_by_access_token = get_user_by_access_token
714714
715715 def _insert_client_ip(*args, **kwargs):
716716 return defer.succeed(None)
765765 @defer.inlineCallbacks
766766 def test_rooms_members_self(self):
767767 path = "/rooms/%s/state/m.room.member/%s" % (
768 urllib.quote(self.room_id), self.user_id
768 urlparse.quote(self.room_id), self.user_id
769769 )
770770
771771 # valid join message (NOOP since we made the room)
785785 def test_rooms_members_other(self):
786786 self.other_id = "@zzsid1:red"
787787 path = "/rooms/%s/state/m.room.member/%s" % (
788 urllib.quote(self.room_id), self.other_id
788 urlparse.quote(self.room_id), self.other_id
789789 )
790790
791791 # valid invite message
801801 def test_rooms_members_other_custom_keys(self):
802802 self.other_id = "@zzsid1:red"
803803 path = "/rooms/%s/state/m.room.member/%s" % (
804 urllib.quote(self.room_id), self.other_id
804 urlparse.quote(self.room_id), self.other_id
805805 )
806806
807807 # valid invite message with custom key
842842 "token_id": 1,
843843 "is_guest": False,
844844 }
845 hs.get_v1auth().get_user_by_access_token = get_user_by_access_token
845 hs.get_auth().get_user_by_access_token = get_user_by_access_token
846846
847847 def _insert_client_ip(*args, **kwargs):
848848 return defer.succeed(None)
858858 @defer.inlineCallbacks
859859 def test_invalid_puts(self):
860860 path = "/rooms/%s/send/m.room.message/mid1" % (
861 urllib.quote(self.room_id))
861 urlparse.quote(self.room_id))
862862 # missing keys or invalid json
863863 (code, response) = yield self.mock_resource.trigger(
864864 "PUT", path, '{}'
893893 @defer.inlineCallbacks
894894 def test_rooms_messages_sent(self):
895895 path = "/rooms/%s/send/m.room.message/mid1" % (
896 urllib.quote(self.room_id))
896 urlparse.quote(self.room_id))
897897
898898 content = '{"body":"test","msgtype":{"type":"a"}}'
899899 (code, response) = yield self.mock_resource.trigger("PUT", path, content)
910910
911911 # m.text message type
912912 path = "/rooms/%s/send/m.room.message/mid2" % (
913 urllib.quote(self.room_id))
913 urlparse.quote(self.room_id))
914914 content = '{"body":"test2","msgtype":"m.text"}'
915915 (code, response) = yield self.mock_resource.trigger("PUT", path, content)
916916 self.assertEquals(200, code, msg=str(response))
944944 "token_id": 1,
945945 "is_guest": False,
946946 }
947 hs.get_v1auth().get_user_by_access_token = get_user_by_access_token
947 hs.get_auth().get_user_by_access_token = get_user_by_access_token
948948
949949 def _insert_client_ip(*args, **kwargs):
950950 return defer.succeed(None)
10161016 "token_id": 1,
10171017 "is_guest": False,
10181018 }
1019 hs.get_v1auth().get_user_by_access_token = get_user_by_access_token
1019 hs.get_auth().get_user_by_access_token = get_user_by_access_token
10201020
10211021 def _insert_client_ip(*args, **kwargs):
10221022 return defer.succeed(None)
6767 "is_guest": False,
6868 }
6969
70 hs.get_v1auth().get_user_by_access_token = get_user_by_access_token
70 hs.get_auth().get_user_by_access_token = get_user_by_access_token
7171
7272 def _insert_client_ip(*args, **kwargs):
7373 return defer.succeed(None)
127127 yield _rotate(10)
128128 yield _assert_counts(1, 1)
129129
130 @tests.unittest.DEBUG
131130 @defer.inlineCallbacks
132131 def test_find_first_stream_ordering_after_ts(self):
133132 def add_event(so, ts):
+0
-33
tests/util/test_clock.py
0 # -*- coding: utf-8 -*-
1 # Copyright 2017 Vector Creations Ltd
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 from synapse import util
15 from twisted.internet import defer
16 from tests import unittest
17
18
19 class ClockTestCase(unittest.TestCase):
20 @defer.inlineCallbacks
21 def test_time_bound_deferred(self):
22 # just a deferred which never resolves
23 slow_deferred = defer.Deferred()
24
25 clock = util.Clock()
26 time_bound = clock.time_bound_deferred(slow_deferred, 0.001)
27
28 try:
29 yield time_bound
30 self.fail("Expected timedout error, but got nothing")
31 except util.DeferredTimedOutError:
32 pass
3535 yield sleep(0)
3636 self._check_test_key("one")
3737
38 def _test_preserve_fn(self, function):
38 def _test_run_in_background(self, function):
3939 sentinel_context = LoggingContext.current_context()
4040
4141 callback_completed = [False]
4242
43 @defer.inlineCallbacks
44 def cb():
43 def test():
4544 context_one.request = "one"
46 yield function()
47 self._check_test_key("one")
45 d = function()
4846
49 callback_completed[0] = True
47 def cb(res):
48 self._check_test_key("one")
49 callback_completed[0] = True
50 return res
51 d.addCallback(cb)
52
53 return d
5054
5155 with LoggingContext() as context_one:
5256 context_one.request = "one"
5357
5458 # fire off function, but don't wait on it.
55 logcontext.preserve_fn(cb)()
59 logcontext.run_in_background(test)
5660
5761 self._check_test_key("one")
5862
7983 # test is done once d2 finishes
8084 return d2
8185
82 def test_preserve_fn_with_blocking_fn(self):
86 def test_run_in_background_with_blocking_fn(self):
8387 @defer.inlineCallbacks
8488 def blocking_function():
8589 yield sleep(0)
8690
87 return self._test_preserve_fn(blocking_function)
91 return self._test_run_in_background(blocking_function)
8892
89 def test_preserve_fn_with_non_blocking_fn(self):
93 def test_run_in_background_with_non_blocking_fn(self):
9094 @defer.inlineCallbacks
9195 def nonblocking_function():
9296 with logcontext.PreserveLoggingContext():
9397 yield defer.succeed(None)
9498
95 return self._test_preserve_fn(nonblocking_function)
99 return self._test_run_in_background(nonblocking_function)
100
101 def test_run_in_background_with_chained_deferred(self):
102 # a function which returns a deferred which looks like it has been
103 # called, but is actually paused
104 def testfunc():
105 return logcontext.make_deferred_yieldable(
106 _chained_deferred_function()
107 )
108
109 return self._test_run_in_background(testfunc)
96110
97111 @defer.inlineCallbacks
98112 def test_make_deferred_yieldable(self):
118132 self._check_test_key("one")
119133
120134 @defer.inlineCallbacks
135 def test_make_deferred_yieldable_with_chained_deferreds(self):
136 sentinel_context = LoggingContext.current_context()
137
138 with LoggingContext() as context_one:
139 context_one.request = "one"
140
141 d1 = logcontext.make_deferred_yieldable(_chained_deferred_function())
142 # make sure that the context was reset by make_deferred_yieldable
143 self.assertIs(LoggingContext.current_context(), sentinel_context)
144
145 yield d1
146
147 # now it should be restored
148 self._check_test_key("one")
149
150 @defer.inlineCallbacks
121151 def test_make_deferred_yieldable_on_non_deferred(self):
122152 """Check that make_deferred_yieldable does the right thing when its
123153 argument isn't actually a deferred"""
131161 r = yield d1
132162 self.assertEqual(r, "bum")
133163 self._check_test_key("one")
164
165
166 # a function which returns a deferred which has been "called", but
167 # which had a function which returned another incomplete deferred on
168 # its callback list, so won't yet call any other new callbacks.
169 def _chained_deferred_function():
170 d = defer.succeed(None)
171
172 def cb(res):
173 d2 = defer.Deferred()
174 reactor.callLater(0, d2.callback, res)
175 return d2
176 d.addCallback(cb)
177 return d
0 # -*- coding: utf-8 -*-
1 # Copyright 2018 New Vector Ltd
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import sys
15
16 from synapse.util.logformatter import LogFormatter
17 from tests import unittest
18
19
20 class TestException(Exception):
21 pass
22
23
24 class LogFormatterTestCase(unittest.TestCase):
25 def test_formatter(self):
26 formatter = LogFormatter()
27
28 try:
29 raise TestException("testytest")
30 except TestException:
31 ei = sys.exc_info()
32
33 output = formatter.formatException(ei)
34
35 # check the output looks vaguely sane
36 self.assertIn("testytest", output)
37 self.assertIn("Capture point", output)
1414
1515 import hashlib
1616 from inspect import getcallargs
17 import urllib
18 import urlparse
17 from six.moves.urllib import parse as urlparse
1918
2019 from mock import Mock, patch
2120 from twisted.internet import defer, reactor
237236 if matcher:
238237 try:
239238 args = [
240 urllib.unquote(u).decode("UTF-8")
239 urlparse.unquote(u).decode("UTF-8")
241240 for u in matcher.groups()
242241 ]
243242
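
The compat import above pulls urllib's parsing helpers through six so the same test utilities run on both interpreters; a tiny sketch of the pattern (the room id is illustrative):

from six.moves.urllib import parse as urlparse

room_id = "!abc:example.com"
path = "/rooms/%s/state" % (urlparse.quote(room_id),)
# urlparse.unquote(path_segment) reverses the quoting on Python 2 and 3 alike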
00 [tox]
1 envlist = packaging, py27, pep8
1 envlist = packaging, py27, py36, pep8
22
33 [testenv]
44 deps =
4545 # )
4646 usedevelop=true
4747
48 [testenv:py36]
49 usedevelop=true
50 commands =
51 /usr/bin/find "{toxinidir}" -name '*.pyc' -delete
52 coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \
53 "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/metrics tests/config} \
54 {env:TOXSUFFIX:}
55 {env:DUMP_COVERAGE_COMMAND:coverage report -m}
56
4857 [testenv:packaging]
4958 deps =
5059 check-manifest