New upstream version 0.29.0
Erik Johnston
5 years ago
0 | 0 | sudo: false |
1 | 1 | language: python |
2 | python: 2.7 | |
3 | 2 | |
4 | 3 | # tell travis to cache ~/.cache/pip |
5 | 4 | cache: pip |
6 | 5 | |
7 | env: | |
8 | - TOX_ENV=packaging | |
9 | - TOX_ENV=pep8 | |
10 | - TOX_ENV=py27 | |
6 | matrix: | |
7 | include: | |
8 | - python: 2.7 | |
9 | env: TOX_ENV=packaging | |
10 | ||
11 | - python: 2.7 | |
12 | env: TOX_ENV=pep8 | |
13 | ||
14 | - python: 2.7 | |
15 | env: TOX_ENV=py27 | |
16 | ||
17 | - python: 3.6 | |
18 | env: TOX_ENV=py36 | |
11 | 19 | |
12 | 20 | install: |
13 | 21 | - pip install tox |
59 | 59 | |
60 | 60 | Christoph Witzany <christoph at web.crofting.com> |
61 | 61 | * Add LDAP support for authentication |
62 | ||
63 | Pierre Jaury <pierre at jaury.eu> | |
64 | * Docker packaging⏎ |
0 | Changes in synapse v0.29.0 (2018-05-16) | |
1 | =========================================== | |
2 | ||
3 | ||
4 | Changes in synapse v0.29.0-rc1 (2018-05-14) | |
5 | =========================================== | |
6 | ||
7 | Notable changes, a docker file for running Synapse (Thanks to @kaiyou!) and a | |
8 | closed spec bug in the Client Server API. Additionally further prep for Python 3 | |
9 | migration. | |
10 | ||
11 | Potentially breaking change: | |
12 | ||
13 | * Make Client-Server API return 401 for invalid token (PR #3161). | |
14 | ||
15 | This changes the Client-server spec to return a 401 error code instead of 403 | |
16 | when the access token is unrecognised. This is the behaviour required by the | |
17 | specification, but some clients may be relying on the old, incorrect | |
18 | behaviour. | |
19 | ||
20 | Thanks to @NotAFile for fixing this. | |
21 | ||
22 | Features: | |
23 | ||
24 | * Add a Dockerfile for synapse (PR #2846) Thanks to @kaiyou! | |
25 | ||
26 | Changes - General: | |
27 | ||
28 | * nuke-room-from-db.sh: added postgresql option and help (PR #2337) Thanks to @rubo77! | |
29 | * Part user from rooms on account deactivate (PR #3201) | |
30 | * Make 'unexpected logging context' into warnings (PR #3007) | |
31 | * Set Server header in SynapseRequest (PR #3208) | |
32 | * remove duplicates from groups tables (PR #3129) | |
33 | * Improve exception handling for background processes (PR #3138) | |
34 | * Add missing consumeErrors to improve exception handling (PR #3139) | |
35 | * reraise exceptions more carefully (PR #3142) | |
36 | * Remove redundant call to preserve_fn (PR #3143) | |
37 | * Trap exceptions thrown within run_in_background (PR #3144) | |
38 | ||
39 | Changes - Refactors: | |
40 | ||
41 | * Refactor /context to reuse pagination storage functions (PR #3193) | |
42 | * Refactor recent events func to use pagination func (PR #3195) | |
43 | * Refactor pagination DB API to return concrete type (PR #3196) | |
44 | * Refactor get_recent_events_for_room return type (PR #3198) | |
45 | * Refactor sync APIs to reuse pagination API (PR #3199) | |
46 | * Remove unused code path from member change DB func (PR #3200) | |
47 | * Refactor request handling wrappers (PR #3203) | |
48 | * transaction_id, destination defined twice (PR #3209) Thanks to @damir-manapov! | |
49 | * Refactor event storage to prepare for changes in state calculations (PR #3141) | |
50 | * Set Server header in SynapseRequest (PR #3208) | |
51 | * Use deferred.addTimeout instead of time_bound_deferred (PR #3127, #3178) | |
52 | * Use run_in_background in preference to preserve_fn (PR #3140) | |
53 | ||
54 | Changes - Python 3 migration: | |
55 | ||
56 | * Construct HMAC as bytes on py3 (PR #3156) Thanks to @NotAFile! | |
57 | * run config tests on py3 (PR #3159) Thanks to @NotAFile! | |
58 | * Open certificate files as bytes (PR #3084) Thanks to @NotAFile! | |
59 | * Open config file in non-bytes mode (PR #3085) Thanks to @NotAFile! | |
60 | * Make event properties raise AttributeError instead (PR #3102) Thanks to @NotAFile! | |
61 | * Use six.moves.urlparse (PR #3108) Thanks to @NotAFile! | |
62 | * Add py3 tests to tox with folders that work (PR #3145) Thanks to @NotAFile! | |
63 | * Don't yield in list comprehensions (PR #3150) Thanks to @NotAFile! | |
64 | * Move more xrange to six (PR #3151) Thanks to @NotAFile! | |
65 | * make imports local (PR #3152) Thanks to @NotAFile! | |
66 | * move httplib import to six (PR #3153) Thanks to @NotAFile! | |
67 | * Replace stringIO imports with six (PR #3154, #3168) Thanks to @NotAFile! | |
68 | * more bytes strings (PR #3155) Thanks to @NotAFile! | |
69 | ||
70 | Bug Fixes: | |
71 | ||
72 | * synapse fails to start under Twisted >= 18.4 (PR #3157) | |
73 | * Fix a class of logcontext leaks (PR #3170) | |
74 | * Fix a couple of logcontext leaks in unit tests (PR #3172) | |
75 | * Fix logcontext leak in media repo (PR #3174) | |
76 | * Escape label values in prometheus metrics (PR #3175, #3186) | |
77 | * Fix 'Unhandled Error' logs with Twisted 18.4 (PR #3182) Thanks to @Half-Shot! | |
78 | * Fix logcontext leaks in rate limiter (PR #3183) | |
79 | * notifications: Convert next_token to string according to the spec (PR #3190) Thanks to @mujx! | |
80 | * nuke-room-from-db.sh: fix deletion from search table (PR #3194) Thanks to @rubo77! | |
81 | * add guard for None on purge_history api (PR #3160) Thanks to @krombel! | |
82 | ||
0 | 83 | Changes in synapse v0.28.1 (2018-05-01) |
1 | 84 | ======================================= |
2 | 85 | |
3 | 86 | SECURITY UPDATE |
4 | 87 | |
5 | 88 | * Clamp the allowed values of event depth received over federation to be |
6 | [0, 2**63 - 1]. This mitigates an attack where malicious events | |
7 | injected with depth = 2**63 - 1 render rooms unusable. Depth is used to | |
89 | [0, 2^63 - 1]. This mitigates an attack where malicious events | |
90 | injected with depth = 2^63 - 1 render rooms unusable. Depth is used to | |
8 | 91 | determine the cosmetic ordering of events within a room, and so the ordering |
9 | 92 | of events in such a room will default to using stream_ordering rather than depth |
10 | 93 | (topological_ordering). |
11 | 94 | |
12 | This is a temporary solution to mitigate abuse in the wild, whilst a long solution | |
95 | This is a temporary solution to mitigate abuse in the wild, whilst a long term solution | |
13 | 96 | is being implemented to improve how the depth parameter is used. |
14 | 97 | |
15 | 98 | Full details at |
16 | https://docs.google.com/document/d/1I3fi2S-XnpO45qrpCsowZv8P8dHcNZ4fsBsbOW7KABI/edit# | |
99 | https://docs.google.com/document/d/1I3fi2S-XnpO45qrpCsowZv8P8dHcNZ4fsBsbOW7KABI | |
17 | 100 | |
18 | 101 | * Pin Twisted to <18.4 until we stop using the private _OpenSSLECCurve API. |
19 | 102 | |
32 | 115 | |
33 | 116 | Minor performance improvement to federation sending and bug fixes. |
34 | 117 | |
35 | (Note: This release does not include state resolutions discussed in matrix live) | |
118 | (Note: This release does not include the delta state resolution implementation discussed in matrix live) | |
119 | ||
36 | 120 | |
37 | 121 | Features: |
38 | 122 |
0 | FROM docker.io/python:2-alpine3.7 | |
1 | ||
2 | RUN apk add --no-cache --virtual .nacl_deps su-exec build-base libffi-dev zlib-dev libressl-dev libjpeg-turbo-dev linux-headers postgresql-dev | |
3 | ||
4 | COPY . /synapse | |
5 | ||
6 | # A wheel cache may be provided in ./cache for faster build | |
7 | RUN cd /synapse \ | |
8 | && pip install --upgrade pip setuptools psycopg2 \ | |
9 | && mkdir -p /synapse/cache \ | |
10 | && pip install -f /synapse/cache --upgrade --process-dependency-links . \ | |
11 | && mv /synapse/contrib/docker/start.py /synapse/contrib/docker/conf / \ | |
12 | && rm -rf setup.py setup.cfg synapse | |
13 | ||
14 | VOLUME ["/data"] | |
15 | ||
16 | EXPOSE 8008/tcp 8448/tcp | |
17 | ||
18 | ENTRYPOINT ["/start.py"] |
24 | 24 | exclude jenkins.sh |
25 | 25 | exclude jenkins*.sh |
26 | 26 | exclude jenkins* |
27 | exclude Dockerfile | |
28 | exclude .dockerignore | |
27 | 29 | recursive-exclude jenkins *.sh |
28 | 30 | |
29 | 31 | prune .github |
0 | # Synapse Docker | |
1 | ||
2 | This Docker image will run Synapse as a single process. It does not provide any | |
3 | database server or TURN server that you should run separately. | |
4 | ||
5 | If you run a Postgres server, you should simply have it in the same Compose | |
6 | project or set the proper environment variables and the image will automatically | |
7 | use that server. | |
8 | ||
9 | ## Build | |
10 | ||
11 | Build the docker image with the `docker build` command from the root of the synapse repository. | |
12 | ||
13 | ``` | |
14 | docker build -t docker.io/matrixdotorg/synapse . | |
15 | ``` | |
16 | ||
17 | The `-t` option sets the image tag. Official images are tagged `matrixdotorg/synapse:<version>` where `<version>` is the same as the release tag in the synapse git repository. | |
18 | ||
19 | You may have a local Python wheel cache available, in which case copy the relevant packages in the ``cache/`` directory at the root of the project. | |
20 | ||
21 | ## Run | |
22 | ||
23 | This image is designed to run either with an automatically generated configuration | |
24 | file or with a custom configuration that requires manual editing. | 
25 | ||
26 | ### Automated configuration | |
27 | ||
28 | It is recommended that you use Docker Compose to run your containers, including | |
29 | this image and a Postgres server. A sample ``docker-compose.yml`` is provided, | |
30 | including example labels for reverse proxying and other artifacts. | |
31 | ||
32 | Read the section about environment variables and set at least mandatory variables, | |
33 | then run the server: | |
34 | ||
35 | ``` | |
36 | docker-compose up -d | |
37 | ``` | |
38 | ||
39 | ### Manual configuration | |
40 | ||
41 | A sample ``docker-compose.yml`` is provided, including example labels for | |
42 | reverse proxying and other artifacts. | |
43 | ||
44 | Specify a ``SYNAPSE_CONFIG_PATH``, preferably to a persistent path, | |
45 | to use manual configuration. To generate a fresh ``homeserver.yaml``, simply run: | |
46 | ||
47 | ``` | |
48 | docker-compose run --rm -e SYNAPSE_SERVER_NAME=my.matrix.host synapse generate | |
49 | ``` | |
50 | ||
51 | Then, customize your configuration and run the server: | |
52 | ||
53 | ``` | |
54 | docker-compose up -d | |
55 | ``` | |
56 | ||
57 | ### Without Compose | |
58 | ||
59 | If you do not wish to use Compose, you may still run this image using plain | |
60 | Docker commands. Note that the following is just a guideline and you may need | |
61 | to add parameters to the docker run command to account for the network situation | |
62 | with your postgres database. | |
63 | ||
64 | ``` | |
65 | docker run \ | |
66 | -d \ | |
67 | --name synapse \ | |
68 | -v ${DATA_PATH}:/data \ | |
69 | -e SYNAPSE_SERVER_NAME=my.matrix.host \ | |
70 | -e SYNAPSE_REPORT_STATS=yes \ | |
71 | docker.io/matrixdotorg/synapse:latest | |
72 | ``` | |
73 | ||
74 | ## Volumes | |
75 | ||
76 | The image expects a single volume, located at ``/data``, that will hold: | |
77 | ||
78 | * temporary files during uploads; | |
79 | * uploaded media and thumbnails; | |
80 | * the SQLite database if you do not configure postgres; | |
81 | * the appservices configuration. | |
82 | ||
83 | You are free to use separate volumes depending on storage endpoints at your | |
84 | disposal. For instance, ``/data/media`` could be stored on a large but low | 
85 | performance hdd storage while other files could be stored on high performance | |
86 | endpoints. | |
87 | ||
88 | In order to set up an application service, simply create an ``appservices`` | 
89 | directory in the data volume and write the application service Yaml | |
90 | configuration file there. Multiple application services are supported. | |
91 | ||
92 | ## Environment | |
93 | ||
94 | Unless you specify a custom path for the configuration file, a very generic | |
95 | file will be generated, based on the following environment settings. | |
96 | These are a good starting point for setting up your own deployment. | |
97 | ||
98 | Global settings: | |
99 | ||
100 | * ``UID``, the user id Synapse will run as [default 991] | |
101 | * ``GID``, the group id Synapse will run as [default 991] | |
102 | * ``SYNAPSE_CONFIG_PATH``, path to a custom config file | |
103 | ||
104 | If ``SYNAPSE_CONFIG_PATH`` is set, you should generate a configuration file | |
105 | then customize it manually. No other environment variable is required. | |
106 | ||
107 | Otherwise, a dynamic configuration file will be used. The following environment | |
108 | variables are available for configuration: | |
109 | ||
110 | * ``SYNAPSE_SERVER_NAME`` (mandatory), the current server public hostname. | |
111 | * ``SYNAPSE_REPORT_STATS``, (mandatory, ``yes`` or ``no``), enable anonymous | |
112 | statistics reporting back to the Matrix project which helps us to get funding. | |
113 | * ``SYNAPSE_MACAROON_SECRET_KEY`` (mandatory) secret for signing access tokens | |
114 | to the server, set this to a proper random key. | |
115 | * ``SYNAPSE_NO_TLS``, set this variable to disable TLS in Synapse (use this if | |
116 | you run your own TLS-capable reverse proxy). | |
117 | * ``SYNAPSE_ENABLE_REGISTRATION``, set this variable to enable registration on | |
118 | the Synapse instance. | |
119 | * ``SYNAPSE_ALLOW_GUEST``, set this variable to allow guest joining this server. | |
120 | * ``SYNAPSE_EVENT_CACHE_SIZE``, the event cache size [default `10K`]. | |
121 | * ``SYNAPSE_CACHE_FACTOR``, the cache factor [default `0.5`]. | |
122 | * ``SYNAPSE_RECAPTCHA_PUBLIC_KEY``, set this variable to the recaptcha public | |
123 | key in order to enable recaptcha upon registration. | |
124 | * ``SYNAPSE_RECAPTCHA_PRIVATE_KEY``, set this variable to the recaptcha private | |
125 | key in order to enable recaptcha upon registration. | |
126 | * ``SYNAPSE_TURN_URIS``, set this variable to the comma-separated list of TURN | 
127 | uris to enable TURN for this homeserver. | |
128 | * ``SYNAPSE_TURN_SECRET``, set this to the TURN shared secret if required. | |
129 | ||
130 | Shared secrets, that will be initialized to random values if not set: | |
131 | ||
132 | * ``SYNAPSE_REGISTRATION_SHARED_SECRET``, secret for registering users if | 
133 | registration is disabled. | 
134 | ||
135 | Database specific values (will use SQLite if not set): | |
136 | ||
137 | * `POSTGRES_DB` - The database name for the synapse postgres database. [default: `synapse`] | |
138 | * `POSTGRES_HOST` - The host of the postgres database if you wish to use postgresql instead of sqlite3. [default: `db` which is useful when using a container on the same docker network in a compose file where the postgres service is called `db`] | |
139 | * `POSTGRES_PASSWORD` - The password for the synapse postgres database. **If this is set then postgres will be used instead of sqlite3.** [default: none] **NOTE**: You are highly encouraged to use postgresql! Please use the compose file to make it easier to deploy. | |
140 | * `POSTGRES_USER` - The user for the synapse postgres database. [default: `matrix`] | |
141 | ||
142 | Mail server specific values (will not send emails if not set): | |
143 | ||
144 | * ``SYNAPSE_SMTP_HOST``, hostname to the mail server. | |
145 | * ``SYNAPSE_SMTP_PORT``, TCP port for accessing the mail server [default ``25``]. | |
146 | * ``SYNAPSE_SMTP_USER``, username for authenticating against the mail server if any. | |
147 | * ``SYNAPSE_SMTP_PASSWORD``, password for authenticating against the mail server if any. |
0 | # vim:ft=yaml | |
1 | ||
2 | ## TLS ## | |
3 | ||
4 | tls_certificate_path: "/data/{{ SYNAPSE_SERVER_NAME }}.tls.crt" | |
5 | tls_private_key_path: "/data/{{ SYNAPSE_SERVER_NAME }}.tls.key" | |
6 | tls_dh_params_path: "/data/{{ SYNAPSE_SERVER_NAME }}.tls.dh" | |
7 | no_tls: {{ "True" if SYNAPSE_NO_TLS else "False" }} | |
8 | tls_fingerprints: [] | |
9 | ||
10 | ## Server ## | |
11 | ||
12 | server_name: "{{ SYNAPSE_SERVER_NAME }}" | |
13 | pid_file: /homeserver.pid | |
14 | web_client: False | |
15 | soft_file_limit: 0 | |
16 | ||
17 | ## Ports ## | |
18 | ||
19 | listeners: | |
20 | {% if not SYNAPSE_NO_TLS %} | |
21 | - | |
22 | port: 8448 | |
23 | bind_addresses: ['0.0.0.0'] | |
24 | type: http | |
25 | tls: true | |
26 | x_forwarded: false | |
27 | resources: | |
28 | - names: [client] | |
29 | compress: true | |
30 | - names: [federation] # Federation APIs | |
31 | compress: false | |
32 | {% endif %} | |
33 | ||
34 | - port: 8008 | |
35 | tls: false | |
36 | bind_addresses: ['0.0.0.0'] | |
37 | type: http | |
38 | x_forwarded: false | |
39 | ||
40 | resources: | |
41 | - names: [client] | |
42 | compress: true | |
43 | - names: [federation] | |
44 | compress: false | |
45 | ||
46 | ## Database ## | |
47 | ||
48 | {% if POSTGRES_PASSWORD %} | |
49 | database: | |
50 | name: "psycopg2" | |
51 | args: | |
52 | user: "{{ POSTGRES_USER or "synapse" }}" | |
53 | password: "{{ POSTGRES_PASSWORD }}" | |
54 | database: "{{ POSTGRES_DB or "synapse" }}" | |
55 | host: "{{ POSTGRES_HOST or "db" }}" | |
56 | port: "{{ POSTGRES_PORT or "5432" }}" | |
57 | cp_min: 5 | |
58 | cp_max: 10 | |
59 | {% else %} | |
60 | database: | |
61 | name: "sqlite3" | |
62 | args: | |
63 | database: "/data/homeserver.db" | |
64 | {% endif %} | |
65 | ||
66 | ## Performance ## | |
67 | ||
68 | event_cache_size: "{{ SYNAPSE_EVENT_CACHE_SIZE or "10K" }}" | |
69 | verbose: 0 | |
70 | log_file: "/data/homeserver.log" | |
71 | log_config: "/compiled/log.config" | |
72 | ||
73 | ## Ratelimiting ## | |
74 | ||
75 | rc_messages_per_second: 0.2 | |
76 | rc_message_burst_count: 10.0 | |
77 | federation_rc_window_size: 1000 | |
78 | federation_rc_sleep_limit: 10 | |
79 | federation_rc_sleep_delay: 500 | |
80 | federation_rc_reject_limit: 50 | |
81 | federation_rc_concurrent: 3 | |
82 | ||
83 | ## Files ## | |
84 | ||
85 | media_store_path: "/data/media" | |
86 | uploads_path: "/data/uploads" | |
87 | max_upload_size: "10M" | |
88 | max_image_pixels: "32M" | |
89 | dynamic_thumbnails: false | |
90 | ||
91 | # List of thumbnail to precalculate when an image is uploaded. | |
92 | thumbnail_sizes: | |
93 | - width: 32 | |
94 | height: 32 | |
95 | method: crop | |
96 | - width: 96 | |
97 | height: 96 | |
98 | method: crop | |
99 | - width: 320 | |
100 | height: 240 | |
101 | method: scale | |
102 | - width: 640 | |
103 | height: 480 | |
104 | method: scale | |
105 | - width: 800 | |
106 | height: 600 | |
107 | method: scale | |
108 | ||
109 | url_preview_enabled: False | |
110 | max_spider_size: "10M" | |
111 | ||
112 | ## Captcha ## | |
113 | ||
114 | {% if SYNAPSE_RECAPTCHA_PUBLIC_KEY %} | |
115 | recaptcha_public_key: "{{ SYNAPSE_RECAPTCHA_PUBLIC_KEY }}" | |
116 | recaptcha_private_key: "{{ SYNAPSE_RECAPTCHA_PRIVATE_KEY }}" | |
117 | enable_registration_captcha: True | |
118 | recaptcha_siteverify_api: "https://www.google.com/recaptcha/api/siteverify" | |
119 | {% else %} | |
120 | recaptcha_public_key: "YOUR_PUBLIC_KEY" | |
121 | recaptcha_private_key: "YOUR_PRIVATE_KEY" | |
122 | enable_registration_captcha: False | |
123 | recaptcha_siteverify_api: "https://www.google.com/recaptcha/api/siteverify" | |
124 | {% endif %} | |
125 | ||
126 | ## Turn ## | |
127 | ||
128 | {% if SYNAPSE_TURN_URIS %} | |
129 | turn_uris: | |
130 | {% for uri in SYNAPSE_TURN_URIS.split(',') %} - "{{ uri }}" | |
131 | {% endfor %} | |
132 | turn_shared_secret: "{{ SYNAPSE_TURN_SECRET }}" | |
133 | turn_user_lifetime: "1h" | |
134 | turn_allow_guests: True | |
135 | {% else %} | |
136 | turn_uris: [] | |
137 | turn_shared_secret: "YOUR_SHARED_SECRET" | |
138 | turn_user_lifetime: "1h" | |
139 | turn_allow_guests: True | |
140 | {% endif %} | |
141 | ||
142 | ## Registration ## | |
143 | ||
144 | enable_registration: {{ "True" if SYNAPSE_ENABLE_REGISTRATION else "False" }} | |
145 | registration_shared_secret: "{{ SYNAPSE_REGISTRATION_SHARED_SECRET }}" | |
146 | bcrypt_rounds: 12 | |
147 | allow_guest_access: {{ "True" if SYNAPSE_ALLOW_GUEST else "False" }} | |
148 | enable_group_creation: true | |
149 | ||
150 | # The list of identity servers trusted to verify third party | |
151 | # identifiers by this server. | |
152 | trusted_third_party_id_servers: | |
153 | - matrix.org | |
154 | - vector.im | |
155 | - riot.im | |
156 | ||
157 | ## Metrics ### | |
158 | ||
159 | {% if SYNAPSE_REPORT_STATS.lower() == "yes" %} | |
160 | enable_metrics: True | |
161 | report_stats: True | |
162 | {% else %} | |
163 | enable_metrics: False | |
164 | report_stats: False | |
165 | {% endif %} | |
166 | ||
167 | ## API Configuration ## | |
168 | ||
169 | room_invite_state_types: | |
170 | - "m.room.join_rules" | |
171 | - "m.room.canonical_alias" | |
172 | - "m.room.avatar" | |
173 | - "m.room.name" | |
174 | ||
175 | {% if SYNAPSE_APPSERVICES %} | |
176 | app_service_config_files: | |
177 | {% for appservice in SYNAPSE_APPSERVICES %} - "{{ appservice }}" | |
178 | {% endfor %} | |
179 | {% else %} | |
180 | app_service_config_files: [] | |
181 | {% endif %} | |
182 | ||
183 | macaroon_secret_key: "{{ SYNAPSE_MACAROON_SECRET_KEY }}" | |
184 | expire_access_token: False | |
185 | ||
186 | ## Signing Keys ## | |
187 | ||
188 | signing_key_path: "/data/{{ SYNAPSE_SERVER_NAME }}.signing.key" | |
189 | old_signing_keys: {} | |
190 | key_refresh_interval: "1d" # 1 Day. | |
191 | ||
192 | # The trusted servers to download signing keys from. | |
193 | perspectives: | |
194 | servers: | |
195 | "matrix.org": | |
196 | verify_keys: | |
197 | "ed25519:auto": | |
198 | key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw" | |
199 | ||
200 | password_config: | |
201 | enabled: true | |
202 | ||
203 | {% if SYNAPSE_SMTP_HOST %} | |
204 | email: | |
205 | enable_notifs: false | |
206 | smtp_host: "{{ SYNAPSE_SMTP_HOST }}" | |
207 | smtp_port: {{ SYNAPSE_SMTP_PORT or "25" }} | |
208 | smtp_user: "{{ SYNAPSE_SMTP_USER }}" | |
209 | smtp_pass: "{{ SYNAPSE_SMTP_PASSWORD }}" | |
210 | require_transport_security: False | |
211 | notif_from: "{{ SYNAPSE_SMTP_FROM or "hostmaster@" + SYNAPSE_SERVER_NAME }}" | |
212 | app_name: Matrix | |
213 | template_dir: res/templates | |
214 | notif_template_html: notif_mail.html | |
215 | notif_template_text: notif_mail.txt | |
216 | notif_for_new_users: True | |
217 | riot_base_url: "https://{{ SYNAPSE_SERVER_NAME }}" | |
218 | {% endif %} |
0 | version: 1 | |
1 | ||
2 | formatters: | |
3 | precise: | |
4 | format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s- %(message)s' | |
5 | ||
6 | filters: | |
7 | context: | |
8 | (): synapse.util.logcontext.LoggingContextFilter | |
9 | request: "" | |
10 | ||
11 | handlers: | |
12 | console: | |
13 | class: logging.StreamHandler | |
14 | formatter: precise | |
15 | filters: [context] | |
16 | ||
17 | loggers: | |
18 | synapse: | |
19 | level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }} | |
20 | ||
21 | synapse.storage.SQL: | |
22 | # beware: increasing this to DEBUG will make synapse log sensitive | |
23 | # information such as access tokens. | |
24 | level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }} | |
25 | ||
26 | root: | |
27 | level: {{ SYNAPSE_LOG_LEVEL or "WARNING" }} | |
28 | handlers: [console] |
0 | # This compose file is compatible with Compose itself, it might need some | |
1 | # adjustments to run properly with stack. | |
2 | ||
3 | version: '3' | |
4 | ||
5 | services: | |
6 | ||
7 | synapse: | |
8 | image: docker.io/matrixdotorg/synapse:latest | |
9 | # Since synapse does not retry to connect to the database, restart upon | 
10 | # failure | |
11 | restart: unless-stopped | |
12 | # See the readme for a full documentation of the environment settings | |
13 | environment: | |
14 | - SYNAPSE_SERVER_NAME=my.matrix.host | |
15 | - SYNAPSE_REPORT_STATS=no | |
16 | - SYNAPSE_ENABLE_REGISTRATION=yes | |
17 | - SYNAPSE_LOG_LEVEL=INFO | |
18 | - POSTGRES_PASSWORD=changeme | |
19 | volumes: | |
20 | # You may either store all the files in a local folder | |
21 | - ./files:/data | |
22 | # .. or you may split this between different storage points | |
23 | # - ./files:/data | |
24 | # - /path/to/ssd:/data/uploads | |
25 | # - /path/to/large_hdd:/data/media | |
26 | depends_on: | |
27 | - db | |
28 | # In order to expose Synapse, remove one of the following, you might for | |
29 | # instance expose the TLS port directly: | |
30 | ports: | |
31 | - 8448:8448/tcp | |
32 | # ... or use a reverse proxy, here is an example for traefik: | |
33 | labels: | |
34 | - traefik.enable=true | |
35 | - traefik.frontend.rule=Host:my.matrix.Host | |
36 | - traefik.port=8448 | |
37 | ||
38 | db: | |
39 | image: docker.io/postgres:10-alpine | |
40 | # Change that password, of course! | |
41 | environment: | |
42 | - POSTGRES_USER=synapse | |
43 | - POSTGRES_PASSWORD=changeme | |
44 | volumes: | |
45 | # You may store the database tables in a local folder.. | |
46 | - ./schemas:/var/lib/postgresql/data | |
47 | # .. or store them on some high performance storage for better results | |
48 | # - /path/to/ssd/storage:/var/lib/postgresql/data
0 | #!/usr/local/bin/python | |
1 | ||
2 | import jinja2 | |
3 | import os | |
4 | import sys | |
5 | import subprocess | |
6 | import glob | |
7 | ||
8 | # Utility functions | |
def convert(src, dst, environ):
    """Render the Jinja2 template file *src* with *environ* and write it to *dst*.

    Replaces the original one-liner lambda: a named ``def`` is the idiomatic
    form, and the ``with`` blocks guarantee both file handles are closed
    promptly instead of relying on garbage collection.
    """
    with open(src) as template_file:
        template = jinja2.Template(template_file.read())
    with open(dst, "w") as output_file:
        output_file.write(template.render(**environ))
10 | ||
def check_arguments(environ, args):
    """Verify that every name in *args* is present in *environ*.

    Prints a diagnostic and exits the process with status 2 as soon as the
    first required environment variable is found to be missing.
    """
    for required in args:
        if required in environ:
            continue
        print("Environment variable %s is mandatory, exiting." % required)
        sys.exit(2)
16 | ||
def generate_secrets(environ, secrets):
    """Ensure every secret listed in *secrets* has a value in *environ*.

    *secrets* maps a short file-name stem (e.g. ``"macaroon"``) to the name
    of the environment variable that should hold the secret.  For each
    variable missing from *environ*, reuse the value persisted under
    ``/data/<server name>.<stem>.key`` if that file exists; otherwise
    generate a fresh random value and persist it there so restarts keep
    existing tokens valid.
    """
    # Local import keeps the module's top-level import block untouched.
    import binascii

    for name, secret in secrets.items():
        if secret in environ:
            continue
        filename = "/data/%s.%s.key" % (environ["SYNAPSE_SERVER_NAME"], name)
        if os.path.exists(filename):
            with open(filename) as handle:
                value = handle.read()
        else:
            print("Generating a random secret for {}".format(name))
            # hexlify works on both Python 2 and 3; the original
            # bytes.encode("hex") is Python-2-only and raises on Python 3.
            value = binascii.hexlify(os.urandom(32)).decode("ascii")
            with open(filename, "w") as handle:
                handle.write(value)
        environ[secret] = value
28 | ||
# ---- Script entry point --------------------------------------------------
# The container is driven entirely by environment variables; see the
# accompanying README for the full list of supported settings.
mode = sys.argv[1] if len(sys.argv) > 1 else None
environ = os.environ.copy()
# Synapse is run under su-exec as this uid:gid (defaults to 991:991).
ownership = "{}:{}".format(environ.get("UID", 991), environ.get("GID", 991))
args = ["python", "-m", "synapse.app.homeserver"]

if mode == "generate":
    # "generate" mode: write a fresh configuration file to
    # SYNAPSE_CONFIG_PATH, then exit (os.execv never returns).
    check_arguments(
        environ,
        ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS", "SYNAPSE_CONFIG_PATH"),
    )
    args += [
        "--server-name", environ["SYNAPSE_SERVER_NAME"],
        "--report-stats", environ["SYNAPSE_REPORT_STATS"],
        "--config-path", environ["SYNAPSE_CONFIG_PATH"],
        "--generate-config",
    ]
    os.execv("/usr/local/bin/python", args)

else:
    # Normal mode: build (or reuse) a configuration, then run synapse.
    if "SYNAPSE_CONFIG_PATH" in environ:
        # A hand-maintained configuration file was supplied; use it as-is.
        args += ["--config-path", environ["SYNAPSE_CONFIG_PATH"]]
    else:
        # Otherwise compile a configuration from the environment variables.
        check_arguments(environ, ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS"))
        generate_secrets(environ, {
            "registration": "SYNAPSE_REGISTRATION_SHARED_SECRET",
            "macaroon": "SYNAPSE_MACAROON_SECRET_KEY",
        })
        environ["SYNAPSE_APPSERVICES"] = glob.glob("/data/appservices/*.yaml")
        if not os.path.exists("/compiled"):
            os.mkdir("/compiled")
        convert("/conf/homeserver.yaml", "/compiled/homeserver.yaml", environ)
        convert("/conf/log.config", "/compiled/log.config", environ)
        # Make sure the data volume is writable by the synapse user.
        subprocess.check_output(["chown", "-R", ownership, "/data"])
        args += ["--config-path", "/compiled/homeserver.yaml"]
    # Generate any missing signing keys, then replace this process with
    # synapse itself, dropping privileges via su-exec.
    subprocess.check_output(args + ["--generate-keys"])
    os.execv("/sbin/su-exec", ["su-exec", ownership] + args)
0 | 0 | #! /bin/bash |
1 | ||
2 | set -eux | |
1 | 3 | |
2 | 4 | cd "`dirname $0`/.." |
3 | 5 | |
13 | 15 | tox -e py27 --notest -v |
14 | 16 | |
15 | 17 | TOX_BIN=$TOX_DIR/py27/bin |
16 | $TOX_BIN/pip install setuptools | |
18 | ||
19 | # cryptography 2.2 requires setuptools >= 18.5. | |
20 | # | |
21 | # older versions of virtualenv (?) give us a virtualenv with the same version | |
22 | # of setuptools as is installed on the system python (and tox runs virtualenv | |
23 | # under python3, so we get the version of setuptools that is installed on that). | |
24 | # | |
25 | # anyway, make sure that we have a recent enough setuptools. | |
26 | $TOX_BIN/pip install 'setuptools>=18.5' | |
27 | ||
28 | # we also need a semi-recent version of pip, because old ones fail to install | |
29 | # the "enum34" dependency of cryptography. | |
30 | $TOX_BIN/pip install 'pip>=10' | |
31 | ||
17 | 32 | { python synapse/python_dependencies.py |
18 | 33 | echo lxml psycopg2 |
19 | 34 | } | xargs $TOX_BIN/pip install |
5 | 5 | |
6 | 6 | ## Do not run it lightly. |
7 | 7 | |
8 | set -e | |
9 | ||
10 | if [ "$1" == "-h" ] || [ "$1" == "" ]; then | |
11 | echo "Call with ROOM_ID as first option and then pipe it into the database. So for instance you might run" | |
12 | echo " nuke-room-from-db.sh <room_id> | sqlite3 homeserver.db" | |
13 | echo "or" | |
14 | echo " nuke-room-from-db.sh <room_id> | psql --dbname=synapse" | |
15 | exit | |
16 | fi | |
17 | ||
8 | 18 | ROOMID="$1" |
9 | 19 | |
10 | sqlite3 homeserver.db <<EOF | |
20 | cat <<EOF | |
11 | 21 | DELETE FROM event_forward_extremities WHERE room_id = '$ROOMID'; |
12 | 22 | DELETE FROM event_backward_extremities WHERE room_id = '$ROOMID'; |
13 | 23 | DELETE FROM event_edges WHERE room_id = '$ROOMID'; |
28 | 38 | DELETE FROM state_groups_state WHERE room_id = '$ROOMID'; |
29 | 39 | DELETE FROM receipts_graph WHERE room_id = '$ROOMID'; |
30 | 40 | DELETE FROM receipts_linearized WHERE room_id = '$ROOMID'; |
31 | DELETE FROM event_search_content WHERE c1room_id = '$ROOMID'; | |
41 | DELETE FROM event_search WHERE room_id = '$ROOMID'; | |
32 | 42 | DELETE FROM guest_access WHERE room_id = '$ROOMID'; |
33 | 43 | DELETE FROM history_visibility WHERE room_id = '$ROOMID'; |
34 | 44 | DELETE FROM room_tags WHERE room_id = '$ROOMID'; |
15 | 15 | """ This is a reference implementation of a Matrix home server. |
16 | 16 | """ |
17 | 17 | |
18 | __version__ = "0.28.1" | |
18 | __version__ = "0.29.0" |
31 | 31 | from synapse.server import HomeServer |
32 | 32 | from synapse.storage.engines import create_engine |
33 | 33 | from synapse.util.httpresourcetree import create_resource_tree |
34 | from synapse.util.logcontext import LoggingContext, preserve_fn | |
34 | from synapse.util.logcontext import LoggingContext, run_in_background | |
35 | 35 | from synapse.util.manhole import manhole |
36 | 36 | from synapse.util.versionstring import get_version_string |
37 | from twisted.internet import reactor | |
37 | from twisted.internet import reactor, defer | |
38 | 38 | from twisted.web.resource import NoResource |
39 | 39 | |
40 | 40 | logger = logging.getLogger("synapse.app.appservice") |
73 | 73 | site_tag, |
74 | 74 | listener_config, |
75 | 75 | root_resource, |
76 | self.version_string, | |
76 | 77 | ) |
77 | 78 | ) |
78 | 79 | |
111 | 112 | |
112 | 113 | if stream_name == "events": |
113 | 114 | max_stream_id = self.store.get_room_max_stream_ordering() |
114 | preserve_fn( | |
115 | self.appservice_handler.notify_interested_services | |
116 | )(max_stream_id) | |
115 | run_in_background(self._notify_app_services, max_stream_id) | |
116 | ||
117 | @defer.inlineCallbacks | |
118 | def _notify_app_services(self, room_stream_id): | |
119 | try: | |
120 | yield self.appservice_handler.notify_interested_services(room_stream_id) | |
121 | except Exception: | |
122 | logger.exception("Error notifying application services of event") | |
117 | 123 | |
118 | 124 | |
119 | 125 | def start(config_options): |
97 | 97 | site_tag, |
98 | 98 | listener_config, |
99 | 99 | root_resource, |
100 | self.version_string, | |
100 | 101 | ) |
101 | 102 | ) |
102 | 103 |
113 | 113 | site_tag, |
114 | 114 | listener_config, |
115 | 115 | root_resource, |
116 | self.version_string, | |
116 | 117 | ) |
117 | 118 | ) |
118 | 119 |
37 | 37 | from synapse.storage.engines import create_engine |
38 | 38 | from synapse.util.async import Linearizer |
39 | 39 | from synapse.util.httpresourcetree import create_resource_tree |
40 | from synapse.util.logcontext import LoggingContext, preserve_fn | |
40 | from synapse.util.logcontext import LoggingContext, run_in_background | |
41 | 41 | from synapse.util.manhole import manhole |
42 | 42 | from synapse.util.versionstring import get_version_string |
43 | 43 | from twisted.internet import defer, reactor |
100 | 100 | site_tag, |
101 | 101 | listener_config, |
102 | 102 | root_resource, |
103 | self.version_string, | |
103 | 104 | ) |
104 | 105 | ) |
105 | 106 | |
228 | 229 | # presence, typing, etc. |
229 | 230 | if stream_name == "federation": |
230 | 231 | send_queue.process_rows_for_federation(self.federation_sender, rows) |
231 | preserve_fn(self.update_token)(token) | |
232 | run_in_background(self.update_token, token) | |
232 | 233 | |
233 | 234 | # We also need to poke the federation sender when new events happen |
234 | 235 | elif stream_name == "events": |
236 | 237 | |
237 | 238 | @defer.inlineCallbacks |
238 | 239 | def update_token(self, token): |
239 | self.federation_position = token | |
240 | ||
241 | # We linearize here to ensure we don't have races updating the token | |
242 | with (yield self._fed_position_linearizer.queue(None)): | |
243 | if self._last_ack < self.federation_position: | |
244 | yield self.store.update_federation_out_pos( | |
245 | "federation", self.federation_position | |
246 | ) | |
247 | ||
248 | # We ACK this token over replication so that the master can drop | |
249 | # its in memory queues | |
250 | self.replication_client.send_federation_ack(self.federation_position) | |
251 | self._last_ack = self.federation_position | |
240 | try: | |
241 | self.federation_position = token | |
242 | ||
243 | # We linearize here to ensure we don't have races updating the token | |
244 | with (yield self._fed_position_linearizer.queue(None)): | |
245 | if self._last_ack < self.federation_position: | |
246 | yield self.store.update_federation_out_pos( | |
247 | "federation", self.federation_position | |
248 | ) | |
249 | ||
250 | # We ACK this token over replication so that the master can drop | |
251 | # its in memory queues | |
252 | self.replication_client.send_federation_ack(self.federation_position) | |
253 | self._last_ack = self.federation_position | |
254 | except Exception: | |
255 | logger.exception("Error updating federation stream position") | |
252 | 256 | |
253 | 257 | |
254 | 258 | if __name__ == '__main__': |
151 | 151 | site_tag, |
152 | 152 | listener_config, |
153 | 153 | root_resource, |
154 | self.version_string, | |
154 | 155 | ) |
155 | 156 | ) |
156 | 157 |
139 | 139 | site_tag, |
140 | 140 | listener_config, |
141 | 141 | root_resource, |
142 | self.version_string, | |
142 | 143 | ), |
143 | 144 | self.tls_server_context_factory, |
144 | 145 | ) |
152 | 153 | site_tag, |
153 | 154 | listener_config, |
154 | 155 | root_resource, |
156 | self.version_string, | |
155 | 157 | ) |
156 | 158 | ) |
157 | 159 | logger.info("Synapse now listening on port %d", port) |
32 | 32 | from synapse.storage import DataStore |
33 | 33 | from synapse.storage.engines import create_engine |
34 | 34 | from synapse.util.httpresourcetree import create_resource_tree |
35 | from synapse.util.logcontext import LoggingContext, preserve_fn | |
35 | from synapse.util.logcontext import LoggingContext, run_in_background | |
36 | 36 | from synapse.util.manhole import manhole |
37 | 37 | from synapse.util.versionstring import get_version_string |
38 | 38 | from twisted.internet import defer, reactor |
103 | 103 | site_tag, |
104 | 104 | listener_config, |
105 | 105 | root_resource, |
106 | self.version_string, | |
106 | 107 | ) |
107 | 108 | ) |
108 | 109 | |
139 | 140 | |
140 | 141 | def on_rdata(self, stream_name, token, rows): |
141 | 142 | super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) |
142 | preserve_fn(self.poke_pushers)(stream_name, token, rows) | |
143 | run_in_background(self.poke_pushers, stream_name, token, rows) | |
143 | 144 | |
144 | 145 | @defer.inlineCallbacks |
145 | 146 | def poke_pushers(self, stream_name, token, rows): |
146 | if stream_name == "pushers": | |
147 | for row in rows: | |
148 | if row.deleted: | |
149 | yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) | |
150 | else: | |
151 | yield self.start_pusher(row.user_id, row.app_id, row.pushkey) | |
152 | elif stream_name == "events": | |
153 | yield self.pusher_pool.on_new_notifications( | |
154 | token, token, | |
155 | ) | |
156 | elif stream_name == "receipts": | |
157 | yield self.pusher_pool.on_new_receipts( | |
158 | token, token, set(row.room_id for row in rows) | |
159 | ) | |
147 | try: | |
148 | if stream_name == "pushers": | |
149 | for row in rows: | |
150 | if row.deleted: | |
151 | yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) | |
152 | else: | |
153 | yield self.start_pusher(row.user_id, row.app_id, row.pushkey) | |
154 | elif stream_name == "events": | |
155 | yield self.pusher_pool.on_new_notifications( | |
156 | token, token, | |
157 | ) | |
158 | elif stream_name == "receipts": | |
159 | yield self.pusher_pool.on_new_receipts( | |
160 | token, token, set(row.room_id for row in rows) | |
161 | ) | |
162 | except Exception: | |
163 | logger.exception("Error poking pushers") | |
160 | 164 | |
161 | 165 | def stop_pusher(self, user_id, app_id, pushkey): |
162 | 166 | key = "%s:%s" % (app_id, pushkey) |
50 | 50 | from synapse.storage.presence import UserPresenceState |
51 | 51 | from synapse.storage.roommember import RoomMemberStore |
52 | 52 | from synapse.util.httpresourcetree import create_resource_tree |
53 | from synapse.util.logcontext import LoggingContext, preserve_fn | |
53 | from synapse.util.logcontext import LoggingContext, run_in_background | |
54 | 54 | from synapse.util.manhole import manhole |
55 | 55 | from synapse.util.stringutils import random_string |
56 | 56 | from synapse.util.versionstring import get_version_string |
280 | 280 | site_tag, |
281 | 281 | listener_config, |
282 | 282 | root_resource, |
283 | self.version_string, | |
283 | 284 | ) |
284 | 285 | ) |
285 | 286 | |
326 | 327 | |
327 | 328 | def on_rdata(self, stream_name, token, rows): |
328 | 329 | super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) |
329 | ||
330 | preserve_fn(self.process_and_notify)(stream_name, token, rows) | |
330 | run_in_background(self.process_and_notify, stream_name, token, rows) | |
331 | 331 | |
332 | 332 | def get_streams_to_replicate(self): |
333 | 333 | args = super(SyncReplicationHandler, self).get_streams_to_replicate() |
339 | 339 | |
340 | 340 | @defer.inlineCallbacks |
341 | 341 | def process_and_notify(self, stream_name, token, rows): |
342 | if stream_name == "events": | |
343 | # We shouldn't get multiple rows per token for events stream, so | |
344 | # we don't need to optimise this for multiple rows. | |
345 | for row in rows: | |
346 | event = yield self.store.get_event(row.event_id) | |
347 | extra_users = () | |
348 | if event.type == EventTypes.Member: | |
349 | extra_users = (event.state_key,) | |
350 | max_token = self.store.get_room_max_stream_ordering() | |
351 | self.notifier.on_new_room_event( | |
352 | event, token, max_token, extra_users | |
353 | ) | |
354 | elif stream_name == "push_rules": | |
355 | self.notifier.on_new_event( | |
356 | "push_rules_key", token, users=[row.user_id for row in rows], | |
357 | ) | |
358 | elif stream_name in ("account_data", "tag_account_data",): | |
359 | self.notifier.on_new_event( | |
360 | "account_data_key", token, users=[row.user_id for row in rows], | |
361 | ) | |
362 | elif stream_name == "receipts": | |
363 | self.notifier.on_new_event( | |
364 | "receipt_key", token, rooms=[row.room_id for row in rows], | |
365 | ) | |
366 | elif stream_name == "typing": | |
367 | self.typing_handler.process_replication_rows(token, rows) | |
368 | self.notifier.on_new_event( | |
369 | "typing_key", token, rooms=[row.room_id for row in rows], | |
370 | ) | |
371 | elif stream_name == "to_device": | |
372 | entities = [row.entity for row in rows if row.entity.startswith("@")] | |
373 | if entities: | |
374 | self.notifier.on_new_event( | |
375 | "to_device_key", token, users=entities, | |
376 | ) | |
377 | elif stream_name == "device_lists": | |
378 | all_room_ids = set() | |
379 | for row in rows: | |
380 | room_ids = yield self.store.get_rooms_for_user(row.user_id) | |
381 | all_room_ids.update(room_ids) | |
382 | self.notifier.on_new_event( | |
383 | "device_list_key", token, rooms=all_room_ids, | |
384 | ) | |
385 | elif stream_name == "presence": | |
386 | yield self.presence_handler.process_replication_rows(token, rows) | |
387 | elif stream_name == "receipts": | |
388 | self.notifier.on_new_event( | |
389 | "groups_key", token, users=[row.user_id for row in rows], | |
390 | ) | |
342 | try: | |
343 | if stream_name == "events": | |
344 | # We shouldn't get multiple rows per token for events stream, so | |
345 | # we don't need to optimise this for multiple rows. | |
346 | for row in rows: | |
347 | event = yield self.store.get_event(row.event_id) | |
348 | extra_users = () | |
349 | if event.type == EventTypes.Member: | |
350 | extra_users = (event.state_key,) | |
351 | max_token = self.store.get_room_max_stream_ordering() | |
352 | self.notifier.on_new_room_event( | |
353 | event, token, max_token, extra_users | |
354 | ) | |
355 | elif stream_name == "push_rules": | |
356 | self.notifier.on_new_event( | |
357 | "push_rules_key", token, users=[row.user_id for row in rows], | |
358 | ) | |
359 | elif stream_name in ("account_data", "tag_account_data",): | |
360 | self.notifier.on_new_event( | |
361 | "account_data_key", token, users=[row.user_id for row in rows], | |
362 | ) | |
363 | elif stream_name == "receipts": | |
364 | self.notifier.on_new_event( | |
365 | "receipt_key", token, rooms=[row.room_id for row in rows], | |
366 | ) | |
367 | elif stream_name == "typing": | |
368 | self.typing_handler.process_replication_rows(token, rows) | |
369 | self.notifier.on_new_event( | |
370 | "typing_key", token, rooms=[row.room_id for row in rows], | |
371 | ) | |
372 | elif stream_name == "to_device": | |
373 | entities = [row.entity for row in rows if row.entity.startswith("@")] | |
374 | if entities: | |
375 | self.notifier.on_new_event( | |
376 | "to_device_key", token, users=entities, | |
377 | ) | |
378 | elif stream_name == "device_lists": | |
379 | all_room_ids = set() | |
380 | for row in rows: | |
381 | room_ids = yield self.store.get_rooms_for_user(row.user_id) | |
382 | all_room_ids.update(room_ids) | |
383 | self.notifier.on_new_event( | |
384 | "device_list_key", token, rooms=all_room_ids, | |
385 | ) | |
386 | elif stream_name == "presence": | |
387 | yield self.presence_handler.process_replication_rows(token, rows) | |
388 | elif stream_name == "receipts": | |
389 | self.notifier.on_new_event( | |
390 | "groups_key", token, users=[row.user_id for row in rows], | |
391 | ) | |
392 | except Exception: | |
393 | logger.exception("Error processing replication") | |
391 | 394 | |
392 | 395 | |
393 | 396 | def start(config_options): |
38 | 38 | from synapse.storage.user_directory import UserDirectoryStore |
39 | 39 | from synapse.util.caches.stream_change_cache import StreamChangeCache |
40 | 40 | from synapse.util.httpresourcetree import create_resource_tree |
41 | from synapse.util.logcontext import LoggingContext, preserve_fn | |
41 | from synapse.util.logcontext import LoggingContext, run_in_background | |
42 | 42 | from synapse.util.manhole import manhole |
43 | 43 | from synapse.util.versionstring import get_version_string |
44 | from twisted.internet import reactor | |
44 | from twisted.internet import reactor, defer | |
45 | 45 | from twisted.web.resource import NoResource |
46 | 46 | |
47 | 47 | logger = logging.getLogger("synapse.app.user_dir") |
125 | 125 | site_tag, |
126 | 126 | listener_config, |
127 | 127 | root_resource, |
128 | self.version_string, | |
128 | 129 | ) |
129 | 130 | ) |
130 | 131 | |
163 | 164 | stream_name, token, rows |
164 | 165 | ) |
165 | 166 | if stream_name == "current_state_deltas": |
166 | preserve_fn(self.user_directory.notify_new_event)() | |
167 | run_in_background(self._notify_directory) | |
168 | ||
169 | @defer.inlineCallbacks | |
170 | def _notify_directory(self): | |
171 | try: | |
172 | yield self.user_directory.notify_new_event() | |
173 | except Exception: | |
174 | logger.exception("Error notifiying user directory of state update") | |
167 | 175 | |
168 | 176 | |
169 | 177 | def start(config_options): |
50 | 50 | from twisted.internet import defer |
51 | 51 | |
52 | 52 | from synapse.appservice import ApplicationServiceState |
53 | from synapse.util.logcontext import preserve_fn | |
53 | from synapse.util.logcontext import run_in_background | |
54 | 54 | from synapse.util.metrics import Measure |
55 | 55 | |
56 | 56 | import logging |
105 | 105 | def enqueue(self, service, event): |
106 | 106 | # if this service isn't being sent something |
107 | 107 | self.queued_events.setdefault(service.id, []).append(event) |
108 | preserve_fn(self._send_request)(service) | |
108 | run_in_background(self._send_request, service) | |
109 | 109 | |
110 | 110 | @defer.inlineCallbacks |
111 | 111 | def _send_request(self, service): |
151 | 151 | if sent: |
152 | 152 | yield txn.complete(self.store) |
153 | 153 | else: |
154 | preserve_fn(self._start_recoverer)(service) | |
155 | except Exception as e: | |
156 | logger.exception(e) | |
157 | preserve_fn(self._start_recoverer)(service) | |
154 | run_in_background(self._start_recoverer, service) | |
155 | except Exception: | |
156 | logger.exception("Error creating appservice transaction") | |
157 | run_in_background(self._start_recoverer, service) | |
158 | 158 | |
159 | 159 | @defer.inlineCallbacks |
160 | 160 | def on_recovered(self, recoverer): |
175 | 175 | |
176 | 176 | @defer.inlineCallbacks |
177 | 177 | def _start_recoverer(self, service): |
178 | yield self.store.set_appservice_state( | |
179 | service, | |
180 | ApplicationServiceState.DOWN | |
181 | ) | |
182 | logger.info( | |
183 | "Application service falling behind. Starting recoverer. AS ID %s", | |
184 | service.id | |
185 | ) | |
186 | recoverer = self.recoverer_fn(service, self.on_recovered) | |
187 | self.add_recoverers([recoverer]) | |
188 | recoverer.recover() | |
178 | try: | |
179 | yield self.store.set_appservice_state( | |
180 | service, | |
181 | ApplicationServiceState.DOWN | |
182 | ) | |
183 | logger.info( | |
184 | "Application service falling behind. Starting recoverer. AS ID %s", | |
185 | service.id | |
186 | ) | |
187 | recoverer = self.recoverer_fn(service, self.on_recovered) | |
188 | self.add_recoverers([recoverer]) | |
189 | recoverer.recover() | |
190 | except Exception: | |
191 | logger.exception("Error starting AS recoverer") | |
189 | 192 | |
190 | 193 | @defer.inlineCallbacks |
191 | 194 | def _is_service_up(self, service): |
280 | 280 | ) |
281 | 281 | if not cls.path_exists(config_dir_path): |
282 | 282 | os.makedirs(config_dir_path) |
283 | with open(config_path, "wb") as config_file: | |
284 | config_bytes, config = obj.generate_config( | |
283 | with open(config_path, "w") as config_file: | |
284 | config_str, config = obj.generate_config( | |
285 | 285 | config_dir_path=config_dir_path, |
286 | 286 | server_name=server_name, |
287 | 287 | report_stats=(config_args.report_stats == "yes"), |
288 | 288 | is_generating_file=True |
289 | 289 | ) |
290 | 290 | obj.invoke_all("generate_files", config) |
291 | config_file.write(config_bytes) | |
291 | config_file.write(config_str) | |
292 | 292 | print(( |
293 | 293 | "A config file has been generated in %r for server name" |
294 | 294 | " %r with corresponding SSL keys and self-signed" |
16 | 16 | from synapse.appservice import ApplicationService |
17 | 17 | from synapse.types import UserID |
18 | 18 | |
19 | import urllib | |
20 | 19 | import yaml |
21 | 20 | import logging |
22 | 21 | |
23 | 22 | from six import string_types |
23 | from six.moves.urllib import parse as urlparse | |
24 | 24 | |
25 | 25 | logger = logging.getLogger(__name__) |
26 | 26 | |
104 | 104 | ) |
105 | 105 | |
106 | 106 | localpart = as_info["sender_localpart"] |
107 | if urllib.quote(localpart) != localpart: | |
107 | if urlparse.quote(localpart) != localpart: | |
108 | 108 | raise ValueError( |
109 | 109 | "sender_localpart needs characters which are not URL encoded." |
110 | 110 | ) |
116 | 116 | log_config = config.get("log_config") |
117 | 117 | if log_config and not os.path.exists(log_config): |
118 | 118 | log_file = self.abspath("homeserver.log") |
119 | with open(log_config, "wb") as log_config_file: | |
119 | with open(log_config, "w") as log_config_file: | |
120 | 120 | log_config_file.write( |
121 | 121 | DEFAULT_LOG_CONFIG.substitute(log_file=log_file) |
122 | 122 | ) |
132 | 132 | tls_dh_params_path = config["tls_dh_params_path"] |
133 | 133 | |
134 | 134 | if not self.path_exists(tls_private_key_path): |
135 | with open(tls_private_key_path, "w") as private_key_file: | |
135 | with open(tls_private_key_path, "wb") as private_key_file: | |
136 | 136 | tls_private_key = crypto.PKey() |
137 | 137 | tls_private_key.generate_key(crypto.TYPE_RSA, 2048) |
138 | 138 | private_key_pem = crypto.dump_privatekey( |
147 | 147 | ) |
148 | 148 | |
149 | 149 | if not self.path_exists(tls_certificate_path): |
150 | with open(tls_certificate_path, "w") as certificate_file: | |
150 | with open(tls_certificate_path, "wb") as certificate_file: | |
151 | 151 | cert = crypto.X509() |
152 | 152 | subject = cert.get_subject() |
153 | 153 | subject.CN = config["server_name"] |
12 | 12 | # limitations under the License. |
13 | 13 | |
14 | 14 | from twisted.internet import ssl |
15 | from OpenSSL import SSL | |
16 | from twisted.internet._sslverify import _OpenSSLECCurve, _defaultCurveName | |
15 | from OpenSSL import SSL, crypto | |
16 | from twisted.internet._sslverify import _defaultCurveName | |
17 | 17 | |
18 | 18 | import logging |
19 | 19 | |
31 | 31 | @staticmethod |
32 | 32 | def configure_context(context, config): |
33 | 33 | try: |
34 | _ecCurve = _OpenSSLECCurve(_defaultCurveName) | |
35 | _ecCurve.addECKeyToContext(context) | |
34 | _ecCurve = crypto.get_elliptic_curve(_defaultCurveName) | |
35 | context.set_tmp_ecdh(_ecCurve) | |
36 | ||
36 | 37 | except Exception: |
37 | 38 | logger.exception("Failed to enable elliptic curve for TLS") |
38 | 39 | context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3) |
18 | 18 | from synapse.util import unwrapFirstError, logcontext |
19 | 19 | from synapse.util.logcontext import ( |
20 | 20 | PreserveLoggingContext, |
21 | preserve_fn | |
21 | preserve_fn, | |
22 | run_in_background, | |
22 | 23 | ) |
23 | 24 | from synapse.util.metrics import Measure |
24 | 25 | |
126 | 127 | |
127 | 128 | verify_requests.append(verify_request) |
128 | 129 | |
129 | preserve_fn(self._start_key_lookups)(verify_requests) | |
130 | run_in_background(self._start_key_lookups, verify_requests) | |
130 | 131 | |
131 | 132 | # Pass those keys to handle_key_deferred so that the json object |
132 | 133 | # signatures can be verified |
145 | 146 | verify_requests (List[VerifyKeyRequest]): |
146 | 147 | """ |
147 | 148 | |
148 | # create a deferred for each server we're going to look up the keys | |
149 | # for; we'll resolve them once we have completed our lookups. | |
150 | # These will be passed into wait_for_previous_lookups to block | |
151 | # any other lookups until we have finished. | |
152 | # The deferreds are called with no logcontext. | |
153 | server_to_deferred = { | |
154 | rq.server_name: defer.Deferred() | |
155 | for rq in verify_requests | |
156 | } | |
157 | ||
158 | # We want to wait for any previous lookups to complete before | |
159 | # proceeding. | |
160 | yield self.wait_for_previous_lookups( | |
161 | [rq.server_name for rq in verify_requests], | |
162 | server_to_deferred, | |
163 | ) | |
164 | ||
165 | # Actually start fetching keys. | |
166 | self._get_server_verify_keys(verify_requests) | |
167 | ||
168 | # When we've finished fetching all the keys for a given server_name, | |
169 | # resolve the deferred passed to `wait_for_previous_lookups` so that | |
170 | # any lookups waiting will proceed. | |
171 | # | |
172 | # map from server name to a set of request ids | |
173 | server_to_request_ids = {} | |
174 | ||
175 | for verify_request in verify_requests: | |
176 | server_name = verify_request.server_name | |
177 | request_id = id(verify_request) | |
178 | server_to_request_ids.setdefault(server_name, set()).add(request_id) | |
179 | ||
180 | def remove_deferreds(res, verify_request): | |
181 | server_name = verify_request.server_name | |
182 | request_id = id(verify_request) | |
183 | server_to_request_ids[server_name].discard(request_id) | |
184 | if not server_to_request_ids[server_name]: | |
185 | d = server_to_deferred.pop(server_name, None) | |
186 | if d: | |
187 | d.callback(None) | |
188 | return res | |
189 | ||
190 | for verify_request in verify_requests: | |
191 | verify_request.deferred.addBoth( | |
192 | remove_deferreds, verify_request, | |
149 | try: | |
150 | # create a deferred for each server we're going to look up the keys | |
151 | # for; we'll resolve them once we have completed our lookups. | |
152 | # These will be passed into wait_for_previous_lookups to block | |
153 | # any other lookups until we have finished. | |
154 | # The deferreds are called with no logcontext. | |
155 | server_to_deferred = { | |
156 | rq.server_name: defer.Deferred() | |
157 | for rq in verify_requests | |
158 | } | |
159 | ||
160 | # We want to wait for any previous lookups to complete before | |
161 | # proceeding. | |
162 | yield self.wait_for_previous_lookups( | |
163 | [rq.server_name for rq in verify_requests], | |
164 | server_to_deferred, | |
193 | 165 | ) |
166 | ||
167 | # Actually start fetching keys. | |
168 | self._get_server_verify_keys(verify_requests) | |
169 | ||
170 | # When we've finished fetching all the keys for a given server_name, | |
171 | # resolve the deferred passed to `wait_for_previous_lookups` so that | |
172 | # any lookups waiting will proceed. | |
173 | # | |
174 | # map from server name to a set of request ids | |
175 | server_to_request_ids = {} | |
176 | ||
177 | for verify_request in verify_requests: | |
178 | server_name = verify_request.server_name | |
179 | request_id = id(verify_request) | |
180 | server_to_request_ids.setdefault(server_name, set()).add(request_id) | |
181 | ||
182 | def remove_deferreds(res, verify_request): | |
183 | server_name = verify_request.server_name | |
184 | request_id = id(verify_request) | |
185 | server_to_request_ids[server_name].discard(request_id) | |
186 | if not server_to_request_ids[server_name]: | |
187 | d = server_to_deferred.pop(server_name, None) | |
188 | if d: | |
189 | d.callback(None) | |
190 | return res | |
191 | ||
192 | for verify_request in verify_requests: | |
193 | verify_request.deferred.addBoth( | |
194 | remove_deferreds, verify_request, | |
195 | ) | |
196 | except Exception: | |
197 | logger.exception("Error starting key lookups") | |
194 | 198 | |
195 | 199 | @defer.inlineCallbacks |
196 | 200 | def wait_for_previous_lookups(self, server_names, server_to_deferred): |
312 | 316 | if not verify_request.deferred.called: |
313 | 317 | verify_request.deferred.errback(err) |
314 | 318 | |
315 | preserve_fn(do_iterations)().addErrback(on_err) | |
319 | run_in_background(do_iterations).addErrback(on_err) | |
316 | 320 | |
317 | 321 | @defer.inlineCallbacks |
318 | 322 | def get_keys_from_store(self, server_name_and_key_ids): |
328 | 332 | """ |
329 | 333 | res = yield logcontext.make_deferred_yieldable(defer.gatherResults( |
330 | 334 | [ |
331 | preserve_fn(self.store.get_server_verify_keys)( | |
332 | server_name, key_ids | |
335 | run_in_background( | |
336 | self.store.get_server_verify_keys, | |
337 | server_name, key_ids, | |
333 | 338 | ).addCallback(lambda ks, server: (server, ks), server_name) |
334 | 339 | for server_name, key_ids in server_name_and_key_ids |
335 | 340 | ], |
357 | 362 | |
358 | 363 | results = yield logcontext.make_deferred_yieldable(defer.gatherResults( |
359 | 364 | [ |
360 | preserve_fn(get_key)(p_name, p_keys) | |
365 | run_in_background(get_key, p_name, p_keys) | |
361 | 366 | for p_name, p_keys in self.perspective_servers.items() |
362 | 367 | ], |
363 | 368 | consumeErrors=True, |
397 | 402 | |
398 | 403 | results = yield logcontext.make_deferred_yieldable(defer.gatherResults( |
399 | 404 | [ |
400 | preserve_fn(get_key)(server_name, key_ids) | |
405 | run_in_background(get_key, server_name, key_ids) | |
401 | 406 | for server_name, key_ids in server_name_and_key_ids |
402 | 407 | ], |
403 | 408 | consumeErrors=True, |
480 | 485 | |
481 | 486 | yield logcontext.make_deferred_yieldable(defer.gatherResults( |
482 | 487 | [ |
483 | preserve_fn(self.store_keys)( | |
488 | run_in_background( | |
489 | self.store_keys, | |
484 | 490 | server_name=server_name, |
485 | 491 | from_server=perspective_name, |
486 | 492 | verify_keys=response_keys, |
538 | 544 | |
539 | 545 | yield logcontext.make_deferred_yieldable(defer.gatherResults( |
540 | 546 | [ |
541 | preserve_fn(self.store_keys)( | |
547 | run_in_background( | |
548 | self.store_keys, | |
542 | 549 | server_name=key_server_name, |
543 | 550 | from_server=server_name, |
544 | 551 | verify_keys=verify_keys, |
614 | 621 | |
615 | 622 | yield logcontext.make_deferred_yieldable(defer.gatherResults( |
616 | 623 | [ |
617 | preserve_fn(self.store.store_server_keys_json)( | |
624 | run_in_background( | |
625 | self.store.store_server_keys_json, | |
618 | 626 | server_name=server_name, |
619 | 627 | key_id=key_id, |
620 | 628 | from_server=server_name, |
715 | 723 | # TODO(markjh): Store whether the keys have expired. |
716 | 724 | return logcontext.make_deferred_yieldable(defer.gatherResults( |
717 | 725 | [ |
718 | preserve_fn(self.store.store_server_verify_key)( | |
726 | run_in_background( | |
727 | self.store.store_server_verify_key, | |
719 | 728 | server_name, server_name, key.time_added, key |
720 | 729 | ) |
721 | 730 | for key_id, key in verify_keys.items() |
46 | 46 | |
47 | 47 | |
48 | 48 | def _event_dict_property(key): |
49 | # We want to be able to use hasattr with the event dict properties. | |
50 | # However, (on python3) hasattr expects AttributeError to be raised. Hence, | |
51 | # we need to transform the KeyError into an AttributeError | |
49 | 52 | def getter(self): |
50 | return self._event_dict[key] | |
53 | try: | |
54 | return self._event_dict[key] | |
55 | except KeyError: | |
56 | raise AttributeError(key) | |
51 | 57 | |
52 | 58 | def setter(self, v): |
53 | self._event_dict[key] = v | |
59 | try: | |
60 | self._event_dict[key] = v | |
61 | except KeyError: | |
62 | raise AttributeError(key) | |
54 | 63 | |
55 | 64 | def delete(self): |
56 | del self._event_dict[key] | |
65 | try: | |
66 | del self._event_dict[key] | |
67 | except KeyError: | |
68 | raise AttributeError(key) | |
57 | 69 | |
58 | 70 | return property( |
59 | 71 | getter, |
18 | 18 | import logging |
19 | 19 | import random |
20 | 20 | |
21 | from six.moves import range | |
22 | ||
21 | 23 | from twisted.internet import defer |
22 | 24 | |
23 | 25 | from synapse.api.constants import Membership |
32 | 34 | import synapse.metrics |
33 | 35 | from synapse.util import logcontext, unwrapFirstError |
34 | 36 | from synapse.util.caches.expiringcache import ExpiringCache |
35 | from synapse.util.logcontext import make_deferred_yieldable, preserve_fn | |
37 | from synapse.util.logcontext import make_deferred_yieldable, run_in_background | |
36 | 38 | from synapse.util.logutils import log_function |
37 | 39 | from synapse.util.retryutils import NotRetryingDestination |
38 | 40 | |
412 | 414 | |
413 | 415 | batch_size = 20 |
414 | 416 | missing_events = list(missing_events) |
415 | for i in xrange(0, len(missing_events), batch_size): | |
417 | for i in range(0, len(missing_events), batch_size): | |
416 | 418 | batch = set(missing_events[i:i + batch_size]) |
417 | 419 | |
418 | 420 | deferreds = [ |
419 | preserve_fn(self.get_pdu)( | |
421 | run_in_background( | |
422 | self.get_pdu, | |
420 | 423 | destinations=random_server_list(), |
421 | 424 | event_id=e_id, |
422 | 425 | ) |
322 | 322 | break |
323 | 323 | |
324 | 324 | yield self._process_presence_inner(states_map.values()) |
325 | except Exception: | |
326 | logger.exception("Error sending presence states to servers") | |
325 | 327 | finally: |
326 | 328 | self._processing_pending_presence = False |
327 | 329 |
24 | 24 | ) |
25 | 25 | from synapse.util.ratelimitutils import FederationRateLimiter |
26 | 26 | from synapse.util.versionstring import get_version_string |
27 | from synapse.util.logcontext import preserve_fn | |
27 | from synapse.util.logcontext import run_in_background | |
28 | 28 | from synapse.types import ThirdPartyInstanceID, get_domain_from_id |
29 | 29 | |
30 | 30 | import functools |
151 | 151 | # alive |
152 | 152 | retry_timings = yield self.store.get_destination_retry_timings(origin) |
153 | 153 | if retry_timings and retry_timings["retry_last_ts"]: |
154 | run_in_background(self._reset_retry_timings, origin) | |
155 | ||
156 | defer.returnValue(origin) | |
157 | ||
158 | @defer.inlineCallbacks | |
159 | def _reset_retry_timings(self, origin): | |
160 | try: | |
154 | 161 | logger.info("Marking origin %r as up", origin) |
155 | preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0) | |
156 | ||
157 | defer.returnValue(origin) | |
162 | yield self.store.set_destination_retry_timings(origin, 0, 0) | |
163 | except Exception: | |
164 | logger.exception("Error resetting retry timings on %s", origin) | |
158 | 165 | |
159 | 166 | |
160 | 167 | class BaseFederationServlet(object): |
73 | 73 | "previous_ids", |
74 | 74 | "pdus", |
75 | 75 | "edus", |
76 | "transaction_id", | |
77 | "destination", | |
78 | 76 | "pdu_failures", |
79 | 77 | ] |
80 | 78 |
41 | 41 | |
42 | 42 | from synapse.api.errors import SynapseError |
43 | 43 | from synapse.types import get_domain_from_id |
44 | from synapse.util.logcontext import preserve_fn | |
44 | from synapse.util.logcontext import run_in_background | |
45 | 45 | |
46 | 46 | from signedjson.sign import sign_json |
47 | 47 | |
164 | 164 | |
165 | 165 | @defer.inlineCallbacks |
166 | 166 | def _renew_attestation(group_id, user_id): |
167 | if not self.is_mine_id(group_id): | |
168 | destination = get_domain_from_id(group_id) | |
169 | elif not self.is_mine_id(user_id): | |
170 | destination = get_domain_from_id(user_id) | |
171 | else: | |
172 | logger.warn( | |
173 | "Incorrectly trying to do attestations for user: %r in %r", | |
174 | user_id, group_id, | |
167 | try: | |
168 | if not self.is_mine_id(group_id): | |
169 | destination = get_domain_from_id(group_id) | |
170 | elif not self.is_mine_id(user_id): | |
171 | destination = get_domain_from_id(user_id) | |
172 | else: | |
173 | logger.warn( | |
174 | "Incorrectly trying to do attestations for user: %r in %r", | |
175 | user_id, group_id, | |
176 | ) | |
177 | yield self.store.remove_attestation_renewal(group_id, user_id) | |
178 | return | |
179 | ||
180 | attestation = self.attestations.create_attestation(group_id, user_id) | |
181 | ||
182 | yield self.transport_client.renew_group_attestation( | |
183 | destination, group_id, user_id, | |
184 | content={"attestation": attestation}, | |
175 | 185 | ) |
176 | yield self.store.remove_attestation_renewal(group_id, user_id) | |
177 | return | |
178 | 186 | |
179 | attestation = self.attestations.create_attestation(group_id, user_id) | |
180 | ||
181 | yield self.transport_client.renew_group_attestation( | |
182 | destination, group_id, user_id, | |
183 | content={"attestation": attestation}, | |
184 | ) | |
185 | ||
186 | yield self.store.update_attestation_renewal( | |
187 | group_id, user_id, attestation | |
188 | ) | |
187 | yield self.store.update_attestation_renewal( | |
188 | group_id, user_id, attestation | |
189 | ) | |
190 | except Exception: | |
191 | logger.exception("Error renewing attestation of %r in %r", | |
192 | user_id, group_id) | |
189 | 193 | |
190 | 194 | for row in rows: |
191 | 195 | group_id = row["group_id"] |
192 | 196 | user_id = row["user_id"] |
193 | 197 | |
194 | preserve_fn(_renew_attestation)(group_id, user_id) | |
198 | run_in_background(_renew_attestation, group_id, user_id) |
18 | 18 | from synapse.api.constants import EventTypes |
19 | 19 | from synapse.util.metrics import Measure |
20 | 20 | from synapse.util.logcontext import ( |
21 | make_deferred_yieldable, preserve_fn, run_in_background, | |
21 | make_deferred_yieldable, run_in_background, | |
22 | 22 | ) |
23 | 23 | |
24 | 24 | import logging |
110 | 110 | |
111 | 111 | # Fork off pushes to these services |
112 | 112 | for service in services: |
113 | preserve_fn(self.scheduler.submit_event_for_as)( | |
114 | service, event | |
115 | ) | |
113 | self.scheduler.submit_event_for_as(service, event) | |
116 | 114 | |
117 | 115 | @defer.inlineCallbacks |
118 | 116 | def handle_room_events(events): |
197 | 195 | services = yield self._get_services_for_3pn(protocol) |
198 | 196 | |
199 | 197 | results = yield make_deferred_yieldable(defer.DeferredList([ |
200 | preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) | |
198 | run_in_background( | |
199 | self.appservice_api.query_3pe, | |
200 | service, kind, protocol, fields, | |
201 | ) | |
201 | 202 | for service in services |
202 | 203 | ], consumeErrors=True)) |
203 | 204 | |
258 | 259 | event based on the service regex. |
259 | 260 | """ |
260 | 261 | services = self.store.get_app_services() |
261 | interested_list = [ | |
262 | s for s in services if ( | |
263 | yield s.is_interested(event, self.store) | |
264 | ) | |
265 | ] | |
262 | ||
263 | # we can't use a list comprehension here. Since python 3, list | |
264 | # comprehensions use a generator internally. This means you can't yield | |
265 | # inside of a list comprehension anymore. | |
266 | interested_list = [] | |
267 | for s in services: | |
268 | if (yield s.is_interested(event, self.store)): | |
269 | interested_list.append(s) | |
270 | ||
266 | 271 | defer.returnValue(interested_list) |
267 | 272 | |
268 | 273 | def _get_services_for_user(self, user_id): |
0 | 0 | # -*- coding: utf-8 -*- |
1 | # Copyright 2017 New Vector Ltd | |
1 | # Copyright 2017, 2018 New Vector Ltd | |
2 | 2 | # |
3 | 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
4 | 4 | # you may not use this file except in compliance with the License. |
11 | 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | from twisted.internet import defer | |
14 | from twisted.internet import defer, reactor | |
15 | 15 | |
16 | 16 | from ._base import BaseHandler |
17 | from synapse.types import UserID, create_requester | |
18 | from synapse.util.logcontext import run_in_background | |
17 | 19 | |
18 | 20 | import logging |
19 | 21 | |
26 | 28 | super(DeactivateAccountHandler, self).__init__(hs) |
27 | 29 | self._auth_handler = hs.get_auth_handler() |
28 | 30 | self._device_handler = hs.get_device_handler() |
31 | self._room_member_handler = hs.get_room_member_handler() | |
32 | ||
33 | # Flag that indicates whether the process to part users from rooms is running | |
34 | self._user_parter_running = False | |
35 | ||
36 | # Start the user parter loop so it can resume parting users from rooms where | |
37 | # it left off (if it has work left to do). | |
38 | reactor.callWhenRunning(self._start_user_parting) | |
29 | 39 | |
30 | 40 | @defer.inlineCallbacks |
31 | 41 | def deactivate_account(self, user_id): |
49 | 59 | |
50 | 60 | yield self.store.user_delete_threepids(user_id) |
51 | 61 | yield self.store.user_set_password_hash(user_id, None) |
62 | ||
63 | # Add the user to a table of users pending deactivation (ie. | |
64 | # removal from all the rooms they're a member of) | |
65 | yield self.store.add_user_pending_deactivation(user_id) | |
66 | ||
67 | # Now start the process that goes through that list and | |
68 | # parts users from rooms (if it isn't already running) | |
69 | self._start_user_parting() | |
70 | ||
71 | def _start_user_parting(self): | |
72 | """ | |
73 | Start the process that goes through the table of users | |
74 | pending deactivation, if it isn't already running. | |
75 | ||
76 | Returns: | |
77 | None | |
78 | """ | |
79 | if not self._user_parter_running: | |
80 | run_in_background(self._user_parter_loop) | |
81 | ||
82 | @defer.inlineCallbacks | |
83 | def _user_parter_loop(self): | |
84 | """Loop that parts deactivated users from rooms | |
85 | ||
86 | Returns: | |
87 | None | |
88 | """ | |
89 | self._user_parter_running = True | |
90 | logger.info("Starting user parter") | |
91 | try: | |
92 | while True: | |
93 | user_id = yield self.store.get_user_pending_deactivation() | |
94 | if user_id is None: | |
95 | break | |
96 | logger.info("User parter parting %r", user_id) | |
97 | yield self._part_user(user_id) | |
98 | yield self.store.del_user_pending_deactivation(user_id) | |
99 | logger.info("User parter finished parting %r", user_id) | |
100 | logger.info("User parter finished: stopping") | |
101 | finally: | |
102 | self._user_parter_running = False | |
103 | ||
104 | @defer.inlineCallbacks | |
105 | def _part_user(self, user_id): | |
106 | """Causes the given user_id to leave all the rooms they're joined to | |
107 | ||
108 | Returns: | |
109 | None | |
110 | """ | |
111 | user = UserID.from_string(user_id) | |
112 | ||
113 | rooms_for_user = yield self.store.get_rooms_for_user(user_id) | |
114 | for room_id in rooms_for_user: | |
115 | logger.info("User parter parting %r from %r", user_id, room_id) | |
116 | try: | |
117 | yield self._room_member_handler.update_membership( | |
118 | create_requester(user), | |
119 | user, | |
120 | room_id, | |
121 | "leave", | |
122 | ratelimit=False, | |
123 | ) | |
124 | except Exception: | |
125 | logger.exception( | |
126 | "Failed to part user %r from room %r: ignoring and continuing", | |
127 | user_id, room_id, | |
128 | ) |
23 | 23 | SynapseError, CodeMessageException, FederationDeniedError, |
24 | 24 | ) |
25 | 25 | from synapse.types import get_domain_from_id, UserID |
26 | from synapse.util.logcontext import preserve_fn, make_deferred_yieldable | |
26 | from synapse.util.logcontext import make_deferred_yieldable, run_in_background | |
27 | 27 | from synapse.util.retryutils import NotRetryingDestination |
28 | 28 | |
29 | 29 | logger = logging.getLogger(__name__) |
138 | 138 | failures[destination] = _exception_to_failure(e) |
139 | 139 | |
140 | 140 | yield make_deferred_yieldable(defer.gatherResults([ |
141 | preserve_fn(do_remote_query)(destination) | |
141 | run_in_background(do_remote_query, destination) | |
142 | 142 | for destination in remote_queries_not_in_cache |
143 | ])) | |
143 | ], consumeErrors=True)) | |
144 | 144 | |
145 | 145 | defer.returnValue({ |
146 | 146 | "device_keys": results, "failures": failures, |
241 | 241 | failures[destination] = _exception_to_failure(e) |
242 | 242 | |
243 | 243 | yield make_deferred_yieldable(defer.gatherResults([ |
244 | preserve_fn(claim_client_keys)(destination) | |
244 | run_in_background(claim_client_keys, destination) | |
245 | 245 | for destination in remote_queries |
246 | ])) | |
246 | ], consumeErrors=True)) | |
247 | 247 | |
248 | 248 | logger.info( |
249 | 249 | "Claimed one-time-keys: %s", |
15 | 15 | |
16 | 16 | """Contains handlers for federation events.""" |
17 | 17 | |
18 | import httplib | |
19 | 18 | import itertools |
20 | 19 | import logging |
20 | import sys | |
21 | 21 | |
22 | 22 | from signedjson.key import decode_verify_key_bytes |
23 | 23 | from signedjson.sign import verify_signed_json |
24 | import six | |
25 | from six.moves import http_client | |
24 | 26 | from twisted.internet import defer |
25 | 27 | from unpaddedbase64 import decode_base64 |
26 | 28 | |
636 | 638 | |
637 | 639 | results = yield logcontext.make_deferred_yieldable(defer.gatherResults( |
638 | 640 | [ |
639 | logcontext.preserve_fn(self.replication_layer.get_pdu)( | |
641 | logcontext.run_in_background( | |
642 | self.replication_layer.get_pdu, | |
640 | 643 | [dest], |
641 | 644 | event_id, |
642 | 645 | outlier=True, |
886 | 889 | logger.warn("Rejecting event %s which has %i prev_events", |
887 | 890 | ev.event_id, len(ev.prev_events)) |
888 | 891 | raise SynapseError( |
889 | httplib.BAD_REQUEST, | |
892 | http_client.BAD_REQUEST, | |
890 | 893 | "Too many prev_events", |
891 | 894 | ) |
892 | 895 | |
894 | 897 | logger.warn("Rejecting event %s which has %i auth_events", |
895 | 898 | ev.event_id, len(ev.auth_events)) |
896 | 899 | raise SynapseError( |
897 | httplib.BAD_REQUEST, | |
900 | http_client.BAD_REQUEST, | |
898 | 901 | "Too many auth_events", |
899 | 902 | ) |
900 | 903 | |
1022 | 1025 | # lots of requests for missing prev_events which we do actually |
1023 | 1026 | # have. Hence we fire off the deferred, but don't wait for it. |
1024 | 1027 | |
1025 | logcontext.preserve_fn(self._handle_queued_pdus)(room_queue) | |
1028 | logcontext.run_in_background(self._handle_queued_pdus, room_queue) | |
1026 | 1029 | |
1027 | 1030 | defer.returnValue(True) |
1028 | 1031 | |
1512 | 1515 | backfilled=backfilled, |
1513 | 1516 | ) |
1514 | 1517 | except: # noqa: E722, as we reraise the exception this is fine. |
1515 | # Ensure that we actually remove the entries in the push actions | |
1516 | # staging area | |
1517 | logcontext.preserve_fn( | |
1518 | self.store.remove_push_actions_from_staging | |
1519 | )(event.event_id) | |
1520 | raise | |
1518 | tp, value, tb = sys.exc_info() | |
1519 | ||
1520 | logcontext.run_in_background( | |
1521 | self.store.remove_push_actions_from_staging, | |
1522 | event.event_id, | |
1523 | ) | |
1524 | ||
1525 | six.reraise(tp, value, tb) | |
1521 | 1526 | |
1522 | 1527 | if not backfilled: |
1523 | 1528 | # this intentionally does not yield: we don't care about the result |
1524 | 1529 | # and don't need to wait for it. |
1525 | logcontext.preserve_fn(self.pusher_pool.on_new_notifications)( | |
1526 | event_stream_id, max_stream_id | |
1530 | logcontext.run_in_background( | |
1531 | self.pusher_pool.on_new_notifications, | |
1532 | event_stream_id, max_stream_id, | |
1527 | 1533 | ) |
1528 | 1534 | |
1529 | 1535 | defer.returnValue((context, event_stream_id, max_stream_id)) |
1537 | 1543 | """ |
1538 | 1544 | contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( |
1539 | 1545 | [ |
1540 | logcontext.preserve_fn(self._prep_event)( | |
1546 | logcontext.run_in_background( | |
1547 | self._prep_event, | |
1541 | 1548 | origin, |
1542 | 1549 | ev_info["event"], |
1543 | 1550 | state=ev_info.get("state"), |
1866 | 1873 | |
1867 | 1874 | different_events = yield logcontext.make_deferred_yieldable( |
1868 | 1875 | defer.gatherResults([ |
1869 | logcontext.preserve_fn(self.store.get_event)( | |
1876 | logcontext.run_in_background( | |
1877 | self.store.get_event, | |
1870 | 1878 | d, |
1871 | 1879 | allow_none=True, |
1872 | 1880 | allow_rejected=False, |
26 | 26 | from synapse.util import unwrapFirstError |
27 | 27 | from synapse.util.async import concurrently_execute |
28 | 28 | from synapse.util.caches.snapshot_cache import SnapshotCache |
29 | from synapse.util.logcontext import make_deferred_yieldable, preserve_fn | |
29 | from synapse.util.logcontext import make_deferred_yieldable, run_in_background | |
30 | 30 | from synapse.visibility import filter_events_for_client |
31 | 31 | |
32 | 32 | from ._base import BaseHandler |
165 | 165 | (messages, token), current_state = yield make_deferred_yieldable( |
166 | 166 | defer.gatherResults( |
167 | 167 | [ |
168 | preserve_fn(self.store.get_recent_events_for_room)( | |
168 | run_in_background( | |
169 | self.store.get_recent_events_for_room, | |
169 | 170 | event.room_id, |
170 | 171 | limit=limit, |
171 | 172 | end_token=room_end_token, |
179 | 180 | self.store, user_id, messages |
180 | 181 | ) |
181 | 182 | |
182 | start_token = now_token.copy_and_replace("room_key", token[0]) | |
183 | end_token = now_token.copy_and_replace("room_key", token[1]) | |
183 | start_token = now_token.copy_and_replace("room_key", token) | |
184 | end_token = now_token.copy_and_replace("room_key", room_end_token) | |
184 | 185 | time_now = self.clock.time_msec() |
185 | 186 | |
186 | 187 | d["messages"] = { |
323 | 324 | self.store, user_id, messages, is_peeking=is_peeking |
324 | 325 | ) |
325 | 326 | |
326 | start_token = StreamToken.START.copy_and_replace("room_key", token[0]) | |
327 | end_token = StreamToken.START.copy_and_replace("room_key", token[1]) | |
327 | start_token = StreamToken.START.copy_and_replace("room_key", token) | |
328 | end_token = StreamToken.START.copy_and_replace("room_key", stream_token) | |
328 | 329 | |
329 | 330 | time_now = self.clock.time_msec() |
330 | 331 | |
390 | 391 | |
391 | 392 | presence, receipts, (messages, token) = yield defer.gatherResults( |
392 | 393 | [ |
393 | preserve_fn(get_presence)(), | |
394 | preserve_fn(get_receipts)(), | |
395 | preserve_fn(self.store.get_recent_events_for_room)( | |
394 | run_in_background(get_presence), | |
395 | run_in_background(get_receipts), | |
396 | run_in_background( | |
397 | self.store.get_recent_events_for_room, | |
396 | 398 | room_id, |
397 | 399 | limit=limit, |
398 | 400 | end_token=now_token.room_key, |
405 | 407 | self.store, user_id, messages, is_peeking=is_peeking, |
406 | 408 | ) |
407 | 409 | |
408 | start_token = now_token.copy_and_replace("room_key", token[0]) | |
409 | end_token = now_token.copy_and_replace("room_key", token[1]) | |
410 | start_token = now_token.copy_and_replace("room_key", token) | |
411 | end_token = now_token | |
410 | 412 | |
411 | 413 | time_now = self.clock.time_msec() |
412 | 414 |
12 | 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | 13 | # See the License for the specific language governing permissions and |
14 | 14 | # limitations under the License. |
15 | import logging | |
16 | import simplejson | |
17 | import sys | |
18 | ||
19 | from canonicaljson import encode_canonical_json | |
20 | import six | |
15 | 21 | from twisted.internet import defer, reactor |
16 | 22 | from twisted.python.failure import Failure |
17 | 23 | |
24 | 30 | UserID, RoomAlias, RoomStreamToken, |
25 | 31 | ) |
26 | 32 | from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter |
27 | from synapse.util.logcontext import preserve_fn, run_in_background | |
33 | from synapse.util.logcontext import run_in_background | |
28 | 34 | from synapse.util.metrics import measure_func |
29 | 35 | from synapse.util.frozenutils import frozendict_json_encoder |
30 | 36 | from synapse.util.stringutils import random_string |
32 | 38 | from synapse.replication.http.send_event import send_event_to_master |
33 | 39 | |
34 | 40 | from ._base import BaseHandler |
35 | ||
36 | from canonicaljson import encode_canonical_json | |
37 | ||
38 | import logging | |
39 | import simplejson | |
40 | 41 | |
41 | 42 | logger = logging.getLogger(__name__) |
42 | 43 | |
732 | 733 | except: # noqa: E722, as we reraise the exception this is fine. |
733 | 734 | # Ensure that we actually remove the entries in the push actions |
734 | 735 | # staging area, if we calculated them. |
735 | preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id) | |
736 | raise | |
736 | tp, value, tb = sys.exc_info() | |
737 | ||
738 | run_in_background( | |
739 | self.store.remove_push_actions_from_staging, | |
740 | event.event_id, | |
741 | ) | |
742 | ||
743 | six.reraise(tp, value, tb) | |
737 | 744 | |
738 | 745 | @defer.inlineCallbacks |
739 | 746 | def persist_and_notify_client_event( |
853 | 860 | |
854 | 861 | # this intentionally does not yield: we don't care about the result |
855 | 862 | # and don't need to wait for it. |
856 | preserve_fn(self.pusher_pool.on_new_notifications)( | |
863 | run_in_background( | |
864 | self.pusher_pool.on_new_notifications, | |
857 | 865 | event_stream_id, max_stream_id |
858 | 866 | ) |
859 | 867 | |
860 | 868 | @defer.inlineCallbacks |
861 | 869 | def _notify(): |
862 | 870 | yield run_on_reactor() |
863 | self.notifier.on_new_room_event( | |
864 | event, event_stream_id, max_stream_id, | |
865 | extra_users=extra_users | |
866 | ) | |
867 | ||
868 | preserve_fn(_notify)() | |
871 | try: | |
872 | self.notifier.on_new_room_event( | |
873 | event, event_stream_id, max_stream_id, | |
874 | extra_users=extra_users | |
875 | ) | |
876 | except Exception: | |
877 | logger.exception("Error notifying about new room event") | |
878 | ||
879 | run_in_background(_notify) | |
869 | 880 | |
870 | 881 | if event.type == EventTypes.Message: |
871 | presence = self.hs.get_presence_handler() | |
872 | 882 | # We don't want to block sending messages on any presence code. This |
873 | 883 | # matters as sometimes presence code can take a while. |
874 | preserve_fn(presence.bump_presence_active_time)(requester.user) | |
884 | run_in_background(self._bump_active_time, requester.user) | |
885 | ||
886 | @defer.inlineCallbacks | |
887 | def _bump_active_time(self, user): | |
888 | try: | |
889 | presence = self.hs.get_presence_handler() | |
890 | yield presence.bump_presence_active_time(user) | |
891 | except Exception: | |
892 | logger.exception("Error bumping presence active time") |
30 | 30 | |
31 | 31 | from synapse.util.caches.descriptors import cachedInlineCallbacks |
32 | 32 | from synapse.util.async import Linearizer |
33 | from synapse.util.logcontext import preserve_fn | |
33 | from synapse.util.logcontext import run_in_background | |
34 | 34 | from synapse.util.logutils import log_function |
35 | 35 | from synapse.util.metrics import Measure |
36 | 36 | from synapse.util.wheel_timer import WheelTimer |
254 | 254 | logger.info("Finished _persist_unpersisted_changes") |
255 | 255 | |
256 | 256 | @defer.inlineCallbacks |
257 | def _update_states_and_catch_exception(self, new_states): | |
258 | try: | |
259 | res = yield self._update_states(new_states) | |
260 | defer.returnValue(res) | |
261 | except Exception: | |
262 | logger.exception("Error updating presence") | |
263 | ||
264 | @defer.inlineCallbacks | |
257 | 265 | def _update_states(self, new_states): |
258 | 266 | """Updates presence of users. Sets the appropriate timeouts. Pokes |
259 | 267 | the notifier and federation if and only if the changed presence state |
363 | 371 | now=now, |
364 | 372 | ) |
365 | 373 | |
366 | preserve_fn(self._update_states)(changes) | |
374 | run_in_background(self._update_states_and_catch_exception, changes) | |
367 | 375 | except Exception: |
368 | 376 | logger.exception("Exception in _handle_timeouts loop") |
369 | 377 | |
421 | 429 | |
422 | 430 | @defer.inlineCallbacks |
423 | 431 | def _end(): |
424 | if affect_presence: | |
432 | try: | |
425 | 433 | self.user_to_num_current_syncs[user_id] -= 1 |
426 | 434 | |
427 | 435 | prev_state = yield self.current_state_for_user(user_id) |
428 | 436 | yield self._update_states([prev_state.copy_and_replace( |
429 | 437 | last_user_sync_ts=self.clock.time_msec(), |
430 | 438 | )]) |
439 | except Exception: | |
440 | logger.exception("Error updating presence after sync") | |
431 | 441 | |
432 | 442 | @contextmanager |
433 | 443 | def _user_syncing(): |
434 | 444 | try: |
435 | 445 | yield |
436 | 446 | finally: |
437 | preserve_fn(_end)() | |
447 | if affect_presence: | |
448 | run_in_background(_end) | |
438 | 449 | |
439 | 450 | defer.returnValue(_user_syncing()) |
440 | 451 |
134 | 134 | """Given a list of receipts, works out which remote servers should be |
135 | 135 | poked and pokes them. |
136 | 136 | """ |
137 | # TODO: Some of this stuff should be coalesced. | |
138 | for receipt in receipts: | |
139 | room_id = receipt["room_id"] | |
140 | receipt_type = receipt["receipt_type"] | |
141 | user_id = receipt["user_id"] | |
142 | event_ids = receipt["event_ids"] | |
143 | data = receipt["data"] | |
144 | ||
145 | users = yield self.state.get_current_user_in_room(room_id) | |
146 | remotedomains = set(get_domain_from_id(u) for u in users) | |
147 | remotedomains = remotedomains.copy() | |
148 | remotedomains.discard(self.server_name) | |
149 | ||
150 | logger.debug("Sending receipt to: %r", remotedomains) | |
151 | ||
152 | for domain in remotedomains: | |
153 | self.federation.send_edu( | |
154 | destination=domain, | |
155 | edu_type="m.receipt", | |
156 | content={ | |
157 | room_id: { | |
158 | receipt_type: { | |
159 | user_id: { | |
160 | "event_ids": event_ids, | |
161 | "data": data, | |
137 | try: | |
138 | # TODO: Some of this stuff should be coallesced. | |
139 | for receipt in receipts: | |
140 | room_id = receipt["room_id"] | |
141 | receipt_type = receipt["receipt_type"] | |
142 | user_id = receipt["user_id"] | |
143 | event_ids = receipt["event_ids"] | |
144 | data = receipt["data"] | |
145 | ||
146 | users = yield self.state.get_current_user_in_room(room_id) | |
147 | remotedomains = set(get_domain_from_id(u) for u in users) | |
148 | remotedomains = remotedomains.copy() | |
149 | remotedomains.discard(self.server_name) | |
150 | ||
151 | logger.debug("Sending receipt to: %r", remotedomains) | |
152 | ||
153 | for domain in remotedomains: | |
154 | self.federation.send_edu( | |
155 | destination=domain, | |
156 | edu_type="m.receipt", | |
157 | content={ | |
158 | room_id: { | |
159 | receipt_type: { | |
160 | user_id: { | |
161 | "event_ids": event_ids, | |
162 | "data": data, | |
163 | } | |
162 | 164 | } |
163 | } | |
165 | }, | |
164 | 166 | }, |
165 | }, | |
166 | key=(room_id, receipt_type, user_id), | |
167 | ) | |
167 | key=(room_id, receipt_type, user_id), | |
168 | ) | |
169 | except Exception: | |
170 | logger.exception("Error pushing receipts to remote servers") | |
168 | 171 | |
169 | 172 | @defer.inlineCallbacks |
170 | 173 | def get_receipts_for_room(self, room_id, to_key): |
13 | 13 | # limitations under the License. |
14 | 14 | |
15 | 15 | from twisted.internet import defer |
16 | ||
17 | from six.moves import range | |
16 | 18 | |
17 | 19 | from ._base import BaseHandler |
18 | 20 | |
199 | 201 | step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1 |
200 | 202 | |
201 | 203 | chunk = [] |
202 | for i in xrange(0, len(rooms_to_scan), step): | |
204 | for i in range(0, len(rooms_to_scan), step): | |
203 | 205 | batch = rooms_to_scan[i:i + step] |
204 | 206 | logger.info("Processing %i rooms for result", len(batch)) |
205 | 207 | yield concurrently_execute( |
353 | 353 | since_key = since_token.room_key |
354 | 354 | |
355 | 355 | while limited and len(recents) < timeline_limit and max_repeat: |
356 | events, end_key = yield self.store.get_room_events_stream_for_room( | |
357 | room_id, | |
358 | limit=load_limit + 1, | |
359 | from_key=since_key, | |
360 | to_key=end_key, | |
361 | ) | |
356 | # If we have a since_key then we are trying to get any events | |
357 | # that have happened since `since_key` up to `end_key`, so we | |
358 | # can just use `get_room_events_stream_for_room`. | |
359 | # Otherwise, we want to return the last N events in the room | |
360 | # in topological ordering. | |
361 | if since_key: | |
362 | events, end_key = yield self.store.get_room_events_stream_for_room( | |
363 | room_id, | |
364 | limit=load_limit + 1, | |
365 | from_key=since_key, | |
366 | to_key=end_key, | |
367 | ) | |
368 | else: | |
369 | events, end_key = yield self.store.get_recent_events_for_room( | |
370 | room_id, | |
371 | limit=load_limit + 1, | |
372 | end_token=end_key, | |
373 | ) | |
362 | 374 | loaded_recents = sync_config.filter_collection.filter_room_timeline( |
363 | 375 | events |
364 | 376 | ) |
428 | 440 | Returns: |
429 | 441 | A Deferred map from ((type, state_key)->Event) |
430 | 442 | """ |
431 | last_events, token = yield self.store.get_recent_events_for_room( | |
443 | last_events, _ = yield self.store.get_recent_events_for_room( | |
432 | 444 | room_id, end_token=stream_position.room_key, limit=1, |
433 | 445 | ) |
434 | 446 |
15 | 15 | from twisted.internet import defer |
16 | 16 | |
17 | 17 | from synapse.api.errors import SynapseError, AuthError |
18 | from synapse.util.logcontext import preserve_fn | |
18 | from synapse.util.logcontext import run_in_background | |
19 | 19 | from synapse.util.metrics import Measure |
20 | 20 | from synapse.util.wheel_timer import WheelTimer |
21 | 21 | from synapse.types import UserID, get_domain_from_id |
96 | 96 | if self.hs.is_mine_id(member.user_id): |
97 | 97 | last_fed_poke = self._member_last_federation_poke.get(member, None) |
98 | 98 | if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now: |
99 | preserve_fn(self._push_remote)( | |
99 | run_in_background( | |
100 | self._push_remote, | |
100 | 101 | member=member, |
101 | 102 | typing=True |
102 | 103 | ) |
195 | 196 | def _push_update(self, member, typing): |
196 | 197 | if self.hs.is_mine_id(member.user_id): |
197 | 198 | # Only send updates for changes to our own users. |
198 | preserve_fn(self._push_remote)(member, typing) | |
199 | run_in_background(self._push_remote, member, typing) | |
199 | 200 | |
200 | 201 | self._push_update_local( |
201 | 202 | member=member, |
204 | 205 | |
205 | 206 | @defer.inlineCallbacks |
206 | 207 | def _push_remote(self, member, typing): |
207 | users = yield self.state.get_current_user_in_room(member.room_id) | |
208 | self._member_last_federation_poke[member] = self.clock.time_msec() | |
209 | ||
210 | now = self.clock.time_msec() | |
211 | self.wheel_timer.insert( | |
212 | now=now, | |
213 | obj=member, | |
214 | then=now + FEDERATION_PING_INTERVAL, | |
215 | ) | |
216 | ||
217 | for domain in set(get_domain_from_id(u) for u in users): | |
218 | if domain != self.server_name: | |
219 | self.federation.send_edu( | |
220 | destination=domain, | |
221 | edu_type="m.typing", | |
222 | content={ | |
223 | "room_id": member.room_id, | |
224 | "user_id": member.user_id, | |
225 | "typing": typing, | |
226 | }, | |
227 | key=member, | |
228 | ) | |
208 | try: | |
209 | users = yield self.state.get_current_user_in_room(member.room_id) | |
210 | self._member_last_federation_poke[member] = self.clock.time_msec() | |
211 | ||
212 | now = self.clock.time_msec() | |
213 | self.wheel_timer.insert( | |
214 | now=now, | |
215 | obj=member, | |
216 | then=now + FEDERATION_PING_INTERVAL, | |
217 | ) | |
218 | ||
219 | for domain in set(get_domain_from_id(u) for u in users): | |
220 | if domain != self.server_name: | |
221 | self.federation.send_edu( | |
222 | destination=domain, | |
223 | edu_type="m.typing", | |
224 | content={ | |
225 | "room_id": member.room_id, | |
226 | "user_id": member.user_id, | |
227 | "typing": typing, | |
228 | }, | |
229 | key=member, | |
230 | ) | |
231 | except Exception: | |
232 | logger.exception("Error pushing typing notif to remotes") | |
229 | 233 | |
230 | 234 | @defer.inlineCallbacks |
231 | 235 | def _recv_edu(self, origin, content): |
0 | 0 | # -*- coding: utf-8 -*- |
1 | 1 | # Copyright 2014-2016 OpenMarket Ltd |
2 | # Copyright 2018 New Vector Ltd | |
2 | 3 | # |
3 | 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
4 | 5 | # you may not use this file except in compliance with the License. |
11 | 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | 13 | # See the License for the specific language governing permissions and |
13 | 14 | # limitations under the License. |
15 | from twisted.internet.defer import CancelledError | |
16 | from twisted.python import failure | |
17 | ||
18 | from synapse.api.errors import SynapseError | |
19 | ||
20 | ||
21 | class RequestTimedOutError(SynapseError): | |
22 | """Exception representing timeout of an outbound request""" | |
23 | def __init__(self): | |
24 | super(RequestTimedOutError, self).__init__(504, "Timed out") | |
25 | ||
26 | ||
27 | def cancelled_to_request_timed_out_error(value, timeout): | |
28 | """Turns CancelledErrors into RequestTimedOutErrors. | |
29 | ||
30 | For use with async.add_timeout_to_deferred | |
31 | """ | |
32 | if isinstance(value, failure.Failure): | |
33 | value.trap(CancelledError) | |
34 | raise RequestTimedOutError() | |
35 | return value |
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | 14 | |
15 | from synapse.http.server import wrap_request_handler | |
15 | from synapse.http.server import wrap_json_request_handler | |
16 | 16 | from twisted.web.resource import Resource |
17 | 17 | from twisted.web.server import NOT_DONE_YET |
18 | 18 | |
41 | 41 | Resource.__init__(self) |
42 | 42 | self._handler = handler |
43 | 43 | |
44 | # these are required by the request_handler wrapper | |
45 | self.version_string = hs.version_string | |
44 | # required by the request_handler wrapper | |
46 | 45 | self.clock = hs.get_clock() |
47 | 46 | |
48 | 47 | def render(self, request): |
49 | 48 | self._async_render(request) |
50 | 49 | return NOT_DONE_YET |
51 | 50 | |
52 | @wrap_request_handler | |
51 | @wrap_json_request_handler | |
53 | 52 | def _async_render(self, request): |
54 | 53 | return self._handler(request) |
0 | 0 | # -*- coding: utf-8 -*- |
1 | 1 | # Copyright 2014-2016 OpenMarket Ltd |
2 | # Copyright 2018 New Vector Ltd | |
2 | 3 | # |
3 | 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
4 | 5 | # you may not use this file except in compliance with the License. |
17 | 18 | from synapse.api.errors import ( |
18 | 19 | CodeMessageException, MatrixCodeMessageException, SynapseError, Codes, |
19 | 20 | ) |
21 | from synapse.http import cancelled_to_request_timed_out_error | |
22 | from synapse.util.async import add_timeout_to_deferred | |
20 | 23 | from synapse.util.caches import CACHE_SIZE_FACTOR |
21 | 24 | from synapse.util.logcontext import make_deferred_yieldable |
22 | from synapse.util import logcontext | |
23 | 25 | import synapse.metrics |
24 | 26 | from synapse.http.endpoint import SpiderEndpoint |
25 | 27 | |
37 | 39 | from twisted.web.http_headers import Headers |
38 | 40 | from twisted.web._newclient import ResponseDone |
39 | 41 | |
40 | from StringIO import StringIO | |
42 | from six import StringIO | |
41 | 43 | |
42 | 44 | import simplejson as json |
43 | 45 | import logging |
94 | 96 | # counters to it |
95 | 97 | outgoing_requests_counter.inc(method) |
96 | 98 | |
97 | def send_request(): | |
99 | logger.info("Sending request %s %s", method, uri) | |
100 | ||
101 | try: | |
98 | 102 | request_deferred = self.agent.request( |
99 | 103 | method, uri, *args, **kwargs |
100 | 104 | ) |
101 | ||
102 | return self.clock.time_bound_deferred( | |
105 | add_timeout_to_deferred( | |
103 | 106 | request_deferred, |
104 | time_out=60, | |
105 | ) | |
106 | ||
107 | logger.info("Sending request %s %s", method, uri) | |
108 | ||
109 | try: | |
110 | with logcontext.PreserveLoggingContext(): | |
111 | response = yield send_request() | |
107 | 60, cancelled_to_request_timed_out_error, | |
108 | ) | |
109 | response = yield make_deferred_yieldable(request_deferred) | |
112 | 110 | |
113 | 111 | incoming_responses_counter.inc(method, response.code) |
114 | 112 | logger.info( |
508 | 506 | reactor, |
509 | 507 | SpiderEndpointFactory(hs) |
510 | 508 | ) |
511 | ), [('gzip', GzipDecoder)] | |
509 | ), [(b'gzip', GzipDecoder)] | |
512 | 510 | ) |
513 | 511 | # We could look like Chrome: |
514 | 512 | # self.user_agent = ("Mozilla/5.0 (%s) (KHTML, like Gecko) |
114 | 114 | if time.time() - self.last_request >= 2.5 * 60: |
115 | 115 | self.abort() |
116 | 116 | # Abort the underlying TLS connection. The abort() method calls |
117 | # loseConnection() on the underlying TLS connection which tries to | |
117 | # loseConnection() on the TLS connection which tries to | |
118 | 118 | # shutdown the connection cleanly. We call abortConnection() |
119 | # since that will promptly close the underlying TCP connection. | |
120 | self.transport.abortConnection() | |
119 | # since that will promptly close the TLS connection. | |
120 | # | |
121 | # In Twisted >18.4; the TLS connection will be None if it has closed | |
122 | # which will make abortConnection() throw. Check that the TLS connection | |
123 | # is not None before trying to close it. | |
124 | if self.transport.getHandle() is not None: | |
125 | self.transport.abortConnection() | |
121 | 126 | |
122 | 127 | def request(self, request): |
123 | 128 | self.last_request = time.time() |
285 | 290 | if (len(answers) == 1 |
286 | 291 | and answers[0].type == dns.SRV |
287 | 292 | and answers[0].payload |
288 | and answers[0].payload.target == dns.Name('.')): | |
293 | and answers[0].payload.target == dns.Name(b'.')): | |
289 | 294 | raise ConnectError("Service %s unavailable" % service_name) |
290 | 295 | |
291 | 296 | for answer in answers: |
0 | 0 | # -*- coding: utf-8 -*- |
1 | 1 | # Copyright 2014-2016 OpenMarket Ltd |
2 | # Copyright 2018 New Vector Ltd | |
2 | 3 | # |
3 | 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
4 | 5 | # you may not use this file except in compliance with the License. |
11 | 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | 13 | # See the License for the specific language governing permissions and |
13 | 14 | # limitations under the License. |
14 | import synapse.util.retryutils | |
15 | 15 | from twisted.internet import defer, reactor, protocol |
16 | 16 | from twisted.internet.error import DNSLookupError |
17 | 17 | from twisted.web.client import readBody, HTTPConnectionPool, Agent |
18 | 18 | from twisted.web.http_headers import Headers |
19 | 19 | from twisted.web._newclient import ResponseDone |
20 | 20 | |
21 | from synapse.http import cancelled_to_request_timed_out_error | |
21 | 22 | from synapse.http.endpoint import matrix_federation_endpoint |
22 | from synapse.util.async import sleep | |
23 | import synapse.metrics | |
24 | from synapse.util.async import sleep, add_timeout_to_deferred | |
23 | 25 | from synapse.util import logcontext |
24 | import synapse.metrics | |
26 | from synapse.util.logcontext import make_deferred_yieldable | |
27 | import synapse.util.retryutils | |
25 | 28 | |
26 | 29 | from canonicaljson import encode_canonical_json |
27 | 30 | |
37 | 40 | import random |
38 | 41 | import sys |
39 | 42 | import urllib |
40 | import urlparse | |
41 | ||
43 | from six.moves.urllib import parse as urlparse | |
42 | 44 | |
43 | 45 | logger = logging.getLogger(__name__) |
44 | 46 | outbound_logger = logging.getLogger("synapse.http.outbound") |
183 | 185 | producer = body_callback(method, http_url_bytes, headers_dict) |
184 | 186 | |
185 | 187 | try: |
186 | def send_request(): | |
187 | request_deferred = self.agent.request( | |
188 | method, | |
189 | url_bytes, | |
190 | Headers(headers_dict), | |
191 | producer | |
192 | ) | |
193 | ||
194 | return self.clock.time_bound_deferred( | |
195 | request_deferred, | |
196 | time_out=timeout / 1000. if timeout else 60, | |
197 | ) | |
198 | ||
199 | with logcontext.PreserveLoggingContext(): | |
200 | response = yield send_request() | |
188 | request_deferred = self.agent.request( | |
189 | method, | |
190 | url_bytes, | |
191 | Headers(headers_dict), | |
192 | producer | |
193 | ) | |
194 | add_timeout_to_deferred( | |
195 | request_deferred, | |
196 | timeout / 1000. if timeout else 60, | |
197 | cancelled_to_request_timed_out_error, | |
198 | ) | |
199 | response = yield make_deferred_yieldable( | |
200 | request_deferred, | |
201 | ) | |
201 | 202 | |
202 | 203 | log_result = "%d %s" % (response.code, response.phrase,) |
203 | 204 | break |
0 | # -*- coding: utf-8 -*- | |
1 | # Copyright 2014-2016 OpenMarket Ltd | |
2 | # Copyright 2018 New Vector Ltd | |
3 | # | |
4 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | # you may not use this file except in compliance with the License. | |
6 | # You may obtain a copy of the License at | |
7 | # | |
8 | # http://www.apache.org/licenses/LICENSE-2.0 | |
9 | # | |
10 | # Unless required by applicable law or agreed to in writing, software | |
11 | # distributed under the License is distributed on an "AS IS" BASIS, | |
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | # See the License for the specific language governing permissions and | |
14 | # limitations under the License. | |
15 | ||
16 | import logging | |
17 | ||
18 | import synapse.metrics | |
19 | from synapse.util.logcontext import LoggingContext | |
20 | ||
21 | logger = logging.getLogger(__name__) | |
22 | ||
23 | metrics = synapse.metrics.get_metrics_for("synapse.http.server") | |
24 | ||
25 | # total number of responses served, split by method/servlet/tag | |
26 | response_count = metrics.register_counter( | |
27 | "response_count", | |
28 | labels=["method", "servlet", "tag"], | |
29 | alternative_names=( | |
30 | # the following are all deprecated aliases for the same metric | |
31 | metrics.name_prefix + x for x in ( | |
32 | "_requests", | |
33 | "_response_time:count", | |
34 | "_response_ru_utime:count", | |
35 | "_response_ru_stime:count", | |
36 | "_response_db_txn_count:count", | |
37 | "_response_db_txn_duration:count", | |
38 | ) | |
39 | ) | |
40 | ) | |
41 | ||
42 | requests_counter = metrics.register_counter( | |
43 | "requests_received", | |
44 | labels=["method", "servlet", ], | |
45 | ) | |
46 | ||
47 | outgoing_responses_counter = metrics.register_counter( | |
48 | "responses", | |
49 | labels=["method", "code"], | |
50 | ) | |
51 | ||
52 | response_timer = metrics.register_counter( | |
53 | "response_time_seconds", | |
54 | labels=["method", "servlet", "tag"], | |
55 | alternative_names=( | |
56 | metrics.name_prefix + "_response_time:total", | |
57 | ), | |
58 | ) | |
59 | ||
60 | response_ru_utime = metrics.register_counter( | |
61 | "response_ru_utime_seconds", labels=["method", "servlet", "tag"], | |
62 | alternative_names=( | |
63 | metrics.name_prefix + "_response_ru_utime:total", | |
64 | ), | |
65 | ) | |
66 | ||
67 | response_ru_stime = metrics.register_counter( | |
68 | "response_ru_stime_seconds", labels=["method", "servlet", "tag"], | |
69 | alternative_names=( | |
70 | metrics.name_prefix + "_response_ru_stime:total", | |
71 | ), | |
72 | ) | |
73 | ||
74 | response_db_txn_count = metrics.register_counter( | |
75 | "response_db_txn_count", labels=["method", "servlet", "tag"], | |
76 | alternative_names=( | |
77 | metrics.name_prefix + "_response_db_txn_count:total", | |
78 | ), | |
79 | ) | |
80 | ||
81 | # seconds spent waiting for db txns, excluding scheduling time, when processing | |
82 | # this request | |
83 | response_db_txn_duration = metrics.register_counter( | |
84 | "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"], | |
85 | alternative_names=( | |
86 | metrics.name_prefix + "_response_db_txn_duration:total", | |
87 | ), | |
88 | ) | |
89 | ||
90 | # seconds spent waiting for a db connection, when processing this request | |
91 | response_db_sched_duration = metrics.register_counter( | |
92 | "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"] | |
93 | ) | |
94 | ||
95 | # size in bytes of the response written | |
96 | response_size = metrics.register_counter( | |
97 | "response_size", labels=["method", "servlet", "tag"] | |
98 | ) | |
99 | ||
100 | ||
101 | class RequestMetrics(object): | |
102 | def start(self, time_msec, name): | |
103 | self.start = time_msec | |
104 | self.start_context = LoggingContext.current_context() | |
105 | self.name = name | |
106 | ||
107 | def stop(self, time_msec, request): | |
108 | context = LoggingContext.current_context() | |
109 | ||
110 | tag = "" | |
111 | if context: | |
112 | tag = context.tag | |
113 | ||
114 | if context != self.start_context: | |
115 | logger.warn( | |
116 | "Context have unexpectedly changed %r, %r", | |
117 | context, self.start_context | |
118 | ) | |
119 | return | |
120 | ||
121 | outgoing_responses_counter.inc(request.method, str(request.code)) | |
122 | ||
123 | response_count.inc(request.method, self.name, tag) | |
124 | ||
125 | response_timer.inc_by( | |
126 | time_msec - self.start, request.method, | |
127 | self.name, tag | |
128 | ) | |
129 | ||
130 | ru_utime, ru_stime = context.get_resource_usage() | |
131 | ||
132 | response_ru_utime.inc_by( | |
133 | ru_utime, request.method, self.name, tag | |
134 | ) | |
135 | response_ru_stime.inc_by( | |
136 | ru_stime, request.method, self.name, tag | |
137 | ) | |
138 | response_db_txn_count.inc_by( | |
139 | context.db_txn_count, request.method, self.name, tag | |
140 | ) | |
141 | response_db_txn_duration.inc_by( | |
142 | context.db_txn_duration_ms / 1000., request.method, self.name, tag | |
143 | ) | |
144 | response_db_sched_duration.inc_by( | |
145 | context.db_sched_duration_ms / 1000., request.method, self.name, tag | |
146 | ) | |
147 | ||
148 | response_size.inc_by(request.sentLength, request.method, self.name, tag) |
17 | 17 | from synapse.api.errors import ( |
18 | 18 | cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes |
19 | 19 | ) |
20 | from synapse.http.request_metrics import ( | |
21 | requests_counter, | |
22 | ) | |
20 | 23 | from synapse.util.logcontext import LoggingContext, PreserveLoggingContext |
21 | 24 | from synapse.util.caches import intern_dict |
22 | 25 | from synapse.util.metrics import Measure |
40 | 43 | |
41 | 44 | logger = logging.getLogger(__name__) |
42 | 45 | |
43 | metrics = synapse.metrics.get_metrics_for(__name__) | |
44 | ||
45 | # total number of responses served, split by method/servlet/tag | |
46 | response_count = metrics.register_counter( | |
47 | "response_count", | |
48 | labels=["method", "servlet", "tag"], | |
49 | alternative_names=( | |
50 | # the following are all deprecated aliases for the same metric | |
51 | metrics.name_prefix + x for x in ( | |
52 | "_requests", | |
53 | "_response_time:count", | |
54 | "_response_ru_utime:count", | |
55 | "_response_ru_stime:count", | |
56 | "_response_db_txn_count:count", | |
57 | "_response_db_txn_duration:count", | |
58 | ) | |
59 | ) | |
60 | ) | |
61 | ||
62 | requests_counter = metrics.register_counter( | |
63 | "requests_received", | |
64 | labels=["method", "servlet", ], | |
65 | ) | |
66 | ||
67 | outgoing_responses_counter = metrics.register_counter( | |
68 | "responses", | |
69 | labels=["method", "code"], | |
70 | ) | |
71 | ||
72 | response_timer = metrics.register_counter( | |
73 | "response_time_seconds", | |
74 | labels=["method", "servlet", "tag"], | |
75 | alternative_names=( | |
76 | metrics.name_prefix + "_response_time:total", | |
77 | ), | |
78 | ) | |
79 | ||
80 | response_ru_utime = metrics.register_counter( | |
81 | "response_ru_utime_seconds", labels=["method", "servlet", "tag"], | |
82 | alternative_names=( | |
83 | metrics.name_prefix + "_response_ru_utime:total", | |
84 | ), | |
85 | ) | |
86 | ||
87 | response_ru_stime = metrics.register_counter( | |
88 | "response_ru_stime_seconds", labels=["method", "servlet", "tag"], | |
89 | alternative_names=( | |
90 | metrics.name_prefix + "_response_ru_stime:total", | |
91 | ), | |
92 | ) | |
93 | ||
94 | response_db_txn_count = metrics.register_counter( | |
95 | "response_db_txn_count", labels=["method", "servlet", "tag"], | |
96 | alternative_names=( | |
97 | metrics.name_prefix + "_response_db_txn_count:total", | |
98 | ), | |
99 | ) | |
100 | ||
101 | # seconds spent waiting for db txns, excluding scheduling time, when processing | |
102 | # this request | |
103 | response_db_txn_duration = metrics.register_counter( | |
104 | "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"], | |
105 | alternative_names=( | |
106 | metrics.name_prefix + "_response_db_txn_duration:total", | |
107 | ), | |
108 | ) | |
109 | ||
110 | # seconds spent waiting for a db connection, when processing this request | |
111 | response_db_sched_duration = metrics.register_counter( | |
112 | "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"] | |
113 | ) | |
114 | ||
115 | # size in bytes of the response written | |
116 | response_size = metrics.register_counter( | |
117 | "response_size", labels=["method", "servlet", "tag"] | |
118 | ) | |
119 | ||
120 | _next_request_id = 0 | |
121 | ||
122 | ||
123 | def request_handler(include_metrics=False): | |
124 | """Decorator for ``wrap_request_handler``""" | |
125 | return lambda request_handler: wrap_request_handler(request_handler, include_metrics) | |
126 | ||
127 | ||
128 | def wrap_request_handler(request_handler, include_metrics=False): | |
129 | """Wraps a method that acts as a request handler with the necessary logging | |
130 | and exception handling. | |
131 | ||
132 | The method must have a signature of "handle_foo(self, request)". The | |
133 | argument "self" must have "version_string" and "clock" attributes. The | |
134 | argument "request" must be a twisted HTTP request. | |
135 | ||
136 | The method must return a deferred. If the deferred succeeds we assume that | |
46 | ||
47 | def wrap_json_request_handler(h): | |
48 | """Wraps a request handler method with exception handling. | |
49 | ||
50 | Also adds logging as per wrap_request_handler_with_logging. | |
51 | ||
52 | The handler method must have a signature of "handle_foo(self, request)", | |
53 | where "self" must have a "clock" attribute (and "request" must be a | |
54 | SynapseRequest). | |
55 | ||
56 | The handler must return a deferred. If the deferred succeeds we assume that | |
137 | 57 | a response has been sent. If the deferred fails with a SynapseError we use |
138 | 58 | it to send a JSON response with the appropriate HTTP response code. If the
139 | 59 | deferred fails with any other type of error we send a 500 response.
140 | ||
141 | We insert a unique request-id into the logging context for this request and | |
142 | log the response and duration for this request. | |
143 | 60 | """ |
144 | 61 | |
145 | 62 | @defer.inlineCallbacks |
146 | 63 | def wrapped_request_handler(self, request): |
147 | global _next_request_id | |
148 | request_id = "%s-%s" % (request.method, _next_request_id) | |
149 | _next_request_id += 1 | |
150 | ||
64 | try: | |
65 | yield h(self, request) | |
66 | except CodeMessageException as e: | |
67 | code = e.code | |
68 | if isinstance(e, SynapseError): | |
69 | logger.info( | |
70 | "%s SynapseError: %s - %s", request, code, e.msg | |
71 | ) | |
72 | else: | |
73 | logger.exception(e) | |
74 | respond_with_json( | |
75 | request, code, cs_exception(e), send_cors=True, | |
76 | pretty_print=_request_user_agent_is_curl(request), | |
77 | ) | |
78 | ||
79 | except Exception: | |
80 | # failure.Failure() fishes the original Failure out | |
81 | # of our stack, and thus gives us a sensible stack | |
82 | # trace. | |
83 | f = failure.Failure() | |
84 | logger.error( | |
85 | "Failed handle request via %r: %r: %s", | |
86 | h, | |
87 | request, | |
88 | f.getTraceback().rstrip(), | |
89 | ) | |
90 | respond_with_json( | |
91 | request, | |
92 | 500, | |
93 | { | |
94 | "error": "Internal server error", | |
95 | "errcode": Codes.UNKNOWN, | |
96 | }, | |
97 | send_cors=True, | |
98 | pretty_print=_request_user_agent_is_curl(request), | |
99 | ) | |
100 | ||
101 | return wrap_request_handler_with_logging(wrapped_request_handler) | |
102 | ||
103 | ||
104 | def wrap_request_handler_with_logging(h): | |
105 | """Wraps a request handler to provide logging and metrics | |
106 | ||
107 | The handler method must have a signature of "handle_foo(self, request)", | |
108 | where "self" must have a "clock" attribute (and "request" must be a | |
109 | SynapseRequest). | |
110 | ||
111 | As well as calling `request.processing` (which will log the response and | |
112 | duration for this request), the wrapped request handler will insert the | |
113 | request id into the logging context. | |
114 | """ | |
115 | @defer.inlineCallbacks | |
116 | def wrapped_request_handler(self, request): | |
117 | """ | |
118 | Args: | |
119 | self: | |
120 | request (synapse.http.site.SynapseRequest): | |
121 | """ | |
122 | ||
123 | request_id = request.get_request_id() | |
151 | 124 | with LoggingContext(request_id) as request_context: |
125 | request_context.request = request_id | |
152 | 126 | with Measure(self.clock, "wrapped_request_handler"): |
153 | request_metrics = RequestMetrics() | |
154 | 127 | # we start the request metrics timer here with an initial stab |
155 | 128 | # at the servlet name. For most requests that name will be |
156 | 129 | # JsonResource (or a subclass), and JsonResource._async_render |
157 | 130 | # will update it once it picks a servlet. |
158 | 131 | servlet_name = self.__class__.__name__ |
159 | request_metrics.start(self.clock, name=servlet_name) | |
160 | ||
161 | request_context.request = request_id | |
162 | with request.processing(): | |
163 | try: | |
164 | with PreserveLoggingContext(request_context): | |
165 | if include_metrics: | |
166 | yield request_handler(self, request, request_metrics) | |
167 | else: | |
168 | requests_counter.inc(request.method, servlet_name) | |
169 | yield request_handler(self, request) | |
170 | except CodeMessageException as e: | |
171 | code = e.code | |
172 | if isinstance(e, SynapseError): | |
173 | logger.info( | |
174 | "%s SynapseError: %s - %s", request, code, e.msg | |
175 | ) | |
176 | else: | |
177 | logger.exception(e) | |
178 | outgoing_responses_counter.inc(request.method, str(code)) | |
179 | respond_with_json( | |
180 | request, code, cs_exception(e), send_cors=True, | |
181 | pretty_print=_request_user_agent_is_curl(request), | |
182 | version_string=self.version_string, | |
183 | ) | |
184 | except Exception: | |
185 | # failure.Failure() fishes the original Failure out | |
186 | # of our stack, and thus gives us a sensible stack | |
187 | # trace. | |
188 | f = failure.Failure() | |
189 | logger.error( | |
190 | "Failed handle request %s.%s on %r: %r: %s", | |
191 | request_handler.__module__, | |
192 | request_handler.__name__, | |
193 | self, | |
194 | request, | |
195 | f.getTraceback().rstrip(), | |
196 | ) | |
197 | respond_with_json( | |
198 | request, | |
199 | 500, | |
200 | { | |
201 | "error": "Internal server error", | |
202 | "errcode": Codes.UNKNOWN, | |
203 | }, | |
204 | send_cors=True, | |
205 | pretty_print=_request_user_agent_is_curl(request), | |
206 | version_string=self.version_string, | |
207 | ) | |
208 | finally: | |
209 | try: | |
210 | request_metrics.stop( | |
211 | self.clock, request | |
212 | ) | |
213 | except Exception as e: | |
214 | logger.warn("Failed to stop metrics: %r", e) | |
132 | with request.processing(servlet_name): | |
133 | with PreserveLoggingContext(request_context): | |
134 | d = h(self, request) | |
135 | ||
136 | # record the arrival of the request *after* | |
137 | # dispatching to the handler, so that the handler | |
138 | # can update the servlet name in the request | |
139 | # metrics | |
140 | requests_counter.inc(request.method, | |
141 | request.request_metrics.name) | |
142 | yield d | |
215 | 143 | return wrapped_request_handler |
216 | 144 | |
217 | 145 | |
261 | 189 | self.canonical_json = canonical_json |
262 | 190 | self.clock = hs.get_clock() |
263 | 191 | self.path_regexs = {} |
264 | self.version_string = hs.version_string | |
265 | 192 | self.hs = hs |
266 | 193 | |
267 | 194 | def register_paths(self, method, path_patterns, callback): |
277 | 204 | self._async_render(request) |
278 | 205 | return server.NOT_DONE_YET |
279 | 206 | |
280 | # Disable metric reporting because _async_render does its own metrics. | |
281 | # It does its own metric reporting because _async_render dispatches to | |
282 | # a callback and it's the class name of that callback we want to report | |
283 | # against rather than the JsonResource itself. | |
284 | @request_handler(include_metrics=True) | |
207 | @wrap_json_request_handler | |
285 | 208 | @defer.inlineCallbacks |
286 | def _async_render(self, request, request_metrics): | |
209 | def _async_render(self, request): | |
287 | 210 | """ This gets called from render() every time someone sends us a request. |
288 | 211 | This checks if anyone has registered a callback for that method and |
289 | 212 | path. |
295 | 218 | servlet_classname = servlet_instance.__class__.__name__ |
296 | 219 | else: |
297 | 220 | servlet_classname = "%r" % callback |
298 | ||
299 | request_metrics.name = servlet_classname | |
300 | requests_counter.inc(request.method, servlet_classname) | |
221 | request.request_metrics.name = servlet_classname | |
301 | 222 | |
302 | 223 | # Now trigger the callback. If it returns a response, we send it |
303 | 224 | # here. If it throws an exception, that is handled by the wrapper |
344 | 265 | |
345 | 266 | def _send_response(self, request, code, response_json_object, |
346 | 267 | response_code_message=None): |
347 | outgoing_responses_counter.inc(request.method, str(code)) | |
348 | ||
349 | 268 | # TODO: Only enable CORS for the requests that need it. |
350 | 269 | respond_with_json( |
351 | 270 | request, code, response_json_object, |
352 | 271 | send_cors=True, |
353 | 272 | response_code_message=response_code_message, |
354 | 273 | pretty_print=_request_user_agent_is_curl(request), |
355 | version_string=self.version_string, | |
356 | 274 | canonical_json=self.canonical_json, |
357 | 275 | ) |
358 | 276 | |
383 | 301 | request (twisted.web.http.Request): |
384 | 302 | """ |
385 | 303 | raise UnrecognizedRequestError() |
386 | ||
387 | ||
388 | class RequestMetrics(object): | |
389 | def start(self, clock, name): | |
390 | self.start = clock.time_msec() | |
391 | self.start_context = LoggingContext.current_context() | |
392 | self.name = name | |
393 | ||
394 | def stop(self, clock, request): | |
395 | context = LoggingContext.current_context() | |
396 | ||
397 | tag = "" | |
398 | if context: | |
399 | tag = context.tag | |
400 | ||
401 | if context != self.start_context: | |
402 | logger.warn( | |
403 | "Context have unexpectedly changed %r, %r", | |
404 | context, self.start_context | |
405 | ) | |
406 | return | |
407 | ||
408 | response_count.inc(request.method, self.name, tag) | |
409 | ||
410 | response_timer.inc_by( | |
411 | clock.time_msec() - self.start, request.method, | |
412 | self.name, tag | |
413 | ) | |
414 | ||
415 | ru_utime, ru_stime = context.get_resource_usage() | |
416 | ||
417 | response_ru_utime.inc_by( | |
418 | ru_utime, request.method, self.name, tag | |
419 | ) | |
420 | response_ru_stime.inc_by( | |
421 | ru_stime, request.method, self.name, tag | |
422 | ) | |
423 | response_db_txn_count.inc_by( | |
424 | context.db_txn_count, request.method, self.name, tag | |
425 | ) | |
426 | response_db_txn_duration.inc_by( | |
427 | context.db_txn_duration_ms / 1000., request.method, self.name, tag | |
428 | ) | |
429 | response_db_sched_duration.inc_by( | |
430 | context.db_sched_duration_ms / 1000., request.method, self.name, tag | |
431 | ) | |
432 | ||
433 | response_size.inc_by(request.sentLength, request.method, self.name, tag) | |
434 | 304 | |
435 | 305 | |
436 | 306 | class RootRedirect(resource.Resource): |
451 | 321 | |
452 | 322 | def respond_with_json(request, code, json_object, send_cors=False, |
453 | 323 | response_code_message=None, pretty_print=False, |
454 | version_string="", canonical_json=True): | |
324 | canonical_json=True): | |
455 | 325 | # could alternatively use request.notifyFinish() and flip a flag when |
456 | 326 | # the Deferred fires, but since the flag is RIGHT THERE it seems like |
457 | 327 | # a waste. |
473 | 343 | request, code, json_bytes, |
474 | 344 | send_cors=send_cors, |
475 | 345 | response_code_message=response_code_message, |
476 | version_string=version_string | |
477 | 346 | ) |
478 | 347 | |
479 | 348 | |
480 | 349 | def respond_with_json_bytes(request, code, json_bytes, send_cors=False, |
481 | version_string="", response_code_message=None): | |
350 | response_code_message=None): | |
482 | 351 | """Sends encoded JSON in response to the given request. |
483 | 352 | |
484 | 353 | Args: |
492 | 361 | |
493 | 362 | request.setResponseCode(code, message=response_code_message) |
494 | 363 | request.setHeader(b"Content-Type", b"application/json") |
495 | request.setHeader(b"Server", version_string) | |
496 | 364 | request.setHeader(b"Content-Length", b"%d" % (len(json_bytes),)) |
497 | 365 | request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate") |
498 | 366 | |
545 | 413 | b"User-Agent", default=[] |
546 | 414 | ) |
547 | 415 | for user_agent in user_agents: |
548 | if "curl" in user_agent: | |
416 | if b"curl" in user_agent: | |
549 | 417 | return True |
550 | 418 | return False |
11 | 11 | # See the License for the specific language governing permissions and |
12 | 12 | # limitations under the License. |
13 | 13 | |
14 | from synapse.util.logcontext import LoggingContext | |
15 | from twisted.web.server import Site, Request | |
16 | ||
17 | 14 | import contextlib |
18 | 15 | import logging |
19 | 16 | import re |
20 | 17 | import time |
21 | 18 | |
19 | from twisted.web.server import Site, Request | |
20 | ||
21 | from synapse.http.request_metrics import RequestMetrics | |
22 | from synapse.util.logcontext import LoggingContext | |
23 | ||
24 | logger = logging.getLogger(__name__) | |
25 | ||
22 | 26 | ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') |
23 | 27 | |
28 | _next_request_seq = 0 | |
29 | ||
24 | 30 | |
25 | 31 | class SynapseRequest(Request): |
32 | """Class which encapsulates an HTTP request to synapse. | |
33 | ||
34 | All of the requests processed in synapse are of this type. | |
35 | ||
36 | It extends twisted's twisted.web.server.Request, and adds: | |
37 | * Unique request ID | |
38 | * Redaction of access_token query-params in __repr__ | |
39 | * Logging at start and end | |
40 | * Metrics to record CPU, wallclock and DB time by endpoint. | |
41 | ||
42 | It provides a method `processing` which should be called by the Resource | |
43 | which is handling the request, and returns a context manager. | |
44 | ||
45 | """ | |
26 | 46 | def __init__(self, site, *args, **kw): |
27 | 47 | Request.__init__(self, *args, **kw) |
28 | 48 | self.site = site |
29 | 49 | self.authenticated_entity = None |
30 | 50 | self.start_time = 0 |
51 | ||
52 | global _next_request_seq | |
53 | self.request_seq = _next_request_seq | |
54 | _next_request_seq += 1 | |
31 | 55 | |
32 | 56 | def __repr__(self): |
33 | 57 | # We overwrite this so that we don't log ``access_token`` |
40 | 64 | self.site.site_tag, |
41 | 65 | ) |
42 | 66 | |
67 | def get_request_id(self): | |
68 | return "%s-%i" % (self.method, self.request_seq) | |
69 | ||
43 | 70 | def get_redacted_uri(self): |
44 | 71 | return ACCESS_TOKEN_RE.sub( |
45 | 72 | br'\1<redacted>\3', |
49 | 76 | def get_user_agent(self): |
50 | 77 | return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] |
51 | 78 | |
52 | def started_processing(self): | |
79 | def render(self, resrc): | |
80 | # override the Server header which is set by twisted | |
81 | self.setHeader("Server", self.site.server_version_string) | |
82 | return Request.render(self, resrc) | |
83 | ||
84 | def _started_processing(self, servlet_name): | |
85 | self.start_time = int(time.time() * 1000) | |
86 | self.request_metrics = RequestMetrics() | |
87 | self.request_metrics.start(self.start_time, name=servlet_name) | |
88 | ||
53 | 89 | self.site.access_logger.info( |
54 | 90 | "%s - %s - Received request: %s %s", |
55 | 91 | self.getClientIP(), |
57 | 93 | self.method, |
58 | 94 | self.get_redacted_uri() |
59 | 95 | ) |
60 | self.start_time = int(time.time() * 1000) | |
61 | ||
62 | def finished_processing(self): | |
63 | ||
96 | ||
97 | def _finished_processing(self): | |
64 | 98 | try: |
65 | 99 | context = LoggingContext.current_context() |
66 | 100 | ru_utime, ru_stime = context.get_resource_usage() |
71 | 105 | ru_utime, ru_stime = (0, 0) |
72 | 106 | db_txn_count, db_txn_duration_ms = (0, 0) |
73 | 107 | |
108 | end_time = int(time.time() * 1000) | |
109 | ||
74 | 110 | self.site.access_logger.info( |
75 | 111 | "%s - %s - {%s}" |
76 | 112 | " Processed request: %dms (%dms, %dms) (%dms/%dms/%d)" |
78 | 114 | self.getClientIP(), |
79 | 115 | self.site.site_tag, |
80 | 116 | self.authenticated_entity, |
81 | int(time.time() * 1000) - self.start_time, | |
117 | end_time - self.start_time, | |
82 | 118 | int(ru_utime * 1000), |
83 | 119 | int(ru_stime * 1000), |
84 | 120 | db_sched_duration_ms, |
92 | 128 | self.get_user_agent(), |
93 | 129 | ) |
94 | 130 | |
131 | try: | |
132 | self.request_metrics.stop(end_time, self) | |
133 | except Exception as e: | |
134 | logger.warn("Failed to stop metrics: %r", e) | |
135 | ||
95 | 136 | @contextlib.contextmanager |
96 | def processing(self): | |
97 | self.started_processing() | |
137 | def processing(self, servlet_name): | |
138 | """Record the fact that we are processing this request. | |
139 | ||
140 | Returns a context manager; the correct way to use this is: | |
141 | ||
142 | @defer.inlineCallbacks | |
143 | def handle_request(request): | |
144 | with request.processing("FooServlet"): | |
145 | yield really_handle_the_request() | |
146 | ||
147 | This will log the request's arrival. Once the context manager is | |
148 | closed, the completion of the request will be logged, and the various | |
149 | metrics will be updated. | |
150 | ||
151 | Args: | |
152 | servlet_name (str): the name of the servlet which will be | |
153 | processing this request. This is used in the metrics. | |
154 | ||
155 | It is possible to update this afterwards by updating | |
156 | self.request_metrics.servlet_name. | |
157 | """ | |
158 | # TODO: we should probably just move this into render() and finish(), | |
159 | # to save having to call a separate method. | |
160 | self._started_processing(servlet_name) | |
98 | 161 | yield |
99 | self.finished_processing() | |
162 | self._finished_processing() | |
100 | 163 | |
101 | 164 | |
102 | 165 | class XForwardedForRequest(SynapseRequest): |
134 | 197 | Subclass of a twisted http Site that does access logging with python's |
135 | 198 | standard logging |
136 | 199 | """ |
137 | def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs): | |
200 | def __init__(self, logger_name, site_tag, config, resource, | |
201 | server_version_string, *args, **kwargs): | |
138 | 202 | Site.__init__(self, resource, *args, **kwargs) |
139 | 203 | |
140 | 204 | self.site_tag = site_tag |
142 | 206 | proxied = config.get("x_forwarded", False) |
143 | 207 | self.requestFactory = SynapseRequestFactory(self, proxied) |
144 | 208 | self.access_logger = logging.getLogger(logger_name) |
209 | self.server_version_string = server_version_string | |
145 | 210 | |
146 | 211 | def log(self, request): |
147 | 212 | pass |
15 | 15 | |
16 | 16 | from itertools import chain |
17 | 17 | import logging |
18 | import re | |
18 | 19 | |
19 | 20 | logger = logging.getLogger(__name__) |
20 | 21 | |
55 | 56 | return not len(self.labels) |
56 | 57 | |
57 | 58 | def _render_labelvalue(self, value): |
58 | # TODO: escape backslashes, quotes and newlines | |
59 | return '"%s"' % (value) | |
59 | return '"%s"' % (_escape_label_value(value),) | |
60 | 60 | |
61 | 61 | def _render_key(self, values): |
62 | 62 | if self.is_scalar(): |
70 | 70 | """Render this metric for a single set of labels |
71 | 71 | |
72 | 72 | Args: |
73 | label_values (list[str]): values for each of the labels | |
73 | label_values (list[object]): values for each of the labels, | |
74 | (which get stringified). | |
74 | 75 | value: value of the metric at with these labels |
75 | 76 | |
76 | 77 | Returns: |
298 | 299 | "process_psutil_rss:total %d" % sum_rss, |
299 | 300 | "process_psutil_rss:count %d" % len_rss, |
300 | 301 | ] |
302 | ||
303 | ||
304 | def _escape_character(m): | |
305 | """Replaces a single character with its escape sequence. | |
306 | ||
307 | Args: | |
308 | m (re.MatchObject): A match object whose first group is the single | |
309 | character to replace | |
310 | ||
311 | Returns: | |
312 | str | |
313 | """ | |
314 | c = m.group(1) | |
315 | if c == "\\": | |
316 | return "\\\\" | |
317 | elif c == "\"": | |
318 | return "\\\"" | |
319 | elif c == "\n": | |
320 | return "\\n" | |
321 | return c | |
322 | ||
323 | ||
324 | def _escape_label_value(value): | |
325 | """Takes a label value and escapes quotes, newlines and backslashes | |
326 | """ | |
327 | return re.sub(r"([\n\"\\])", _escape_character, str(value)) |
13 | 13 | # limitations under the License. |
14 | 14 | |
15 | 15 | from twisted.internet import defer |
16 | ||
16 | 17 | from synapse.api.constants import EventTypes, Membership |
17 | 18 | from synapse.api.errors import AuthError |
18 | 19 | from synapse.handlers.presence import format_user_presence_state |
19 | 20 | |
20 | from synapse.util import DeferredTimedOutError | |
21 | 21 | from synapse.util.logutils import log_function |
22 | from synapse.util.async import ObservableDeferred | |
23 | from synapse.util.logcontext import PreserveLoggingContext, preserve_fn | |
22 | from synapse.util.async import ( | |
23 | ObservableDeferred, add_timeout_to_deferred, | |
24 | DeferredTimeoutError, | |
25 | ) | |
26 | from synapse.util.logcontext import PreserveLoggingContext, run_in_background | |
24 | 27 | from synapse.util.metrics import Measure |
25 | 28 | from synapse.types import StreamToken |
26 | 29 | from synapse.visibility import filter_events_for_client |
250 | 253 | def _on_new_room_event(self, event, room_stream_id, extra_users=[]): |
251 | 254 | """Notify any user streams that are interested in this room event""" |
252 | 255 | # poke any interested application service. |
253 | preserve_fn(self.appservice_handler.notify_interested_services)( | |
254 | room_stream_id | |
255 | ) | |
256 | run_in_background(self._notify_app_services, room_stream_id) | |
256 | 257 | |
257 | 258 | if self.federation_sender: |
258 | 259 | self.federation_sender.notify_new_events(room_stream_id) |
265 | 266 | users=extra_users, |
266 | 267 | rooms=[event.room_id], |
267 | 268 | ) |
269 | ||
270 | @defer.inlineCallbacks | |
271 | def _notify_app_services(self, room_stream_id): | |
272 | try: | |
273 | yield self.appservice_handler.notify_interested_services(room_stream_id) | |
274 | except Exception: | |
275 | logger.exception("Error notifying application services of event") | |
268 | 276 | |
269 | 277 | def on_new_event(self, stream_key, new_token, users=[], rooms=[]): |
270 | 278 | """ Used to inform listeners that something has happend event wise. |
330 | 338 | # Now we wait for the _NotifierUserStream to be told there |
331 | 339 | # is a new token. |
332 | 340 | listener = user_stream.new_listener(prev_token) |
341 | add_timeout_to_deferred( | |
342 | listener.deferred, | |
343 | (end_time - now) / 1000., | |
344 | ) | |
333 | 345 | with PreserveLoggingContext(): |
334 | yield self.clock.time_bound_deferred( | |
335 | listener.deferred, | |
336 | time_out=(end_time - now) / 1000. | |
337 | ) | |
346 | yield listener.deferred | |
338 | 347 | |
339 | 348 | current_token = user_stream.current_token |
340 | 349 | |
345 | 354 | # Update the prev_token to the current_token since nothing |
346 | 355 | # has happened between the old prev_token and the current_token |
347 | 356 | prev_token = current_token |
348 | except DeferredTimedOutError: | |
357 | except DeferredTimeoutError: | |
349 | 358 | break |
350 | 359 | except defer.CancelledError: |
351 | 360 | break |
550 | 559 | if end_time <= now: |
551 | 560 | break |
552 | 561 | |
562 | add_timeout_to_deferred( | |
563 | listener.deferred.addTimeout, | |
564 | (end_time - now) / 1000., | |
565 | ) | |
553 | 566 | try: |
554 | 567 | with PreserveLoggingContext(): |
555 | yield self.clock.time_bound_deferred( | |
556 | listener.deferred, | |
557 | time_out=(end_time - now) / 1000. | |
558 | ) | |
559 | except DeferredTimedOutError: | |
568 | yield listener.deferred | |
569 | except DeferredTimeoutError: | |
560 | 570 | break |
561 | 571 | except defer.CancelledError: |
562 | 572 | break |
76 | 76 | @defer.inlineCallbacks |
77 | 77 | def on_started(self): |
78 | 78 | if self.mailer is not None: |
79 | self.throttle_params = yield self.store.get_throttle_params_by_room( | |
80 | self.pusher_id | |
81 | ) | |
82 | yield self._process() | |
79 | try: | |
80 | self.throttle_params = yield self.store.get_throttle_params_by_room( | |
81 | self.pusher_id | |
82 | ) | |
83 | yield self._process() | |
84 | except Exception: | |
85 | logger.exception("Error starting email pusher") | |
83 | 86 | |
84 | 87 | def on_stop(self): |
85 | 88 | if self.timed_call: |
17 | 17 | from twisted.internet import defer, reactor |
18 | 18 | from twisted.internet.error import AlreadyCalled, AlreadyCancelled |
19 | 19 | |
20 | import push_rule_evaluator | |
21 | import push_tools | |
20 | from . import push_rule_evaluator | |
21 | from . import push_tools | |
22 | 22 | import synapse |
23 | 23 | from synapse.push import PusherConfigException |
24 | 24 | from synapse.util.logcontext import LoggingContext |
93 | 93 | |
94 | 94 | @defer.inlineCallbacks |
95 | 95 | def on_started(self): |
96 | yield self._process() | |
96 | try: | |
97 | yield self._process() | |
98 | except Exception: | |
99 | logger.exception("Error starting http pusher") | |
97 | 100 | |
98 | 101 | @defer.inlineCallbacks |
99 | 102 | def on_new_notifications(self, min_stream_ordering, max_stream_ordering): |
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | 14 | |
15 | from httppusher import HttpPusher | |
15 | from .httppusher import HttpPusher | |
16 | 16 | |
17 | 17 | import logging |
18 | 18 | logger = logging.getLogger(__name__) |
13 | 13 | # See the License for the specific language governing permissions and |
14 | 14 | # limitations under the License. |
15 | 15 | |
16 | import logging | |
17 | ||
16 | 18 | from twisted.internet import defer |
17 | 19 | |
18 | from .pusher import PusherFactory | |
19 | from synapse.util.logcontext import make_deferred_yieldable, preserve_fn | |
20 | from synapse.push.pusher import PusherFactory | |
20 | 21 | from synapse.util.async import run_on_reactor |
21 | ||
22 | import logging | |
22 | from synapse.util.logcontext import make_deferred_yieldable, run_in_background | |
23 | 23 | |
24 | 24 | logger = logging.getLogger(__name__) |
25 | 25 | |
136 | 136 | if u in self.pushers: |
137 | 137 | for p in self.pushers[u].values(): |
138 | 138 | deferreds.append( |
139 | preserve_fn(p.on_new_notifications)( | |
140 | min_stream_id, max_stream_id | |
139 | run_in_background( | |
140 | p.on_new_notifications, | |
141 | min_stream_id, max_stream_id, | |
141 | 142 | ) |
142 | 143 | ) |
143 | 144 | |
144 | yield make_deferred_yieldable(defer.gatherResults(deferreds)) | |
145 | yield make_deferred_yieldable( | |
146 | defer.gatherResults(deferreds, consumeErrors=True), | |
147 | ) | |
145 | 148 | except Exception: |
146 | 149 | logger.exception("Exception in pusher on_new_notifications") |
147 | 150 | |
163 | 166 | if u in self.pushers: |
164 | 167 | for p in self.pushers[u].values(): |
165 | 168 | deferreds.append( |
166 | preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) | |
169 | run_in_background( | |
170 | p.on_new_receipts, | |
171 | min_stream_id, max_stream_id, | |
172 | ) | |
167 | 173 | ) |
168 | 174 | |
169 | yield make_deferred_yieldable(defer.gatherResults(deferreds)) | |
175 | yield make_deferred_yieldable( | |
176 | defer.gatherResults(deferreds, consumeErrors=True), | |
177 | ) | |
170 | 178 | except Exception: |
171 | 179 | logger.exception("Exception in pusher on_new_receipts") |
172 | 180 | |
206 | 214 | if appid_pushkey in byuser: |
207 | 215 | byuser[appid_pushkey].on_stop() |
208 | 216 | byuser[appid_pushkey] = p |
209 | preserve_fn(p.on_started)() | |
217 | run_in_background(p.on_started) | |
210 | 218 | |
211 | 219 | logger.info("Started pushers") |
212 | 220 |
38 | 38 | "signedjson>=1.0.0": ["signedjson>=1.0.0"], |
39 | 39 | "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"], |
40 | 40 | "service_identity>=1.0.0": ["service_identity>=1.0.0"], |
41 | "Twisted>=16.0.0": ["twisted>=16.0.0"], | |
41 | 42 | |
42 | # we break under Twisted 18.4 | |
43 | # (https://github.com/matrix-org/synapse/issues/3135) | |
44 | "Twisted>=16.0.0,<18.4": ["twisted>=16.0.0"], | |
43 | # We use crypto.get_elliptic_curve which is only supported in >=0.15 | |
44 | "pyopenssl>=0.15": ["OpenSSL>=0.15"], | |
45 | 45 | |
46 | "pyopenssl>=0.14": ["OpenSSL>=0.14"], | |
47 | 46 | "pyyaml": ["yaml"], |
48 | 47 | "pyasn1": ["pyasn1"], |
49 | 48 | "daemonize": ["daemonize"], |
52 | 52 | from twisted.protocols.basic import LineOnlyReceiver |
53 | 53 | from twisted.python.failure import Failure |
54 | 54 | |
55 | from commands import ( | |
55 | from .commands import ( | |
56 | 56 | COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS, |
57 | 57 | ErrorCommand, ServerCommand, RdataCommand, PositionCommand, PingCommand, |
58 | 58 | NameCommand, ReplicateCommand, UserSyncCommand, SyncCommand, |
59 | 59 | ) |
60 | from streams import STREAMS_MAP | |
60 | from .streams import STREAMS_MAP | |
61 | 61 | |
62 | 62 | from synapse.util.stringutils import random_string |
63 | 63 | from synapse.metrics.metric import CounterMetric |
17 | 17 | from twisted.internet import defer, reactor |
18 | 18 | from twisted.internet.protocol import Factory |
19 | 19 | |
20 | from streams import STREAMS_MAP, FederationStream | |
21 | from protocol import ServerReplicationStreamProtocol | |
20 | from .streams import STREAMS_MAP, FederationStream | |
21 | from .protocol import ServerReplicationStreamProtocol | |
22 | 22 | |
23 | 23 | from synapse.util.metrics import Measure, measure_func |
24 | 24 |
167 | 167 | yield self.store.find_first_stream_ordering_after_ts(ts) |
168 | 168 | ) |
169 | 169 | |
170 | (_, depth, _) = ( | |
170 | room_event_after_stream_ordering = ( | |
171 | 171 | yield self.store.get_room_event_after_stream_ordering( |
172 | 172 | room_id, stream_ordering, |
173 | 173 | ) |
174 | 174 | ) |
175 | if room_event_after_stream_ordering: | |
176 | (_, depth, _) = room_event_after_stream_ordering | |
177 | else: | |
178 | logger.warn( | |
179 | "[purge] purging events not possible: No event found " | |
180 | "(received_ts %i => stream_ordering %i)", | |
181 | ts, stream_ordering, | |
182 | ) | |
183 | raise SynapseError( | |
184 | 404, | |
185 | "there is no event to be purged", | |
186 | errcode=Codes.NOT_FOUND, | |
187 | ) | |
175 | 188 | logger.info( |
176 | 189 | "[purge] purging up to depth %i (received_ts %i => " |
177 | 190 | "stream_ordering %i)", |
51 | 51 | """A base Synapse REST Servlet for the client version 1 API. |
52 | 52 | """ |
53 | 53 | |
54 | # This subclass was presumably created to allow the auth for the v1 | |
55 | # protocol version to be different, however this behaviour was removed. | |
56 | # it may no longer be necessary | |
57 | ||
54 | 58 | def __init__(self, hs): |
55 | 59 | """ |
56 | 60 | Args: |
58 | 62 | """ |
59 | 63 | self.hs = hs |
60 | 64 | self.builder_factory = hs.get_event_builder_factory() |
61 | self.auth = hs.get_v1auth() | |
65 | self.auth = hs.get_auth() | |
62 | 66 | self.txns = HttpTransactionCache(hs.get_clock()) |
24 | 24 | |
25 | 25 | import simplejson as json |
26 | 26 | import urllib |
27 | import urlparse | |
27 | from six.moves.urllib import parse as urlparse | |
28 | 28 | |
29 | 29 | import logging |
30 | 30 | from saml2 import BINDING_HTTP_POST |
149 | 149 | super(RestServlet, self).__init__() |
150 | 150 | self.hs = hs |
151 | 151 | self.notifier = hs.get_notifier() |
152 | self.auth = hs.get_v1auth() | |
152 | self.auth = hs.get_auth() | |
153 | 153 | self.pusher_pool = self.hs.get_pusherpool() |
154 | 154 | |
155 | 155 | @defer.inlineCallbacks |
175 | 175 | |
176 | 176 | request.setResponseCode(200) |
177 | 177 | request.setHeader(b"Content-Type", b"text/html; charset=utf-8") |
178 | request.setHeader(b"Server", self.hs.version_string) | |
179 | 178 | request.setHeader(b"Content-Length", b"%d" % ( |
180 | 179 | len(PushersRemoveRestServlet.SUCCESS_HTML), |
181 | 180 | )) |
28 | 28 | from hashlib import sha1 |
29 | 29 | import hmac |
30 | 30 | import logging |
31 | ||
32 | from six import string_types | |
31 | 33 | |
32 | 34 | logger = logging.getLogger(__name__) |
33 | 35 | |
332 | 334 | def _do_shared_secret(self, request, register_json, session): |
333 | 335 | yield run_on_reactor() |
334 | 336 | |
335 | if not isinstance(register_json.get("mac", None), basestring): | |
337 | if not isinstance(register_json.get("mac", None), string_types): | |
336 | 338 | raise SynapseError(400, "Expected mac.") |
337 | if not isinstance(register_json.get("user", None), basestring): | |
339 | if not isinstance(register_json.get("user", None), string_types): | |
338 | 340 | raise SynapseError(400, "Expected 'user' key.") |
339 | if not isinstance(register_json.get("password", None), basestring): | |
341 | if not isinstance(register_json.get("password", None), string_types): | |
340 | 342 | raise SynapseError(400, "Expected 'password' key.") |
341 | 343 | |
342 | 344 | if not self.hs.config.registration_shared_secret: |
357 | 359 | got_mac = str(register_json["mac"]) |
358 | 360 | |
359 | 361 | want_mac = hmac.new( |
360 | key=self.hs.config.registration_shared_secret, | |
362 | key=self.hs.config.registration_shared_secret.encode(), | |
361 | 363 | digestmod=sha1, |
362 | 364 | ) |
363 | 365 | want_mac.update(user) |
364 | want_mac.update("\x00") | |
366 | want_mac.update(b"\x00") | |
365 | 367 | want_mac.update(password) |
366 | want_mac.update("\x00") | |
367 | want_mac.update("admin" if admin else "notadmin") | |
368 | want_mac.update(b"\x00") | |
369 | want_mac.update(b"admin" if admin else b"notadmin") | |
368 | 370 | want_mac = want_mac.hexdigest() |
369 | 371 | |
370 | 372 | if compare_digest(want_mac, got_mac): |
27 | 27 | parse_json_object_from_request, parse_string, parse_integer |
28 | 28 | ) |
29 | 29 | |
30 | from six.moves.urllib import parse as urlparse | |
31 | ||
30 | 32 | import logging |
31 | import urllib | |
32 | 33 | import simplejson as json |
33 | 34 | |
34 | 35 | logger = logging.getLogger(__name__) |
432 | 433 | as_client_event = "raw" not in request.args |
433 | 434 | filter_bytes = request.args.get("filter", None) |
434 | 435 | if filter_bytes: |
435 | filter_json = urllib.unquote(filter_bytes[-1]).decode("UTF-8") | |
436 | filter_json = urlparse.unquote(filter_bytes[-1]).decode("UTF-8") | |
436 | 437 | event_filter = Filter(json.loads(filter_json)) |
437 | 438 | else: |
438 | 439 | event_filter = None |
717 | 718 | def on_PUT(self, request, room_id, user_id): |
718 | 719 | requester = yield self.auth.get_user_by_req(request) |
719 | 720 | |
720 | room_id = urllib.unquote(room_id) | |
721 | target_user = UserID.from_string(urllib.unquote(user_id)) | |
721 | room_id = urlparse.unquote(room_id) | |
722 | target_user = UserID.from_string(urlparse.unquote(user_id)) | |
722 | 723 | |
723 | 724 | content = parse_json_object_from_request(request) |
724 | 725 |
128 | 128 | html_bytes = html.encode("utf8") |
129 | 129 | request.setResponseCode(200) |
130 | 130 | request.setHeader(b"Content-Type", b"text/html; charset=utf-8") |
131 | request.setHeader(b"Server", self.hs.version_string) | |
132 | 131 | request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),)) |
133 | 132 | |
134 | 133 | request.write(html_bytes) |
174 | 173 | html_bytes = html.encode("utf8") |
175 | 174 | request.setResponseCode(200) |
176 | 175 | request.setHeader(b"Content-Type", b"text/html; charset=utf-8") |
177 | request.setHeader(b"Server", self.hs.version_string) | |
178 | 176 | request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),)) |
179 | 177 | |
180 | 178 | request.write(html_bytes) |
87 | 87 | pa["topological_ordering"], pa["stream_ordering"] |
88 | 88 | ) |
89 | 89 | returned_push_actions.append(returned_pa) |
90 | next_token = pa["stream_ordering"] | |
90 | next_token = str(pa["stream_ordering"]) | |
91 | 91 | |
92 | 92 | defer.returnValue((200, { |
93 | 93 | "notifications": returned_push_actions, |
34 | 34 | from synapse.util.async import run_on_reactor |
35 | 35 | from synapse.util.ratelimitutils import FederationRateLimiter |
36 | 36 | |
37 | from six import string_types | |
38 | ||
37 | 39 | |
38 | 40 | # We ought to be using hmac.compare_digest() but on older pythons it doesn't |
39 | 41 | # exist. It's a _really minor_ security flaw to use plain string comparison |
209 | 211 | # in sessions. Pull out the username/password provided to us. |
210 | 212 | desired_password = None |
211 | 213 | if 'password' in body: |
212 | if (not isinstance(body['password'], basestring) or | |
214 | if (not isinstance(body['password'], string_types) or | |
213 | 215 | len(body['password']) > 512): |
214 | 216 | raise SynapseError(400, "Invalid password") |
215 | 217 | desired_password = body["password"] |
216 | 218 | |
217 | 219 | desired_username = None |
218 | 220 | if 'username' in body: |
219 | if (not isinstance(body['username'], basestring) or | |
221 | if (not isinstance(body['username'], string_types) or | |
220 | 222 | len(body['username']) > 512): |
221 | 223 | raise SynapseError(400, "Invalid username") |
222 | 224 | desired_username = body['username'] |
242 | 244 | |
243 | 245 | access_token = get_access_token_from_request(request) |
244 | 246 | |
245 | if isinstance(desired_username, basestring): | |
247 | if isinstance(desired_username, string_types): | |
246 | 248 | result = yield self._do_appservice_registration( |
247 | 249 | desired_username, access_token, body |
248 | 250 | ) |
463 | 465 | # includes the password and admin flag in the hashed text. Why are |
464 | 466 | # these different? |
465 | 467 | want_mac = hmac.new( |
466 | key=self.hs.config.registration_shared_secret, | |
468 | key=self.hs.config.registration_shared_secret.encode(), | |
467 | 469 | msg=user, |
468 | 470 | digestmod=sha1, |
469 | 471 | ).hexdigest() |
48 | 48 | """ |
49 | 49 | |
50 | 50 | def __init__(self, hs): |
51 | self.version_string = hs.version_string | |
52 | 51 | self.response_body = encode_canonical_json( |
53 | 52 | self.response_json_object(hs.config) |
54 | 53 | ) |
83 | 82 | def render_GET(self, request): |
84 | 83 | return respond_with_json_bytes( |
85 | 84 | request, 200, self.response_body, |
86 | version_string=self.version_string | |
87 | 85 | ) |
88 | 86 | |
89 | 87 | def getChild(self, name, request): |
62 | 62 | isLeaf = True |
63 | 63 | |
64 | 64 | def __init__(self, hs): |
65 | self.version_string = hs.version_string | |
66 | 65 | self.config = hs.config |
67 | 66 | self.clock = hs.clock |
68 | 67 | self.update_response_body(self.clock.time_msec()) |
114 | 113 | self.update_response_body(time_now) |
115 | 114 | return respond_with_json_bytes( |
116 | 115 | request, 200, self.response_body, |
117 | version_string=self.version_string | |
118 | 116 | ) |
11 | 11 | # See the License for the specific language governing permissions and |
12 | 12 | # limitations under the License. |
13 | 13 | |
14 | from synapse.http.server import request_handler, respond_with_json_bytes | |
14 | from synapse.http.server import ( | |
15 | respond_with_json_bytes, wrap_json_request_handler, | |
16 | ) | |
15 | 17 | from synapse.http.servlet import parse_integer, parse_json_object_from_request |
16 | 18 | from synapse.api.errors import SynapseError, Codes |
17 | 19 | from synapse.crypto.keyring import KeyLookupError |
90 | 92 | def __init__(self, hs): |
91 | 93 | self.keyring = hs.get_keyring() |
92 | 94 | self.store = hs.get_datastore() |
93 | self.version_string = hs.version_string | |
94 | 95 | self.clock = hs.get_clock() |
95 | 96 | self.federation_domain_whitelist = hs.config.federation_domain_whitelist |
96 | 97 | |
98 | 99 | self.async_render_GET(request) |
99 | 100 | return NOT_DONE_YET |
100 | 101 | |
101 | @request_handler() | |
102 | @wrap_json_request_handler | |
102 | 103 | @defer.inlineCallbacks |
103 | 104 | def async_render_GET(self, request): |
104 | 105 | if len(request.postpath) == 1: |
123 | 124 | self.async_render_POST(request) |
124 | 125 | return NOT_DONE_YET |
125 | 126 | |
126 | @request_handler() | |
127 | @wrap_json_request_handler | |
127 | 128 | @defer.inlineCallbacks |
128 | 129 | def async_render_POST(self, request): |
129 | 130 | content = parse_json_object_from_request(request) |
239 | 240 | |
240 | 241 | respond_with_json_bytes( |
241 | 242 | request, 200, result_io.getvalue(), |
242 | version_string=self.version_string | |
243 | ) | |
243 | ) |
27 | 27 | |
28 | 28 | import logging |
29 | 29 | import urllib |
30 | import urlparse | |
30 | from six.moves.urllib import parse as urlparse | |
31 | 31 | |
32 | 32 | logger = logging.getLogger(__name__) |
33 | 33 | |
142 | 142 | respond_404(request) |
143 | 143 | return |
144 | 144 | |
145 | logger.debug("Responding to media request with responder %s") | |
145 | 146 | add_file_headers(request, media_type, file_size, upload_name) |
146 | 147 | with responder: |
147 | 148 | yield responder.write_to_consumer(request) |
11 | 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | import logging | |
15 | ||
16 | from twisted.internet import defer | |
17 | from twisted.web.resource import Resource | |
18 | from twisted.web.server import NOT_DONE_YET | |
19 | ||
20 | from synapse.http.server import ( | |
21 | set_cors_headers, | |
22 | wrap_json_request_handler, | |
23 | ) | |
14 | 24 | import synapse.http.servlet |
15 | ||
16 | 25 | from ._base import parse_media_id, respond_404 |
17 | from twisted.web.resource import Resource | |
18 | from synapse.http.server import request_handler, set_cors_headers | |
19 | ||
20 | from twisted.web.server import NOT_DONE_YET | |
21 | from twisted.internet import defer | |
22 | ||
23 | import logging | |
24 | 26 | |
25 | 27 | logger = logging.getLogger(__name__) |
26 | 28 | |
34 | 36 | self.media_repo = media_repo |
35 | 37 | self.server_name = hs.hostname |
36 | 38 | |
37 | # Both of these are expected by @request_handler() | |
39 | # this is expected by @wrap_json_request_handler | |
38 | 40 | self.clock = hs.get_clock() |
39 | self.version_string = hs.version_string | |
40 | 41 | |
41 | 42 | def render_GET(self, request): |
42 | 43 | self._async_render_GET(request) |
43 | 44 | return NOT_DONE_YET |
44 | 45 | |
45 | @request_handler() | |
46 | @wrap_json_request_handler | |
46 | 47 | @defer.inlineCallbacks |
47 | 48 | def _async_render_GET(self, request): |
48 | 49 | set_cors_headers(request) |
46 | 46 | |
47 | 47 | import cgi |
48 | 48 | import logging |
49 | import urlparse | |
49 | from six.moves.urllib import parse as urlparse | |
50 | 50 | |
51 | 51 | logger = logging.getLogger(__name__) |
52 | 52 |
254 | 254 | self.open_file = open_file |
255 | 255 | |
256 | 256 | def write_to_consumer(self, consumer): |
257 | return FileSender().beginFileTransfer(self.open_file, consumer) | |
257 | return make_deferred_yieldable( | |
258 | FileSender().beginFileTransfer(self.open_file, consumer) | |
259 | ) | |
258 | 260 | |
259 | 261 | def __exit__(self, exc_type, exc_val, exc_tb): |
260 | 262 | self.open_file.close() |
34 | 34 | from synapse.api.errors import ( |
35 | 35 | SynapseError, Codes, |
36 | 36 | ) |
37 | from synapse.util.logcontext import preserve_fn, make_deferred_yieldable | |
37 | from synapse.util.logcontext import make_deferred_yieldable, run_in_background | |
38 | 38 | from synapse.util.stringutils import random_string |
39 | 39 | from synapse.util.caches.expiringcache import ExpiringCache |
40 | 40 | from synapse.http.client import SpiderHttpClient |
41 | 41 | from synapse.http.server import ( |
42 | request_handler, respond_with_json_bytes, | |
42 | respond_with_json_bytes, | |
43 | 43 | respond_with_json, |
44 | wrap_json_request_handler, | |
44 | 45 | ) |
45 | 46 | from synapse.util.async import ObservableDeferred |
46 | 47 | from synapse.util.stringutils import is_ascii |
56 | 57 | |
57 | 58 | self.auth = hs.get_auth() |
58 | 59 | self.clock = hs.get_clock() |
59 | self.version_string = hs.version_string | |
60 | 60 | self.filepaths = media_repo.filepaths |
61 | 61 | self.max_spider_size = hs.config.max_spider_size |
62 | 62 | self.server_name = hs.hostname |
89 | 89 | self._async_render_GET(request) |
90 | 90 | return NOT_DONE_YET |
91 | 91 | |
92 | @request_handler() | |
92 | @wrap_json_request_handler | |
93 | 93 | @defer.inlineCallbacks |
94 | 94 | def _async_render_GET(self, request): |
95 | 95 | |
143 | 143 | observable = self._cache.get(url) |
144 | 144 | |
145 | 145 | if not observable: |
146 | download = preserve_fn(self._do_preview)( | |
146 | download = run_in_background( | |
147 | self._do_preview, | |
147 | 148 | url, requester.user, ts, |
148 | 149 | ) |
149 | 150 | observable = ObservableDeferred( |
17 | 17 | from .media_storage import FileResponder |
18 | 18 | |
19 | 19 | from synapse.config._base import Config |
20 | from synapse.util.logcontext import preserve_fn | |
20 | from synapse.util.logcontext import run_in_background | |
21 | 21 | |
22 | 22 | import logging |
23 | 23 | import os |
86 | 86 | return self.backend.store_file(path, file_info) |
87 | 87 | else: |
88 | 88 | # TODO: Handle errors. |
89 | preserve_fn(self.backend.store_file)(path, file_info) | |
89 | def store(): | |
90 | try: | |
91 | return self.backend.store_file(path, file_info) | |
92 | except Exception: | |
93 | logger.exception("Error storing file") | |
94 | run_in_background(store) | |
90 | 95 | return defer.succeed(None) |
91 | 96 | |
92 | 97 | def fetch(self, path, file_info): |
13 | 13 | # limitations under the License. |
14 | 14 | |
15 | 15 | |
16 | import logging | |
17 | ||
18 | from twisted.internet import defer | |
19 | from twisted.web.resource import Resource | |
20 | from twisted.web.server import NOT_DONE_YET | |
21 | ||
22 | from synapse.http.server import ( | |
23 | set_cors_headers, | |
24 | wrap_json_request_handler, | |
25 | ) | |
26 | from synapse.http.servlet import parse_integer, parse_string | |
16 | 27 | from ._base import ( |
17 | parse_media_id, respond_404, respond_with_file, FileInfo, | |
28 | FileInfo, parse_media_id, respond_404, respond_with_file, | |
18 | 29 | respond_with_responder, |
19 | 30 | ) |
20 | from twisted.web.resource import Resource | |
21 | from synapse.http.servlet import parse_string, parse_integer | |
22 | from synapse.http.server import request_handler, set_cors_headers | |
23 | ||
24 | from twisted.web.server import NOT_DONE_YET | |
25 | from twisted.internet import defer | |
26 | ||
27 | import logging | |
28 | 31 | |
29 | 32 | logger = logging.getLogger(__name__) |
30 | 33 | |
40 | 43 | self.media_storage = media_storage |
41 | 44 | self.dynamic_thumbnails = hs.config.dynamic_thumbnails |
42 | 45 | self.server_name = hs.hostname |
43 | self.version_string = hs.version_string | |
44 | 46 | self.clock = hs.get_clock() |
45 | 47 | |
46 | 48 | def render_GET(self, request): |
47 | 49 | self._async_render_GET(request) |
48 | 50 | return NOT_DONE_YET |
49 | 51 | |
50 | @request_handler() | |
52 | @wrap_json_request_handler | |
51 | 53 | @defer.inlineCallbacks |
52 | 54 | def _async_render_GET(self, request): |
53 | 55 | set_cors_headers(request) |
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | 14 | |
15 | from synapse.http.server import respond_with_json, request_handler | |
15 | import logging | |
16 | ||
17 | from twisted.internet import defer | |
18 | from twisted.web.resource import Resource | |
19 | from twisted.web.server import NOT_DONE_YET | |
16 | 20 | |
17 | 21 | from synapse.api.errors import SynapseError |
18 | ||
19 | from twisted.web.server import NOT_DONE_YET | |
20 | from twisted.internet import defer | |
21 | ||
22 | from twisted.web.resource import Resource | |
23 | ||
24 | import logging | |
22 | from synapse.http.server import ( | |
23 | respond_with_json, | |
24 | wrap_json_request_handler, | |
25 | ) | |
25 | 26 | |
26 | 27 | logger = logging.getLogger(__name__) |
27 | 28 | |
39 | 40 | self.server_name = hs.hostname |
40 | 41 | self.auth = hs.get_auth() |
41 | 42 | self.max_upload_size = hs.config.max_upload_size |
42 | self.version_string = hs.version_string | |
43 | 43 | self.clock = hs.get_clock() |
44 | 44 | |
45 | 45 | def render_POST(self, request): |
50 | 50 | respond_with_json(request, 200, {}, send_cors=True) |
51 | 51 | return NOT_DONE_YET |
52 | 52 | |
53 | @request_handler() | |
53 | @wrap_json_request_handler | |
54 | 54 | @defer.inlineCallbacks |
55 | 55 | def _async_render_POST(self, request): |
56 | 56 | requester = yield self.auth.get_user_by_req(request) |
80 | 80 | headers = request.requestHeaders |
81 | 81 | |
82 | 82 | if headers.hasHeader("Content-Type"): |
83 | media_type = headers.getRawHeaders("Content-Type")[0] | |
83 | media_type = headers.getRawHeaders(b"Content-Type")[0] | |
84 | 84 | else: |
85 | 85 | raise SynapseError( |
86 | 86 | msg="Upload request missing 'Content-Type'", |
87 | 87 | code=400, |
88 | 88 | ) |
89 | 89 | |
90 | # if headers.hasHeader("Content-Disposition"): | |
91 | # disposition = headers.getRawHeaders("Content-Disposition")[0] | |
90 | # if headers.hasHeader(b"Content-Disposition"): | |
91 | # disposition = headers.getRawHeaders(b"Content-Disposition")[0] | |
92 | 92 | # TODO(markjh): parse content-dispostion |
93 | 93 | |
94 | 94 | content_uri = yield self.media_repo.create_content( |
104 | 104 | 'federation_client', |
105 | 105 | 'federation_server', |
106 | 106 | 'handlers', |
107 | 'v1auth', | |
108 | 107 | 'auth', |
109 | 108 | 'state_handler', |
110 | 109 | 'state_resolution_handler', |
224 | 223 | def build_simple_http_client(self): |
225 | 224 | return SimpleHttpClient(self) |
226 | 225 | |
227 | def build_v1auth(self): | |
228 | orf = Auth(self) | |
229 | # Matrix spec makes no reference to what HTTP status code is returned, | |
230 | # but the V1 API uses 403 where it means 401, and the webclient | |
231 | # relies on this behaviour, so V1 gets its own copy of the auth | |
232 | # with backwards compat behaviour. | |
233 | orf.TOKEN_NOT_FOUND_HTTP_STATUS = 403 | |
234 | return orf | |
235 | ||
236 | 226 | def build_state_handler(self): |
237 | 227 | return StateHandler(self) |
238 | 228 |
447 | 447 | "add_push_actions_to_staging", _add_push_actions_to_staging_txn |
448 | 448 | ) |
449 | 449 | |
450 | @defer.inlineCallbacks | |
450 | 451 | def remove_push_actions_from_staging(self, event_id): |
451 | 452 | """Called if we failed to persist the event to ensure that stale push |
452 | 453 | actions don't build up in the DB |
455 | 456 | event_id (str) |
456 | 457 | """ |
457 | 458 | |
458 | return self._simple_delete( | |
459 | table="event_push_actions_staging", | |
460 | keyvalues={ | |
461 | "event_id": event_id, | |
462 | }, | |
463 | desc="remove_push_actions_from_staging", | |
464 | ) | |
459 | try: | |
460 | res = yield self._simple_delete( | |
461 | table="event_push_actions_staging", | |
462 | keyvalues={ | |
463 | "event_id": event_id, | |
464 | }, | |
465 | desc="remove_push_actions_from_staging", | |
466 | ) | |
467 | defer.returnValue(res) | |
468 | except Exception: | |
469 | # this method is called from an exception handler, so propagating | |
470 | # another exception here really isn't helpful - there's nothing | |
471 | # the caller can do about it. Just log the exception and move on. | |
472 | logger.exception( | |
473 | "Error removing push actions after event persistence failure", | |
474 | ) | |
465 | 475 | |
466 | 476 | @defer.inlineCallbacks |
467 | 477 | def _find_stream_orderings_for_times(self): |
20 | 20 | |
21 | 21 | import simplejson as json |
22 | 22 | from twisted.internet import defer |
23 | ||
24 | 23 | |
25 | 24 | from synapse.storage.events_worker import EventsWorkerStore |
26 | 25 | from synapse.util.async import ObservableDeferred |
424 | 423 | ) |
425 | 424 | current_state = yield self._get_new_state_after_events( |
426 | 425 | room_id, |
427 | ev_ctx_rm, new_latest_event_ids, | |
426 | ev_ctx_rm, | |
427 | latest_event_ids, | |
428 | new_latest_event_ids, | |
428 | 429 | ) |
429 | 430 | if current_state is not None: |
430 | 431 | current_state_for_room[room_id] = current_state |
512 | 513 | defer.returnValue(new_latest_event_ids) |
513 | 514 | |
514 | 515 | @defer.inlineCallbacks |
515 | def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids): | |
516 | def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids, | |
517 | new_latest_event_ids): | |
516 | 518 | """Calculate the current state dict after adding some new events to |
517 | 519 | a room |
518 | 520 | |
522 | 524 | |
523 | 525 | events_context (list[(EventBase, EventContext)]): |
524 | 526 | events and contexts which are being added to the room |
527 | ||
528 | old_latest_event_ids (iterable[str]): | |
529 | the old forward extremities for the room. | |
525 | 530 | |
526 | 531 | new_latest_event_ids (iterable[str]): |
527 | 532 | the new forward extremities for the room. |
533 | 538 | """ |
534 | 539 | |
535 | 540 | if not new_latest_event_ids: |
536 | defer.returnValue({}) | |
541 | return | |
537 | 542 | |
538 | 543 | # map from state_group to ((type, key) -> event_id) state map |
539 | state_groups = {} | |
540 | missing_event_ids = [] | |
541 | was_updated = False | |
544 | state_groups_map = {} | |
545 | for ev, ctx in events_context: | |
546 | if ctx.state_group is None: | |
547 | # I don't think this can happen, but let's double-check | |
548 | raise Exception( | |
549 | "Context for new extremity event %s has no state " | |
550 | "group" % (ev.event_id, ), | |
551 | ) | |
552 | ||
553 | if ctx.state_group in state_groups_map: | |
554 | continue | |
555 | ||
556 | state_groups_map[ctx.state_group] = ctx.current_state_ids | |
557 | ||
558 | # We need to map the event_ids to their state groups. First, let's | |
559 | # check if the event is one we're persisting, in which case we can | |
560 | # pull the state group from its context. | |
561 | # Otherwise we need to pull the state group from the database. | |
562 | ||
563 | # Set of events we need to fetch groups for. (We know none of the old | |
564 | # extremities are going to be in events_context). | |
565 | missing_event_ids = set(old_latest_event_ids) | |
566 | ||
567 | event_id_to_state_group = {} | |
542 | 568 | for event_id in new_latest_event_ids: |
543 | # First search in the list of new events we're adding, | |
544 | # and then use the current state from that | |
569 | # First search in the list of new events we're adding. | |
545 | 570 | for ev, ctx in events_context: |
546 | 571 | if event_id == ev.event_id: |
547 | if ctx.current_state_ids is None: | |
548 | raise Exception("Unknown current state") | |
549 | ||
550 | if ctx.state_group is None: | |
551 | # I don't think this can happen, but let's double-check | |
552 | raise Exception( | |
553 | "Context for new extremity event %s has no state " | |
554 | "group" % (event_id, ), | |
555 | ) | |
556 | ||
557 | # If we've already seen the state group don't bother adding | |
558 | # it to the state sets again | |
559 | if ctx.state_group not in state_groups: | |
560 | state_groups[ctx.state_group] = ctx.current_state_ids | |
561 | if ctx.delta_ids or hasattr(ev, "state_key"): | |
562 | was_updated = True | |
572 | event_id_to_state_group[event_id] = ctx.state_group | |
563 | 573 | break |
564 | 574 | else: |
565 | 575 | # If we couldn't find it, then we'll need to pull |
566 | 576 | # the state from the database |
567 | was_updated = True | |
568 | missing_event_ids.append(event_id) | |
569 | ||
570 | if not was_updated: | |
571 | return | |
577 | missing_event_ids.add(event_id) | |
572 | 578 | |
573 | 579 | if missing_event_ids: |
574 | # Now pull out the state for any missing events from DB | |
580 | # Now pull out the state groups for any missing events from DB | |
575 | 581 | event_to_groups = yield self._get_state_group_for_events( |
576 | 582 | missing_event_ids, |
577 | 583 | ) |
578 | ||
579 | groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys()) | |
580 | ||
581 | if groups: | |
582 | group_to_state = yield self._get_state_for_groups(groups) | |
583 | state_groups.update(group_to_state) | |
584 | ||
585 | if len(state_groups) == 1: | |
584 | event_id_to_state_group.update(event_to_groups) | |
585 | ||
586 | # State groups of old_latest_event_ids | |
587 | old_state_groups = set( | |
588 | event_id_to_state_group[evid] for evid in old_latest_event_ids | |
589 | ) | |
590 | ||
591 | # State groups of new_latest_event_ids | |
592 | new_state_groups = set( | |
593 | event_id_to_state_group[evid] for evid in new_latest_event_ids | |
594 | ) | |
595 | ||
596 | # If they old and new groups are the same then we don't need to do | |
597 | # anything. | |
598 | if old_state_groups == new_state_groups: | |
599 | return | |
600 | ||
601 | # Now that we have calculated new_state_groups we need to get | |
602 | # their state IDs so we can resolve to a single state set. | |
603 | missing_state = new_state_groups - set(state_groups_map) | |
604 | if missing_state: | |
605 | group_to_state = yield self._get_state_for_groups(missing_state) | |
606 | state_groups_map.update(group_to_state) | |
607 | ||
608 | if len(new_state_groups) == 1: | |
586 | 609 | # If there is only one state group, then we know what the current |
587 | 610 | # state is. |
588 | defer.returnValue(state_groups.values()[0]) | |
611 | defer.returnValue(state_groups_map[new_state_groups.pop()]) | |
612 | ||
613 | # Ok, we need to defer to the state handler to resolve our state sets. | |
589 | 614 | |
590 | 615 | def get_events(ev_ids): |
591 | 616 | return self.get_events( |
592 | 617 | ev_ids, get_prev_content=False, check_redacted=False, |
593 | 618 | ) |
619 | ||
620 | state_groups = { | |
621 | sg: state_groups_map[sg] for sg in new_state_groups | |
622 | } | |
623 | ||
594 | 624 | events_map = {ev.event_id: ev for ev, _ in events_context} |
595 | 625 | logger.debug("calling resolve_state_groups from preserve_events") |
596 | 626 | res = yield self._state_resolution_handler.resolve_state_groups( |
19 | 19 | from synapse.events.utils import prune_event |
20 | 20 | |
21 | 21 | from synapse.util.logcontext import ( |
22 | preserve_fn, PreserveLoggingContext, make_deferred_yieldable | |
22 | PreserveLoggingContext, make_deferred_yieldable, run_in_background, | |
23 | 23 | ) |
24 | 24 | from synapse.util.metrics import Measure |
25 | 25 | from synapse.api.errors import SynapseError |
318 | 318 | |
319 | 319 | res = yield make_deferred_yieldable(defer.gatherResults( |
320 | 320 | [ |
321 | preserve_fn(self._get_event_from_row)( | |
321 | run_in_background( | |
322 | self._get_event_from_row, | |
322 | 323 | row["internal_metadata"], row["json"], row["redacts"], |
323 | 324 | rejected_reason=row["rejects"], |
324 | 325 | ) |
20 | 20 | from synapse.storage import background_updates |
21 | 21 | from synapse.storage._base import SQLBaseStore |
22 | 22 | from synapse.util.caches.descriptors import cached, cachedInlineCallbacks |
23 | ||
24 | from six.moves import range | |
23 | 25 | |
24 | 26 | |
25 | 27 | class RegistrationWorkerStore(SQLBaseStore): |
468 | 470 | match = regex.search(user_id) |
469 | 471 | if match: |
470 | 472 | found.add(int(match.group(1))) |
471 | for i in xrange(len(found) + 1): | |
473 | for i in range(len(found) + 1): | |
472 | 474 | if i not in found: |
473 | 475 | return i |
474 | 476 | |
523 | 525 | except self.database_engine.module.IntegrityError: |
524 | 526 | ret = yield self.get_3pid_guest_access_token(medium, address) |
525 | 527 | defer.returnValue(ret) |
528 | ||
529 | def add_user_pending_deactivation(self, user_id): | |
530 | """ | |
531 | Adds a user to the table of users who need to be parted from all the rooms they're | |
532 | in | |
533 | """ | |
534 | return self._simple_insert( | |
535 | "users_pending_deactivation", | |
536 | values={ | |
537 | "user_id": user_id, | |
538 | }, | |
539 | desc="add_user_pending_deactivation", | |
540 | ) | |
541 | ||
542 | def del_user_pending_deactivation(self, user_id): | |
543 | """ | |
544 | Removes the given user to the table of users who need to be parted from all the | |
545 | rooms they're in, effectively marking that user as fully deactivated. | |
546 | """ | |
547 | return self._simple_delete_one( | |
548 | "users_pending_deactivation", | |
549 | keyvalues={ | |
550 | "user_id": user_id, | |
551 | }, | |
552 | desc="del_user_pending_deactivation", | |
553 | ) | |
554 | ||
555 | def get_user_pending_deactivation(self): | |
556 | """ | |
557 | Gets one user from the table of users waiting to be parted from all the rooms | |
558 | they're in. | |
559 | """ | |
560 | return self._simple_select_one_onecol( | |
561 | "users_pending_deactivation", | |
562 | keyvalues={}, | |
563 | retcol="user_id", | |
564 | allow_none=True, | |
565 | desc="get_users_pending_deactivation", | |
566 | ) |
12 | 12 | # limitations under the License. |
13 | 13 | import logging |
14 | 14 | from synapse.config.appservice import load_appservices |
15 | ||
16 | from six.moves import range | |
15 | 17 | |
16 | 18 | |
17 | 19 | logger = logging.getLogger(__name__) |
57 | 59 | |
58 | 60 | for as_id, user_ids in owned.items(): |
59 | 61 | n = 100 |
60 | user_chunks = (user_ids[i:i + 100] for i in xrange(0, len(user_ids), n)) | |
62 | user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n)) | |
61 | 63 | for chunk in user_chunks: |
62 | 64 | cur.execute( |
63 | 65 | database_engine.convert_param_style( |
0 | /* Copyright 2018 New Vector Ltd | |
1 | * | |
2 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
3 | * you may not use this file except in compliance with the License. | |
4 | * You may obtain a copy of the License at | |
5 | * | |
6 | * http://www.apache.org/licenses/LICENSE-2.0 | |
7 | * | |
8 | * Unless required by applicable law or agreed to in writing, software | |
9 | * distributed under the License is distributed on an "AS IS" BASIS, | |
10 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
11 | * See the License for the specific language governing permissions and | |
12 | * limitations under the License. | |
13 | */ | |
14 | ||
15 | /* | |
16 | * Store any accounts that have been requested to be deactivated. | |
17 | * We part the account from all the rooms its in when its | |
18 | * deactivated. This can take some time and synapse may be restarted | |
19 | * before it completes, so store the user IDs here until the process | |
20 | * is complete. | |
21 | */ | |
22 | CREATE TABLE users_pending_deactivation ( | |
23 | user_id TEXT NOT NULL | |
24 | ); |
0 | # Copyright 2018 New Vector Ltd | |
1 | # | |
2 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
3 | # you may not use this file except in compliance with the License. | |
4 | # You may obtain a copy of the License at | |
5 | # | |
6 | # http://www.apache.org/licenses/LICENSE-2.0 | |
7 | # | |
8 | # Unless required by applicable law or agreed to in writing, software | |
9 | # distributed under the License is distributed on an "AS IS" BASIS, | |
10 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
11 | # See the License for the specific language governing permissions and | |
12 | # limitations under the License. | |
13 | ||
14 | from synapse.storage.engines import PostgresEngine | |
15 | from synapse.storage.prepare_database import get_statements | |
16 | ||
17 | FIX_INDEXES = """ | |
18 | -- rebuild indexes as uniques | |
19 | DROP INDEX groups_invites_g_idx; | |
20 | CREATE UNIQUE INDEX group_invites_g_idx ON group_invites(group_id, user_id); | |
21 | DROP INDEX groups_users_g_idx; | |
22 | CREATE UNIQUE INDEX group_users_g_idx ON group_users(group_id, user_id); | |
23 | ||
24 | -- rename other indexes to actually match their table names.. | |
25 | DROP INDEX groups_users_u_idx; | |
26 | CREATE INDEX group_users_u_idx ON group_users(user_id); | |
27 | DROP INDEX groups_invites_u_idx; | |
28 | CREATE INDEX group_invites_u_idx ON group_invites(user_id); | |
29 | DROP INDEX groups_rooms_g_idx; | |
30 | CREATE UNIQUE INDEX group_rooms_g_idx ON group_rooms(group_id, room_id); | |
31 | DROP INDEX groups_rooms_r_idx; | |
32 | CREATE INDEX group_rooms_r_idx ON group_rooms(room_id); | |
33 | """ | |
34 | ||
35 | ||
36 | def run_create(cur, database_engine, *args, **kwargs): | |
37 | rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid" | |
38 | ||
39 | # remove duplicates from group_users & group_invites tables | |
40 | cur.execute(""" | |
41 | DELETE FROM group_users WHERE %s NOT IN ( | |
42 | SELECT min(%s) FROM group_users GROUP BY group_id, user_id | |
43 | ); | |
44 | """ % (rowid, rowid)) | |
45 | cur.execute(""" | |
46 | DELETE FROM group_invites WHERE %s NOT IN ( | |
47 | SELECT min(%s) FROM group_invites GROUP BY group_id, user_id | |
48 | ); | |
49 | """ % (rowid, rowid)) | |
50 | ||
51 | for statement in get_statements(FIX_INDEXES.splitlines()): | |
52 | cur.execute(statement) | |
53 | ||
54 | ||
55 | def run_upgrade(*args, **kwargs): | |
56 | pass |
37 | 37 | from synapse.storage._base import SQLBaseStore |
38 | 38 | from synapse.storage.events import EventsWorkerStore |
39 | 39 | |
40 | from synapse.util.caches.descriptors import cached | |
41 | 40 | from synapse.types import RoomStreamToken |
42 | 41 | from synapse.util.caches.stream_change_cache import StreamChangeCache |
43 | from synapse.util.logcontext import make_deferred_yieldable, preserve_fn | |
44 | from synapse.storage.engines import PostgresEngine, Sqlite3Engine | |
42 | from synapse.util.logcontext import make_deferred_yieldable, run_in_background | |
43 | from synapse.storage.engines import PostgresEngine | |
45 | 44 | |
46 | 45 | import abc |
47 | 46 | import logging |
48 | 47 | |
48 | from six.moves import range | |
49 | from collections import namedtuple | |
50 | ||
49 | 51 | |
50 | 52 | logger = logging.getLogger(__name__) |
51 | 53 | |
55 | 57 | |
56 | 58 | _STREAM_TOKEN = "stream" |
57 | 59 | _TOPOLOGICAL_TOKEN = "topological" |
60 | ||
61 | ||
62 | # Used as return values for pagination APIs | |
63 | _EventDictReturn = namedtuple("_EventDictReturn", ( | |
64 | "event_id", "topological_ordering", "stream_ordering", | |
65 | )) | |
58 | 66 | |
59 | 67 | |
60 | 68 | def lower_bound(token, engine, inclusive=False): |
195 | 203 | |
196 | 204 | results = {} |
197 | 205 | room_ids = list(room_ids) |
198 | for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): | |
206 | for rm_ids in (room_ids[i:i + 20] for i in range(0, len(room_ids), 20)): | |
199 | 207 | res = yield make_deferred_yieldable(defer.gatherResults([ |
200 | preserve_fn(self.get_room_events_stream_for_room)( | |
208 | run_in_background( | |
209 | self.get_room_events_stream_for_room, | |
201 | 210 | room_id, from_key, to_key, limit, order=order, |
202 | 211 | ) |
203 | 212 | for room_id in rm_ids |
204 | ])) | |
213 | ], consumeErrors=True)) | |
205 | 214 | results.update(dict(zip(rm_ids, res))) |
206 | 215 | |
207 | 216 | defer.returnValue(results) |
223 | 232 | @defer.inlineCallbacks |
224 | 233 | def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0, |
225 | 234 | order='DESC'): |
226 | # Note: If from_key is None then we return in topological order. This | |
227 | # is because in that case we're using this as a "get the last few messages | |
228 | # in a room" function, rather than "get new messages since last sync" | |
229 | if from_key is not None: | |
230 | from_id = RoomStreamToken.parse_stream_token(from_key).stream | |
231 | else: | |
232 | from_id = None | |
233 | to_id = RoomStreamToken.parse_stream_token(to_key).stream | |
234 | ||
235 | ||
236 | """Get new room events in stream ordering since `from_key`. | |
237 | ||
238 | Args: | |
239 | room_id (str) | |
240 | from_key (str): Token from which no events are returned before | |
241 | to_key (str): Token from which no events are returned after. (This | |
242 | is typically the current stream token) | |
243 | limit (int): Maximum number of events to return | |
244 | order (str): Either "DESC" or "ASC". Determines which events are | |
245 | returned when the result is limited. If "DESC" then the most | |
246 | recent `limit` events are returned, otherwise returns the | |
247 | oldest `limit` events. | |
248 | ||
249 | Returns: | |
250 | Deferred[tuple[list[FrozenEvent], str]]: Returns the list of | |
251 | events (in ascending order) and the token from the start of | |
252 | the chunk of events returned. | |
253 | """ | |
235 | 254 | if from_key == to_key: |
236 | 255 | defer.returnValue(([], from_key)) |
237 | 256 | |
238 | if from_id: | |
239 | has_changed = yield self._events_stream_cache.has_entity_changed( | |
240 | room_id, from_id | |
241 | ) | |
242 | ||
243 | if not has_changed: | |
244 | defer.returnValue(([], from_key)) | |
257 | from_id = RoomStreamToken.parse_stream_token(from_key).stream | |
258 | to_id = RoomStreamToken.parse_stream_token(to_key).stream | |
259 | ||
260 | has_changed = yield self._events_stream_cache.has_entity_changed( | |
261 | room_id, from_id | |
262 | ) | |
263 | ||
264 | if not has_changed: | |
265 | defer.returnValue(([], from_key)) | |
245 | 266 | |
246 | 267 | def f(txn): |
247 | if from_id is not None: | |
248 | sql = ( | |
249 | "SELECT event_id, stream_ordering FROM events WHERE" | |
250 | " room_id = ?" | |
251 | " AND not outlier" | |
252 | " AND stream_ordering > ? AND stream_ordering <= ?" | |
253 | " ORDER BY stream_ordering %s LIMIT ?" | |
254 | ) % (order,) | |
255 | txn.execute(sql, (room_id, from_id, to_id, limit)) | |
256 | else: | |
257 | sql = ( | |
258 | "SELECT event_id, stream_ordering FROM events WHERE" | |
259 | " room_id = ?" | |
260 | " AND not outlier" | |
261 | " AND stream_ordering <= ?" | |
262 | " ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?" | |
263 | ) % (order, order,) | |
264 | txn.execute(sql, (room_id, to_id, limit)) | |
265 | ||
266 | rows = self.cursor_to_dict(txn) | |
267 | ||
268 | sql = ( | |
269 | "SELECT event_id, stream_ordering FROM events WHERE" | |
270 | " room_id = ?" | |
271 | " AND not outlier" | |
272 | " AND stream_ordering > ? AND stream_ordering <= ?" | |
273 | " ORDER BY stream_ordering %s LIMIT ?" | |
274 | ) % (order,) | |
275 | txn.execute(sql, (room_id, from_id, to_id, limit)) | |
276 | ||
277 | rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] | |
268 | 278 | return rows |
269 | 279 | |
270 | 280 | rows = yield self.runInteraction("get_room_events_stream_for_room", f) |
271 | 281 | |
272 | 282 | ret = yield self._get_events( |
273 | [r["event_id"] for r in rows], | |
283 | [r.event_id for r in rows], | |
274 | 284 | get_prev_content=True |
275 | 285 | ) |
276 | 286 | |
280 | 290 | ret.reverse() |
281 | 291 | |
282 | 292 | if rows: |
283 | key = "s%d" % min(r["stream_ordering"] for r in rows) | |
293 | key = "s%d" % min(r.stream_ordering for r in rows) | |
284 | 294 | else: |
285 | 295 | # Assume we didn't get anything because there was nothing to |
286 | 296 | # get. |
290 | 300 | |
291 | 301 | @defer.inlineCallbacks |
292 | 302 | def get_membership_changes_for_user(self, user_id, from_key, to_key): |
293 | if from_key is not None: | |
294 | from_id = RoomStreamToken.parse_stream_token(from_key).stream | |
295 | else: | |
296 | from_id = None | |
303 | from_id = RoomStreamToken.parse_stream_token(from_key).stream | |
297 | 304 | to_id = RoomStreamToken.parse_stream_token(to_key).stream |
298 | 305 | |
299 | 306 | if from_key == to_key: |
307 | 314 | defer.returnValue([]) |
308 | 315 | |
309 | 316 | def f(txn): |
310 | if from_id is not None: | |
311 | sql = ( | |
312 | "SELECT m.event_id, stream_ordering FROM events AS e," | |
313 | " room_memberships AS m" | |
314 | " WHERE e.event_id = m.event_id" | |
315 | " AND m.user_id = ?" | |
316 | " AND e.stream_ordering > ? AND e.stream_ordering <= ?" | |
317 | " ORDER BY e.stream_ordering ASC" | |
318 | ) | |
319 | txn.execute(sql, (user_id, from_id, to_id,)) | |
320 | else: | |
321 | sql = ( | |
322 | "SELECT m.event_id, stream_ordering FROM events AS e," | |
323 | " room_memberships AS m" | |
324 | " WHERE e.event_id = m.event_id" | |
325 | " AND m.user_id = ?" | |
326 | " AND stream_ordering <= ?" | |
327 | " ORDER BY stream_ordering ASC" | |
328 | ) | |
329 | txn.execute(sql, (user_id, to_id,)) | |
330 | rows = self.cursor_to_dict(txn) | |
317 | sql = ( | |
318 | "SELECT m.event_id, stream_ordering FROM events AS e," | |
319 | " room_memberships AS m" | |
320 | " WHERE e.event_id = m.event_id" | |
321 | " AND m.user_id = ?" | |
322 | " AND e.stream_ordering > ? AND e.stream_ordering <= ?" | |
323 | " ORDER BY e.stream_ordering ASC" | |
324 | ) | |
325 | txn.execute(sql, (user_id, from_id, to_id,)) | |
326 | ||
327 | rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] | |
331 | 328 | |
332 | 329 | return rows |
333 | 330 | |
334 | 331 | rows = yield self.runInteraction("get_membership_changes_for_user", f) |
335 | 332 | |
336 | 333 | ret = yield self._get_events( |
337 | [r["event_id"] for r in rows], | |
334 | [r.event_id for r in rows], | |
338 | 335 | get_prev_content=True |
339 | 336 | ) |
340 | 337 | |
343 | 340 | defer.returnValue(ret) |
344 | 341 | |
345 | 342 | @defer.inlineCallbacks |
346 | def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): | |
343 | def get_recent_events_for_room(self, room_id, limit, end_token): | |
344 | """Get the most recent events in the room in topological ordering. | |
345 | ||
346 | Args: | |
347 | room_id (str) | |
348 | limit (int) | |
349 | end_token (str): The stream token representing now. | |
350 | ||
351 | Returns: | |
352 | Deferred[tuple[list[FrozenEvent], str]]: Returns a list of | |
353 | events and a token pointing to the start of the returned | |
354 | events. | |
355 | The events returned are in ascending order. | |
356 | """ | |
357 | ||
347 | 358 | rows, token = yield self.get_recent_event_ids_for_room( |
348 | room_id, limit, end_token, from_token | |
359 | room_id, limit, end_token, | |
349 | 360 | ) |
350 | 361 | |
351 | 362 | logger.debug("stream before") |
352 | 363 | events = yield self._get_events( |
353 | [r["event_id"] for r in rows], | |
364 | [r.event_id for r in rows], | |
354 | 365 | get_prev_content=True |
355 | 366 | ) |
356 | 367 | logger.debug("stream after") |
359 | 370 | |
360 | 371 | defer.returnValue((events, token)) |
361 | 372 | |
362 | @cached(num_args=4) | |
363 | def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None): | |
364 | end_token = RoomStreamToken.parse_stream_token(end_token) | |
365 | ||
366 | if from_token is None: | |
367 | sql = ( | |
368 | "SELECT stream_ordering, topological_ordering, event_id" | |
369 | " FROM events" | |
370 | " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?" | |
371 | " ORDER BY topological_ordering DESC, stream_ordering DESC" | |
372 | " LIMIT ?" | |
373 | ) | |
374 | else: | |
375 | from_token = RoomStreamToken.parse_stream_token(from_token) | |
376 | sql = ( | |
377 | "SELECT stream_ordering, topological_ordering, event_id" | |
378 | " FROM events" | |
379 | " WHERE room_id = ? AND stream_ordering > ?" | |
380 | " AND stream_ordering <= ? AND outlier = ?" | |
381 | " ORDER BY topological_ordering DESC, stream_ordering DESC" | |
382 | " LIMIT ?" | |
383 | ) | |
384 | ||
385 | def get_recent_events_for_room_txn(txn): | |
386 | if from_token is None: | |
387 | txn.execute(sql, (room_id, end_token.stream, False, limit,)) | |
388 | else: | |
389 | txn.execute(sql, ( | |
390 | room_id, from_token.stream, end_token.stream, False, limit | |
391 | )) | |
392 | ||
393 | rows = self.cursor_to_dict(txn) | |
394 | ||
395 | rows.reverse() # As we selected with reverse ordering | |
396 | ||
397 | if rows: | |
398 | # Tokens are positions between events. | |
399 | # This token points *after* the last event in the chunk. | |
400 | # We need it to point to the event before it in the chunk | |
401 | # since we are going backwards so we subtract one from the | |
402 | # stream part. | |
403 | topo = rows[0]["topological_ordering"] | |
404 | toke = rows[0]["stream_ordering"] - 1 | |
405 | start_token = str(RoomStreamToken(topo, toke)) | |
406 | ||
407 | token = (start_token, str(end_token)) | |
408 | else: | |
409 | token = (str(end_token), str(end_token)) | |
410 | ||
411 | return rows, token | |
412 | ||
413 | return self.runInteraction( | |
414 | "get_recent_events_for_room", get_recent_events_for_room_txn | |
415 | ) | |
373 | @defer.inlineCallbacks | |
374 | def get_recent_event_ids_for_room(self, room_id, limit, end_token): | |
375 | """Get the most recent events in the room in topological ordering. | |
376 | ||
377 | Args: | |
378 | room_id (str) | |
379 | limit (int) | |
380 | end_token (str): The stream token representing now. | |
381 | ||
382 | Returns: | |
383 | Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of | |
384 | _EventDictReturn and a token pointing to the start of the returned | |
385 | events. | |
386 | The events returned are in ascending order. | |
387 | """ | |
388 | # Allow a zero limit here, and no-op. | |
389 | if limit == 0: | |
390 | defer.returnValue(([], end_token)) | |
391 | ||
392 | end_token = RoomStreamToken.parse(end_token) | |
393 | ||
394 | rows, token = yield self.runInteraction( | |
395 | "get_recent_event_ids_for_room", self._paginate_room_events_txn, | |
396 | room_id, from_token=end_token, limit=limit, | |
397 | ) | |
398 | ||
399 | # We want to return the results in ascending order. | |
400 | rows.reverse() | |
401 | ||
402 | defer.returnValue((rows, token)) | |
416 | 403 | |
417 | 404 | def get_room_event_after_stream_ordering(self, room_id, stream_ordering): |
418 | 405 | """Gets details of the first event in a room at or after a stream ordering |
516 | 503 | |
517 | 504 | @staticmethod |
518 | 505 | def _set_before_and_after(events, rows, topo_order=True): |
506 | """Inserts ordering information to events' internal metadata from | |
507 | the DB rows. | |
508 | ||
509 | Args: | |
510 | events (list[FrozenEvent]) | |
511 | rows (list[_EventDictReturn]) | |
512 | topo_order (bool): Whether the events were ordered topologically | |
513 | or by stream ordering. If true then all rows should have a non | |
514 | null topological_ordering. | |
515 | """ | |
519 | 516 | for event, row in zip(events, rows): |
520 | stream = row["stream_ordering"] | |
521 | if topo_order: | |
522 | topo = event.depth | |
517 | stream = row.stream_ordering | |
518 | if topo_order and row.topological_ordering: | |
519 | topo = row.topological_ordering | |
523 | 520 | else: |
524 | 521 | topo = None |
525 | 522 | internal = event.internal_metadata |
591 | 588 | retcols=["stream_ordering", "topological_ordering"], |
592 | 589 | ) |
593 | 590 | |
594 | token = RoomStreamToken( | |
591 | # Paginating backwards includes the event at the token, but paginating | |
592 | # forward doesn't. | |
593 | before_token = RoomStreamToken( | |
594 | results["topological_ordering"] - 1, | |
595 | results["stream_ordering"], | |
596 | ) | |
597 | ||
598 | after_token = RoomStreamToken( | |
595 | 599 | results["topological_ordering"], |
596 | 600 | results["stream_ordering"], |
597 | 601 | ) |
598 | 602 | |
599 | if isinstance(self.database_engine, Sqlite3Engine): | |
600 | # SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)`` | |
601 | # So we give pass it to SQLite3 as the UNION ALL of the two queries. | |
602 | ||
603 | query_before = ( | |
604 | "SELECT topological_ordering, stream_ordering, event_id FROM events" | |
605 | " WHERE room_id = ? AND topological_ordering < ?" | |
606 | " UNION ALL" | |
607 | " SELECT topological_ordering, stream_ordering, event_id FROM events" | |
608 | " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?" | |
609 | " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" | |
610 | ) | |
611 | before_args = ( | |
612 | room_id, token.topological, | |
613 | room_id, token.topological, token.stream, | |
614 | before_limit, | |
615 | ) | |
616 | ||
617 | query_after = ( | |
618 | "SELECT topological_ordering, stream_ordering, event_id FROM events" | |
619 | " WHERE room_id = ? AND topological_ordering > ?" | |
620 | " UNION ALL" | |
621 | " SELECT topological_ordering, stream_ordering, event_id FROM events" | |
622 | " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?" | |
623 | " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" | |
624 | ) | |
625 | after_args = ( | |
626 | room_id, token.topological, | |
627 | room_id, token.topological, token.stream, | |
628 | after_limit, | |
629 | ) | |
630 | else: | |
631 | query_before = ( | |
632 | "SELECT topological_ordering, stream_ordering, event_id FROM events" | |
633 | " WHERE room_id = ? AND %s" | |
634 | " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" | |
635 | ) % (upper_bound(token, self.database_engine, inclusive=False),) | |
636 | ||
637 | before_args = (room_id, before_limit) | |
638 | ||
639 | query_after = ( | |
640 | "SELECT topological_ordering, stream_ordering, event_id FROM events" | |
641 | " WHERE room_id = ? AND %s" | |
642 | " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" | |
643 | ) % (lower_bound(token, self.database_engine, inclusive=False),) | |
644 | ||
645 | after_args = (room_id, after_limit) | |
646 | ||
647 | txn.execute(query_before, before_args) | |
648 | ||
649 | rows = self.cursor_to_dict(txn) | |
650 | events_before = [r["event_id"] for r in rows] | |
651 | ||
652 | if rows: | |
653 | start_token = str(RoomStreamToken( | |
654 | rows[0]["topological_ordering"], | |
655 | rows[0]["stream_ordering"] - 1, | |
656 | )) | |
657 | else: | |
658 | start_token = str(RoomStreamToken( | |
659 | token.topological, | |
660 | token.stream - 1, | |
661 | )) | |
662 | ||
663 | txn.execute(query_after, after_args) | |
664 | ||
665 | rows = self.cursor_to_dict(txn) | |
666 | events_after = [r["event_id"] for r in rows] | |
667 | ||
668 | if rows: | |
669 | end_token = str(RoomStreamToken( | |
670 | rows[-1]["topological_ordering"], | |
671 | rows[-1]["stream_ordering"], | |
672 | )) | |
673 | else: | |
674 | end_token = str(token) | |
603 | rows, start_token = self._paginate_room_events_txn( | |
604 | txn, room_id, before_token, direction='b', limit=before_limit, | |
605 | ) | |
606 | events_before = [r.event_id for r in rows] | |
607 | ||
608 | rows, end_token = self._paginate_room_events_txn( | |
609 | txn, room_id, after_token, direction='f', limit=after_limit, | |
610 | ) | |
611 | events_after = [r.event_id for r in rows] | |
675 | 612 | |
676 | 613 | return { |
677 | 614 | "before": { |
734 | 671 | def has_room_changed_since(self, room_id, stream_id): |
735 | 672 | return self._events_stream_cache.has_entity_changed(room_id, stream_id) |
736 | 673 | |
737 | ||
738 | class StreamStore(StreamWorkerStore): | |
739 | def get_room_max_stream_ordering(self): | |
740 | return self._stream_id_gen.get_current_token() | |
741 | ||
742 | def get_room_min_stream_ordering(self): | |
743 | return self._backfill_id_gen.get_current_token() | |
744 | ||
745 | @defer.inlineCallbacks | |
746 | def paginate_room_events(self, room_id, from_key, to_key=None, | |
747 | direction='b', limit=-1, event_filter=None): | |
674 | def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None, | |
675 | direction='b', limit=-1, event_filter=None): | |
676 | """Returns list of events before or after a given token. | |
677 | ||
678 | Args: | |
679 | txn | |
680 | room_id (str) | |
681 | from_token (RoomStreamToken): The token used to stream from | |
682 | to_token (RoomStreamToken|None): A token which if given limits the | |
683 | results to only those before | |
684 | direction(char): Either 'b' or 'f' to indicate whether we are | |
685 | paginating forwards or backwards from `from_key`. | |
686 | limit (int): The maximum number of events to return. Zero or less | |
687 | means no limit. | |
688 | event_filter (Filter|None): If provided filters the events to | |
689 | those that match the filter. | |
690 | ||
691 | Returns: | |
692 | Deferred[tuple[list[_EventDictReturn], str]]: Returns the results | |
693 | as a list of _EventDictReturn and a token that points to the end | |
694 | of the result set. | |
695 | """ | |
748 | 696 | # Tokens really represent positions between elements, but we use |
749 | 697 | # the convention of pointing to the event before the gap. Hence |
750 | 698 | # we have a bit of asymmetry when it comes to equalities. |
752 | 700 | if direction == 'b': |
753 | 701 | order = "DESC" |
754 | 702 | bounds = upper_bound( |
755 | RoomStreamToken.parse(from_key), self.database_engine | |
756 | ) | |
757 | if to_key: | |
703 | from_token, self.database_engine | |
704 | ) | |
705 | if to_token: | |
758 | 706 | bounds = "%s AND %s" % (bounds, lower_bound( |
759 | RoomStreamToken.parse(to_key), self.database_engine | |
707 | to_token, self.database_engine | |
760 | 708 | )) |
761 | 709 | else: |
762 | 710 | order = "ASC" |
763 | 711 | bounds = lower_bound( |
764 | RoomStreamToken.parse(from_key), self.database_engine | |
765 | ) | |
766 | if to_key: | |
712 | from_token, self.database_engine | |
713 | ) | |
714 | if to_token: | |
767 | 715 | bounds = "%s AND %s" % (bounds, upper_bound( |
768 | RoomStreamToken.parse(to_key), self.database_engine | |
716 | to_token, self.database_engine | |
769 | 717 | )) |
770 | 718 | |
771 | 719 | filter_clause, filter_args = filter_to_clause(event_filter) |
781 | 729 | limit_str = "" |
782 | 730 | |
783 | 731 | sql = ( |
784 | "SELECT * FROM events" | |
732 | "SELECT event_id, topological_ordering, stream_ordering" | |
733 | " FROM events" | |
785 | 734 | " WHERE outlier = ? AND room_id = ? AND %(bounds)s" |
786 | 735 | " ORDER BY topological_ordering %(order)s," |
787 | 736 | " stream_ordering %(order)s %(limit)s" |
791 | 740 | "limit": limit_str |
792 | 741 | } |
793 | 742 | |
794 | def f(txn): | |
795 | txn.execute(sql, args) | |
796 | ||
797 | rows = self.cursor_to_dict(txn) | |
798 | ||
799 | if rows: | |
800 | topo = rows[-1]["topological_ordering"] | |
801 | toke = rows[-1]["stream_ordering"] | |
802 | if direction == 'b': | |
803 | # Tokens are positions between events. | |
804 | # This token points *after* the last event in the chunk. | |
805 | # We need it to point to the event before it in the chunk | |
806 | # when we are going backwards so we subtract one from the | |
807 | # stream part. | |
808 | toke -= 1 | |
809 | next_token = str(RoomStreamToken(topo, toke)) | |
810 | else: | |
811 | # TODO (erikj): We should work out what to do here instead. | |
812 | next_token = to_key if to_key else from_key | |
813 | ||
814 | return rows, next_token, | |
815 | ||
816 | rows, token = yield self.runInteraction("paginate_room_events", f) | |
743 | txn.execute(sql, args) | |
744 | ||
745 | rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn] | |
746 | ||
747 | if rows: | |
748 | topo = rows[-1].topological_ordering | |
749 | toke = rows[-1].stream_ordering | |
750 | if direction == 'b': | |
751 | # Tokens are positions between events. | |
752 | # This token points *after* the last event in the chunk. | |
753 | # We need it to point to the event before it in the chunk | |
754 | # when we are going backwards so we subtract one from the | |
755 | # stream part. | |
756 | toke -= 1 | |
757 | next_token = RoomStreamToken(topo, toke) | |
758 | else: | |
759 | # TODO (erikj): We should work out what to do here instead. | |
760 | next_token = to_token if to_token else from_token | |
761 | ||
762 | return rows, str(next_token), | |
763 | ||
764 | @defer.inlineCallbacks | |
765 | def paginate_room_events(self, room_id, from_key, to_key=None, | |
766 | direction='b', limit=-1, event_filter=None): | |
767 | """Returns list of events before or after a given token. | |
768 | ||
769 | Args: | |
770 | room_id (str) | |
771 | from_key (str): The token used to stream from | |
772 | to_key (str|None): A token which if given limits the results to | |
773 | only those before | |
774 | direction(char): Either 'b' or 'f' to indicate whether we are | |
775 | paginating forwards or backwards from `from_key`. | |
776 | limit (int): The maximum number of events to return. Zero or less | |
777 | means no limit. | |
778 | event_filter (Filter|None): If provided filters the events to | |
779 | those that match the filter. | |
780 | ||
781 | Returns: | |
782 | tuple[list[dict], str]: Returns the results as a list of dicts and | |
783 | a token that points to the end of the result set. The dicts have | |
784 | the keys "event_id", "topological_ordering" and "stream_orderign". | |
785 | """ | |
786 | ||
787 | from_key = RoomStreamToken.parse(from_key) | |
788 | if to_key: | |
789 | to_key = RoomStreamToken.parse(to_key) | |
790 | ||
791 | rows, token = yield self.runInteraction( | |
792 | "paginate_room_events", self._paginate_room_events_txn, | |
793 | room_id, from_key, to_key, direction, limit, event_filter, | |
794 | ) | |
817 | 795 | |
818 | 796 | events = yield self._get_events( |
819 | [r["event_id"] for r in rows], | |
797 | [r.event_id for r in rows], | |
820 | 798 | get_prev_content=True |
821 | 799 | ) |
822 | 800 | |
823 | 801 | self._set_before_and_after(events, rows) |
824 | 802 | |
825 | 803 | defer.returnValue((events, token)) |
804 | ||
805 | ||
806 | class StreamStore(StreamWorkerStore): | |
807 | def get_room_max_stream_ordering(self): | |
808 | return self._stream_id_gen.get_current_token() | |
809 | ||
810 | def get_room_min_stream_ordering(self): | |
811 | return self._backfill_id_gen.get_current_token() |
21 | 21 | import simplejson as json |
22 | 22 | import logging |
23 | 23 | |
24 | from six.moves import range | |
25 | ||
24 | 26 | logger = logging.getLogger(__name__) |
25 | 27 | |
26 | 28 | |
97 | 99 | |
98 | 100 | batch_size = 50 |
99 | 101 | results = [] |
100 | for i in xrange(0, len(tag_ids), batch_size): | |
102 | for i in range(0, len(tag_ids), batch_size): | |
101 | 103 | tags = yield self.runInteraction( |
102 | 104 | "get_all_updated_tag_content", |
103 | 105 | get_tag_content, |
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | 14 | |
15 | from synapse.api.errors import SynapseError | |
16 | 15 | from synapse.util.logcontext import PreserveLoggingContext |
17 | 16 | |
18 | 17 | from twisted.internet import defer, reactor, task |
21 | 20 | import logging |
22 | 21 | |
23 | 22 | logger = logging.getLogger(__name__) |
24 | ||
25 | ||
26 | class DeferredTimedOutError(SynapseError): | |
27 | def __init__(self): | |
28 | super(DeferredTimedOutError, self).__init__(504, "Timed out") | |
29 | 23 | |
30 | 24 | |
31 | 25 | def unwrapFirstError(failure): |
84 | 78 | except Exception: |
85 | 79 | if not ignore_errs: |
86 | 80 | raise |
87 | ||
88 | def time_bound_deferred(self, given_deferred, time_out): | |
89 | if given_deferred.called: | |
90 | return given_deferred | |
91 | ||
92 | ret_deferred = defer.Deferred() | |
93 | ||
94 | def timed_out_fn(): | |
95 | e = DeferredTimedOutError() | |
96 | ||
97 | try: | |
98 | ret_deferred.errback(e) | |
99 | except Exception: | |
100 | pass | |
101 | ||
102 | try: | |
103 | given_deferred.cancel() | |
104 | except Exception: | |
105 | pass | |
106 | ||
107 | timer = None | |
108 | ||
109 | def cancel(res): | |
110 | try: | |
111 | self.cancel_call_later(timer) | |
112 | except Exception: | |
113 | pass | |
114 | return res | |
115 | ||
116 | ret_deferred.addBoth(cancel) | |
117 | ||
118 | def success(res): | |
119 | try: | |
120 | ret_deferred.callback(res) | |
121 | except Exception: | |
122 | pass | |
123 | ||
124 | return res | |
125 | ||
126 | def err(res): | |
127 | try: | |
128 | ret_deferred.errback(res) | |
129 | except Exception: | |
130 | pass | |
131 | ||
132 | given_deferred.addCallbacks(callback=success, errback=err) | |
133 | ||
134 | timer = self.call_later(time_out, timed_out_fn) | |
135 | ||
136 | return ret_deferred |
14 | 14 | |
15 | 15 | |
16 | 16 | from twisted.internet import defer, reactor |
17 | from twisted.internet.defer import CancelledError | |
18 | from twisted.python import failure | |
17 | 19 | |
18 | 20 | from .logcontext import ( |
19 | PreserveLoggingContext, make_deferred_yieldable, preserve_fn | |
21 | PreserveLoggingContext, make_deferred_yieldable, run_in_background | |
20 | 22 | ) |
21 | 23 | from synapse.util import logcontext, unwrapFirstError |
22 | 24 | |
23 | 25 | from contextlib import contextmanager |
24 | 26 | |
25 | 27 | import logging |
28 | ||
29 | from six.moves import range | |
26 | 30 | |
27 | 31 | logger = logging.getLogger(__name__) |
28 | 32 | |
155 | 159 | def _concurrently_execute_inner(): |
156 | 160 | try: |
157 | 161 | while True: |
158 | yield func(it.next()) | |
162 | yield func(next(it)) | |
159 | 163 | except StopIteration: |
160 | 164 | pass |
161 | 165 | |
162 | 166 | return logcontext.make_deferred_yieldable(defer.gatherResults([ |
163 | preserve_fn(_concurrently_execute_inner)() | |
164 | for _ in xrange(limit) | |
167 | run_in_background(_concurrently_execute_inner) | |
168 | for _ in range(limit) | |
165 | 169 | ], consumeErrors=True)).addErrback(unwrapFirstError) |
166 | 170 | |
167 | 171 | |
391 | 395 | self.key_to_current_writer.pop(key) |
392 | 396 | |
393 | 397 | defer.returnValue(_ctx_manager()) |
398 | ||
399 | ||
400 | class DeferredTimeoutError(Exception): | |
401 | """ | |
402 | This error is raised by default when a L{Deferred} times out. | |
403 | """ | |
404 | ||
405 | ||
406 | def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None): | |
407 | """ | |
408 | Add a timeout to a deferred by scheduling it to be cancelled after | |
409 | timeout seconds. | |
410 | ||
411 | This is essentially a backport of deferred.addTimeout, which was introduced | |
412 | in twisted 16.5. | |
413 | ||
414 | If the deferred gets timed out, it errbacks with a DeferredTimeoutError, | |
415 | unless a cancelable function was passed to its initialization or unless | |
416 | a different on_timeout_cancel callable is provided. | |
417 | ||
418 | Args: | |
419 | deferred (defer.Deferred): deferred to be timed out | |
420 | timeout (Number): seconds to time out after | |
421 | ||
422 | on_timeout_cancel (callable): A callable which is called immediately | |
423 | after the deferred times out, and not if this deferred is | |
424 | otherwise cancelled before the timeout. | |
425 | ||
426 | It takes an arbitrary value, which is the value of the deferred at | |
427 | that exact point in time (probably a CancelledError Failure), and | |
428 | the timeout. | |
429 | ||
430 | The default callable (if none is provided) will translate a | |
431 | CancelledError Failure into a DeferredTimeoutError. | |
432 | """ | |
433 | timed_out = [False] | |
434 | ||
435 | def time_it_out(): | |
436 | timed_out[0] = True | |
437 | deferred.cancel() | |
438 | ||
439 | delayed_call = reactor.callLater(timeout, time_it_out) | |
440 | ||
441 | def convert_cancelled(value): | |
442 | if timed_out[0]: | |
443 | to_call = on_timeout_cancel or _cancelled_to_timed_out_error | |
444 | return to_call(value, timeout) | |
445 | return value | |
446 | ||
447 | deferred.addBoth(convert_cancelled) | |
448 | ||
449 | def cancel_timeout(result): | |
450 | # stop the pending call to cancel the deferred if it's been fired | |
451 | if delayed_call.active(): | |
452 | delayed_call.cancel() | |
453 | return result | |
454 | ||
455 | deferred.addBoth(cancel_timeout) | |
456 | ||
457 | ||
458 | def _cancelled_to_timed_out_error(value, timeout): | |
459 | if isinstance(value, failure.Failure): | |
460 | value.trap(CancelledError) | |
461 | raise DeferredTimeoutError(timeout, "Deferred") | |
462 | return value |
14 | 14 | |
15 | 15 | from twisted.internet import threads, reactor |
16 | 16 | |
17 | from synapse.util.logcontext import make_deferred_yieldable, preserve_fn | |
17 | from synapse.util.logcontext import make_deferred_yieldable, run_in_background | |
18 | 18 | |
19 | 19 | from six.moves import queue |
20 | 20 | |
69 | 69 | |
70 | 70 | self._producer = producer |
71 | 71 | self.streaming = streaming |
72 | self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer) | |
72 | self._finished_deferred = run_in_background( | |
73 | threads.deferToThread, self._writer | |
74 | ) | |
73 | 75 | if not streaming: |
74 | 76 | self._producer.resumeProducing() |
75 | 77 |
39 | 39 | # extra resources to existing nodes. See self._resource_id for the key. |
40 | 40 | resource_mappings = {} |
41 | 41 | for full_path, res in desired_tree.items(): |
42 | # twisted requires all resources to be bytes | |
43 | full_path = full_path.encode("utf-8") | |
44 | ||
42 | 45 | logger.info("Attaching %s to path %s", res, full_path) |
43 | 46 | last_resource = root_resource |
44 | for path_seg in full_path.split('/')[1:-1]: | |
47 | for path_seg in full_path.split(b'/')[1:-1]: | |
45 | 48 | if path_seg not in last_resource.listNames(): |
46 | 49 | # resource doesn't exist, so make a "dummy resource" |
47 | 50 | child_resource = NoResource() |
56 | 59 | |
57 | 60 | # =========================== |
58 | 61 | # now attach the actual desired resource |
59 | last_path_seg = full_path.split('/')[-1] | |
62 | last_path_seg = full_path.split(b'/')[-1] | |
60 | 63 | |
61 | 64 | # if there is already a resource here, thieve its children and |
62 | 65 | # replace it |
163 | 163 | current = self.set_current_context(self.previous_context) |
164 | 164 | if current is not self: |
165 | 165 | if current is self.sentinel: |
166 | logger.debug("Expected logging context %s has been lost", self) | |
166 | logger.warn("Expected logging context %s has been lost", self) | |
167 | 167 | else: |
168 | 168 | logger.warn( |
169 | 169 | "Current logging context %s is not expected context %s", |
278 | 278 | context = LoggingContext.set_current_context(self.current_context) |
279 | 279 | |
280 | 280 | if context != self.new_context: |
281 | logger.debug( | |
281 | logger.warn( | |
282 | 282 | "Unexpected logging context: %s is not %s", |
283 | 283 | context, self.new_context, |
284 | 284 | ) |
301 | 301 | def run_in_background(f, *args, **kwargs): |
302 | 302 | """Calls a function, ensuring that the current context is restored after |
303 | 303 | return from the function, and that the sentinel context is set once the |
304 | deferred returned by the funtion completes. | |
304 | deferred returned by the function completes. | |
305 | 305 | |
306 | 306 | Useful for wrapping functions that return a deferred which you don't yield |
307 | on. | |
307 | on (for instance because you want to pass it to deferred.gatherResults()). | |
308 | ||
309 | Note that if you completely discard the result, you should make sure that | |
310 | `f` doesn't raise any deferred exceptions, otherwise a scary-looking | |
311 | CRITICAL error about an unhandled error will be logged without much | |
312 | indication about where it came from. | |
308 | 313 | """ |
309 | 314 | current = LoggingContext.current_context() |
310 | res = f(*args, **kwargs) | |
311 | if isinstance(res, defer.Deferred) and not res.called: | |
312 | # The function will have reset the context before returning, so | |
313 | # we need to restore it now. | |
314 | LoggingContext.set_current_context(current) | |
315 | ||
316 | # The original context will be restored when the deferred | |
317 | # completes, but there is nothing waiting for it, so it will | |
318 | # get leaked into the reactor or some other function which | |
319 | # wasn't expecting it. We therefore need to reset the context | |
320 | # here. | |
321 | # | |
322 | # (If this feels asymmetric, consider it this way: we are | |
323 | # effectively forking a new thread of execution. We are | |
324 | # probably currently within a ``with LoggingContext()`` block, | |
325 | # which is supposed to have a single entry and exit point. But | |
326 | # by spawning off another deferred, we are effectively | |
327 | # adding a new exit point.) | |
328 | res.addBoth(_set_context_cb, LoggingContext.sentinel) | |
315 | try: | |
316 | res = f(*args, **kwargs) | |
317 | except: # noqa: E722 | |
318 | # the assumption here is that the caller doesn't want to be disturbed | |
319 | # by synchronous exceptions, so let's turn them into Failures. | |
320 | return defer.fail() | |
321 | ||
322 | if not isinstance(res, defer.Deferred): | |
323 | return res | |
324 | ||
325 | if res.called and not res.paused: | |
326 | # The function should have maintained the logcontext, so we can | |
327 | # optimise out the messing about | |
328 | return res | |
329 | ||
330 | # The function may have reset the context before returning, so | |
331 | # we need to restore it now. | |
332 | ctx = LoggingContext.set_current_context(current) | |
333 | ||
334 | # The original context will be restored when the deferred | |
335 | # completes, but there is nothing waiting for it, so it will | |
336 | # get leaked into the reactor or some other function which | |
337 | # wasn't expecting it. We therefore need to reset the context | |
338 | # here. | |
339 | # | |
340 | # (If this feels asymmetric, consider it this way: we are | |
341 | # effectively forking a new thread of execution. We are | |
342 | # probably currently within a ``with LoggingContext()`` block, | |
343 | # which is supposed to have a single entry and exit point. But | |
344 | # by spawning off another deferred, we are effectively | |
345 | # adding a new exit point.) | |
346 | res.addBoth(_set_context_cb, ctx) | |
329 | 347 | return res |
330 | 348 | |
331 | 349 | |
340 | 358 | returning a deferred. Then, when the deferred completes, restores the |
341 | 359 | current logcontext before running callbacks/errbacks. |
342 | 360 | |
343 | (This is more-or-less the opposite operation to preserve_fn.) | |
361 | (This is more-or-less the opposite operation to run_in_background.) | |
344 | 362 | """ |
345 | if isinstance(deferred, defer.Deferred) and not deferred.called: | |
346 | prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) | |
347 | deferred.addBoth(_set_context_cb, prev_context) | |
363 | if not isinstance(deferred, defer.Deferred): | |
364 | return deferred | |
365 | ||
366 | if deferred.called and not deferred.paused: | |
367 | # it looks like this deferred is ready to run any callbacks we give it | |
368 | # immediately. We may as well optimise out the logcontext faffery. | |
369 | return deferred | |
370 | ||
371 | # ok, we can't be sure that a yield won't block, so let's reset the | |
372 | # logcontext, and add a callback to the deferred to restore it. | |
373 | prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) | |
374 | deferred.addBoth(_set_context_cb, prev_context) | |
348 | 375 | return deferred |
349 | 376 | |
350 | 377 |
13 | 13 | # limitations under the License. |
14 | 14 | |
15 | 15 | |
16 | import StringIO | |
16 | from six import StringIO | |
17 | 17 | import logging |
18 | 18 | import traceback |
19 | 19 | |
31 | 31 | super(LogFormatter, self).__init__(*args, **kwargs) |
32 | 32 | |
33 | 33 | def formatException(self, ei): |
34 | sio = StringIO.StringIO() | |
34 | sio = StringIO() | |
35 | 35 | (typ, val, tb) = ei |
36 | 36 | |
37 | 37 | # log the stack above the exception capture point if possible, but |
17 | 17 | from synapse.api.errors import LimitExceededError |
18 | 18 | |
19 | 19 | from synapse.util.async import sleep |
20 | from synapse.util.logcontext import preserve_fn | |
20 | from synapse.util.logcontext import ( | |
21 | run_in_background, make_deferred_yieldable, | |
22 | PreserveLoggingContext, | |
23 | ) | |
21 | 24 | |
22 | 25 | import collections |
23 | 26 | import contextlib |
149 | 152 | "Ratelimit [%s]: sleeping req", |
150 | 153 | id(request_id), |
151 | 154 | ) |
152 | ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0) | |
155 | ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0) | |
153 | 156 | |
154 | 157 | self.sleeping_requests.add(request_id) |
155 | 158 | |
175 | 178 | return r |
176 | 179 | |
177 | 180 | def on_err(r): |
181 | # XXX: why is this necessary? this is called before we start | |
182 | # processing the request so why would the request be in | |
183 | # current_processing? | |
178 | 184 | self.current_processing.discard(request_id) |
179 | 185 | return r |
180 | 186 | |
186 | 192 | |
187 | 193 | ret_defer.addCallbacks(on_start, on_err) |
188 | 194 | ret_defer.addBoth(on_both) |
189 | return ret_defer | |
195 | return make_deferred_yieldable(ret_defer) | |
190 | 196 | |
191 | 197 | def _on_exit(self, request_id): |
192 | 198 | logger.debug( |
196 | 202 | self.current_processing.discard(request_id) |
197 | 203 | try: |
198 | 204 | request_id, deferred = self.ready_request_queue.popitem() |
205 | ||
206 | # XXX: why do we do the following? the on_start callback above will | |
207 | # do it for us. | |
199 | 208 | self.current_processing.add(request_id) |
200 | deferred.callback(None) | |
209 | ||
210 | with PreserveLoggingContext(): | |
211 | deferred.callback(None) | |
201 | 212 | except KeyError: |
202 | 213 | pass |
202 | 202 | ) |
203 | 203 | except Exception: |
204 | 204 | logger.exception( |
205 | "Failed to store set_destination_retry_timings", | |
205 | "Failed to store destination_retry_timings", | |
206 | 206 | ) |
207 | 207 | |
208 | 208 | # we deliberately do this in the background. |
209 | synapse.util.logcontext.preserve_fn(store_retry_timings)() | |
209 | synapse.util.logcontext.run_in_background(store_retry_timings) |
14 | 14 | |
15 | 15 | import random |
16 | 16 | import string |
17 | from six.moves import range | |
17 | 18 | |
18 | 19 | _string_with_symbols = ( |
19 | 20 | string.digits + string.ascii_letters + ".,;:^&*-_+=#~@" |
21 | 22 | |
22 | 23 | |
23 | 24 | def random_string(length): |
24 | return ''.join(random.choice(string.ascii_letters) for _ in xrange(length)) | |
25 | return ''.join(random.choice(string.ascii_letters) for _ in range(length)) | |
25 | 26 | |
26 | 27 | |
27 | 28 | def random_string_with_symbols(length): |
28 | 29 | return ''.join( |
29 | random.choice(_string_with_symbols) for _ in xrange(length) | |
30 | random.choice(_string_with_symbols) for _ in range(length) | |
30 | 31 | ) |
31 | 32 | |
32 | 33 |
11 | 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | ||
15 | from six.moves import range | |
14 | 16 | |
15 | 17 | |
16 | 18 | class _Entry(object): |
67 | 69 | # Add empty entries between the end of the current list and when we want |
68 | 70 | # to insert. This ensures there are no gaps. |
69 | 71 | self.entries.extend( |
70 | _Entry(key) for key in xrange(last_key, then_key + 1) | |
72 | _Entry(key) for key in range(last_key, then_key + 1) | |
71 | 73 | ) |
72 | 74 | |
73 | 75 | self.entries[-1].queue.append(obj) |
16 | 16 | _ServiceQueuer, _TransactionController, _Recoverer |
17 | 17 | ) |
18 | 18 | from twisted.internet import defer |
19 | ||
20 | from synapse.util.logcontext import make_deferred_yieldable | |
19 | 21 | from ..utils import MockClock |
20 | 22 | from mock import Mock |
21 | 23 | from tests import unittest |
203 | 205 | |
204 | 206 | def test_send_single_event_with_queue(self): |
205 | 207 | d = defer.Deferred() |
206 | self.txn_ctrl.send = Mock(return_value=d) | |
208 | self.txn_ctrl.send = Mock( | |
209 | side_effect=lambda x, y: make_deferred_yieldable(d), | |
210 | ) | |
207 | 211 | service = Mock(id=4) |
208 | 212 | event = Mock(event_id="first") |
209 | 213 | event2 = Mock(event_id="second") |
234 | 238 | srv_2_event2 = Mock(event_id="srv2b") |
235 | 239 | |
236 | 240 | send_return_list = [srv_1_defer, srv_2_defer] |
237 | self.txn_ctrl.send = Mock(side_effect=lambda x, y: send_return_list.pop(0)) | |
241 | ||
242 | def do_send(x, y): | |
243 | return make_deferred_yieldable(send_return_list.pop(0)) | |
244 | self.txn_ctrl.send = Mock(side_effect=do_send) | |
238 | 245 | |
239 | 246 | # send events for different ASes and make sure they are sent |
240 | 247 | self.queuer.enqueue(srv1, srv_1_event) |
15 | 15 | from tests import unittest |
16 | 16 | |
17 | 17 | from synapse.metrics.metric import ( |
18 | CounterMetric, CallbackMetric, DistributionMetric, CacheMetric | |
18 | CounterMetric, CallbackMetric, DistributionMetric, CacheMetric, | |
19 | _escape_label_value, | |
19 | 20 | ) |
20 | 21 | |
21 | 22 | |
170 | 171 | 'cache:size{name="cache_name"} 1', |
171 | 172 | 'cache:evicted_size{name="cache_name"} 2', |
172 | 173 | ]) |
174 | ||
175 | ||
176 | class LabelValueEscapeTestCase(unittest.TestCase): | |
177 | def test_simple(self): | |
178 | string = "safjhsdlifhyskljfksdfh" | |
179 | self.assertEqual(string, _escape_label_value(string)) | |
180 | ||
181 | def test_escape(self): | |
182 | self.assertEqual( | |
183 | "abc\\\"def\\nghi\\\\", | |
184 | _escape_label_value("abc\"def\nghi\\"), | |
185 | ) | |
186 | ||
187 | def test_sequence_of_escapes(self): | |
188 | self.assertEqual( | |
189 | "abc\\\"def\\nghi\\\\\\n", | |
190 | _escape_label_value("abc\"def\nghi\\\n"), | |
191 | ) |
147 | 147 | |
148 | 148 | @defer.inlineCallbacks |
149 | 149 | def test_stream_basic_permissions(self): |
150 | # invalid token, expect 403 | |
150 | # invalid token, expect 401 | |
151 | # note: this is in violation of the original v1 spec, which expected | |
152 | # 403. However, since the v1 spec no longer exists and the v1 | |
153 | # implementation is now part of the r0 implementation, the newer | |
154 | # behaviour is used instead to be consistent with the r0 spec. | |
155 | # see issue #2602 | |
151 | 156 | (code, response) = yield self.mock_resource.trigger_get( |
152 | 157 | "/events?access_token=%s" % ("invalid" + self.token, ) |
153 | 158 | ) |
154 | self.assertEquals(403, code, msg=str(response)) | |
159 | self.assertEquals(401, code, msg=str(response)) | |
155 | 160 | |
156 | 161 | # valid token, expect content |
157 | 162 | (code, response) = yield self.mock_resource.trigger_get( |
51 | 51 | def _get_user_by_req(request=None, allow_guest=False): |
52 | 52 | return synapse.types.create_requester(myid) |
53 | 53 | |
54 | hs.get_v1auth().get_user_by_req = _get_user_by_req | |
54 | hs.get_auth().get_user_by_req = _get_user_by_req | |
55 | 55 | |
56 | 56 | profile.register_servlets(hs, self.mock_resource) |
57 | 57 |
23 | 23 | from synapse.types import UserID |
24 | 24 | |
25 | 25 | import json |
26 | import urllib | |
26 | from six.moves.urllib import parse as urlparse | |
27 | 27 | |
28 | 28 | from ....utils import MockHttpResource, setup_test_homeserver |
29 | 29 | from .utils import RestTestCase |
59 | 59 | "token_id": 1, |
60 | 60 | "is_guest": False, |
61 | 61 | } |
62 | hs.get_v1auth().get_user_by_access_token = get_user_by_access_token | |
62 | hs.get_auth().get_user_by_access_token = get_user_by_access_token | |
63 | 63 | |
64 | 64 | def _insert_client_ip(*args, **kwargs): |
65 | 65 | return defer.succeed(None) |
69 | 69 | |
70 | 70 | synapse.rest.client.v1.room.register_servlets(hs, self.mock_resource) |
71 | 71 | |
72 | self.auth = hs.get_v1auth() | |
72 | self.auth = hs.get_auth() | |
73 | 73 | |
74 | 74 | # create some rooms under the name rmcreator_id |
75 | 75 | self.uncreated_rmid = "!aa:test" |
424 | 424 | "token_id": 1, |
425 | 425 | "is_guest": False, |
426 | 426 | } |
427 | hs.get_v1auth().get_user_by_access_token = get_user_by_access_token | |
427 | hs.get_auth().get_user_by_access_token = get_user_by_access_token | |
428 | 428 | |
429 | 429 | def _insert_client_ip(*args, **kwargs): |
430 | 430 | return defer.succeed(None) |
506 | 506 | "token_id": 1, |
507 | 507 | "is_guest": False, |
508 | 508 | } |
509 | hs.get_v1auth().get_user_by_access_token = get_user_by_access_token | |
509 | hs.get_auth().get_user_by_access_token = get_user_by_access_token | |
510 | 510 | |
511 | 511 | def _insert_client_ip(*args, **kwargs): |
512 | 512 | return defer.succeed(None) |
596 | 596 | "is_guest": False, |
597 | 597 | } |
598 | 598 | |
599 | hs.get_v1auth().get_user_by_access_token = get_user_by_access_token | |
599 | hs.get_auth().get_user_by_access_token = get_user_by_access_token | |
600 | 600 | |
601 | 601 | def _insert_client_ip(*args, **kwargs): |
602 | 602 | return defer.succeed(None) |
710 | 710 | "token_id": 1, |
711 | 711 | "is_guest": False, |
712 | 712 | } |
713 | hs.get_v1auth().get_user_by_access_token = get_user_by_access_token | |
713 | hs.get_auth().get_user_by_access_token = get_user_by_access_token | |
714 | 714 | |
715 | 715 | def _insert_client_ip(*args, **kwargs): |
716 | 716 | return defer.succeed(None) |
765 | 765 | @defer.inlineCallbacks |
766 | 766 | def test_rooms_members_self(self): |
767 | 767 | path = "/rooms/%s/state/m.room.member/%s" % ( |
768 | urllib.quote(self.room_id), self.user_id | |
768 | urlparse.quote(self.room_id), self.user_id | |
769 | 769 | ) |
770 | 770 | |
771 | 771 | # valid join message (NOOP since we made the room) |
785 | 785 | def test_rooms_members_other(self): |
786 | 786 | self.other_id = "@zzsid1:red" |
787 | 787 | path = "/rooms/%s/state/m.room.member/%s" % ( |
788 | urllib.quote(self.room_id), self.other_id | |
788 | urlparse.quote(self.room_id), self.other_id | |
789 | 789 | ) |
790 | 790 | |
791 | 791 | # valid invite message |
801 | 801 | def test_rooms_members_other_custom_keys(self): |
802 | 802 | self.other_id = "@zzsid1:red" |
803 | 803 | path = "/rooms/%s/state/m.room.member/%s" % ( |
804 | urllib.quote(self.room_id), self.other_id | |
804 | urlparse.quote(self.room_id), self.other_id | |
805 | 805 | ) |
806 | 806 | |
807 | 807 | # valid invite message with custom key |
842 | 842 | "token_id": 1, |
843 | 843 | "is_guest": False, |
844 | 844 | } |
845 | hs.get_v1auth().get_user_by_access_token = get_user_by_access_token | |
845 | hs.get_auth().get_user_by_access_token = get_user_by_access_token | |
846 | 846 | |
847 | 847 | def _insert_client_ip(*args, **kwargs): |
848 | 848 | return defer.succeed(None) |
858 | 858 | @defer.inlineCallbacks |
859 | 859 | def test_invalid_puts(self): |
860 | 860 | path = "/rooms/%s/send/m.room.message/mid1" % ( |
861 | urllib.quote(self.room_id)) | |
861 | urlparse.quote(self.room_id)) | |
862 | 862 | # missing keys or invalid json |
863 | 863 | (code, response) = yield self.mock_resource.trigger( |
864 | 864 | "PUT", path, '{}' |
893 | 893 | @defer.inlineCallbacks |
894 | 894 | def test_rooms_messages_sent(self): |
895 | 895 | path = "/rooms/%s/send/m.room.message/mid1" % ( |
896 | urllib.quote(self.room_id)) | |
896 | urlparse.quote(self.room_id)) | |
897 | 897 | |
898 | 898 | content = '{"body":"test","msgtype":{"type":"a"}}' |
899 | 899 | (code, response) = yield self.mock_resource.trigger("PUT", path, content) |
910 | 910 | |
911 | 911 | # m.text message type |
912 | 912 | path = "/rooms/%s/send/m.room.message/mid2" % ( |
913 | urllib.quote(self.room_id)) | |
913 | urlparse.quote(self.room_id)) | |
914 | 914 | content = '{"body":"test2","msgtype":"m.text"}' |
915 | 915 | (code, response) = yield self.mock_resource.trigger("PUT", path, content) |
916 | 916 | self.assertEquals(200, code, msg=str(response)) |
944 | 944 | "token_id": 1, |
945 | 945 | "is_guest": False, |
946 | 946 | } |
947 | hs.get_v1auth().get_user_by_access_token = get_user_by_access_token | |
947 | hs.get_auth().get_user_by_access_token = get_user_by_access_token | |
948 | 948 | |
949 | 949 | def _insert_client_ip(*args, **kwargs): |
950 | 950 | return defer.succeed(None) |
1016 | 1016 | "token_id": 1, |
1017 | 1017 | "is_guest": False, |
1018 | 1018 | } |
1019 | hs.get_v1auth().get_user_by_access_token = get_user_by_access_token | |
1019 | hs.get_auth().get_user_by_access_token = get_user_by_access_token | |
1020 | 1020 | |
1021 | 1021 | def _insert_client_ip(*args, **kwargs): |
1022 | 1022 | return defer.succeed(None) |
67 | 67 | "is_guest": False, |
68 | 68 | } |
69 | 69 | |
70 | hs.get_v1auth().get_user_by_access_token = get_user_by_access_token | |
70 | hs.get_auth().get_user_by_access_token = get_user_by_access_token | |
71 | 71 | |
72 | 72 | def _insert_client_ip(*args, **kwargs): |
73 | 73 | return defer.succeed(None) |
127 | 127 | yield _rotate(10) |
128 | 128 | yield _assert_counts(1, 1) |
129 | 129 | |
130 | @tests.unittest.DEBUG | |
131 | 130 | @defer.inlineCallbacks |
132 | 131 | def test_find_first_stream_ordering_after_ts(self): |
133 | 132 | def add_event(so, ts): |
0 | # -*- coding: utf-8 -*- | |
1 | # Copyright 2017 Vector Creations Ltd | |
2 | # | |
3 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | # you may not use this file except in compliance with the License. | |
5 | # You may obtain a copy of the License at | |
6 | # | |
7 | # http://www.apache.org/licenses/LICENSE-2.0 | |
8 | # | |
9 | # Unless required by applicable law or agreed to in writing, software | |
10 | # distributed under the License is distributed on an "AS IS" BASIS, | |
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | # See the License for the specific language governing permissions and | |
13 | # limitations under the License. | |
14 | from synapse import util | |
15 | from twisted.internet import defer | |
16 | from tests import unittest | |
17 | ||
18 | ||
19 | class ClockTestCase(unittest.TestCase): | |
20 | @defer.inlineCallbacks | |
21 | def test_time_bound_deferred(self): | |
22 | # just a deferred which never resolves | |
23 | slow_deferred = defer.Deferred() | |
24 | ||
25 | clock = util.Clock() | |
26 | time_bound = clock.time_bound_deferred(slow_deferred, 0.001) | |
27 | ||
28 | try: | |
29 | yield time_bound | |
30 | self.fail("Expected timedout error, but got nothing") | |
31 | except util.DeferredTimedOutError: | |
32 | pass |
35 | 35 | yield sleep(0) |
36 | 36 | self._check_test_key("one") |
37 | 37 | |
38 | def _test_preserve_fn(self, function): | |
38 | def _test_run_in_background(self, function): | |
39 | 39 | sentinel_context = LoggingContext.current_context() |
40 | 40 | |
41 | 41 | callback_completed = [False] |
42 | 42 | |
43 | @defer.inlineCallbacks | |
44 | def cb(): | |
43 | def test(): | |
45 | 44 | context_one.request = "one" |
46 | yield function() | |
47 | self._check_test_key("one") | |
45 | d = function() | |
48 | 46 | |
49 | callback_completed[0] = True | |
47 | def cb(res): | |
48 | self._check_test_key("one") | |
49 | callback_completed[0] = True | |
50 | return res | |
51 | d.addCallback(cb) | |
52 | ||
53 | return d | |
50 | 54 | |
51 | 55 | with LoggingContext() as context_one: |
52 | 56 | context_one.request = "one" |
53 | 57 | |
54 | 58 | # fire off function, but don't wait on it. |
55 | logcontext.preserve_fn(cb)() | |
59 | logcontext.run_in_background(test) | |
56 | 60 | |
57 | 61 | self._check_test_key("one") |
58 | 62 | |
79 | 83 | # test is done once d2 finishes |
80 | 84 | return d2 |
81 | 85 | |
82 | def test_preserve_fn_with_blocking_fn(self): | |
86 | def test_run_in_background_with_blocking_fn(self): | |
83 | 87 | @defer.inlineCallbacks |
84 | 88 | def blocking_function(): |
85 | 89 | yield sleep(0) |
86 | 90 | |
87 | return self._test_preserve_fn(blocking_function) | |
91 | return self._test_run_in_background(blocking_function) | |
88 | 92 | |
89 | def test_preserve_fn_with_non_blocking_fn(self): | |
93 | def test_run_in_background_with_non_blocking_fn(self): | |
90 | 94 | @defer.inlineCallbacks |
91 | 95 | def nonblocking_function(): |
92 | 96 | with logcontext.PreserveLoggingContext(): |
93 | 97 | yield defer.succeed(None) |
94 | 98 | |
95 | return self._test_preserve_fn(nonblocking_function) | |
99 | return self._test_run_in_background(nonblocking_function) | |
100 | ||
101 | def test_run_in_background_with_chained_deferred(self): | |
102 | # a function which returns a deferred which looks like it has been | |
103 | # called, but is actually paused | |
104 | def testfunc(): | |
105 | return logcontext.make_deferred_yieldable( | |
106 | _chained_deferred_function() | |
107 | ) | |
108 | ||
109 | return self._test_run_in_background(testfunc) | |
96 | 110 | |
97 | 111 | @defer.inlineCallbacks |
98 | 112 | def test_make_deferred_yieldable(self): |
118 | 132 | self._check_test_key("one") |
119 | 133 | |
120 | 134 | @defer.inlineCallbacks |
135 | def test_make_deferred_yieldable_with_chained_deferreds(self): | |
136 | sentinel_context = LoggingContext.current_context() | |
137 | ||
138 | with LoggingContext() as context_one: | |
139 | context_one.request = "one" | |
140 | ||
141 | d1 = logcontext.make_deferred_yieldable(_chained_deferred_function()) | |
142 | # make sure that the context was reset by make_deferred_yieldable | |
143 | self.assertIs(LoggingContext.current_context(), sentinel_context) | |
144 | ||
145 | yield d1 | |
146 | ||
147 | # now it should be restored | |
148 | self._check_test_key("one") | |
149 | ||
150 | @defer.inlineCallbacks | |
121 | 151 | def test_make_deferred_yieldable_on_non_deferred(self): |
122 | 152 | """Check that make_deferred_yieldable does the right thing when its |
123 | 153 | argument isn't actually a deferred""" |
131 | 161 | r = yield d1 |
132 | 162 | self.assertEqual(r, "bum") |
133 | 163 | self._check_test_key("one") |
164 | ||
165 | ||
166 | # a function which returns a deferred which has been "called", but | |
167 | # which had a function which returned another incomplete deferred on | |
168 | # its callback list, so won't yet call any other new callbacks. | |
169 | def _chained_deferred_function(): | |
170 | d = defer.succeed(None) | |
171 | ||
172 | def cb(res): | |
173 | d2 = defer.Deferred() | |
174 | reactor.callLater(0, d2.callback, res) | |
175 | return d2 | |
176 | d.addCallback(cb) | |
177 | return d |
0 | # -*- coding: utf-8 -*- | |
1 | # Copyright 2018 New Vector Ltd | |
2 | # | |
3 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | # you may not use this file except in compliance with the License. | |
5 | # You may obtain a copy of the License at | |
6 | # | |
7 | # http://www.apache.org/licenses/LICENSE-2.0 | |
8 | # | |
9 | # Unless required by applicable law or agreed to in writing, software | |
10 | # distributed under the License is distributed on an "AS IS" BASIS, | |
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | # See the License for the specific language governing permissions and | |
13 | # limitations under the License. | |
14 | import sys | |
15 | ||
16 | from synapse.util.logformatter import LogFormatter | |
17 | from tests import unittest | |
18 | ||
19 | ||
20 | class TestException(Exception): | |
21 | pass | |
22 | ||
23 | ||
24 | class LogFormatterTestCase(unittest.TestCase): | |
25 | def test_formatter(self): | |
26 | formatter = LogFormatter() | |
27 | ||
28 | try: | |
29 | raise TestException("testytest") | |
30 | except TestException: | |
31 | ei = sys.exc_info() | |
32 | ||
33 | output = formatter.formatException(ei) | |
34 | ||
35 | # check the output looks vaguely sane | |
36 | self.assertIn("testytest", output) | |
37 | self.assertIn("Capture point", output) |
14 | 14 | |
15 | 15 | import hashlib |
16 | 16 | from inspect import getcallargs |
17 | import urllib | |
18 | import urlparse | |
17 | from six.moves.urllib import parse as urlparse | |
19 | 18 | |
20 | 19 | from mock import Mock, patch |
21 | 20 | from twisted.internet import defer, reactor |
237 | 236 | if matcher: |
238 | 237 | try: |
239 | 238 | args = [ |
240 | urllib.unquote(u).decode("UTF-8") | |
239 | urlparse.unquote(u).decode("UTF-8") | |
241 | 240 | for u in matcher.groups() |
242 | 241 | ] |
243 | 242 |
0 | 0 | [tox] |
1 | envlist = packaging, py27, pep8 | |
1 | envlist = packaging, py27, py36, pep8 | |
2 | 2 | |
3 | 3 | [testenv] |
4 | 4 | deps = |
45 | 45 | # ) |
46 | 46 | usedevelop=true |
47 | 47 | |
48 | [testenv:py36] | |
49 | usedevelop=true | |
50 | commands = | |
51 | /usr/bin/find "{toxinidir}" -name '*.pyc' -delete | |
52 | coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \ | |
53 | "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/metrics tests/config} \ | |
54 | {env:TOXSUFFIX:} | |
55 | {env:DUMP_COVERAGE_COMMAND:coverage report -m} | |
56 | ||
48 | 57 | [testenv:packaging] |
49 | 58 | deps = |
50 | 59 | check-manifest |