New upstream version 1.7.1
Andrej Shadura
0 | Synapse 1.7.1 (2019-12-18) | |
1 | ========================== | |
2 | ||
3 | This release includes several security fixes as well as a fix to a bug exposed by the security fixes. Administrators are encouraged to upgrade as soon as possible. | |
4 | ||
5 | Security updates | |
6 | ---------------- | |
7 | ||
8 | - Fix a bug which could cause room events to be incorrectly authorized using events from a different room. ([\#6501](https://github.com/matrix-org/synapse/issues/6501), [\#6503](https://github.com/matrix-org/synapse/issues/6503), [\#6521](https://github.com/matrix-org/synapse/issues/6521), [\#6524](https://github.com/matrix-org/synapse/issues/6524), [\#6530](https://github.com/matrix-org/synapse/issues/6530), [\#6531](https://github.com/matrix-org/synapse/issues/6531)) | |
9 | - Fix a bug causing responses to the `/context` client endpoint to not use the pruned version of the event. ([\#6553](https://github.com/matrix-org/synapse/issues/6553)) | |
10 | - Fix a cause of state resets in room versions 2 onwards. ([\#6556](https://github.com/matrix-org/synapse/issues/6556), [\#6560](https://github.com/matrix-org/synapse/issues/6560)) | |
11 | ||
12 | Bugfixes | |
13 | -------- | |
14 | ||
15 | - Fix a bug which could cause the federation server to incorrectly return errors when handling certain obscure event graphs. ([\#6526](https://github.com/matrix-org/synapse/issues/6526), [\#6527](https://github.com/matrix-org/synapse/issues/6527)) | |
16 | ||
0 | 17 | Synapse 1.7.0 (2019-12-13) |
1 | 18 | ========================== |
2 | 19 | |
87 | 104 | - Add a test scenario to make sure room history purges don't break `/messages` in the future. ([\#6392](https://github.com/matrix-org/synapse/issues/6392)) |
88 | 105 | - Clarifications for the email configuration settings. ([\#6423](https://github.com/matrix-org/synapse/issues/6423)) |
89 | 106 | - Add more tests to the blacklist when running in worker mode. ([\#6429](https://github.com/matrix-org/synapse/issues/6429)) |
90 | 107 | - Refactor data store layer to support multiple databases in the future. ([\#6454](https://github.com/matrix-org/synapse/issues/6454), [\#6464](https://github.com/matrix-org/synapse/issues/6464), [\#6469](https://github.com/matrix-org/synapse/issues/6469), [\#6487](https://github.com/matrix-org/synapse/issues/6487))
91 | 108 | - Port synapse.rest.client.v1 to async/await. ([\#6482](https://github.com/matrix-org/synapse/issues/6482)) |
92 | 109 | - Port synapse.rest.client.v2_alpha to async/await. ([\#6483](https://github.com/matrix-org/synapse/issues/6483)) |
93 | 110 | - Port SyncHandler to async/await. ([\#6484](https://github.com/matrix-org/synapse/issues/6484)) |
0 | Add option `limit_profile_requests_to_users_who_share_rooms` to require that a local user share a room with another user in order to query their profile information.
0 | matrix-synapse-py3 (1.7.1) stable; urgency=medium | |
1 | ||
2 | * New synapse release 1.7.1. | |
3 | ||
4 | -- Synapse Packaging team <packages@matrix.org> Wed, 18 Dec 2019 09:37:59 +0000 | |
5 | ||
0 | 6 | matrix-synapse-py3 (1.7.0) stable; urgency=medium |
1 | 7 | |
2 | 8 | * New synapse release 1.7.0. |
52 | 52 | # the server. |
53 | 53 | # |
54 | 54 | #require_auth_for_profile_requests: true |
55 | ||
56 | # Uncomment to require a user to share a room with another user in order | |
57 | # to retrieve their profile information. Only checked on Client-Server | |
58 | # requests. Profile requests from other servers should be checked by the | |
59 | # requesting server. Defaults to 'false'. | |
60 | # | |
61 | #limit_profile_requests_to_users_who_share_rooms: true | |
55 | 62 | |
56 | 63 | # If set to 'true', removes the need for authentication to access the server's |
57 | 64 | # public rooms directory through the client API, meaning that anyone can |
35 | 35 | except ImportError: |
36 | 36 | pass |
37 | 37 | |
38 | __version__ = "1.7.0" | |
38 | __version__ = "1.7.1" | |
39 | 39 | |
40 | 40 | if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): |
41 | 41 | # We import here so that we don't have to install a bunch of deps when |
13 | 13 | # limitations under the License. |
14 | 14 | |
15 | 15 | import logging |
16 | from typing import Dict, Tuple | |
16 | 17 | |
17 | 18 | from six import itervalues |
18 | 19 | |
24 | 25 | import synapse.logging.opentracing as opentracing |
25 | 26 | import synapse.types |
26 | 27 | from synapse import event_auth |
27 | from synapse.api.constants import ( | |
28 | EventTypes, | |
29 | JoinRules, | |
30 | LimitBlockingTypes, | |
31 | Membership, | |
32 | UserTypes, | |
33 | ) | |
28 | from synapse.api.constants import EventTypes, LimitBlockingTypes, Membership, UserTypes | |
34 | 29 | from synapse.api.errors import ( |
35 | 30 | AuthError, |
36 | 31 | Codes, |
512 | 507 | """ |
513 | 508 | return self.store.is_server_admin(user) |
514 | 509 | |
515 | @defer.inlineCallbacks | |
516 | def compute_auth_events(self, event, current_state_ids, for_verification=False): | |
510 | def compute_auth_events( | |
511 | self, | |
512 | event, | |
513 | current_state_ids: Dict[Tuple[str, str], str], | |
514 | for_verification: bool = False, | |
515 | ): | |
516 | """Given an event and current state return the list of event IDs used | |
517 | to auth an event. | |
518 | ||
519 | If `for_verification` is False then only return auth events that | |
520 | should be added to the event's `auth_events`. | |
521 | ||
522 | Returns: | |
523 | Deferred[list[str]]: List of event IDs. | |
524 | """ | |
525 | ||
517 | 526 | if event.type == EventTypes.Create: |
518 | return [] | |
527 | return defer.succeed([]) | |
528 | ||
529 | # Currently we ignore the `for_verification` flag even though there are | |
530 | # some situations where we can drop particular auth events when adding | |
531 | # to the event's `auth_events` (e.g. joins pointing to previous joins | |
532 | # when the room is publicly joinable). Dropping event IDs has the | |
533 | # advantage that the auth chain for the room grows slower, but we use | |
534 | # the auth chain in state resolution v2 to order events, which means | |
535 | # care must be taken if dropping events to ensure that it doesn't | |
536 | # introduce undesirable "state reset" behaviour. | |
537 | # | |
538 | # All of which sounds a bit tricky so we don't bother for now. | |
519 | 539 | |
520 | 540 | auth_ids = [] |
521 | ||
522 | key = (EventTypes.PowerLevels, "") | |
523 | power_level_event_id = current_state_ids.get(key) | |
524 | ||
525 | if power_level_event_id: | |
526 | auth_ids.append(power_level_event_id) | |
527 | ||
528 | key = (EventTypes.JoinRules, "") | |
529 | join_rule_event_id = current_state_ids.get(key) | |
530 | ||
531 | key = (EventTypes.Member, event.sender) | |
532 | member_event_id = current_state_ids.get(key) | |
533 | ||
534 | key = (EventTypes.Create, "") | |
535 | create_event_id = current_state_ids.get(key) | |
536 | if create_event_id: | |
537 | auth_ids.append(create_event_id) | |
538 | ||
539 | if join_rule_event_id: | |
540 | join_rule_event = yield self.store.get_event(join_rule_event_id) | |
541 | join_rule = join_rule_event.content.get("join_rule") | |
542 | is_public = join_rule == JoinRules.PUBLIC if join_rule else False | |
543 | else: | |
544 | is_public = False | |
545 | ||
546 | if event.type == EventTypes.Member: | |
547 | e_type = event.content["membership"] | |
548 | if e_type in [Membership.JOIN, Membership.INVITE]: | |
549 | if join_rule_event_id: | |
550 | auth_ids.append(join_rule_event_id) | |
551 | ||
552 | if e_type == Membership.JOIN: | |
553 | if member_event_id and not is_public: | |
554 | auth_ids.append(member_event_id) | |
555 | else: | |
556 | if member_event_id: | |
557 | auth_ids.append(member_event_id) | |
558 | ||
559 | if for_verification: | |
560 | key = (EventTypes.Member, event.state_key) | |
561 | existing_event_id = current_state_ids.get(key) | |
562 | if existing_event_id: | |
563 | auth_ids.append(existing_event_id) | |
564 | ||
565 | if e_type == Membership.INVITE: | |
566 | if "third_party_invite" in event.content: | |
567 | key = ( | |
568 | EventTypes.ThirdPartyInvite, | |
569 | event.content["third_party_invite"]["signed"]["token"], | |
570 | ) | |
571 | third_party_invite_id = current_state_ids.get(key) | |
572 | if third_party_invite_id: | |
573 | auth_ids.append(third_party_invite_id) | |
574 | elif member_event_id: | |
575 | member_event = yield self.store.get_event(member_event_id) | |
576 | if member_event.content["membership"] == Membership.JOIN: | |
577 | auth_ids.append(member_event.event_id) | |
578 | ||
579 | return auth_ids | |
541 | for etype, state_key in event_auth.auth_types_for_event(event): | |
542 | auth_ev_id = current_state_ids.get((etype, state_key)) | |
543 | if auth_ev_id: | |
544 | auth_ids.append(auth_ev_id) | |
545 | ||
546 | return defer.succeed(auth_ids) | |
580 | 547 | |
581 | 548 | @defer.inlineCallbacks |
582 | 549 | def check_can_change_room_list(self, room_id, user): |
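The refactored `compute_auth_events` now reduces to looking up each `(type, state_key)` pair reported by `event_auth.auth_types_for_event` (see the `event_auth` hunk below) in `current_state_ids`. A minimal sketch of that lookup against a toy state map (the event IDs here are illustrative assumptions, not taken from the source):

```python
from typing import Dict, Iterable, List, Tuple

def compute_auth_ids(
    needed: Iterable[Tuple[str, str]],
    current_state_ids: Dict[Tuple[str, str], str],
) -> List[str]:
    # Keep only the (type, state_key) pairs that have an entry in the
    # current state, collecting their event IDs.
    auth_ids = []
    for key in needed:
        auth_ev_id = current_state_ids.get(key)
        if auth_ev_id:
            auth_ids.append(auth_ev_id)
    return auth_ids

current_state_ids = {
    ("m.room.create", ""): "$create",
    ("m.room.power_levels", ""): "$power_levels",
    ("m.room.member", "@alice:example.com"): "$alice_join",
}
needed = [
    ("m.room.power_levels", ""),
    ("m.room.member", "@alice:example.com"),
    ("m.room.create", ""),
    ("m.room.join_rules", ""),  # absent from the state map, so skipped
]
print(compute_auth_ids(needed, current_state_ids))
# ['$power_levels', '$alice_join', '$create']
```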
99 | 99 | # display names) of other users through the client API. |
100 | 100 | self.require_auth_for_profile_requests = config.get( |
101 | 101 | "require_auth_for_profile_requests", False |
102 | ) | |
103 | ||
104 | # Whether to require sharing a room with a user to retrieve their | |
105 | # profile data | |
106 | self.limit_profile_requests_to_users_who_share_rooms = config.get( | |
107 | "limit_profile_requests_to_users_who_share_rooms", False, | |
102 | 108 | ) |
103 | 109 | |
104 | 110 | if "restrict_public_rooms_to_local_users" in config and ( |
620 | 626 | # |
621 | 627 | #require_auth_for_profile_requests: true |
622 | 628 | |
629 | # Uncomment to require a user to share a room with another user in order | |
630 | # to retrieve their profile information. Only checked on Client-Server | |
631 | # requests. Profile requests from other servers should be checked by the | |
632 | # requesting server. Defaults to 'false'. | |
633 | # | |
634 | #limit_profile_requests_to_users_who_share_rooms: true | |
635 | ||
623 | 636 | # If set to 'true', removes the need for authentication to access the server's |
624 | 637 | # public rooms directory through the client API, meaning that anyone can |
625 | 638 | # query the room directory. Defaults to 'false'. |
13 | 13 | # limitations under the License. |
14 | 14 | |
15 | 15 | import logging |
16 | from typing import Set, Tuple | |
16 | 17 | |
17 | 18 | from canonicaljson import encode_canonical_json |
18 | 19 | from signedjson.key import decode_verify_key_bytes |
46 | 47 | |
47 | 48 | if not hasattr(event, "room_id"): |
48 | 49 | raise AuthError(500, "Event has no room_id: %s" % event) |
50 | ||
51 | room_id = event.room_id | |
52 | ||
53 | # I'm not really expecting to get auth events in the wrong room, but let's | |
54 | # sanity-check it | |
55 | for auth_event in auth_events.values(): | |
56 | if auth_event.room_id != room_id: | |
57 | raise Exception( | |
58 | "During auth for event %s in room %s, found event %s in the state " | |
59 | "which is in room %s" | |
60 | % (event.event_id, room_id, auth_event.event_id, auth_event.room_id) | |
61 | ) | |
49 | 62 | |
50 | 63 | if do_sig_check: |
51 | 64 | sender_domain = get_domain_from_id(event.sender) |
624 | 637 | return public_keys |
625 | 638 | |
626 | 639 | |
627 | def auth_types_for_event(event): | |
640 | def auth_types_for_event(event) -> Set[Tuple[str, str]]: | |
628 | 641 | """Given an event, return a set of (EventType, StateKey) pairs that may be
629 | 642 | needed to auth the event. The returned set may be a superset of what
630 | 643 | would actually be required depending on the full state of the room. |
633 | 646 | actually auth the event. |
634 | 647 | """ |
635 | 648 | if event.type == EventTypes.Create: |
636 | return [] | |
637 | ||
638 | auth_types = [ | |
649 | return set() | |
650 | ||
651 | auth_types = { | |
639 | 652 | (EventTypes.PowerLevels, ""), |
640 | 653 | (EventTypes.Member, event.sender), |
641 | 654 | (EventTypes.Create, ""), |
642 | ] | |
655 | } | |
643 | 656 | |
644 | 657 | if event.type == EventTypes.Member: |
645 | 658 | membership = event.content["membership"] |
646 | 659 | if membership in [Membership.JOIN, Membership.INVITE]: |
647 | auth_types.append((EventTypes.JoinRules, "")) | |
648 | ||
649 | auth_types.append((EventTypes.Member, event.state_key)) | |
660 | auth_types.add((EventTypes.JoinRules, "")) | |
661 | ||
662 | auth_types.add((EventTypes.Member, event.state_key)) | |
650 | 663 | |
651 | 664 | if membership == Membership.INVITE: |
652 | 665 | if "third_party_invite" in event.content: |
654 | 667 | EventTypes.ThirdPartyInvite, |
655 | 668 | event.content["third_party_invite"]["signed"]["token"], |
656 | 669 | ) |
657 | auth_types.append(key) | |
670 | auth_types.add(key) | |
658 | 671 | |
659 | 672 | return auth_types |
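To see what the new set-based `auth_types_for_event` yields, here is a self-contained sketch mirroring the logic above (the third-party-invite branch is omitted for brevity); the plain string constants and the stub event class are assumptions standing in for `synapse.api.constants` and `EventBase`, not Synapse's API:

```python
from typing import Optional, Set, Tuple

CREATE = "m.room.create"
MEMBER = "m.room.member"
POWER_LEVELS = "m.room.power_levels"
JOIN_RULES = "m.room.join_rules"

class StubEvent:
    def __init__(self, type: str, sender: str,
                 state_key: Optional[str] = None,
                 content: Optional[dict] = None):
        self.type = type
        self.sender = sender
        self.state_key = state_key
        self.content = content or {}

def auth_types_for_event(event: StubEvent) -> Set[Tuple[str, str]]:
    # Create events auth themselves; everything else needs at least the
    # power levels, the sender's membership, and the create event.
    if event.type == CREATE:
        return set()
    auth_types = {(POWER_LEVELS, ""), (MEMBER, event.sender), (CREATE, "")}
    if event.type == MEMBER:
        membership = event.content["membership"]
        if membership in ("join", "invite"):
            auth_types.add((JOIN_RULES, ""))
        auth_types.add((MEMBER, event.state_key))
    return auth_types

join = StubEvent(
    MEMBER, "@alice:example.com",
    state_key="@alice:example.com",
    content={"membership": "join"},
)
# A join needs the power levels, create event, join rules, and the
# member's own (possibly pre-existing) membership key:
print(sorted(auth_types_for_event(join)))
```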
16 | 16 | import copy |
17 | 17 | import itertools |
18 | 18 | import logging |
19 | ||
20 | from six.moves import range | |
21 | 19 | |
22 | 20 | from prometheus_client import Counter |
23 | 21 | |
38 | 36 | ) |
39 | 37 | from synapse.events import builder, room_version_to_event_format |
40 | 38 | from synapse.federation.federation_base import FederationBase, event_from_pdu_json |
41 | from synapse.logging.context import make_deferred_yieldable, run_in_background | |
39 | from synapse.logging.context import make_deferred_yieldable | |
42 | 40 | from synapse.logging.utils import log_function |
43 | 41 | from synapse.util import unwrapFirstError |
44 | 42 | from synapse.util.caches.expiringcache import ExpiringCache |
309 | 307 | return signed_pdu |
310 | 308 | |
311 | 309 | @defer.inlineCallbacks |
312 | @log_function | |
313 | def get_state_for_room(self, destination, room_id, event_id): | |
314 | """Requests all of the room state at a given event from a remote homeserver. | |
315 | ||
316 | Args: | |
317 | destination (str): The remote homeserver to query for the state. | |
318 | room_id (str): The id of the room we're interested in. | |
319 | event_id (str): The id of the event we want the state at. | |
310 | def get_room_state_ids(self, destination: str, room_id: str, event_id: str): | |
311 | """Calls the /state_ids endpoint to fetch the state at a particular point | |
312 | in the room, and the auth events for the given event | |
320 | 313 | |
321 | 314 | Returns: |
322 | Deferred[Tuple[List[EventBase], List[EventBase]]]: | |
323 | A list of events in the state, and a list of events in the auth chain | |
324 | for the given event. | |
315 | Tuple[List[str], List[str]]: a tuple of (state event_ids, auth event_ids) | |
325 | 316 | """ |
326 | 317 | result = yield self.transport_layer.get_room_state_ids( |
327 | 318 | destination, room_id, event_id=event_id |
330 | 321 | state_event_ids = result["pdu_ids"] |
331 | 322 | auth_event_ids = result.get("auth_chain_ids", []) |
332 | 323 | |
333 | fetched_events, failed_to_fetch = yield self.get_events_from_store_or_dest( | |
334 | destination, room_id, set(state_event_ids + auth_event_ids) | |
335 | ) | |
336 | ||
337 | if failed_to_fetch: | |
338 | logger.warning( | |
339 | "Failed to fetch missing state/auth events for %s: %s", | |
340 | room_id, | |
341 | failed_to_fetch, | |
342 | ) | |
343 | ||
344 | event_map = {ev.event_id: ev for ev in fetched_events} | |
345 | ||
346 | pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map] | |
347 | auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] | |
348 | ||
349 | auth_chain.sort(key=lambda e: e.depth) | |
350 | ||
351 | return pdus, auth_chain | |
352 | ||
353 | @defer.inlineCallbacks | |
354 | def get_events_from_store_or_dest(self, destination, room_id, event_ids): | |
355 | """Fetch events from a remote destination, checking if we already have them. | |
356 | ||
357 | Args: | |
358 | destination (str) | |
359 | room_id (str) | |
360 | event_ids (list) | |
361 | ||
362 | Returns: | |
363 | Deferred: A deferred resolving to a 2-tuple where the first is a list of | |
364 | events and the second is a list of event ids that we failed to fetch. | |
365 | """ | |
366 | seen_events = yield self.store.get_events(event_ids, allow_rejected=True) | |
367 | signed_events = list(seen_events.values()) | |
368 | ||
369 | failed_to_fetch = set() | |
370 | ||
371 | missing_events = set(event_ids) | |
372 | for k in seen_events: | |
373 | missing_events.discard(k) | |
374 | ||
375 | if not missing_events: | |
376 | return signed_events, failed_to_fetch | |
377 | ||
378 | logger.debug( | |
379 | "Fetching unknown state/auth events %s for room %s", | |
380 | missing_events, | |
381 | event_ids, | |
382 | ) | |
383 | ||
384 | room_version = yield self.store.get_room_version(room_id) | |
385 | ||
386 | batch_size = 20 | |
387 | missing_events = list(missing_events) | |
388 | for i in range(0, len(missing_events), batch_size): | |
389 | batch = set(missing_events[i : i + batch_size]) | |
390 | ||
391 | deferreds = [ | |
392 | run_in_background( | |
393 | self.get_pdu, | |
394 | destinations=[destination], | |
395 | event_id=e_id, | |
396 | room_version=room_version, | |
397 | ) | |
398 | for e_id in batch | |
399 | ] | |
400 | ||
401 | res = yield make_deferred_yieldable( | |
402 | defer.DeferredList(deferreds, consumeErrors=True) | |
403 | ) | |
404 | for success, result in res: | |
405 | if success and result: | |
406 | signed_events.append(result) | |
407 | batch.discard(result.event_id) | |
408 | ||
409 | # We removed all events we successfully fetched from `batch` | |
410 | failed_to_fetch.update(batch) | |
411 | ||
412 | return signed_events, failed_to_fetch | |
324 | if not isinstance(state_event_ids, list) or not isinstance( | |
325 | auth_event_ids, list | |
326 | ): | |
327 | raise Exception("invalid response from /state_ids") | |
328 | ||
329 | return state_event_ids, auth_event_ids | |
413 | 330 | |
414 | 331 | @defer.inlineCallbacks |
415 | 332 | @log_function |
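The new isinstance guard in `get_room_state_ids` protects against a malformed remote response. A hypothetical `/state_ids` response body (field names as in the code above; the event IDs are made up for illustration) with the check applied to it:

```python
# Hypothetical response from the federation /state_ids endpoint.
result = {
    "pdu_ids": ["$state_event_1", "$state_event_2"],
    "auth_chain_ids": ["$auth_event_1"],
}

state_event_ids = result["pdu_ids"]
auth_event_ids = result.get("auth_chain_ids", [])

# Reject anything that is not a plain list, rather than failing later
# while batching up fetches for bogus "IDs".
if not isinstance(state_event_ids, list) or not isinstance(auth_event_ids, list):
    raise Exception("invalid response from /state_ids")
```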
63 | 63 | from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet |
64 | 64 | from synapse.state import StateResolutionStore, resolve_events_with_store |
65 | 65 | from synapse.types import UserID, get_domain_from_id |
66 | from synapse.util import unwrapFirstError | |
67 | from synapse.util.async_helpers import Linearizer | |
66 | from synapse.util.async_helpers import Linearizer, concurrently_execute | |
68 | 67 | from synapse.util.distributor import user_joined_room |
69 | 68 | from synapse.util.retryutils import NotRetryingDestination |
70 | 69 | from synapse.visibility import filter_events_for_server |
239 | 238 | return None |
240 | 239 | |
241 | 240 | state = None |
242 | auth_chain = [] | |
243 | 241 | |
244 | 242 | # Get missing pdus if necessary. |
245 | 243 | if not pdu.internal_metadata.is_outlier(): |
345 | 343 | |
346 | 344 | # Calculate the state after each of the previous events, and |
347 | 345 | # resolve them to find the correct state at the current event. |
348 | auth_chains = set() | |
349 | 346 | event_map = {event_id: pdu} |
350 | 347 | try: |
351 | 348 | # Get the state of the events we know about |
369 | 366 | p, |
370 | 367 | ) |
371 | 368 | |
372 | room_version = yield self.store.get_room_version(room_id) | |
373 | ||
374 | 369 | with nested_logging_context(p): |
375 | 370 | # note that if any of the missing prevs share missing state or |
376 | 371 | # auth events, the requests to fetch those events are deduped |
377 | 372 | # by the get_pdu_cache in federation_client. |
378 | ( | |
379 | remote_state, | |
380 | got_auth_chain, | |
381 | ) = yield self.federation_client.get_state_for_room( | |
382 | origin, room_id, p | |
373 | (remote_state, _,) = yield self._get_state_for_room( | |
374 | origin, room_id, p, include_event_in_state=True | |
383 | 375 | ) |
384 | ||
385 | # we want the state *after* p; get_state_for_room returns the | |
386 | # state *before* p. | |
387 | remote_event = yield self.federation_client.get_pdu( | |
388 | [origin], p, room_version, outlier=True | |
389 | ) | |
390 | ||
391 | if remote_event is None: | |
392 | raise Exception( | |
393 | "Unable to get missing prev_event %s" % (p,) | |
394 | ) | |
395 | ||
396 | if remote_event.is_state(): | |
397 | remote_state.append(remote_event) | |
398 | ||
399 | # XXX hrm I'm not convinced that duplicate events will compare | |
400 | # for equality, so I'm not sure this does what the author | |
401 | # hoped. | |
402 | auth_chains.update(got_auth_chain) | |
403 | 376 | |
404 | 377 | remote_state_map = { |
405 | 378 | (x.type, x.state_key): x.event_id for x in remote_state |
409 | 382 | for x in remote_state: |
410 | 383 | event_map[x.event_id] = x |
411 | 384 | |
385 | room_version = yield self.store.get_room_version(room_id) | |
412 | 386 | state_map = yield resolve_events_with_store( |
387 | room_id, | |
413 | 388 | room_version, |
414 | 389 | state_maps, |
415 | 390 | event_map, |
429 | 404 | event_map.update(evs) |
430 | 405 | |
431 | 406 | state = [event_map[e] for e in six.itervalues(state_map)] |
432 | auth_chain = list(auth_chains) | |
433 | 407 | except Exception: |
434 | 408 | logger.warning( |
435 | 409 | "[%s %s] Error attempting to resolve state at missing " |
445 | 419 | affected=event_id, |
446 | 420 | ) |
447 | 421 | |
448 | yield self._process_received_pdu( | |
449 | origin, pdu, state=state, auth_chain=auth_chain | |
450 | ) | |
422 | yield self._process_received_pdu(origin, pdu, state=state) | |
451 | 423 | |
452 | 424 | @defer.inlineCallbacks |
453 | 425 | def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): |
583 | 555 | raise |
584 | 556 | |
585 | 557 | @defer.inlineCallbacks |
586 | def _process_received_pdu(self, origin, event, state, auth_chain): | |
558 | @log_function | |
559 | def _get_state_for_room( | |
560 | self, destination, room_id, event_id, include_event_in_state | |
561 | ): | |
562 | """Requests all of the room state at a given event from a remote homeserver. | |
563 | ||
564 | Args: | |
565 | destination (str): The remote homeserver to query for the state. | |
566 | room_id (str): The id of the room we're interested in. | |
567 | event_id (str): The id of the event we want the state at. | |
568 | include_event_in_state: if true, the event itself will be included in the | |
569 | returned state event list. | |
570 | ||
571 | Returns: | |
572 | Deferred[Tuple[List[EventBase], List[EventBase]]]: | |
573 | A list of events in the state, and a list of events in the auth chain | |
574 | for the given event. | |
575 | """ | |
576 | ( | |
577 | state_event_ids, | |
578 | auth_event_ids, | |
579 | ) = yield self.federation_client.get_room_state_ids( | |
580 | destination, room_id, event_id=event_id | |
581 | ) | |
582 | ||
583 | desired_events = set(state_event_ids + auth_event_ids) | |
584 | ||
585 | if include_event_in_state: | |
586 | desired_events.add(event_id) | |
587 | ||
588 | event_map = yield self._get_events_from_store_or_dest( | |
589 | destination, room_id, desired_events | |
590 | ) | |
591 | ||
592 | failed_to_fetch = desired_events - event_map.keys() | |
593 | if failed_to_fetch: | |
594 | logger.warning( | |
595 | "Failed to fetch missing state/auth events for %s: %s", | |
596 | room_id, | |
597 | failed_to_fetch, | |
598 | ) | |
599 | ||
600 | remote_state = [ | |
601 | event_map[e_id] for e_id in state_event_ids if e_id in event_map | |
602 | ] | |
603 | ||
604 | if include_event_in_state: | |
605 | remote_event = event_map.get(event_id) | |
606 | if not remote_event: | |
607 | raise Exception("Unable to get missing prev_event %s" % (event_id,)) | |
608 | if remote_event.is_state() and remote_event.rejected_reason is None: | |
609 | remote_state.append(remote_event) | |
610 | ||
611 | auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map] | |
612 | auth_chain.sort(key=lambda e: e.depth) | |
613 | ||
614 | return remote_state, auth_chain | |
615 | ||
616 | @defer.inlineCallbacks | |
617 | def _get_events_from_store_or_dest(self, destination, room_id, event_ids): | |
618 | """Fetch events from a remote destination, checking if we already have them. | |
619 | ||
620 | Args: | |
621 | destination (str) | |
622 | room_id (str) | |
623 | event_ids (Iterable[str]) | |
624 | ||
625 | Persists any events we don't already have as outliers. | |
626 | ||
627 | If we fail to fetch any of the events, a warning will be logged, and the | |
628 | event will be omitted from the result, as will any events which turn out | |
629 | not to be in the given room. | |
630 | ||
631 | Returns: | |
632 | Deferred[dict[str, EventBase]]: A deferred resolving to a map | |
633 | from event_id to event | |
634 | """ | |
635 | fetched_events = yield self.store.get_events(event_ids, allow_rejected=True) | |
636 | ||
637 | missing_events = set(event_ids) - fetched_events.keys() | |
638 | ||
639 | if missing_events: | |
640 | logger.debug( | |
641 | "Fetching unknown state/auth events %s for room %s", | |
642 | missing_events, | |
643 | room_id, | |
644 | ) | |
645 | ||
646 | yield self._get_events_and_persist( | |
647 | destination=destination, room_id=room_id, events=missing_events | |
648 | ) | |
649 | ||
650 | # we need to make sure we re-load from the database to get the rejected | |
651 | # state correct. | |
652 | fetched_events.update( | |
653 | (yield self.store.get_events(missing_events, allow_rejected=True)) | |
654 | ) | |
655 | ||
656 | # check for events which were in the wrong room. | |
657 | # | |
658 | # this can happen if a remote server claims that the state or | |
659 | # auth_events at an event in room A are actually events in room B | |
660 | ||
661 | bad_events = list( | |
662 | (event_id, event.room_id) | |
663 | for event_id, event in fetched_events.items() | |
664 | if event.room_id != room_id | |
665 | ) | |
666 | ||
667 | for bad_event_id, bad_room_id in bad_events: | |
668 | # This is a bogus situation, but since we may only discover it a long time | |
669 | # after it happened, we try our best to carry on, by just omitting the | |
670 | # bad events from the returned auth/state set. | |
671 | logger.warning( | |
672 | "Remote server %s claims event %s in room %s is an auth/state " | |
673 | "event in room %s", | |
674 | destination, | |
675 | bad_event_id, | |
676 | bad_room_id, | |
677 | room_id, | |
678 | ) | |
679 | del fetched_events[bad_event_id] | |
680 | ||
681 | return fetched_events | |
682 | ||
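The wrong-room guard at the end of `_get_events_from_store_or_dest` simply drops offending events and carries on. A standalone sketch of that filter (stub events via `SimpleNamespace`; the IDs are illustrative):

```python
from types import SimpleNamespace

room_id = "!room_a:example.com"
fetched_events = {
    "$good": SimpleNamespace(room_id="!room_a:example.com"),
    "$bad": SimpleNamespace(room_id="!room_b:example.com"),
}

# Collect offenders first so we don't mutate the dict while iterating.
bad_events = [
    (event_id, event.room_id)
    for event_id, event in fetched_events.items()
    if event.room_id != room_id
]
for bad_event_id, bad_room_id in bad_events:
    # Omit the bogus event rather than failing the whole request.
    del fetched_events[bad_event_id]

assert list(fetched_events) == ["$good"]
```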
683 | @defer.inlineCallbacks | |
684 | def _process_received_pdu(self, origin, event, state): | |
587 | 685 | """ Called when we have a new pdu. We need to do auth checks and put it |
588 | 686 | through the StateHandler. |
687 | ||
688 | Args: | |
689 | origin: server sending the event | |
690 | ||
691 | event: event to be persisted | |
692 | ||
693 | state: Normally None, but if we are handling a gap in the graph | |
694 | (ie, we are missing one or more prev_events), the resolved state at the | |
695 | event | |
589 | 696 | """ |
590 | 697 | room_id = event.room_id |
591 | 698 | event_id = event.event_id |
592 | 699 | |
593 | 700 | logger.debug("[%s %s] Processing event: %s", room_id, event_id, event) |
594 | ||
595 | event_ids = set() | |
596 | if state: | |
597 | event_ids |= {e.event_id for e in state} | |
598 | if auth_chain: | |
599 | event_ids |= {e.event_id for e in auth_chain} | |
600 | ||
601 | seen_ids = yield self.store.have_seen_events(event_ids) | |
602 | ||
603 | if state and auth_chain is not None: | |
604 | # If we have any state or auth_chain given to us by the replication | |
605 | # layer, then we should handle them (if we haven't before.) | |
606 | ||
607 | event_infos = [] | |
608 | ||
609 | for e in itertools.chain(auth_chain, state): | |
610 | if e.event_id in seen_ids: | |
611 | continue | |
612 | e.internal_metadata.outlier = True | |
613 | auth_ids = e.auth_event_ids() | |
614 | auth = { | |
615 | (e.type, e.state_key): e | |
616 | for e in auth_chain | |
617 | if e.event_id in auth_ids or e.type == EventTypes.Create | |
618 | } | |
619 | event_infos.append(_NewEventInfo(event=e, auth_events=auth)) | |
620 | seen_ids.add(e.event_id) | |
621 | ||
622 | logger.info( | |
623 | "[%s %s] persisting newly-received auth/state events %s", | |
624 | room_id, | |
625 | event_id, | |
626 | [e.event.event_id for e in event_infos], | |
627 | ) | |
628 | yield self._handle_new_events(origin, event_infos) | |
629 | 701 | |
630 | 702 | try: |
631 | 703 | context = yield self._handle_new_event(origin, event, state=state) |
681 | 753 | """ |
682 | 754 | if dest == self.server_name: |
683 | 755 | raise SynapseError(400, "Can't backfill from self.") |
684 | ||
685 | room_version = yield self.store.get_room_version(room_id) | |
686 | 756 | |
687 | 757 | events = yield self.federation_client.backfill( |
688 | 758 | dest, room_id, limit=limit, extremities=extremities |
712 | 782 | |
713 | 783 | event_ids = set(e.event_id for e in events) |
714 | 784 | |
785 | # build a list of events whose prev_events weren't in the batch. | |
786 | # (XXX: this will include events whose prev_events we already have; that doesn't | |
787 | # sound right?) | |
715 | 788 | edges = [ev.event_id for ev in events if set(ev.prev_event_ids()) - event_ids] |
716 | 789 | |
717 | 790 | logger.info("backfill: Got %d events with %d edges", len(events), len(edges)) |
722 | 795 | state_events = {} |
723 | 796 | events_to_state = {} |
724 | 797 | for e_id in edges: |
725 | state, auth = yield self.federation_client.get_state_for_room( | |
798 | state, auth = yield self._get_state_for_room( | |
726 | 799 | destination=dest, room_id=room_id, event_id=e_id |
727 | 800 | ) |
728 | 801 | auth_events.update({a.event_id: a for a in auth}) |
740 | 813 | auth_events.update( |
741 | 814 | {e_id: event_map[e_id] for e_id in required_auth if e_id in event_map} |
742 | 815 | ) |
743 | missing_auth = required_auth - set(auth_events) | |
744 | failed_to_fetch = set() | |
745 | ||
746 | # Try and fetch any missing auth events from both DB and remote servers. | |
747 | # We repeatedly do this until we stop finding new auth events. | |
748 | while missing_auth - failed_to_fetch: | |
749 | logger.info("Missing auth for backfill: %r", missing_auth) | |
750 | ret_events = yield self.store.get_events(missing_auth - failed_to_fetch) | |
751 | auth_events.update(ret_events) | |
752 | ||
753 | required_auth.update( | |
754 | a_id for event in ret_events.values() for a_id in event.auth_event_ids() | |
755 | ) | |
756 | missing_auth = required_auth - set(auth_events) | |
757 | ||
758 | if missing_auth - failed_to_fetch: | |
759 | logger.info( | |
760 | "Fetching missing auth for backfill: %r", | |
761 | missing_auth - failed_to_fetch, | |
762 | ) | |
763 | ||
764 | results = yield make_deferred_yieldable( | |
765 | defer.gatherResults( | |
766 | [ | |
767 | run_in_background( | |
768 | self.federation_client.get_pdu, | |
769 | [dest], | |
770 | event_id, | |
771 | room_version=room_version, | |
772 | outlier=True, | |
773 | timeout=10000, | |
774 | ) | |
775 | for event_id in missing_auth - failed_to_fetch | |
776 | ], | |
777 | consumeErrors=True, | |
778 | ) | |
779 | ).addErrback(unwrapFirstError) | |
780 | auth_events.update({a.event_id: a for a in results if a}) | |
781 | required_auth.update( | |
782 | a_id | |
783 | for event in results | |
784 | if event | |
785 | for a_id in event.auth_event_ids() | |
786 | ) | |
787 | missing_auth = required_auth - set(auth_events) | |
788 | ||
789 | failed_to_fetch = missing_auth - set(auth_events) | |
790 | ||
791 | seen_events = yield self.store.have_seen_events( | |
792 | set(auth_events.keys()) | set(state_events.keys()) | |
793 | ) | |
794 | ||
795 | # We now have a chunk of events plus associated state and auth chain to | |
796 | # persist. We do the persistence in two steps: | |
797 | # 1. Auth events and state get persisted as outliers, plus the | |
798 | # backward extremities get persisted (as non-outliers). | |
799 | # 2. The rest of the events in the chunk get persisted one by one, as | |
800 | # each one depends on the previous event for its state. | |
801 | # | |
802 | # The important thing is that events in the chunk get persisted as | |
803 | # non-outliers, including when those events are also in the state or | |
804 | # auth chain. Caution must therefore be taken to ensure that they are | |
805 | # not accidentally marked as outliers. | |
806 | ||
807 | # Step 1a: persist auth events that *don't* appear in the chunk | |
816 | ||
808 | 817 | ev_infos = [] |
809 | for a in auth_events.values(): | |
810 | # We only want to persist auth events as outliers that we haven't | |
811 | # seen and aren't about to persist as part of the backfilled chunk. | |
812 | if a.event_id in seen_events or a.event_id in event_map: | |
813 | continue | |
814 | ||
815 | a.internal_metadata.outlier = True | |
816 | ev_infos.append( | |
817 | _NewEventInfo( | |
818 | event=a, | |
819 | auth_events={ | |
820 | ( | |
821 | auth_events[a_id].type, | |
822 | auth_events[a_id].state_key, | |
823 | ): auth_events[a_id] | |
824 | for a_id in a.auth_event_ids() | |
825 | if a_id in auth_events | |
826 | }, | |
827 | ) | |
828 | ) | |
829 | ||
830 | # Step 1b: persist the events in the chunk we fetched state for (i.e. | |
831 | # the backwards extremities) as non-outliers. | |
818 | ||
819 | # Step 1: persist the events in the chunk we fetched state for (i.e. | |
820 | # the backwards extremities), with custom auth events and state | |
832 | 821 | for e_id in events_to_state: |
833 | 822 | # For paranoia we ensure that these events are marked as |
834 | 823 | # non-outliers |
1069 | 1058 | tried_domains.update(dom for dom, _ in likely_domains) |
1070 | 1059 | |
1071 | 1060 | return False |
1061 | ||
1062 | @defer.inlineCallbacks | |
1063 | def _get_events_and_persist( | |
1064 | self, destination: str, room_id: str, events: Iterable[str] | |
1065 | ): | |
1066 | """Fetch the given events from a server, and persist them as outliers. | |
1067 | ||
1068 | Logs a warning if we can't find the given event. | |
1069 | """ | |
1070 | ||
1071 | room_version = yield self.store.get_room_version(room_id) | |
1072 | ||
1073 | event_infos = [] | |
1074 | ||
1075 | async def get_event(event_id: str): | |
1076 | with nested_logging_context(event_id): | |
1077 | try: | |
1078 | event = await self.federation_client.get_pdu( | |
1079 | [destination], event_id, room_version, outlier=True, | |
1080 | ) | |
1081 | if event is None: | |
1082 | logger.warning( | |
1083 | "Server %s didn't return event %s", destination, event_id, | |
1084 | ) | |
1085 | return | |
1086 | ||
1087 | # recursively fetch the auth events for this event | |
1088 | auth_events = await self._get_events_from_store_or_dest( | |
1089 | destination, room_id, event.auth_event_ids() | |
1090 | ) | |
1091 | auth = {} | |
1092 | for auth_event_id in event.auth_event_ids(): | |
1093 | ae = auth_events.get(auth_event_id) | |
1094 | if ae: | |
1095 | auth[(ae.type, ae.state_key)] = ae | |
1096 | ||
1097 | event_infos.append(_NewEventInfo(event, None, auth)) | |
1098 | ||
1099 | except Exception as e: | |
1100 | logger.warning( | |
1101 | "Error fetching missing state/auth event %s: %s %s", | |
1102 | event_id, | |
1103 | type(e), | |
1104 | e, | |
1105 | ) | |
1106 | ||
1107 | yield concurrently_execute(get_event, events, 5) | |
1108 | ||
1109 | yield self._handle_new_events( | |
1110 | destination, event_infos, | |
1111 | ) | |
1072 | 1112 | |
1073 | 1113 | def _sanity_check_event(self, ev): |
1074 | 1114 | """ |
294 | 294 | be found to be in any room the server is in, and therefore the query |
295 | 295 | is denied. |
296 | 296 | """ |
297 | ||
297 | 298 | # Implementation of MSC1301: don't allow looking up profiles if the |
298 | 299 | # requester isn't in the same room as the target. We expect requester to |
299 | 300 | # be None when this function is called outside of a profile query, e.g. |
300 | 301 | # when building a membership event. In this case, we must allow the |
301 | 302 | # lookup. |
302 | if not self.hs.config.require_auth_for_profile_requests or not requester: | |
303 | if ( | |
304 | not self.hs.config.limit_profile_requests_to_users_who_share_rooms | |
305 | or not requester | |
306 | ): | |
303 | 307 | return |
304 | 308 | |
305 | 309 | # Always allow the user to query their own profile. |
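A condensed sketch of the gate this hunk adds (the helper shape, names, and data below are illustrative assumptions; the real check is asynchronous and consults the datastore for shared rooms):

```python
def profile_query_allowed(limit_to_shared_rooms, requester, target, shared_rooms):
    # Mirrors the gate above: when the option is off, or there is no
    # requester (e.g. we're building a membership event), allow the lookup.
    if not limit_to_shared_rooms or requester is None:
        return True
    # Users may always query their own profile.
    if requester == target:
        return True
    # Otherwise require at least one room in common.
    return bool(shared_rooms[requester] & shared_rooms[target])

rooms = {
    "@alice:hs": {"!r1:hs"},
    "@bob:hs": {"!r1:hs"},
    "@carol:hs": {"!r2:hs"},
}
assert profile_query_allowed(True, "@alice:hs", "@bob:hs", rooms)
assert not profile_query_allowed(True, "@alice:hs", "@carol:hs", rooms)
```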
906 | 906 | |
907 | 907 | results["events_before"] = yield filter_evts(results["events_before"]) |
908 | 908 | results["events_after"] = yield filter_evts(results["events_after"]) |
909 | results["event"] = event | |
909 | # filter_evts can return a pruned event in case the user is allowed to see that | |
910 | # there's something there but not see the content, so use the event that's in | |
911 | # `filtered` rather than the event we retrieved from the datastore. | |
912 | results["event"] = filtered[0] | |
910 | 913 | |
911 | 914 | if results["events_after"]: |
912 | 915 | last_event_id = results["events_after"][-1].event_id |
937 | 940 | if event_filter: |
938 | 941 | state_events = event_filter.filter(state_events) |
939 | 942 | |
940 | results["state"] = state_events | |
943 | results["state"] = yield filter_evts(state_events) | |
941 | 944 | |
942 | 945 | # We use a dummy token here as we only care about the room portion of |
943 | 946 | # the token, which we replace. |
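For context, a toy illustration of why returning `filtered[0]` matters: pruning keeps an event's routing metadata but strips content the requester may not see (for example, an erased sender), so returning the datastore copy would leak the original body. The shape below is a deliberately simplified assumption, not Synapse's actual prune implementation:

```python
def prune_event(event: dict) -> dict:
    # Simplified: keep identifying fields, drop the content.
    pruned = {k: event[k] for k in ("event_id", "type", "sender", "room_id")}
    pruned["content"] = {}
    return pruned

event = {
    "event_id": "$e1",
    "type": "m.room.message",
    "sender": "@erased:example.com",
    "room_id": "!r:example.com",
    "content": {"body": "should not be visible"},
}
print(prune_event(event))  # content is now empty
```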
15 | 15 | |
16 | 16 | import logging |
17 | 17 | from collections import namedtuple |
18 | from typing import Iterable, Optional | |
18 | from typing import Dict, Iterable, List, Optional, Tuple | |
19 | 19 | |
20 | 20 | from six import iteritems, itervalues |
21 | 21 | |
415 | 415 | |
416 | 416 | with Measure(self.clock, "state._resolve_events"): |
417 | 417 | new_state = yield resolve_events_with_store( |
418 | event.room_id, | |
418 | 419 | room_version, |
419 | 420 | state_set_ids, |
420 | 421 | event_map=state_map, |
460 | 461 | not be called for a single state group |
461 | 462 | |
462 | 463 | Args: |
463 | room_id (str): room we are resolving for (used for logging) | |
464 | room_id (str): room we are resolving for (used for logging and sanity checks) | |
464 | 465 | room_version (str): version of the room |
465 | 466 | state_groups_ids (dict[int, dict[(str, str), str]]): |
466 | 467 | map from state group id to the state in that state group |
516 | 517 | logger.info("Resolving conflicted state for %r", room_id) |
517 | 518 | with Measure(self.clock, "state._resolve_events"): |
518 | 519 | new_state = yield resolve_events_with_store( |
520 | room_id, | |
519 | 521 | room_version, |
520 | 522 | list(itervalues(state_groups_ids)), |
521 | 523 | event_map=event_map, |
587 | 589 | ) |
588 | 590 | |
589 | 591 | |
590 | def resolve_events_with_store(room_version, state_sets, event_map, state_res_store): | |
592 | def resolve_events_with_store( | |
593 | room_id: str, | |
594 | room_version: str, | |
595 | state_sets: List[Dict[Tuple[str, str], str]], | |
596 | event_map: Optional[Dict[str, EventBase]], | |
597 | state_res_store: "StateResolutionStore", | |
598 | ): | |
591 | 599 | """ |
592 | 600 | Args: |
593 | room_version(str): Version of the room | |
594 | ||
595 | state_sets(list): List of dicts of (type, state_key) -> event_id, | |
601 | room_id: the room we are working in | |
602 | ||
603 | room_version: Version of the room | |
604 | ||
605 | state_sets: List of dicts of (type, state_key) -> event_id, | |
596 | 606 | which are the different state groups to resolve. |
597 | 607 | |
598 | event_map(dict[str,FrozenEvent]|None): | |
608 | event_map: | |
599 | 609 | a dict from event_id to event, for any events that we happen to |
600 | 610 | have in flight (eg, those currently being persisted). This will be |
601 | 611 | used as a starting point for finding the state we need; any missing
602 | 612 | events will be requested via state_res_store.
603 | 613 | |
604 | If None, all events will be fetched via state_map_factory. | |
605 | ||
606 | state_res_store (StateResolutionStore) | |
607 | ||
608 | Returns | |
614 | If None, all events will be fetched via state_res_store. | |
615 | ||
616 | state_res_store: a place to fetch events from | |
617 | ||
618 | Returns: | |
609 | 619 | Deferred[dict[(str, str), str]]: |
610 | 620 | a map from (type, state_key) to event_id. |
611 | 621 | """ |
612 | 622 | v = KNOWN_ROOM_VERSIONS[room_version] |
613 | 623 | if v.state_res == StateResolutionVersions.V1: |
614 | 624 | return v1.resolve_events_with_store( |
615 | state_sets, event_map, state_res_store.get_events | |
625 | room_id, state_sets, event_map, state_res_store.get_events | |
616 | 626 | ) |
617 | 627 | else: |
618 | 628 | return v2.resolve_events_with_store( |
619 | room_version, state_sets, event_map, state_res_store | |
629 | room_id, room_version, state_sets, event_map, state_res_store | |
620 | 630 | ) |
621 | 631 | |
622 | 632 |
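For orientation, a condensed sketch of the dispatch above: `room_id` is now the first argument to both resolvers, and the room version decides which one runs. The lookup table and stub resolvers below are assumptions standing in for `synapse.api.room_versions.KNOWN_ROOM_VERSIONS` and the `v1`/`v2` modules:

```python
# Stub version table: maps room version to the state-res algorithm it uses.
STATE_RES_V1, STATE_RES_V2 = "v1", "v2"
KNOWN_ROOM_VERSIONS = {"1": STATE_RES_V1, "2": STATE_RES_V2, "5": STATE_RES_V2}

def resolve_v1(room_id, state_sets, event_map, get_events):
    ...  # stub for synapse.state.v1.resolve_events_with_store

def resolve_v2(room_id, room_version, state_sets, event_map, state_res_store):
    ...  # stub for synapse.state.v2.resolve_events_with_store

def resolve_events_with_store(room_id, room_version, state_sets, event_map, store):
    if KNOWN_ROOM_VERSIONS[room_version] == STATE_RES_V1:
        # v1 only needs a callable for fetching events, not the whole store.
        return resolve_v1(room_id, state_sets, event_map, store.get_events)
    return resolve_v2(room_id, room_version, state_sets, event_map, store)
```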
14 | 14 | |
15 | 15 | import hashlib |
16 | 16 | import logging |
17 | from typing import Callable, Dict, List, Optional, Tuple | |
17 | 18 | |
18 | 19 | from six import iteritems, iterkeys, itervalues |
19 | 20 | |
23 | 24 | from synapse.api.constants import EventTypes |
24 | 25 | from synapse.api.errors import AuthError |
25 | 26 | from synapse.api.room_versions import RoomVersions |
27 | from synapse.events import EventBase | |
26 | 28 | |
27 | 29 | logger = logging.getLogger(__name__) |
28 | 30 | |
31 | 33 | |
32 | 34 | |
33 | 35 | @defer.inlineCallbacks |
34 | def resolve_events_with_store(state_sets, event_map, state_map_factory): | |
36 | def resolve_events_with_store( | |
37 | room_id: str, | |
38 | state_sets: List[Dict[Tuple[str, str], str]], | |
39 | event_map: Optional[Dict[str, EventBase]], | |
40 | state_map_factory: Callable, | |
41 | ): | |
35 | 42 | """ |
36 | 43 | Args: |
37 | state_sets(list): List of dicts of (type, state_key) -> event_id, | |
44 | room_id: the room we are working in | |
45 | ||
46 | state_sets: List of dicts of (type, state_key) -> event_id, | |
38 | 47 | which are the different state groups to resolve. |
39 | 48 | |
40 | event_map(dict[str,FrozenEvent]|None): | |
49 | event_map: | |
41 | 50 | a dict from event_id to event, for any events that we happen to |
42 | 51 | have in flight (eg, those currently being persisted). This will be |
43 | 52 | used as a starting point for finding the state we need; any missing
45 | 54 | |
46 | 55 | If None, all events will be fetched via state_map_factory. |
47 | 56 | |
48 | state_map_factory(func): will be called | |
57 | state_map_factory: will be called | |
49 | 58 | with a list of event_ids that are needed, and should return with |
50 | 59 | a Deferred of dict of event_id to event. |
51 | 60 | |
52 | Returns | |
61 | Returns: | |
53 | 62 | Deferred[dict[(str, str), str]]: |
54 | 63 | a map from (type, state_key) to event_id. |
55 | 64 | """ |
74 | 83 | state_map = yield state_map_factory(needed_events) |
75 | 84 | if event_map is not None: |
76 | 85 | state_map.update(event_map) |
86 | ||
87 | # everything in the state map should be in the right room | |
88 | for event in state_map.values(): | |
89 | if event.room_id != room_id: | |
90 | raise Exception( | |
91 | "Attempting to state-resolve for room %s with event %s which is in %s" | |
92 | % (room_id, event.event_id, event.room_id,) | |
93 | ) | |
77 | 94 | |
78 | 95 | # get the ids of the auth events which allow us to authenticate the |
79 | 96 | # conflicted state, picking only from the unconflicting state. |
94 | 111 | ) |
95 | 112 | |
96 | 113 | state_map_new = yield state_map_factory(new_needed_events) |
114 | for event in state_map_new.values(): | |
115 | if event.room_id != room_id: | |
116 | raise Exception( | |
117 | "Attempting to state-resolve for room %s with event %s which is in %s" | |
118 | % (room_id, event.event_id, event.room_id,) | |
119 | ) | |
120 | ||
97 | 121 | state_map.update(state_map_new) |
98 | 122 | |
99 | 123 | return _resolve_with_state( |
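The same room-consistency guard is threaded through both resolvers: anything handed to state resolution must belong to the room being resolved, otherwise we raise rather than silently mixing rooms. A standalone sketch of the check with stub events (illustrative IDs):

```python
from types import SimpleNamespace

def check_state_map_in_room(room_id, state_map):
    # Raise if any event handed to the resolver claims a different room.
    for event in state_map.values():
        if event.room_id != room_id:
            raise Exception(
                "Attempting to state-resolve for room %s with event %s which is in %s"
                % (room_id, event.event_id, event.room_id)
            )

state_map = {
    "$ok": SimpleNamespace(event_id="$ok", room_id="!a:hs"),
    "$evil": SimpleNamespace(event_id="$evil", room_id="!b:hs"),
}
try:
    check_state_map_in_room("!a:hs", state_map)
except Exception as e:
    print(e)
```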
15 | 15 | import heapq |
16 | 16 | import itertools |
17 | 17 | import logging |
18 | from typing import Dict, List, Optional, Tuple | |
18 | 19 | |
19 | 20 | from six import iteritems, itervalues |
20 | 21 | |
21 | 22 | from twisted.internet import defer |
22 | 23 | |
24 | import synapse.state | |
23 | 25 | from synapse import event_auth |
24 | 26 | from synapse.api.constants import EventTypes |
25 | 27 | from synapse.api.errors import AuthError |
28 | from synapse.events import EventBase | |
26 | 29 | |
27 | 30 | logger = logging.getLogger(__name__) |
28 | 31 | |
29 | 32 | |
30 | 33 | @defer.inlineCallbacks |
31 | def resolve_events_with_store(room_version, state_sets, event_map, state_res_store): | |
34 | def resolve_events_with_store( | |
35 | room_id: str, | |
36 | room_version: str, | |
37 | state_sets: List[Dict[Tuple[str, str], str]], | |
38 | event_map: Optional[Dict[str, EventBase]], | |
39 | state_res_store: "synapse.state.StateResolutionStore", | |
40 | ): | |
32 | 41 | """Resolves the state using the v2 state resolution algorithm |
33 | 42 | |
34 | 43 | Args: |
35 | room_version (str): The room version | |
36 | ||
37 | state_sets(list): List of dicts of (type, state_key) -> event_id, | |
44 | room_id: the room we are working in | |
45 | ||
46 | room_version: The room version | |
47 | ||
48 | state_sets: List of dicts of (type, state_key) -> event_id, | |
38 | 49 | which are the different state groups to resolve. |
39 | 50 | |
40 | event_map(dict[str,FrozenEvent]|None): | |
51 | event_map: | |
41 | 52 | a dict from event_id to event, for any events that we happen to |
42 | 53 | have in flight (eg, those currently being persisted). This will be |
43 | 54 | used as a starting point for finding the state we need; any missing
45 | 56 | |
46 | 57 | If None, all events will be fetched via state_res_store. |
47 | 58 | |
48 | state_res_store (StateResolutionStore) | |
49 | ||
50 | Returns | |
59 | state_res_store: a place to fetch events from | |
60 | ||
61 | Returns: | |
51 | 62 | Deferred[dict[(str, str), str]]: |
52 | 63 | a map from (type, state_key) to event_id. |
53 | 64 | """ |
83 | 94 | ) |
84 | 95 | event_map.update(events) |
85 | 96 | |
97 | # everything in the event map should be in the right room | |
98 | for event in event_map.values(): | |
99 | if event.room_id != room_id: | |
100 | raise Exception( | |
101 | "Attempting to state-resolve for room %s with event %s which is in %s" | |
102 | % (room_id, event.event_id, event.room_id,) | |
103 | ) | |
104 | ||
86 | 105 | full_conflicted_set = set(eid for eid in full_conflicted_set if eid in event_map) |
87 | 106 | |
88 | 107 | logger.debug("%d full_conflicted_set entries", len(full_conflicted_set)) |
93 | 112 | ) |
94 | 113 | |
95 | 114 | sorted_power_events = yield _reverse_topological_power_sort( |
96 | power_events, event_map, state_res_store, full_conflicted_set | |
115 | room_id, power_events, event_map, state_res_store, full_conflicted_set | |
97 | 116 | ) |
98 | 117 | |
99 | 118 | logger.debug("sorted %d power events", len(sorted_power_events)) |
100 | 119 | |
101 | 120 | # Now sequentially auth each one |
102 | 121 | resolved_state = yield _iterative_auth_checks( |
122 | room_id, | |
103 | 123 | room_version, |
104 | 124 | sorted_power_events, |
105 | 125 | unconflicted_state, |
120 | 140 | |
121 | 141 | pl = resolved_state.get((EventTypes.PowerLevels, ""), None) |
122 | 142 | leftover_events = yield _mainline_sort( |
123 | leftover_events, pl, event_map, state_res_store | |
143 | room_id, leftover_events, pl, event_map, state_res_store | |
124 | 144 | ) |
125 | 145 | |
126 | 146 | logger.debug("resolving remaining events") |
127 | 147 | |
128 | 148 | resolved_state = yield _iterative_auth_checks( |
129 | room_version, leftover_events, resolved_state, event_map, state_res_store | |
149 | room_id, | |
150 | room_version, | |
151 | leftover_events, | |
152 | resolved_state, | |
153 | event_map, | |
154 | state_res_store, | |
130 | 155 | ) |
131 | 156 | |
132 | 157 | logger.debug("resolved") |
140 | 165 | |
141 | 166 | |
142 | 167 | @defer.inlineCallbacks |
143 | def _get_power_level_for_sender(event_id, event_map, state_res_store): | |
168 | def _get_power_level_for_sender(room_id, event_id, event_map, state_res_store): | |
144 | 169 | """Return the power level of the sender of the given event according to |
145 | 170 | their auth events. |
146 | 171 | |
147 | 172 | Args: |
173 | room_id (str) | |
148 | 174 | event_id (str) |
149 | 175 | event_map (dict[str,FrozenEvent]) |
150 | 176 | state_res_store (StateResolutionStore) |
152 | 178 | Returns: |
153 | 179 | Deferred[int] |
154 | 180 | """ |
155 | event = yield _get_event(event_id, event_map, state_res_store) | |
181 | event = yield _get_event(room_id, event_id, event_map, state_res_store) | |
156 | 182 | |
157 | 183 | pl = None |
158 | 184 | for aid in event.auth_event_ids(): |
159 | aev = yield _get_event(aid, event_map, state_res_store) | |
185 | aev = yield _get_event(room_id, aid, event_map, state_res_store) | |
160 | 186 | if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""): |
161 | 187 | pl = aev |
162 | 188 | break |
164 | 190 | if pl is None: |
165 | 191 | # Couldn't find power level. Check if they're the creator of the room |
166 | 192 | for aid in event.auth_event_ids(): |
167 | aev = yield _get_event(aid, event_map, state_res_store) | |
193 | aev = yield _get_event(room_id, aid, event_map, state_res_store) | |
168 | 194 | if (aev.type, aev.state_key) == (EventTypes.Create, ""): |
169 | 195 | if aev.content.get("creator") == event.sender: |
170 | 196 | return 100 |
278 | 304 | |
279 | 305 | @defer.inlineCallbacks |
280 | 306 | def _add_event_and_auth_chain_to_graph( |
281 | graph, event_id, event_map, state_res_store, auth_diff | |
307 | graph, room_id, event_id, event_map, state_res_store, auth_diff | |
282 | 308 | ): |
283 | 309 | """Helper function for _reverse_topological_power_sort that add the event |
284 | 310 | and its auth chain (that is in the auth diff) to the graph |
286 | 312 | Args: |
287 | 313 | graph (dict[str, set[str]]): A map from event ID to the event's auth
288 | 314 | event IDs |
315 | room_id (str): the room we are working in | |
289 | 316 | event_id (str): Event to add to the graph |
290 | 317 | event_map (dict[str,FrozenEvent]) |
291 | 318 | state_res_store (StateResolutionStore) |
297 | 324 | eid = state.pop() |
298 | 325 | graph.setdefault(eid, set()) |
299 | 326 | |
300 | event = yield _get_event(eid, event_map, state_res_store) | |
327 | event = yield _get_event(room_id, eid, event_map, state_res_store) | |
301 | 328 | for aid in event.auth_event_ids(): |
302 | 329 | if aid in auth_diff: |
303 | 330 | if aid not in graph: |
307 | 334 | |
308 | 335 | |
309 | 336 | @defer.inlineCallbacks |
310 | def _reverse_topological_power_sort(event_ids, event_map, state_res_store, auth_diff): | |
337 | def _reverse_topological_power_sort( | |
338 | room_id, event_ids, event_map, state_res_store, auth_diff | |
339 | ): | |
311 | 340 | """Returns a list of the event_ids sorted by reverse topological ordering, |
312 | 341 | and then by power level and origin_server_ts |
313 | 342 | |
314 | 343 | Args: |
344 | room_id (str): the room we are working in | |
315 | 345 | event_ids (list[str]): The events to sort |
316 | 346 | event_map (dict[str,FrozenEvent]) |
317 | 347 | state_res_store (StateResolutionStore) |
324 | 354 | graph = {} |
325 | 355 | for event_id in event_ids: |
326 | 356 | yield _add_event_and_auth_chain_to_graph( |
327 | graph, event_id, event_map, state_res_store, auth_diff | |
357 | graph, room_id, event_id, event_map, state_res_store, auth_diff | |
328 | 358 | ) |
329 | 359 | |
330 | 360 | event_to_pl = {} |
331 | 361 | for event_id in graph: |
332 | pl = yield _get_power_level_for_sender(event_id, event_map, state_res_store) | |
362 | pl = yield _get_power_level_for_sender( | |
363 | room_id, event_id, event_map, state_res_store | |
364 | ) | |
333 | 365 | event_to_pl[event_id] = pl |
334 | 366 | |
335 | 367 | def _get_power_order(event_id): |
347 | 379 | |
348 | 380 | @defer.inlineCallbacks |
349 | 381 | def _iterative_auth_checks( |
350 | room_version, event_ids, base_state, event_map, state_res_store | |
382 | room_id, room_version, event_ids, base_state, event_map, state_res_store | |
351 | 383 | ): |
352 | 384 | """Sequentially apply auth checks to each event in given list, updating the |
353 | 385 | state as it goes along. |
354 | 386 | |
355 | 387 | Args: |
388 | room_id (str) | |
356 | 389 | room_version (str) |
357 | 390 | event_ids (list[str]): Ordered list of events to apply auth checks to |
358 | 391 | base_state (dict[tuple[str, str], str]): The set of state to start with |
369 | 402 | |
370 | 403 | auth_events = {} |
371 | 404 | for aid in event.auth_event_ids(): |
372 | ev = yield _get_event(aid, event_map, state_res_store) | |
405 | ev = yield _get_event(room_id, aid, event_map, state_res_store) | |
373 | 406 | |
374 | 407 | if ev.rejected_reason is None: |
375 | 408 | auth_events[(ev.type, ev.state_key)] = ev |
377 | 410 | for key in event_auth.auth_types_for_event(event): |
378 | 411 | if key in resolved_state: |
379 | 412 | ev_id = resolved_state[key] |
380 | ev = yield _get_event(ev_id, event_map, state_res_store) | |
413 | ev = yield _get_event(room_id, ev_id, event_map, state_res_store) | |
381 | 414 | |
382 | 415 | if ev.rejected_reason is None: |
383 | 416 | auth_events[key] = event_map[ev_id] |
399 | 432 | |
400 | 433 | |
401 | 434 | @defer.inlineCallbacks |
402 | def _mainline_sort(event_ids, resolved_power_event_id, event_map, state_res_store): | |
435 | def _mainline_sort( | |
436 | room_id, event_ids, resolved_power_event_id, event_map, state_res_store | |
437 | ): | |
403 | 438 | """Returns a sorted list of event_ids sorted by mainline ordering based on |
404 | 439 | the given event resolved_power_event_id |
405 | 440 | |
406 | 441 | Args: |
442 | room_id (str): room we're working in | |
407 | 443 | event_ids (list[str]): Events to sort |
408 | 444 | resolved_power_event_id (str): The final resolved power level event ID |
409 | 445 | event_map (dict[str,FrozenEvent]) |
416 | 452 | pl = resolved_power_event_id |
417 | 453 | while pl: |
418 | 454 | mainline.append(pl) |
419 | pl_ev = yield _get_event(pl, event_map, state_res_store) | |
455 | pl_ev = yield _get_event(room_id, pl, event_map, state_res_store) | |
420 | 456 | auth_events = pl_ev.auth_event_ids() |
421 | 457 | pl = None |
422 | 458 | for aid in auth_events: |
423 | ev = yield _get_event(aid, event_map, state_res_store) | |
459 | ev = yield _get_event(room_id, aid, event_map, state_res_store) | |
424 | 460 | if (ev.type, ev.state_key) == (EventTypes.PowerLevels, ""): |
425 | 461 | pl = aid |
426 | 462 | break |
456 | 492 | Deferred[int] |
457 | 493 | """ |
458 | 494 | |
495 | room_id = event.room_id | |
496 | ||
459 | 497 | # We do an iterative search, replacing `event` with the power level in its
460 | 498 | # auth events (if any) |
461 | 499 | while event: |
467 | 505 | event = None |
468 | 506 | |
469 | 507 | for aid in auth_events: |
470 | aev = yield _get_event(aid, event_map, state_res_store) | |
508 | aev = yield _get_event(room_id, aid, event_map, state_res_store) | |
471 | 509 | if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""): |
472 | 510 | event = aev |
473 | 511 | break |
477 | 515 | |
478 | 516 | |
479 | 517 | @defer.inlineCallbacks |
480 | def _get_event(event_id, event_map, state_res_store): | |
518 | def _get_event(room_id, event_id, event_map, state_res_store): | |
481 | 519 | """Helper function to look up event in event_map, falling back to looking |
482 | 520 | it up in the store |
483 | 521 | |
484 | 522 | Args: |
523 | room_id (str) | |
485 | 524 | event_id (str) |
486 | 525 | event_map (dict[str,FrozenEvent]) |
487 | 526 | state_res_store (StateResolutionStore) |
492 | 531 | if event_id not in event_map: |
493 | 532 | events = yield state_res_store.get_events([event_id], allow_rejected=True) |
494 | 533 | event_map.update(events) |
495 | return event_map[event_id] | |
534 | event = event_map[event_id] | |
535 | assert event is not None | |
536 | if event.room_id != room_id: | |
537 | raise Exception( | |
538 | "In state res for room %s, event %s is in %s" | |
539 | % (room_id, event_id, event.room_id) | |
540 | ) | |
541 | return event | |
496 | 542 | |
497 | 543 | |
498 | 544 | def lexicographical_topological_sort(graph, key): |
139 | 139 | |
140 | 140 | Args: |
141 | 141 | func (func): Function to execute, should return a deferred or coroutine. |
142 | args (list): List of arguments to pass to func, each invocation of func | |
143 | gets a signle argument. | |
142 | args (Iterable): List of arguments to pass to func, each invocation of func | |
143 | gets a single argument. | |
144 | 144 | limit (int): Maximum number of concurrent executions.
145 | 145 | |
146 | 146 | Returns: |
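The corrected docstring describes a "run `func` over an iterable, one argument per call, with at most `limit` in flight" helper. A hedged asyncio analogue for illustration; Synapse's own implementation is Twisted-based:

```python
import asyncio

# Hedged asyncio analogue of the helper documented above: each worker
# pulls the next argument from a shared iterator, so at most `limit`
# invocations of func are in flight at once.
async def concurrently_execute(func, args, limit):
    it = iter(args)

    async def worker():
        for arg in it:  # each invocation of func gets a single argument
            await func(arg)

    await asyncio.gather(*(worker() for _ in range(limit)))

async def demo(n):
    await asyncio.sleep(0.01)

asyncio.run(concurrently_execute(demo, range(10), limit=3))
```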
51 | 51 | apply_retention_policies=True, |
52 | 52 | ): |
53 | 53 | """ |
54 | Check which events a user is allowed to see | |
54 | Check which events a user is allowed to see. If the user can see the event but its | |
55 | sender asked for their data to be erased, prune the content of the event. | |
55 | 56 | |
56 | 57 | Args: |
57 | 58 | storage |
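The expanded docstring covers the erasure case. A hedged sketch of the rule, using simplified event dicts and a simplified membership check; the real logic lives in `filter_events_for_client`:

```python
# Hedged sketch of the rule the docstring describes: events from an
# erased sender keep their envelope but lose their content for users
# who were not yet in the room when the event was sent.
def filter_for_client_sketch(events, user_joined_ts, erased_senders):
    visible = []
    for ev in events:
        if ev["sender"] in erased_senders and ev["origin_server_ts"] < user_joined_ts:
            ev = dict(ev, content={})  # prune: keep the event, drop the body
        visible.append(ev)
    return visible

timeline = [
    {"sender": "@erased:example.com", "origin_server_ts": 100,
     "content": {"body": "hi"}},
    {"sender": "@alice:example.com", "origin_server_ts": 200,
     "content": {"body": "yo"}},
]
# A user who joined at ts=150 sees the erased sender's message emptied:
assert filter_for_client_sketch(timeline, 150, {"@erased:example.com"})[0]["content"] == {}
```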
236 | 236 | |
237 | 237 | config = self.default_config() |
238 | 238 | config["require_auth_for_profile_requests"] = True |
239 | config["limit_profile_requests_to_users_who_share_rooms"] = True | |
239 | 240 | self.hs = self.setup_test_homeserver(config=config) |
240 | 241 | |
241 | 242 | return self.hs |
308 | 309 | def make_homeserver(self, reactor, clock): |
309 | 310 | config = self.default_config() |
310 | 311 | config["require_auth_for_profile_requests"] = True |
312 | config["limit_profile_requests_to_users_who_share_rooms"] = True | |
311 | 313 | self.hs = self.setup_test_homeserver(config=config) |
312 | 314 | |
313 | 315 | return self.hs |
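Both profile test fixtures now opt in to the new `limit_profile_requests_to_users_who_share_rooms` option. A hedged sketch of the check it enables; `get_rooms_for_user` is a stand-in assumed to return a set of room IDs, and the real logic lives in Synapse's profile handler:

```python
# Hedged sketch of the gate the new config option turns on: local
# profile lookups are refused unless requester and target share a room.
async def check_profile_query_allowed(store, requester, target):
    # Requests for one's own profile, or with no local requester to
    # compare against, pass through unchanged.
    if requester is None or requester == target:
        return
    requester_rooms = await store.get_rooms_for_user(requester)
    target_rooms = await store.get_rooms_for_user(target)
    if not requester_rooms & target_rooms:
        raise PermissionError(
            "Profile lookups are limited to users who share a room"
        )
```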
28 | 28 | from synapse.api.constants import EventContentFields, EventTypes, Membership |
29 | 29 | from synapse.handlers.pagination import PurgeStatus |
30 | 30 | from synapse.rest.client.v1 import login, profile, room |
31 | from synapse.rest.client.v2_alpha import account | |
31 | 32 | from synapse.util.stringutils import random_string |
32 | 33 | |
33 | 34 | from tests import unittest |
1596 | 1597 | ) |
1597 | 1598 | |
1598 | 1599 | return event_id |
1600 | ||
1601 | ||
1602 | class ContextTestCase(unittest.HomeserverTestCase): | |
1603 | ||
1604 | servlets = [ | |
1605 | synapse.rest.admin.register_servlets_for_client_rest_resource, | |
1606 | room.register_servlets, | |
1607 | login.register_servlets, | |
1608 | account.register_servlets, | |
1609 | ] | |
1610 | ||
1611 | def prepare(self, reactor, clock, homeserver): | |
1612 | self.user_id = self.register_user("user", "password") | |
1613 | self.tok = self.login("user", "password") | |
1614 | self.room_id = self.helper.create_room_as(self.user_id, tok=self.tok) | |
1615 | ||
1616 | self.other_user_id = self.register_user("user2", "password") | |
1617 | self.other_tok = self.login("user2", "password") | |
1618 | ||
1619 | self.helper.invite(self.room_id, self.user_id, self.other_user_id, tok=self.tok) | |
1620 | self.helper.join(self.room_id, self.other_user_id, tok=self.other_tok) | |
1621 | ||
1622 | def test_erased_sender(self): | |
1623 | """Test that an erasure request results in the requester's events being hidden | |
1624 | from any new member of the room. | |
1625 | """ | |
1626 | ||
1627 | # Send a bunch of events in the room. | |
1628 | ||
1629 | self.helper.send(self.room_id, "message 1", tok=self.tok) | |
1630 | self.helper.send(self.room_id, "message 2", tok=self.tok) | |
1631 | event_id = self.helper.send(self.room_id, "message 3", tok=self.tok)["event_id"] | |
1632 | self.helper.send(self.room_id, "message 4", tok=self.tok) | |
1633 | self.helper.send(self.room_id, "message 5", tok=self.tok) | |
1634 | ||
1635 | # Check that we can still see the messages before the erasure request. | |
1636 | ||
1637 | request, channel = self.make_request( | |
1638 | "GET", | |
1639 | '/rooms/%s/context/%s?filter={"types":["m.room.message"]}' | |
1640 | % (self.room_id, event_id), | |
1641 | access_token=self.tok, | |
1642 | ) | |
1643 | self.render(request) | |
1644 | self.assertEqual(channel.code, 200, channel.result) | |
1645 | ||
1646 | events_before = channel.json_body["events_before"] | |
1647 | ||
1648 | self.assertEqual(len(events_before), 2, events_before) | |
1649 | self.assertEqual( | |
1650 | events_before[0].get("content", {}).get("body"), | |
1651 | "message 2", | |
1652 | events_before[0], | |
1653 | ) | |
1654 | self.assertEqual( | |
1655 | events_before[1].get("content", {}).get("body"), | |
1656 | "message 1", | |
1657 | events_before[1], | |
1658 | ) | |
1659 | ||
1660 | self.assertEqual( | |
1661 | channel.json_body["event"].get("content", {}).get("body"), | |
1662 | "message 3", | |
1663 | channel.json_body["event"], | |
1664 | ) | |
1665 | ||
1666 | events_after = channel.json_body["events_after"] | |
1667 | ||
1668 | self.assertEqual(len(events_after), 2, events_after) | |
1669 | self.assertEqual( | |
1670 | events_after[0].get("content", {}).get("body"), | |
1671 | "message 4", | |
1672 | events_after[0], | |
1673 | ) | |
1674 | self.assertEqual( | |
1675 | events_after[1].get("content", {}).get("body"), | |
1676 | "message 5", | |
1677 | events_after[1], | |
1678 | ) | |
1679 | ||
1680 | # Deactivate the first account and erase the user's data. | |
1681 | ||
1682 | deactivate_account_handler = self.hs.get_deactivate_account_handler() | |
1683 | self.get_success( | |
1684 | deactivate_account_handler.deactivate_account(self.user_id, erase_data=True) | |
1685 | ) | |
1686 | ||
1687 | # Invite another user in the room. This is needed because messages will be | |
1688 | # pruned only if the user wasn't a member of the room when the messages were | |
1689 | # sent. | |
1690 | ||
1691 | invited_user_id = self.register_user("user3", "password") | |
1692 | invited_tok = self.login("user3", "password") | |
1693 | ||
1694 | self.helper.invite( | |
1695 | self.room_id, self.other_user_id, invited_user_id, tok=self.other_tok | |
1696 | ) | |
1697 | self.helper.join(self.room_id, invited_user_id, tok=invited_tok) | |
1698 | ||
1699 | # Check that a user that joined the room after the erasure request can't see | |
1700 | # the messages anymore. | |
1701 | ||
1702 | request, channel = self.make_request( | |
1703 | "GET", | |
1704 | '/rooms/%s/context/%s?filter={"types":["m.room.message"]}' | |
1705 | % (self.room_id, event_id), | |
1706 | access_token=invited_tok, | |
1707 | ) | |
1708 | self.render(request) | |
1709 | self.assertEqual(channel.code, 200, channel.result) | |
1710 | ||
1711 | events_before = channel.json_body["events_before"] | |
1712 | ||
1713 | self.assertEqual(len(events_before), 2, events_before) | |
1714 | self.assertDictEqual(events_before[0].get("content"), {}, events_before[0]) | |
1715 | self.assertDictEqual(events_before[1].get("content"), {}, events_before[1]) | |
1716 | ||
1717 | self.assertDictEqual( | |
1718 | channel.json_body["event"].get("content"), {}, channel.json_body["event"] | |
1719 | ) | |
1720 | ||
1721 | events_after = channel.json_body["events_after"] | |
1722 | ||
1723 | self.assertEqual(len(events_after), 2, events_after) | |
1724 | self.assertDictEqual(events_after[0].get("content"), {}, events_after[0]) | |
1725 | self.assertDictEqual(events_after[1].get("content"), {}, events_after[1]) | 
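For reference, the test drives `/context` with a `types` filter so only `m.room.message` events come back. A hedged sketch of building the equivalent request URL outside the test harness, where the `/_matrix/client/r0` prefix (added by the test router automatically) is spelled out:

```python
import json
from urllib.parse import quote

# Hedged sketch of the /context request the test issues: the filter
# restricts the returned timeline to m.room.message events. room_id
# and event_id are placeholders.
def context_url(room_id, event_id):
    room_filter = json.dumps({"types": ["m.room.message"]})
    return "/_matrix/client/r0/rooms/%s/context/%s?filter=%s" % (
        quote(room_id), quote(event_id), quote(room_filter),
    )

print(context_url("!room:example.com", "$event123"))
```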
57 | 57 | self.type = type |
58 | 58 | self.state_key = state_key |
59 | 59 | self.content = content |
60 | self.room_id = ROOM_ID | |
60 | 61 | |
61 | 62 | def to_event(self, auth_events, prev_events): |
62 | 63 | """Given the auth_events and prev_events, convert to a Frozen Event |
417 | 418 | state_before = dict(state_at_event[prev_events[0]]) |
418 | 419 | else: |
419 | 420 | state_d = resolve_events_with_store( |
421 | ROOM_ID, | |
420 | 422 | RoomVersions.V2.identifier, |
421 | 423 | [state_at_event[n] for n in prev_events], |
422 | 424 | event_map=event_map, |
564 | 566 | # Test that we correctly handle passing `None` as the event_map |
565 | 567 | |
566 | 568 | state_d = resolve_events_with_store( |
569 | ROOM_ID, | |
567 | 570 | RoomVersions.V2.identifier, |
568 | 571 | [self.state_at_bob, self.state_at_charlie], |
569 | 572 | event_map=None, |
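Both call sites in the v2 resolution tests now pass `ROOM_ID` as a new first positional argument. A hedged stub of the updated signature, inferred from these call sites; parameter names and defaults beyond `room_id` are assumptions for illustration only:

```python
# Hedged stub inferred from the call sites above; body elided.
def resolve_events_with_store(room_id, room_version, state_sets,
                              event_map=None, state_res_store=None):
    """room_id (str) is the new first argument: it lets every event
    fetched during resolution be checked against the room under
    resolution, closing the cross-room confusion fixed above."""
    raise NotImplementedError("signature sketch only")
```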