Imported Upstream version 0.18.1
Erik Johnston
7 years ago
0 | Changes in synapse v0.18.1 (2016-10-0) | |
1 | ====================================== | |
2 | ||
3 | No changes since v0.18.1-rc1 | |
4 | ||
5 | ||
6 | Changes in synapse v0.18.1-rc1 (2016-09-30) | |
7 | =========================================== | |
8 | ||
9 | Features: | |
10 | ||
11 | * Add total_room_count_estimate to ``/publicRooms`` (PR #1133) | |
12 | ||
13 | ||
14 | Changes: | |
15 | ||
16 | * Time out typing over federation (PR #1140) | |
17 | * Restructure LDAP authentication (PR #1153) | |
18 | ||
19 | ||
20 | Bug fixes: | |
21 | ||
22 | * Fix 3pid invites when server is already in the room (PR #1136) | |
23 | * Fix upgrading with SQLite taking lots of CPU for a few days | |
24 | after upgrade (PR #1144) | |
25 | * Fix upgrading from very old database versions (PR #1145) | |
26 | * Fix port script to work with recently added tables (PR #1146) | |
27 | ||
28 | ||
0 | 29 | Changes in synapse v0.18.0 (2016-09-19) |
1 | 30 | ======================================= |
2 | 31 | |
5 | 34 | data in the background. Servers with large SQLite database may experience |
6 | 35 | degradation of performance while this upgrade is in progress, therefore you may |
7 | 36 | want to consider migrating to using Postgres before upgrading very large SQLite |
8 | daabases | |
37 | databases | |
9 | 38 | |
10 | 39 | |
11 | 40 | Changes: |
38 | 38 | "event_edges": ["is_state"], |
39 | 39 | "presence_list": ["accepted"], |
40 | 40 | "presence_stream": ["currently_active"], |
41 | "public_room_list_stream": ["visibility"], | |
41 | 42 | } |
42 | 43 | |
43 | 44 | |
70 | 71 | "event_to_state_groups", |
71 | 72 | "rejections", |
72 | 73 | "event_search", |
74 | "presence_stream", | |
75 | "push_rules_stream", | |
76 | "current_state_resets", | |
77 | "ex_outlier_stream", | |
78 | "cache_invalidation_stream", | |
79 | "public_room_list_stream", | |
80 | "state_group_edges", | |
81 | "stream_ordering_to_exterm", | |
73 | 82 | ] |
74 | 83 | |
75 | 84 |
15 | 15 | """ This is a reference implementation of a Matrix home server. |
16 | 16 | """ |
17 | 17 | |
18 | __version__ = "0.18.0" | |
18 | __version__ = "0.18.1" |
71 | 71 | auth_events = { |
72 | 72 | (e.type, e.state_key): e for e in auth_events.values() |
73 | 73 | } |
74 | self.check(event, auth_events=auth_events, do_sig_check=False) | |
74 | self.check(event, auth_events=auth_events, do_sig_check=do_sig_check) | |
75 | 75 | |
76 | 76 | def check(self, event, auth_events, do_sig_check=True): |
77 | 77 | """ Checks if this event is correctly authed. |
90 | 90 | if not hasattr(event, "room_id"): |
91 | 91 | raise AuthError(500, "Event has no room_id: %s" % event) |
92 | 92 | |
93 | sender_domain = get_domain_from_id(event.sender) | |
94 | ||
95 | # Check the sender's domain has signed the event | |
96 | if do_sig_check and not event.signatures.get(sender_domain): | |
97 | raise AuthError(403, "Event not signed by sending server") | |
93 | if do_sig_check: | |
94 | sender_domain = get_domain_from_id(event.sender) | |
95 | event_id_domain = get_domain_from_id(event.event_id) | |
96 | ||
97 | is_invite_via_3pid = ( | |
98 | event.type == EventTypes.Member | |
99 | and event.membership == Membership.INVITE | |
100 | and "third_party_invite" in event.content | |
101 | ) | |
102 | ||
103 | # Check the sender's domain has signed the event | |
104 | if not event.signatures.get(sender_domain): | |
105 | # We allow invites via 3pid to have a sender from a different | |
106 | # HS, as the sender must match the sender of the original | |
107 | # 3pid invite. This is checked further down with the | |
108 | # other dedicated membership checks. | |
109 | if not is_invite_via_3pid: | |
110 | raise AuthError(403, "Event not signed by sender's server") | |
111 | ||
112 | # Check the event_id's domain has signed the event | |
113 | if not event.signatures.get(event_id_domain): | |
114 | raise AuthError(403, "Event not signed by sending server") | |
98 | 115 | |
99 | 116 | if auth_events is None: |
100 | 117 | # Oh, we don't know what the state of the room was, so we |
488 | 505 | (EventTypes.ThirdPartyInvite, token,) |
489 | 506 | ) |
490 | 507 | if not invite_event: |
508 | return False | |
509 | ||
510 | if invite_event.sender != event.sender: | |
491 | 511 | return False |
492 | 512 | |
493 | 513 | if event.user_id != invite_event.user_id: |
26 | 26 | from synapse.metrics.resource import MetricsResource, METRICS_PREFIX |
27 | 27 | from synapse.rest.client.v2_alpha import sync |
28 | 28 | from synapse.rest.client.v1 import events |
29 | from synapse.rest.client.v1.room import RoomInitialSyncRestServlet | |
30 | from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet | |
29 | 31 | from synapse.replication.slave.storage._base import BaseSlavedStore |
30 | 32 | from synapse.replication.slave.storage.events import SlavedEventStore |
31 | 33 | from synapse.replication.slave.storage.receipts import SlavedReceiptsStore |
36 | 38 | from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore |
37 | 39 | from synapse.replication.slave.storage.presence import SlavedPresenceStore |
38 | 40 | from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore |
41 | from synapse.replication.slave.storage.room import RoomStore | |
39 | 42 | from synapse.server import HomeServer |
40 | 43 | from synapse.storage.client_ips import ClientIpStore |
41 | 44 | from synapse.storage.engines import create_engine |
73 | 76 | SlavedFilteringStore, |
74 | 77 | SlavedPresenceStore, |
75 | 78 | SlavedDeviceInboxStore, |
79 | RoomStore, | |
76 | 80 | BaseSlavedStore, |
77 | 81 | ClientIpStore, # After BaseSlavedStore because the constructor is different |
78 | 82 | ): |
295 | 299 | resource = JsonResource(self, canonical_json=False) |
296 | 300 | sync.register_servlets(self, resource) |
297 | 301 | events.register_servlets(self, resource) |
302 | InitialSyncRestServlet(self).register(resource) | |
303 | RoomInitialSyncRestServlet(self).register(resource) | |
298 | 304 | resources.update({ |
299 | 305 | "/_matrix/client/r0": resource, |
300 | 306 | "/_matrix/client/unstable": resource, |
135 | 135 | |
136 | 136 | sent_edus_counter.inc() |
137 | 137 | |
138 | # TODO, add errback, etc. | |
139 | 138 | self._transaction_queue.enqueue_edu(edu, key=key) |
140 | return defer.succeed(None) | |
141 | 139 | |
142 | 140 | @log_function |
143 | 141 | def send_device_messages(self, destination): |
30 | 30 | |
31 | 31 | try: |
32 | 32 | import ldap3 |
33 | import ldap3.core.exceptions | |
33 | 34 | except ImportError: |
34 | 35 | ldap3 = None |
35 | 36 | pass |
503 | 504 | raise LoginError(403, "", errcode=Codes.FORBIDDEN) |
504 | 505 | defer.returnValue(user_id) |
505 | 506 | |
507 | def _ldap_simple_bind(self, server, localpart, password): | |
508 | """ Attempt a simple bind with the credentials | |
509 | given by the user against the LDAP server. | |
510 | ||
511 | Returns True, LDAP3Connection | |
512 | if the bind was successful | |
513 | Returns False, None | |
514 | if an error occured | |
515 | """ | |
516 | ||
517 | try: | |
518 | # bind with the the local users ldap credentials | |
519 | bind_dn = "{prop}={value},{base}".format( | |
520 | prop=self.ldap_attributes['uid'], | |
521 | value=localpart, | |
522 | base=self.ldap_base | |
523 | ) | |
524 | conn = ldap3.Connection(server, bind_dn, password) | |
525 | logger.debug( | |
526 | "Established LDAP connection in simple bind mode: %s", | |
527 | conn | |
528 | ) | |
529 | ||
530 | if self.ldap_start_tls: | |
531 | conn.start_tls() | |
532 | logger.debug( | |
533 | "Upgraded LDAP connection in simple bind mode through StartTLS: %s", | |
534 | conn | |
535 | ) | |
536 | ||
537 | if conn.bind(): | |
538 | # GOOD: bind okay | |
539 | logger.debug("LDAP Bind successful in simple bind mode.") | |
540 | return True, conn | |
541 | ||
542 | # BAD: bind failed | |
543 | logger.info( | |
544 | "Binding against LDAP failed for '%s' failed: %s", | |
545 | localpart, conn.result['description'] | |
546 | ) | |
547 | conn.unbind() | |
548 | return False, None | |
549 | ||
550 | except ldap3.core.exceptions.LDAPException as e: | |
551 | logger.warn("Error during LDAP authentication: %s", e) | |
552 | return False, None | |
553 | ||
554 | def _ldap_authenticated_search(self, server, localpart, password): | |
555 | """ Attempt to login with the preconfigured bind_dn | |
556 | and then continue searching and filtering within | |
557 | the base_dn | |
558 | ||
559 | Returns (True, LDAP3Connection) | |
560 | if a single matching DN within the base was found | |
561 | that matched the filter expression, and with which | |
562 | a successful bind was achieved | |
563 | ||
564 | The LDAP3Connection returned is the instance that was used to | |
565 | verify the password not the one using the configured bind_dn. | |
566 | Returns (False, None) | |
567 | if an error occured | |
568 | """ | |
569 | ||
570 | try: | |
571 | conn = ldap3.Connection( | |
572 | server, | |
573 | self.ldap_bind_dn, | |
574 | self.ldap_bind_password | |
575 | ) | |
576 | logger.debug( | |
577 | "Established LDAP connection in search mode: %s", | |
578 | conn | |
579 | ) | |
580 | ||
581 | if self.ldap_start_tls: | |
582 | conn.start_tls() | |
583 | logger.debug( | |
584 | "Upgraded LDAP connection in search mode through StartTLS: %s", | |
585 | conn | |
586 | ) | |
587 | ||
588 | if not conn.bind(): | |
589 | logger.warn( | |
590 | "Binding against LDAP with `bind_dn` failed: %s", | |
591 | conn.result['description'] | |
592 | ) | |
593 | conn.unbind() | |
594 | return False, None | |
595 | ||
596 | # construct search_filter like (uid=localpart) | |
597 | query = "({prop}={value})".format( | |
598 | prop=self.ldap_attributes['uid'], | |
599 | value=localpart | |
600 | ) | |
601 | if self.ldap_filter: | |
602 | # combine with the AND expression | |
603 | query = "(&{query}{filter})".format( | |
604 | query=query, | |
605 | filter=self.ldap_filter | |
606 | ) | |
607 | logger.debug( | |
608 | "LDAP search filter: %s", | |
609 | query | |
610 | ) | |
611 | conn.search( | |
612 | search_base=self.ldap_base, | |
613 | search_filter=query | |
614 | ) | |
615 | ||
616 | if len(conn.response) == 1: | |
617 | # GOOD: found exactly one result | |
618 | user_dn = conn.response[0]['dn'] | |
619 | logger.debug('LDAP search found dn: %s', user_dn) | |
620 | ||
621 | # unbind and simple bind with user_dn to verify the password | |
622 | # Note: do not use rebind(), for some reason it did not verify | |
623 | # the password for me! | |
624 | conn.unbind() | |
625 | return self._ldap_simple_bind(server, localpart, password) | |
626 | else: | |
627 | # BAD: found 0 or > 1 results, abort! | |
628 | if len(conn.response) == 0: | |
629 | logger.info( | |
630 | "LDAP search returned no results for '%s'", | |
631 | localpart | |
632 | ) | |
633 | else: | |
634 | logger.info( | |
635 | "LDAP search returned too many (%s) results for '%s'", | |
636 | len(conn.response), localpart | |
637 | ) | |
638 | conn.unbind() | |
639 | return False, None | |
640 | ||
641 | except ldap3.core.exceptions.LDAPException as e: | |
642 | logger.warn("Error during LDAP authentication: %s", e) | |
643 | return False, None | |
644 | ||
506 | 645 | @defer.inlineCallbacks |
507 | 646 | def _check_ldap_password(self, user_id, password): |
508 | 647 | """ Attempt to authenticate a user against an LDAP Server |
515 | 654 | if not ldap3 or not self.ldap_enabled: |
516 | 655 | defer.returnValue(False) |
517 | 656 | |
518 | if self.ldap_mode not in LDAPMode.LIST: | |
519 | raise RuntimeError( | |
520 | 'Invalid ldap mode specified: {mode}'.format( | |
521 | mode=self.ldap_mode | |
522 | ) | |
523 | ) | |
657 | localpart = UserID.from_string(user_id).localpart | |
524 | 658 | |
525 | 659 | try: |
526 | 660 | server = ldap3.Server(self.ldap_uri) |
527 | 661 | logger.debug( |
528 | "Attempting ldap connection with %s", | |
662 | "Attempting LDAP connection with %s", | |
529 | 663 | self.ldap_uri |
530 | 664 | ) |
531 | 665 | |
532 | localpart = UserID.from_string(user_id).localpart | |
533 | 666 | if self.ldap_mode == LDAPMode.SIMPLE: |
534 | # bind with the the local users ldap credentials | |
535 | bind_dn = "{prop}={value},{base}".format( | |
536 | prop=self.ldap_attributes['uid'], | |
537 | value=localpart, | |
538 | base=self.ldap_base | |
539 | ) | |
540 | conn = ldap3.Connection(server, bind_dn, password) | |
667 | result, conn = self._ldap_simple_bind( | |
668 | server=server, localpart=localpart, password=password | |
669 | ) | |
541 | 670 | logger.debug( |
542 | "Established ldap connection in simple mode: %s", | |
671 | 'LDAP authentication method simple bind returned: %s (conn: %s)', | |
672 | result, | |
543 | 673 | conn |
544 | 674 | ) |
545 | ||
546 | if self.ldap_start_tls: | |
547 | conn.start_tls() | |
548 | logger.debug( | |
549 | "Upgraded ldap connection in simple mode through StartTLS: %s", | |
550 | conn | |
675 | if not result: | |
676 | defer.returnValue(False) | |
677 | elif self.ldap_mode == LDAPMode.SEARCH: | |
678 | result, conn = self._ldap_authenticated_search( | |
679 | server=server, localpart=localpart, password=password | |
680 | ) | |
681 | logger.debug( | |
682 | 'LDAP auth method authenticated search returned: %s (conn: %s)', | |
683 | result, | |
684 | conn | |
685 | ) | |
686 | if not result: | |
687 | defer.returnValue(False) | |
688 | else: | |
689 | raise RuntimeError( | |
690 | 'Invalid LDAP mode specified: {mode}'.format( | |
691 | mode=self.ldap_mode | |
551 | 692 | ) |
552 | ||
553 | conn.bind() | |
554 | ||
555 | elif self.ldap_mode == LDAPMode.SEARCH: | |
556 | # connect with preconfigured credentials and search for local user | |
557 | conn = ldap3.Connection( | |
558 | server, | |
559 | self.ldap_bind_dn, | |
560 | self.ldap_bind_password | |
561 | ) | |
562 | logger.debug( | |
563 | "Established ldap connection in search mode: %s", | |
693 | ) | |
694 | ||
695 | try: | |
696 | logger.info( | |
697 | "User authenticated against LDAP server: %s", | |
564 | 698 | conn |
565 | 699 | ) |
566 | ||
567 | if self.ldap_start_tls: | |
568 | conn.start_tls() | |
569 | logger.debug( | |
570 | "Upgraded ldap connection in search mode through StartTLS: %s", | |
571 | conn | |
572 | ) | |
573 | ||
574 | conn.bind() | |
575 | ||
576 | # find matching dn | |
577 | query = "({prop}={value})".format( | |
578 | prop=self.ldap_attributes['uid'], | |
579 | value=localpart | |
580 | ) | |
581 | if self.ldap_filter: | |
582 | query = "(&{query}{filter})".format( | |
583 | query=query, | |
584 | filter=self.ldap_filter | |
585 | ) | |
586 | logger.debug("ldap search filter: %s", query) | |
587 | result = conn.search(self.ldap_base, query) | |
588 | ||
589 | if result and len(conn.response) == 1: | |
590 | # found exactly one result | |
591 | user_dn = conn.response[0]['dn'] | |
592 | logger.debug('ldap search found dn: %s', user_dn) | |
593 | ||
594 | # unbind and reconnect, rebind with found dn | |
595 | conn.unbind() | |
596 | conn = ldap3.Connection( | |
597 | server, | |
598 | user_dn, | |
599 | password, | |
600 | auto_bind=True | |
601 | ) | |
602 | else: | |
603 | # found 0 or > 1 results, abort! | |
604 | logger.warn( | |
605 | "ldap search returned unexpected (%d!=1) amount of results", | |
606 | len(conn.response) | |
607 | ) | |
608 | defer.returnValue(False) | |
609 | ||
610 | logger.info( | |
611 | "User authenticated against ldap server: %s", | |
612 | conn | |
613 | ) | |
614 | ||
615 | # check for existing account, if none exists, create one | |
616 | if not (yield self.check_user_exists(user_id)): | |
617 | # query user metadata for account creation | |
700 | except NameError: | |
701 | logger.warn("Authentication method yielded no LDAP connection, aborting!") | |
702 | defer.returnValue(False) | |
703 | ||
704 | # check if user with user_id exists | |
705 | if (yield self.check_user_exists(user_id)): | |
706 | # exists, authentication complete | |
707 | conn.unbind() | |
708 | defer.returnValue(True) | |
709 | ||
710 | else: | |
711 | # does not exist, fetch metadata for account creation from | |
712 | # existing ldap connection | |
618 | 713 | query = "({prop}={value})".format( |
619 | 714 | prop=self.ldap_attributes['uid'], |
620 | 715 | value=localpart |
625 | 720 | filter=query, |
626 | 721 | user_filter=self.ldap_filter |
627 | 722 | ) |
628 | logger.debug("ldap registration filter: %s", query) | |
629 | ||
630 | result = conn.search( | |
723 | logger.debug( | |
724 | "ldap registration filter: %s", | |
725 | query | |
726 | ) | |
727 | ||
728 | conn.search( | |
631 | 729 | search_base=self.ldap_base, |
632 | 730 | search_filter=query, |
633 | 731 | attributes=[ |
650 | 748 | # TODO: bind email, set displayname with data from ldap directory |
651 | 749 | |
652 | 750 | logger.info( |
653 | "ldap registration successful: %d: %s (%s, %)", | |
751 | "Registration based on LDAP data was successful: %d: %s (%s, %)", | |
654 | 752 | user_id, |
655 | 753 | localpart, |
656 | 754 | name, |
657 | 755 | |
658 | 756 | ) |
757 | ||
758 | defer.returnValue(True) | |
659 | 759 | else: |
660 | logger.warn( | |
661 | "ldap registration failed: unexpected (%d!=1) amount of results", | |
662 | len(conn.response) | |
663 | ) | |
760 | if len(conn.response) == 0: | |
761 | logger.warn("LDAP registration failed, no result.") | |
762 | else: | |
763 | logger.warn( | |
764 | "LDAP registration failed, too many results (%s)", | |
765 | len(conn.response) | |
766 | ) | |
767 | ||
664 | 768 | defer.returnValue(False) |
665 | 769 | |
666 | defer.returnValue(True) | |
770 | defer.returnValue(False) | |
771 | ||
667 | 772 | except ldap3.core.exceptions.LDAPException as e: |
668 | 773 | logger.warn("Error during ldap authentication: %s", e) |
669 | 774 | defer.returnValue(False) |
1921 | 1921 | original_invite = yield self.store.get_event( |
1922 | 1922 | original_invite_id, allow_none=True |
1923 | 1923 | ) |
1924 | if not original_invite: | |
1924 | if original_invite: | |
1925 | display_name = original_invite.content["display_name"] | |
1926 | event_dict["content"]["third_party_invite"]["display_name"] = display_name | |
1927 | else: | |
1925 | 1928 | logger.info( |
1926 | "Could not find invite event for third_party_invite - " | |
1927 | "discarding: %s" % (event_dict,) | |
1928 | ) | |
1929 | return | |
1930 | ||
1931 | display_name = original_invite.content["display_name"] | |
1932 | event_dict["content"]["third_party_invite"]["display_name"] = display_name | |
1929 | "Could not find invite event for third_party_invite: %r", | |
1930 | event_dict | |
1931 | ) | |
1932 | # We don't discard here as this is not the appropriate place to do | |
1933 | # auth checks. If we need the invite and don't have it then the | |
1934 | # auth check code will explode appropriately. | |
1935 | ||
1933 | 1936 | builder = self.event_builder_factory.new(event_dict) |
1934 | 1937 | EventValidator().validate_new(builder) |
1935 | 1938 | message_handler = self.hs.get_handlers().message_handler |
0 | # -*- coding: utf-8 -*- | |
1 | # Copyright 2016 OpenMarket Ltd | |
2 | # | |
3 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | # you may not use this file except in compliance with the License. | |
5 | # You may obtain a copy of the License at | |
6 | # | |
7 | # http://www.apache.org/licenses/LICENSE-2.0 | |
8 | # | |
9 | # Unless required by applicable law or agreed to in writing, software | |
10 | # distributed under the License is distributed on an "AS IS" BASIS, | |
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | # See the License for the specific language governing permissions and | |
13 | # limitations under the License. | |
14 | ||
15 | from twisted.internet import defer | |
16 | ||
17 | from synapse.api.constants import EventTypes, Membership | |
18 | from synapse.api.errors import AuthError, Codes | |
19 | from synapse.events.utils import serialize_event | |
20 | from synapse.events.validator import EventValidator | |
21 | from synapse.streams.config import PaginationConfig | |
22 | from synapse.types import ( | |
23 | UserID, StreamToken, | |
24 | ) | |
25 | from synapse.util import unwrapFirstError | |
26 | from synapse.util.async import concurrently_execute | |
27 | from synapse.util.caches.snapshot_cache import SnapshotCache | |
28 | from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred | |
29 | from synapse.visibility import filter_events_for_client | |
30 | ||
31 | from ._base import BaseHandler | |
32 | ||
33 | import logging | |
34 | ||
35 | ||
36 | logger = logging.getLogger(__name__) | |
37 | ||
38 | ||
39 | class InitialSyncHandler(BaseHandler): | |
40 | def __init__(self, hs): | |
41 | super(InitialSyncHandler, self).__init__(hs) | |
42 | self.hs = hs | |
43 | self.state = hs.get_state_handler() | |
44 | self.clock = hs.get_clock() | |
45 | self.validator = EventValidator() | |
46 | self.snapshot_cache = SnapshotCache() | |
47 | ||
48 | def snapshot_all_rooms(self, user_id=None, pagin_config=None, | |
49 | as_client_event=True, include_archived=False): | |
50 | """Retrieve a snapshot of all rooms the user is invited or has joined. | |
51 | ||
52 | This snapshot may include messages for all rooms where the user is | |
53 | joined, depending on the pagination config. | |
54 | ||
55 | Args: | |
56 | user_id (str): The ID of the user making the request. | |
57 | pagin_config (synapse.api.streams.PaginationConfig): The pagination | |
58 | config used to determine how many messages *PER ROOM* to return. | |
59 | as_client_event (bool): True to get events in client-server format. | |
60 | include_archived (bool): True to get rooms that the user has left | |
61 | Returns: | |
62 | A list of dicts with "room_id" and "membership" keys for all rooms | |
63 | the user is currently invited or joined in on. Rooms where the user | |
64 | is joined on, may return a "messages" key with messages, depending | |
65 | on the specified PaginationConfig. | |
66 | """ | |
67 | key = ( | |
68 | user_id, | |
69 | pagin_config.from_token, | |
70 | pagin_config.to_token, | |
71 | pagin_config.direction, | |
72 | pagin_config.limit, | |
73 | as_client_event, | |
74 | include_archived, | |
75 | ) | |
76 | now_ms = self.clock.time_msec() | |
77 | result = self.snapshot_cache.get(now_ms, key) | |
78 | if result is not None: | |
79 | return result | |
80 | ||
81 | return self.snapshot_cache.set(now_ms, key, self._snapshot_all_rooms( | |
82 | user_id, pagin_config, as_client_event, include_archived | |
83 | )) | |
84 | ||
85 | @defer.inlineCallbacks | |
86 | def _snapshot_all_rooms(self, user_id=None, pagin_config=None, | |
87 | as_client_event=True, include_archived=False): | |
88 | ||
89 | memberships = [Membership.INVITE, Membership.JOIN] | |
90 | if include_archived: | |
91 | memberships.append(Membership.LEAVE) | |
92 | ||
93 | room_list = yield self.store.get_rooms_for_user_where_membership_is( | |
94 | user_id=user_id, membership_list=memberships | |
95 | ) | |
96 | ||
97 | user = UserID.from_string(user_id) | |
98 | ||
99 | rooms_ret = [] | |
100 | ||
101 | now_token = yield self.hs.get_event_sources().get_current_token() | |
102 | ||
103 | presence_stream = self.hs.get_event_sources().sources["presence"] | |
104 | pagination_config = PaginationConfig(from_token=now_token) | |
105 | presence, _ = yield presence_stream.get_pagination_rows( | |
106 | user, pagination_config.get_source_config("presence"), None | |
107 | ) | |
108 | ||
109 | receipt_stream = self.hs.get_event_sources().sources["receipt"] | |
110 | receipt, _ = yield receipt_stream.get_pagination_rows( | |
111 | user, pagination_config.get_source_config("receipt"), None | |
112 | ) | |
113 | ||
114 | tags_by_room = yield self.store.get_tags_for_user(user_id) | |
115 | ||
116 | account_data, account_data_by_room = ( | |
117 | yield self.store.get_account_data_for_user(user_id) | |
118 | ) | |
119 | ||
120 | public_room_ids = yield self.store.get_public_room_ids() | |
121 | ||
122 | limit = pagin_config.limit | |
123 | if limit is None: | |
124 | limit = 10 | |
125 | ||
126 | @defer.inlineCallbacks | |
127 | def handle_room(event): | |
128 | d = { | |
129 | "room_id": event.room_id, | |
130 | "membership": event.membership, | |
131 | "visibility": ( | |
132 | "public" if event.room_id in public_room_ids | |
133 | else "private" | |
134 | ), | |
135 | } | |
136 | ||
137 | if event.membership == Membership.INVITE: | |
138 | time_now = self.clock.time_msec() | |
139 | d["inviter"] = event.sender | |
140 | ||
141 | invite_event = yield self.store.get_event(event.event_id) | |
142 | d["invite"] = serialize_event(invite_event, time_now, as_client_event) | |
143 | ||
144 | rooms_ret.append(d) | |
145 | ||
146 | if event.membership not in (Membership.JOIN, Membership.LEAVE): | |
147 | return | |
148 | ||
149 | try: | |
150 | if event.membership == Membership.JOIN: | |
151 | room_end_token = now_token.room_key | |
152 | deferred_room_state = self.state_handler.get_current_state( | |
153 | event.room_id | |
154 | ) | |
155 | elif event.membership == Membership.LEAVE: | |
156 | room_end_token = "s%d" % (event.stream_ordering,) | |
157 | deferred_room_state = self.store.get_state_for_events( | |
158 | [event.event_id], None | |
159 | ) | |
160 | deferred_room_state.addCallback( | |
161 | lambda states: states[event.event_id] | |
162 | ) | |
163 | ||
164 | (messages, token), current_state = yield preserve_context_over_deferred( | |
165 | defer.gatherResults( | |
166 | [ | |
167 | preserve_fn(self.store.get_recent_events_for_room)( | |
168 | event.room_id, | |
169 | limit=limit, | |
170 | end_token=room_end_token, | |
171 | ), | |
172 | deferred_room_state, | |
173 | ] | |
174 | ) | |
175 | ).addErrback(unwrapFirstError) | |
176 | ||
177 | messages = yield filter_events_for_client( | |
178 | self.store, user_id, messages | |
179 | ) | |
180 | ||
181 | start_token = now_token.copy_and_replace("room_key", token[0]) | |
182 | end_token = now_token.copy_and_replace("room_key", token[1]) | |
183 | time_now = self.clock.time_msec() | |
184 | ||
185 | d["messages"] = { | |
186 | "chunk": [ | |
187 | serialize_event(m, time_now, as_client_event) | |
188 | for m in messages | |
189 | ], | |
190 | "start": start_token.to_string(), | |
191 | "end": end_token.to_string(), | |
192 | } | |
193 | ||
194 | d["state"] = [ | |
195 | serialize_event(c, time_now, as_client_event) | |
196 | for c in current_state.values() | |
197 | ] | |
198 | ||
199 | account_data_events = [] | |
200 | tags = tags_by_room.get(event.room_id) | |
201 | if tags: | |
202 | account_data_events.append({ | |
203 | "type": "m.tag", | |
204 | "content": {"tags": tags}, | |
205 | }) | |
206 | ||
207 | account_data = account_data_by_room.get(event.room_id, {}) | |
208 | for account_data_type, content in account_data.items(): | |
209 | account_data_events.append({ | |
210 | "type": account_data_type, | |
211 | "content": content, | |
212 | }) | |
213 | ||
214 | d["account_data"] = account_data_events | |
215 | except: | |
216 | logger.exception("Failed to get snapshot") | |
217 | ||
218 | yield concurrently_execute(handle_room, room_list, 10) | |
219 | ||
220 | account_data_events = [] | |
221 | for account_data_type, content in account_data.items(): | |
222 | account_data_events.append({ | |
223 | "type": account_data_type, | |
224 | "content": content, | |
225 | }) | |
226 | ||
227 | ret = { | |
228 | "rooms": rooms_ret, | |
229 | "presence": presence, | |
230 | "account_data": account_data_events, | |
231 | "receipts": receipt, | |
232 | "end": now_token.to_string(), | |
233 | } | |
234 | ||
235 | defer.returnValue(ret) | |
236 | ||
237 | @defer.inlineCallbacks | |
238 | def room_initial_sync(self, requester, room_id, pagin_config=None): | |
239 | """Capture the a snapshot of a room. If user is currently a member of | |
240 | the room this will be what is currently in the room. If the user left | |
241 | the room this will be what was in the room when they left. | |
242 | ||
243 | Args: | |
244 | requester(Requester): The user to get a snapshot for. | |
245 | room_id(str): The room to get a snapshot of. | |
246 | pagin_config(synapse.streams.config.PaginationConfig): | |
247 | The pagination config used to determine how many messages to | |
248 | return. | |
249 | Raises: | |
250 | AuthError if the user wasn't in the room. | |
251 | Returns: | |
252 | A JSON serialisable dict with the snapshot of the room. | |
253 | """ | |
254 | ||
255 | user_id = requester.user.to_string() | |
256 | ||
257 | membership, member_event_id = yield self._check_in_room_or_world_readable( | |
258 | room_id, user_id, | |
259 | ) | |
260 | is_peeking = member_event_id is None | |
261 | ||
262 | if membership == Membership.JOIN: | |
263 | result = yield self._room_initial_sync_joined( | |
264 | user_id, room_id, pagin_config, membership, is_peeking | |
265 | ) | |
266 | elif membership == Membership.LEAVE: | |
267 | result = yield self._room_initial_sync_parted( | |
268 | user_id, room_id, pagin_config, membership, member_event_id, is_peeking | |
269 | ) | |
270 | ||
271 | account_data_events = [] | |
272 | tags = yield self.store.get_tags_for_room(user_id, room_id) | |
273 | if tags: | |
274 | account_data_events.append({ | |
275 | "type": "m.tag", | |
276 | "content": {"tags": tags}, | |
277 | }) | |
278 | ||
279 | account_data = yield self.store.get_account_data_for_room(user_id, room_id) | |
280 | for account_data_type, content in account_data.items(): | |
281 | account_data_events.append({ | |
282 | "type": account_data_type, | |
283 | "content": content, | |
284 | }) | |
285 | ||
286 | result["account_data"] = account_data_events | |
287 | ||
288 | defer.returnValue(result) | |
289 | ||
290 | @defer.inlineCallbacks | |
291 | def _room_initial_sync_parted(self, user_id, room_id, pagin_config, | |
292 | membership, member_event_id, is_peeking): | |
293 | room_state = yield self.store.get_state_for_events( | |
294 | [member_event_id], None | |
295 | ) | |
296 | ||
297 | room_state = room_state[member_event_id] | |
298 | ||
299 | limit = pagin_config.limit if pagin_config else None | |
300 | if limit is None: | |
301 | limit = 10 | |
302 | ||
303 | stream_token = yield self.store.get_stream_token_for_event( | |
304 | member_event_id | |
305 | ) | |
306 | ||
307 | messages, token = yield self.store.get_recent_events_for_room( | |
308 | room_id, | |
309 | limit=limit, | |
310 | end_token=stream_token | |
311 | ) | |
312 | ||
313 | messages = yield filter_events_for_client( | |
314 | self.store, user_id, messages, is_peeking=is_peeking | |
315 | ) | |
316 | ||
317 | start_token = StreamToken.START.copy_and_replace("room_key", token[0]) | |
318 | end_token = StreamToken.START.copy_and_replace("room_key", token[1]) | |
319 | ||
320 | time_now = self.clock.time_msec() | |
321 | ||
322 | defer.returnValue({ | |
323 | "membership": membership, | |
324 | "room_id": room_id, | |
325 | "messages": { | |
326 | "chunk": [serialize_event(m, time_now) for m in messages], | |
327 | "start": start_token.to_string(), | |
328 | "end": end_token.to_string(), | |
329 | }, | |
330 | "state": [serialize_event(s, time_now) for s in room_state.values()], | |
331 | "presence": [], | |
332 | "receipts": [], | |
333 | }) | |
334 | ||
335 | @defer.inlineCallbacks | |
336 | def _room_initial_sync_joined(self, user_id, room_id, pagin_config, | |
337 | membership, is_peeking): | |
338 | current_state = yield self.state.get_current_state( | |
339 | room_id=room_id, | |
340 | ) | |
341 | ||
342 | # TODO: These concurrently | |
343 | time_now = self.clock.time_msec() | |
344 | state = [ | |
345 | serialize_event(x, time_now) | |
346 | for x in current_state.values() | |
347 | ] | |
348 | ||
349 | now_token = yield self.hs.get_event_sources().get_current_token() | |
350 | ||
351 | limit = pagin_config.limit if pagin_config else None | |
352 | if limit is None: | |
353 | limit = 10 | |
354 | ||
355 | room_members = [ | |
356 | m for m in current_state.values() | |
357 | if m.type == EventTypes.Member | |
358 | and m.content["membership"] == Membership.JOIN | |
359 | ] | |
360 | ||
361 | presence_handler = self.hs.get_presence_handler() | |
362 | ||
363 | @defer.inlineCallbacks | |
364 | def get_presence(): | |
365 | states = yield presence_handler.get_states( | |
366 | [m.user_id for m in room_members], | |
367 | as_event=True, | |
368 | ) | |
369 | ||
370 | defer.returnValue(states) | |
371 | ||
372 | @defer.inlineCallbacks | |
373 | def get_receipts(): | |
374 | receipts_handler = self.hs.get_handlers().receipts_handler | |
375 | receipts = yield receipts_handler.get_receipts_for_room( | |
376 | room_id, | |
377 | now_token.receipt_key | |
378 | ) | |
379 | defer.returnValue(receipts) | |
380 | ||
381 | presence, receipts, (messages, token) = yield defer.gatherResults( | |
382 | [ | |
383 | preserve_fn(get_presence)(), | |
384 | preserve_fn(get_receipts)(), | |
385 | preserve_fn(self.store.get_recent_events_for_room)( | |
386 | room_id, | |
387 | limit=limit, | |
388 | end_token=now_token.room_key, | |
389 | ) | |
390 | ], | |
391 | consumeErrors=True, | |
392 | ).addErrback(unwrapFirstError) | |
393 | ||
394 | messages = yield filter_events_for_client( | |
395 | self.store, user_id, messages, is_peeking=is_peeking, | |
396 | ) | |
397 | ||
398 | start_token = now_token.copy_and_replace("room_key", token[0]) | |
399 | end_token = now_token.copy_and_replace("room_key", token[1]) | |
400 | ||
401 | time_now = self.clock.time_msec() | |
402 | ||
403 | ret = { | |
404 | "room_id": room_id, | |
405 | "messages": { | |
406 | "chunk": [serialize_event(m, time_now) for m in messages], | |
407 | "start": start_token.to_string(), | |
408 | "end": end_token.to_string(), | |
409 | }, | |
410 | "state": state, | |
411 | "presence": presence, | |
412 | "receipts": receipts, | |
413 | } | |
414 | if not is_peeking: | |
415 | ret["membership"] = membership | |
416 | ||
417 | defer.returnValue(ret) | |
418 | ||
419 | @defer.inlineCallbacks | |
420 | def _check_in_room_or_world_readable(self, room_id, user_id): | |
421 | try: | |
422 | # check_user_was_in_room will return the most recent membership | |
423 | # event for the user if: | |
424 | # * The user is a non-guest user, and was ever in the room | |
425 | # * The user is a guest user, and has joined the room | |
426 | # else it will throw. | |
427 | member_event = yield self.auth.check_user_was_in_room(room_id, user_id) | |
428 | defer.returnValue((member_event.membership, member_event.event_id)) | |
429 | return | |
430 | except AuthError: | |
431 | visibility = yield self.state_handler.get_current_state( | |
432 | room_id, EventTypes.RoomHistoryVisibility, "" | |
433 | ) | |
434 | if ( | |
435 | visibility and | |
436 | visibility.content["history_visibility"] == "world_readable" | |
437 | ): | |
438 | defer.returnValue((Membership.JOIN, None)) | |
439 | return | |
440 | raise AuthError( | |
441 | 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN | |
442 | ) |
20 | 20 | from synapse.events.utils import serialize_event |
21 | 21 | from synapse.events.validator import EventValidator |
22 | 22 | from synapse.push.action_generator import ActionGenerator |
23 | from synapse.streams.config import PaginationConfig | |
24 | 23 | from synapse.types import ( |
25 | UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id | |
24 | UserID, RoomAlias, RoomStreamToken, get_domain_from_id | |
26 | 25 | ) |
27 | from synapse.util import unwrapFirstError | |
28 | from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock | |
29 | from synapse.util.caches.snapshot_cache import SnapshotCache | |
30 | from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred | |
26 | from synapse.util.async import run_on_reactor, ReadWriteLock | |
27 | from synapse.util.logcontext import preserve_fn | |
31 | 28 | from synapse.util.metrics import measure_func |
32 | 29 | from synapse.visibility import filter_events_for_client |
33 | 30 | |
48 | 45 | self.state = hs.get_state_handler() |
49 | 46 | self.clock = hs.get_clock() |
50 | 47 | self.validator = EventValidator() |
51 | self.snapshot_cache = SnapshotCache() | |
52 | 48 | |
53 | 49 | self.pagination_lock = ReadWriteLock() |
54 | 50 | |
390 | 386 | defer.returnValue( |
391 | 387 | [serialize_event(c, now) for c in room_state.values()] |
392 | 388 | ) |
393 | ||
394 | def snapshot_all_rooms(self, user_id=None, pagin_config=None, | |
395 | as_client_event=True, include_archived=False): | |
396 | """Retrieve a snapshot of all rooms the user is invited or has joined. | |
397 | ||
398 | This snapshot may include messages for all rooms where the user is | |
399 | joined, depending on the pagination config. | |
400 | ||
401 | Args: | |
402 | user_id (str): The ID of the user making the request. | |
403 | pagin_config (synapse.api.streams.PaginationConfig): The pagination | |
404 | config used to determine how many messages *PER ROOM* to return. | |
405 | as_client_event (bool): True to get events in client-server format. | |
406 | include_archived (bool): True to get rooms that the user has left | |
407 | Returns: | |
408 | A list of dicts with "room_id" and "membership" keys for all rooms | |
409 | the user is currently invited or joined in on. Rooms where the user | |
410 | is joined on, may return a "messages" key with messages, depending | |
411 | on the specified PaginationConfig. | |
412 | """ | |
413 | key = ( | |
414 | user_id, | |
415 | pagin_config.from_token, | |
416 | pagin_config.to_token, | |
417 | pagin_config.direction, | |
418 | pagin_config.limit, | |
419 | as_client_event, | |
420 | include_archived, | |
421 | ) | |
422 | now_ms = self.clock.time_msec() | |
423 | result = self.snapshot_cache.get(now_ms, key) | |
424 | if result is not None: | |
425 | return result | |
426 | ||
427 | return self.snapshot_cache.set(now_ms, key, self._snapshot_all_rooms( | |
428 | user_id, pagin_config, as_client_event, include_archived | |
429 | )) | |
430 | ||
431 | @defer.inlineCallbacks | |
432 | def _snapshot_all_rooms(self, user_id=None, pagin_config=None, | |
433 | as_client_event=True, include_archived=False): | |
434 | ||
435 | memberships = [Membership.INVITE, Membership.JOIN] | |
436 | if include_archived: | |
437 | memberships.append(Membership.LEAVE) | |
438 | ||
439 | room_list = yield self.store.get_rooms_for_user_where_membership_is( | |
440 | user_id=user_id, membership_list=memberships | |
441 | ) | |
442 | ||
443 | user = UserID.from_string(user_id) | |
444 | ||
445 | rooms_ret = [] | |
446 | ||
447 | now_token = yield self.hs.get_event_sources().get_current_token() | |
448 | ||
449 | presence_stream = self.hs.get_event_sources().sources["presence"] | |
450 | pagination_config = PaginationConfig(from_token=now_token) | |
451 | presence, _ = yield presence_stream.get_pagination_rows( | |
452 | user, pagination_config.get_source_config("presence"), None | |
453 | ) | |
454 | ||
455 | receipt_stream = self.hs.get_event_sources().sources["receipt"] | |
456 | receipt, _ = yield receipt_stream.get_pagination_rows( | |
457 | user, pagination_config.get_source_config("receipt"), None | |
458 | ) | |
459 | ||
460 | tags_by_room = yield self.store.get_tags_for_user(user_id) | |
461 | ||
462 | account_data, account_data_by_room = ( | |
463 | yield self.store.get_account_data_for_user(user_id) | |
464 | ) | |
465 | ||
466 | public_room_ids = yield self.store.get_public_room_ids() | |
467 | ||
468 | limit = pagin_config.limit | |
469 | if limit is None: | |
470 | limit = 10 | |
471 | ||
472 | @defer.inlineCallbacks | |
473 | def handle_room(event): | |
474 | d = { | |
475 | "room_id": event.room_id, | |
476 | "membership": event.membership, | |
477 | "visibility": ( | |
478 | "public" if event.room_id in public_room_ids | |
479 | else "private" | |
480 | ), | |
481 | } | |
482 | ||
483 | if event.membership == Membership.INVITE: | |
484 | time_now = self.clock.time_msec() | |
485 | d["inviter"] = event.sender | |
486 | ||
487 | invite_event = yield self.store.get_event(event.event_id) | |
488 | d["invite"] = serialize_event(invite_event, time_now, as_client_event) | |
489 | ||
490 | rooms_ret.append(d) | |
491 | ||
492 | if event.membership not in (Membership.JOIN, Membership.LEAVE): | |
493 | return | |
494 | ||
495 | try: | |
496 | if event.membership == Membership.JOIN: | |
497 | room_end_token = now_token.room_key | |
498 | deferred_room_state = self.state_handler.get_current_state( | |
499 | event.room_id | |
500 | ) | |
501 | elif event.membership == Membership.LEAVE: | |
502 | room_end_token = "s%d" % (event.stream_ordering,) | |
503 | deferred_room_state = self.store.get_state_for_events( | |
504 | [event.event_id], None | |
505 | ) | |
506 | deferred_room_state.addCallback( | |
507 | lambda states: states[event.event_id] | |
508 | ) | |
509 | ||
510 | (messages, token), current_state = yield preserve_context_over_deferred( | |
511 | defer.gatherResults( | |
512 | [ | |
513 | preserve_fn(self.store.get_recent_events_for_room)( | |
514 | event.room_id, | |
515 | limit=limit, | |
516 | end_token=room_end_token, | |
517 | ), | |
518 | deferred_room_state, | |
519 | ] | |
520 | ) | |
521 | ).addErrback(unwrapFirstError) | |
522 | ||
523 | messages = yield filter_events_for_client( | |
524 | self.store, user_id, messages | |
525 | ) | |
526 | ||
527 | start_token = now_token.copy_and_replace("room_key", token[0]) | |
528 | end_token = now_token.copy_and_replace("room_key", token[1]) | |
529 | time_now = self.clock.time_msec() | |
530 | ||
531 | d["messages"] = { | |
532 | "chunk": [ | |
533 | serialize_event(m, time_now, as_client_event) | |
534 | for m in messages | |
535 | ], | |
536 | "start": start_token.to_string(), | |
537 | "end": end_token.to_string(), | |
538 | } | |
539 | ||
540 | d["state"] = [ | |
541 | serialize_event(c, time_now, as_client_event) | |
542 | for c in current_state.values() | |
543 | ] | |
544 | ||
545 | account_data_events = [] | |
546 | tags = tags_by_room.get(event.room_id) | |
547 | if tags: | |
548 | account_data_events.append({ | |
549 | "type": "m.tag", | |
550 | "content": {"tags": tags}, | |
551 | }) | |
552 | ||
553 | account_data = account_data_by_room.get(event.room_id, {}) | |
554 | for account_data_type, content in account_data.items(): | |
555 | account_data_events.append({ | |
556 | "type": account_data_type, | |
557 | "content": content, | |
558 | }) | |
559 | ||
560 | d["account_data"] = account_data_events | |
561 | except: | |
562 | logger.exception("Failed to get snapshot") | |
563 | ||
564 | yield concurrently_execute(handle_room, room_list, 10) | |
565 | ||
566 | account_data_events = [] | |
567 | for account_data_type, content in account_data.items(): | |
568 | account_data_events.append({ | |
569 | "type": account_data_type, | |
570 | "content": content, | |
571 | }) | |
572 | ||
573 | ret = { | |
574 | "rooms": rooms_ret, | |
575 | "presence": presence, | |
576 | "account_data": account_data_events, | |
577 | "receipts": receipt, | |
578 | "end": now_token.to_string(), | |
579 | } | |
580 | ||
581 | defer.returnValue(ret) | |
582 | ||
583 | @defer.inlineCallbacks | |
584 | def room_initial_sync(self, requester, room_id, pagin_config=None): | |
585 | """Capture the a snapshot of a room. If user is currently a member of | |
586 | the room this will be what is currently in the room. If the user left | |
587 | the room this will be what was in the room when they left. | |
588 | ||
589 | Args: | |
590 | requester(Requester): The user to get a snapshot for. | |
591 | room_id(str): The room to get a snapshot of. | |
592 | pagin_config(synapse.streams.config.PaginationConfig): | |
593 | The pagination config used to determine how many messages to | |
594 | return. | |
595 | Raises: | |
596 | AuthError if the user wasn't in the room. | |
597 | Returns: | |
598 | A JSON serialisable dict with the snapshot of the room. | |
599 | """ | |
600 | ||
601 | user_id = requester.user.to_string() | |
602 | ||
603 | membership, member_event_id = yield self._check_in_room_or_world_readable( | |
604 | room_id, user_id, | |
605 | ) | |
606 | is_peeking = member_event_id is None | |
607 | ||
608 | if membership == Membership.JOIN: | |
609 | result = yield self._room_initial_sync_joined( | |
610 | user_id, room_id, pagin_config, membership, is_peeking | |
611 | ) | |
612 | elif membership == Membership.LEAVE: | |
613 | result = yield self._room_initial_sync_parted( | |
614 | user_id, room_id, pagin_config, membership, member_event_id, is_peeking | |
615 | ) | |
616 | ||
617 | account_data_events = [] | |
618 | tags = yield self.store.get_tags_for_room(user_id, room_id) | |
619 | if tags: | |
620 | account_data_events.append({ | |
621 | "type": "m.tag", | |
622 | "content": {"tags": tags}, | |
623 | }) | |
624 | ||
625 | account_data = yield self.store.get_account_data_for_room(user_id, room_id) | |
626 | for account_data_type, content in account_data.items(): | |
627 | account_data_events.append({ | |
628 | "type": account_data_type, | |
629 | "content": content, | |
630 | }) | |
631 | ||
632 | result["account_data"] = account_data_events | |
633 | ||
634 | defer.returnValue(result) | |
635 | ||
636 | @defer.inlineCallbacks | |
637 | def _room_initial_sync_parted(self, user_id, room_id, pagin_config, | |
638 | membership, member_event_id, is_peeking): | |
639 | room_state = yield self.store.get_state_for_events( | |
640 | [member_event_id], None | |
641 | ) | |
642 | ||
643 | room_state = room_state[member_event_id] | |
644 | ||
645 | limit = pagin_config.limit if pagin_config else None | |
646 | if limit is None: | |
647 | limit = 10 | |
648 | ||
649 | stream_token = yield self.store.get_stream_token_for_event( | |
650 | member_event_id | |
651 | ) | |
652 | ||
653 | messages, token = yield self.store.get_recent_events_for_room( | |
654 | room_id, | |
655 | limit=limit, | |
656 | end_token=stream_token | |
657 | ) | |
658 | ||
659 | messages = yield filter_events_for_client( | |
660 | self.store, user_id, messages, is_peeking=is_peeking | |
661 | ) | |
662 | ||
663 | start_token = StreamToken.START.copy_and_replace("room_key", token[0]) | |
664 | end_token = StreamToken.START.copy_and_replace("room_key", token[1]) | |
665 | ||
666 | time_now = self.clock.time_msec() | |
667 | ||
668 | defer.returnValue({ | |
669 | "membership": membership, | |
670 | "room_id": room_id, | |
671 | "messages": { | |
672 | "chunk": [serialize_event(m, time_now) for m in messages], | |
673 | "start": start_token.to_string(), | |
674 | "end": end_token.to_string(), | |
675 | }, | |
676 | "state": [serialize_event(s, time_now) for s in room_state.values()], | |
677 | "presence": [], | |
678 | "receipts": [], | |
679 | }) | |
680 | ||
681 | @defer.inlineCallbacks | |
682 | def _room_initial_sync_joined(self, user_id, room_id, pagin_config, | |
683 | membership, is_peeking): | |
684 | current_state = yield self.state.get_current_state( | |
685 | room_id=room_id, | |
686 | ) | |
687 | ||
688 | # TODO: These concurrently | |
689 | time_now = self.clock.time_msec() | |
690 | state = [ | |
691 | serialize_event(x, time_now) | |
692 | for x in current_state.values() | |
693 | ] | |
694 | ||
695 | now_token = yield self.hs.get_event_sources().get_current_token() | |
696 | ||
697 | limit = pagin_config.limit if pagin_config else None | |
698 | if limit is None: | |
699 | limit = 10 | |
700 | ||
701 | room_members = [ | |
702 | m for m in current_state.values() | |
703 | if m.type == EventTypes.Member | |
704 | and m.content["membership"] == Membership.JOIN | |
705 | ] | |
706 | ||
707 | presence_handler = self.hs.get_presence_handler() | |
708 | ||
709 | @defer.inlineCallbacks | |
710 | def get_presence(): | |
711 | states = yield presence_handler.get_states( | |
712 | [m.user_id for m in room_members], | |
713 | as_event=True, | |
714 | ) | |
715 | ||
716 | defer.returnValue(states) | |
717 | ||
718 | @defer.inlineCallbacks | |
719 | def get_receipts(): | |
720 | receipts_handler = self.hs.get_handlers().receipts_handler | |
721 | receipts = yield receipts_handler.get_receipts_for_room( | |
722 | room_id, | |
723 | now_token.receipt_key | |
724 | ) | |
725 | defer.returnValue(receipts) | |
726 | ||
727 | presence, receipts, (messages, token) = yield defer.gatherResults( | |
728 | [ | |
729 | preserve_fn(get_presence)(), | |
730 | preserve_fn(get_receipts)(), | |
731 | preserve_fn(self.store.get_recent_events_for_room)( | |
732 | room_id, | |
733 | limit=limit, | |
734 | end_token=now_token.room_key, | |
735 | ) | |
736 | ], | |
737 | consumeErrors=True, | |
738 | ).addErrback(unwrapFirstError) | |
739 | ||
740 | messages = yield filter_events_for_client( | |
741 | self.store, user_id, messages, is_peeking=is_peeking, | |
742 | ) | |
743 | ||
744 | start_token = now_token.copy_and_replace("room_key", token[0]) | |
745 | end_token = now_token.copy_and_replace("room_key", token[1]) | |
746 | ||
747 | time_now = self.clock.time_msec() | |
748 | ||
749 | ret = { | |
750 | "room_id": room_id, | |
751 | "messages": { | |
752 | "chunk": [serialize_event(m, time_now) for m in messages], | |
753 | "start": start_token.to_string(), | |
754 | "end": end_token.to_string(), | |
755 | }, | |
756 | "state": state, | |
757 | "presence": presence, | |
758 | "receipts": receipts, | |
759 | } | |
760 | if not is_peeking: | |
761 | ret["membership"] = membership | |
762 | ||
763 | defer.returnValue(ret) | |
764 | 389 | |
765 | 390 | @measure_func("_create_new_client_event") |
766 | 391 | @defer.inlineCallbacks |
123 | 123 | r for r in sorted_rooms |
124 | 124 | if r not in newly_unpublished and rooms_to_num_joined[room_id] > 0 |
125 | 125 | ] |
126 | ||
127 | total_room_count = len(rooms_to_scan) | |
126 | 128 | |
127 | 129 | if since_token: |
128 | 130 | # Filter out rooms we've already returned previously |
187 | 189 | |
188 | 190 | results = { |
189 | 191 | "chunk": chunk, |
192 | "total_room_count_estimate": total_room_count, | |
190 | 193 | } |
191 | 194 | |
192 | 195 | if since_token: |
15 | 15 | from twisted.internet import defer |
16 | 16 | |
17 | 17 | from synapse.api.errors import SynapseError, AuthError |
18 | from synapse.util.logcontext import ( | |
19 | PreserveLoggingContext, preserve_fn, preserve_context_over_deferred, | |
20 | ) | |
18 | from synapse.util.logcontext import preserve_fn | |
21 | 19 | from synapse.util.metrics import Measure |
20 | from synapse.util.wheel_timer import WheelTimer | |
22 | 21 | from synapse.types import UserID, get_domain_from_id |
23 | 22 | |
24 | 23 | import logging |
32 | 31 | # A tiny object useful for storing a user's membership in a room, as a mapping |
33 | 32 | # key |
34 | 33 | RoomMember = namedtuple("RoomMember", ("room_id", "user_id")) |
34 | ||
35 | ||
36 | # How often we expect remote servers to resend us presence. | |
37 | FEDERATION_TIMEOUT = 60 * 1000 | |
38 | ||
39 | # How often to resend typing across federation. | |
40 | FEDERATION_PING_INTERVAL = 40 * 1000 | |
35 | 41 | |
36 | 42 | |
37 | 43 | class TypingHandler(object): |
43 | 49 | self.notifier = hs.get_notifier() |
44 | 50 | self.state = hs.get_state_handler() |
45 | 51 | |
52 | self.hs = hs | |
53 | ||
46 | 54 | self.clock = hs.get_clock() |
55 | self.wheel_timer = WheelTimer(bucket_size=5000) | |
47 | 56 | |
48 | 57 | self.federation = hs.get_replication_layer() |
49 | 58 | |
52 | 61 | hs.get_distributor().observe("user_left_room", self.user_left_room) |
53 | 62 | |
54 | 63 | self._member_typing_until = {} # clock time we expect to stop |
55 | self._member_typing_timer = {} # deferreds to manage theabove | |
64 | self._member_last_federation_poke = {} | |
56 | 65 | |
57 | 66 | # map room IDs to serial numbers |
58 | 67 | self._room_serials = {} |
60 | 69 | # map room IDs to sets of users currently typing |
61 | 70 | self._room_typing = {} |
62 | 71 | |
63 | def tearDown(self): | |
64 | """Cancels all the pending timers. | |
65 | Normally this shouldn't be needed, but it's required from unit tests | |
66 | to avoid a "Reactor was unclean" warning.""" | |
67 | for t in self._member_typing_timer.values(): | |
68 | self.clock.cancel_call_later(t) | |
72 | self.clock.looping_call( | |
73 | self._handle_timeouts, | |
74 | 5000, | |
75 | ) | |
76 | ||
77 | def _handle_timeouts(self): | |
78 | logger.info("Checking for typing timeouts") | |
79 | ||
80 | now = self.clock.time_msec() | |
81 | ||
82 | members = set(self.wheel_timer.fetch(now)) | |
83 | ||
84 | for member in members: | |
85 | if not self.is_typing(member): | |
86 | # Nothing to do if they're no longer typing | |
87 | continue | |
88 | ||
89 | until = self._member_typing_until.get(member, None) | |
90 | if not until or until < now: | |
91 | logger.info("Timing out typing for: %s", member.user_id) | |
92 | preserve_fn(self._stopped_typing)(member) | |
93 | continue | |
94 | ||
95 | # Check if we need to resend a keep alive over federation for this | |
96 | # user. | |
97 | if self.hs.is_mine_id(member.user_id): | |
98 | last_fed_poke = self._member_last_federation_poke.get(member, None) | |
99 | if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL < now: | |
100 | preserve_fn(self._push_remote)( | |
101 | member=member, | |
102 | typing=True | |
103 | ) | |
104 | ||
105 | def is_typing(self, member): | |
106 | return member.user_id in self._room_typing.get(member.room_id, []) | |
69 | 107 | |
70 | 108 | @defer.inlineCallbacks |
71 | 109 | def started_typing(self, target_user, auth_user, room_id, timeout): |
84 | 122 | "%s has started typing in %s", target_user_id, room_id |
85 | 123 | ) |
86 | 124 | |
87 | until = self.clock.time_msec() + timeout | |
88 | 125 | member = RoomMember(room_id=room_id, user_id=target_user_id) |
89 | 126 | |
90 | was_present = member in self._member_typing_until | |
91 | ||
92 | if member in self._member_typing_timer: | |
93 | self.clock.cancel_call_later(self._member_typing_timer[member]) | |
94 | ||
95 | def _cb(): | |
96 | logger.debug( | |
97 | "%s has timed out in %s", target_user.to_string(), room_id | |
98 | ) | |
99 | self._stopped_typing(member) | |
100 | ||
101 | self._member_typing_until[member] = until | |
102 | self._member_typing_timer[member] = self.clock.call_later( | |
103 | timeout / 1000.0, _cb | |
127 | was_present = member.user_id in self._room_typing.get(room_id, set()) | |
128 | ||
129 | now = self.clock.time_msec() | |
130 | self._member_typing_until[member] = now + timeout | |
131 | ||
132 | self.wheel_timer.insert( | |
133 | now=now, | |
134 | obj=member, | |
135 | then=now + timeout, | |
104 | 136 | ) |
105 | 137 | |
106 | 138 | if was_present: |
108 | 140 | defer.returnValue(None) |
109 | 141 | |
110 | 142 | yield self._push_update( |
111 | room_id=room_id, | |
112 | user_id=target_user_id, | |
143 | member=member, | |
113 | 144 | typing=True, |
114 | 145 | ) |
115 | 146 | |
131 | 162 | ) |
132 | 163 | |
133 | 164 | member = RoomMember(room_id=room_id, user_id=target_user_id) |
134 | ||
135 | if member in self._member_typing_timer: | |
136 | self.clock.cancel_call_later(self._member_typing_timer[member]) | |
137 | del self._member_typing_timer[member] | |
138 | 165 | |
139 | 166 | yield self._stopped_typing(member) |
140 | 167 | |
147 | 174 | |
148 | 175 | @defer.inlineCallbacks |
149 | 176 | def _stopped_typing(self, member): |
150 | if member not in self._member_typing_until: | |
177 | if member.user_id not in self._room_typing.get(member.room_id, set()): | |
151 | 178 | # No point |
152 | 179 | defer.returnValue(None) |
153 | 180 | |
181 | self._member_typing_until.pop(member, None) | |
182 | self._member_last_federation_poke.pop(member, None) | |
183 | ||
154 | 184 | yield self._push_update( |
155 | room_id=member.room_id, | |
156 | user_id=member.user_id, | |
185 | member=member, | |
157 | 186 | typing=False, |
158 | 187 | ) |
159 | 188 | |
160 | del self._member_typing_until[member] | |
161 | ||
162 | if member in self._member_typing_timer: | |
163 | # Don't cancel it - either it already expired, or the real | |
164 | # stopped_typing() will cancel it | |
165 | del self._member_typing_timer[member] | |
166 | ||
167 | @defer.inlineCallbacks | |
168 | def _push_update(self, room_id, user_id, typing): | |
169 | users = yield self.state.get_current_user_in_room(room_id) | |
170 | domains = set(get_domain_from_id(u) for u in users) | |
171 | ||
172 | deferreds = [] | |
173 | for domain in domains: | |
174 | if domain == self.server_name: | |
175 | preserve_fn(self._push_update_local)( | |
176 | room_id=room_id, | |
177 | user_id=user_id, | |
178 | typing=typing | |
179 | ) | |
180 | else: | |
181 | deferreds.append(preserve_fn(self.federation.send_edu)( | |
189 | @defer.inlineCallbacks | |
190 | def _push_update(self, member, typing): | |
191 | if self.hs.is_mine_id(member.user_id): | |
192 | # Only send updates for changes to our own users. | |
193 | yield self._push_remote(member, typing) | |
194 | ||
195 | self._push_update_local( | |
196 | member=member, | |
197 | typing=typing | |
198 | ) | |
199 | ||
200 | @defer.inlineCallbacks | |
201 | def _push_remote(self, member, typing): | |
202 | users = yield self.state.get_current_user_in_room(member.room_id) | |
203 | self._member_last_federation_poke[member] = self.clock.time_msec() | |
204 | ||
205 | now = self.clock.time_msec() | |
206 | self.wheel_timer.insert( | |
207 | now=now, | |
208 | obj=member, | |
209 | then=now + FEDERATION_PING_INTERVAL, | |
210 | ) | |
211 | ||
212 | for domain in set(get_domain_from_id(u) for u in users): | |
213 | if domain != self.server_name: | |
214 | self.federation.send_edu( | |
182 | 215 | destination=domain, |
183 | 216 | edu_type="m.typing", |
184 | 217 | content={ |
185 | "room_id": room_id, | |
186 | "user_id": user_id, | |
218 | "room_id": member.room_id, | |
219 | "user_id": member.user_id, | |
187 | 220 | "typing": typing, |
188 | 221 | }, |
189 | key=(room_id, user_id), | |
190 | )) | |
191 | ||
192 | yield preserve_context_over_deferred( | |
193 | defer.DeferredList(deferreds, consumeErrors=True) | |
194 | ) | |
222 | key=member, | |
223 | ) | |
195 | 224 | |
196 | 225 | @defer.inlineCallbacks |
197 | 226 | def _recv_edu(self, origin, content): |
198 | 227 | room_id = content["room_id"] |
199 | 228 | user_id = content["user_id"] |
229 | ||
230 | member = RoomMember(user_id=user_id, room_id=room_id) | |
200 | 231 | |
201 | 232 | # Check that the string is a valid user id |
202 | 233 | user = UserID.from_string(user_id) |
212 | 243 | domains = set(get_domain_from_id(u) for u in users) |
213 | 244 | |
214 | 245 | if self.server_name in domains: |
246 | logger.info("Got typing update from %s: %r", user_id, content) | |
247 | now = self.clock.time_msec() | |
248 | self._member_typing_until[member] = now + FEDERATION_TIMEOUT | |
249 | self.wheel_timer.insert( | |
250 | now=now, | |
251 | obj=member, | |
252 | then=now + FEDERATION_TIMEOUT, | |
253 | ) | |
215 | 254 | self._push_update_local( |
216 | room_id=room_id, | |
217 | user_id=user_id, | |
255 | member=member, | |
218 | 256 | typing=content["typing"] |
219 | 257 | ) |
220 | 258 | |
221 | def _push_update_local(self, room_id, user_id, typing): | |
222 | room_set = self._room_typing.setdefault(room_id, set()) | |
259 | def _push_update_local(self, member, typing): | |
260 | room_set = self._room_typing.setdefault(member.room_id, set()) | |
223 | 261 | if typing: |
224 | room_set.add(user_id) | |
262 | room_set.add(member.user_id) | |
225 | 263 | else: |
226 | room_set.discard(user_id) | |
264 | room_set.discard(member.user_id) | |
227 | 265 | |
228 | 266 | self._latest_room_serial += 1 |
229 | self._room_serials[room_id] = self._latest_room_serial | |
230 | ||
231 | with PreserveLoggingContext(): | |
232 | self.notifier.on_new_event( | |
233 | "typing_key", self._latest_room_serial, rooms=[room_id] | |
234 | ) | |
267 | self._room_serials[member.room_id] = self._latest_room_serial | |
268 | ||
269 | self.notifier.on_new_event( | |
270 | "typing_key", self._latest_room_serial, rooms=[member.room_id] | |
271 | ) | |
235 | 272 | |
236 | 273 | def get_all_typing_updates(self, last_id, current_id): |
237 | 274 | # TODO: Work out a way to do this without scanning the entire state. |
24 | 24 | |
25 | 25 | def __init__(self, hs): |
26 | 26 | super(InitialSyncRestServlet, self).__init__(hs) |
27 | self.handlers = hs.get_handlers() | |
27 | self.initial_sync_handler = hs.get_initial_sync_handler() | |
28 | 28 | |
29 | 29 | @defer.inlineCallbacks |
30 | 30 | def on_GET(self, request): |
31 | 31 | requester = yield self.auth.get_user_by_req(request) |
32 | 32 | as_client_event = "raw" not in request.args |
33 | 33 | pagination_config = PaginationConfig.from_request(request) |
34 | handler = self.handlers.message_handler | |
35 | 34 | include_archived = request.args.get("archived", None) == ["true"] |
36 | content = yield handler.snapshot_all_rooms( | |
35 | content = yield self.initial_sync_handler.snapshot_all_rooms( | |
37 | 36 | user_id=requester.user.to_string(), |
38 | 37 | pagin_config=pagination_config, |
39 | 38 | as_client_event=as_client_event, |
455 | 455 | |
456 | 456 | def __init__(self, hs): |
457 | 457 | super(RoomInitialSyncRestServlet, self).__init__(hs) |
458 | self.handlers = hs.get_handlers() | |
458 | self.initial_sync_handler = hs.get_initial_sync_handler() | |
459 | 459 | |
460 | 460 | @defer.inlineCallbacks |
461 | 461 | def on_GET(self, request, room_id): |
462 | 462 | requester = yield self.auth.get_user_by_req(request, allow_guest=True) |
463 | 463 | pagination_config = PaginationConfig.from_request(request) |
464 | content = yield self.handlers.message_handler.room_initial_sync( | |
464 | content = yield self.initial_sync_handler.room_initial_sync( | |
465 | 465 | room_id=room_id, |
466 | 466 | requester=requester, |
467 | 467 | pagin_config=pagination_config, |
704 | 704 | |
705 | 705 | yield self.presence_handler.bump_presence_active_time(requester.user) |
706 | 706 | |
707 | # Limit timeout to stop people from setting silly typing timeouts. | |
708 | timeout = min(content.get("timeout", 30000), 120000) | |
709 | ||
707 | 710 | if content["typing"]: |
708 | 711 | yield self.typing_handler.started_typing( |
709 | 712 | target_user=target_user, |
710 | 713 | auth_user=requester.user, |
711 | 714 | room_id=room_id, |
712 | timeout=content.get("timeout", 30000), | |
715 | timeout=timeout, | |
713 | 716 | ) |
714 | 717 | else: |
715 | 718 | yield self.typing_handler.stopped_typing( |
42 | 42 | from synapse.handlers.sync import SyncHandler |
43 | 43 | from synapse.handlers.typing import TypingHandler |
44 | 44 | from synapse.handlers.events import EventHandler, EventStreamHandler |
45 | from synapse.handlers.initial_sync import InitialSyncHandler | |
45 | 46 | from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory |
46 | 47 | from synapse.http.matrixfederationclient import MatrixFederationHttpClient |
47 | 48 | from synapse.notifier import Notifier |
97 | 98 | 'e2e_keys_handler', |
98 | 99 | 'event_handler', |
99 | 100 | 'event_stream_handler', |
101 | 'initial_sync_handler', | |
100 | 102 | 'application_service_api', |
101 | 103 | 'application_service_scheduler', |
102 | 104 | 'application_service_handler', |
227 | 229 | def build_event_stream_handler(self): |
228 | 230 | return EventStreamHandler(self) |
229 | 231 | |
232 | def build_initial_sync_handler(self): | |
233 | return InitialSyncHandler(self) | |
234 | ||
230 | 235 | def build_event_sources(self): |
231 | 236 | return EventSources(self) |
232 | 237 |
397 | 397 | sql = (""" |
398 | 398 | DELETE FROM stream_ordering_to_exterm |
399 | 399 | WHERE |
400 | ( | |
401 | SELECT max(stream_ordering) AS stream_ordering | |
400 | room_id IN ( | |
401 | SELECT room_id | |
402 | 402 | FROM stream_ordering_to_exterm |
403 | WHERE room_id = stream_ordering_to_exterm.room_id | |
404 | ) > ? | |
405 | AND stream_ordering < ? | |
403 | WHERE stream_ordering > ? | |
404 | ) AND stream_ordering < ? | |
406 | 405 | """) |
407 | 406 | txn.execute( |
408 | 407 | sql, |
1354 | 1354 | min_stream_id = rows[-1][0] |
1355 | 1355 | event_ids = [row[1] for row in rows] |
1356 | 1356 | |
1357 | events = self._get_events_txn(txn, event_ids) | |
1358 | ||
1359 | rows = [] | |
1360 | for event in events: | |
1361 | try: | |
1362 | event_id = event.event_id | |
1363 | origin_server_ts = event.origin_server_ts | |
1364 | except (KeyError, AttributeError): | |
1365 | # If the event is missing a necessary field then | |
1366 | # skip over it. | |
1367 | continue | |
1368 | ||
1369 | rows.append((origin_server_ts, event_id)) | |
1357 | rows_to_update = [] | |
1358 | ||
1359 | chunks = [ | |
1360 | event_ids[i:i + 100] | |
1361 | for i in xrange(0, len(event_ids), 100) | |
1362 | ] | |
1363 | for chunk in chunks: | |
1364 | ev_rows = self._simple_select_many_txn( | |
1365 | txn, | |
1366 | table="event_json", | |
1367 | column="event_id", | |
1368 | iterable=chunk, | |
1369 | retcols=["event_id", "json"], | |
1370 | keyvalues={}, | |
1371 | ) | |
1372 | ||
1373 | for row in ev_rows: | |
1374 | event_id = row["event_id"] | |
1375 | event_json = json.loads(row["json"]) | |
1376 | try: | |
1377 | origin_server_ts = event_json["origin_server_ts"] | |
1378 | except (KeyError, AttributeError): | |
1379 | # If the event is missing a necessary field then | |
1380 | # skip over it. | |
1381 | continue | |
1382 | ||
1383 | rows_to_update.append((origin_server_ts, event_id)) | |
1370 | 1384 | |
1371 | 1385 | sql = ( |
1372 | 1386 | "UPDATE events SET origin_server_ts = ? WHERE event_id = ?" |
1373 | 1387 | ) |
1374 | 1388 | |
1375 | for index in range(0, len(rows), INSERT_CLUMP_SIZE): | |
1376 | clump = rows[index:index + INSERT_CLUMP_SIZE] | |
1389 | for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE): | |
1390 | clump = rows_to_update[index:index + INSERT_CLUMP_SIZE] | |
1377 | 1391 | txn.executemany(sql, clump) |
1378 | 1392 | |
1379 | 1393 | progress = { |
1380 | 1394 | "target_min_stream_id_inclusive": target_min_stream_id, |
1381 | 1395 | "max_stream_id_exclusive": min_stream_id, |
1382 | "rows_inserted": rows_inserted + len(rows) | |
1396 | "rows_inserted": rows_inserted + len(rows_to_update) | |
1383 | 1397 | } |
1384 | 1398 | |
1385 | 1399 | self._background_update_progress_txn( |
1386 | 1400 | txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress |
1387 | 1401 | ) |
1388 | 1402 | |
1389 | return len(rows) | |
1403 | return len(rows_to_update) | |
1390 | 1404 | |
1391 | 1405 | result = yield self.runInteraction( |
1392 | 1406 | self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn |
24 | 24 | |
25 | 25 | # Remember to update this number every time a change is made to database |
26 | 26 | # schema files, so the users will be informed on server restarts. |
27 | SCHEMA_VERSION = 35 | |
27 | SCHEMA_VERSION = 36 | |
28 | 28 | |
29 | 29 | dir_path = os.path.abspath(os.path.dirname(__file__)) |
30 | 30 |
0 | /* Copyright 2016 OpenMarket Ltd | |
1 | * | |
2 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
3 | * you may not use this file except in compliance with the License. | |
4 | * You may obtain a copy of the License at | |
5 | * | |
6 | * http://www.apache.org/licenses/LICENSE-2.0 | |
7 | * | |
8 | * Unless required by applicable law or agreed to in writing, software | |
9 | * distributed under the License is distributed on an "AS IS" BASIS, | |
10 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
11 | * See the License for the specific language governing permissions and | |
12 | * limitations under the License. | |
13 | */ | |
14 | ||
15 | -- Re-add some entries to stream_ordering_to_exterm that were incorrectly deleted | |
16 | INSERT INTO stream_ordering_to_exterm (stream_ordering, room_id, event_id) | |
17 | SELECT | |
18 | (SELECT stream_ordering FROM events where event_id = e.event_id) AS stream_ordering, | |
19 | room_id, | |
20 | event_id | |
21 | FROM event_forward_extremities AS e | |
22 | WHERE NOT EXISTS ( | |
23 | SELECT room_id FROM stream_ordering_to_exterm AS s | |
24 | WHERE s.room_id = e.room_id | |
25 | ); |
306 | 306 | |
307 | 307 | def _get_state_groups_from_groups_txn(self, txn, groups, types=None): |
308 | 308 | results = {group: {} for group in groups} |
309 | if types is not None: | |
310 | types = list(set(types)) # deduplicate types list | |
311 | ||
309 | 312 | if isinstance(self.database_engine, PostgresEngine): |
310 | 313 | # Temporarily disable sequential scans in this transaction. This is |
311 | 314 | # a temporary hack until we can add the right indices in |
374 | 377 | # We don't use WITH RECURSIVE on sqlite3 as there are distributions |
375 | 378 | # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) |
376 | 379 | for group in groups: |
377 | group_tree = [group] | |
378 | 380 | next_group = group |
379 | 381 | |
380 | 382 | while next_group: |
383 | # We did this before by getting the list of group ids, and | |
384 | # then passing that list to sqlite to get latest event for | |
385 | # each (type, state_key). However, that was terribly slow | |
386 | # without the right indicies (which we can't add until | |
387 | # after we finish deduping state, which requires this func) | |
388 | args = [next_group] | |
389 | if types: | |
390 | args.extend(i for typ in types for i in typ) | |
391 | ||
392 | txn.execute( | |
393 | "SELECT type, state_key, event_id FROM state_groups_state" | |
394 | " WHERE state_group = ? %s" % (where_clause,), | |
395 | args | |
396 | ) | |
397 | rows = txn.fetchall() | |
398 | results[group].update({ | |
399 | (typ, state_key): event_id | |
400 | for typ, state_key, event_id in rows | |
401 | if (typ, state_key) not in results[group] | |
402 | }) | |
403 | ||
404 | # If the lengths match then we must have all the types, | |
405 | # so no need to go walk further down the tree. | |
406 | if types is not None and len(results[group]) == len(types): | |
407 | break | |
408 | ||
381 | 409 | next_group = self._simple_select_one_onecol_txn( |
382 | 410 | txn, |
383 | 411 | table="state_group_edges", |
385 | 413 | retcol="prev_state_group", |
386 | 414 | allow_none=True, |
387 | 415 | ) |
388 | if next_group: | |
389 | group_tree.append(next_group) | |
390 | ||
391 | sql = (""" | |
392 | SELECT type, state_key, event_id FROM state_groups_state | |
393 | INNER JOIN ( | |
394 | SELECT type, state_key, max(state_group) as state_group | |
395 | FROM state_groups_state | |
396 | WHERE state_group IN (%s) %s | |
397 | GROUP BY type, state_key | |
398 | ) USING (type, state_key, state_group); | |
399 | """) % (",".join("?" for _ in group_tree), where_clause,) | |
400 | ||
401 | args = list(group_tree) | |
402 | if types is not None: | |
403 | args.extend([i for typ in types for i in typ]) | |
404 | ||
405 | txn.execute(sql, args) | |
406 | rows = self.cursor_to_dict(txn) | |
407 | for row in rows: | |
408 | key = (row["type"], row["state_key"]) | |
409 | results[group][key] = row["event_id"] | |
410 | 416 | |
411 | 417 | return results |
412 | 418 |
55 | 55 | try: |
56 | 56 | return string.split(":", 1)[1] |
57 | 57 | except IndexError: |
58 | raise SynapseError(400, "Invalid ID: %r", string) | |
58 | raise SynapseError(400, "Invalid ID: %r" % (string,)) | |
59 | 59 | |
60 | 60 | |
61 | 61 | class DomainSpecificString( |
266 | 266 | from synapse.handlers.typing import RoomMember |
267 | 267 | member = RoomMember(self.room_id, self.u_apple.to_string()) |
268 | 268 | self.handler._member_typing_until[member] = 1002000 |
269 | self.handler._member_typing_timer[member] = ( | |
270 | self.clock.call_later(1002, lambda: 0) | |
271 | ) | |
272 | self.handler._room_typing[self.room_id] = set((self.u_apple.to_string(),)) | |
269 | self.handler._room_typing[self.room_id] = set([self.u_apple.to_string()]) | |
273 | 270 | |
274 | 271 | self.assertEquals(self.event_source.get_current_key(), 0) |
275 | 272 | |
329 | 326 | }, |
330 | 327 | }]) |
331 | 328 | |
332 | self.clock.advance_time(11) | |
329 | self.clock.advance_time(16) | |
333 | 330 | |
334 | 331 | self.on_new_event.assert_has_calls([ |
335 | 332 | call('typing_key', 2, rooms=[self.room_id]), |
104 | 104 | # Need another user to make notifications actually work |
105 | 105 | yield self.join(self.room_id, user="@jim:red") |
106 | 106 | |
107 | def tearDown(self): | |
108 | self.hs.get_typing_handler().tearDown() | |
109 | ||
110 | 107 | @defer.inlineCallbacks |
111 | 108 | def test_set_typing(self): |
112 | 109 | (code, _) = yield self.mock_resource.trigger( |
146 | 143 | |
147 | 144 | self.assertEquals(self.event_source.get_current_key(), 1) |
148 | 145 | |
149 | self.clock.advance_time(31) | |
146 | self.clock.advance_time(36) | |
150 | 147 | |
151 | 148 | self.assertEquals(self.event_source.get_current_key(), 2) |
152 | 149 |
219 | 219 | # list of lists of [absolute_time, callback, expired] in no particular |
220 | 220 | # order |
221 | 221 | self.timers = [] |
222 | self.loopers = [] | |
222 | 223 | |
223 | 224 | def time(self): |
224 | 225 | return self.now |
239 | 240 | return t |
240 | 241 | |
241 | 242 | def looping_call(self, function, interval): |
242 | pass | |
243 | self.loopers.append([function, interval / 1000., self.now]) | |
243 | 244 | |
244 | 245 | def cancel_call_later(self, timer, ignore_errs=False): |
245 | 246 | if timer[2]: |
267 | 268 | callback() |
268 | 269 | else: |
269 | 270 | self.timers.append(t) |
271 | ||
272 | for looped in self.loopers: | |
273 | func, interval, last = looped | |
274 | if last + interval < self.now: | |
275 | func() | |
276 | looped[2] = self.now | |
270 | 277 | |
271 | 278 | def advance_time_msec(self, ms): |
272 | 279 | self.advance_time(ms / 1000.) |