Codebase list matrix-synapse / upstream/1.51.0
New upstream version 1.51.0 Andrej Shadura 2 years ago
135 changed file(s) with 2645 addition(s) and 1714 deletion(s).
77 - Use markdown where necessary, mostly for `code blocks`.
88 - End with either a period (.) or an exclamation mark (!).
99 - Start with a capital letter.
10 - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry.
1011 * [ ] Pull request includes a [sign off](https://matrix-org.github.io/synapse/latest/development/contributing_guide.html#sign-off)
1112 * [ ] [Code style](https://matrix-org.github.io/synapse/latest/code_style.html) is correct
1213 (run the [linters](https://matrix-org.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))
365365 # Build initial Synapse image
366366 - run: docker build -t matrixdotorg/synapse:latest -f docker/Dockerfile .
367367 working-directory: synapse
368 env:
369 DOCKER_BUILDKIT: 1
368370
369371 # Build a ready-to-run Synapse image based on the initial image above.
370372 # This new image includes a config file, keys for signing and TLS, and
373375 working-directory: complement/dockerfiles
374376
375377 # Run Complement
376 - run: go test -v -tags synapse_blacklist,msc2403 ./tests/...
378 - run: set -o pipefail && go test -v -json -tags synapse_blacklist,msc2403 ./tests/... 2>&1 | gotestfmt
379 shell: bash
377380 env:
378381 COMPLEMENT_BASE_IMAGE: complement-synapse:latest
379382 working-directory: complement
4949
5050 # docs
5151 book/
52
53 # complement
54 /complement-*
55 /master.tar.gz
0 Synapse 1.50.2 (2022-01-24)
0 Synapse 1.51.0 (2022-01-25)
11 ===========================
22
3 No significant changes since 1.51.0rc2.
4
5 Synapse 1.51.0 deprecates `webclient` listeners and non-HTTP(S) `web_client_location`s. Support for these will be removed in Synapse 1.53.0, at which point Synapse will not be capable of directly serving a web client for Matrix.
6
7 Synapse 1.51.0rc2 (2022-01-24)
8 ==============================
9
310 Bugfixes
411 --------
512
613 - Fix a bug introduced in Synapse 1.40.0 that caused Synapse to fail to process incoming federation traffic after handling a large amount of events in a v1 room. ([\#11806](https://github.com/matrix-org/synapse/issues/11806))
14
15
16 Synapse 1.51.0rc1 (2022-01-21)
17 ==============================
18
19 Features
20 --------
21
22 - Add `track_puppeted_user_ips` config flag to record client IP addresses against puppeted users, and include the puppeted users in monthly active user counts. ([\#11561](https://github.com/matrix-org/synapse/issues/11561), [\#11749](https://github.com/matrix-org/synapse/issues/11749), [\#11757](https://github.com/matrix-org/synapse/issues/11757))
23 - Include whether the requesting user has participated in a thread when generating a summary for [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440). ([\#11577](https://github.com/matrix-org/synapse/issues/11577))
24 - Return an `M_FORBIDDEN` error code instead of `M_UNKNOWN` when a spam checker module prevents a user from creating a room. ([\#11672](https://github.com/matrix-org/synapse/issues/11672))
25 - Add a flag to the `synapse_review_recent_signups` script to ignore and filter appservice users. ([\#11675](https://github.com/matrix-org/synapse/issues/11675), [\#11770](https://github.com/matrix-org/synapse/issues/11770))
26
27
28 Bugfixes
29 --------
30
31 - Fix a long-standing issue which could cause Synapse to incorrectly accept data in the unsigned field of events
32 received over federation. ([\#11530](https://github.com/matrix-org/synapse/issues/11530))
33 - Fix a long-standing bug where Synapse wouldn't cache a response indicating that a remote user has no devices. ([\#11587](https://github.com/matrix-org/synapse/issues/11587))
34 - Fix an error that occurs whilst trying to get the federation status of a destination server that was working normally. This admin API was newly introduced in Synapse v1.49.0. ([\#11593](https://github.com/matrix-org/synapse/issues/11593))
35 - Fix bundled aggregations not being included in the `/sync` response, per [MSC2675](https://github.com/matrix-org/matrix-doc/pull/2675). ([\#11612](https://github.com/matrix-org/synapse/issues/11612), [\#11659](https://github.com/matrix-org/synapse/issues/11659), [\#11791](https://github.com/matrix-org/synapse/issues/11791))
36 - Fix the `/_matrix/client/v1/room/{roomId}/hierarchy` endpoint returning incorrect fields which have been present since Synapse 1.49.0. ([\#11667](https://github.com/matrix-org/synapse/issues/11667))
37 - Fix preview of some GIF URLs (like tenor.com). Contributed by Philippe Daouadi. ([\#11669](https://github.com/matrix-org/synapse/issues/11669))
38 - Fix a bug where only the first 50 rooms from a space were returned from the `/hierarchy` API. This has existed since the introduction of the API in Synapse v1.41.0. ([\#11695](https://github.com/matrix-org/synapse/issues/11695))
39 - Fix a bug introduced in Synapse v1.18.0 where password reset and address validation emails would not be sent if their subject was configured to use the 'app' template variable. Contributed by @br4nnigan. ([\#11710](https://github.com/matrix-org/synapse/issues/11710), [\#11745](https://github.com/matrix-org/synapse/issues/11745))
40 - Make the 'List Rooms' Admin API sort stable. Contributed by Daniël Sonck. ([\#11737](https://github.com/matrix-org/synapse/issues/11737))
41 - Fix a long-standing bug where space hierarchy over federation would only work correctly some of the time. ([\#11775](https://github.com/matrix-org/synapse/issues/11775))
42 - Fix a bug introduced in Synapse v1.46.0 that prevented `on_logged_out` module callbacks from being correctly awaited by Synapse. ([\#11786](https://github.com/matrix-org/synapse/issues/11786))
43
44
45 Improved Documentation
46 ----------------------
47
48 - Warn against using a Let's Encrypt certificate for TLS/DTLS TURN server client connections, and suggest using a ZeroSSL certificate instead. This works around client-side connectivity errors caused by WebRTC libraries that reject Let's Encrypt certificates. Contributed by @AndrewFerr. ([\#11686](https://github.com/matrix-org/synapse/issues/11686))
49 - Document the new `SYNAPSE_TEST_PERSIST_SQLITE_DB` environment variable in the contributing guide. ([\#11715](https://github.com/matrix-org/synapse/issues/11715))
50 - Document that the minimum supported PostgreSQL version is now 10. ([\#11725](https://github.com/matrix-org/synapse/issues/11725))
51 - Fix typo in demo docs: differnt. ([\#11735](https://github.com/matrix-org/synapse/issues/11735))
52 - Update room spec URL in config files. ([\#11739](https://github.com/matrix-org/synapse/issues/11739))
53 - Mention `python3-venv` and `libpq-dev` dependencies in the contribution guide. ([\#11740](https://github.com/matrix-org/synapse/issues/11740))
54 - Update documentation for configuring login with Facebook. ([\#11755](https://github.com/matrix-org/synapse/issues/11755))
55 - Update installation instructions to note that Python 3.6 is no longer supported. ([\#11781](https://github.com/matrix-org/synapse/issues/11781))
56
57
58 Deprecations and Removals
59 -------------------------
60
61 - Remove the unstable `/send_relation` endpoint. ([\#11682](https://github.com/matrix-org/synapse/issues/11682))
62 - Remove `python_twisted_reactor_pending_calls` Prometheus metric. ([\#11724](https://github.com/matrix-org/synapse/issues/11724))
63 - Remove the `password_hash` field from the response dictionaries of the [Users Admin API](https://matrix-org.github.io/synapse/latest/admin_api/user_admin_api.html). ([\#11576](https://github.com/matrix-org/synapse/issues/11576))
64 - **Deprecate support for `webclient` listeners and non-HTTP(S) `web_client_location` configuration. ([\#11774](https://github.com/matrix-org/synapse/issues/11774), [\#11783](https://github.com/matrix-org/synapse/issues/11783))**
65
66
67 Internal Changes
68 ----------------
69
70 - Run `pyupgrade --py37-plus --keep-percent-format` on Synapse. ([\#11685](https://github.com/matrix-org/synapse/issues/11685))
71 - Use buildkit's cache feature to speed up docker builds. ([\#11691](https://github.com/matrix-org/synapse/issues/11691))
72 - Use `auto_attribs` and native type hints for attrs classes. ([\#11692](https://github.com/matrix-org/synapse/issues/11692), [\#11768](https://github.com/matrix-org/synapse/issues/11768))
73 - Remove debug logging for #4422, which has been closed since Synapse 0.99. ([\#11693](https://github.com/matrix-org/synapse/issues/11693))
74 - Remove fallback code for Python 2. ([\#11699](https://github.com/matrix-org/synapse/issues/11699))
75 - Add a test for [an edge case](https://github.com/matrix-org/synapse/pull/11532#discussion_r769104461) in the `/sync` logic. ([\#11701](https://github.com/matrix-org/synapse/issues/11701))
76 - Add the option to write SQLite test dbs to disk when running tests. ([\#11702](https://github.com/matrix-org/synapse/issues/11702))
77 - Improve Complement test output for GitHub Actions. ([\#11707](https://github.com/matrix-org/synapse/issues/11707))
78 - Fix docstring on `add_account_data_for_user`. ([\#11716](https://github.com/matrix-org/synapse/issues/11716))
79 - Complement environment variable name change and update `.gitignore`. ([\#11718](https://github.com/matrix-org/synapse/issues/11718))
80 - Simplify calculation of Prometheus metrics for garbage collection. ([\#11723](https://github.com/matrix-org/synapse/issues/11723))
81 - Improve accuracy of `python_twisted_reactor_tick_time` Prometheus metric. ([\#11724](https://github.com/matrix-org/synapse/issues/11724), [\#11771](https://github.com/matrix-org/synapse/issues/11771))
82 - Minor efficiency improvements when inserting many values into the database. ([\#11742](https://github.com/matrix-org/synapse/issues/11742))
83 - Invite PR authors to give themselves credit in the changelog. ([\#11744](https://github.com/matrix-org/synapse/issues/11744))
84 - Add optional debugging to investigate [issue 8631](https://github.com/matrix-org/synapse/issues/8631). ([\#11760](https://github.com/matrix-org/synapse/issues/11760))
85 - Remove `log_function` utility function and its uses. ([\#11761](https://github.com/matrix-org/synapse/issues/11761))
86 - Add a unit test that checks both `client` and `webclient` resources will function when simultaneously enabled. ([\#11765](https://github.com/matrix-org/synapse/issues/11765))
87 - Allow overriding complement commit using `COMPLEMENT_REF`. ([\#11766](https://github.com/matrix-org/synapse/issues/11766))
88 - Add some comments and type annotations for `_update_outliers_txn`. ([\#11776](https://github.com/matrix-org/synapse/issues/11776))
789
890
991 Synapse 1.50.1 (2022-01-18)
8888 yHoverFormatter: PromConsole.NumberFormatter.humanize,
8989 yUnits: "s",
9090 yTitle: "Time"
91 })
92 </script>
93
94 <h3>Pending calls per tick</h3>
95 <div id="reactor_pending_calls"></div>
96 <script>
97 new PromConsole.Graph({
98 node: document.querySelector("#reactor_pending_calls"),
99 expr: "rate(python_twisted_reactor_pending_calls_sum[30s]) / rate(python_twisted_reactor_pending_calls_count[30s])",
100 name: "[[job]]-[[index]]",
101 min: 0,
102 renderer: "line",
103 height: 150,
104 yAxisFormatter: PromConsole.NumberFormatter.humanize,
105 yHoverFormatter: PromConsole.NumberFormatter.humanize,
106 yTitle: "Pending Calls"
10791 })
10892 </script>
10993
0 matrix-synapse-py3 (1.50.2) stable; urgency=medium
1
2 * New synapse release 1.50.2.
3
4 -- Synapse Packaging team <packages@matrix.org> Mon, 24 Jan 2022 13:37:11 +0000
0 matrix-synapse-py3 (1.51.0) stable; urgency=medium
1
2 * New synapse release 1.51.0.
3
4 -- Synapse Packaging team <packages@matrix.org> Tue, 25 Jan 2022 11:28:51 +0000
5
6 matrix-synapse-py3 (1.51.0~rc2) stable; urgency=medium
7
8 * New synapse release 1.51.0~rc2.
9
10 -- Synapse Packaging team <packages@matrix.org> Mon, 24 Jan 2022 12:25:00 +0000
11
12 matrix-synapse-py3 (1.51.0~rc1) stable; urgency=medium
13
14 * New synapse release 1.51.0~rc1.
15
16 -- Synapse Packaging team <packages@matrix.org> Fri, 21 Jan 2022 10:46:02 +0000
517
618 matrix-synapse-py3 (1.50.1) stable; urgency=medium
719
2121
2222
2323
24 Also note that when joining a public room on a differnt HS via "#foo:bar.net", then you are (in the current impl) joining a room with room_id "foo". This means that it won't work if your HS already has a room with that name.
24 Also note that when joining a public room on a different HS via "#foo:bar.net", then you are (in the current impl) joining a room with room_id "foo". This means that it won't work if your HS already has a room with that name.
2525
00 # Dockerfile to build the matrixdotorg/synapse docker images.
1 #
2 # Note that it uses features which are only available in BuildKit - see
3 # https://docs.docker.com/go/buildkit/ for more information.
14 #
25 # To build the image, run `docker build` command from the root of the
36 # synapse repository:
47 #
5 # docker build -f docker/Dockerfile .
8 # DOCKER_BUILDKIT=1 docker build -f docker/Dockerfile .
69 #
710 # There is an optional PYTHON_VERSION build argument which sets the
811 # version of python to build against: for example:
912 #
10 # docker build -f docker/Dockerfile --build-arg PYTHON_VERSION=3.6 .
13 # DOCKER_BUILDKIT=1 docker build -f docker/Dockerfile --build-arg PYTHON_VERSION=3.9 .
1114 #
1215
1316 ARG PYTHON_VERSION=3.8
1821 FROM docker.io/python:${PYTHON_VERSION}-slim as builder
1922
2023 # install the OS build deps
21 RUN apt-get update && apt-get install -y \
24 #
25 # RUN --mount is specific to buildkit and is documented at
26 # https://github.com/moby/buildkit/blob/master/frontend/dockerfile/docs/syntax.md#build-mounts-run---mount.
27 # Here we use it to set up a cache for apt, to improve rebuild speeds on
28 # slow connections.
29 #
30 RUN \
31 --mount=type=cache,target=/var/cache/apt,sharing=locked \
32 --mount=type=cache,target=/var/lib/apt,sharing=locked \
33 apt-get update && apt-get install -y \
2234 build-essential \
2335 libffi-dev \
2436 libjpeg-dev \
4355 # used while you develop on the source
4456 #
4557 # This is aiming at installing the `install_requires` and `extras_require` from `setup.py`
46 RUN pip install --prefix="/install" --no-warn-script-location \
58 RUN --mount=type=cache,target=/root/.cache/pip \
59 pip install --prefix="/install" --no-warn-script-location \
4760 /synapse[all]
4861
4962 # Copy over the rest of the project
6578 LABEL org.opencontainers.image.source='https://github.com/matrix-org/synapse.git'
6679 LABEL org.opencontainers.image.licenses='Apache-2.0'
6780
68 RUN apt-get update && apt-get install -y \
81 RUN \
82 --mount=type=cache,target=/var/cache/apt,sharing=locked \
83 --mount=type=cache,target=/var/lib/apt,sharing=locked \
84 apt-get update && apt-get install -y \
6985 curl \
7086 gosu \
7187 libjpeg62-turbo \
1414
1515 It returns a JSON body like the following:
1616
17 ```json
18 {
19 "displayname": "User",
17 ```jsonc
18 {
19 "name": "@user:example.com",
20 "displayname": "User", // can be null if not set
2021 "threepids": [
2122 {
2223 "medium": "email",
3132 "validated_at": 1586458409743
3233 }
3334 ],
34 "avatar_url": "<avatar_url>",
35 "avatar_url": "<avatar_url>", // can be null if not set
36 "is_guest": 0,
3537 "admin": 0,
3638 "deactivated": 0,
3739 "shadow_banned": 0,
38 "password_hash": "$2b$12$p9B4GkqYdRTPGD",
3940 "creation_ts": 1560432506,
4041 "appservice_id": null,
4142 "consent_server_notice_sent": null,
1919 <https://docs.microsoft.com/en-us/windows/wsl/install>. Running Synapse natively
2020 on Windows is not officially supported.
2121
22 The code of Synapse is written in Python 3. To do pretty much anything, you'll need [a recent version of Python 3](https://wiki.python.org/moin/BeginnersGuide/Download).
22 The code of Synapse is written in Python 3. To do pretty much anything, you'll need [a recent version of Python 3](https://www.python.org/downloads/). Your Python also needs support for [virtual environments](https://docs.python.org/3/library/venv.html). This is usually built-in, but some Linux distributions like Debian and Ubuntu split it out into its own package. Running `sudo apt install python3-venv` should be enough.
23
24 Synapse can connect to PostgreSQL via the [psycopg2](https://pypi.org/project/psycopg2/) Python library. Building this library from source requires access to PostgreSQL's C header files. On Debian or Ubuntu Linux, these can be installed with `sudo apt install libpq-dev`.
2325
2426 The source code of Synapse is hosted on GitHub. You will also need [a recent version of git](https://github.com/git-guides/install-git).
2527
168170 SYNAPSE_TEST_LOG_LEVEL=DEBUG trial tests
169171 ```
170172
173 By default, tests will use an in-memory SQLite database for test data. For additional
174 help with debugging, one can use an on-disk SQLite database file instead, in order to
175 review database state during and after running tests. This can be done by setting
176 the `SYNAPSE_TEST_PERSIST_SQLITE_DB` environment variable. Doing so will cause the
177 database state to be stored in a file named `test.db` under the trial process'
178 working directory. Typically, this ends up being `_trial_temp/test.db`. For example:
179
180 ```sh
181 SYNAPSE_TEST_PERSIST_SQLITE_DB=1 trial tests
182 ```
183
184 The database file can then be inspected with:
185
186 ```sh
187 sqlite3 _trial_temp/test.db
188 ```
189
190 Note that the database file is cleared at the beginning of each test run. Thus it
191 will always only contain the data generated by the *last run test*. Though generally
192 when debugging, one is only running a single test anyway.
193
171194 ### Running tests under PostgreSQL
172195
173196 Invoking `trial` as above will use an in-memory SQLite database. This is great for
3434 5. If the media is HTML:
3535 1. Decodes the HTML via the stored file.
3636 2. Generates an Open Graph response from the HTML.
37 3. If an image exists in the Open Graph response:
37 3. If a JSON oEmbed URL was found in the HTML via autodiscovery:
38 1. Downloads the URL and stores it into a file via the media storage provider
39 and saves the local media metadata.
40 2. Converts the oEmbed response to an Open Graph response.
41 3. Overrides any Open Graph data from the HTML with data from oEmbed.
42 4. If an image exists in the Open Graph response:
3843 1. Downloads the URL and stores it into a file via the media storage
3944 provider and saves the local media metadata.
4045 2. Generates thumbnails.
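The oEmbed autodiscovery step added above looks for a JSON oEmbed `<link>` element in the fetched HTML. The following is a rough, self-contained sketch of that lookup, not Synapse's actual implementation (which parses HTML with lxml); the helper name `find_oembed_url` is made up for illustration.

```python
# Minimal sketch of oEmbed autodiscovery: find the href of the first
# <link rel="alternate" type="application/json+oembed"> tag in an HTML page.
from html.parser import HTMLParser
from typing import Optional


class _OEmbedLinkFinder(HTMLParser):
    """Records the href of the first JSON oEmbed <link> element seen."""

    def __init__(self) -> None:
        super().__init__()
        self.oembed_url: Optional[str] = None

    def handle_starttag(self, tag, attrs):
        if tag != "link" or self.oembed_url is not None:
            return
        attr_map = dict(attrs)
        if (
            attr_map.get("rel") == "alternate"
            and attr_map.get("type") == "application/json+oembed"
            and attr_map.get("href")
        ):
            self.oembed_url = attr_map["href"]


def find_oembed_url(html: str) -> Optional[str]:
    """Return the autodiscovered oEmbed URL, or None if the page has none."""
    parser = _OEmbedLinkFinder()
    parser.feed(html)
    return parser.oembed_url


# Prints "https://example.com/oembed?format=json"
print(find_oembed_url(
    '<html><head><link rel="alternate" type="application/json+oembed"'
    ' href="https://example.com/oembed?format=json"></head></html>'
))
```

If such a URL is found, the oEmbed JSON it points to is downloaded and mapped onto the Open Graph response, as described in the list above.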
389389
390390 ### Facebook
391391
392 Like Github, Facebook provide a custom OAuth2 API rather than an OIDC-compliant
393 one so requires a little more configuration.
394
395392 0. You will need a Facebook developer account. You can register for one
396393 [here](https://developers.facebook.com/async/registration/).
397394 1. On the [apps](https://developers.facebook.com/apps/) page of the developer
411408 idp_name: Facebook
412409 idp_brand: "facebook" # optional: styling hint for clients
413410 discover: false
414 issuer: "https://facebook.com"
411 issuer: "https://www.facebook.com"
415412 client_id: "your-client-id" # TO BE FILLED
416413 client_secret: "your-client-secret" # TO BE FILLED
417414 scopes: ["openid", "email"]
418 authorization_endpoint: https://facebook.com/dialog/oauth
419 token_endpoint: https://graph.facebook.com/v9.0/oauth/access_token
420 user_profile_method: "userinfo_endpoint"
421 userinfo_endpoint: "https://graph.facebook.com/v9.0/me?fields=id,name,email,picture"
422 user_mapping_provider:
423 config:
424 subject_claim: "id"
425 display_name_template: "{{ user.name }}"
415 authorization_endpoint: "https://facebook.com/dialog/oauth"
416 token_endpoint: "https://graph.facebook.com/v9.0/oauth/access_token"
417 jwks_uri: "https://www.facebook.com/.well-known/oauth/openid/jwks/"
418 user_mapping_provider:
419 config:
420 display_name_template: "{{ user.name }}"
421 email_template: "{{ '{{ user.email }}' }}"
426422 ```
427423
428424 Relevant documents:
429 * https://developers.facebook.com/docs/facebook-login/manually-build-a-login-flow
430 * Using Facebook's Graph API: https://developers.facebook.com/docs/graph-api/using-graph-api/
431 * Reference to the User endpoint: https://developers.facebook.com/docs/graph-api/reference/user
425 * [Manually Build a Login Flow](https://developers.facebook.com/docs/facebook-login/manually-build-a-login-flow)
426 * [Using Facebook's Graph API](https://developers.facebook.com/docs/graph-api/using-graph-api/)
427 * [Reference to the User endpoint](https://developers.facebook.com/docs/graph-api/reference/user)
428
429 Facebook do have an [OIDC discovery endpoint](https://www.facebook.com/.well-known/openid-configuration),
430 but it has a `response_types_supported` which excludes "code" (which we rely on, and
431 is even mentioned in their [documentation](https://developers.facebook.com/docs/facebook-login/manually-build-a-login-flow#login)),
432 so we have to disable discovery and configure the URIs manually.
432433
433434 ### Gitea
434435
7373 #
7474 pid_file: DATADIR/homeserver.pid
7575
76 # The absolute URL to the web client which /_matrix/client will redirect
77 # to if 'webclient' is configured under the 'listeners' configuration.
78 #
79 # This option can be also set to the filesystem path to the web client
80 # which will be served at /_matrix/client/ if 'webclient' is configured
81 # under the 'listeners' configuration, however this is a security risk:
82 # https://github.com/matrix-org/synapse#security-note
76 # The absolute URL to the web client which / will redirect to.
8377 #
8478 #web_client_location: https://riot.example.com/
8579
163157 # The default room version for newly created rooms.
164158 #
165159 # Known room versions are listed here:
166 # https://matrix.org/docs/spec/#complete-list-of-room-versions
160 # https://spec.matrix.org/latest/rooms/#complete-list-of-room-versions
167161 #
168162 # For example, for room version 1, default_room_version should be set
169163 # to "1".
308302 #
309303 # static: static resources under synapse/static (/_matrix/static). (Mostly
310304 # useful for 'fallback authentication'.)
311 #
312 # webclient: A web client. Requires web_client_location to be set.
313305 #
314306 listeners:
315307 # TLS-enabled listener: for when matrix traffic is sent directly to synapse.
15021494 #additional_event_types:
15031495 # - org.example.custom.event.type
15041496
1497 # We record the IP address of clients used to access the API for various
1498 # reasons, including displaying it to the user in the "Where you're signed in"
1499 # dialog.
1500 #
1501 # By default, when puppeting another user via the admin API, the client IP
1502 # address is recorded against the user who created the access token (ie, the
1503 # admin user), and *not* the puppeted user.
1504 #
1505 # Uncomment the following to also record the IP address against the puppeted
1506 # user. (This also means that the puppeted user will count as an "active" user
1507 # for the purpose of monthly active user tracking - see 'limit_usage_by_mau' etc
1508 # above.)
1509 #
1510 #track_puppeted_user_ips: true
1511
15051512
15061513 # A list of application service config files to use
15071514 #
18691876 # Defaults to false. Avoid this in production.
18701877 #
18711878 # user_profile_method: Whether to fetch the user profile from the userinfo
1872 # endpoint. Valid values are: 'auto' or 'userinfo_endpoint'.
1873 #
1874 # Defaults to 'auto', which fetches the userinfo endpoint if 'openid' is
1875 # included in 'scopes'. Set to 'userinfo_endpoint' to always fetch the
1879 # endpoint, or to rely on the data returned in the id_token from the
1880 # token_endpoint.
1881 #
1882 # Valid values are: 'auto' or 'userinfo_endpoint'.
1883 #
1884 # Defaults to 'auto', which uses the userinfo endpoint if 'openid' is
1885 # not included in 'scopes'. Set to 'userinfo_endpoint' to always use the
18761886 # userinfo endpoint.
18771887 #
18781888 # allow_existing_users: set to 'true' to allow a user logging in via OIDC to
193193 System requirements:
194194
195195 - POSIX-compliant system (tested on Linux & OS X)
196 - Python 3.6 or later, up to Python 3.9.
196 - Python 3.7 or later, up to Python 3.9.
197197 - At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org
198198
199199 To install the Synapse homeserver run:
136136
137137 # TLS private key file
138138 pkey=/path/to/privkey.pem
139
140 # Ensure the configuration lines that disable TLS/DTLS are commented-out or removed
141 #no-tls
142 #no-dtls
139143 ```
140144
141145 In this case, replace the `turn:` schemes in the `turn_uris` settings below
143147
144148 We recommend that you only try to set up TLS/DTLS once you have set up a
145149 basic installation and got it working.
150
151 NB: If your TLS certificate was provided by Let's Encrypt, TLS/DTLS will
152 not work with any Matrix client that uses Chromium's WebRTC library. This
153 currently includes Element Android & iOS; for more details, see their
154 [respective](https://github.com/vector-im/element-android/issues/1533)
155 [issues](https://github.com/vector-im/element-ios/issues/2712) as well as the underlying
156 [WebRTC issue](https://bugs.chromium.org/p/webrtc/issues/detail?id=11710).
157 Consider using a ZeroSSL certificate for your TURN server as a working alternative.
146158
147159 1. Ensure your firewall allows traffic into the TURN server on the ports
148160 you've configured it to listen on (By default: 3478 and 5349 for TURN
249261 * Check that you have opened your firewall to allow UDP traffic to the UDP
250262 relay ports (49152-65535 by default).
251263
264 * Try disabling `coturn`'s TLS/DTLS listeners and enable only its (unencrypted)
265 TCP/UDP listeners. (This will only leave signaling traffic unencrypted;
266 voice & video WebRTC traffic is always encrypted.)
267
252268 * Some WebRTC implementations (notably, that of Google Chrome) appear to get
253269 confused by TURN servers which are reachable over IPv6 (this appears to be
254270 an unexpected side-effect of its handling of multiple IP addresses as
8383 wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
8484 dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
8585 ```
86
87 # Upgrading to v1.51.0
88
89 ## Deprecation of `webclient` listeners and non-HTTP(S) `web_client_location`
90
91 Listeners of type `webclient` are deprecated and scheduled to be removed in
92 Synapse v1.53.0.
93
94 Similarly, a non-HTTP(S) `web_client_location` configuration is deprecated and
95 will become a configuration error in Synapse v1.53.0.
96
8697
8798 # Upgrading to v1.50.0
8899
77 # By default the script will fetch the latest Complement master branch and
88 # run tests with that. This can be overridden to use a custom Complement
99 # checkout by setting the COMPLEMENT_DIR environment variable to the
10 # filepath of a local Complement checkout.
10 # filepath of a local Complement checkout or by setting the COMPLEMENT_REF
11 # environment variable to pull a different branch or commit.
1112 #
1213 # By default Synapse is run in monolith mode. This can be overridden by
1314 # setting the WORKERS environment variable.
2223 # Exit if a line returns a non-zero exit code
2324 set -e
2425
26 # enable buildkit for the docker builds
27 export DOCKER_BUILDKIT=1
28
2529 # Change to the repository root
2630 cd "$(dirname $0)/.."
2731
2832 # Check for a user-specified Complement checkout
2933 if [[ -z "$COMPLEMENT_DIR" ]]; then
30 echo "COMPLEMENT_DIR not set. Fetching the latest Complement checkout..."
31 wget -Nq https://github.com/matrix-org/complement/archive/master.tar.gz
32 tar -xzf master.tar.gz
33 COMPLEMENT_DIR=complement-master
34 echo "Checkout available at 'complement-master'"
34 COMPLEMENT_REF=${COMPLEMENT_REF:-master}
35 echo "COMPLEMENT_DIR not set. Fetching Complement checkout from ${COMPLEMENT_REF}..."
36 wget -Nq https://github.com/matrix-org/complement/archive/${COMPLEMENT_REF}.tar.gz
37 tar -xzf ${COMPLEMENT_REF}.tar.gz
38 COMPLEMENT_DIR=complement-${COMPLEMENT_REF}
39 echo "Checkout available at 'complement-${COMPLEMENT_REF}'"
3540 fi
3641
3742 # Build the base Synapse image from the local checkout
4651 COMPLEMENT_DOCKERFILE=SynapseWorkers.Dockerfile
4752 # And provide some more configuration to complement.
4853 export COMPLEMENT_CA=true
49 export COMPLEMENT_VERSION_CHECK_ITERATIONS=500
54 export COMPLEMENT_SPAWN_HS_TIMEOUT_SECS=25
5055 else
5156 export COMPLEMENT_BASE_IMAGE=complement-synapse
5257 COMPLEMENT_DOCKERFILE=Synapse.Dockerfile
6469 fi
6570
6671 # Run the tests!
72 echo "Images built; running complement"
6773 go test -v -tags synapse_blacklist,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
4646 except ImportError:
4747 pass
4848
49 __version__ = "1.50.2"
49 __version__ = "1.51.0"
5050
5151 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
5252 # We import here so that we don't have to install a bunch of deps when
4545 ips: List[str] = attr.Factory(list)
4646
4747
48 def get_recent_users(txn: LoggingTransaction, since_ms: int) -> List[UserInfo]:
48 def get_recent_users(
49 txn: LoggingTransaction, since_ms: int, exclude_app_service: bool
50 ) -> List[UserInfo]:
4951 """Fetches recently registered users and some info on them."""
5052
5153 sql = """
5456 ? <= creation_ts
5557 AND deactivated = 0
5658 """
59
60 if exclude_app_service:
61 sql += " AND appservice_id IS NULL"
5762
5863 txn.execute(sql, (since_ms / 1000,))
5964
112117 "-e",
113118 "--exclude-emails",
114119 action="store_true",
115 help="Exclude users that have validated email addresses",
120 help="Exclude users that have validated email addresses.",
116121 )
117122 parser.add_argument(
118123 "-u",
119124 "--only-users",
120125 action="store_true",
121126 help="Only print user IDs that match.",
127 )
128 parser.add_argument(
129 "-a",
130 "--exclude-app-service",
131 help="Exclude appservice users.",
132 action="store_true",
122133 )
123134
124135 config = ReviewConfig()
132143
133144 since_ms = time.time() * 1000 - Config.parse_duration(config_args.since)
134145 exclude_users_with_email = config_args.exclude_emails
146 exclude_users_with_appservice = config_args.exclude_app_service
135147 include_context = not config_args.only_users
136148
137149 for database_config in config.database.databases:
142154
143155 with make_conn(database_config, engine, "review_recent_signups") as db_conn:
144156 # This generates a type of Cursor, not LoggingTransaction.
145 user_infos = get_recent_users(db_conn.cursor(), since_ms) # type: ignore[arg-type]
157 user_infos = get_recent_users(db_conn.cursor(), since_ms, exclude_users_with_appservice) # type: ignore[arg-type]
146158
147159 for user_info in user_infos:
148160 if exclude_users_with_email and user_info.emails:
7070 self._auth_blocking = AuthBlocking(self.hs)
7171
7272 self._track_appservice_user_ips = hs.config.appservice.track_appservice_user_ips
73 self._track_puppeted_user_ips = hs.config.api.track_puppeted_user_ips
7374 self._macaroon_secret_key = hs.config.key.macaroon_secret_key
7475 self._force_tracing_for_users = hs.config.tracing.force_tracing_for_users
7576
245246 user_agent=user_agent,
246247 device_id=device_id,
247248 )
249 # Track also the puppeted user client IP if enabled and the user is puppeting
250 if (
251 user_info.user_id != user_info.token_owner
252 and self._track_puppeted_user_ips
253 ):
254 await self.store.insert_client_ip(
255 user_id=user_info.user_id,
256 access_token=access_token,
257 ip=ip_addr,
258 user_agent=user_agent,
259 device_id=device_id,
260 )
248261
249262 if is_guest and not allow_guest:
250263 raise AuthError(
4545 UNSTABLE = "unstable"
4646
4747
48 @attr.s(slots=True, frozen=True)
48 @attr.s(slots=True, frozen=True, auto_attribs=True)
4949 class RoomVersion:
5050 """An object which describes the unique attributes of a room version."""
5151
52 identifier = attr.ib(type=str) # the identifier for this version
53 disposition = attr.ib(type=str) # one of the RoomDispositions
54 event_format = attr.ib(type=int) # one of the EventFormatVersions
55 state_res = attr.ib(type=int) # one of the StateResolutionVersions
56 enforce_key_validity = attr.ib(type=bool)
52 identifier: str # the identifier for this version
53 disposition: str # one of the RoomDispositions
54 event_format: int # one of the EventFormatVersions
55 state_res: int # one of the StateResolutionVersions
56 enforce_key_validity: bool
5757
5858 # Before MSC2432, m.room.aliases had special auth rules and redaction rules
59 special_case_aliases_auth = attr.ib(type=bool)
59 special_case_aliases_auth: bool
6060 # Strictly enforce canonicaljson, do not allow:
6161 # * Integers outside the range of [-2 ^ 53 + 1, 2 ^ 53 - 1]
6262 # * Floats
6363 # * NaN, Infinity, -Infinity
64 strict_canonicaljson = attr.ib(type=bool)
64 strict_canonicaljson: bool
6565 # MSC2209: Check 'notifications' key while verifying
6666 # m.room.power_levels auth rules.
67 limit_notifications_power_levels = attr.ib(type=bool)
67 limit_notifications_power_levels: bool
6868 # MSC2174/MSC2176: Apply updated redaction rules algorithm.
69 msc2176_redaction_rules = attr.ib(type=bool)
69 msc2176_redaction_rules: bool
7070 # MSC3083: Support the 'restricted' join_rule.
71 msc3083_join_rules = attr.ib(type=bool)
71 msc3083_join_rules: bool
7272 # MSC3375: Support for the proper redaction rules for MSC3083. This mustn't
7373 # be enabled if MSC3083 is not.
74 msc3375_redaction_rules = attr.ib(type=bool)
74 msc3375_redaction_rules: bool
7575 # MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending
7676 # m.room.membership event with membership 'knock'.
77 msc2403_knocking = attr.ib(type=bool)
77 msc2403_knocking: bool
7878 # MSC2716: Adds m.room.power_levels -> content.historical field to control
7979 # whether "insertion", "chunk", "marker" events can be sent
80 msc2716_historical = attr.ib(type=bool)
80 msc2716_historical: bool
8181 # MSC2716: Adds support for redacting "insertion", "chunk", and "marker" events
82 msc2716_redactions = attr.ib(type=bool)
82 msc2716_redactions: bool
8383
8484
8585 class RoomVersions:
5959 from synapse.events.third_party_rules import load_legacy_third_party_event_rules
6060 from synapse.handlers.auth import load_legacy_password_auth_providers
6161 from synapse.logging.context import PreserveLoggingContext
62 from synapse.metrics import register_threadpool
62 from synapse.metrics import install_gc_manager, register_threadpool
6363 from synapse.metrics.background_process_metrics import wrap_as_background_process
6464 from synapse.metrics.jemalloc import setup_jemalloc_stats
6565 from synapse.types import ISynapseReactor
158158 change_resource_limit(soft_file_limit)
159159 if gc_thresholds:
160160 gc.set_threshold(*gc_thresholds)
161 install_gc_manager()
161162 run_command()
162163
163164 # make sure that we run the reactor with the sentinel log context,
130130 resources.update(self._module_web_resources)
131131 self._module_web_resources_consumed = True
132132
133 # try to find something useful to redirect '/' to
134 if WEB_CLIENT_PREFIX in resources:
135 root_resource: Resource = RootOptionsRedirectResource(WEB_CLIENT_PREFIX)
133 # Try to find something useful to serve at '/':
134 #
135 # 1. Redirect to the web client if it is an HTTP(S) URL.
136 # 2. Redirect to the web client served via Synapse.
137 # 3. Redirect to the static "Synapse is running" page.
138 # 4. Do not redirect and use a blank resource.
139 if self.config.server.web_client_location_is_redirect:
140 root_resource: Resource = RootOptionsRedirectResource(
141 self.config.server.web_client_location
142 )
143 elif WEB_CLIENT_PREFIX in resources:
144 root_resource = RootOptionsRedirectResource(WEB_CLIENT_PREFIX)
136145 elif STATIC_PREFIX in resources:
137146 root_resource = RootOptionsRedirectResource(STATIC_PREFIX)
138147 else:
261270 resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
262271
263272 if name == "webclient":
273 # webclient listeners are deprecated as of Synapse v1.51.0, remove it
274 # in > v1.53.0.
264275 webclient_loc = self.config.server.web_client_location
265276
266277 if webclient_loc is None:
267278 logger.warning(
268279 "Not enabling webclient resource, as web_client_location is unset."
269280 )
270 elif webclient_loc.startswith("http://") or webclient_loc.startswith(
271 "https://"
272 ):
281 elif self.config.server.web_client_location_is_redirect:
273282 resources[WEB_CLIENT_PREFIX] = RootRedirect(webclient_loc)
274283 else:
275284 logger.warning(
2828 def read_config(self, config: JsonDict, **kwargs):
2929 validate_config(_MAIN_SCHEMA, config, ())
3030 self.room_prejoin_state = list(self._get_prejoin_state_types(config))
31 self.track_puppeted_user_ips = config.get("track_puppeted_user_ips", False)
3132
3233 def generate_config_section(cls, **kwargs) -> str:
3334 formatted_default_state_types = "\n".join(
5859 #
5960 #additional_event_types:
6061 # - org.example.custom.event.type
62
63 # We record the IP address of clients used to access the API for various
64 # reasons, including displaying it to the user in the "Where you're signed in"
65 # dialog.
66 #
67 # By default, when puppeting another user via the admin API, the client IP
68 # address is recorded against the user who created the access token (ie, the
69 # admin user), and *not* the puppeted user.
70 #
71 # Uncomment the following to also record the IP address against the puppeted
72 # user. (This also means that the puppeted user will count as an "active" user
73 # for the purpose of monthly active user tracking - see 'limit_usage_by_mau' etc
74 # above.)
75 #
76 #track_puppeted_user_ips: true
6177 """ % {
6278 "formatted_default_state_types": formatted_default_state_types
6379 }
137153 "properties": {
138154 "room_prejoin_state": _ROOM_PREJOIN_STATE_CONFIG_SCHEMA,
139155 "room_invite_state_types": _ROOM_INVITE_STATE_TYPES_SCHEMA,
156 "track_puppeted_user_ips": {
157 "type": "boolean",
158 },
140159 },
141160 }
5454 ---------------------------------------------------------------------------------------"""
5555
5656
57 @attr.s(slots=True, frozen=True)
57 @attr.s(slots=True, frozen=True, auto_attribs=True)
5858 class EmailSubjectConfig:
59 message_from_person_in_room = attr.ib(type=str)
60 message_from_person = attr.ib(type=str)
61 messages_from_person = attr.ib(type=str)
62 messages_in_room = attr.ib(type=str)
63 messages_in_room_and_others = attr.ib(type=str)
64 messages_from_person_and_others = attr.ib(type=str)
65 invite_from_person = attr.ib(type=str)
66 invite_from_person_to_room = attr.ib(type=str)
67 invite_from_person_to_space = attr.ib(type=str)
68 password_reset = attr.ib(type=str)
69 email_validation = attr.ib(type=str)
59 message_from_person_in_room: str
60 message_from_person: str
61 messages_from_person: str
62 messages_in_room: str
63 messages_in_room_and_others: str
64 messages_from_person_and_others: str
65 invite_from_person: str
66 invite_from_person_to_room: str
67 invite_from_person_to_space: str
68 password_reset: str
69 email_validation: str
7070
7171
7272 class EmailConfig(Config):
147147 # Defaults to false. Avoid this in production.
148148 #
149149 # user_profile_method: Whether to fetch the user profile from the userinfo
150 # endpoint. Valid values are: 'auto' or 'userinfo_endpoint'.
151 #
152 # Defaults to 'auto', which fetches the userinfo endpoint if 'openid' is
153 # included in 'scopes'. Set to 'userinfo_endpoint' to always fetch the
150 # endpoint, or to rely on the data returned in the id_token from the
151 # token_endpoint.
152 #
153 # Valid values are: 'auto' or 'userinfo_endpoint'.
154 #
155 # Defaults to 'auto', which uses the userinfo endpoint if 'openid' is
156 # not included in 'scopes'. Set to 'userinfo_endpoint' to always use the
154157 # userinfo endpoint.
155158 #
156159 # allow_existing_users: set to 'true' to allow a user logging in via OIDC to
199199 """Object describing the http-specific parts of the config of a listener"""
200200
201201 x_forwarded: bool = False
202 resources: List[HttpResourceConfig] = attr.ib(factory=list)
203 additional_resources: Dict[str, dict] = attr.ib(factory=dict)
202 resources: List[HttpResourceConfig] = attr.Factory(list)
203 additional_resources: Dict[str, dict] = attr.Factory(dict)
204204 tag: Optional[str] = None
205205
206206
258258 raise ConfigError(str(e))
259259
260260 self.pid_file = self.abspath(config.get("pid_file"))
261 self.web_client_location = config.get("web_client_location", None)
262261 self.soft_file_limit = config.get("soft_file_limit", 0)
263262 self.daemonize = config.get("daemonize")
264263 self.print_pidfile = config.get("print_pidfile")
505504 l2.append(listener)
506505 self.listeners = l2
507506
508 if not self.web_client_location:
509 _warn_if_webclient_configured(self.listeners)
507 self.web_client_location = config.get("web_client_location", None)
508 self.web_client_location_is_redirect = self.web_client_location and (
509 self.web_client_location.startswith("http://")
510 or self.web_client_location.startswith("https://")
511 )
512 # A non-HTTP(S) web client location is deprecated.
513 if self.web_client_location and not self.web_client_location_is_redirect:
514 logger.warning(NO_MORE_NONE_HTTP_WEB_CLIENT_LOCATION_WARNING)
515
516 # Warn if webclient is configured for a worker.
517 _warn_if_webclient_configured(self.listeners)
510518
511519 self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None))
512520 self.gc_seconds = self.read_gc_intervals(config.get("gc_min_interval", None))
792800 #
793801 pid_file: %(pid_file)s
794802
795 # The absolute URL to the web client which /_matrix/client will redirect
796 # to if 'webclient' is configured under the 'listeners' configuration.
797 #
798 # This option can be also set to the filesystem path to the web client
799 # which will be served at /_matrix/client/ if 'webclient' is configured
800 # under the 'listeners' configuration, however this is a security risk:
801 # https://github.com/matrix-org/synapse#security-note
803 # The absolute URL to the web client which / will redirect to.
802804 #
803805 #web_client_location: https://riot.example.com/
804806
882884 # The default room version for newly created rooms.
883885 #
884886 # Known room versions are listed here:
885 # https://matrix.org/docs/spec/#complete-list-of-room-versions
887 # https://spec.matrix.org/latest/rooms/#complete-list-of-room-versions
886888 #
887889 # For example, for room version 1, default_room_version should be set
888890 # to "1".
10091011 #
10101012 # static: static resources under synapse/static (/_matrix/static). (Mostly
10111013 # useful for 'fallback authentication'.)
1012 #
1013 # webclient: A web client. Requires web_client_location to be set.
10141014 #
10151015 listeners:
10161016 # TLS-enabled listener: for when matrix traffic is sent directly to synapse.
13481348 return ListenerConfig(port, bind_addresses, listener_type, tls, http_config)
13491349
13501350
1351 NO_MORE_NONE_HTTP_WEB_CLIENT_LOCATION_WARNING = """
1352 Synapse no longer supports serving a web client. To remove this warning,
1353 configure 'web_client_location' with an HTTP(S) URL.
1354 """
1355
1356
13511357 NO_MORE_WEB_CLIENT_WARNING = """
1352 Synapse no longer includes a web client. To enable a web client, configure
1353 web_client_location. To remove this warning, remove 'webclient' from the 'listeners'
1358 Synapse no longer includes a web client. To redirect the root resource to a web client, configure
1359 'web_client_location'. To remove this warning, remove 'webclient' from the 'listeners'
13541360 configuration.
13551361 """
13561362
5050 return obj
5151
5252
53 @attr.s
53 @attr.s(auto_attribs=True)
5454 class InstanceLocationConfig:
5555 """The host and port to talk to an instance via HTTP replication."""
5656
57 host = attr.ib(type=str)
58 port = attr.ib(type=int)
57 host: str
58 port: int
5959
6060
6161 @attr.s
7676 can only be a single instance.
7777 """
7878
79 events = attr.ib(
80 default=["master"],
81 type=List[str],
82 converter=_instance_to_list_converter,
83 )
84 typing = attr.ib(
85 default=["master"],
86 type=List[str],
87 converter=_instance_to_list_converter,
88 )
89 to_device = attr.ib(
90 default=["master"],
91 type=List[str],
92 converter=_instance_to_list_converter,
93 )
94 account_data = attr.ib(
95 default=["master"],
96 type=List[str],
97 converter=_instance_to_list_converter,
98 )
99 receipts = attr.ib(
100 default=["master"],
101 type=List[str],
102 converter=_instance_to_list_converter,
103 )
104 presence = attr.ib(
105 default=["master"],
106 type=List[str],
79 events: List[str] = attr.ib(
80 default=["master"],
81 converter=_instance_to_list_converter,
82 )
83 typing: List[str] = attr.ib(
84 default=["master"],
85 converter=_instance_to_list_converter,
86 )
87 to_device: List[str] = attr.ib(
88 default=["master"],
89 converter=_instance_to_list_converter,
90 )
91 account_data: List[str] = attr.ib(
92 default=["master"],
93 converter=_instance_to_list_converter,
94 )
95 receipts: List[str] = attr.ib(
96 default=["master"],
97 converter=_instance_to_list_converter,
98 )
99 presence: List[str] = attr.ib(
100 default=["master"],
107101 converter=_instance_to_list_converter,
108102 )
109103
5757 logger = logging.getLogger(__name__)
5858
5959
60 @attr.s(slots=True, cmp=False)
60 @attr.s(slots=True, frozen=True, cmp=False, auto_attribs=True)
6161 class VerifyJsonRequest:
6262 """
6363 A request to verify a JSON object.
7777 key_ids: The set of key_ids to that could be used to verify the JSON object
7878 """
7979
80 server_name = attr.ib(type=str)
81 get_json_object = attr.ib(type=Callable[[], JsonDict])
82 minimum_valid_until_ts = attr.ib(type=int)
83 key_ids = attr.ib(type=List[str])
80 server_name: str
81 get_json_object: Callable[[], JsonDict]
82 minimum_valid_until_ts: int
83 key_ids: List[str]
8484
8585 @staticmethod
8686 def from_json_object(
123123 pass
124124
125125
126 @attr.s(slots=True)
126 @attr.s(slots=True, frozen=True, auto_attribs=True)
127127 class _FetchKeyRequest:
128128 """A request for keys for a given server.
129129
137137 key_ids: The IDs of the keys to attempt to fetch
138138 """
139139
140 server_name = attr.ib(type=str)
141 minimum_valid_until_ts = attr.ib(type=int)
142 key_ids = attr.ib(type=List[str])
140 server_name: str
141 minimum_valid_until_ts: int
142 key_ids: List[str]
143143
144144
145145 class Keyring:
2727 from synapse.storage.databases.main import DataStore
2828
2929
30 @attr.s(slots=True)
30 @attr.s(slots=True, auto_attribs=True)
3131 class EventContext:
3232 """
3333 Holds information relevant to persisting an event
102102 accessed via get_prev_state_ids.
103103 """
104104
105 rejected = attr.ib(default=False, type=Union[bool, str])
106 _state_group = attr.ib(default=None, type=Optional[int])
107 state_group_before_event = attr.ib(default=None, type=Optional[int])
108 prev_group = attr.ib(default=None, type=Optional[int])
109 delta_ids = attr.ib(default=None, type=Optional[StateMap[str]])
110 app_service = attr.ib(default=None, type=Optional[ApplicationService])
111
112 _current_state_ids = attr.ib(default=None, type=Optional[StateMap[str]])
113 _prev_state_ids = attr.ib(default=None, type=Optional[StateMap[str]])
105 rejected: Union[bool, str] = False
106 _state_group: Optional[int] = None
107 state_group_before_event: Optional[int] = None
108 prev_group: Optional[int] = None
109 delta_ids: Optional[StateMap[str]] = None
110 app_service: Optional[ApplicationService] = None
111
112 _current_state_ids: Optional[StateMap[str]] = None
113 _prev_state_ids: Optional[StateMap[str]] = None
114114
115115 @staticmethod
116116 def with_state(
1313 # limitations under the License.
1414 import collections.abc
1515 import re
16 from typing import (
17 TYPE_CHECKING,
18 Any,
19 Callable,
20 Dict,
21 Iterable,
22 List,
23 Mapping,
24 Optional,
25 Union,
26 )
16 from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Union
2717
2818 from frozendict import frozendict
2919
3121 from synapse.api.errors import Codes, SynapseError
3222 from synapse.api.room_versions import RoomVersion
3323 from synapse.types import JsonDict
34 from synapse.util.async_helpers import yieldable_gather_results
3524 from synapse.util.frozenutils import unfreeze
3625
3726 from . import EventBase
38
39 if TYPE_CHECKING:
40 from synapse.server import HomeServer
4127
4228 # Split strings on "." but not "\." This uses a negative lookbehind assertion for '\'
4329 # (?<!stuff) matches if the current position in the string is not preceded
384370 clients.
385371 """
386372
387 def __init__(self, hs: "HomeServer"):
388 self.store = hs.get_datastore()
389 self._msc1849_enabled = hs.config.experimental.msc1849_enabled
390 self._msc3440_enabled = hs.config.experimental.msc3440_enabled
391
392 async def serialize_event(
373 def serialize_event(
393374 self,
394375 event: Union[JsonDict, EventBase],
395376 time_now: int,
396377 *,
397 bundle_aggregations: bool = False,
378 bundle_aggregations: Optional[Dict[str, JsonDict]] = None,
398379 **kwargs: Any,
399380 ) -> JsonDict:
400381 """Serializes a single event.
417398 serialized_event = serialize_event(event, time_now, **kwargs)
418399
419400 # Check if there are any bundled aggregations to include with the event.
420 #
421 # Do not bundle aggregations if any of the following at true:
422 #
423 # * Support is disabled via the configuration or the caller.
424 # * The event is a state event.
425 # * The event has been redacted.
426 if (
427 self._msc1849_enabled
428 and bundle_aggregations
429 and not event.is_state()
430 and not event.internal_metadata.is_redacted()
431 ):
432 await self._injected_bundled_aggregations(event, time_now, serialized_event)
401 if bundle_aggregations:
402 event_aggregations = bundle_aggregations.get(event.event_id)
403 if event_aggregations:
404 self._inject_bundled_aggregations(
405 event,
406 time_now,
407 bundle_aggregations[event.event_id],
408 serialized_event,
409 )
433410
434411 return serialized_event
435412
436 async def _injected_bundled_aggregations(
437 self, event: EventBase, time_now: int, serialized_event: JsonDict
413 def _inject_bundled_aggregations(
414 self,
415 event: EventBase,
416 time_now: int,
417 aggregations: JsonDict,
418 serialized_event: JsonDict,
438419 ) -> None:
439420 """Potentially injects bundled aggregations into the unsigned portion of the serialized event.
440421
441422 Args:
442423 event: The event being serialized.
443424 time_now: The current time in milliseconds
425 aggregations: The bundled aggregation to serialize.
444426 serialized_event: The serialized event which may be modified.
445427
446428 """
447 # Do not bundle aggregations for an event which represents an edit or an
448 # annotation. It does not make sense for them to have related events.
449 relates_to = event.content.get("m.relates_to")
450 if isinstance(relates_to, (dict, frozendict)):
451 relation_type = relates_to.get("rel_type")
452 if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
453 return
454
455 event_id = event.event_id
456 room_id = event.room_id
457
458 # The bundled aggregations to include.
459 aggregations = {}
460
461 annotations = await self.store.get_aggregation_groups_for_event(
462 event_id, room_id
463 )
464 if annotations.chunk:
465 aggregations[RelationTypes.ANNOTATION] = annotations.to_dict()
466
467 references = await self.store.get_relations_for_event(
468 event_id, room_id, RelationTypes.REFERENCE, direction="f"
469 )
470 if references.chunk:
471 aggregations[RelationTypes.REFERENCE] = references.to_dict()
472
473 edit = None
474 if event.type == EventTypes.Message:
475 edit = await self.store.get_applicable_edit(event_id, room_id)
476
477 if edit:
429 # Make a copy in-case the object is cached.
430 aggregations = aggregations.copy()
431
432 if RelationTypes.REPLACE in aggregations:
478433 # If there is an edit replace the content, preserving existing
479434 # relations.
435 edit = aggregations[RelationTypes.REPLACE]
480436
481437 # Ensure we take copies of the edit content, otherwise we risk modifying
482438 # the original event.
501457 }
502458
503459 # If this event is the start of a thread, include a summary of the replies.
504 if self._msc3440_enabled:
505 (
506 thread_count,
507 latest_thread_event,
508 ) = await self.store.get_thread_summary(event_id, room_id)
509 if latest_thread_event:
510 aggregations[RelationTypes.THREAD] = {
511 # Don't bundle aggregations as this could recurse forever.
512 "latest_event": await self.serialize_event(
513 latest_thread_event, time_now, bundle_aggregations=False
514 ),
515 "count": thread_count,
516 }
517
518 # If any bundled aggregations were found, include them.
519 if aggregations:
520 serialized_event["unsigned"].setdefault("m.relations", {}).update(
521 aggregations
460 if RelationTypes.THREAD in aggregations:
461 # Serialize the latest thread event.
462 latest_thread_event = aggregations[RelationTypes.THREAD]["latest_event"]
463
464 # Don't bundle aggregations as this could recurse forever.
465 aggregations[RelationTypes.THREAD]["latest_event"] = self.serialize_event(
466 latest_thread_event, time_now, bundle_aggregations=None
522467 )
523468
524 async def serialize_events(
469 # Include the bundled aggregations in the event.
470 serialized_event["unsigned"].setdefault("m.relations", {}).update(aggregations)
471
472 def serialize_events(
525473 self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any
526474 ) -> List[JsonDict]:
527475 """Serializes multiple events.
534482 Returns:
535483 The list of serialized events
536484 """
537 return await yieldable_gather_results(
538 self.serialize_event, events, time_now=time_now, **kwargs
539 )
485 return [
486 self.serialize_event(event, time_now=time_now, **kwargs) for event in events
487 ]
540488
541489
542490 def copy_power_levels_contents(
229229 # origin, etc etc)
230230 assert_params_in_dict(pdu_json, ("type", "depth"))
231231
232 # Strip any unauthorized values from "unsigned" if they exist
233 if "unsigned" in pdu_json:
234 _strip_unsigned_values(pdu_json)
235
232236 depth = pdu_json["depth"]
233237 if not isinstance(depth, int):
234238 raise SynapseError(400, "Depth %r not an integer" % (depth,), Codes.BAD_JSON)
244248
245249 event = make_event_from_dict(pdu_json, room_version)
246250 return event
251
252
253 def _strip_unsigned_values(pdu_dict: JsonDict) -> None:
254 """
255 Strip any unsigned values unless specifically allowed, as defined by the whitelist.
256
257 pdu: the json dict to strip values from. Note that the dict is mutated by this
258 function
259 """
260 unsigned = pdu_dict["unsigned"]
261
262 if not isinstance(unsigned, dict):
263 pdu_dict["unsigned"] = {}
264
265 if pdu_dict["type"] == "m.room.member":
266 whitelist = ["knock_room_state", "invite_room_state", "age"]
267 else:
268 whitelist = ["age"]
269
270 filtered_unsigned = {k: v for k, v in unsigned.items() if k in whitelist}
271 pdu_dict["unsigned"] = filtered_unsigned
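For illustration only (this example is not part of the diff): given the whitelist above, `_strip_unsigned_values` keeps `age` and `invite_room_state` on a membership event and drops everything else from `unsigned`.

```python
# Hypothetical PDU used only to illustrate the helper defined above.
pdu = {
    "type": "m.room.member",
    "unsigned": {
        "age": 1234,
        "invite_room_state": [],
        "not_on_the_whitelist": {"spoofed": "data"},
    },
}

_strip_unsigned_values(pdu)

# Non-whitelisted keys are dropped; whitelisted keys survive.
assert pdu["unsigned"] == {"age": 1234, "invite_room_state": []}
```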
5555 from synapse.events import EventBase, builder
5656 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
5757 from synapse.federation.transport.client import SendJoinResponse
58 from synapse.logging.utils import log_function
5958 from synapse.types import JsonDict, get_domain_from_id
6059 from synapse.util.async_helpers import concurrently_execute
6160 from synapse.util.caches.expiringcache import ExpiringCache
118117 # It is a map of (room ID, suggested-only) -> the response of
119118 # get_room_hierarchy.
120119 self._get_room_hierarchy_cache: ExpiringCache[
121 Tuple[str, bool], Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]
120 Tuple[str, bool],
121 Tuple[JsonDict, Sequence[JsonDict], Sequence[JsonDict], Sequence[str]],
122122 ] = ExpiringCache(
123123 cache_name="get_room_hierarchy_cache",
124124 clock=self._clock,
143143 if destination_dict:
144144 self.pdu_destination_tried[event_id] = destination_dict
145145
146 @log_function
147146 async def make_query(
148147 self,
149148 destination: str,
177176 ignore_backoff=ignore_backoff,
178177 )
179178
180 @log_function
181179 async def query_client_keys(
182180 self, destination: str, content: JsonDict, timeout: int
183181 ) -> JsonDict:
195193 destination, content, timeout
196194 )
197195
198 @log_function
199196 async def query_user_devices(
200197 self, destination: str, user_id: str, timeout: int = 30000
201198 ) -> JsonDict:
207204 destination, user_id, timeout
208205 )
209206
210 @log_function
211207 async def claim_client_keys(
212208 self, destination: str, content: JsonDict, timeout: int
213209 ) -> JsonDict:
13371333 destinations: Iterable[str],
13381334 room_id: str,
13391335 suggested_only: bool,
1340 ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]:
1336 ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[JsonDict], Sequence[str]]:
13411337 """
13421338 Call other servers to get a hierarchy of the given room.
13431339
13521348
13531349 Returns:
13541350 A tuple of:
1355 The room as a JSON dictionary.
1351 The room as a JSON dictionary, without a "children_state" key.
1352 A list of `m.space.child` state events.
13561353 A list of children rooms, as JSON dictionaries.
13571354 A list of inaccessible children room IDs.
13581355
13671364
13681365 async def send_request(
13691366 destination: str,
1370 ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]:
1367 ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[JsonDict], Sequence[str]]:
13711368 try:
13721369 res = await self.transport_layer.get_room_hierarchy(
13731370 destination=destination,
13961393 raise InvalidResponseError("'room' must be a dict")
13971394
13981395 # Validate children_state of the room.
1399 children_state = room.get("children_state", [])
1396 children_state = room.pop("children_state", [])
14001397 if not isinstance(children_state, Sequence):
14011398 raise InvalidResponseError("'room.children_state' must be a list")
14021399 if any(not isinstance(e, dict) for e in children_state):
14251422 "Invalid room ID in 'inaccessible_children' list"
14261423 )
14271424
1428 return room, children, inaccessible_children
1425 return room, children_state, children, inaccessible_children
14291426
14301427 try:
14311428 result = await self._try_destination_list(
14731470 if event.room_id == room_id:
14741471 children_events.append(event.data)
14751472 children_room_ids.add(event.state_key)
1476 # And add them under the requested room.
1477 requested_room["children_state"] = children_events
14781473
14791474 # Find the children rooms.
14801475 children = []
14841479
14851480 # It isn't clear from the response whether some of the rooms are
14861481 # not accessible.
1487 result = (requested_room, children, ())
1482 result = (requested_room, children_events, children, ())
14881483
14891484 # Cache the result to avoid fetching data over federation every time.
14901485 self._get_room_hierarchy_cache[(room_id, suggested_only)] = result
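With the `children_state` split above, callers of the federation client now unpack four values instead of three. A sketch of the updated call site (variable names are illustrative):

    room, children_state, children, inaccessible_children = await federation_client.get_room_hierarchy(
        destinations, room_id, suggested_only=False
    )
    # `room` no longer contains a "children_state" key; the `m.space.child`
    # stripped state events arrive as the separate `children_state` sequence.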
5757 run_in_background,
5858 )
5959 from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
60 from synapse.logging.utils import log_function
6160 from synapse.metrics.background_process_metrics import wrap_as_background_process
6261 from synapse.replication.http.federation import (
6362 ReplicationFederationSendEduRestServlet,
858857 res = {"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus]}
859858 return 200, res
860859
861 @log_function
862860 async def on_query_client_keys(
863861 self, origin: str, content: Dict[str, str]
864862 ) -> Tuple[int, Dict[str, Any]]:
939937
940938 return {"events": [ev.get_pdu_json(time_now) for ev in missing_events]}
941939
942 @log_function
943940 async def on_openid_userinfo(self, token: str) -> Optional[str]:
944941 ts_now_ms = self._clock.time_msec()
945942 return await self.store.get_user_id_for_open_id_token(token, ts_now_ms)
2222 from typing import Optional, Tuple
2323
2424 from synapse.federation.units import Transaction
25 from synapse.logging.utils import log_function
2625 from synapse.storage.databases.main import DataStore
2726 from synapse.types import JsonDict
2827
3534 def __init__(self, datastore: DataStore):
3635 self.store = datastore
3736
38 @log_function
3937 async def have_responded(
4038 self, origin: str, transaction: Transaction
4139 ) -> Optional[Tuple[int, JsonDict]]:
5250
5351 return await self.store.get_received_txn_response(transaction_id, origin)
5452
55 @log_function
5653 async def set_response(
5754 self, origin: str, transaction: Transaction, code: int, response: JsonDict
5855 ) -> None:
606606 self._pending_pdus = []
607607
608608
609 @attr.s(slots=True)
609 @attr.s(slots=True, auto_attribs=True)
610610 class _TransactionQueueManager:
611611 """A helper async context manager for pulling stuff off the queues and
612612 tracking what was last successfully sent, etc.
613613 """
614614
615 queue = attr.ib(type=PerDestinationQueue)
616
617 _device_stream_id = attr.ib(type=Optional[int], default=None)
618 _device_list_id = attr.ib(type=Optional[int], default=None)
619 _last_stream_ordering = attr.ib(type=Optional[int], default=None)
620 _pdus = attr.ib(type=List[EventBase], factory=list)
615 queue: PerDestinationQueue
616
617 _device_stream_id: Optional[int] = None
618 _device_list_id: Optional[int] = None
619 _last_stream_ordering: Optional[int] = None
620 _pdus: List[EventBase] = attr.Factory(list)
621621
622622 async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
623623 # First we calculate the EDUs we want to send, if any.
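The attrs changes repeated throughout this diff are mechanical: with `auto_attribs=True`, annotated class attributes become fields, so the older `attr.ib(type=...)` declarations are no longer needed. A generic sketch of the equivalence (not Synapse code):

    import attr
    from typing import List

    @attr.s(slots=True)
    class Old:
        name = attr.ib(type=str)
        items = attr.ib(type=List[str], factory=list)

    @attr.s(slots=True, auto_attribs=True)
    class New:
        name: str
        items: List[str] = attr.Factory(list)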
3434 import synapse.server
3535
3636 logger = logging.getLogger(__name__)
37 issue_8631_logger = logging.getLogger("synapse.8631_debug")
3738
3839 last_pdu_ts_metric = Gauge(
3940 "synapse_federation_last_sent_pdu_time",
123124 len(pdus),
124125 len(edus),
125126 )
127 if issue_8631_logger.isEnabledFor(logging.DEBUG):
128 DEVICE_UPDATE_EDUS = {"m.device_list_update", "m.signing_key_update"}
129 device_list_updates = [
130 edu.content for edu in edus if edu.edu_type in DEVICE_UPDATE_EDUS
131 ]
132 if device_list_updates:
133 issue_8631_logger.debug(
134 "about to send txn [%s] including device list updates: %s",
135 transaction.transaction_id,
136 device_list_updates,
137 )
126138
127139 # Actually send the transaction
128140
4343 from synapse.events import EventBase, make_event_from_dict
4444 from synapse.federation.units import Transaction
4545 from synapse.http.matrixfederationclient import ByteParser
46 from synapse.logging.utils import log_function
4746 from synapse.types import JsonDict
4847
4948 logger = logging.getLogger(__name__)
6160 self.server_name = hs.hostname
6261 self.client = hs.get_federation_http_client()
6362
64 @log_function
6563 async def get_room_state_ids(
6664 self, destination: str, room_id: str, event_id: str
6765 ) -> JsonDict:
8785 try_trailing_slash_on_400=True,
8886 )
8987
90 @log_function
9188 async def get_event(
9289 self, destination: str, event_id: str, timeout: Optional[int] = None
9390 ) -> JsonDict:
110107 destination, path=path, timeout=timeout, try_trailing_slash_on_400=True
111108 )
112109
113 @log_function
114110 async def backfill(
115111 self, destination: str, room_id: str, event_tuples: Collection[str], limit: int
116112 ) -> Optional[JsonDict]:
148144 destination, path=path, args=args, try_trailing_slash_on_400=True
149145 )
150146
151 @log_function
152147 async def timestamp_to_event(
153148 self, destination: str, room_id: str, timestamp: int, direction: str
154149 ) -> Union[JsonDict, List]:
184179
185180 return remote_response
186181
187 @log_function
188182 async def send_transaction(
189183 self,
190184 transaction: Transaction,
233227 try_trailing_slash_on_400=True,
234228 )
235229
236 @log_function
237230 async def make_query(
238231 self,
239232 destination: str,
253246 ignore_backoff=ignore_backoff,
254247 )
255248
256 @log_function
257249 async def make_membership_event(
258250 self,
259251 destination: str,
316308 ignore_backoff=ignore_backoff,
317309 )
318310
319 @log_function
320311 async def send_join_v1(
321312 self,
322313 room_version: RoomVersion,
335326 max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
336327 )
337328
338 @log_function
339329 async def send_join_v2(
340330 self,
341331 room_version: RoomVersion,
354344 max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
355345 )
356346
357 @log_function
358347 async def send_leave_v1(
359348 self, destination: str, room_id: str, event_id: str, content: JsonDict
360349 ) -> Tuple[int, JsonDict]:
371360 ignore_backoff=True,
372361 )
373362
374 @log_function
375363 async def send_leave_v2(
376364 self, destination: str, room_id: str, event_id: str, content: JsonDict
377365 ) -> JsonDict:
388376 ignore_backoff=True,
389377 )
390378
391 @log_function
392379 async def send_knock_v1(
393380 self,
394381 destination: str,
422409 destination=destination, path=path, data=content
423410 )
424411
425 @log_function
426412 async def send_invite_v1(
427413 self, destination: str, room_id: str, event_id: str, content: JsonDict
428414 ) -> Tuple[int, JsonDict]:
432418 destination=destination, path=path, data=content, ignore_backoff=True
433419 )
434420
435 @log_function
436421 async def send_invite_v2(
437422 self, destination: str, room_id: str, event_id: str, content: JsonDict
438423 ) -> JsonDict:
442427 destination=destination, path=path, data=content, ignore_backoff=True
443428 )
444429
445 @log_function
446430 async def get_public_rooms(
447431 self,
448432 remote_server: str,
515499
516500 return response
517501
518 @log_function
519502 async def exchange_third_party_invite(
520503 self, destination: str, room_id: str, event_dict: JsonDict
521504 ) -> JsonDict:
525508 destination=destination, path=path, data=event_dict
526509 )
527510
528 @log_function
529511 async def get_event_auth(
530512 self, destination: str, room_id: str, event_id: str
531513 ) -> JsonDict:
533515
534516 return await self.client.get_json(destination=destination, path=path)
535517
536 @log_function
537518 async def query_client_keys(
538519 self, destination: str, query_content: JsonDict, timeout: int
539520 ) -> JsonDict:
575556 destination=destination, path=path, data=query_content, timeout=timeout
576557 )
577558
578 @log_function
579559 async def query_user_devices(
580560 self, destination: str, user_id: str, timeout: int
581561 ) -> JsonDict:
615595 destination=destination, path=path, timeout=timeout
616596 )
617597
618 @log_function
619598 async def claim_client_keys(
620599 self, destination: str, query_content: JsonDict, timeout: int
621600 ) -> JsonDict:
654633 destination=destination, path=path, data=query_content, timeout=timeout
655634 )
656635
657 @log_function
658636 async def get_missing_events(
659637 self,
660638 destination: str,
679657 timeout=timeout,
680658 )
681659
682 @log_function
683660 async def get_group_profile(
684661 self, destination: str, group_id: str, requester_user_id: str
685662 ) -> JsonDict:
693670 ignore_backoff=True,
694671 )
695672
696 @log_function
697673 async def update_group_profile(
698674 self, destination: str, group_id: str, requester_user_id: str, content: JsonDict
699675 ) -> JsonDict:
715691 ignore_backoff=True,
716692 )
717693
718 @log_function
719694 async def get_group_summary(
720695 self, destination: str, group_id: str, requester_user_id: str
721696 ) -> JsonDict:
729704 ignore_backoff=True,
730705 )
731706
732 @log_function
733707 async def get_rooms_in_group(
734708 self, destination: str, group_id: str, requester_user_id: str
735709 ) -> JsonDict:
797771 ignore_backoff=True,
798772 )
799773
800 @log_function
801774 async def get_users_in_group(
802775 self, destination: str, group_id: str, requester_user_id: str
803776 ) -> JsonDict:
811784 ignore_backoff=True,
812785 )
813786
814 @log_function
815787 async def get_invited_users_in_group(
816788 self, destination: str, group_id: str, requester_user_id: str
817789 ) -> JsonDict:
825797 ignore_backoff=True,
826798 )
827799
828 @log_function
829800 async def accept_group_invite(
830801 self, destination: str, group_id: str, user_id: str, content: JsonDict
831802 ) -> JsonDict:
836807 destination=destination, path=path, data=content, ignore_backoff=True
837808 )
838809
839 @log_function
840810 def join_group(
841811 self, destination: str, group_id: str, user_id: str, content: JsonDict
842812 ) -> Awaitable[JsonDict]:
847817 destination=destination, path=path, data=content, ignore_backoff=True
848818 )
849819
850 @log_function
851820 async def invite_to_group(
852821 self,
853822 destination: str,
867836 ignore_backoff=True,
868837 )
869838
870 @log_function
871839 async def invite_to_group_notification(
872840 self, destination: str, group_id: str, user_id: str, content: JsonDict
873841 ) -> JsonDict:
881849 destination=destination, path=path, data=content, ignore_backoff=True
882850 )
883851
884 @log_function
885852 async def remove_user_from_group(
886853 self,
887854 destination: str,
901868 ignore_backoff=True,
902869 )
903870
904 @log_function
905871 async def remove_user_from_group_notification(
906872 self, destination: str, group_id: str, user_id: str, content: JsonDict
907873 ) -> JsonDict:
915881 destination=destination, path=path, data=content, ignore_backoff=True
916882 )
917883
918 @log_function
919884 async def renew_group_attestation(
920885 self, destination: str, group_id: str, user_id: str, content: JsonDict
921886 ) -> JsonDict:
929894 destination=destination, path=path, data=content, ignore_backoff=True
930895 )
931896
932 @log_function
933897 async def update_group_summary_room(
934898 self,
935899 destination: str,
958922 ignore_backoff=True,
959923 )
960924
961 @log_function
962925 async def delete_group_summary_room(
963926 self,
964927 destination: str,
985948 ignore_backoff=True,
986949 )
987950
988 @log_function
989951 async def get_group_categories(
990952 self, destination: str, group_id: str, requester_user_id: str
991953 ) -> JsonDict:
999961 ignore_backoff=True,
1000962 )
1001963
1002 @log_function
1003964 async def get_group_category(
1004965 self, destination: str, group_id: str, requester_user_id: str, category_id: str
1005966 ) -> JsonDict:
1013974 ignore_backoff=True,
1014975 )
1015976
1016 @log_function
1017977 async def update_group_category(
1018978 self,
1019979 destination: str,
1033993 ignore_backoff=True,
1034994 )
1035995
1036 @log_function
1037996 async def delete_group_category(
1038997 self, destination: str, group_id: str, requester_user_id: str, category_id: str
1039998 ) -> JsonDict:
10471006 ignore_backoff=True,
10481007 )
10491008
1050 @log_function
10511009 async def get_group_roles(
10521010 self, destination: str, group_id: str, requester_user_id: str
10531011 ) -> JsonDict:
10611019 ignore_backoff=True,
10621020 )
10631021
1064 @log_function
10651022 async def get_group_role(
10661023 self, destination: str, group_id: str, requester_user_id: str, role_id: str
10671024 ) -> JsonDict:
10751032 ignore_backoff=True,
10761033 )
10771034
1078 @log_function
10791035 async def update_group_role(
10801036 self,
10811037 destination: str,
10951051 ignore_backoff=True,
10961052 )
10971053
1098 @log_function
10991054 async def delete_group_role(
11001055 self, destination: str, group_id: str, requester_user_id: str, role_id: str
11011056 ) -> JsonDict:
11091064 ignore_backoff=True,
11101065 )
11111066
1112 @log_function
11131067 async def update_group_summary_user(
11141068 self,
11151069 destination: str,
11351089 ignore_backoff=True,
11361090 )
11371091
1138 @log_function
11391092 async def set_group_join_policy(
11401093 self, destination: str, group_id: str, requester_user_id: str, content: JsonDict
11411094 ) -> JsonDict:
11501103 ignore_backoff=True,
11511104 )
11521105
1153 @log_function
11541106 async def delete_group_summary_user(
11551107 self,
11561108 destination: str,
3535 from synapse.util.versionstring import get_version_string
3636
3737 logger = logging.getLogger(__name__)
38 issue_8631_logger = logging.getLogger("synapse.8631_debug")
3839
3940
4041 class BaseFederationServerServlet(BaseFederationServlet):
9394 len(transaction_data.get("pdus", [])),
9495 len(transaction_data.get("edus", [])),
9596 )
97
98 if issue_8631_logger.isEnabledFor(logging.DEBUG):
99 DEVICE_UPDATE_EDUS = {"m.device_list_update", "m.signing_key_update"}
100 device_list_updates = [
101 edu.content
102 for edu in transaction_data.get("edus", [])
103 if edu.edu_type in DEVICE_UPDATE_EDUS
104 ]
105 if device_list_updates:
106 issue_8631_logger.debug(
107 "received transaction [%s] including device list updates: %s",
108 transaction_id,
109 device_list_updates,
110 )
96111
97112 except Exception as e:
98113 logger.exception(e)
7676 async def add_account_data_for_user(
7777 self, user_id: str, account_data_type: str, content: JsonDict
7878 ) -> int:
79 """Add some account_data to a room for a user.
79 """Add some global account_data for a user.
8080
8181 Args:
8282 user_id: The user to add a tag for.
5454
5555 async def get_user(self, user: UserID) -> Optional[JsonDict]:
5656 """Function to get user details"""
57 ret = await self.store.get_user_by_id(user.to_string())
58 if ret:
59 profile = await self.store.get_profileinfo(user.localpart)
60 threepids = await self.store.user_get_threepids(user.to_string())
61 external_ids = [
62 ({"auth_provider": auth_provider, "external_id": external_id})
63 for auth_provider, external_id in await self.store.get_external_ids_by_user(
64 user.to_string()
65 )
66 ]
67 ret["displayname"] = profile.display_name
68 ret["avatar_url"] = profile.avatar_url
69 ret["threepids"] = threepids
70 ret["external_ids"] = external_ids
71 return ret
57 user_info_dict = await self.store.get_user_by_id(user.to_string())
58 if user_info_dict is None:
59 return None
60
61 # Restrict returned information to a known set of fields. This prevents additional
62 # fields added to get_user_by_id from modifying Synapse's external API surface.
63 user_info_to_return = {
64 "name",
65 "admin",
66 "deactivated",
67 "shadow_banned",
68 "creation_ts",
69 "appservice_id",
70 "consent_server_notice_sent",
71 "consent_version",
72 "user_type",
73 "is_guest",
74 }
75
76 # Restrict returned keys to a known set.
77 user_info_dict = {
78 key: value
79 for key, value in user_info_dict.items()
80 if key in user_info_to_return
81 }
82
83 # Add additional user metadata
84 profile = await self.store.get_profileinfo(user.localpart)
85 threepids = await self.store.user_get_threepids(user.to_string())
86 external_ids = [
87 ({"auth_provider": auth_provider, "external_id": external_id})
88 for auth_provider, external_id in await self.store.get_external_ids_by_user(
89 user.to_string()
90 )
91 ]
92 user_info_dict["displayname"] = profile.display_name
93 user_info_dict["avatar_url"] = profile.avatar_url
94 user_info_dict["threepids"] = threepids
95 user_info_dict["external_ids"] = external_ids
96
97 return user_info_dict
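The key filtering above is a plain dict comprehension against a fixed set; a tiny sketch with made-up values:

    allowed = {"name", "admin", "deactivated"}
    raw = {"name": "@alice:example.com", "admin": 0, "deactivated": 0, "internal_only": 1}
    filtered = {k: v for k, v in raw.items() if k in allowed}
    # "internal_only" is silently dropped, so new storage fields do not leak out.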
7298
7399 async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") -> Any:
74100 """Write all data we have on the user to the given writer.
167167 }
168168
169169
170 @attr.s(slots=True)
170 @attr.s(slots=True, auto_attribs=True)
171171 class SsoLoginExtraAttributes:
172172 """Data we track about SAML2 sessions"""
173173
174174 # time the session was created, in milliseconds
175 creation_time = attr.ib(type=int)
176 extra_attributes = attr.ib(type=JsonDict)
177
178
179 @attr.s(slots=True, frozen=True)
175 creation_time: int
176 extra_attributes: JsonDict
177
178
179 @attr.s(slots=True, frozen=True, auto_attribs=True)
180180 class LoginTokenAttributes:
181181 """Data we store in a short-term login token"""
182182
183 user_id = attr.ib(type=str)
184
185 auth_provider_id = attr.ib(type=str)
183 user_id: str
184
185 auth_provider_id: str
186186 """The SSO Identity Provider that the user authenticated with, to get this token."""
187187
188 auth_provider_session_id = attr.ib(type=Optional[str])
188 auth_provider_session_id: Optional[str]
189189 """The session ID advertised by the SSO Identity Provider."""
190190
191191
22802280 # call all of the on_logged_out callbacks
22812281 for callback in self.on_logged_out_callbacks:
22822282 try:
2283 callback(user_id, device_id, access_token)
2283 await callback(user_id, device_id, access_token)
22842284 except Exception as e:
22852285 logger.warning("Failed to run module API callback %s: %s", callback, e)
22862286 continue
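Since the callback is now awaited, `on_logged_out` callbacks registered by modules are expected to be coroutines. A hedged sketch of such a callback (signature as described in the module API docs; the body is illustrative):

    from typing import Optional

    async def on_logged_out(user_id: str, device_id: Optional[str], access_token: str) -> None:
        # e.g. notify an external system that the session ended
        print(f"{user_id} logged out (device {device_id})")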
947947 devices = []
948948 ignore_devices = True
949949 else:
950 prev_stream_id = await self.store.get_device_list_last_stream_id_for_remote(
951 user_id
952 )
950953 cached_devices = await self.store.get_cached_devices_for_user(user_id)
951 if cached_devices == {d["device_id"]: d for d in devices}:
954
955 # To ensure that a user with no devices is cached, we skip the resync only
956 # if we have a stream_id from previously writing a cache entry.
957 if prev_stream_id is not None and cached_devices == {
958 d["device_id"]: d for d in devices
959 }:
952960 logging.info(
953961 "Skipping device list resync for %s, as our cache matches already",
954962 user_id,
13201320 return old_key == new_key_copy
13211321
13221322
1323 @attr.s(slots=True)
1323 @attr.s(slots=True, auto_attribs=True)
13241324 class SignatureListItem:
13251325 """An item in the signature list as used by upload_signatures_for_device_keys."""
13261326
1327 signing_key_id = attr.ib(type=str)
1328 target_user_id = attr.ib(type=str)
1329 target_device_id = attr.ib(type=str)
1330 signature = attr.ib(type=JsonDict)
1327 signing_key_id: str
1328 target_user_id: str
1329 target_device_id: str
1330 signature: JsonDict
13311331
13321332
13331333 class SigningKeyEduUpdater:
1919 from synapse.api.errors import AuthError, SynapseError
2020 from synapse.events import EventBase
2121 from synapse.handlers.presence import format_user_presence_state
22 from synapse.logging.utils import log_function
2322 from synapse.streams.config import PaginationConfig
2423 from synapse.types import JsonDict, UserID
2524 from synapse.visibility import filter_events_for_client
4241 self._server_notices_sender = hs.get_server_notices_sender()
4342 self._event_serializer = hs.get_event_client_serializer()
4443
45 @log_function
4644 async def get_stream(
4745 self,
4846 auth_user_id: str,
118116
119117 events.extend(to_add)
120118
121 chunks = await self._event_serializer.serialize_events(
119 chunks = self._event_serializer.serialize_events(
122120 events,
123121 time_now,
124122 as_client_event=as_client_event,
5050 preserve_fn,
5151 run_in_background,
5252 )
53 from synapse.logging.utils import log_function
5453 from synapse.replication.http.federation import (
5554 ReplicationCleanRoomRestServlet,
5655 ReplicationStoreRoomOnOutlierMembershipRestServlet,
555554
556555 run_in_background(self._handle_queued_pdus, room_queue)
557556
558 @log_function
559557 async def do_knock(
560558 self,
561559 target_hosts: List[str],
927925
928926 return event
929927
930 @log_function
931928 async def on_make_knock_request(
932929 self, origin: str, room_id: str, user_id: str
933930 ) -> EventBase:
10381035 else:
10391036 return []
10401037
1041 @log_function
10421038 async def on_backfill_request(
10431039 self, origin: str, room_id: str, pdu_list: List[str], limit: int
10441040 ) -> List[EventBase]:
10551051
10561052 return events
10571053
1058 @log_function
10591054 async def get_persisted_pdu(
10601055 self, origin: str, event_id: str
10611056 ) -> Optional[EventBase]:
11171112
11181113 return missing_events
11191114
1120 @log_function
11211115 async def exchange_third_party_invite(
11221116 self, sender_user_id: str, target_user_id: str, room_id: str, signed: JsonDict
11231117 ) -> None:
5555 from synapse.events.snapshot import EventContext
5656 from synapse.federation.federation_client import InvalidResponseError
5757 from synapse.logging.context import nested_logging_context, run_in_background
58 from synapse.logging.utils import log_function
5958 from synapse.metrics.background_process_metrics import run_as_background_process
6059 from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
6160 from synapse.replication.http.federation import (
274273
275274 await self._process_received_pdu(origin, pdu, state=None)
276275
277 @log_function
278276 async def on_send_membership_event(
279277 self, origin: str, event: EventBase
280278 ) -> Tuple[EventBase, EventContext]:
471469
472470 return await self.persist_events_and_notify(room_id, [(event, context)])
473471
474 @log_function
475472 async def backfill(
476473 self, dest: str, room_id: str, limit: int, extremities: Collection[str]
477474 ) -> None:
169169 d["inviter"] = event.sender
170170
171171 invite_event = await self.store.get_event(event.event_id)
172 d["invite"] = await self._event_serializer.serialize_event(
172 d["invite"] = self._event_serializer.serialize_event(
173173 invite_event,
174174 time_now,
175175 as_client_event=as_client_event,
221221
222222 d["messages"] = {
223223 "chunk": (
224 await self._event_serializer.serialize_events(
224 self._event_serializer.serialize_events(
225225 messages,
226226 time_now=time_now,
227227 as_client_event=as_client_event,
231231 "end": await end_token.to_string(self.store),
232232 }
233233
234 d["state"] = await self._event_serializer.serialize_events(
234 d["state"] = self._event_serializer.serialize_events(
235235 current_state.values(),
236236 time_now=time_now,
237237 as_client_event=as_client_event,
375375 "messages": {
376376 "chunk": (
377377 # Don't bundle aggregations as this is a deprecated API.
378 await self._event_serializer.serialize_events(messages, time_now)
378 self._event_serializer.serialize_events(messages, time_now)
379379 ),
380380 "start": await start_token.to_string(self.store),
381381 "end": await end_token.to_string(self.store),
382382 },
383383 "state": (
384384 # Don't bundle aggregations as this is a deprecated API.
385 await self._event_serializer.serialize_events(
386 room_state.values(), time_now
387 )
385 self._event_serializer.serialize_events(room_state.values(), time_now)
388386 ),
389387 "presence": [],
390388 "receipts": [],
403401 # TODO: These concurrently
404402 time_now = self.clock.time_msec()
405403 # Don't bundle aggregations as this is a deprecated API.
406 state = await self._event_serializer.serialize_events(
404 state = self._event_serializer.serialize_events(
407405 current_state.values(), time_now
408406 )
409407
479477 "messages": {
480478 "chunk": (
481479 # Don't bundle aggregations as this is a deprecated API.
482 await self._event_serializer.serialize_events(messages, time_now)
480 self._event_serializer.serialize_events(messages, time_now)
483481 ),
484482 "start": await start_token.to_string(self.store),
485483 "end": await end_token.to_string(self.store),
245245 room_state = room_state_events[membership_event_id]
246246
247247 now = self.clock.time_msec()
248 events = await self._event_serializer.serialize_events(room_state.values(), now)
248 events = self._event_serializer.serialize_events(room_state.values(), now)
249249 return events
250250
251251 async def get_joined_members(self, requester: Requester, room_id: str) -> dict:
536536 state_dict = await self.store.get_events(list(state_ids.values()))
537537 state = state_dict.values()
538538
539 aggregations = await self.store.get_bundled_aggregations(events, user_id)
540
539541 time_now = self.clock.time_msec()
540542
541543 chunk = {
542544 "chunk": (
543 await self._event_serializer.serialize_events(
545 self._event_serializer.serialize_events(
544546 events,
545547 time_now,
546 bundle_aggregations=True,
548 bundle_aggregations=aggregations,
547549 as_client_event=as_client_event,
548550 )
549551 ),
552554 }
553555
554556 if state:
555 chunk["state"] = await self._event_serializer.serialize_events(
557 chunk["state"] = self._event_serializer.serialize_events(
556558 state, time_now, as_client_event=as_client_event
557559 )
558560
5454 from synapse.appservice import ApplicationService
5555 from synapse.events.presence_router import PresenceRouter
5656 from synapse.logging.context import run_in_background
57 from synapse.logging.utils import log_function
5857 from synapse.metrics import LaterGauge
5958 from synapse.metrics.background_process_metrics import run_as_background_process
6059 from synapse.replication.http.presence import (
15411540 self.clock = hs.get_clock()
15421541 self.store = hs.get_datastore()
15431542
1544 @log_function
15451543 async def get_new_events(
15461544 self,
15471545 user: UserID,
392392 user_id = requester.user.to_string()
393393
394394 if not await self.spam_checker.user_may_create_room(user_id):
395 raise SynapseError(403, "You are not permitted to create rooms")
395 raise SynapseError(
396 403, "You are not permitted to create rooms", Codes.FORBIDDEN
397 )
396398
397399 creation_content: JsonDict = {
398400 "room_version": new_room_version.identifier,
684686 invite_3pid_list,
685687 )
686688 ):
687 raise SynapseError(403, "You are not permitted to create rooms")
689 raise SynapseError(
690 403, "You are not permitted to create rooms", Codes.FORBIDDEN
691 )
688692
689693 if ratelimit:
690694 await self.request_ratelimiter.ratelimit(requester)
11751179 # there's something there but not see the content, so use the event that's in
11761180 # `filtered` rather than the event we retrieved from the datastore.
11771181 results["event"] = filtered[0]
1182
1183 # Fetch the aggregations.
1184 aggregations = await self.store.get_bundled_aggregations(
1185 [results["event"]], user.to_string()
1186 )
1187 aggregations.update(
1188 await self.store.get_bundled_aggregations(
1189 results["events_before"], user.to_string()
1190 )
1191 )
1192 aggregations.update(
1193 await self.store.get_bundled_aggregations(
1194 results["events_after"], user.to_string()
1195 )
1196 )
1197 results["aggregations"] = aggregations
11781198
11791199 if results["events_after"]:
11801200 last_event_id = results["events_after"][-1].event_id
152152 rooms_result: List[JsonDict] = []
153153 events_result: List[JsonDict] = []
154154
155 if max_rooms_per_space is None or max_rooms_per_space > MAX_ROOMS_PER_SPACE:
156 max_rooms_per_space = MAX_ROOMS_PER_SPACE
157
155158 while room_queue and len(rooms_result) < MAX_ROOMS:
156159 queue_entry = room_queue.popleft()
157160 room_id = queue_entry.room_id
166169 # The client-specified max_rooms_per_space limit doesn't apply to the
167170 # room_id specified in the request, so we ignore it if this is the
168171 # first room we are processing.
169 max_children = max_rooms_per_space if processed_rooms else None
172 max_children = max_rooms_per_space if processed_rooms else MAX_ROOMS
170173
171174 if is_in_room:
172175 room_entry = await self._summarize_local_room(
208211 # Before returning to the client, remove the allowed_room_ids
209212 # and allowed_spaces keys.
210213 room.pop("allowed_room_ids", None)
211 room.pop("allowed_spaces", None)
214 room.pop("allowed_spaces", None) # historical
212215
213216 rooms_result.append(room)
214217 events.extend(room_entry.children_state_events)
394397 None,
395398 room_id,
396399 suggested_only,
397 # TODO Handle max children.
400 # Do not limit the maximum children.
398401 max_children=None,
399402 )
400403
524527 rooms_result: List[JsonDict] = []
525528 events_result: List[JsonDict] = []
526529
530 # Set a limit on the number of rooms to return.
531 if max_rooms_per_space is None or max_rooms_per_space > MAX_ROOMS_PER_SPACE:
532 max_rooms_per_space = MAX_ROOMS_PER_SPACE
533
527534 while room_queue and len(rooms_result) < MAX_ROOMS:
528535 room_id = room_queue.popleft()
529536 if room_id in processed_rooms:
582589
583590 # Iterate through each child and potentially add it, but not its children,
584591 # to the response.
585 for child_room in root_room_entry.children_state_events:
592 for child_room in itertools.islice(
593 root_room_entry.children_state_events, MAX_ROOMS_PER_SPACE
594 ):
586595 room_id = child_room.get("state_key")
587596 assert isinstance(room_id, str)
588597 # If the room is unknown, skip it.
632641 suggested_only: True if only suggested children should be returned.
633642 Otherwise, all children are returned.
634643 max_children:
635 The maximum number of children rooms to include. This is capped
636 to a server-set limit.
644 The maximum number of children rooms to include. A value of None
645 means no limit.
637646
638647 Returns:
639648 A room entry if the room should be returned. None, otherwise.
655664 # we only care about suggested children
656665 child_events = filter(_is_suggested_child_event, child_events)
657666
658 if max_children is None or max_children > MAX_ROOMS_PER_SPACE:
659 max_children = MAX_ROOMS_PER_SPACE
667 # TODO max_children is legacy code for the /spaces endpoint.
668 if max_children is not None:
669 child_iter: Iterable[EventBase] = itertools.islice(
670 child_events, max_children
671 )
672 else:
673 child_iter = child_events
660674
661675 stripped_events: List[JsonDict] = [
662676 {
667681 "sender": e.sender,
668682 "origin_server_ts": e.origin_server_ts,
669683 }
670 for e in itertools.islice(child_events, max_children)
684 for e in child_iter
671685 ]
672686 return _RoomEntry(room_id, room_entry, stripped_events)
673687
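The `itertools.islice` call above caps the child events only when a limit is given; a small sketch of the two paths:

    import itertools

    child_events = ["a", "b", "c", "d"]
    list(itertools.islice(child_events, 2))  # ['a', 'b'] when capped at max_children=2
    # With max_children None, the original iterable is used unchanged.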
765779 try:
766780 (
767781 room_response,
782 children_state_events,
768783 children,
769784 inaccessible_children,
770785 ) = await self._federation_client.get_room_hierarchy(
789804 }
790805
791806 return (
792 _RoomEntry(room_id, room_response, room_response.pop("children_state", ())),
807 _RoomEntry(room_id, room_response, children_state_events),
793808 children_by_room_id,
794809 set(inaccessible_children),
795810 )
9871002 "canonical_alias": stats["canonical_alias"],
9881003 "num_joined_members": stats["joined_members"],
9891004 "avatar_url": stats["avatar"],
1005 # plural join_rules is a documentation error but kept for historical
1006 # purposes. Should match /publicRooms.
9901007 "join_rules": stats["join_rules"],
1008 "join_rule": stats["join_rules"],
9911009 "world_readable": (
9921010 stats["history_visibility"] == HistoryVisibility.WORLD_READABLE
9931011 ),
9941012 "guest_can_join": stats["guest_access"] == "can_join",
995 "creation_ts": create_event.origin_server_ts,
9961013 "room_type": create_event.content.get(EventContentFields.ROOM_TYPE),
9971014 }
9981015
419419 time_now = self.clock.time_msec()
420420
421421 for context in contexts.values():
422 context["events_before"] = await self._event_serializer.serialize_events(
422 context["events_before"] = self._event_serializer.serialize_events(
423423 context["events_before"], time_now
424424 )
425 context["events_after"] = await self._event_serializer.serialize_events(
425 context["events_after"] = self._event_serializer.serialize_events(
426426 context["events_after"], time_now
427427 )
428428
440440 results.append(
441441 {
442442 "rank": rank_map[e.event_id],
443 "result": (
444 await self._event_serializer.serialize_event(e, time_now)
445 ),
443 "result": self._event_serializer.serialize_event(e, time_now),
446444 "context": contexts.get(e.event_id, {}),
447445 }
448446 )
456454 if state_results:
457455 s = {}
458456 for room_id, state_events in state_results.items():
459 s[room_id] = await self._event_serializer.serialize_events(
457 s[room_id] = self._event_serializer.serialize_events(
460458 state_events, time_now
461459 )
462460
125125 raise NotImplementedError()
126126
127127
128 @attr.s
128 @attr.s(auto_attribs=True)
129129 class UserAttributes:
130130 # the localpart of the mxid that the mapper has assigned to the user.
131131 # if `None`, the mapper has not picked a userid, and the user should be prompted to
132132 # enter one.
133 localpart = attr.ib(type=Optional[str])
134 display_name = attr.ib(type=Optional[str], default=None)
135 emails = attr.ib(type=Collection[str], default=attr.Factory(list))
136
137
138 @attr.s(slots=True)
133 localpart: Optional[str]
134 display_name: Optional[str] = None
135 emails: Collection[str] = attr.Factory(list)
136
137
138 @attr.s(slots=True, auto_attribs=True)
139139 class UsernameMappingSession:
140140 """Data we track about SSO sessions"""
141141
142142 # A unique identifier for this SSO provider, e.g. "oidc" or "saml".
143 auth_provider_id = attr.ib(type=str)
143 auth_provider_id: str
144144
145145 # user ID on the IdP server
146 remote_user_id = attr.ib(type=str)
146 remote_user_id: str
147147
148148 # attributes returned by the ID mapper
149 display_name = attr.ib(type=Optional[str])
150 emails = attr.ib(type=Collection[str])
149 display_name: Optional[str]
150 emails: Collection[str]
151151
152152 # An optional dictionary of extra attributes to be provided to the client in the
153153 # login response.
154 extra_login_attributes = attr.ib(type=Optional[JsonDict])
154 extra_login_attributes: Optional[JsonDict]
155155
156156 # where to redirect the client back to
157 client_redirect_url = attr.ib(type=str)
157 client_redirect_url: str
158158
159159 # expiry time for the session, in milliseconds
160 expiry_time_ms = attr.ib(type=int)
160 expiry_time_ms: int
161161
162162 # choices made by the user
163 chosen_localpart = attr.ib(type=Optional[str], default=None)
164 use_display_name = attr.ib(type=bool, default=True)
165 emails_to_use = attr.ib(type=Collection[str], default=())
166 terms_accepted_version = attr.ib(type=Optional[str], default=None)
163 chosen_localpart: Optional[str] = None
164 use_display_name: bool = True
165 emails_to_use: Collection[str] = ()
166 terms_accepted_version: Optional[str] = None
167167
168168
169169 # the HTTP cookie used to track the mapping session id
5959
6060 logger = logging.getLogger(__name__)
6161
62 # Debug logger for https://github.com/matrix-org/synapse/issues/4422
63 issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
64
65
6662 # Counts the number of times we returned a non-empty sync. `type` is one of
6763 # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
6864 # "true" or "false" depending on if the request asked for lazy loaded members or
10197 prev_batch: StreamToken
10298 events: List[EventBase]
10399 limited: bool
100 # A mapping of event ID to the bundled aggregations for the above events.
101 # This is only calculated if limited is true.
102 bundled_aggregations: Optional[Dict[str, Dict[str, Any]]] = None
104103
105104 def __bool__(self) -> bool:
106105 """Make the result appear empty if there are no updates. This is used
633632
634633 prev_batch_token = now_token.copy_and_replace("room_key", room_key)
635634
635 # Don't bother to bundle aggregations if the timeline is unlimited,
636 # as clients will have all the necessary information.
637 bundled_aggregations = None
638 if limited or newly_joined_room:
639 bundled_aggregations = await self.store.get_bundled_aggregations(
640 recents, sync_config.user.to_string()
641 )
642
636643 return TimelineBatch(
637644 events=recents,
638645 prev_batch=prev_batch_token,
639646 limited=limited or newly_joined_room,
647 bundled_aggregations=bundled_aggregations,
640648 )
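The pre-computed map is later handed to the event serializer through the `bundle_aggregations` argument; a sketch of the downstream call, assuming the serializer signature shown earlier in this diff:

    serialized = event_serializer.serialize_events(
        batch.events, time_now, bundle_aggregations=batch.bundled_aggregations
    )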
641649
642650 async def get_state_after_event(
11601168
11611169 num_events = 0
11621170
1163 # debug for https://github.com/matrix-org/synapse/issues/4422
1171 # debug for https://github.com/matrix-org/synapse/issues/9424
11641172 for joined_room in sync_result_builder.joined:
1165 room_id = joined_room.room_id
1166 if room_id in newly_joined_rooms:
1167 issue4422_logger.debug(
1168 "Sync result for newly joined room %s: %r", room_id, joined_room
1169 )
11701173 num_events += len(joined_room.timeline.events)
11711174
11721175 log_kv(
17371740 if old_mem_ev_id:
17381741 old_mem_ev = await self.store.get_event(
17391742 old_mem_ev_id, allow_none=True
1740 )
1741
1742 # debug for #4422
1743 if has_join:
1744 prev_membership = None
1745 if old_mem_ev:
1746 prev_membership = old_mem_ev.membership
1747 issue4422_logger.debug(
1748 "Previous membership for room %s with join: %s (event %s)",
1749 room_id,
1750 prev_membership,
1751 old_mem_ev_id,
17521743 )
17531744
17541745 if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
18921883 upto_token=since_token,
18931884 )
18941885
1895 if newly_joined:
1896 # debugging for https://github.com/matrix-org/synapse/issues/4422
1897 issue4422_logger.debug(
1898 "RoomSyncResultBuilder events for newly joined room %s: %r",
1899 room_id,
1900 entry.events,
1901 )
19021886 room_entries.append(entry)
19031887
19041888 return _RoomChanges(
20752059 # Note: `batch` can be both empty and limited here in the case where
20762060 # `_load_filtered_recents` can't find any events the user should see
20772061 # (e.g. due to having ignored the sender of the last 50 events).
2078
2079 if newly_joined:
2080 # debug for https://github.com/matrix-org/synapse/issues/4422
2081 issue4422_logger.debug(
2082 "Timeline events after filtering in newly-joined room %s: %r",
2083 room_id,
2084 batch,
2085 )
20862062
20872063 # When we join the room (or the client requests full_state), we should
20882064 # send down any existing tags. Usually the user won't have tags in a
3131 pass
3232
3333
34 @attr.s
34 @attr.s(auto_attribs=True)
3535 class ProxyCredentials:
36 username_password = attr.ib(type=bytes)
36 username_password: bytes
3737
3838 def as_proxy_authorization_value(self) -> bytes:
3939 """
122122 pass
123123
124124
125 @attr.s(slots=True, frozen=True)
125 @attr.s(slots=True, frozen=True, auto_attribs=True)
126126 class MatrixFederationRequest:
127 method = attr.ib(type=str)
127 method: str
128128 """HTTP method
129129 """
130130
131 path = attr.ib(type=str)
131 path: str
132132 """HTTP path
133133 """
134134
135 destination = attr.ib(type=str)
135 destination: str
136136 """The remote server to send the HTTP request to.
137137 """
138138
139 json = attr.ib(default=None, type=Optional[JsonDict])
139 json: Optional[JsonDict] = None
140140 """JSON to send in the body.
141141 """
142142
143 json_callback = attr.ib(default=None, type=Optional[Callable[[], JsonDict]])
143 json_callback: Optional[Callable[[], JsonDict]] = None
144144 """A callback to generate the JSON.
145145 """
146146
147 query = attr.ib(default=None, type=Optional[dict])
147 query: Optional[dict] = None
148148 """Query arguments.
149149 """
150150
151 txn_id = attr.ib(default=None, type=Optional[str])
151 txn_id: Optional[str] = None
152152 """Unique ID for this request (for logging)
153153 """
154154
155 uri = attr.ib(init=False, type=bytes)
155 uri: bytes = attr.ib(init=False)
156156 """The URI of this request
157157 """
158158
533533
534534
535535 @implementer(IAddress)
536 @attr.s(frozen=True, slots=True)
536 @attr.s(frozen=True, slots=True, auto_attribs=True)
537537 class _XForwardedForAddress:
538 host = attr.ib(type=str)
538 host: str
539539
540540
541541 class SynapseSite(Site):
3838 logger = logging.getLogger(__name__)
3939
4040
41 @attr.s
41 @attr.s(slots=True, auto_attribs=True)
4242 @implementer(IPushProducer)
4343 class LogProducer:
4444 """
5353
5454 # This is essentially ITCPTransport, but that is missing certain fields
5555 # (connected and registerProducer) which are part of the implementation.
56 transport = attr.ib(type=Connection)
57 _format = attr.ib(type=Callable[[logging.LogRecord], str])
58 _buffer = attr.ib(type=deque)
59 _paused = attr.ib(default=False, type=bool, init=False)
56 transport: Connection
57 _format: Callable[[logging.LogRecord], str]
58 _buffer: Deque[logging.LogRecord]
59 _paused: bool = attr.ib(default=False, init=False)
6060
6161 def pauseProducing(self):
6262 self._paused = True
192192 return res
193193
194194
195 @attr.s(slots=True)
195 @attr.s(slots=True, auto_attribs=True)
196196 class ContextRequest:
197197 """
198198 A bundle of attributes from the SynapseRequest object.
204204 their children.
205205 """
206206
207 request_id = attr.ib(type=str)
208 ip_address = attr.ib(type=str)
209 site_tag = attr.ib(type=str)
210 requester = attr.ib(type=Optional[str])
211 authenticated_entity = attr.ib(type=Optional[str])
212 method = attr.ib(type=str)
213 url = attr.ib(type=str)
214 protocol = attr.ib(type=str)
215 user_agent = attr.ib(type=str)
207 request_id: str
208 ip_address: str
209 site_tag: str
210 requester: Optional[str]
211 authenticated_entity: Optional[str]
212 method: str
213 url: str
214 protocol: str
215 user_agent: str
216216
217217
218218 LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"]
246246 class BaseReporter: # type: ignore[no-redef]
247247 pass
248248
249 @attr.s(slots=True, frozen=True)
249 @attr.s(slots=True, frozen=True, auto_attribs=True)
250250 class _WrappedRustReporter(BaseReporter):
251251 """Wrap the reporter to ensure `report_span` never throws."""
252252
253 _reporter = attr.ib(type=Reporter, default=attr.Factory(Reporter))
253 _reporter: Reporter = attr.Factory(Reporter)
254254
255255 def set_process(self, *args, **kwargs):
256256 return self._reporter.set_process(*args, **kwargs)
+0
-76
synapse/logging/utils.py
0 # Copyright 2014-2016 OpenMarket Ltd
1 #
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at
5 #
6 # http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14
15 import logging
16 from functools import wraps
17 from inspect import getcallargs
18 from typing import Callable, TypeVar, cast
19
20 _TIME_FUNC_ID = 0
21
22
23 def _log_debug_as_f(f, msg, msg_args):
24 name = f.__module__
25 logger = logging.getLogger(name)
26
27 if logger.isEnabledFor(logging.DEBUG):
28 lineno = f.__code__.co_firstlineno
29 pathname = f.__code__.co_filename
30
31 record = logger.makeRecord(
32 name=name,
33 level=logging.DEBUG,
34 fn=pathname,
35 lno=lineno,
36 msg=msg,
37 args=msg_args,
38 exc_info=None,
39 )
40
41 logger.handle(record)
42
43
44 F = TypeVar("F", bound=Callable)
45
46
47 def log_function(f: F) -> F:
48 """Function decorator that logs every call to that function."""
49 func_name = f.__name__
50
51 @wraps(f)
52 def wrapped(*args, **kwargs):
53 name = f.__module__
54 logger = logging.getLogger(name)
55 level = logging.DEBUG
56
57 if logger.isEnabledFor(level):
58 bound_args = getcallargs(f, *args, **kwargs)
59
60 def format(value):
61 r = str(value)
62 if len(r) > 50:
63 r = r[:50] + "..."
64 return r
65
66 func_args = ["%s=%s" % (k, format(v)) for k, v in bound_args.items()]
67
68 msg_args = {"func_name": func_name, "args": ", ".join(func_args)}
69
70 _log_debug_as_f(f, "Invoked '%(func_name)s' with args: %(args)s", msg_args)
71
72 return f(*args, **kwargs)
73
74 wrapped.__name__ = func_name
75 return cast(F, wrapped)
1111 # See the License for the specific language governing permissions and
1212 # limitations under the License.
1313
14 import functools
15 import gc
1614 import itertools
1715 import logging
1816 import os
1917 import platform
2018 import threading
21 import time
2219 from typing import (
23 Any,
2420 Callable,
2521 Dict,
2622 Generic,
3329 Type,
3430 TypeVar,
3531 Union,
36 cast,
3732 )
3833
3934 import attr
4035 from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric
4136 from prometheus_client.core import (
4237 REGISTRY,
43 CounterMetricFamily,
4438 GaugeHistogramMetricFamily,
4539 GaugeMetricFamily,
4640 )
4741
48 from twisted.internet import reactor
49 from twisted.internet.base import ReactorBase
5042 from twisted.python.threadpool import ThreadPool
5143
52 import synapse
44 import synapse.metrics._reactor_metrics
5345 from synapse.metrics._exposition import (
5446 MetricsResource,
5547 generate_latest,
5648 start_http_server,
5749 )
50 from synapse.metrics._gc import MIN_TIME_BETWEEN_GCS, install_gc_manager
5851 from synapse.util.versionstring import get_version_string
5952
6053 logger = logging.getLogger(__name__)
6154
6255 METRICS_PREFIX = "/_synapse/metrics"
6356
64 running_on_pypy = platform.python_implementation() == "PyPy"
6557 all_gauges: "Dict[str, Union[LaterGauge, InFlightGauge]]" = {}
6658
6759 HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
7567 yield metric
7668
7769
78 @attr.s(slots=True, hash=True)
70 @attr.s(slots=True, hash=True, auto_attribs=True)
7971 class LaterGauge:
8072
81 name = attr.ib(type=str)
82 desc = attr.ib(type=str)
83 labels = attr.ib(hash=False, type=Optional[Iterable[str]])
73 name: str
74 desc: str
75 labels: Optional[Iterable[str]] = attr.ib(hash=False)
8476 # callback: should either return a value (if there are no labels for this metric),
8577 # or dict mapping from a label tuple to a value
86 caller = attr.ib(
87 type=Callable[
88 [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
89 ]
90 )
78 caller: Callable[
79 [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
80 ]
9181
9282 def collect(self) -> Iterable[Metric]:
9383
156146 # Create a class which have the sub_metrics values as attributes, which
157147 # default to 0 on initialization. Used to pass to registered callbacks.
158148 self._metrics_class: Type[MetricsEntry] = attr.make_class(
159 "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
149 "_MetricsEntry",
150 attrs={x: attr.ib(default=0) for x in sub_metrics},
151 slots=True,
160152 )
161153
162154 # Counts number of in flight blocks for a given set of label values
368360
369361 REGISTRY.register(CPUMetrics())
370362
371 #
372 # Python GC metrics
373 #
374
375 gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
376 gc_time = Histogram(
377 "python_gc_time",
378 "Time taken to GC (sec)",
379 ["gen"],
380 buckets=[
381 0.0025,
382 0.005,
383 0.01,
384 0.025,
385 0.05,
386 0.10,
387 0.25,
388 0.50,
389 1.00,
390 2.50,
391 5.00,
392 7.50,
393 15.00,
394 30.00,
395 45.00,
396 60.00,
397 ],
398 )
399
400
401 class GCCounts:
402 def collect(self) -> Iterable[Metric]:
403 cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
404 for n, m in enumerate(gc.get_count()):
405 cm.add_metric([str(n)], m)
406
407 yield cm
408
409
410 if not running_on_pypy:
411 REGISTRY.register(GCCounts())
412
413
414 #
415 # PyPy GC / memory metrics
416 #
417
418
419 class PyPyGCStats:
420 def collect(self) -> Iterable[Metric]:
421
422 # @stats is a pretty-printer object with __str__() returning a nice table,
423 # plus some fields that contain data from that table.
424 # unfortunately, fields are pretty-printed themselves (i. e. '4.5MB').
425 stats = gc.get_stats(memory_pressure=False) # type: ignore
426 # @s contains same fields as @stats, but as actual integers.
427 s = stats._s # type: ignore
428
429 # also note that field naming is completely braindead
430 # and only vaguely correlates with the pretty-printed table.
431 # >>>> gc.get_stats(False)
432 # Total memory consumed:
433 # GC used: 8.7MB (peak: 39.0MB) # s.total_gc_memory, s.peak_memory
434 # in arenas: 3.0MB # s.total_arena_memory
435 # rawmalloced: 1.7MB # s.total_rawmalloced_memory
436 # nursery: 4.0MB # s.nursery_size
437 # raw assembler used: 31.0kB # s.jit_backend_used
438 # -----------------------------
439 # Total: 8.8MB # stats.memory_used_sum
440 #
441 # Total memory allocated:
442 # GC allocated: 38.7MB (peak: 41.1MB) # s.total_allocated_memory, s.peak_allocated_memory
443 # in arenas: 30.9MB # s.peak_arena_memory
444 # rawmalloced: 4.1MB # s.peak_rawmalloced_memory
445 # nursery: 4.0MB # s.nursery_size
446 # raw assembler allocated: 1.0MB # s.jit_backend_allocated
447 # -----------------------------
448 # Total: 39.7MB # stats.memory_allocated_sum
449 #
450 # Total time spent in GC: 0.073 # s.total_gc_time
451
452 pypy_gc_time = CounterMetricFamily(
453 "pypy_gc_time_seconds_total",
454 "Total time spent in PyPy GC",
455 labels=[],
456 )
457 pypy_gc_time.add_metric([], s.total_gc_time / 1000)
458 yield pypy_gc_time
459
460 pypy_mem = GaugeMetricFamily(
461 "pypy_memory_bytes",
462 "Memory tracked by PyPy allocator",
463 labels=["state", "class", "kind"],
464 )
465 # memory used by JIT assembler
466 pypy_mem.add_metric(["used", "", "jit"], s.jit_backend_used)
467 pypy_mem.add_metric(["allocated", "", "jit"], s.jit_backend_allocated)
468 # memory used by GCed objects
469 pypy_mem.add_metric(["used", "", "arenas"], s.total_arena_memory)
470 pypy_mem.add_metric(["allocated", "", "arenas"], s.peak_arena_memory)
471 pypy_mem.add_metric(["used", "", "rawmalloced"], s.total_rawmalloced_memory)
472 pypy_mem.add_metric(["allocated", "", "rawmalloced"], s.peak_rawmalloced_memory)
473 pypy_mem.add_metric(["used", "", "nursery"], s.nursery_size)
474 pypy_mem.add_metric(["allocated", "", "nursery"], s.nursery_size)
475 # totals
476 pypy_mem.add_metric(["used", "totals", "gc"], s.total_gc_memory)
477 pypy_mem.add_metric(["allocated", "totals", "gc"], s.total_allocated_memory)
478 pypy_mem.add_metric(["used", "totals", "gc_peak"], s.peak_memory)
479 pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory)
480 yield pypy_mem
481
482
483 if running_on_pypy:
484 REGISTRY.register(PyPyGCStats())
485
486
487 #
488 # Twisted reactor metrics
489 #
490
491 tick_time = Histogram(
492 "python_twisted_reactor_tick_time",
493 "Tick time of the Twisted reactor (sec)",
494 buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
495 )
496 pending_calls_metric = Histogram(
497 "python_twisted_reactor_pending_calls",
498 "Pending calls",
499 buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000],
500 )
501363
502364 #
503365 # Federation Metrics
549411 get_version_string(synapse),
550412 " ".join([platform.system(), platform.release()]),
551413 ).set(1)
552
553 last_ticked = time.time()
554414
555415 # 3PID send info
556416 threepid_send_requests = Histogram(
599459 )
600460
601461
602 class ReactorLastSeenMetric:
603 def collect(self) -> Iterable[Metric]:
604 cm = GaugeMetricFamily(
605 "python_twisted_reactor_last_seen",
606 "Seconds since the Twisted reactor was last seen",
607 )
608 cm.add_metric([], time.time() - last_ticked)
609 yield cm
610
611
612 REGISTRY.register(ReactorLastSeenMetric())
613
614 # The minimum time in seconds between GCs for each generation, regardless of the current GC
615 # thresholds and counts.
616 MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)
617
618 # The time (in seconds since the epoch) of the last time we did a GC for each generation.
619 _last_gc = [0.0, 0.0, 0.0]
620
621
622 F = TypeVar("F", bound=Callable[..., Any])
623
624
625 def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F:
626 @functools.wraps(func)
627 def f(*args: Any, **kwargs: Any) -> Any:
628 now = reactor.seconds()
629 num_pending = 0
630
631 # _newTimedCalls is one long list of *all* pending calls. Below loop
632 # is based off of impl of reactor.runUntilCurrent
633 for delayed_call in reactor._newTimedCalls:
634 if delayed_call.time > now:
635 break
636
637 if delayed_call.delayed_time > 0:
638 continue
639
640 num_pending += 1
641
642 num_pending += len(reactor.threadCallQueue)
643 start = time.time()
644 ret = func(*args, **kwargs)
645 end = time.time()
646
647 # record the amount of wallclock time spent running pending calls.
648 # This is a proxy for the actual amount of time between reactor polls,
649 # since about 25% of time is actually spent running things triggered by
650 # I/O events, but that is harder to capture without rewriting half the
651 # reactor.
652 tick_time.observe(end - start)
653 pending_calls_metric.observe(num_pending)
654
655 # Update the time we last ticked, for the metric to test whether
656 # Synapse's reactor has frozen
657 global last_ticked
658 last_ticked = end
659
660 if running_on_pypy:
661 return ret
662
663 # Check if we need to do a manual GC (since its been disabled), and do
664 # one if necessary. Note we go in reverse order as e.g. a gen 1 GC may
665 # promote an object into gen 2, and we don't want to handle the same
666 # object multiple times.
667 threshold = gc.get_threshold()
668 counts = gc.get_count()
669 for i in (2, 1, 0):
670 # We check if we need to do one based on a straightforward
671 # comparison between the threshold and count. We also do an extra
672 # check to make sure that we don't a GC too often.
673 if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]:
674 if i == 0:
675 logger.debug("Collecting gc %d", i)
676 else:
677 logger.info("Collecting gc %d", i)
678
679 start = time.time()
680 unreachable = gc.collect(i)
681 end = time.time()
682
683 _last_gc[i] = end
684
685 gc_time.labels(i).observe(end - start)
686 gc_unreachable.labels(i).set(unreachable)
687
688 return ret
689
690 return cast(F, f)
691
692
693 try:
694 # Ensure the reactor has all the attributes we expect
695 reactor.seconds # type: ignore
696 reactor.runUntilCurrent # type: ignore
697 reactor._newTimedCalls # type: ignore
698 reactor.threadCallQueue # type: ignore
699
700 # runUntilCurrent is called when we have pending calls. It is called once
701 # per iteratation after fd polling.
702 reactor.runUntilCurrent = runUntilCurrentTimer(reactor, reactor.runUntilCurrent) # type: ignore
703
704 # We manually run the GC each reactor tick so that we can get some metrics
705 # about time spent doing GC,
706 if not running_on_pypy:
707 gc.disable()
708 except AttributeError:
709 pass
710
711
712462 __all__ = [
713463 "MetricsResource",
714464 "generate_latest",
716466 "LaterGauge",
717467 "InFlightGauge",
718468 "GaugeBucketCollector",
469 "MIN_TIME_BETWEEN_GCS",
470 "install_gc_manager",
719471 ]
0 # Copyright 2015-2022 The Matrix.org Foundation C.I.C.
1 #
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at
5 #
6 # http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14
15 import gc
16 import logging
17 import platform
18 import time
19 from typing import Iterable
20
21 from prometheus_client.core import (
22 REGISTRY,
23 CounterMetricFamily,
24 Gauge,
25 GaugeMetricFamily,
26 Histogram,
27 Metric,
28 )
29
30 from twisted.internet import task
31
32 """Prometheus metrics for garbage collection"""
33
34
35 logger = logging.getLogger(__name__)
36
37 # The minimum time in seconds between GCs for each generation, regardless of the current GC
38 # thresholds and counts.
39 MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)
40
41 running_on_pypy = platform.python_implementation() == "PyPy"
42
43 #
44 # Python GC metrics
45 #
46
47 gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
48 gc_time = Histogram(
49 "python_gc_time",
50 "Time taken to GC (sec)",
51 ["gen"],
52 buckets=[
53 0.0025,
54 0.005,
55 0.01,
56 0.025,
57 0.05,
58 0.10,
59 0.25,
60 0.50,
61 1.00,
62 2.50,
63 5.00,
64 7.50,
65 15.00,
66 30.00,
67 45.00,
68 60.00,
69 ],
70 )
71
72
73 class GCCounts:
74 def collect(self) -> Iterable[Metric]:
75 cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
76 for n, m in enumerate(gc.get_count()):
77 cm.add_metric([str(n)], m)
78
79 yield cm
80
81
82 def install_gc_manager() -> None:
83 """Disable automatic GC, and replace it with a task that runs every 100ms
84
85 This means that (a) we can limit how often GC runs; (b) we can get some metrics
86 about GC activity.
87
88 It does nothing on PyPy.
89 """
90
91 if running_on_pypy:
92 return
93
94 REGISTRY.register(GCCounts())
95
96 gc.disable()
97
98 # The time (in seconds since the epoch) of the last time we did a GC for each generation.
99 _last_gc = [0.0, 0.0, 0.0]
100
101 def _maybe_gc() -> None:
102 # Check if we need to do a manual GC (since it's been disabled), and do
103 # one if necessary. Note we go in reverse order as e.g. a gen 1 GC may
104 # promote an object into gen 2, and we don't want to handle the same
105 # object multiple times.
106 threshold = gc.get_threshold()
107 counts = gc.get_count()
108 end = time.time()
109 for i in (2, 1, 0):
110 # We check if we need to do one based on a straightforward
111 # comparison between the threshold and count. We also do an extra
112 # check to make sure that we don't do a GC too often.
113 if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]:
114 if i == 0:
115 logger.debug("Collecting gc %d", i)
116 else:
117 logger.info("Collecting gc %d", i)
118
119 start = time.time()
120 unreachable = gc.collect(i)
121 end = time.time()
122
123 _last_gc[i] = end
124
125 gc_time.labels(i).observe(end - start)
126 gc_unreachable.labels(i).set(unreachable)
127
128 gc_task = task.LoopingCall(_maybe_gc)
129 gc_task.start(0.1)
130
131
132 #
133 # PyPy GC / memory metrics
134 #
135
136
137 class PyPyGCStats:
138 def collect(self) -> Iterable[Metric]:
139
140 # @stats is a pretty-printer object with __str__() returning a nice table,
141 # plus some fields that contain data from that table.
142 # unfortunately, fields are pretty-printed themselves (i.e. '4.5MB').
143 stats = gc.get_stats(memory_pressure=False) # type: ignore
144 # @s contains same fields as @stats, but as actual integers.
145 s = stats._s # type: ignore
146
147 # also note that field naming is completely braindead
148 # and only vaguely correlates with the pretty-printed table.
149 # >>>> gc.get_stats(False)
150 # Total memory consumed:
151 # GC used: 8.7MB (peak: 39.0MB) # s.total_gc_memory, s.peak_memory
152 # in arenas: 3.0MB # s.total_arena_memory
153 # rawmalloced: 1.7MB # s.total_rawmalloced_memory
154 # nursery: 4.0MB # s.nursery_size
155 # raw assembler used: 31.0kB # s.jit_backend_used
156 # -----------------------------
157 # Total: 8.8MB # stats.memory_used_sum
158 #
159 # Total memory allocated:
160 # GC allocated: 38.7MB (peak: 41.1MB) # s.total_allocated_memory, s.peak_allocated_memory
161 # in arenas: 30.9MB # s.peak_arena_memory
162 # rawmalloced: 4.1MB # s.peak_rawmalloced_memory
163 # nursery: 4.0MB # s.nursery_size
164 # raw assembler allocated: 1.0MB # s.jit_backend_allocated
165 # -----------------------------
166 # Total: 39.7MB # stats.memory_allocated_sum
167 #
168 # Total time spent in GC: 0.073 # s.total_gc_time
169
170 pypy_gc_time = CounterMetricFamily(
171 "pypy_gc_time_seconds_total",
172 "Total time spent in PyPy GC",
173 labels=[],
174 )
175 pypy_gc_time.add_metric([], s.total_gc_time / 1000)
176 yield pypy_gc_time
177
178 pypy_mem = GaugeMetricFamily(
179 "pypy_memory_bytes",
180 "Memory tracked by PyPy allocator",
181 labels=["state", "class", "kind"],
182 )
183 # memory used by JIT assembler
184 pypy_mem.add_metric(["used", "", "jit"], s.jit_backend_used)
185 pypy_mem.add_metric(["allocated", "", "jit"], s.jit_backend_allocated)
186 # memory used by GCed objects
187 pypy_mem.add_metric(["used", "", "arenas"], s.total_arena_memory)
188 pypy_mem.add_metric(["allocated", "", "arenas"], s.peak_arena_memory)
189 pypy_mem.add_metric(["used", "", "rawmalloced"], s.total_rawmalloced_memory)
190 pypy_mem.add_metric(["allocated", "", "rawmalloced"], s.peak_rawmalloced_memory)
191 pypy_mem.add_metric(["used", "", "nursery"], s.nursery_size)
192 pypy_mem.add_metric(["allocated", "", "nursery"], s.nursery_size)
193 # totals
194 pypy_mem.add_metric(["used", "totals", "gc"], s.total_gc_memory)
195 pypy_mem.add_metric(["allocated", "totals", "gc"], s.total_allocated_memory)
196 pypy_mem.add_metric(["used", "totals", "gc_peak"], s.peak_memory)
197 pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory)
198 yield pypy_mem
199
200
201 if running_on_pypy:
202 REGISTRY.register(PyPyGCStats())
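A minimal sketch of how a manager like the one above could be wired into a Twisted process (names and placement assumed; this is not Synapse's actual startup code): automatic collection is disabled and a LoopingCall periodically decides whether to collect, mirroring the threshold check in install_gc_manager().

import gc

from twisted.internet import reactor, task


def _maybe_collect() -> None:
    # Only the threshold/count comparison from the hunk above is reproduced;
    # the MIN_TIME_BETWEEN_GCS rate limiting is omitted for brevity.
    threshold = gc.get_threshold()
    counts = gc.get_count()
    for gen in (2, 1, 0):
        if counts[gen] > threshold[gen]:
            gc.collect(gen)


gc.disable()
task.LoopingCall(_maybe_collect).start(0.1)
reactor.run()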
0 # Copyright 2022 The Matrix.org Foundation C.I.C.
1 #
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at
5 #
6 # http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 import select
15 import time
16 from typing import Any, Iterable, List, Tuple
17
18 from prometheus_client import Histogram, Metric
19 from prometheus_client.core import REGISTRY, GaugeMetricFamily
20
21 from twisted.internet import reactor
22
23 #
24 # Twisted reactor metrics
25 #
26
27 tick_time = Histogram(
28 "python_twisted_reactor_tick_time",
29 "Tick time of the Twisted reactor (sec)",
30 buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
31 )
32
33
34 class EpollWrapper:
35 """a wrapper for an epoll object which records the time between polls"""
36
37 def __init__(self, poller: "select.epoll"): # type: ignore[name-defined]
38 self.last_polled = time.time()
39 self._poller = poller
40
41 def poll(self, *args, **kwargs) -> List[Tuple[int, int]]: # type: ignore[no-untyped-def]
42 # record the time since poll() was last called. This gives a good proxy for
43 # how long it takes to run everything in the reactor - ie, how long anything
44 # waiting for the next tick will have to wait.
45 tick_time.observe(time.time() - self.last_polled)
46
47 ret = self._poller.poll(*args, **kwargs)
48
49 self.last_polled = time.time()
50 return ret
51
52 def __getattr__(self, item: str) -> Any:
53 return getattr(self._poller, item)
54
55
56 class ReactorLastSeenMetric:
57 def __init__(self, epoll_wrapper: EpollWrapper):
58 self._epoll_wrapper = epoll_wrapper
59
60 def collect(self) -> Iterable[Metric]:
61 cm = GaugeMetricFamily(
62 "python_twisted_reactor_last_seen",
63 "Seconds since the Twisted reactor was last seen",
64 )
65 cm.add_metric([], time.time() - self._epoll_wrapper.last_polled)
66 yield cm
67
68
69 try:
70 # if the reactor has a `_poller` attribute, which is an `epoll` object
71 # (ie, it's an EPollReactor), we wrap the `epoll` with a thing that will
72 # measure the time between ticks
73 from select import epoll # type: ignore[attr-defined]
74
75 poller = reactor._poller # type: ignore[attr-defined]
76 except (AttributeError, ImportError):
77 pass
78 else:
79 if isinstance(poller, epoll):
80 poller = EpollWrapper(poller)
81 reactor._poller = poller # type: ignore[attr-defined]
82 REGISTRY.register(ReactorLastSeenMetric(poller))
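A quick way to confirm that a custom collector such as ReactorLastSeenMetric is emitting samples (a sketch, not taken from the Synapse test suite) is to render the default registry and filter for the metric name:

from prometheus_client import generate_latest
from prometheus_client.core import REGISTRY

# Render all registered collectors and print only the reactor "last seen"
# samples.
for line in generate_latest(REGISTRY).decode("utf-8").splitlines():
    if line.startswith("python_twisted_reactor_last_seen"):
        print(line)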
3939 from synapse.logging import issue9533_logger
4040 from synapse.logging.context import PreserveLoggingContext
4141 from synapse.logging.opentracing import log_kv, start_active_span
42 from synapse.logging.utils import log_function
4342 from synapse.metrics import LaterGauge
4443 from synapse.streams.config import PaginationConfig
4544 from synapse.types import (
192191 return bool(self.events)
193192
194193
195 @attr.s(slots=True, frozen=True)
194 @attr.s(slots=True, frozen=True, auto_attribs=True)
196195 class _PendingRoomEventEntry:
197 event_pos = attr.ib(type=PersistedEventPosition)
198 extra_users = attr.ib(type=Collection[UserID])
199
200 room_id = attr.ib(type=str)
201 type = attr.ib(type=str)
202 state_key = attr.ib(type=Optional[str])
203 membership = attr.ib(type=Optional[str])
196 event_pos: PersistedEventPosition
197 extra_users: Collection[UserID]
198
199 room_id: str
200 type: str
201 state_key: Optional[str]
202 membership: Optional[str]
204203
205204
206205 class Notifier:
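The change just shown for _PendingRoomEventEntry recurs throughout the rest of this changeset: attrs classes move from explicit attr.ib(type=...) fields to auto_attribs=True with plain annotations, and factory defaults become attr.Factory. An illustrative before/after with made-up field names:

import attr


# Before: types and defaults passed to attr.ib().
@attr.s(slots=True, frozen=True)
class OldStyle:
    event_id = attr.ib(type=str)
    extra = attr.ib(type=list, factory=list)


# After: plain annotations with auto_attribs=True; mutable defaults via
# attr.Factory.
@attr.s(slots=True, frozen=True, auto_attribs=True)
class NewStyle:
    event_id: str
    extra: list = attr.Factory(list)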
685684 else:
686685 return False
687686
688 @log_function
689687 def remove_expired_streams(self) -> None:
690688 time_now_ms = self.clock.time_msec()
691689 expired_streams = []
699697 for expired_stream in expired_streams:
700698 expired_stream.remove(self)
701699
702 @log_function
703700 def _register_with_keys(self, user_stream: _NotifierUserStream):
704701 self.user_to_user_stream[user_stream.user_id] = user_stream
705702
2222 from synapse.server import HomeServer
2323
2424
25 @attr.s(slots=True)
25 @attr.s(slots=True, auto_attribs=True)
2626 class PusherConfig:
2727 """Parameters necessary to configure a pusher."""
2828
29 id = attr.ib(type=Optional[str])
30 user_name = attr.ib(type=str)
31 access_token = attr.ib(type=Optional[int])
32 profile_tag = attr.ib(type=str)
33 kind = attr.ib(type=str)
34 app_id = attr.ib(type=str)
35 app_display_name = attr.ib(type=str)
36 device_display_name = attr.ib(type=str)
37 pushkey = attr.ib(type=str)
38 ts = attr.ib(type=int)
39 lang = attr.ib(type=Optional[str])
40 data = attr.ib(type=Optional[JsonDict])
41 last_stream_ordering = attr.ib(type=int)
42 last_success = attr.ib(type=Optional[int])
43 failing_since = attr.ib(type=Optional[int])
29 id: Optional[str]
30 user_name: str
31 access_token: Optional[int]
32 profile_tag: str
33 kind: str
34 app_id: str
35 app_display_name: str
36 device_display_name: str
37 pushkey: str
38 ts: int
39 lang: Optional[str]
40 data: Optional[JsonDict]
41 last_stream_ordering: int
42 last_success: Optional[int]
43 failing_since: Optional[int]
4444
4545 def as_dict(self) -> Dict[str, Any]:
4646 """Information that can be retrieved about a pusher after creation."""
5656 }
5757
5858
59 @attr.s(slots=True)
59 @attr.s(slots=True, auto_attribs=True)
6060 class ThrottleParams:
6161 """Parameters for controlling the rate of sending pushes via email."""
6262
63 last_sent_ts = attr.ib(type=int)
64 throttle_ms = attr.ib(type=int)
63 last_sent_ts: int
64 throttle_ms: int
6565
6666
6767 class Pusher(metaclass=abc.ABCMeta):
297297 StateGroup = Union[object, int]
298298
299299
300 @attr.s(slots=True)
300 @attr.s(slots=True, auto_attribs=True)
301301 class RulesForRoomData:
302302 """The data stored in the cache by `RulesForRoom`.
303303
306306 """
307307
308308 # event_id -> (user_id, state)
309 member_map = attr.ib(type=MemberMap, factory=dict)
309 member_map: MemberMap = attr.Factory(dict)
310310 # user_id -> rules
311 rules_by_user = attr.ib(type=RulesByUser, factory=dict)
311 rules_by_user: RulesByUser = attr.Factory(dict)
312312
313313 # The last state group we updated the caches for. If the state_group of
314314 # a new event comes along, we know that we can just return the cached
315315 # result.
316316 # On invalidation of the rules themselves (if the user changes them),
317317 # we invalidate everything and set state_group to `object()`
318 state_group = attr.ib(type=StateGroup, factory=object)
318 state_group: StateGroup = attr.Factory(object)
319319
320320 # A sequence number to keep track of when we're allowed to update the
321321 # cache. We bump the sequence number when we invalidate the cache. If
322322 # the sequence number changes while we're calculating stuff we should
323323 # not update the cache with it.
324 sequence = attr.ib(type=int, default=0)
324 sequence: int = 0
325325
326326 # A cache of user_ids that we *know* aren't interesting, e.g. user_ids
327327 # owned by AS's, or remote users, etc. (I.e. users we will never need to
328328 # calculate push for)
329329 # These never need to be invalidated as we will never set up push for
330330 # them.
331 uninteresting_user_set = attr.ib(type=Set[str], factory=set)
331 uninteresting_user_set: Set[str] = attr.Factory(set)
332332
333333
334334 class RulesForRoom:
552552 self.data.state_group = state_group
553553
554554
555 @attr.attrs(slots=True, frozen=True)
555 @attr.attrs(slots=True, frozen=True, auto_attribs=True)
556556 class _Invalidation:
557557 # _Invalidation is passed as an `on_invalidate` callback to bulk_get_push_rules,
558558 # which means that it is stored on the bulk_get_push_rules cache entry. In order
563563 # attrs provides suitable __hash__ and __eq__ methods, provided we remember to
564564 # set `frozen=True`.
565565
566 cache = attr.ib(type=LruCache)
567 room_id = attr.ib(type=str)
566 cache: LruCache
567 room_id: str
568568
569569 def __call__(self) -> None:
570570 rules_data = self.cache.get(self.room_id, None, update_metrics=False)
177177 await self.send_email(
178178 email_address,
179179 self.email_subjects.email_validation
180 % {"server_name": self.hs.config.server.server_name},
180 % {"server_name": self.hs.config.server.server_name, "app": self.app_name},
181181 template_vars,
182182 )
183183
208208 await self.send_email(
209209 email_address,
210210 self.email_subjects.email_validation
211 % {"server_name": self.hs.config.server.server_name},
211 % {"server_name": self.hs.config.server.server_name, "app": self.app_name},
212212 template_vars,
213213 )
214214
4949 """
5050
5151
52 @attr.s(slots=True, frozen=True)
52 @attr.s(slots=True, frozen=True, auto_attribs=True)
5353 class EventsStreamRow:
5454 """A parsed row from the events replication stream"""
5555
56 type = attr.ib() # str: the TypeId of one of the *EventsStreamRows
57 data = attr.ib() # BaseEventsStreamRow
56 type: str # the TypeId of one of the *EventsStreamRows
57 data: "BaseEventsStreamRow"
5858
5959
6060 class BaseEventsStreamRow:
7878 return cls(*data)
7979
8080
81 @attr.s(slots=True, frozen=True)
81 @attr.s(slots=True, frozen=True, auto_attribs=True)
8282 class EventsStreamEventRow(BaseEventsStreamRow):
8383 TypeId = "ev"
8484
85 event_id = attr.ib(type=str)
86 room_id = attr.ib(type=str)
87 type = attr.ib(type=str)
88 state_key = attr.ib(type=Optional[str])
89 redacts = attr.ib(type=Optional[str])
90 relates_to = attr.ib(type=Optional[str])
91 membership = attr.ib(type=Optional[str])
92 rejected = attr.ib(type=bool)
93
94
95 @attr.s(slots=True, frozen=True)
85 event_id: str
86 room_id: str
87 type: str
88 state_key: Optional[str]
89 redacts: Optional[str]
90 relates_to: Optional[str]
91 membership: Optional[str]
92 rejected: bool
93
94
95 @attr.s(slots=True, frozen=True, auto_attribs=True)
9696 class EventsStreamCurrentStateRow(BaseEventsStreamRow):
9797 TypeId = "state"
9898
99 room_id = attr.ib() # str
100 type = attr.ib() # str
101 state_key = attr.ib() # str
102 event_id = attr.ib() # str, optional
99 room_id: str
100 type: str
101 state_key: str
102 event_id: Optional[str]
103103
104104
105105 _EventRows: Tuple[Type[BaseEventsStreamRow], ...] = (
122122 job_name = body["job_name"]
123123
124124 if job_name == "populate_stats_process_rooms":
125 jobs = [
126 {
127 "update_name": "populate_stats_process_rooms",
128 "progress_json": "{}",
129 },
130 ]
125 jobs = [("populate_stats_process_rooms", "{}", "")]
131126 elif job_name == "regenerate_directory":
132127 jobs = [
133 {
134 "update_name": "populate_user_directory_createtables",
135 "progress_json": "{}",
136 "depends_on": "",
137 },
138 {
139 "update_name": "populate_user_directory_process_rooms",
140 "progress_json": "{}",
141 "depends_on": "populate_user_directory_createtables",
142 },
143 {
144 "update_name": "populate_user_directory_process_users",
145 "progress_json": "{}",
146 "depends_on": "populate_user_directory_process_rooms",
147 },
148 {
149 "update_name": "populate_user_directory_cleanup",
150 "progress_json": "{}",
151 "depends_on": "populate_user_directory_process_users",
152 },
128 ("populate_user_directory_createtables", "{}", ""),
129 (
130 "populate_user_directory_process_rooms",
131 "{}",
132 "populate_user_directory_createtables",
133 ),
134 (
135 "populate_user_directory_process_users",
136 "{}",
137 "populate_user_directory_process_rooms",
138 ),
139 (
140 "populate_user_directory_cleanup",
141 "{}",
142 "populate_user_directory_process_users",
143 ),
153144 ]
154145 else:
155146 raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")
157148 try:
158149 await self._store.db_pool.simple_insert_many(
159150 table="background_updates",
151 keys=("update_name", "progress_json", "depends_on"),
160152 values=jobs,
161153 desc=f"admin_api_run_{job_name}",
162154 )
110110 ) -> Tuple[int, JsonDict]:
111111 await assert_requester_is_admin(self._auth, request)
112112
113 if not await self._store.is_destination_known(destination):
114 raise NotFoundError("Unknown destination")
115
113116 destination_retry_timings = await self._store.get_destination_retry_timings(
114117 destination
115118 )
116
117 if not destination_retry_timings:
118 raise NotFoundError("Unknown destination")
119119
120120 last_successful_stream_ordering = (
121121 await self._store.get_destination_last_successful_stream_ordering(
123123 )
124124 )
125125
126 response = {
126 response: JsonDict = {
127127 "destination": destination,
128 "failure_ts": destination_retry_timings.failure_ts,
129 "retry_last_ts": destination_retry_timings.retry_last_ts,
130 "retry_interval": destination_retry_timings.retry_interval,
131128 "last_successful_stream_ordering": last_successful_stream_ordering,
132129 }
133130
131 if destination_retry_timings:
132 response = {
133 **response,
134 "failure_ts": destination_retry_timings.failure_ts,
135 "retry_last_ts": destination_retry_timings.retry_last_ts,
136 "retry_interval": destination_retry_timings.retry_interval,
137 }
138 else:
139 response = {
140 **response,
141 "failure_ts": None,
142 "retry_last_ts": 0,
143 "retry_interval": 0,
144 }
145
134146 return HTTPStatus.OK, response
465465 )
466466
467467 deleted_media, total = await self.media_repository.delete_local_media_ids(
468 ([row["media_id"] for row in media])
468 [row["media_id"] for row in media]
469469 )
470470
471471 return HTTPStatus.OK, {"deleted_media": deleted_media, "total": total}
423423 event_ids = await self.store.get_current_state_ids(room_id)
424424 events = await self.store.get_events(event_ids.values())
425425 now = self.clock.time_msec()
426 room_state = await self._event_serializer.serialize_events(events.values(), now)
426 room_state = self._event_serializer.serialize_events(events.values(), now)
427427 ret = {"state": room_state}
428428
429429 return HTTPStatus.OK, ret
743743 )
744744
745745 time_now = self.clock.time_msec()
746 results["events_before"] = await self._event_serializer.serialize_events(
747 results["events_before"],
748 time_now,
749 bundle_aggregations=True,
750 )
751 results["event"] = await self._event_serializer.serialize_event(
752 results["event"],
753 time_now,
754 bundle_aggregations=True,
755 )
756 results["events_after"] = await self._event_serializer.serialize_events(
757 results["events_after"],
758 time_now,
759 bundle_aggregations=True,
760 )
761 results["state"] = await self._event_serializer.serialize_events(
746 aggregations = results.pop("aggregations", None)
747 results["events_before"] = self._event_serializer.serialize_events(
748 results["events_before"], time_now, bundle_aggregations=aggregations
749 )
750 results["event"] = self._event_serializer.serialize_event(
751 results["event"], time_now, bundle_aggregations=aggregations
752 )
753 results["events_after"] = self._event_serializer.serialize_events(
754 results["events_after"], time_now, bundle_aggregations=aggregations
755 )
756 results["state"] = self._event_serializer.serialize_events(
762757 results["state"], time_now
763758 )
764759
172172 if not self.hs.is_mine(target_user):
173173 raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only look up local users")
174174
175 ret = await self.admin_handler.get_user(target_user)
176
177 if not ret:
175 user_info_dict = await self.admin_handler.get_user(target_user)
176 if not user_info_dict:
178177 raise NotFoundError("User not found")
179178
180 return HTTPStatus.OK, ret
179 return HTTPStatus.OK, user_info_dict
181180
182181 async def on_PUT(
183182 self, request: SynapseRequest, user_id: str
398397 target_user, requester, body["avatar_url"], True
399398 )
400399
401 user = await self.admin_handler.get_user(target_user)
402 assert user is not None
403
404 return 201, user
400 user_info_dict = await self.admin_handler.get_user(target_user)
401 assert user_info_dict is not None
402
403 return HTTPStatus.CREATED, user_info_dict
405404
406405
407406 class UserRegisterServlet(RestServlet):
9090
9191 time_now = self.clock.time_msec()
9292 if event:
93 result = await self._event_serializer.serialize_event(event, time_now)
93 result = self._event_serializer.serialize_event(event, time_now)
9494 return 200, result
9595 else:
9696 return 404, "Event not found."
7171 "actions": pa.actions,
7272 "ts": pa.received_ts,
7373 "event": (
74 await self._event_serializer.serialize_event(
74 self._event_serializer.serialize_event(
7575 notif_events[pa.event_id],
7676 self.clock.time_msec(),
7777 event_format=format_event_for_client_v2_without_room_id,
1818 """
1919
2020 import logging
21 from typing import TYPE_CHECKING, Awaitable, Optional, Tuple
22
23 from synapse.api.constants import EventTypes, RelationTypes
24 from synapse.api.errors import ShadowBanError, SynapseError
21 from typing import TYPE_CHECKING, Optional, Tuple
22
23 from synapse.api.constants import RelationTypes
24 from synapse.api.errors import SynapseError
2525 from synapse.http.server import HttpServer
26 from synapse.http.servlet import (
27 RestServlet,
28 parse_integer,
29 parse_json_object_from_request,
30 parse_string,
31 )
26 from synapse.http.servlet import RestServlet, parse_integer, parse_string
3227 from synapse.http.site import SynapseRequest
33 from synapse.rest.client.transactions import HttpTransactionCache
28 from synapse.rest.client._base import client_patterns
3429 from synapse.storage.relations import (
3530 AggregationPaginationToken,
3631 PaginationChunk,
3732 RelationPaginationToken,
3833 )
3934 from synapse.types import JsonDict
40 from synapse.util.stringutils import random_string
41
42 from ._base import client_patterns
4335
4436 if TYPE_CHECKING:
4537 from synapse.server import HomeServer
4638
4739 logger = logging.getLogger(__name__)
48
49
50 class RelationSendServlet(RestServlet):
51 """Helper API for sending events that have relation data.
52
53 Example API shape to send a šŸ‘ reaction to a room:
54
55 POST /rooms/!foo/send_relation/$bar/m.annotation/m.reaction?key=%F0%9F%91%8D
56 {}
57
58 {
59 "event_id": "$foobar"
60 }
61 """
62
63 PATTERN = (
64 "/rooms/(?P<room_id>[^/]*)/send_relation"
65 "/(?P<parent_id>[^/]*)/(?P<relation_type>[^/]*)/(?P<event_type>[^/]*)"
66 )
67
68 def __init__(self, hs: "HomeServer"):
69 super().__init__()
70 self.auth = hs.get_auth()
71 self.event_creation_handler = hs.get_event_creation_handler()
72 self.txns = HttpTransactionCache(hs)
73
74 def register(self, http_server: HttpServer) -> None:
75 http_server.register_paths(
76 "POST",
77 client_patterns(self.PATTERN + "$", releases=()),
78 self.on_PUT_or_POST,
79 self.__class__.__name__,
80 )
81 http_server.register_paths(
82 "PUT",
83 client_patterns(self.PATTERN + "/(?P<txn_id>[^/]*)$", releases=()),
84 self.on_PUT,
85 self.__class__.__name__,
86 )
87
88 def on_PUT(
89 self,
90 request: SynapseRequest,
91 room_id: str,
92 parent_id: str,
93 relation_type: str,
94 event_type: str,
95 txn_id: Optional[str] = None,
96 ) -> Awaitable[Tuple[int, JsonDict]]:
97 return self.txns.fetch_or_execute_request(
98 request,
99 self.on_PUT_or_POST,
100 request,
101 room_id,
102 parent_id,
103 relation_type,
104 event_type,
105 txn_id,
106 )
107
108 async def on_PUT_or_POST(
109 self,
110 request: SynapseRequest,
111 room_id: str,
112 parent_id: str,
113 relation_type: str,
114 event_type: str,
115 txn_id: Optional[str] = None,
116 ) -> Tuple[int, JsonDict]:
117 requester = await self.auth.get_user_by_req(request, allow_guest=True)
118
119 if event_type == EventTypes.Member:
120 # Adding relations to a membership event is meaningless, so we just deny it
121 # at the CS API rather than trying to handle it correctly.
122 raise SynapseError(400, "Cannot send member events with relations")
123
124 content = parse_json_object_from_request(request)
125
126 aggregation_key = parse_string(request, "key", encoding="utf-8")
127
128 content["m.relates_to"] = {
129 "event_id": parent_id,
130 "rel_type": relation_type,
131 }
132 if aggregation_key is not None:
133 content["m.relates_to"]["key"] = aggregation_key
134
135 event_dict = {
136 "type": event_type,
137 "content": content,
138 "room_id": room_id,
139 "sender": requester.user.to_string(),
140 }
141
142 try:
143 (
144 event,
145 _,
146 ) = await self.event_creation_handler.create_and_send_nonmember_event(
147 requester, event_dict=event_dict, txn_id=txn_id
148 )
149 event_id = event.event_id
150 except ShadowBanError:
151 event_id = "$" + random_string(43)
152
153 return 200, {"event_id": event_id}
15440
15541
15642 class RelationPaginationServlet(RestServlet):
226112 now = self.clock.time_msec()
227113 # Do not bundle aggregations when retrieving the original event because
228114 # we want the content before relations are applied to it.
229 original_event = await self._event_serializer.serialize_event(
230 event, now, bundle_aggregations=False
115 original_event = self._event_serializer.serialize_event(
116 event, now, bundle_aggregations=None
231117 )
232118 # The relations returned for the requested event do include their
233119 # bundled aggregations.
234 serialized_events = await self._event_serializer.serialize_events(
235 events, now, bundle_aggregations=True
120 aggregations = await self.store.get_bundled_aggregations(
121 events, requester.user.to_string()
122 )
123 serialized_events = self._event_serializer.serialize_events(
124 events, now, bundle_aggregations=aggregations
236125 )
237126
238127 return_value = pagination_chunk.to_dict()
421310 )
422311
423312 now = self.clock.time_msec()
424 serialized_events = await self._event_serializer.serialize_events(events, now)
313 serialized_events = self._event_serializer.serialize_events(events, now)
425314
426315 return_value = result.to_dict()
427316 return_value["chunk"] = serialized_events
430319
431320
432321 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
433 RelationSendServlet(hs).register(http_server)
434322 RelationPaginationServlet(hs).register(http_server)
435323 RelationAggregationPaginationServlet(hs).register(http_server)
436324 RelationAggregationGroupPaginationServlet(hs).register(http_server)
641641 def __init__(self, hs: "HomeServer"):
642642 super().__init__()
643643 self.clock = hs.get_clock()
644 self._store = hs.get_datastore()
644645 self.event_handler = hs.get_event_handler()
645646 self._event_serializer = hs.get_event_client_serializer()
646647 self.auth = hs.get_auth()
659660 # https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-event-eventid
660661 raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND)
661662
662 time_now = self.clock.time_msec()
663663 if event:
664 event_dict = await self._event_serializer.serialize_event(
665 event, time_now, bundle_aggregations=True
664 # Ensure there are bundled aggregations available.
665 aggregations = await self._store.get_bundled_aggregations(
666 [event], requester.user.to_string()
667 )
668
669 time_now = self.clock.time_msec()
670 event_dict = self._event_serializer.serialize_event(
671 event, time_now, bundle_aggregations=aggregations
666672 )
667673 return 200, event_dict
668674
707713 raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND)
708714
709715 time_now = self.clock.time_msec()
710 results["events_before"] = await self._event_serializer.serialize_events(
711 results["events_before"], time_now, bundle_aggregations=True
712 )
713 results["event"] = await self._event_serializer.serialize_event(
714 results["event"], time_now, bundle_aggregations=True
715 )
716 results["events_after"] = await self._event_serializer.serialize_events(
717 results["events_after"], time_now, bundle_aggregations=True
718 )
719 results["state"] = await self._event_serializer.serialize_events(
716 aggregations = results.pop("aggregations", None)
717 results["events_before"] = self._event_serializer.serialize_events(
718 results["events_before"], time_now, bundle_aggregations=aggregations
719 )
720 results["event"] = self._event_serializer.serialize_event(
721 results["event"], time_now, bundle_aggregations=aggregations
722 )
723 results["events_after"] = self._event_serializer.serialize_events(
724 results["events_after"], time_now, bundle_aggregations=aggregations
725 )
726 results["state"] = self._event_serializer.serialize_events(
720727 results["state"], time_now
721728 )
722729
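The serializer hunks above and below share one shape: serialize_event/serialize_events are now synchronous, and instead of a bundle_aggregations boolean the caller fetches the aggregations from the store and passes the resulting mapping in. A simplified sketch of a call site (helper name and wiring assumed, not copied from Synapse):

async def serialize_with_aggregations(hs, requester, events):
    # Sketch only: the accessors mirror those used in the hunks above.
    store = hs.get_datastore()
    serializer = hs.get_event_client_serializer()
    aggregations = await store.get_bundled_aggregations(
        events, requester.user.to_string()
    )
    return serializer.serialize_events(
        events, hs.get_clock().time_msec(), bundle_aggregations=aggregations
    )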
1616 from typing import (
1717 TYPE_CHECKING,
1818 Any,
19 Awaitable,
2019 Callable,
2120 Dict,
2221 Iterable,
394393 """
395394 invited = {}
396395 for room in rooms:
397 invite = await self._event_serializer.serialize_event(
396 invite = self._event_serializer.serialize_event(
398397 room.invite,
399398 time_now,
400399 token_id=token_id,
431430 """
432431 knocked = {}
433432 for room in rooms:
434 knock = await self._event_serializer.serialize_event(
433 knock = self._event_serializer.serialize_event(
435434 room.knock,
436435 time_now,
437436 token_id=token_id,
524523 The room, encoded in our response format
525524 """
526525
527 def serialize(events: Iterable[EventBase]) -> Awaitable[List[JsonDict]]:
526 def serialize(
527 events: Iterable[EventBase],
528 aggregations: Optional[Dict[str, Dict[str, Any]]] = None,
529 ) -> List[JsonDict]:
528530 return self._event_serializer.serialize_events(
529531 events,
530532 time_now=time_now,
531 # Don't bother to bundle aggregations if the timeline is unlimited,
532 # as clients will have all the necessary information.
533 # bundle_aggregations=room.timeline.limited,
534 #
535 # richvdh 2021-12-15: disable this temporarily as it has too high an
536 # overhead for initialsyncs. We need to figure out a way that the
537 # bundling can be done *before* the events are stored in the
538 # SyncResponseCache so that this part can be synchronous.
539 #
540 # Ensure to re-enable the test at tests/rest/client/test_relations.py::RelationsTestCase.test_bundled_aggregations.
541 bundle_aggregations=False,
533 bundle_aggregations=aggregations,
542534 token_id=token_id,
543535 event_format=event_formatter,
544536 only_event_fields=only_fields,
560552 event.room_id,
561553 )
562554
563 serialized_state = await serialize(state_events)
564 serialized_timeline = await serialize(timeline_events)
555 serialized_state = serialize(state_events)
556 serialized_timeline = serialize(
557 timeline_events, room.timeline.bundled_aggregations
558 )
565559
566560 account_data = room.account_data
567561
342342 """
343343
344344
345 @attr.s(slots=True)
345 @attr.s(slots=True, auto_attribs=True)
346346 class ReadableFileWrapper:
347347 """Wrapper that allows reading a file in chunks, yielding to the reactor,
348348 and writing to a callback.
353353
354354 CHUNK_SIZE = 2 ** 14
355355
356 clock = attr.ib(type=Clock)
357 path = attr.ib(type=str)
356 clock: Clock
357 path: str
358358
359359 async def write_chunks_to(self, callback: Callable[[bytes], None]) -> None:
360360 """Reads the file in chunks and calls the callback with each chunk."""
3232 class OEmbedResult:
3333 # The Open Graph result (converted from the oEmbed result).
3434 open_graph_result: JsonDict
35 # The author_name of the oEmbed result
36 author_name: Optional[str]
3537 # Number of milliseconds to cache the content, according to the oEmbed response.
3638 #
3739 # This will be None if no cache-age is provided in the oEmbed response (or
153155 "og:url": url,
154156 }
155157
156 # Use either title or author's name as the title.
157 title = oembed.get("title") or oembed.get("author_name")
158 title = oembed.get("title")
158159 if title:
159160 open_graph_response["og:title"] = title
161
162 author_name = oembed.get("author_name")
160163
161164 # Use the provider name as the site.
162165 provider_name = oembed.get("provider_name")
192195 # Trap any exception and let the code follow as usual.
193196 logger.warning("Error parsing oEmbed metadata from %s: %r", url, e)
194197 open_graph_response = {}
198 author_name = None
195199 cache_age = None
196200
197 return OEmbedResult(open_graph_response, cache_age)
201 return OEmbedResult(open_graph_response, author_name, cache_age)
198202
199203
200204 def _fetch_urls(tree: "etree.Element", tag_name: str) -> List[str]:
261261
262262 # The number of milliseconds that the response should be considered valid.
263263 expiration_ms = media_info.expires
264 author_name: Optional[str] = None
264265
265266 if _is_media(media_info.media_type):
266267 file_id = media_info.filesystem_id
293294 # Check if this HTML document points to oEmbed information and
294295 # defer to that.
295296 oembed_url = self._oembed.autodiscover_from_html(tree)
296 og = {}
297 og_from_oembed: JsonDict = {}
297298 if oembed_url:
298299 oembed_info = await self._download_url(oembed_url, user)
299 og, expiration_ms = await self._handle_oembed_response(
300 (
301 og_from_oembed,
302 author_name,
303 expiration_ms,
304 ) = await self._handle_oembed_response(
300305 url, oembed_info, expiration_ms
301306 )
302307
303 # If there was no oEmbed URL (or oEmbed parsing failed), attempt
304 # to generate the Open Graph information from the HTML.
305 if not oembed_url or not og:
306 og = parse_html_to_open_graph(tree, media_info.uri)
308 # Parse Open Graph information from the HTML in case the oEmbed
309 # response failed or is incomplete.
310 og_from_html = parse_html_to_open_graph(tree, media_info.uri)
311
312 # Compile the Open Graph response by using the scraped
313 # information from the HTML and overlaying any information
314 # from the oEmbed response.
315 og = {**og_from_html, **og_from_oembed}
307316
308317 await self._precache_image_url(user, media_info, og)
309318 else:
311320
312321 elif oembed_url:
313322 # Handle the oEmbed information.
314 og, expiration_ms = await self._handle_oembed_response(
323 og, author_name, expiration_ms = await self._handle_oembed_response(
315324 url, media_info, expiration_ms
316325 )
317326 await self._precache_image_url(user, media_info, og)
319328 else:
320329 logger.warning("Failed to find any OG data in %s", url)
321330 og = {}
331
332 # If we don't have a title but we have author_name, copy it as
333 # title.
334 if not og.get("og:title") and author_name:
335 og["og:title"] = author_name
322336
323337 # filter out any stupidly long values
324338 keys_to_remove = []
483497
484498 async def _handle_oembed_response(
485499 self, url: str, media_info: MediaInfo, expiration_ms: int
486 ) -> Tuple[JsonDict, int]:
500 ) -> Tuple[JsonDict, Optional[str], int]:
487501 """
488502 Parse the downloaded oEmbed info.
489503
496510 Returns:
497511 A tuple of:
498512 The Open Graph dictionary, if the oEmbed info can be parsed.
513 The author name if it could be retrieved from oEmbed.
499514 The (possibly updated) length of time, in milliseconds, the media is valid for.
500515 """
501516 # If JSON was not returned, there's nothing to do.
502517 if not _is_json(media_info.media_type):
503 return {}, expiration_ms
518 return {}, None, expiration_ms
504519
505520 with open(media_info.filename, "rb") as file:
506521 body = file.read()
512527 if open_graph_result and oembed_response.cache_age is not None:
513528 expiration_ms = oembed_response.cache_age
514529
515 return open_graph_result, expiration_ms
530 return open_graph_result, oembed_response.author_name, expiration_ms
516531
517532 def _start_expire_url_cache_data(self) -> Deferred:
518533 return run_as_background_process(
758758
759759 @cache_in_self
760760 def get_event_client_serializer(self) -> EventClientSerializer:
761 return EventClientSerializer(self)
761 return EventClientSerializer()
762762
763763 @cache_in_self
764764 def get_password_policy_handler(self) -> PasswordPolicyHandler:
4444 from synapse.events import EventBase
4545 from synapse.events.snapshot import EventContext
4646 from synapse.logging.context import ContextResourceUsage
47 from synapse.logging.utils import log_function
4847 from synapse.state import v1, v2
4948 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
5049 from synapse.storage.roommember import ProfileInfo
449448 return {key: state_map[ev_id] for key, ev_id in new_state.items()}
450449
451450
452 @attr.s(slots=True)
451 @attr.s(slots=True, auto_attribs=True)
453452 class _StateResMetrics:
454453 """Keeps track of some usage metrics about state res."""
455454
456455 # System and User CPU time, in seconds
457 cpu_time = attr.ib(type=float, default=0.0)
456 cpu_time: float = 0.0
458457
459458 # time spent on database transactions (excluding scheduling time). This roughly
460459 # corresponds to the amount of work done on the db server, excluding event fetches.
461 db_time = attr.ib(type=float, default=0.0)
460 db_time: float = 0.0
462461
463462 # number of events fetched from the db.
464 db_events = attr.ib(type=int, default=0)
463 db_events: int = 0
465464
466465
467466 _biggest_room_by_cpu_counter = Counter(
511510
512511 self.clock.looping_call(self._report_metrics, 120 * 1000)
513512
514 @log_function
515513 async def resolve_state_groups(
516514 self,
517515 room_id: str,
142142 return db_conn
143143
144144
145 @attr.s(slots=True)
145 @attr.s(slots=True, auto_attribs=True)
146146 class LoggingDatabaseConnection:
147147 """A wrapper around a database connection that returns `LoggingTransaction`
148148 as its cursor class.
150150 This is mainly used on startup to ensure that queries get logged correctly
151151 """
152152
153 conn = attr.ib(type=Connection)
154 engine = attr.ib(type=BaseDatabaseEngine)
155 default_txn_name = attr.ib(type=str)
153 conn: Connection
154 engine: BaseDatabaseEngine
155 default_txn_name: str
156156
157157 def cursor(
158158 self, *, txn_name=None, after_callbacks=None, exception_callbacks=None
933933 txn.execute(sql, vals)
934934
935935 async def simple_insert_many(
936 self, table: str, values: List[Dict[str, Any]], desc: str
937 ) -> None:
938 """Executes an INSERT query on the named table.
939
940 The input is given as a list of dicts, with one dict per row.
941 Generally simple_insert_many_values should be preferred for new code.
942
943 Args:
944 table: string giving the table name
945 values: dict of new column names and values for them
946 desc: description of the transaction, for logging and metrics
947 """
948 await self.runInteraction(desc, self.simple_insert_many_txn, table, values)
949
950 @staticmethod
951 def simple_insert_many_txn(
952 txn: LoggingTransaction, table: str, values: List[Dict[str, Any]]
953 ) -> None:
954 """Executes an INSERT query on the named table.
955
956 The input is given as a list of dicts, with one dict per row.
957 Generally simple_insert_many_values_txn should be preferred for new code.
958
959 Args:
960 txn: The transaction to use.
961 table: string giving the table name
962 values: dict of new column names and values for them
963 """
964 if not values:
965 return
966
967 # This is a *slight* abomination to get a list of tuples of key names
968 # and a list of tuples of value names.
969 #
970 # i.e. [{"a": 1, "b": 2}, {"c": 3, "d": 4}]
971 # => [("a", "b",), ("c", "d",)] and [(1, 2,), (3, 4,)]
972 #
973 # The sort is to ensure that we don't rely on dictionary iteration
974 # order.
975 keys, vals = zip(
976 *(zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i)
977 )
978
979 for k in keys:
980 if k != keys[0]:
981 raise RuntimeError("All items must have the same keys")
982
983 return DatabasePool.simple_insert_many_values_txn(txn, table, keys[0], vals)
984
985 async def simple_insert_many_values(
986936 self,
987937 table: str,
988938 keys: Collection[str],
1001951 desc: description of the transaction, for logging and metrics
1002952 """
1003953 await self.runInteraction(
1004 desc, self.simple_insert_many_values_txn, table, keys, values
954 desc, self.simple_insert_many_txn, table, keys, values
1005955 )
1006956
1007957 @staticmethod
1008 def simple_insert_many_values_txn(
958 def simple_insert_many_txn(
1009959 txn: LoggingTransaction,
1010960 table: str,
1011961 keys: Collection[str],
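The storage hunks that follow all apply the same call-site rewrite for simple_insert_many: previously each row was a dict of column name to value; now the column names are given once via keys and each row is a plain tuple in the same order. A hypothetical call site (table and column names chosen for illustration only):

async def insert_example_rows(db_pool, rows):
    # After this change: one keys tuple and one value tuple per row, instead
    # of values=[{"user_id": ..., "room_id": ...}, ...] as before.
    await db_pool.simple_insert_many(
        table="example_table",
        keys=("user_id", "room_id"),
        values=[(user_id, room_id) for user_id, room_id in rows],
        desc="insert_example_rows",
    )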
449449 async def add_account_data_for_user(
450450 self, user_id: str, account_data_type: str, content: JsonDict
451451 ) -> int:
452 """Add some account_data to a room for a user.
452 """Add some global account_data for a user.
453453
454454 Args:
455455 user_id: The user to add a tag for.
535535 self.db_pool.simple_insert_many_txn(
536536 txn,
537537 table="ignored_users",
538 keys=("ignorer_user_id", "ignored_user_id"),
538539 values=[
539 {"ignorer_user_id": user_id, "ignored_user_id": u}
540 for u in currently_ignored_users - previously_ignored_users
540 (user_id, u) for u in currently_ignored_users - previously_ignored_users
541541 ],
542542 )
543543
431431 self.db_pool.simple_insert_many_txn(
432432 txn,
433433 table="device_federation_outbox",
434 keys=(
435 "destination",
436 "stream_id",
437 "queued_ts",
438 "messages_json",
439 "instance_name",
440 ),
434441 values=[
435 {
436 "destination": destination,
437 "stream_id": stream_id,
438 "queued_ts": now_ms,
439 "messages_json": json_encoder.encode(edu),
440 "instance_name": self._instance_name,
441 }
442 (
443 destination,
444 stream_id,
445 now_ms,
446 json_encoder.encode(edu),
447 self._instance_name,
448 )
442449 for destination, edu in remote_messages_by_destination.items()
443450 ],
444451 )
570577 self.db_pool.simple_insert_many_txn(
571578 txn,
572579 table="device_inbox",
580 keys=("user_id", "device_id", "stream_id", "message_json", "instance_name"),
573581 values=[
574 {
575 "user_id": user_id,
576 "device_id": device_id,
577 "stream_id": stream_id,
578 "message_json": message_json,
579 "instance_name": self._instance_name,
580 }
582 (user_id, device_id, stream_id, message_json, self._instance_name)
581583 for user_id, messages_by_device in local_by_user_then_device.items()
582584 for device_id, message_json in messages_by_device.items()
583585 ],
5252 from synapse.server import HomeServer
5353
5454 logger = logging.getLogger(__name__)
55 issue_8631_logger = logging.getLogger("synapse.8631_debug")
5556
5657 DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
5758 "drop_device_list_streams_non_unique_indexes"
227228 # Return an empty list if there are no updates
228229 if not updates:
229230 return now_stream_id, []
231
232 if issue_8631_logger.isEnabledFor(logging.DEBUG):
233 data = {(user, device): stream_id for user, device, stream_id, _ in updates}
234 issue_8631_logger.debug(
235 "device updates need to be sent to %s: %s", destination, data
236 )
230237
231238 # get the cross-signing keys of the users in the list, so that we can
232239 # determine which of the device changes were cross-signing keys
364371 # and remove the length budgeting above.
365372 results.append(("org.matrix.signing_key_update", result))
366373
374 if issue_8631_logger.isEnabledFor(logging.DEBUG):
375 for (user_id, edu) in results:
376 issue_8631_logger.debug(
377 "device update to %s for %s from %s to %s: %s",
378 destination,
379 user_id,
380 from_stream_id,
381 last_processed_stream_id,
382 edu,
383 )
384
367385 return last_processed_stream_id, results
368386
369387 def _get_device_updates_by_remote_txn(
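The extra device-list tracing above only fires when the dedicated logger is enabled for DEBUG; one way to switch it on from Python (assumed to be equivalent to raising the logger's level in the logging config):

import logging

# Enable the dedicated debug logger introduced in this change.
logging.getLogger("synapse.8631_debug").setLevel(logging.DEBUG)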
780798 @cached(max_entries=10000)
781799 async def get_device_list_last_stream_id_for_remote(
782800 self, user_id: str
783 ) -> Optional[Any]:
801 ) -> Optional[str]:
784802 """Get the last stream_id we got for a user. May be None if we haven't
785803 got any information for them.
786804 """
796814 cached_method_name="get_device_list_last_stream_id_for_remote",
797815 list_name="user_ids",
798816 )
799 async def get_device_list_last_stream_id_for_remotes(self, user_ids: Iterable[str]):
817 async def get_device_list_last_stream_id_for_remotes(
818 self, user_ids: Iterable[str]
819 ) -> Dict[str, Optional[str]]:
800820 rows = await self.db_pool.simple_select_many_batch(
801821 table="device_lists_remote_extremeties",
802822 column="user_id",
13831403 content: JsonDict,
13841404 stream_id: str,
13851405 ) -> None:
1406 """Delete, update or insert a cache entry for this (user, device) pair."""
13861407 if content.get("deleted"):
13871408 self.db_pool.simple_delete_txn(
13881409 txn,
14421463 def _update_remote_device_list_cache_txn(
14431464 self, txn: LoggingTransaction, user_id: str, devices: List[dict], stream_id: int
14441465 ) -> None:
1466 """Replace the list of cached devices for this user with the given list."""
14451467 self.db_pool.simple_delete_txn(
14461468 txn, table="device_lists_remote_cache", keyvalues={"user_id": user_id}
14471469 )
14491471 self.db_pool.simple_insert_many_txn(
14501472 txn,
14511473 table="device_lists_remote_cache",
1474 keys=("user_id", "device_id", "content"),
14521475 values=[
1453 {
1454 "user_id": user_id,
1455 "device_id": content["device_id"],
1456 "content": json_encoder.encode(content),
1457 }
1476 (user_id, content["device_id"], json_encoder.encode(content))
14581477 for content in devices
14591478 ],
14601479 )
15421561 self.db_pool.simple_insert_many_txn(
15431562 txn,
15441563 table="device_lists_stream",
1564 keys=("stream_id", "user_id", "device_id"),
15451565 values=[
1546 {"stream_id": stream_id, "user_id": user_id, "device_id": device_id}
1566 (stream_id, user_id, device_id)
15471567 for stream_id, device_id in zip(stream_ids, device_ids)
15481568 ],
15491569 )
15701590 self.db_pool.simple_insert_many_txn(
15711591 txn,
15721592 table="device_lists_outbound_pokes",
1593 keys=(
1594 "destination",
1595 "stream_id",
1596 "user_id",
1597 "device_id",
1598 "sent",
1599 "ts",
1600 "opentracing_context",
1601 ),
15731602 values=[
1574 {
1575 "destination": destination,
1576 "stream_id": next(next_stream_id),
1577 "user_id": user_id,
1578 "device_id": device_id,
1579 "sent": False,
1580 "ts": now,
1581 "opentracing_context": json_encoder.encode(context)
1603 (
1604 destination,
1605 next(next_stream_id),
1606 user_id,
1607 device_id,
1608 False,
1609 now,
1610 json_encoder.encode(context)
15821611 if whitelisted_homeserver(destination)
15831612 else "{}",
1584 }
1613 )
15851614 for destination in hosts
15861615 for device_id in device_ids
15871616 ],
111111 self.db_pool.simple_insert_many_txn(
112112 txn,
113113 table="room_alias_servers",
114 values=[
115 {"room_alias": room_alias.to_string(), "server": server}
116 for server in servers
117 ],
114 keys=("room_alias", "server"),
115 values=[(room_alias.to_string(), server) for server in servers],
118116 )
119117
120118 self._invalidate_cache_and_stream(
109109 values = []
110110 for (room_id, session_id, room_key) in room_keys:
111111 values.append(
112 {
113 "user_id": user_id,
114 "version": version_int,
115 "room_id": room_id,
116 "session_id": session_id,
117 "first_message_index": room_key["first_message_index"],
118 "forwarded_count": room_key["forwarded_count"],
119 "is_verified": room_key["is_verified"],
120 "session_data": json_encoder.encode(room_key["session_data"]),
121 }
112 (
113 user_id,
114 version_int,
115 room_id,
116 session_id,
117 room_key["first_message_index"],
118 room_key["forwarded_count"],
119 room_key["is_verified"],
120 json_encoder.encode(room_key["session_data"]),
121 )
122122 )
123123 log_kv(
124124 {
130130 )
131131
132132 await self.db_pool.simple_insert_many(
133 table="e2e_room_keys", values=values, desc="add_e2e_room_keys"
133 table="e2e_room_keys",
134 keys=(
135 "user_id",
136 "version",
137 "room_id",
138 "session_id",
139 "first_message_index",
140 "forwarded_count",
141 "is_verified",
142 "session_data",
143 ),
144 values=values,
145 desc="add_e2e_room_keys",
134146 )
135147
136148 @trace
4949 from synapse.server import HomeServer
5050
5151
52 @attr.s(slots=True)
52 @attr.s(slots=True, auto_attribs=True)
5353 class DeviceKeyLookupResult:
5454 """The type returned by get_e2e_device_keys_and_signatures"""
5555
56 display_name = attr.ib(type=Optional[str])
56 display_name: Optional[str]
5757
5858 # the key data from e2e_device_keys_json. Typically includes fields like
5959 # "algorithm", "keys" (including the curve25519 identity key and the ed25519 signing
6060 # key) and "signatures" (a map from (user id) to (key id/device_id) to signature.)
61 keys = attr.ib(type=Optional[JsonDict])
61 keys: Optional[JsonDict]
6262
6363
6464 class EndToEndKeyBackgroundStore(SQLBaseStore):
386386 self.db_pool.simple_insert_many_txn(
387387 txn,
388388 table="e2e_one_time_keys_json",
389 keys=(
390 "user_id",
391 "device_id",
392 "algorithm",
393 "key_id",
394 "ts_added_ms",
395 "key_json",
396 ),
389397 values=[
390 {
391 "user_id": user_id,
392 "device_id": device_id,
393 "algorithm": algorithm,
394 "key_id": key_id,
395 "ts_added_ms": time_now,
396 "key_json": json_bytes,
397 }
398 (user_id, device_id, algorithm, key_id, time_now, json_bytes)
398399 for algorithm, key_id, json_bytes in new_keys
399400 ],
400401 )
11851186 """
11861187 await self.db_pool.simple_insert_many(
11871188 "e2e_cross_signing_signatures",
1188 [
1189 {
1190 "user_id": user_id,
1191 "key_id": item.signing_key_id,
1192 "target_user_id": item.target_user_id,
1193 "target_device_id": item.target_device_id,
1194 "signature": item.signature,
1195 }
1189 keys=(
1190 "user_id",
1191 "key_id",
1192 "target_user_id",
1193 "target_device_id",
1194 "signature",
1195 ),
1196 values=[
1197 (
1198 user_id,
1199 item.signing_key_id,
1200 item.target_user_id,
1201 item.target_device_id,
1202 item.signature,
1203 )
11961204 for item in signatures
11971205 ],
1198 "add_e2e_signing_key",
1199 )
1206 desc="add_e2e_signing_key",
1207 )
874874 self.db_pool.simple_insert_many_txn(
875875 txn,
876876 table="event_push_summary",
877 keys=(
878 "user_id",
879 "room_id",
880 "notif_count",
881 "unread_count",
882 "stream_ordering",
883 ),
877884 values=[
878 {
879 "user_id": user_id,
880 "room_id": room_id,
881 "notif_count": summary.notif_count,
882 "unread_count": summary.unread_count,
883 "stream_ordering": summary.stream_ordering,
884 }
885 (
886 user_id,
887 room_id,
888 summary.notif_count,
889 summary.unread_count,
890 summary.stream_ordering,
891 )
885892 for ((user_id, room_id), summary) in summaries.items()
886893 if summary.old_user_id is None
887894 ],
3838 from synapse.crypto.event_signing import compute_event_reference_hash
3939 from synapse.events import EventBase # noqa: F401
4040 from synapse.events.snapshot import EventContext # noqa: F401
41 from synapse.logging.utils import log_function
4241 from synapse.storage._base import db_to_json, make_in_list_sql_clause
4342 from synapse.storage.database import (
4443 DatabasePool,
6867 )
6968
7069
71 @attr.s(slots=True)
70 @attr.s(slots=True, auto_attribs=True)
7271 class DeltaState:
7372 """Deltas to use to update the `current_state_events` table.
7473
7978 should e.g. be removed from `current_state_events` table.
8079 """
8180
82 to_delete = attr.ib(type=List[Tuple[str, str]])
83 to_insert = attr.ib(type=StateMap[str])
84 no_longer_in_room = attr.ib(type=bool, default=False)
81 to_delete: List[Tuple[str, str]]
82 to_insert: StateMap[str]
83 no_longer_in_room: bool = False
8584
8685
8786 class PersistEventsStore:
327326
328327 return existing_prevs
329328
330 @log_function
331329 def _persist_events_txn(
332330 self,
333331 txn: LoggingTransaction,
441439 self.db_pool.simple_insert_many_txn(
442440 txn,
443441 table="event_auth",
442 keys=("event_id", "room_id", "auth_id"),
444443 values=[
445 {
446 "event_id": event.event_id,
447 "room_id": event.room_id,
448 "auth_id": auth_id,
449 }
444 (event.event_id, event.room_id, auth_id)
450445 for event in events
451446 for auth_id in event.auth_event_ids()
452447 if event.is_state()
674669 db_pool.simple_insert_many_txn(
675670 txn,
676671 table="event_auth_chains",
672 keys=("event_id", "chain_id", "sequence_number"),
677673 values=[
678 {"event_id": event_id, "chain_id": c_id, "sequence_number": seq}
674 (event_id, c_id, seq)
679675 for event_id, (c_id, seq) in new_chain_tuples.items()
680676 ],
681677 )
781777 db_pool.simple_insert_many_txn(
782778 txn,
783779 table="event_auth_chain_links",
780 keys=(
781 "origin_chain_id",
782 "origin_sequence_number",
783 "target_chain_id",
784 "target_sequence_number",
785 ),
784786 values=[
785 {
786 "origin_chain_id": source_id,
787 "origin_sequence_number": source_seq,
788 "target_chain_id": target_id,
789 "target_sequence_number": target_seq,
790 }
787 (source_id, source_seq, target_id, target_seq)
791788 for (
792789 source_id,
793790 source_seq,
942939 txn_id = getattr(event.internal_metadata, "txn_id", None)
943940 if token_id and txn_id:
944941 to_insert.append(
945 {
946 "event_id": event.event_id,
947 "room_id": event.room_id,
948 "user_id": event.sender,
949 "token_id": token_id,
950 "txn_id": txn_id,
951 "inserted_ts": self._clock.time_msec(),
952 }
942 (
943 event.event_id,
944 event.room_id,
945 event.sender,
946 token_id,
947 txn_id,
948 self._clock.time_msec(),
949 )
953950 )
954951
955952 if to_insert:
956953 self.db_pool.simple_insert_many_txn(
957954 txn,
958955 table="event_txn_id",
956 keys=(
957 "event_id",
958 "room_id",
959 "user_id",
960 "token_id",
961 "txn_id",
962 "inserted_ts",
963 ),
959964 values=to_insert,
960965 )
961966
11601165 self.db_pool.simple_insert_many_txn(
11611166 txn,
11621167 table="event_forward_extremities",
1168 keys=("event_id", "room_id"),
11631169 values=[
1164 {"event_id": ev_id, "room_id": room_id}
1170 (ev_id, room_id)
11651171 for room_id, new_extrem in new_forward_extremities.items()
11661172 for ev_id in new_extrem
11671173 ],
11731179 self.db_pool.simple_insert_many_txn(
11741180 txn,
11751181 table="stream_ordering_to_exterm",
1182 keys=("room_id", "event_id", "stream_ordering"),
11761183 values=[
1177 {
1178 "room_id": room_id,
1179 "event_id": event_id,
1180 "stream_ordering": max_stream_order,
1181 }
1184 (room_id, event_id, max_stream_order)
11821185 for room_id, new_extrem in new_forward_extremities.items()
11831186 for event_id in new_extrem
11841187 ],
12501253 for room_id, depth in depth_updates.items():
12511254 self._update_min_depth_for_room_txn(txn, room_id, depth)
12521255
1253 def _update_outliers_txn(self, txn, events_and_contexts):
1256 def _update_outliers_txn(
1257 self,
1258 txn: LoggingTransaction,
1259 events_and_contexts: List[Tuple[EventBase, EventContext]],
1260 ) -> List[Tuple[EventBase, EventContext]]:
12541261 """Update any outliers with new event info.
12551262
1256 This turns outliers into ex-outliers (unless the new event was
1257 rejected).
1263 This turns outliers into ex-outliers (unless the new event was rejected), and
1264 also removes any other events we have already seen from the list.
12581265
12591266 Args:
1260 txn (twisted.enterprise.adbapi.Connection): db connection
1261 events_and_contexts (list[(EventBase, EventContext)]): events
1262 we are persisting
1267 txn: db connection
1268 events_and_contexts: events we are persisting
12631269
12641270 Returns:
1265 list[(EventBase, EventContext)] new list, without events which
1266 are already in the events table.
1271 new list, without events which are already in the events table.
12671272 """
12681273 txn.execute(
12691274 "SELECT event_id, outlier FROM events WHERE event_id in (%s)"
12711276 [event.event_id for event, _ in events_and_contexts],
12721277 )
12731278
1274 have_persisted = {event_id: outlier for event_id, outlier in txn}
1279 have_persisted: Dict[str, bool] = {
1280 event_id: outlier for event_id, outlier in txn
1281 }
12751282
12761283 to_remove = set()
12771284 for event, context in events_and_contexts:
12811288 to_remove.add(event)
12821289
12831290 if context.rejected:
1284 # If the event is rejected then we don't care if the event
1285 # was an outlier or not.
1291 # If the incoming event is rejected then we don't care if the event
1292 # was an outlier or not - what we have is at least as good.
12861293 continue
12871294
12881295 outlier_persisted = have_persisted[event.event_id]
12891296 if not event.internal_metadata.is_outlier() and outlier_persisted:
12901297 # We received a copy of an event that we had already stored as
1291 # an outlier in the database. We now have some state at that
1298 # an outlier in the database. We now have some state at that event
12921299 # so we need to update the state_groups table with that state.
1300 #
1301 # Note that we do not update the stream_ordering of the event in this
1302 # scenario. XXX: does this cause bugs? It will mean we won't send such
1303 # events down /sync. In general they will be historical events, so that
1304 # doesn't matter too much, but that is not always the case.
1305
1306 logger.info("Updating state for ex-outlier event %s", event.event_id)
12931307
12941308 # insert into event_to_state_groups.
12951309 try:
13411355 d.pop("redacted_because", None)
13421356 return d
13431357
1344 self.db_pool.simple_insert_many_values_txn(
1358 self.db_pool.simple_insert_many_txn(
13451359 txn,
13461360 table="event_json",
13471361 keys=("event_id", "room_id", "internal_metadata", "json", "format_version"),
13571371 ),
13581372 )
13591373
1360 self.db_pool.simple_insert_many_values_txn(
1374 self.db_pool.simple_insert_many_txn(
13611375 txn,
13621376 table="events",
13631377 keys=(
14111425 )
14121426 txn.execute(sql + clause, [False] + args)
14131427
1414 self.db_pool.simple_insert_many_values_txn(
1428 self.db_pool.simple_insert_many_txn(
14151429 txn,
14161430 table="state_events",
14171431 keys=("event_id", "room_id", "type", "state_key"),
16211635 return self.db_pool.simple_insert_many_txn(
16221636 txn=txn,
16231637 table="event_labels",
1638 keys=("event_id", "label", "room_id", "topological_ordering"),
16241639 values=[
1625 {
1626 "event_id": event_id,
1627 "label": label,
1628 "room_id": room_id,
1629 "topological_ordering": topological_ordering,
1630 }
1631 for label in labels
1640 (event_id, label, room_id, topological_ordering) for label in labels
16321641 ],
16331642 )
16341643
16561665 vals = []
16571666 for event in events:
16581667 ref_alg, ref_hash_bytes = compute_event_reference_hash(event)
1659 vals.append(
1660 {
1661 "event_id": event.event_id,
1662 "algorithm": ref_alg,
1663 "hash": memoryview(ref_hash_bytes),
1664 }
1665 )
1668 vals.append((event.event_id, ref_alg, memoryview(ref_hash_bytes)))
16661669
16671670 self.db_pool.simple_insert_many_txn(
1668 txn, table="event_reference_hashes", values=vals
1671 txn,
1672 table="event_reference_hashes",
1673 keys=("event_id", "algorithm", "hash"),
1674 values=vals,
16691675 )
16701676
16711677 def _store_room_members_txn(
16881694 self.db_pool.simple_insert_many_txn(
16891695 txn,
16901696 table="room_memberships",
1697 keys=(
1698 "event_id",
1699 "user_id",
1700 "sender",
1701 "room_id",
1702 "membership",
1703 "display_name",
1704 "avatar_url",
1705 ),
16911706 values=[
1692 {
1693 "event_id": event.event_id,
1694 "user_id": event.state_key,
1695 "sender": event.user_id,
1696 "room_id": event.room_id,
1697 "membership": event.membership,
1698 "display_name": non_null_str_or_none(
1699 event.content.get("displayname")
1700 ),
1701 "avatar_url": non_null_str_or_none(event.content.get("avatar_url")),
1702 }
1707 (
1708 event.event_id,
1709 event.state_key,
1710 event.user_id,
1711 event.room_id,
1712 event.membership,
1713 non_null_str_or_none(event.content.get("displayname")),
1714 non_null_str_or_none(event.content.get("avatar_url")),
1715 )
17031716 for event in events
17041717 ],
17051718 )
17901803 txn.call_after(
17911804 self.store.get_thread_summary.invalidate, (parent_id, event.room_id)
17921805 )
1806 # It should be safe to only invalidate the cache if the user has not
1807 # previously participated in the thread, but that's difficult (and
1808 # potentially error-prone) so it is always invalidated.
1809 txn.call_after(
1810 self.store.get_thread_participated.invalidate,
1811 (parent_id, event.room_id, event.sender),
1812 )
17931813
17941814 def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
17951815 """Handles keeping track of insertion events and edges/connections.
21622182 self.db_pool.simple_insert_many_txn(
21632183 txn,
21642184 table="event_edges",
2185 keys=("event_id", "prev_event_id", "room_id", "is_state"),
21652186 values=[
2166 {
2167 "event_id": ev.event_id,
2168 "prev_event_id": e_id,
2169 "room_id": ev.room_id,
2170 "is_state": False,
2171 }
2187 (ev.event_id, e_id, ev.room_id, False)
21722188 for ev in events
21732189 for e_id in ev.prev_event_ids()
21742190 ],
22252241 )
22262242
22272243
2228 @attr.s(slots=True)
2244 @attr.s(slots=True, auto_attribs=True)
22292245 class _LinkMap:
22302246 """A helper type for tracking links between chains."""
22312247
22322248 # Stores the set of links as nested maps: source chain ID -> target chain ID
22332249 # -> source sequence number -> target sequence number.
2234 maps = attr.ib(type=Dict[int, Dict[int, Dict[int, int]]], factory=dict)
2250 maps: Dict[int, Dict[int, Dict[int, int]]] = attr.Factory(dict)
22352251
22362252 # Stores the links that have been added (with new set to true), as tuples of
22372253 # `(source chain ID, source sequence no, target chain ID, target sequence no.)`
2238 additions = attr.ib(type=Set[Tuple[int, int, int, int]], factory=set)
2254 additions: Set[Tuple[int, int, int, int]] = attr.Factory(set)
22392255
22402256 def add_link(
22412257 self,
6464 REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
6565
6666
67 @attr.s(slots=True, frozen=True)
67 @attr.s(slots=True, frozen=True, auto_attribs=True)
6868 class _CalculateChainCover:
6969 """Return value for _calculate_chain_cover_txn."""
7070
7171 # The last room_id/depth/stream processed.
72 room_id = attr.ib(type=str)
73 depth = attr.ib(type=int)
74 stream = attr.ib(type=int)
72 room_id: str
73 depth: int
74 stream: int
7575
7676 # Number of rows processed
77 processed_count = attr.ib(type=int)
77 processed_count: int
7878
7979 # Map from room_id to last depth/stream processed for each room that we have
8080 # processed all events for (i.e. the rooms we can flip the
8181 # `has_auth_chain_index` for)
82 finished_room_map = attr.ib(type=Dict[str, Tuple[int, int]])
82 finished_room_map: Dict[str, Tuple[int, int]]
8383
8484
8585 class EventsBackgroundUpdatesStore(SQLBaseStore):
683683 self.db_pool.simple_insert_many_txn(
684684 txn=txn,
685685 table="event_labels",
686 keys=("event_id", "label", "room_id", "topological_ordering"),
686687 values=[
687 {
688 "event_id": event_id,
689 "label": label,
690 "room_id": event_json["room_id"],
691 "topological_ordering": event_json["depth"],
692 }
688 (
689 event_id,
690 label,
691 event_json["room_id"],
692 event_json["depth"],
693 )
693694 for label in event_json["content"].get(
694695 EventContentFields.LABELS, []
695696 )
802803
803804 if not has_state:
804805 state_events.append(
805 {
806 "event_id": event.event_id,
807 "room_id": event.room_id,
808 "type": event.type,
809 "state_key": event.state_key,
810 }
806 (event.event_id, event.room_id, event.type, event.state_key)
811807 )
812808
813809 if not has_event_auth:
814810 # Old, dodgy, events may have duplicate auth events, which we
815811 # need to deduplicate as we have a unique constraint.
816812 for auth_id in set(event.auth_event_ids()):
817 auth_events.append(
818 {
819 "room_id": event.room_id,
820 "event_id": event.event_id,
821 "auth_id": auth_id,
822 }
823 )
813 auth_events.append((event.event_id, event.room_id, auth_id))
824814
825815 if state_events:
826816 await self.db_pool.simple_insert_many(
827817 table="state_events",
818 keys=("event_id", "room_id", "type", "state_key"),
828819 values=state_events,
829820 desc="_rejected_events_metadata_state_events",
830821 )
832823 if auth_events:
833824 await self.db_pool.simple_insert_many(
834825 table="event_auth",
826 keys=("event_id", "room_id", "auth_id"),
835827 values=auth_events,
836828 desc="_rejected_events_metadata_event_auth",
837829 )
128128 self.db_pool.simple_insert_many_txn(
129129 txn,
130130 table="presence_stream",
131 keys=(
132 "stream_id",
133 "user_id",
134 "state",
135 "last_active_ts",
136 "last_federation_update_ts",
137 "last_user_sync_ts",
138 "status_msg",
139 "currently_active",
140 "instance_name",
141 ),
131142 values=[
132 {
133 "stream_id": stream_id,
134 "user_id": state.user_id,
135 "state": state.state,
136 "last_active_ts": state.last_active_ts,
137 "last_federation_update_ts": state.last_federation_update_ts,
138 "last_user_sync_ts": state.last_user_sync_ts,
139 "status_msg": state.status_msg,
140 "currently_active": state.currently_active,
141 "instance_name": self._instance_name,
142 }
143 (
144 stream_id,
145 state.user_id,
146 state.state,
147 state.last_active_ts,
148 state.last_federation_update_ts,
149 state.last_user_sync_ts,
150 state.status_msg,
151 state.currently_active,
152 self._instance_name,
153 )
143154 for stream_id, state in zip(stream_orderings, presence_states)
144155 ],
145156 )
560560 self.db_pool.simple_insert_many_txn(
561561 txn,
562562 table="deleted_pushers",
563 keys=("stream_id", "app_id", "pushkey", "user_id"),
563564 values=[
564 {
565 "stream_id": stream_id,
566 "app_id": pusher.app_id,
567 "pushkey": pusher.pushkey,
568 "user_id": user_id,
569 }
565 (stream_id, pusher.app_id, pusher.pushkey, user_id)
570566 for stream_id, pusher in zip(stream_ids, pushers)
571567 ],
572568 )
5050 pass
5151
5252
53 @attr.s(frozen=True, slots=True)
53 @attr.s(frozen=True, slots=True, auto_attribs=True)
5454 class TokenLookupResult:
5555 """Result of looking up an access token.
5656
6868 cached.
6969 """
7070
71 user_id = attr.ib(type=str)
72 is_guest = attr.ib(type=bool, default=False)
73 shadow_banned = attr.ib(type=bool, default=False)
74 token_id = attr.ib(type=Optional[int], default=None)
75 device_id = attr.ib(type=Optional[str], default=None)
76 valid_until_ms = attr.ib(type=Optional[int], default=None)
77 token_owner = attr.ib(type=str)
78 token_used = attr.ib(type=bool, default=False)
71 user_id: str
72 is_guest: bool = False
73 shadow_banned: bool = False
74 token_id: Optional[int] = None
75 device_id: Optional[str] = None
76 valid_until_ms: Optional[int] = None
77 token_owner: str = attr.ib()
78 token_used: bool = False
7979
8080 # Make the token owner default to the user ID, which is the common case.
8181 @token_owner.default
1212 # limitations under the License.
1313
1414 import logging
15 from typing import List, Optional, Tuple, Union, cast
15 from typing import (
16 TYPE_CHECKING,
17 Any,
18 Dict,
19 Iterable,
20 List,
21 Optional,
22 Tuple,
23 Union,
24 cast,
25 )
1626
1727 import attr
18
19 from synapse.api.constants import RelationTypes
28 from frozendict import frozendict
29
30 from synapse.api.constants import EventTypes, RelationTypes
2031 from synapse.events import EventBase
2132 from synapse.storage._base import SQLBaseStore
22 from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
33 from synapse.storage.database import (
34 DatabasePool,
35 LoggingDatabaseConnection,
36 LoggingTransaction,
37 make_in_list_sql_clause,
38 )
2339 from synapse.storage.databases.main.stream import generate_pagination_where_clause
2440 from synapse.storage.relations import (
2541 AggregationPaginationToken,
2844 )
2945 from synapse.util.caches.descriptors import cached
3046
47 if TYPE_CHECKING:
48 from synapse.server import HomeServer
49
3150 logger = logging.getLogger(__name__)
3251
3352
3453 class RelationsWorkerStore(SQLBaseStore):
54 def __init__(
55 self,
56 database: DatabasePool,
57 db_conn: LoggingDatabaseConnection,
58 hs: "HomeServer",
59 ):
60 super().__init__(database, db_conn, hs)
61
62 self._msc1849_enabled = hs.config.experimental.msc1849_enabled
63 self._msc3440_enabled = hs.config.experimental.msc3440_enabled
64
3565 @cached(tree=True)
3666 async def get_relations_for_event(
3767 self,
353383 async def get_thread_summary(
354384 self, event_id: str, room_id: str
355385 ) -> Tuple[int, Optional[EventBase]]:
356 """Get the number of threaded replies, the senders of those replies, and
357 the latest reply (if any) for the given event.
386 """Get the number of threaded replies and the latest reply (if any) for the given event.
358387
359388 Args:
360389 event_id: Summarize the thread related to this event ID.
367396 def _get_thread_summary_txn(
368397 txn: LoggingTransaction,
369398 ) -> Tuple[int, Optional[str]]:
370 # Fetch the count of threaded events and the latest event ID.
399 # Fetch the latest event ID in the thread.
371400 # TODO Should this only allow m.room.message events.
372401 sql = """
373402 SELECT event_id
388417
389418 latest_event_id = row[0]
390419
420 # Fetch the number of threaded replies.
391421 sql = """
392422 SELECT COUNT(event_id)
393423 FROM event_relations
411441 latest_event = await self.get_event(latest_event_id, allow_none=True) # type: ignore[attr-defined]
412442
413443 return count, latest_event
444
445 @cached()
446 async def get_thread_participated(
447 self, event_id: str, room_id: str, user_id: str
448 ) -> bool:
449 """Get whether the requesting user participated in a thread.
450
451 This is separate from get_thread_summary since that can be cached across
452 all users while this value is specific to the requester.
453
454 Args:
455 event_id: The thread related to this event ID.
456 room_id: The room the event belongs to.
457 user_id: The user requesting the summary.
458
459 Returns:
460 True if the requesting user participated in the thread, otherwise false.
461 """
462
463 def _get_thread_summary_txn(txn: LoggingTransaction) -> bool:
464 # Fetch whether the requester has participated or not.
465 sql = """
466 SELECT 1
467 FROM event_relations
468 INNER JOIN events USING (event_id)
469 WHERE
470 relates_to_id = ?
471 AND room_id = ?
472 AND relation_type = ?
473 AND sender = ?
474 """
475
476 txn.execute(sql, (event_id, room_id, RelationTypes.THREAD, user_id))
477 return bool(txn.fetchone())
478
479 return await self.db_pool.runInteraction(
480 "get_thread_summary", _get_thread_summary_txn
481 )
414482
415483 async def events_have_relations(
416484 self,
514582 "get_if_user_has_annotated_event", _get_if_user_has_annotated_event
515583 )
516584
585 async def _get_bundled_aggregation_for_event(
586 self, event: EventBase, user_id: str
587 ) -> Optional[Dict[str, Any]]:
588 """Generate bundled aggregations for an event.
589
590 Note that this does not use a cache, but depends on cached methods.
591
592 Args:
593 event: The event to calculate bundled aggregations for.
594 user_id: The user requesting the bundled aggregations.
595
596 Returns:
597 The bundled aggregations for an event, if bundled aggregations are
598 enabled and the event can have bundled aggregations.
599 """
600 # State events and redacted events do not get bundled aggregations.
601 if event.is_state() or event.internal_metadata.is_redacted():
602 return None
603
604 # Do not bundle aggregations for an event which represents an edit or an
605 # annotation. It does not make sense for them to have related events.
606 relates_to = event.content.get("m.relates_to")
607 if isinstance(relates_to, (dict, frozendict)):
608 relation_type = relates_to.get("rel_type")
609 if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
610 return None
611
612 event_id = event.event_id
613 room_id = event.room_id
614
615 # The bundled aggregations to include, a mapping of relation type to a
616 # type-specific value. Some types include the direct return type here
617 # while others need more processing during serialization.
618 aggregations: Dict[str, Any] = {}
619
620 annotations = await self.get_aggregation_groups_for_event(event_id, room_id)
621 if annotations.chunk:
622 aggregations[RelationTypes.ANNOTATION] = annotations.to_dict()
623
624 references = await self.get_relations_for_event(
625 event_id, room_id, RelationTypes.REFERENCE, direction="f"
626 )
627 if references.chunk:
628 aggregations[RelationTypes.REFERENCE] = references.to_dict()
629
630 edit = None
631 if event.type == EventTypes.Message:
632 edit = await self.get_applicable_edit(event_id, room_id)
633
634 if edit:
635 aggregations[RelationTypes.REPLACE] = edit
636
637 # If this event is the start of a thread, include a summary of the replies.
638 if self._msc3440_enabled:
639 thread_count, latest_thread_event = await self.get_thread_summary(
640 event_id, room_id
641 )
642 participated = await self.get_thread_participated(
643 event_id, room_id, user_id
644 )
645 if latest_thread_event:
646 aggregations[RelationTypes.THREAD] = {
647 "latest_event": latest_thread_event,
648 "count": thread_count,
649 "current_user_participated": participated,
650 }
651
652 # Store the bundled aggregations in the event metadata for later use.
653 return aggregations
654
655 async def get_bundled_aggregations(
656 self,
657 events: Iterable[EventBase],
658 user_id: str,
659 ) -> Dict[str, Dict[str, Any]]:
660 """Generate bundled aggregations for events.
661
662 Args:
663 events: The iterable of events to calculate bundled aggregations for.
664 user_id: The user requesting the bundled aggregations.
665
666 Returns:
667 A map of event ID to the bundled aggregation for the event. Not all
668 events may have bundled aggregations in the results.
669 """
670 # If bundled aggregations are disabled, nothing to do.
671 if not self._msc1849_enabled:
672 return {}
673
674 # TODO Parallelize.
675 results = {}
676 for event in events:
677 event_result = await self._get_bundled_aggregation_for_event(event, user_id)
678 if event_result is not None:
679 results[event.event_id] = event_result
680
681 return results
682
517683
518684 class RelationsStore(RelationsWorkerStore):
519685 pass
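The new bundled-aggregation helpers above are consumed by the event serialisation path, which is not part of this hunk. A hedged usage sketch, with hypothetical `store` and `serialized` stand-ins; the `unsigned["m.relations"]` key reflects the bundled-aggregations convention, the rest is illustrative:

```python
from typing import Any, Dict, Iterable

from synapse.events import EventBase


async def attach_bundled_aggregations(
    store: Any,
    events: Iterable[EventBase],
    serialized: Dict[str, Dict[str, Any]],
    user_id: str,
) -> None:
    # Fetch aggregations for the whole batch, then merge each event's bundle
    # into its serialised form under unsigned["m.relations"].
    aggregations = await store.get_bundled_aggregations(events, user_id)
    for event_id, bundle in aggregations.items():
        unsigned = serialized[event_id].setdefault("unsigned", {})
        unsigned.setdefault("m.relations", {}).update(bundle)
```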
550550 FROM room_stats_state state
551551 INNER JOIN room_stats_current curr USING (room_id)
552552 INNER JOIN rooms USING (room_id)
553 %s
554 ORDER BY %s %s
553 {where}
554 ORDER BY {order_by} {direction}, state.room_id {direction}
555555 LIMIT ?
556556 OFFSET ?
557 """ % (
558 where_statement,
559 order_by_column,
560 "ASC" if order_by_asc else "DESC",
557 """.format(
558 where=where_statement,
559 order_by=order_by_column,
560 direction="ASC" if order_by_asc else "DESC",
561561 )
562562
563563 # Use a nested SELECT statement as SQL can't count(*) with an OFFSET
564564 count_sql = """
565565 SELECT count(*) FROM (
566566 SELECT room_id FROM room_stats_state state
567 %s
567 {where}
568568 ) AS get_room_ids
569 """ % (
570 where_statement,
569 """.format(
570 where=where_statement,
571571 )
572572
573573 def _get_rooms_paginate_txn(
11761176 await self.db_pool.runInteraction("forget_membership", f)
11771177
11781178
1179 @attr.s(slots=True)
1179 @attr.s(slots=True, auto_attribs=True)
11801180 class _JoinedHostsCache:
11811181 """The cached data used by the `_get_joined_hosts_cache`."""
11821182
11831183 # Dict of host to the set of their users in the room at the state group.
1184 hosts_to_joined_users = attr.ib(type=Dict[str, Set[str]], factory=dict)
1184 hosts_to_joined_users: Dict[str, Set[str]] = attr.Factory(dict)
11851185
11861186 # The state group `hosts_to_joined_users` is derived from. Will be an object
11871187 # if the instance is newly created or if the state is not based on a state
11881188 # group. (An object is used as a sentinel value to ensure that it never is
11891189 # equal to anything else).
1190 state_group = attr.ib(type=Union[object, int], factory=object)
1190 state_group: Union[object, int] = attr.Factory(object)
11911191
11921192 def __len__(self):
11931193 return sum(len(v) for v in self.hosts_to_joined_users.values())
0 # -*- coding: utf-8 -*-
10 # Copyright 2021 The Matrix.org Foundation C.I.C.
21 #
32 # Licensed under the Apache License, Version 2.0 (the "License");
559559 return await self.db_pool.runInteraction(
560560 "get_destinations_paginate_txn", get_destinations_paginate_txn
561561 )
562
563 async def is_destination_known(self, destination: str) -> bool:
564 """Check if a destination is known to the server."""
565 result = await self.db_pool.simple_select_one_onecol(
566 table="destinations",
567 keyvalues={"destination": destination},
568 retcol="1",
569 allow_none=True,
570 desc="is_destination_known",
571 )
572 return bool(result)
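`is_destination_known` above is a plain existence check against the `destinations` table. A hedged sketch of the kind of guard a caller (for example an admin API handler) might build on it; only the `is_destination_known` call reflects this diff, the surrounding function is illustrative:

```python
from typing import Any

from synapse.api.errors import NotFoundError


async def assert_destination_known(store: Any, destination: str) -> None:
    # Reject requests that reference a destination Synapse has never seen.
    if not await store.is_destination_known(destination):
        raise NotFoundError("Unknown destination")
```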
2222 from synapse.util import json_encoder, stringutils
2323
2424
25 @attr.s(slots=True)
25 @attr.s(slots=True, auto_attribs=True)
2626 class UIAuthSessionData:
27 session_id = attr.ib(type=str)
27 session_id: str
2828 # The dictionary from the client root level, not the 'auth' key.
29 clientdict = attr.ib(type=JsonDict)
29 clientdict: JsonDict
3030 # The URI and method the session was initiated with. These are checked at
3131 # each stage of the authentication to ensure that the asked for operation
3232 # has not changed.
33 uri = attr.ib(type=str)
34 method = attr.ib(type=str)
33 uri: str
34 method: str
3535 # A string description of the operation that the current authentication is
3636 # authorising.
37 description = attr.ib(type=str)
37 description: str
3838
3939
4040 class UIAuthWorkerStore(SQLBaseStore):
104104 GROUP BY room_id
105105 """
106106 txn.execute(sql)
107 rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
108 self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
107 rooms = list(txn.fetchall())
108 self.db_pool.simple_insert_many_txn(
109 txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms
110 )
109111 del rooms
110112
111113 sql = (
116118 txn.execute(sql)
117119
118120 txn.execute("SELECT name FROM users")
119 users = [{"user_id": x[0]} for x in txn.fetchall()]
120
121 self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
121 users = list(txn.fetchall())
122
123 self.db_pool.simple_insert_many_txn(
124 txn, TEMP_TABLE + "_users", keys=("user_id",), values=users
125 )
122126
123127 new_pos = await self.get_max_stream_id_in_current_state_deltas()
124128 await self.db_pool.runInteraction(
326326 self.db_pool.simple_insert_many_txn(
327327 txn,
328328 table="state_groups_state",
329 keys=(
330 "state_group",
331 "room_id",
332 "type",
333 "state_key",
334 "event_id",
335 ),
329336 values=[
330 {
331 "state_group": state_group,
332 "room_id": room_id,
333 "type": key[0],
334 "state_key": key[1],
335 "event_id": state_id,
336 }
337 (state_group, room_id, key[0], key[1], state_id)
337338 for key, state_id in delta_state.items()
338339 ],
339340 )
459459 self.db_pool.simple_insert_many_txn(
460460 txn,
461461 table="state_groups_state",
462 keys=("state_group", "room_id", "type", "state_key", "event_id"),
462463 values=[
463 {
464 "state_group": state_group,
465 "room_id": room_id,
466 "type": key[0],
467 "state_key": key[1],
468 "event_id": state_id,
469 }
464 (state_group, room_id, key[0], key[1], state_id)
470465 for key, state_id in delta_ids.items()
471466 ],
472467 )
474469 self.db_pool.simple_insert_many_txn(
475470 txn,
476471 table="state_groups_state",
472 keys=("state_group", "room_id", "type", "state_key", "event_id"),
477473 values=[
478 {
479 "state_group": state_group,
480 "room_id": room_id,
481 "type": key[0],
482 "state_key": key[1],
483 "event_id": state_id,
484 }
474 (state_group, room_id, key[0], key[1], state_id)
485475 for key, state_id in current_state_ids.items()
486476 ],
487477 )
588578 self.db_pool.simple_insert_many_txn(
589579 txn,
590580 table="state_groups_state",
581 keys=("state_group", "room_id", "type", "state_key", "event_id"),
591582 values=[
592 {
593 "state_group": sg,
594 "room_id": room_id,
595 "type": key[0],
596 "state_key": key[1],
597 "event_id": state_id,
598 }
583 (sg, room_id, key[0], key[1], state_id)
599584 for key, state_id in curr_state.items()
600585 ],
601586 )
2020 logger = logging.getLogger(__name__)
2121
2222
23 @attr.s(slots=True, frozen=True)
23 @attr.s(slots=True, frozen=True, auto_attribs=True)
2424 class FetchKeyResult:
25 verify_key = attr.ib(type=VerifyKey) # the key itself
26 valid_until_ts = attr.ib(type=int) # how long we can use this key for
25 verify_key: VerifyKey # the key itself
26 valid_until_ts: int # how long we can use this key for
695695 )
696696
697697
698 @attr.s(slots=True)
698 @attr.s(slots=True, auto_attribs=True)
699699 class _DirectoryListing:
700700 """Helper class to store schema file name and the
701701 absolute path to it.
704704 `file_name` attr is kept first.
705705 """
706706
707 file_name = attr.ib(type=str)
708 absolute_path = attr.ib(type=str)
707 file_name: str
708 absolute_path: str
2222 logger = logging.getLogger(__name__)
2323
2424
25 @attr.s(slots=True)
25 @attr.s(slots=True, auto_attribs=True)
2626 class PaginationChunk:
2727 """Returned by relation pagination APIs.
2828
3434 None then there are no previous results.
3535 """
3636
37 chunk = attr.ib(type=List[JsonDict])
38 next_batch = attr.ib(type=Optional[Any], default=None)
39 prev_batch = attr.ib(type=Optional[Any], default=None)
37 chunk: List[JsonDict]
38 next_batch: Optional[Any] = None
39 prev_batch: Optional[Any] = None
4040
4141 def to_dict(self) -> Dict[str, Any]:
4242 d = {"chunk": self.chunk}
5050 return d
5151
5252
53 @attr.s(frozen=True, slots=True)
53 @attr.s(frozen=True, slots=True, auto_attribs=True)
5454 class RelationPaginationToken:
5555 """Pagination token for relation pagination API.
5656
6363 stream: The stream ordering of the boundary event.
6464 """
6565
66 topological = attr.ib(type=int)
67 stream = attr.ib(type=int)
66 topological: int
67 stream: int
6868
6969 @staticmethod
7070 def from_string(string: str) -> "RelationPaginationToken":
8181 return attr.astuple(self)
8282
8383
84 @attr.s(frozen=True, slots=True)
84 @attr.s(frozen=True, slots=True, auto_attribs=True)
8585 class AggregationPaginationToken:
8686 """Pagination token for relation aggregation pagination API.
8787
9393 stream: The MAX stream ordering in the boundary group.
9494 """
9595
96 count = attr.ib(type=int)
97 stream = attr.ib(type=int)
96 count: int
97 stream: int
9898
9999 @staticmethod
100100 def from_string(string: str) -> "AggregationPaginationToken":
4444 T = TypeVar("T")
4545
4646
47 @attr.s(slots=True, frozen=True)
47 @attr.s(slots=True, frozen=True, auto_attribs=True)
4848 class StateFilter:
4949 """A filter used when querying for state.
5050
5757 appear in `types`.
5858 """
5959
60 types = attr.ib(type="frozendict[str, Optional[FrozenSet[str]]]")
61 include_others = attr.ib(default=False, type=bool)
60 types: "frozendict[str, Optional[FrozenSet[str]]]"
61 include_others: bool = False
6262
6363 def __attrs_post_init__(self):
6464 # If `include_others` is set we canonicalise the filter by removing
761761 return self.inner.__exit__(exc_type, exc, tb)
762762
763763
764 @attr.s(slots=True)
764 @attr.s(slots=True, auto_attribs=True)
765765 class _MultiWriterCtxManager:
766766 """Async context manager returned by MultiWriterIdGenerator"""
767767
768 id_gen = attr.ib(type=MultiWriterIdGenerator)
769 multiple_ids = attr.ib(type=Optional[int], default=None)
770 stream_ids = attr.ib(type=List[int], factory=list)
768 id_gen: MultiWriterIdGenerator
769 multiple_ids: Optional[int] = None
770 stream_ids: List[int] = attr.Factory(list)
771771
772772 async def __aenter__(self) -> Union[int, List[int]]:
773773 # It's safe to run this in autocommit mode as fetching values from a
2727 MAX_LIMIT = 1000
2828
2929
30 @attr.s(slots=True)
30 @attr.s(slots=True, auto_attribs=True)
3131 class PaginationConfig:
3232 """A configuration object which stores pagination parameters."""
3333
34 from_token = attr.ib(type=Optional[StreamToken])
35 to_token = attr.ib(type=Optional[StreamToken])
36 direction = attr.ib(type=str)
37 limit = attr.ib(type=Optional[int])
34 from_token: Optional[StreamToken]
35 to_token: Optional[StreamToken]
36 direction: str
37 limit: Optional[int]
3838
3939 @classmethod
4040 async def from_request(
1919 Any,
2020 ClassVar,
2121 Dict,
22 List,
2223 Mapping,
24 Match,
2325 MutableMapping,
2426 Optional,
2527 Tuple,
7880 """The interfaces necessary for Synapse to function."""
7981
8082
81 @attr.s(frozen=True, slots=True)
83 @attr.s(frozen=True, slots=True, auto_attribs=True)
8284 class Requester:
8385 """
8486 Represents the user making a request
9698 "puppeting" the user.
9799 """
98100
99 user = attr.ib(type="UserID")
100 access_token_id = attr.ib(type=Optional[int])
101 is_guest = attr.ib(type=bool)
102 shadow_banned = attr.ib(type=bool)
103 device_id = attr.ib(type=Optional[str])
104 app_service = attr.ib(type=Optional["ApplicationService"])
105 authenticated_entity = attr.ib(type=str)
101 user: "UserID"
102 access_token_id: Optional[int]
103 is_guest: bool
104 shadow_banned: bool
105 device_id: Optional[str]
106 app_service: Optional["ApplicationService"]
107 authenticated_entity: str
106108
107109 def serialize(self):
108110 """Converts self to a type that can be serialized as JSON, and then
209211 DS = TypeVar("DS", bound="DomainSpecificString")
210212
211213
212 @attr.s(slots=True, frozen=True, repr=False)
214 @attr.s(slots=True, frozen=True, repr=False, auto_attribs=True)
213215 class DomainSpecificString(metaclass=abc.ABCMeta):
214216 """Common base class among ID/name strings that have a local part and a
215217 domain name, prefixed with a sigil.
222224
223225 SIGIL: ClassVar[str] = abc.abstractproperty() # type: ignore
224226
225 localpart = attr.ib(type=str)
226 domain = attr.ib(type=str)
227 localpart: str
228 domain: str
227229
228230 # Because this is a frozen class, it is deeply immutable.
229231 def __copy__(self):
379381 onto different mxids
380382
381383 Returns:
382 unicode: string suitable for a mxid localpart
384 string suitable for a mxid localpart
383385 """
384386 if not isinstance(username, bytes):
385387 username = username.encode("utf-8")
387389 # first we sort out upper-case characters
388390 if case_sensitive:
389391
390 def f1(m):
392 def f1(m: Match[bytes]) -> bytes:
391393 return b"_" + m.group().lower()
392394
393395 username = UPPER_CASE_PATTERN.sub(f1, username)
394396 else:
395397 username = username.lower()
396398
397 # then we sort out non-ascii characters
398 def f2(m):
399 g = m.group()[0]
400 if isinstance(g, str):
401 # on python 2, we need to do a ord(). On python 3, the
402 # byte itself will do.
403 g = ord(g)
404 return b"=%02x" % (g,)
399 # then we sort out non-ascii characters by converting to the hex equivalent.
400 def f2(m: Match[bytes]) -> bytes:
401 return b"=%02x" % (m.group()[0],)
405402
406403 username = NON_MXID_CHARACTER_PATTERN.sub(f2, username)
407404
408405 # we also do the =-escaping to mxids starting with an underscore.
409406 username = re.sub(b"^_", b"=5f", username)
410407
411 # we should now only have ascii bytes left, so can decode back to a
412 # unicode.
408 # we should now only have ascii bytes left, so can decode back to a string.
413409 return username.decode("ascii")
414410
415411
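The `f2` simplification above is a Python-3-only cleanup: indexing a `bytes` object already yields an `int`, so the old `ord()` branch is dead. A standalone sketch (the regex here is illustrative, not the real `NON_MXID_CHARACTER_PATTERN`):

```python
import re

# Illustrative pattern: escape anything outside a conservative localpart alphabet.
NON_MXID = re.compile(rb"[^a-z0-9._=/-]")


def escape_byte(m: "re.Match[bytes]") -> bytes:
    # m.group()[0] is already an int on Python 3, so it can be %-formatted directly.
    return b"=%02x" % (m.group()[0],)


assert NON_MXID.sub(escape_byte, b"foo bar!") == b"foo=20bar=21"
```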
465461 attributes, must be hashable.
466462 """
467463
468 topological = attr.ib(
469 type=Optional[int],
464 topological: Optional[int] = attr.ib(
470465 validator=attr.validators.optional(attr.validators.instance_of(int)),
471466 )
472 stream = attr.ib(type=int, validator=attr.validators.instance_of(int))
473
474 instance_map = attr.ib(
475 type="frozendict[str, int]",
467 stream: int = attr.ib(validator=attr.validators.instance_of(int))
468
469 instance_map: "frozendict[str, int]" = attr.ib(
476470 factory=frozendict,
477471 validator=attr.validators.deep_mapping(
478472 key_validator=attr.validators.instance_of(str),
481475 ),
482476 )
483477
484 def __attrs_post_init__(self):
478 def __attrs_post_init__(self) -> None:
485479 """Validates that both `topological` and `instance_map` aren't set."""
486480
487481 if self.instance_map and self.topological:
597591 return "s%d" % (self.stream,)
598592
599593
600 @attr.s(slots=True, frozen=True)
594 @attr.s(slots=True, frozen=True, auto_attribs=True)
601595 class StreamToken:
602596 """A collection of positions within multiple streams.
603597
605599 must be hashable.
606600 """
607601
608 room_key = attr.ib(
609 type=RoomStreamToken, validator=attr.validators.instance_of(RoomStreamToken)
602 room_key: RoomStreamToken = attr.ib(
603 validator=attr.validators.instance_of(RoomStreamToken)
610604 )
611 presence_key = attr.ib(type=int)
612 typing_key = attr.ib(type=int)
613 receipt_key = attr.ib(type=int)
614 account_data_key = attr.ib(type=int)
615 push_rules_key = attr.ib(type=int)
616 to_device_key = attr.ib(type=int)
617 device_list_key = attr.ib(type=int)
618 groups_key = attr.ib(type=int)
605 presence_key: int
606 typing_key: int
607 receipt_key: int
608 account_data_key: int
609 push_rules_key: int
610 to_device_key: int
611 device_list_key: int
612 groups_key: int
619613
620614 _SEPARATOR = "_"
621 START: "StreamToken"
615 START: ClassVar["StreamToken"]
622616
623617 @classmethod
624618 async def from_string(cls, store: "DataStore", string: str) -> "StreamToken":
678672 StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0)
679673
680674
681 @attr.s(slots=True, frozen=True)
675 @attr.s(slots=True, frozen=True, auto_attribs=True)
682676 class PersistedEventPosition:
683677 """Position of a newly persisted event with instance that persisted it.
684678
686680 RoomStreamToken.
687681 """
688682
689 instance_name = attr.ib(type=str)
690 stream = attr.ib(type=int)
683 instance_name: str
684 stream: int
691685
692686 def persisted_after(self, token: RoomStreamToken) -> bool:
693687 return token.get_stream_pos_for_instance(self.instance_name) < self.stream
737731 __str__ = to_string
738732
739733
740 @attr.s(slots=True)
734 @attr.s(slots=True, frozen=True, auto_attribs=True)
741735 class ReadReceipt:
742736 """Information about a read-receipt"""
743737
744 room_id = attr.ib()
745 receipt_type = attr.ib()
746 user_id = attr.ib()
747 event_ids = attr.ib()
748 data = attr.ib()
738 room_id: str
739 receipt_type: str
740 user_id: str
741 event_ids: List[str]
742 data: JsonDict
749743
750744
751745 def get_verify_key_from_cross_signing_key(key_info):
308308 return deferred.addCallback(tuple)
309309
310310
311 @attr.s(slots=True)
311 @attr.s(slots=True, auto_attribs=True)
312312 class _LinearizerEntry:
313313 # The number of things executing.
314 count = attr.ib(type=int)
314 count: int
315315 # Deferreds for the things blocked from executing.
316 deferreds = attr.ib(type=collections.OrderedDict)
316 deferreds: collections.OrderedDict
317317
318318
319319 class Linearizer:
3232
3333 # This class can't be generic because it uses slots with attrs.
3434 # See: https://github.com/python-attrs/attrs/issues/313
35 @attr.s(slots=True)
35 @attr.s(slots=True, auto_attribs=True)
3636 class DictionaryEntry: # should be: Generic[DKT, DV].
3737 """Returned when getting an entry from the cache
3838
4040 full: Whether the cache has the full dict or just some keys.
4141 If not full then not all requested keys will necessarily be present
4242 in `value`
43 known_absent: Keys that were looked up in the dict and were not
44 there.
43 known_absent: Keys that were looked up in the dict and were not there.
4544 value: The full or partial dict value
4645 """
4746
48 full = attr.ib(type=bool)
49 known_absent = attr.ib(type=Set[Any]) # should be: Set[DKT]
50 value = attr.ib(type=Dict[Any, Any]) # should be: Dict[DKT, DV]
47 full: bool
48 known_absent: Set[Any] # should be: Set[DKT]
49 value: Dict[Any, Any] # should be: Dict[DKT, DV]
5150
5251 def __len__(self) -> int:
5352 return len(self.value)
273273 self.assertEquals(failure.value.code, 400)
274274 self.assertEquals(failure.value.errcode, Codes.EXCLUSIVE)
275275
276 def test_get_user_by_req__puppeted_token__not_tracking_puppeted_mau(self):
277 self.store.get_user_by_access_token = simple_async_mock(
278 TokenLookupResult(
279 user_id="@baldrick:matrix.org",
280 device_id="device",
281 token_owner="@admin:matrix.org",
282 )
283 )
284 self.store.insert_client_ip = simple_async_mock(None)
285 request = Mock(args={})
286 request.getClientIP.return_value = "127.0.0.1"
287 request.args[b"access_token"] = [self.test_token]
288 request.requestHeaders.getRawHeaders = mock_getRawHeaders()
289 self.get_success(self.auth.get_user_by_req(request))
290 self.store.insert_client_ip.assert_called_once()
291
292 def test_get_user_by_req__puppeted_token__tracking_puppeted_mau(self):
293 self.auth._track_puppeted_user_ips = True
294 self.store.get_user_by_access_token = simple_async_mock(
295 TokenLookupResult(
296 user_id="@baldrick:matrix.org",
297 device_id="device",
298 token_owner="@admin:matrix.org",
299 )
300 )
301 self.store.insert_client_ip = simple_async_mock(None)
302 request = Mock(args={})
303 request.getClientIP.return_value = "127.0.0.1"
304 request.args[b"access_token"] = [self.test_token]
305 request.requestHeaders.getRawHeaders = mock_getRawHeaders()
306 self.get_success(self.auth.get_user_by_req(request))
307 self.assertEquals(self.store.insert_client_ip.call_count, 2)
308
276309 def test_get_user_from_macaroon(self):
277310 self.store.get_user_by_access_token = simple_async_mock(
278311 TokenLookupResult(user_id="@baldrick:matrix.org", device_id="device")
1212 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313 # See the License for the specific language governing permissions and
1414 # limitations under the License.
15 from typing import Iterable
1516 from unittest import mock
1617
18 from parameterized import parameterized
1719 from signedjson import key as key, sign as sign
1820
1921 from twisted.internet import defer
2224 from synapse.api.errors import Codes, SynapseError
2325
2426 from tests import unittest
27 from tests.test_utils import make_awaitable
2528
2629
2730 class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
764767 remote_user_id = "@test:other"
765768 local_user_id = "@test:test"
766769
770 # Pretend we're sharing a room with the user we're querying. If not,
771 # `_query_devices_for_destination` will return early.
767772 self.store.get_rooms_for_user = mock.Mock(
768773 return_value=defer.succeed({"some_room_id"})
769774 )
830835 }
831836 },
832837 )
838
839 @parameterized.expand(
840 [
841 # The remote homeserver's response indicates that this user has 0/1/2 devices.
842 ([],),
843 (["device_1"],),
844 (["device_1", "device_2"],),
845 ]
846 )
847 def test_query_all_devices_caches_result(self, device_ids: Iterable[str]):
848 """Test that requests for all of a remote user's devices are cached.
849
850 We do this by asserting that only one call over federation was made, and that
851 the two queries to the local homeserver produce the same response.
852 """
853 local_user_id = "@test:test"
854 remote_user_id = "@test:other"
855 request_body = {"device_keys": {remote_user_id: []}}
856
857 response_devices = [
858 {
859 "device_id": device_id,
860 "keys": {
861 "algorithms": ["dummy"],
862 "device_id": device_id,
863 "keys": {f"dummy:{device_id}": "dummy"},
864 "signatures": {device_id: {f"dummy:{device_id}": "dummy"}},
865 "unsigned": {},
866 "user_id": "@test:other",
867 },
868 }
869 for device_id in device_ids
870 ]
871
872 response_body = {
873 "devices": response_devices,
874 "user_id": remote_user_id,
875 "stream_id": 12345, # an integer, according to the spec
876 }
877
878 e2e_handler = self.hs.get_e2e_keys_handler()
879
880 # Pretend we're sharing a room with the user we're querying. If not,
881 # `_query_devices_for_destination` will return early.
882 mock_get_rooms = mock.patch.object(
883 self.store,
884 "get_rooms_for_user",
885 new_callable=mock.MagicMock,
886 return_value=make_awaitable(["some_room_id"]),
887 )
888 mock_request = mock.patch.object(
889 self.hs.get_federation_client(),
890 "query_user_devices",
891 new_callable=mock.MagicMock,
892 return_value=make_awaitable(response_body),
893 )
894
895 with mock_get_rooms, mock_request as mocked_federation_request:
896 # Make the first query and sanity check it succeeds.
897 response_1 = self.get_success(
898 e2e_handler.query_devices(
899 request_body,
900 timeout=10,
901 from_user_id=local_user_id,
902 from_device_id="some_device_id",
903 )
904 )
905 self.assertEqual(response_1["failures"], {})
906
907 # We should have made a federation request to do so.
908 mocked_federation_request.assert_called_once()
909
910 # Reset the mock so we can prove we don't make a second federation request.
911 mocked_federation_request.reset_mock()
912
913 # Repeat the query.
914 response_2 = self.get_success(
915 e2e_handler.query_devices(
916 request_body,
917 timeout=10,
918 from_user_id=local_user_id,
919 from_device_id="some_device_id",
920 )
921 )
922 self.assertEqual(response_2["failures"], {})
923
924 # We should not have made a second federation request.
925 mocked_federation_request.assert_not_called()
926
927 # The two requests to the local homeserver should be identical.
928 self.assertEqual(response_1, response_2)
2121 import synapse
2222 from synapse.handlers.auth import load_legacy_password_auth_providers
2323 from synapse.module_api import ModuleApi
24 from synapse.rest.client import devices, login
24 from synapse.rest.client import devices, login, logout
2525 from synapse.types import JsonDict
2626
2727 from tests import unittest
154154 synapse.rest.admin.register_servlets,
155155 login.register_servlets,
156156 devices.register_servlets,
157 logout.register_servlets,
157158 ]
158159
159160 def setUp(self):
717718 # ("unknown login type")
718719 channel = self._send_password_login("localuser", "localpass")
719720 self.assertEqual(channel.code, 400, channel.result)
721
722 def test_on_logged_out(self):
723 """Tests that the on_logged_out callback is called when the user logs out."""
724 self.register_user("rin", "password")
725 tok = self.login("rin", "password")
726
727 self.called = False
728
729 async def on_logged_out(user_id, device_id, access_token):
730 self.called = True
731
732 on_logged_out = Mock(side_effect=on_logged_out)
733 self.hs.get_password_auth_provider().on_logged_out_callbacks.append(
734 on_logged_out
735 )
736
737 channel = self.make_request(
738 "POST",
739 "/_matrix/client/v3/logout",
740 {},
741 access_token=tok,
742 )
743 self.assertEqual(channel.code, 200)
744 on_logged_out.assert_called_once()
745 self.assertTrue(self.called)
720746
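The test above appends to the callback list directly; a real module would register `on_logged_out` through the module API. A hedged sketch of such a module, where the `register_password_auth_provider_callbacks` name and the callback signature are assumed to match this era of the module API rather than taken from this diff:

```python
from typing import Optional

from synapse.module_api import ModuleApi


class ExampleAuthModule:
    def __init__(self, config: dict, api: ModuleApi):
        # Ask Synapse to call us back whenever a user logs out.
        api.register_password_auth_provider_callbacks(on_logged_out=self.on_logged_out)

    async def on_logged_out(
        self, user_id: str, device_id: Optional[str], access_token: str
    ) -> None:
        # Clean up any per-session state the module keeps for this device.
        pass
```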
721747 def _get_login_flows(self) -> JsonDict:
722748 channel = self.make_request("GET", "/_matrix/client/r0/login")
2727 from synapse.api.errors import AuthError, NotFoundError, SynapseError
2828 from synapse.api.room_versions import RoomVersions
2929 from synapse.events import make_event_from_dict
30 from synapse.federation.transport.client import TransportLayerClient
3031 from synapse.handlers.room_summary import _child_events_comparison_key, _RoomEntry
3132 from synapse.rest import admin
3233 from synapse.rest.client import login, room
133134 self._add_child(self.space, self.room, self.token)
134135
135136 def _add_child(
136 self, space_id: str, room_id: str, token: str, order: Optional[str] = None
137 self,
138 space_id: str,
139 room_id: str,
140 token: str,
141 order: Optional[str] = None,
142 via: Optional[List[str]] = None,
137143 ) -> None:
138144 """Add a child room to a space."""
139 content: JsonDict = {"via": [self.hs.hostname]}
145 if via is None:
146 via = [self.hs.hostname]
147
148 content: JsonDict = {"via": via}
140149 if order is not None:
141150 content["order"] = order
142151 self.helper.send_state(
250259 result = self.get_success(
251260 self.handler.get_room_hierarchy(create_requester(self.user), self.space)
252261 )
262 self._assert_hierarchy(result, expected)
263
264 def test_large_space(self):
265 """Test a space with a large number of rooms."""
266 rooms = [self.room]
267 # Make at least 51 rooms that are part of the space.
268 for _ in range(55):
269 room = self.helper.create_room_as(self.user, tok=self.token)
270 self._add_child(self.space, room, self.token)
271 rooms.append(room)
272
273 result = self.get_success(self.handler.get_space_summary(self.user, self.space))
274 # The spaces result should have the space and the first 50 rooms in it,
275 # along with the links from space -> room for those 50 rooms.
276 expected = [(self.space, rooms[:50])] + [(room, []) for room in rooms[:49]]
277 self._assert_rooms(result, expected)
278
279 # The result should have the space and the rooms in it, along with the links
280 # from space -> room.
281 expected = [(self.space, rooms)] + [(room, []) for room in rooms]
282
283 # Make two requests to fully paginate the results.
284 result = self.get_success(
285 self.handler.get_room_hierarchy(create_requester(self.user), self.space)
286 )
287 result2 = self.get_success(
288 self.handler.get_room_hierarchy(
289 create_requester(self.user), self.space, from_token=result["next_batch"]
290 )
291 )
292 # Combine the results.
293 result["rooms"] += result2["rooms"]
253294 self._assert_hierarchy(result, expected)
254295
255296 def test_visibility(self):
10031044 )
10041045 self._assert_hierarchy(result, expected)
10051046
1047 def test_fed_caching(self):
1048 """
1049 Federation `/hierarchy` responses should be cached.
1050 """
1051 fed_hostname = self.hs.hostname + "2"
1052 fed_subspace = "#space:" + fed_hostname
1053 fed_room = "#room:" + fed_hostname
1054
1055 # Add a room to the space which is on another server.
1056 self._add_child(self.space, fed_subspace, self.token, via=[fed_hostname])
1057
1058 federation_requests = 0
1059
1060 async def get_room_hierarchy(
1061 _self: TransportLayerClient,
1062 destination: str,
1063 room_id: str,
1064 suggested_only: bool,
1065 ) -> JsonDict:
1066 nonlocal federation_requests
1067 federation_requests += 1
1068
1069 return {
1070 "room": {
1071 "room_id": fed_subspace,
1072 "world_readable": True,
1073 "room_type": RoomTypes.SPACE,
1074 "children_state": [
1075 {
1076 "type": EventTypes.SpaceChild,
1077 "room_id": fed_subspace,
1078 "state_key": fed_room,
1079 "content": {"via": [fed_hostname]},
1080 },
1081 ],
1082 },
1083 "children": [
1084 {
1085 "room_id": fed_room,
1086 "world_readable": True,
1087 },
1088 ],
1089 "inaccessible_children": [],
1090 }
1091
1092 expected = [
1093 (self.space, [self.room, fed_subspace]),
1094 (self.room, ()),
1095 (fed_subspace, [fed_room]),
1096 (fed_room, ()),
1097 ]
1098
1099 with mock.patch(
1100 "synapse.federation.transport.client.TransportLayerClient.get_room_hierarchy",
1101 new=get_room_hierarchy,
1102 ):
1103 result = self.get_success(
1104 self.handler.get_room_hierarchy(create_requester(self.user), self.space)
1105 )
1106 self.assertEqual(federation_requests, 1)
1107 self._assert_hierarchy(result, expected)
1108
1109 # The previous federation response should be reused.
1110 result = self.get_success(
1111 self.handler.get_room_hierarchy(create_requester(self.user), self.space)
1112 )
1113 self.assertEqual(federation_requests, 1)
1114 self._assert_hierarchy(result, expected)
1115
1116 # Expire the response cache
1117 self.reactor.advance(5 * 60 + 1)
1118
1119 # A new federation request should be made.
1120 result = self.get_success(
1121 self.handler.get_room_hierarchy(create_requester(self.user), self.space)
1122 )
1123 self.assertEqual(federation_requests, 2)
1124 self._assert_hierarchy(result, expected)
1125
10061126
10071127 class RoomSummaryTestCase(unittest.HomeserverTestCase):
10081128 servlets = [
1010 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1111 # See the License for the specific language governing permissions and
1212 # limitations under the License.
13
1413 from typing import Optional
15 from unittest.mock import Mock
14 from unittest.mock import MagicMock, Mock, patch
1615
1716 from synapse.api.constants import EventTypes, JoinRules
1817 from synapse.api.errors import Codes, ResourceLimitError
1918 from synapse.api.filtering import Filtering
2019 from synapse.api.room_versions import RoomVersions
21 from synapse.handlers.sync import SyncConfig
20 from synapse.handlers.sync import SyncConfig, SyncResult
2221 from synapse.rest import admin
2322 from synapse.rest.client import knock, login, room
2423 from synapse.server import HomeServer
2625
2726 import tests.unittest
2827 import tests.utils
28 from tests.test_utils import make_awaitable
2929
3030
3131 class SyncTestCase(tests.unittest.HomeserverTestCase):
184184 self.assertNotIn(joined_room, [r.room_id for r in result.joined])
185185 self.assertNotIn(invite_room, [r.room_id for r in result.invited])
186186 self.assertNotIn(knock_room, [r.room_id for r in result.knocked])
187
188 def test_ban_wins_race_with_join(self):
189 """Rooms shouldn't appear under "joined" if a join loses a race to a ban.
190
191 A complicated edge case. Imagine the following scenario:
192
193 * you attempt to join a room
194 * racing with that is a ban which comes in over federation, which ends up with
195 an earlier stream_ordering than the join.
196 * you get a sync response with a sync token which is _after_ the ban, but before
197 the join
198 * now your join lands; it is a valid event because its `prev_event`s predate the
199 ban, but will not make it into current_state_events (because bans win over
200 joins in state res, essentially).
201 * When we do a sync from the incremental sync, the only event in the timeline
202 is your join ... and yet you aren't joined.
203
204 The ban coming in over federation isn't crucial for this behaviour; the key
205 requirements are:
206 1. the homeserver generates a join event with prev_events that precede the ban
207 (so that it passes the "are you banned" test)
208 2. the join event has a stream_ordering after that of the ban.
209
210 We use monkeypatching to artificially trigger condition (1).
211 """
212 # A local user Alice creates a room.
213 owner = self.register_user("alice", "password")
214 owner_tok = self.login(owner, "password")
215 room_id = self.helper.create_room_as(owner, is_public=True, tok=owner_tok)
216
217 # Do a sync as Alice to get the latest event in the room.
218 alice_sync_result: SyncResult = self.get_success(
219 self.sync_handler.wait_for_sync_for_user(
220 create_requester(owner), generate_sync_config(owner)
221 )
222 )
223 self.assertEqual(len(alice_sync_result.joined), 1)
224 self.assertEqual(alice_sync_result.joined[0].room_id, room_id)
225 last_room_creation_event_id = (
226 alice_sync_result.joined[0].timeline.events[-1].event_id
227 )
228
229 # Eve, a ne'er-do-well, registers.
230 eve = self.register_user("eve", "password")
231 eve_token = self.login(eve, "password")
232
233 # Alice preemptively bans Eve.
234 self.helper.ban(room_id, owner, eve, tok=owner_tok)
235
236 # Eve syncs.
237 eve_requester = create_requester(eve)
238 eve_sync_config = generate_sync_config(eve)
239 eve_sync_after_ban: SyncResult = self.get_success(
240 self.sync_handler.wait_for_sync_for_user(eve_requester, eve_sync_config)
241 )
242
243 # Sanity check this sync result. We shouldn't be joined to the room.
244 self.assertEqual(eve_sync_after_ban.joined, [])
245
246 # Eve tries to join the room. We monkey patch the internal logic which selects
247 # the prev_events used when creating the join event, such that the ban does not
248 # precede the join.
249 mocked_get_prev_events = patch.object(
250 self.hs.get_datastore(),
251 "get_prev_events_for_room",
252 new_callable=MagicMock,
253 return_value=make_awaitable([last_room_creation_event_id]),
254 )
255 with mocked_get_prev_events:
256 self.helper.join(room_id, eve, tok=eve_token)
257
258 # Eve makes a second, incremental sync.
259 eve_incremental_sync_after_join: SyncResult = self.get_success(
260 self.sync_handler.wait_for_sync_for_user(
261 eve_requester,
262 eve_sync_config,
263 since_token=eve_sync_after_ban.next_batch,
264 )
265 )
266 # Eve should not see herself as joined to the room.
267 self.assertEqual(eve_incremental_sync_after_join.joined, [])
268
269 # If we did a third initial sync, we should _still_ see eve is not joined to the room.
270 eve_initial_sync_after_join: SyncResult = self.get_success(
271 self.sync_handler.wait_for_sync_for_user(
272 eve_requester,
273 eve_sync_config,
274 since_token=None,
275 )
276 )
277 self.assertEqual(eve_initial_sync_after_join.joined, [])
187278
188279
189280 _request_key = 0
0 # Copyright 2022 The Matrix.org Foundation C.I.C.
1 #
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at
5 #
6 # http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13 from http import HTTPStatus
14 from typing import Dict
15
16 from twisted.web.resource import Resource
17
18 from synapse.app.homeserver import SynapseHomeServer
19 from synapse.config.server import HttpListenerConfig, HttpResourceConfig, ListenerConfig
20 from synapse.http.site import SynapseSite
21
22 from tests.server import make_request
23 from tests.unittest import HomeserverTestCase, create_resource_tree, override_config
24
25
26 class WebClientTests(HomeserverTestCase):
27 @override_config(
28 {
29 "web_client_location": "https://example.org",
30 }
31 )
32 def test_webclient_resolves_with_client_resource(self):
33 """
34 Tests that both client and webclient resources can be accessed simultaneously.
35
36 This is a regression test created in response to https://github.com/matrix-org/synapse/issues/11763.
37 """
38 for resource_name_order_list in [
39 ["webclient", "client"],
40 ["client", "webclient"],
41 ]:
42 # Create a dictionary from path regex -> resource
43 resource_dict: Dict[str, Resource] = {}
44
45 for resource_name in resource_name_order_list:
46 resource_dict.update(
47 SynapseHomeServer._configure_named_resource(self.hs, resource_name)
48 )
49
50 # Create a root resource which ties the above resources together into one
51 root_resource = Resource()
52 create_resource_tree(resource_dict, root_resource)
53
54 # Create a site configured with this resource to make HTTP requests against
55 listener_config = ListenerConfig(
56 port=8008,
57 bind_addresses=["127.0.0.1"],
58 type="http",
59 http_options=HttpListenerConfig(
60 resources=[HttpResourceConfig(names=resource_name_order_list)]
61 ),
62 )
63 test_site = SynapseSite(
64 logger_name="synapse.access.http.fake",
65 site_tag=self.hs.config.server.server_name,
66 config=listener_config,
67 resource=root_resource,
68 server_version_string="1",
69 max_request_body_size=1234,
70 reactor=self.reactor,
71 )
72
73 # Attempt to make requests to endpoints on both the webclient and client resources
74 # on test_site.
75 self._request_client_and_webclient_resources(test_site)
76
77 def _request_client_and_webclient_resources(self, test_site: SynapseSite) -> None:
78 """Make a request to an endpoint on both the webclient and client-server resources
79 of the given SynapseSite.
80
81 Args:
82 test_site: The SynapseSite object to make requests against.
83 """
84
85 # Ensure that the *webclient* resource is behaving as expected (we get redirected to
86 # the configured web_client_location)
87 channel = make_request(
88 self.reactor,
89 site=test_site,
90 method="GET",
91 path="/_matrix/client",
92 )
93 # Check that we are being redirected to the webclient location URI.
94 self.assertEqual(channel.code, HTTPStatus.FOUND)
95 self.assertEqual(
96 channel.headers.getRawHeaders("Location"), ["https://example.org"]
97 )
98
99 # Ensure that a request to the *client* resource works.
100 channel = make_request(
101 self.reactor,
102 site=test_site,
103 method="GET",
104 path="/_matrix/client/v3/login",
105 )
106 self.assertEqual(channel.code, HTTPStatus.OK)
107 self.assertIn("flows", channel.json_body)
313313 retry_interval,
314314 last_successful_stream_ordering,
315315 ) in dest:
316 self.get_success(
317 self.store.set_destination_retry_timings(
318 destination, failure_ts, retry_last_ts, retry_interval
319 )
320 )
321 self.get_success(
322 self.store.set_destination_last_successful_stream_ordering(
323 destination, last_successful_stream_ordering
324 )
316 self._create_destination(
317 destination,
318 failure_ts,
319 retry_last_ts,
320 retry_interval,
321 last_successful_stream_ordering,
325322 )
326323
327324 # order by default (destination)
412409 _search_test(None, "foo")
413410 _search_test(None, "bar")
414411
415 def test_get_single_destination(self) -> None:
416 """
417 Get one specific destination.
418 """
419 self._create_destinations(5)
412 def test_get_single_destination_with_retry_timings(self) -> None:
413 """Get one specific destination which has retry timings."""
414 self._create_destinations(1)
420415
421416 channel = self.make_request(
422417 "GET",
431426 # convert channel.json_body into a List
432427 self._check_fields([channel.json_body])
433428
429 def test_get_single_destination_no_retry_timings(self) -> None:
430 """Get one specific destination which has no retry timings."""
431 self._create_destination("sub0.example.com")
432
433 channel = self.make_request(
434 "GET",
435 self.url + "/sub0.example.com",
436 access_token=self.admin_user_tok,
437 )
438
439 self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
440 self.assertEqual("sub0.example.com", channel.json_body["destination"])
441 self.assertEqual(0, channel.json_body["retry_last_ts"])
442 self.assertEqual(0, channel.json_body["retry_interval"])
443 self.assertIsNone(channel.json_body["failure_ts"])
444 self.assertIsNone(channel.json_body["last_successful_stream_ordering"])
445
446 def _create_destination(
447 self,
448 destination: str,
449 failure_ts: Optional[int] = None,
450 retry_last_ts: int = 0,
451 retry_interval: int = 0,
452 last_successful_stream_ordering: Optional[int] = None,
453 ) -> None:
454 """Create one specific destination
455
456 Args:
457 destination: the name of the destination to create
458 failure_ts: when the server started failing (ms since epoch)
459 retry_last_ts: time of last retry attempt in unix epoch ms
460 retry_interval: how long until next retry in ms
461 last_successful_stream_ordering: the stream_ordering of the most
462 recent successfully-sent PDU
463 """
464 self.get_success(
465 self.store.set_destination_retry_timings(
466 destination, failure_ts, retry_last_ts, retry_interval
467 )
468 )
469 if last_successful_stream_ordering is not None:
470 self.get_success(
471 self.store.set_destination_last_successful_stream_ordering(
472 destination, last_successful_stream_ordering
473 )
474 )
475
434476 def _create_destinations(self, number_destinations: int) -> None:
435477 """Create a number of destinations
436478
439481 """
440482 for i in range(0, number_destinations):
441483 dest = f"sub{i}.example.com"
442 self.get_success(self.store.set_destination_retry_timings(dest, 50, 50, 50))
443 self.get_success(
444 self.store.set_destination_last_successful_stream_ordering(dest, 100)
445 )
484 self._create_destination(dest, 50, 50, 50, 100)
446485
447486 def _check_fields(self, content: List[JsonDict]) -> None:
448487 """Checks that the expected destination attributes are present in content
222222 # Create all possible single character tokens
223223 tokens = []
224224 for c in string.ascii_letters + string.digits + "._~-":
225 tokens.append(
226 {
227 "token": c,
228 "uses_allowed": None,
229 "pending": 0,
230 "completed": 0,
231 "expiry_time": None,
232 }
233 )
225 tokens.append((c, None, 0, 0, None))
234226 self.get_success(
235227 self.store.db_pool.simple_insert_many(
236228 "registration_tokens",
237 tokens,
238 "create_all_registration_tokens",
229 keys=("token", "uses_allowed", "pending", "completed", "expiry_time"),
230 values=tokens,
231 desc="create_all_registration_tokens",
239232 )
240233 )
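# A sketch of the updated call shape (hypothetical table and columns, shown only for
# illustration): the column names are passed once via `keys`, and each row is a plain
# tuple in `values`, e.g.
#     self.store.db_pool.simple_insert_many(
#         "some_table", keys=("a", "b"), values=[(1, 2), (3, 4)], desc="insert_rows"
#     )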
241234
10881088 )
10891089 room_ids.append(room_id)
10901090
1091 room_ids.sort()
1092
10911093 # Request the list of rooms
10921094 url = "/_synapse/admin/v1/rooms"
10931095 channel = self.make_request(
13591361 room_id_2 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
13601362 room_id_3 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
13611363
1364 # Also create lists of room IDs in ascending and descending order, for properties that are equal across rooms (and are therefore sorted by room_id)
1365 sorted_by_room_id_asc = [room_id_1, room_id_2, room_id_3]
1366 sorted_by_room_id_asc.sort()
1367 sorted_by_room_id_desc = sorted_by_room_id_asc.copy()
1368 sorted_by_room_id_desc.reverse()
1369
13621370 # Set room names in alphabetical order. room 1 -> A, 2 -> B, 3 -> C
13631371 self.helper.send_state(
13641372 room_id_1,
14041412 _order_test("canonical_alias", [room_id_1, room_id_2, room_id_3])
14051413 _order_test("canonical_alias", [room_id_3, room_id_2, room_id_1], reverse=True)
14061414
1415 # Note: joined_member counts are sorted in descending order when dir=f
14071416 _order_test("joined_members", [room_id_3, room_id_2, room_id_1])
14081417 _order_test("joined_members", [room_id_1, room_id_2, room_id_3], reverse=True)
14091418
1419 # Note: joined_local_member counts are sorted in descending order when dir=f
14101420 _order_test("joined_local_members", [room_id_3, room_id_2, room_id_1])
14111421 _order_test(
14121422 "joined_local_members", [room_id_1, room_id_2, room_id_3], reverse=True
14131423 )
14141424
1415 _order_test("version", [room_id_1, room_id_2, room_id_3])
1416 _order_test("version", [room_id_1, room_id_2, room_id_3], reverse=True)
1417
1418 _order_test("creator", [room_id_1, room_id_2, room_id_3])
1419 _order_test("creator", [room_id_1, room_id_2, room_id_3], reverse=True)
1420
1421 _order_test("encryption", [room_id_1, room_id_2, room_id_3])
1422 _order_test("encryption", [room_id_1, room_id_2, room_id_3], reverse=True)
1423
1424 _order_test("federatable", [room_id_1, room_id_2, room_id_3])
1425 _order_test("federatable", [room_id_1, room_id_2, room_id_3], reverse=True)
1426
1427 _order_test("public", [room_id_1, room_id_2, room_id_3])
1428 # Different sort order of SQLite and PostgreSQL
1429 # _order_test("public", [room_id_3, room_id_2, room_id_1], reverse=True)
1430
1431 _order_test("join_rules", [room_id_1, room_id_2, room_id_3])
1432 _order_test("join_rules", [room_id_1, room_id_2, room_id_3], reverse=True)
1433
1434 _order_test("guest_access", [room_id_1, room_id_2, room_id_3])
1435 _order_test("guest_access", [room_id_1, room_id_2, room_id_3], reverse=True)
1436
1437 _order_test("history_visibility", [room_id_1, room_id_2, room_id_3])
1438 _order_test(
1439 "history_visibility", [room_id_1, room_id_2, room_id_3], reverse=True
1440 )
1441
1425 # Note: versions are sorted in descending order when dir=f
1426 _order_test("version", sorted_by_room_id_asc, reverse=True)
1427 _order_test("version", sorted_by_room_id_desc)
1428
1429 _order_test("creator", sorted_by_room_id_asc)
1430 _order_test("creator", sorted_by_room_id_desc, reverse=True)
1431
1432 _order_test("encryption", sorted_by_room_id_asc)
1433 _order_test("encryption", sorted_by_room_id_desc, reverse=True)
1434
1435 _order_test("federatable", sorted_by_room_id_asc)
1436 _order_test("federatable", sorted_by_room_id_desc, reverse=True)
1437
1438 _order_test("public", sorted_by_room_id_asc)
1439 _order_test("public", sorted_by_room_id_desc, reverse=True)
1440
1441 _order_test("join_rules", sorted_by_room_id_asc)
1442 _order_test("join_rules", sorted_by_room_id_desc, reverse=True)
1443
1444 _order_test("guest_access", sorted_by_room_id_asc)
1445 _order_test("guest_access", sorted_by_room_id_desc, reverse=True)
1446
1447 _order_test("history_visibility", sorted_by_room_id_asc)
1448 _order_test("history_visibility", sorted_by_room_id_desc, reverse=True)
1449
1450 # Note: state_event counts are sorted in descending order when dir=f
14421451 _order_test("state_events", [room_id_3, room_id_2, room_id_1])
14431452 _order_test("state_events", [room_id_1, room_id_2, room_id_3], reverse=True)
14441453
11801180 self.other_user, device_id=None, valid_until_ms=None
11811181 )
11821182 )
1183
11831184 self.url_prefix = "/_synapse/admin/v2/users/%s"
11841185 self.url_other_user = self.url_prefix % self.other_user
11851186
11871188 """
11881189 If the user is not a server admin, an error is returned.
11891190 """
1190 url = "/_synapse/admin/v2/users/@bob:test"
1191 url = self.url_prefix % "@bob:test"
11911192
11921193 channel = self.make_request(
11931194 "GET",
12151216
12161217 channel = self.make_request(
12171218 "GET",
1218 "/_synapse/admin/v2/users/@unknown_person:test",
1219 self.url_prefix % "@unknown_person:test",
12191220 access_token=self.admin_user_tok,
12201221 )
12211222
13361337 """
13371338 Check that a new admin user is created successfully.
13381339 """
1339 url = "/_synapse/admin/v2/users/@bob:test"
1340 url = self.url_prefix % "@bob:test"
13401341
13411342 # Create user (server admin)
13421343 body = {
13851386 """
13861387 Check that a new regular user is created successfully.
13871388 """
1388 url = "/_synapse/admin/v2/users/@bob:test"
1389 url = self.url_prefix % "@bob:test"
13891390
13901391 # Create user
13911392 body = {
14771478 )
14781479
14791480 # Register new user with admin API
1480 url = "/_synapse/admin/v2/users/@bob:test"
1481 url = self.url_prefix % "@bob:test"
14811482
14821483 # Create user
14831484 channel = self.make_request(
15141515 )
15151516
15161517 # Register new user with admin API
1517 url = "/_synapse/admin/v2/users/@bob:test"
1518 url = self.url_prefix % "@bob:test"
15181519
15191520 # Create user
15201521 channel = self.make_request(
15441545 Check that a new regular user is created successfully and
15451546 gets an email pusher.
15461547 """
1547 url = "/_synapse/admin/v2/users/@bob:test"
1548 url = self.url_prefix % "@bob:test"
15481549
15491550 # Create user
15501551 body = {
15871588 Check that a new regular user is created successfully and
15881589 does not get an email pusher.
15891590 """
1590 url = "/_synapse/admin/v2/users/@bob:test"
1591 url = self.url_prefix % "@bob:test"
15911592
15921593 # Create user
15931594 body = {
20842085 self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
20852086 self.assertEqual("@user:test", channel.json_body["name"])
20862087 self.assertTrue(channel.json_body["deactivated"])
2087 self.assertIsNone(channel.json_body["password_hash"])
20882088 self.assertEqual(0, len(channel.json_body["threepids"]))
20892089 self.assertEqual("mxc://servername/mediaid", channel.json_body["avatar_url"])
20902090 self.assertEqual("User", channel.json_body["displayname"])
2091
2092 # This key was removed intentionally. Ensure it is not accidentally re-included.
2093 self.assertNotIn("password_hash", channel.json_body)
2094
20912095 # the user is deactivated, the threepid will be deleted
20922096
20932097 # Get user
21002104 self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
21012105 self.assertEqual("@user:test", channel.json_body["name"])
21022106 self.assertTrue(channel.json_body["deactivated"])
2103 self.assertIsNone(channel.json_body["password_hash"])
21042107 self.assertEqual(0, len(channel.json_body["threepids"]))
21052108 self.assertEqual("mxc://servername/mediaid", channel.json_body["avatar_url"])
21062109 self.assertEqual("User", channel.json_body["displayname"])
21072110
2111 # This key was removed intentionally. Ensure it is not accidentally re-included.
2112 self.assertNotIn("password_hash", channel.json_body)
2113
21082114 @override_config({"user_directory": {"enabled": True, "search_all_users": True}})
21092115 def test_change_name_deactivate_user_user_directory(self):
21102116 """
21762182 self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
21772183 self.assertEqual("@user:test", channel.json_body["name"])
21782184 self.assertFalse(channel.json_body["deactivated"])
2179 self.assertIsNotNone(channel.json_body["password_hash"])
21802185 self._is_erased("@user:test", False)
2186
2187 # This key was removed intentionally. Ensure it is not accidentally re-included.
2188 self.assertNotIn("password_hash", channel.json_body)
21812189
21822190 @override_config({"password_config": {"localdb_enabled": False}})
21832191 def test_reactivate_user_localdb_disabled(self):
22082216 self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
22092217 self.assertEqual("@user:test", channel.json_body["name"])
22102218 self.assertFalse(channel.json_body["deactivated"])
2211 self.assertIsNone(channel.json_body["password_hash"])
22122219 self._is_erased("@user:test", False)
2220
2221 # This key was removed intentionally. Ensure it is not accidentally re-included.
2222 self.assertNotIn("password_hash", channel.json_body)
22132223
22142224 @override_config({"password_config": {"enabled": False}})
22152225 def test_reactivate_user_password_disabled(self):
22402250 self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
22412251 self.assertEqual("@user:test", channel.json_body["name"])
22422252 self.assertFalse(channel.json_body["deactivated"])
2243 self.assertIsNone(channel.json_body["password_hash"])
22442253 self._is_erased("@user:test", False)
2254
2255 # This key was removed intentionally. Ensure it is not accidentally re-included.
2256 self.assertNotIn("password_hash", channel.json_body)
22452257
22462258 def test_set_user_as_admin(self):
22472259 """
23272339 Ensure an account can't accidentally be deactivated by using a str value
23282340 for the deactivated body parameter
23292341 """
2330 url = "/_synapse/admin/v2/users/@bob:test"
2342 url = self.url_prefix % "@bob:test"
23312343
23322344 # Create user
23332345 channel = self.make_request(
23912403 # Deactivate the user.
23922404 channel = self.make_request(
23932405 "PUT",
2394 "/_synapse/admin/v2/users/%s" % urllib.parse.quote(user_id),
2406 self.url_prefix % urllib.parse.quote(user_id),
23952407 access_token=self.admin_user_tok,
23962408 content={"deactivated": True},
23972409 )
23982410 self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
23992411 self.assertTrue(channel.json_body["deactivated"])
2400 self.assertIsNone(channel.json_body["password_hash"])
24012412 self._is_erased(user_id, False)
24022413 d = self.store.mark_user_erased(user_id)
24032414 self.assertIsNone(self.get_success(d))
24042415 self._is_erased(user_id, True)
2416
2417 # This key was removed intentionally. Ensure it is not accidentally re-included.
2418 self.assertNotIn("password_hash", channel.json_body)
24052419
24062420 def _check_fields(self, content: JsonDict):
24072421 """Checks that the expected user attributes are present in content
24152429 self.assertIn("admin", content)
24162430 self.assertIn("deactivated", content)
24172431 self.assertIn("shadow_banned", content)
2418 self.assertIn("password_hash", content)
24192432 self.assertIn("creation_ts", content)
24202433 self.assertIn("appservice_id", content)
24212434 self.assertIn("consent_server_notice_sent", content)
24222435 self.assertIn("consent_version", content)
24232436 self.assertIn("external_ids", content)
2437
2438 # This key was removed intentionally. Ensure it is not accidentally re-included.
2439 self.assertNotIn("password_hash", content)
24242440
24252441
24262442 class UserMembershipRestTestCase(unittest.HomeserverTestCase):
2020 from synapse.api.constants import EventTypes, RelationTypes
2121 from synapse.rest import admin
2222 from synapse.rest.client import login, register, relations, room, sync
23 from synapse.types import JsonDict
2324
2425 from tests import unittest
2526 from tests.server import FakeChannel
9293 channel.json_body,
9394 )
9495
95 def test_deny_membership(self):
96 """Test that we deny relations on membership events"""
97 channel = self._send_relation(RelationTypes.ANNOTATION, EventTypes.Member)
98 self.assertEquals(400, channel.code, channel.json_body)
99
10096 def test_deny_invalid_event(self):
10197 """Test that we deny relations on non-existent events"""
10298 channel = self._send_relation(
458454
459455 @unittest.override_config({"experimental_features": {"msc3440_enabled": True}})
460456 def test_bundled_aggregations(self):
461 """Test that annotations, references, and threads get correctly bundled."""
457 """
458 Test that annotations, references, and threads get correctly bundled.
459
460 Note that this doesn't test against /relations since only thread relations
461 get bundled via that API. See test_aggregation_get_event_for_thread.
462
463 See test_edit for a similar test for edits.
464 """
462465 # Setup by sending a variety of relations.
463466 channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "a")
464467 self.assertEquals(200, channel.code, channel.json_body)
486489 self.assertEquals(200, channel.code, channel.json_body)
487490 thread_2 = channel.json_body["event_id"]
488491
489 def assert_bundle(actual):
492 def assert_bundle(event_json: JsonDict) -> None:
490493 """Assert the expected values of the bundled aggregations."""
494 relations_dict = event_json["unsigned"].get("m.relations")
491495
492496 # Ensure the fields are as expected.
493497 self.assertCountEqual(
494 actual.keys(),
498 relations_dict.keys(),
495499 (
496500 RelationTypes.ANNOTATION,
497501 RelationTypes.REFERENCE,
507511 {"type": "m.reaction", "key": "b", "count": 1},
508512 ]
509513 },
510 actual[RelationTypes.ANNOTATION],
514 relations_dict[RelationTypes.ANNOTATION],
511515 )
512516
513517 self.assertEquals(
514518 {"chunk": [{"event_id": reply_1}, {"event_id": reply_2}]},
515 actual[RelationTypes.REFERENCE],
519 relations_dict[RelationTypes.REFERENCE],
516520 )
517521
518522 self.assertEquals(
519523 2,
520 actual[RelationTypes.THREAD].get("count"),
524 relations_dict[RelationTypes.THREAD].get("count"),
525 )
526 self.assertTrue(
527 relations_dict[RelationTypes.THREAD].get("current_user_participated")
521528 )
522529 # The latest thread event has some fields that don't matter.
523530 self.assert_dict(
534541 "type": "m.room.test",
535542 "user_id": self.user_id,
536543 },
537 actual[RelationTypes.THREAD].get("latest_event"),
538 )
539
540 def _find_and_assert_event(events):
541 """
542 Find the parent event in a chunk of events and assert that it has the proper bundled aggregations.
543 """
544 for event in events:
545 if event["event_id"] == self.parent_id:
546 break
547 else:
548 raise AssertionError(f"Event {self.parent_id} not found in chunk")
549 assert_bundle(event["unsigned"].get("m.relations"))
544 relations_dict[RelationTypes.THREAD].get("latest_event"),
545 )
550546
551547 # Request the event directly.
552548 channel = self.make_request(
555551 access_token=self.user_token,
556552 )
557553 self.assertEquals(200, channel.code, channel.json_body)
558 assert_bundle(channel.json_body["unsigned"].get("m.relations"))
554 assert_bundle(channel.json_body)
559555
560556 # Request the room messages.
561557 channel = self.make_request(
564560 access_token=self.user_token,
565561 )
566562 self.assertEquals(200, channel.code, channel.json_body)
567 _find_and_assert_event(channel.json_body["chunk"])
563 assert_bundle(self._find_event_in_chunk(channel.json_body["chunk"]))
568564
569565 # Request the room context.
570566 channel = self.make_request(
573569 access_token=self.user_token,
574570 )
575571 self.assertEquals(200, channel.code, channel.json_body)
576 assert_bundle(channel.json_body["event"]["unsigned"].get("m.relations"))
572 assert_bundle(channel.json_body["event"])
577573
578574 # Request sync.
579 # channel = self.make_request("GET", "/sync", access_token=self.user_token)
580 # self.assertEquals(200, channel.code, channel.json_body)
581 # room_timeline = channel.json_body["rooms"]["join"][self.room]["timeline"]
582 # self.assertTrue(room_timeline["limited"])
583 # _find_and_assert_event(room_timeline["events"])
584
585 # Note that /relations is tested separately in test_aggregation_get_event_for_thread
586 # since it needs different data configured.
575 channel = self.make_request("GET", "/sync", access_token=self.user_token)
576 self.assertEquals(200, channel.code, channel.json_body)
577 room_timeline = channel.json_body["rooms"]["join"][self.room]["timeline"]
578 self.assertTrue(room_timeline["limited"])
579 self._find_event_in_chunk(room_timeline["events"])
587580
588581 def test_aggregation_get_event_for_annotation(self):
589582 """Test that annotations do not get bundled aggregations included
778771
779772 edit_event_id = channel.json_body["event_id"]
780773
781 channel = self.make_request(
782 "GET",
783 "/rooms/%s/event/%s" % (self.room, self.parent_id),
784 access_token=self.user_token,
785 )
786 self.assertEquals(200, channel.code, channel.json_body)
787
774 def assert_bundle(event_json: JsonDict) -> None:
775 """Assert the expected values of the bundled aggregations."""
776 relations_dict = event_json["unsigned"].get("m.relations")
777 self.assertIn(RelationTypes.REPLACE, relations_dict)
778
779 m_replace_dict = relations_dict[RelationTypes.REPLACE]
780 for key in ["event_id", "sender", "origin_server_ts"]:
781 self.assertIn(key, m_replace_dict)
782
783 self.assert_dict(
784 {"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict
785 )
786
787 channel = self.make_request(
788 "GET",
789 f"/rooms/{self.room}/event/{self.parent_id}",
790 access_token=self.user_token,
791 )
792 self.assertEquals(200, channel.code, channel.json_body)
788793 self.assertEquals(channel.json_body["content"], new_body)
789
790 relations_dict = channel.json_body["unsigned"].get("m.relations")
791 self.assertIn(RelationTypes.REPLACE, relations_dict)
792
793 m_replace_dict = relations_dict[RelationTypes.REPLACE]
794 for key in ["event_id", "sender", "origin_server_ts"]:
795 self.assertIn(key, m_replace_dict)
796
797 self.assert_dict(
798 {"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict
799 )
794 assert_bundle(channel.json_body)
795
796 # Request the room messages.
797 channel = self.make_request(
798 "GET",
799 f"/rooms/{self.room}/messages?dir=b",
800 access_token=self.user_token,
801 )
802 self.assertEquals(200, channel.code, channel.json_body)
803 assert_bundle(self._find_event_in_chunk(channel.json_body["chunk"]))
804
805 # Request the room context.
806 channel = self.make_request(
807 "GET",
808 f"/rooms/{self.room}/context/{self.parent_id}",
809 access_token=self.user_token,
810 )
811 self.assertEquals(200, channel.code, channel.json_body)
812 assert_bundle(channel.json_body["event"])
813
814 # Request sync, but limit the timeline so it becomes limited (and includes
815 # bundled aggregations).
816 filter = urllib.parse.quote_plus(
817 '{"room": {"timeline": {"limit": 2}}}'.encode()
818 )
819 channel = self.make_request(
820 "GET", f"/sync?filter={filter}", access_token=self.user_token
821 )
822 self.assertEquals(200, channel.code, channel.json_body)
823 room_timeline = channel.json_body["rooms"]["join"][self.room]["timeline"]
824 self.assertTrue(room_timeline["limited"])
825 assert_bundle(self._find_event_in_chunk(room_timeline["events"]))
800826
801827 def test_multi_edit(self):
802828 """Test that multiple edits, including attempts by people who
11031129 self.assertEquals(200, channel.code, channel.json_body)
11041130 self.assertEquals(channel.json_body["chunk"], [])
11051131
1132 def _find_event_in_chunk(self, events: List[JsonDict]) -> JsonDict:
1133 """
1134 Find the parent event in a chunk of events and return it.
1135 """
1136 for event in events:
1137 if event["event_id"] == self.parent_id:
1138 return event
1139
1140 raise AssertionError(f"Event {self.parent_id} not found in chunk")
1141
11061142 def _send_relation(
11071143 self,
11081144 relation_type: str,
11181154 relation_type: One of `RelationTypes`
11191155 event_type: The type of the event to create
11201156 key: The aggregation key used for m.annotation relation type.
1121 content: The content of the created event.
1157 content: The content of the created event. Will be modified to configure
1158 the m.relates_to key based on the other provided parameters.
11221159 access_token: The access token used to send the relation, defaults
11231160 to `self.user_token`
11241161 parent_id: The event_id this relation relates to. If None, then self.parent_id
11291166 if not access_token:
11301167 access_token = self.user_token
11311168
1132 query = ""
1133 if key:
1134 query = "?key=" + urllib.parse.quote_plus(key.encode("utf-8"))
1135
11361169 original_id = parent_id if parent_id else self.parent_id
11371170
1171 if content is None:
1172 content = {}
1173 content["m.relates_to"] = {
1174 "event_id": original_id,
1175 "rel_type": relation_type,
1176 }
1177 if key is not None:
1178 content["m.relates_to"]["key"] = key
1179
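# The relation is now expressed via the `m.relates_to` key in the event content,
# so the event can be sent through the standard /send endpoint below rather than
# the unstable /send_relation endpoint used previously.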
11381180 channel = self.make_request(
11391181 "POST",
1140 "/_matrix/client/unstable/rooms/%s/send_relation/%s/%s/%s%s"
1141 % (self.room, original_id, relation_type, event_type, query),
1142 content or {},
1182 f"/_matrix/client/v3/rooms/{self.room}/send/{event_type}",
1183 content,
11431184 access_token=access_token,
11441185 )
11451186 return channel
227227 self.assertIsNotNone(event)
228228
229229 time_now = self.clock.time_msec()
230 serialized = self.get_success(self.serializer.serialize_event(event, time_now))
230 serialized = self.serializer.serialize_event(event, time_now)
231231
232232 return serialized
233233
195195 expect_code=expect_code,
196196 )
197197
198 def ban(self, room: str, src: str, targ: str, **kwargs: object):
199 """A convenience helper: `change_membership` with `membership` preset to "ban"."""
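# For example (hypothetical identifiers), a caller might use:
#     self.helper.ban(room_id, src=admin_user_id, targ=bad_user_id, tok=admin_tok)
# Any extra keyword arguments are forwarded to change_membership.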
200 self.change_membership(
201 room=room,
202 src=src,
203 targ=targ,
204 membership=Membership.BAN,
205 **kwargs,
206 )
207
198208 def change_membership(
199209 self,
200210 room: str,
1313 import hashlib
1414 import json
1515 import logging
16 import os
17 import os.path
1618 import time
1719 import uuid
1820 import warnings
7072 POSTGRES_HOST,
7173 POSTGRES_PASSWORD,
7274 POSTGRES_USER,
75 SQLITE_PERSIST_DB,
7376 USE_POSTGRES_FOR_TESTS,
7477 MockClock,
7578 default_config,
738741 },
739742 }
740743 else:
744 if SQLITE_PERSIST_DB:
745 # The current working directory is in _trial_temp, so this gets created within that directory.
746 test_db_location = os.path.abspath("test.db")
747 logger.debug("Will persist db to %s", test_db_location)
748 # Ensure each test gets a clean database.
749 try:
750 os.remove(test_db_location)
751 except FileNotFoundError:
752 pass
753 else:
754 logger.debug("Removed existing DB at %s", test_db_location)
755 else:
756 test_db_location = ":memory:"
757
741758 database_config = {
742759 "name": "sqlite3",
743 "args": {"database": ":memory:", "cp_min": 1, "cp_max": 1},
760 "args": {"database": test_db_location, "cp_min": 1, "cp_max": 1},
744761 }
745762
746763 if "db_txn_limit" in kwargs:
530530 self.get_success(
531531 self.store.db_pool.simple_insert_many(
532532 table="federation_inbound_events_staging",
533 keys=(
534 "origin",
535 "room_id",
536 "received_ts",
537 "event_id",
538 "event_json",
539 "internal_metadata",
540 ),
533541 values=[
534 {
535 "origin": "some_origin",
536 "room_id": room_id,
537 "received_ts": 0,
538 "event_id": f"$fake_event_id_{i + 1}",
539 "event_json": json_encoder.encode(
542 (
543 "some_origin",
544 room_id,
545 0,
546 f"$fake_event_id_{i + 1}",
547 json_encoder.encode(
540548 {"prev_events": [prev_event_format(f"$fake_event_id_{i}")]}
541549 ),
542 "internal_metadata": "{}",
543 }
550 "{}",
551 )
544552 for i in range(500)
545553 ],
546554 desc="test_prune_inbound_federation_queue",
1616 from twisted.internet.defer import succeed
1717
1818 from synapse.api.errors import FederationError
19 from synapse.api.room_versions import RoomVersions
1920 from synapse.events import make_event_from_dict
21 from synapse.federation.federation_base import event_from_pdu_json
2022 from synapse.logging.context import LoggingContext
2123 from synapse.types import UserID, create_requester
2224 from synapse.util import Clock
275277 "ed25519:" + remote_self_signing_key in self_signing_key["keys"].keys(),
276278 )
277279 self.assertTrue(remote_self_signing_key in self_signing_key["keys"].values())
280
281
282 class StripUnsignedFromEventsTestCase(unittest.TestCase):
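# These tests exercise event_from_pdu_json: keys that remote servers may not set in
# `unsigned` should be stripped, while allowed ones (such as `age`, and
# `invite_room_state` on m.room.member events) should be kept.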
283 def test_strip_unauthorized_unsigned_values(self):
284 event1 = {
285 "sender": "@baduser:test.serv",
286 "state_key": "@baduser:test.serv",
287 "event_id": "$event1:test.serv",
288 "depth": 1000,
289 "origin_server_ts": 1,
290 "type": "m.room.member",
291 "origin": "test.servx",
292 "content": {"membership": "join"},
293 "auth_events": [],
294 "unsigned": {"malicious garbage": "hackz", "more warez": "more hackz"},
295 }
296 filtered_event = event_from_pdu_json(event1, RoomVersions.V1)
297 # Make sure unauthorized fields are stripped from unsigned
298 self.assertNotIn("more warez", filtered_event.unsigned)
299
300 def test_strip_event_maintains_allowed_fields(self):
301 event2 = {
302 "sender": "@baduser:test.serv",
303 "state_key": "@baduser:test.serv",
304 "event_id": "$event2:test.serv",
305 "depth": 1000,
306 "origin_server_ts": 1,
307 "type": "m.room.member",
308 "origin": "test.servx",
309 "auth_events": [],
310 "content": {"membership": "join"},
311 "unsigned": {
312 "malicious garbage": "hackz",
313 "more warez": "more hackz",
314 "age": 14,
315 "invite_room_state": [],
316 },
317 }
318
319 filtered_event2 = event_from_pdu_json(event2, RoomVersions.V1)
320 self.assertIn("age", filtered_event2.unsigned)
321 self.assertEqual(14, filtered_event2.unsigned["age"])
322 self.assertNotIn("more warez", filtered_event2.unsigned)
323 # The invite_room_state field is allowed in events of type m.room.member
324 self.assertIn("invite_room_state", filtered_event2.unsigned)
325 self.assertEqual([], filtered_event2.unsigned["invite_room_state"])
326
327 def test_strip_event_removes_fields_based_on_event_type(self):
328 event3 = {
329 "sender": "@baduser:test.serv",
330 "state_key": "@baduser:test.serv",
331 "event_id": "$event3:test.serv",
332 "depth": 1000,
333 "origin_server_ts": 1,
334 "type": "m.room.power_levels",
335 "origin": "test.servx",
336 "content": {},
337 "auth_events": [],
338 "unsigned": {
339 "malicious garbage": "hackz",
340 "more warez": "more hackz",
341 "age": 14,
342 "invite_room_state": [],
343 },
344 }
345 filtered_event3 = event_from_pdu_json(event3, RoomVersions.V1)
346 self.assertIn("age", filtered_event3.unsigned)
347 # The invite_room_state field is only permitted in events of type m.room.member
348 self.assertNotIn("invite_room_state", filtered_event3.unsigned)
349 self.assertNotIn("more warez", filtered_event3.unsigned)
1818 import warnings
1919 from asyncio import Future
2020 from binascii import unhexlify
21 from typing import Any, Awaitable, Callable, TypeVar
21 from typing import Awaitable, Callable, TypeVar
2222 from unittest.mock import Mock
2323
2424 import attr
4545 raise Exception("awaitable has not yet completed")
4646
4747
48 def make_awaitable(result: Any) -> Awaitable[Any]:
48 def make_awaitable(result: TV) -> Awaitable[TV]:
4949 """
5050 Makes an awaitable, suitable for mocking an `async` function.
5151 This uses Futures as they can be awaited multiple times so can be returned
4040 POSTGRES_HOST = os.environ.get("SYNAPSE_POSTGRES_HOST", None)
4141 POSTGRES_PASSWORD = os.environ.get("SYNAPSE_POSTGRES_PASSWORD", None)
4242 POSTGRES_BASE_DB = "_synapse_unit_tests_base_%s" % (os.getpid(),)
43
44 # When debugging a specific test, it's occasionally useful to write the
45 # DB to disk and query it with the sqlite CLI.
46 SQLITE_PERSIST_DB = os.environ.get("SYNAPSE_TEST_PERSIST_SQLITE_DB") is not None
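# For example (hypothetical test module), a run such as
#     SYNAPSE_TEST_PERSIST_SQLITE_DB=1 trial tests.handlers.test_sync
# leaves a test.db file under _trial_temp that can then be opened with `sqlite3 test.db`.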
4347
4448 # the dbname we will connect to in order to create the base database.
4549 POSTGRES_DBNAME_FOR_INITIAL_CREATE = "postgres"