Codebase list matrix-synapse / upstream/0.27.3
New upstream version 0.27.3 Erik Johnston 6 years ago
52 changed file(s) with 971 addition(s) and 293 deletion(s).
0 Changes in synapse v0.27.3 (2018-04-11)
1 ========================================
2
3 Bug fixes:
4
5 * URL quote path segments over federation (#3082)
6
7 Changes in synapse v0.27.3-rc2 (2018-04-09)
8 ============================================
9
10 v0.27.3-rc1 was built from a stale version of the develop branch, so its changelog
11 overstates the functionality included. v0.27.3-rc2 is up to date; rc1 should be ignored.
12
13 Changes in synapse v0.27.3-rc1 (2018-04-09)
14 ============================================
15
16 Notable changes include API support for joinability of groups, plus new metrics
17 and phone home stats. The phone home stats give better visibility of system usage,
18 so we can tune synapse to work well for all users rather than just from our own
19 experience with matrix.org. We also now record the 'r30' stat, which is the measure
20 we use to track overall growth of the Matrix ecosystem. It is defined as:
21
22 Counts the number of native 30-day retained users, defined as:
23 * Users who created their accounts more than 30 days ago
24 * Who were last seen at most 30 days ago
25 * Whose account creation and last_seen are more than 30 days apart
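The r30 definition above can be illustrated with a small sketch. The data model here is hypothetical (pairs of `created` / `last_seen` timestamps); in synapse the real stat is computed in the database by the datastore's `count_r30_users()`.

```python
from datetime import datetime, timedelta

# Illustrative sketch of the r30 definition above, over hypothetical
# (created, last_seen) timestamp pairs. Not synapse's actual query.
def count_r30(users, now):
    thirty_days = timedelta(days=30)
    return sum(
        1 for created, last_seen in users
        if now - created > thirty_days          # account older than 30 days
        and now - last_seen <= thirty_days      # seen within the last 30 days
        and last_seen - created > thirty_days   # retained: > 30 days between the two
    )
```

A user created 100 days ago and seen 5 days ago counts; a 10-day-old account or one last seen 60 days ago does not.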
26
27
28 Features:
29
30 * Add joinability for groups (PR #3045)
31 * Implement group join API (PR #3046)
32 * Add counter metrics for calculating state delta (PR #3033)
33 * R30 stats (PR #3041)
34 * Measure time it takes to calculate state group ID (PR #3043)
35 * Add basic performance statistics to phone home (PR #3044)
36 * Add response size metrics (PR #3071)
37 * phone home cache size configurations (PR #3063)
38
39 Changes:
40
41 * Add a blurb explaining the main synapse worker (PR #2886) Thanks to @turt2live!
42 * Replace old style error catching with 'as' keyword (PR #3000) Thanks to @NotAFile!
43 * Use .iter* to avoid copies in StateHandler (PR #3006)
44 * Linearize calls to _generate_user_id (PR #3029)
45 * Remove last usage of ujson (PR #3030)
46 * Use simplejson throughout (PR #3048)
47 * Use static JSONEncoders (PR #3049)
48 * Remove uses of events.content (PR #3060)
49 * Improve database cache performance (PR #3068)
50
51 Bug fixes:
52
53 * Add room_id to the response of `rooms/{roomId}/join` (PR #2986) Thanks to @jplatte!
54 Fix replication after switch to simplejson (PR #3015)
56 * 404 correctly on missing paths via NoResource (PR #3022)
57 * Fix error when claiming e2e keys from offline servers (PR #3034)
58 * fix tests/storage/test_user_directory.py (PR #3042)
59 * use PUT instead of POST for federating groups/m.join_policy (PR #3070) Thanks to @krombel!
60 * postgres port script: fix state_groups_pkey error (PR #3072)
61
62
063 Changes in synapse v0.27.2 (2018-03-26)
164 =======================================
265
58121
59122 Changes:
60123
61 * Continue to factor out processing from main process and into worker processes. See updated `docs/workers.rst <docs/metrics-howto.rst>`_ (PR #2892 - #2904, #2913, #2920 - #2926, #2947, #2847, #2854, #2872, #2873, #2874, #2928, #2929, #2934, #2856, #2976 - #2984, #2987 - #2989, #2991 - #2993, #2995, #2784)
124 * Continue to factor out processing from main process and into worker processes. See updated `docs/workers.rst <docs/workers.rst>`_ (PR #2892 - #2904, #2913, #2920 - #2926, #2947, #2847, #2854, #2872, #2873, #2874, #2928, #2929, #2934, #2856, #2976 - #2984, #2987 - #2989, #2991 - #2993, #2995, #2784)
62125 * Ensure state cache is used when persisting events (PR #2864, #2871, #2802, #2835, #2836, #2841, #2842, #2849)
63126 * Change the default config to bind on both IPv4 and IPv6 on all platforms (PR #2435) Thanks to @silkeh!
64127 * No longer require a specific version of saml2 (PR #2695) Thanks to @okurz!
156156
157157 In case of problems, please see the _`Troubleshooting` section below.
158158
159 Alternatively, Silvio Fricke has contributed a Dockerfile to automate the
160 above in Docker at https://registry.hub.docker.com/u/silviof/docker-matrix/.
159 Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a Dockerfile to automate the
160 above in Docker at https://hub.docker.com/r/avhost/docker-matrix/tags/
161161
162162 Also, Martin Giess has created an auto-deployment process with vagrant/ansible,
163163 tested with VirtualBox/AWS/DigitalOcean - see https://github.com/EMnify/matrix-synapse-auto-deploy
4646 # You may need to specify a port (eg, :8448) if your server is not
4747 # configured on port 443.
4848 curl -kv https://<host.name>/_matrix/client/versions 2>&1 | grep "Server:"
49
50 Upgrading to $NEXT_VERSION
51 ==========================
52
53 This release expands the anonymous usage stats sent if the opt-in
54 ``report_stats`` configuration is set to ``true``. We now capture RSS memory
55 and cpu use at a very coarse level. This requires administrators to install
56 the optional ``psutil`` python module.
57
58 We would appreciate it if you could assist by ensuring this module is available
59 and ``report_stats`` is enabled. This will let us see if performance changes to
60 synapse are having an impact on the general community.
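Roughly, what the expanded reporting gathers when ``psutil`` is present can be sketched as follows. This is a simplification of the actual collection code in ``synapse.app.homeserver``; as there, it degrades gracefully when the optional module is missing.

```python
# Sketch of the coarse RSS/cpu collection described above; reporting of
# memory/cpu stats stays disabled when the optional psutil module is absent.
def collect_process_stats():
    try:
        import psutil
    except ImportError:
        return None
    proc = psutil.Process()
    proc.cpu_percent(interval=None)  # prime the counter; the first call is not meaningful
    return {
        "memory_rss": proc.memory_info().rss,
        "cpu_average": int(proc.cpu_percent(interval=None)),
    }
```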
4961
5062 Upgrading to v0.15.0
5163 ====================
5454
5555 You then create a set of configs for the various worker processes. These
5656 should be worker configuration files, and should be stored in a dedicated
57 subdirectory, to allow synctl to manipulate them.
57 subdirectory, to allow synctl to manipulate them. An additional configuration
58 for the master synapse process will need to be created because the process will
59 not be started automatically. That configuration should look like this::
60
61 worker_app: synapse.app.homeserver
62 daemonize: true
5863
5964 Each worker configuration file inherits the configuration of the main homeserver
6065 configuration file. You can then override configuration specific to that worker,
229234 ``synapse.app.event_creator``
230235 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
231236
232 Handles non-state event creation. It can handle REST endpoints matching::
237 Handles some event creation. It can handle REST endpoints matching::
233238
234239 ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
240 ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
241 ^/_matrix/client/(api/v1|r0|unstable)/join/
235242
236243 It will create events locally and then send them on to the main synapse
237244 instance to be persisted and handled.
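Which requests land on this worker can be checked against the endpoint patterns listed above, e.g. (illustrative only, not synapse's actual dispatcher):

```python
import re

# The REST endpoint patterns listed above for synapse.app.event_creator.
EVENT_CREATOR_PATTERNS = [
    r"^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send",
    r"^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
    r"^/_matrix/client/(api/v1|r0|unstable)/join/",
]

def handled_by_event_creator(path):
    # Illustrative routing check, not synapse's real resource tree.
    return any(re.match(p, path) for p in EVENT_CREATOR_PATTERNS)
```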
00 #!/usr/bin/env python
11 # -*- coding: utf-8 -*-
22 # Copyright 2015, 2016 OpenMarket Ltd
3 # Copyright 2018 New Vector Ltd
34 #
45 # Licensed under the Apache License, Version 2.0 (the "License");
56 # you may not use this file except in compliance with the License.
249250 @defer.inlineCallbacks
250251 def handle_table(self, table, postgres_size, table_size, forward_chunk,
251252 backward_chunk):
253 logger.info(
254 "Table %s: %i/%i (rows %i-%i) already ported",
255 table, postgres_size, table_size,
256 backward_chunk+1, forward_chunk-1,
257 )
258
252259 if not table_size:
253260 return
254261
466473 self.progress.set_state("Preparing PostgreSQL")
467474 self.setup_db(postgres_config, postgres_engine)
468475
469 # Step 2. Get tables.
470 self.progress.set_state("Fetching tables")
471 sqlite_tables = yield self.sqlite_store._simple_select_onecol(
472 table="sqlite_master",
473 keyvalues={
474 "type": "table",
475 },
476 retcol="name",
477 )
478
479 postgres_tables = yield self.postgres_store._simple_select_onecol(
480 table="information_schema.tables",
481 keyvalues={},
482 retcol="distinct table_name",
483 )
484
485 tables = set(sqlite_tables) & set(postgres_tables)
486
487 self.progress.set_state("Creating tables")
488
489 logger.info("Found %d tables", len(tables))
490
476 self.progress.set_state("Creating port tables")
491477 def create_port_table(txn):
492478 txn.execute(
493 "CREATE TABLE port_from_sqlite3 ("
479 "CREATE TABLE IF NOT EXISTS port_from_sqlite3 ("
494480 " table_name varchar(100) NOT NULL UNIQUE,"
495481 " forward_rowid bigint NOT NULL,"
496482 " backward_rowid bigint NOT NULL"
516502 "alter_table", alter_table
517503 )
518504 except Exception as e:
519 logger.info("Failed to create port table: %s", e)
520
521 try:
522 yield self.postgres_store.runInteraction(
523 "create_port_table", create_port_table
524 )
525 except Exception as e:
526 logger.info("Failed to create port table: %s", e)
527
528 self.progress.set_state("Setting up")
529
530 # Set up tables.
505 pass
506
507 yield self.postgres_store.runInteraction(
508 "create_port_table", create_port_table
509 )
510
511 # Step 2. Get tables.
512 self.progress.set_state("Fetching tables")
513 sqlite_tables = yield self.sqlite_store._simple_select_onecol(
514 table="sqlite_master",
515 keyvalues={
516 "type": "table",
517 },
518 retcol="name",
519 )
520
521 postgres_tables = yield self.postgres_store._simple_select_onecol(
522 table="information_schema.tables",
523 keyvalues={},
524 retcol="distinct table_name",
525 )
526
527 tables = set(sqlite_tables) & set(postgres_tables)
528 logger.info("Found %d tables", len(tables))
529
530 # Step 3. Figure out what still needs copying
531 self.progress.set_state("Checking on port progress")
531532 setup_res = yield defer.gatherResults(
532533 [
533534 self.setup_table(table)
538539 consumeErrors=True,
539540 )
540541
541 # Process tables.
542 # Step 4. Do the copying.
543 self.progress.set_state("Copying to postgres")
542544 yield defer.gatherResults(
543545 [
544546 self.handle_table(*res)
546548 ],
547549 consumeErrors=True,
548550 )
551
552 # Step 5. Do final post-processing
553 yield self._setup_state_group_id_seq()
549554
550555 self.progress.done()
551556 except:
705710 done = int(done) if done else 0
706711
707712 defer.returnValue((done, remaining + done))
713
714 def _setup_state_group_id_seq(self):
715 def r(txn):
716 txn.execute("SELECT MAX(id) FROM state_groups")
717 next_id = txn.fetchone()[0]+1
718 txn.execute(
719 "ALTER SEQUENCE state_group_id_seq RESTART WITH %s",
720 (next_id,),
721 )
722 return self.postgres_store.runInteraction("setup_state_group_id_seq", r)
708723
709724
710725 ##############################################
1515 """ This is a reference implementation of a Matrix home server.
1616 """
1717
18 __version__ = "0.27.2"
18 __version__ = "0.27.3"
1414
1515 """Contains exceptions and error codes."""
1616
17 import json
1817 import logging
18
19 import simplejson as json
1920
2021 logger = logging.getLogger(__name__)
2122
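The switch shown here imports simplejson unconditionally, since it is a hard dependency of synapse at this point. Projects where simplejson is only an optional speedup often use a fallback import instead; a minimal sketch (the `serialize_error` helper is hypothetical, for illustration only):

```python
# Common fallback pattern when simplejson is optional rather than a hard
# dependency (synapse itself imports it unconditionally in this file).
try:
    import simplejson as json
except ImportError:
    import json  # stdlib fallback

def serialize_error(code, msg):
    # Call sites work identically with either module.
    return json.dumps({"errcode": code, "error": msg}, sort_keys=True)
```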
3535 from synapse.util.manhole import manhole
3636 from synapse.util.versionstring import get_version_string
3737 from twisted.internet import reactor
38 from twisted.web.resource import Resource
38 from twisted.web.resource import NoResource
3939
4040 logger = logging.getLogger("synapse.app.appservice")
4141
6363 if name == "metrics":
6464 resources[METRICS_PREFIX] = MetricsResource(self)
6565
66 root_resource = create_resource_tree(resources, Resource())
66 root_resource = create_resource_tree(resources, NoResource())
6767
6868 _base.listen_tcp(
6969 bind_addresses,
4343 from synapse.util.manhole import manhole
4444 from synapse.util.versionstring import get_version_string
4545 from twisted.internet import reactor
46 from twisted.web.resource import Resource
46 from twisted.web.resource import NoResource
4747
4848 logger = logging.getLogger("synapse.app.client_reader")
4949
8787 "/_matrix/client/api/v1": resource,
8888 })
8989
90 root_resource = create_resource_tree(resources, Resource())
90 root_resource = create_resource_tree(resources, NoResource())
9191
9292 _base.listen_tcp(
9393 bind_addresses,
5151 from synapse.util.manhole import manhole
5252 from synapse.util.versionstring import get_version_string
5353 from twisted.internet import reactor
54 from twisted.web.resource import Resource
54 from twisted.web.resource import NoResource
5555
5656 logger = logging.getLogger("synapse.app.event_creator")
5757
103103 "/_matrix/client/api/v1": resource,
104104 })
105105
106 root_resource = create_resource_tree(resources, Resource())
106 root_resource = create_resource_tree(resources, NoResource())
107107
108108 _base.listen_tcp(
109109 bind_addresses,
4040 from synapse.util.manhole import manhole
4141 from synapse.util.versionstring import get_version_string
4242 from twisted.internet import reactor
43 from twisted.web.resource import Resource
43 from twisted.web.resource import NoResource
4444
4545 logger = logging.getLogger("synapse.app.federation_reader")
4646
7676 FEDERATION_PREFIX: TransportLayerServer(self),
7777 })
7878
79 root_resource = create_resource_tree(resources, Resource())
79 root_resource = create_resource_tree(resources, NoResource())
8080
8181 _base.listen_tcp(
8282 bind_addresses,
4141 from synapse.util.manhole import manhole
4242 from synapse.util.versionstring import get_version_string
4343 from twisted.internet import defer, reactor
44 from twisted.web.resource import Resource
44 from twisted.web.resource import NoResource
4545
4646 logger = logging.getLogger("synapse.app.federation_sender")
4747
9090 if name == "metrics":
9191 resources[METRICS_PREFIX] = MetricsResource(self)
9292
93 root_resource = create_resource_tree(resources, Resource())
93 root_resource = create_resource_tree(resources, NoResource())
9494
9595 _base.listen_tcp(
9696 bind_addresses,
4343 from synapse.util.manhole import manhole
4444 from synapse.util.versionstring import get_version_string
4545 from twisted.internet import defer, reactor
46 from twisted.web.resource import Resource
46 from twisted.web.resource import NoResource
4747
4848 logger = logging.getLogger("synapse.app.frontend_proxy")
4949
141141 "/_matrix/client/api/v1": resource,
142142 })
143143
144 root_resource = create_resource_tree(resources, Resource())
144 root_resource = create_resource_tree(resources, NoResource())
145145
146146 _base.listen_tcp(
147147 bind_addresses,
4747 from synapse.storage import are_all_users_on_domain
4848 from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
4949 from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
50 from synapse.util.caches import CACHE_SIZE_FACTOR
5051 from synapse.util.httpresourcetree import create_resource_tree
5152 from synapse.util.logcontext import LoggingContext
5253 from synapse.util.manhole import manhole
5556 from synapse.util.versionstring import get_version_string
5657 from twisted.application import service
5758 from twisted.internet import defer, reactor
58 from twisted.web.resource import EncodingResourceWrapper, Resource
59 from twisted.web.resource import EncodingResourceWrapper, NoResource
5960 from twisted.web.server import GzipEncoderFactory
6061 from twisted.web.static import File
6162
125126 if WEB_CLIENT_PREFIX in resources:
126127 root_resource = RootRedirect(WEB_CLIENT_PREFIX)
127128 else:
128 root_resource = Resource()
129 root_resource = NoResource()
129130
130131 root_resource = create_resource_tree(resources, root_resource)
131132
401402
402403 stats = {}
403404
405 # Contains the list of processes we will be monitoring
406 # currently either 0 or 1
407 stats_process = []
408
404409 @defer.inlineCallbacks
405410 def phone_stats_home():
406411 logger.info("Gathering stats for reporting")
424429 stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
425430 stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
426431
432 r30_results = yield hs.get_datastore().count_r30_users()
433 for name, count in r30_results.iteritems():
434 stats["r30_users_" + name] = count
435
427436 daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
428437 stats["daily_sent_messages"] = daily_sent_messages
438 stats["cache_factor"] = CACHE_SIZE_FACTOR
439 stats["event_cache_size"] = hs.config.event_cache_size
440
441 if len(stats_process) > 0:
442 stats["memory_rss"] = 0
443 stats["cpu_average"] = 0
444 for process in stats_process:
445 stats["memory_rss"] += process.memory_info().rss
446 stats["cpu_average"] += int(process.cpu_percent(interval=None))
429447
430448 logger.info("Reporting stats to matrix.org: %s" % (stats,))
431449 try:
436454 except Exception as e:
437455 logger.warn("Error reporting stats: %s", e)
438456
457 def performance_stats_init():
458 try:
459 import psutil
460 process = psutil.Process()
461 # Ensure we can fetch both, and make the initial request for cpu_percent
462 # so the next request will use this as the initial point.
463 process.memory_info().rss
464 process.cpu_percent(interval=None)
465 logger.info("report_stats can use psutil")
466 stats_process.append(process)
467 except (ImportError, AttributeError):
468 logger.warn(
469 "report_stats enabled but psutil is not installed or incorrect version."
470 " Disabling reporting of memory/cpu stats."
471 " Ensuring psutil is available will help matrix.org track performance"
472 " changes across releases."
473 )
474
439475 if hs.config.report_stats:
440476 logger.info("Scheduling stats reporting for 3 hour intervals")
441477 clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
478
479 # We need to defer this init for the cases that we daemonize
480 # otherwise the process ID we get is that of the non-daemon process
481 clock.call_later(0, performance_stats_init)
442482
443483 # We wait 5 minutes to send the first set of stats as the server can
444484 # be quite busy the first few minutes
4242 from synapse.util.manhole import manhole
4343 from synapse.util.versionstring import get_version_string
4444 from twisted.internet import reactor
45 from twisted.web.resource import Resource
45 from twisted.web.resource import NoResource
4646
4747 logger = logging.getLogger("synapse.app.media_repository")
4848
8383 ),
8484 })
8585
86 root_resource = create_resource_tree(resources, Resource())
86 root_resource = create_resource_tree(resources, NoResource())
8787
8888 _base.listen_tcp(
8989 bind_addresses,
3636 from synapse.util.manhole import manhole
3737 from synapse.util.versionstring import get_version_string
3838 from twisted.internet import defer, reactor
39 from twisted.web.resource import Resource
39 from twisted.web.resource import NoResource
4040
4141 logger = logging.getLogger("synapse.app.pusher")
4242
9393 if name == "metrics":
9494 resources[METRICS_PREFIX] = MetricsResource(self)
9595
96 root_resource = create_resource_tree(resources, Resource())
96 root_resource = create_resource_tree(resources, NoResource())
9797
9898 _base.listen_tcp(
9999 bind_addresses,
5555 from synapse.util.stringutils import random_string
5656 from synapse.util.versionstring import get_version_string
5757 from twisted.internet import defer, reactor
58 from twisted.web.resource import Resource
58 from twisted.web.resource import NoResource
5959
6060 logger = logging.getLogger("synapse.app.synchrotron")
6161
268268 "/_matrix/client/api/v1": resource,
269269 })
270270
271 root_resource = create_resource_tree(resources, Resource())
271 root_resource = create_resource_tree(resources, NoResource())
272272
273273 _base.listen_tcp(
274274 bind_addresses,
3737 try:
3838 os.kill(pid, 0)
3939 return True
40 except OSError, err:
40 except OSError as err:
4141 if err.errno == errno.EPERM:
4242 return True
4343 return False
9797 try:
9898 os.kill(pid, signal.SIGTERM)
9999 write("stopped %s" % (app,), colour=GREEN)
100 except OSError, err:
100 except OSError as err:
101101 if err.errno == errno.ESRCH:
102102 write("%s not running" % (app,), colour=YELLOW)
103103 elif err.errno == errno.EPERM:
251251 for running_pid in running_pids:
252252 while pid_running(running_pid):
253253 time.sleep(0.2)
254 write("All processes exited; now restarting...")
254255
255256 if action == "start" or action == "restart":
256257 if start_stop_synapse:
4242 from synapse.util.manhole import manhole
4343 from synapse.util.versionstring import get_version_string
4444 from twisted.internet import reactor
45 from twisted.web.resource import Resource
45 from twisted.web.resource import NoResource
4646
4747 logger = logging.getLogger("synapse.app.user_dir")
4848
115115 "/_matrix/client/api/v1": resource,
116116 })
117117
118 root_resource = create_resource_tree(resources, Resource())
118 root_resource = create_resource_tree(resources, NoResource())
119119
120120 _base.listen_tcp(
121121 bind_addresses,
00 # -*- coding: utf-8 -*-
11 # Copyright 2014-2016 OpenMarket Ltd
2 # Copyright 2018 New Vector Ltd
23 #
34 # Licensed under the Apache License, Version 2.0 (the "License");
45 # you may not use this file except in compliance with the License.
1920 from synapse.util.logutils import log_function
2021
2122 import logging
23 import urllib
2224
2325
2426 logger = logging.getLogger(__name__)
4850 logger.debug("get_room_state dest=%s, room=%s",
4951 destination, room_id)
5052
51 path = PREFIX + "/state/%s/" % room_id
53 path = _create_path(PREFIX, "/state/%s/", room_id)
5254 return self.client.get_json(
5355 destination, path=path, args={"event_id": event_id},
5456 )
7072 logger.debug("get_room_state_ids dest=%s, room=%s",
7173 destination, room_id)
7274
73 path = PREFIX + "/state_ids/%s/" % room_id
75 path = _create_path(PREFIX, "/state_ids/%s/", room_id)
7476 return self.client.get_json(
7577 destination, path=path, args={"event_id": event_id},
7678 )
9294 logger.debug("get_pdu dest=%s, event_id=%s",
9395 destination, event_id)
9496
95 path = PREFIX + "/event/%s/" % (event_id, )
97 path = _create_path(PREFIX, "/event/%s/", event_id)
9698 return self.client.get_json(destination, path=path, timeout=timeout)
9799
98100 @log_function
118120 # TODO: raise?
119121 return
120122
121 path = PREFIX + "/backfill/%s/" % (room_id,)
123 path = _create_path(PREFIX, "/backfill/%s/", room_id)
122124
123125 args = {
124126 "v": event_tuples,
156158 # generated by the json_data_callback.
157159 json_data = transaction.get_dict()
158160
161 path = _create_path(PREFIX, "/send/%s/", transaction.transaction_id)
162
159163 response = yield self.client.put_json(
160164 transaction.destination,
161 path=PREFIX + "/send/%s/" % transaction.transaction_id,
165 path=path,
162166 data=json_data,
163167 json_data_callback=json_data_callback,
164168 long_retries=True,
176180 @log_function
177181 def make_query(self, destination, query_type, args, retry_on_dns_fail,
178182 ignore_backoff=False):
179 path = PREFIX + "/query/%s" % query_type
183 path = _create_path(PREFIX, "/query/%s", query_type)
180184
181185 content = yield self.client.get_json(
182186 destination=destination,
221225 "make_membership_event called with membership='%s', must be one of %s" %
222226 (membership, ",".join(valid_memberships))
223227 )
224 path = PREFIX + "/make_%s/%s/%s" % (membership, room_id, user_id)
228 path = _create_path(PREFIX, "/make_%s/%s/%s", membership, room_id, user_id)
225229
226230 ignore_backoff = False
227231 retry_on_dns_fail = False
247251 @defer.inlineCallbacks
248252 @log_function
249253 def send_join(self, destination, room_id, event_id, content):
250 path = PREFIX + "/send_join/%s/%s" % (room_id, event_id)
254 path = _create_path(PREFIX, "/send_join/%s/%s", room_id, event_id)
251255
252256 response = yield self.client.put_json(
253257 destination=destination,
260264 @defer.inlineCallbacks
261265 @log_function
262266 def send_leave(self, destination, room_id, event_id, content):
263 path = PREFIX + "/send_leave/%s/%s" % (room_id, event_id)
267 path = _create_path(PREFIX, "/send_leave/%s/%s", room_id, event_id)
264268
265269 response = yield self.client.put_json(
266270 destination=destination,
279283 @defer.inlineCallbacks
280284 @log_function
281285 def send_invite(self, destination, room_id, event_id, content):
282 path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
286 path = _create_path(PREFIX, "/invite/%s/%s", room_id, event_id)
283287
284288 response = yield self.client.put_json(
285289 destination=destination,
321325 @defer.inlineCallbacks
322326 @log_function
323327 def exchange_third_party_invite(self, destination, room_id, event_dict):
324 path = PREFIX + "/exchange_third_party_invite/%s" % (room_id,)
328 path = _create_path(PREFIX, "/exchange_third_party_invite/%s", room_id,)
325329
326330 response = yield self.client.put_json(
327331 destination=destination,
334338 @defer.inlineCallbacks
335339 @log_function
336340 def get_event_auth(self, destination, room_id, event_id):
337 path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
341 path = _create_path(PREFIX, "/event_auth/%s/%s", room_id, event_id)
338342
339343 content = yield self.client.get_json(
340344 destination=destination,
346350 @defer.inlineCallbacks
347351 @log_function
348352 def send_query_auth(self, destination, room_id, event_id, content):
349 path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id)
353 path = _create_path(PREFIX, "/query_auth/%s/%s", room_id, event_id)
350354
351355 content = yield self.client.post_json(
352356 destination=destination,
408412 Returns:
409413 A dict containg the device keys.
410414 """
411 path = PREFIX + "/user/devices/" + user_id
415 path = _create_path(PREFIX, "/user/devices/%s", user_id)
412416
413417 content = yield self.client.get_json(
414418 destination=destination,
458462 @log_function
459463 def get_missing_events(self, destination, room_id, earliest_events,
460464 latest_events, limit, min_depth, timeout):
461 path = PREFIX + "/get_missing_events/%s" % (room_id,)
465 path = _create_path(PREFIX, "/get_missing_events/%s", room_id,)
462466
463467 content = yield self.client.post_json(
464468 destination=destination,
478482 def get_group_profile(self, destination, group_id, requester_user_id):
479483 """Get a group profile
480484 """
481 path = PREFIX + "/groups/%s/profile" % (group_id,)
485 path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
482486
483487 return self.client.get_json(
484488 destination=destination,
497501 requester_user_id (str)
498502 content (dict): The new profile of the group
499503 """
500 path = PREFIX + "/groups/%s/profile" % (group_id,)
504 path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
501505
502506 return self.client.post_json(
503507 destination=destination,
511515 def get_group_summary(self, destination, group_id, requester_user_id):
512516 """Get a group summary
513517 """
514 path = PREFIX + "/groups/%s/summary" % (group_id,)
518 path = _create_path(PREFIX, "/groups/%s/summary", group_id,)
515519
516520 return self.client.get_json(
517521 destination=destination,
524528 def get_rooms_in_group(self, destination, group_id, requester_user_id):
525529 """Get all rooms in a group
526530 """
527 path = PREFIX + "/groups/%s/rooms" % (group_id,)
531 path = _create_path(PREFIX, "/groups/%s/rooms", group_id,)
528532
529533 return self.client.get_json(
530534 destination=destination,
537541 content):
538542 """Add a room to a group
539543 """
540 path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
544 path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
541545
542546 return self.client.post_json(
543547 destination=destination,
551555 config_key, content):
552556 """Update room in group
553557 """
554 path = PREFIX + "/groups/%s/room/%s/config/%s" % (group_id, room_id, config_key,)
558 path = _create_path(
559 PREFIX, "/groups/%s/room/%s/config/%s",
560 group_id, room_id, config_key,
561 )
555562
556563 return self.client.post_json(
557564 destination=destination,
564571 def remove_room_from_group(self, destination, group_id, requester_user_id, room_id):
565572 """Remove a room from a group
566573 """
567 path = PREFIX + "/groups/%s/room/%s" % (group_id, room_id,)
574 path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
568575
569576 return self.client.delete_json(
570577 destination=destination,
577584 def get_users_in_group(self, destination, group_id, requester_user_id):
578585 """Get users in a group
579586 """
580 path = PREFIX + "/groups/%s/users" % (group_id,)
587 path = _create_path(PREFIX, "/groups/%s/users", group_id,)
581588
582589 return self.client.get_json(
583590 destination=destination,
590597 def get_invited_users_in_group(self, destination, group_id, requester_user_id):
591598 """Get users that have been invited to a group
592599 """
593 path = PREFIX + "/groups/%s/invited_users" % (group_id,)
600 path = _create_path(PREFIX, "/groups/%s/invited_users", group_id,)
594601
595602 return self.client.get_json(
596603 destination=destination,
603610 def accept_group_invite(self, destination, group_id, user_id, content):
604611 """Accept a group invite
605612 """
606 path = PREFIX + "/groups/%s/users/%s/accept_invite" % (group_id, user_id)
613 path = _create_path(
614 PREFIX, "/groups/%s/users/%s/accept_invite",
615 group_id, user_id,
616 )
617
618 return self.client.post_json(
619 destination=destination,
620 path=path,
621 data=content,
622 ignore_backoff=True,
623 )
624
625 @log_function
626 def join_group(self, destination, group_id, user_id, content):
627 """Attempts to join a group
628 """
629 path = _create_path(PREFIX, "/groups/%s/users/%s/join", group_id, user_id)
607630
608631 return self.client.post_json(
609632 destination=destination,
616639 def invite_to_group(self, destination, group_id, user_id, requester_user_id, content):
617640 """Invite a user to a group
618641 """
619 path = PREFIX + "/groups/%s/users/%s/invite" % (group_id, user_id)
642 path = _create_path(PREFIX, "/groups/%s/users/%s/invite", group_id, user_id)
620643
621644 return self.client.post_json(
622645 destination=destination,
632655 invited.
633656 """
634657
635 path = PREFIX + "/groups/local/%s/users/%s/invite" % (group_id, user_id)
658 path = _create_path(PREFIX, "/groups/local/%s/users/%s/invite", group_id, user_id)
636659
637660 return self.client.post_json(
638661 destination=destination,
646669 user_id, content):
647670 """Remove a user fron a group
648671 """
649 path = PREFIX + "/groups/%s/users/%s/remove" % (group_id, user_id)
672 path = _create_path(PREFIX, "/groups/%s/users/%s/remove", group_id, user_id)
650673
651674 return self.client.post_json(
652675 destination=destination,
663686 kicked from the group.
664687 """
665688
666 path = PREFIX + "/groups/local/%s/users/%s/remove" % (group_id, user_id)
689 path = _create_path(PREFIX, "/groups/local/%s/users/%s/remove", group_id, user_id)
667690
668691 return self.client.post_json(
669692 destination=destination,
678701 the attestations
679702 """
680703
681 path = PREFIX + "/groups/%s/renew_attestation/%s" % (group_id, user_id)
704 path = _create_path(PREFIX, "/groups/%s/renew_attestation/%s", group_id, user_id)
682705
683706 return self.client.post_json(
684707 destination=destination,
693716 """Update a room entry in a group summary
694717 """
695718 if category_id:
696 path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
719 path = _create_path(
720 PREFIX, "/groups/%s/summary/categories/%s/rooms/%s",
697721 group_id, category_id, room_id,
698722 )
699723 else:
700 path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
724 path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
701725
702726 return self.client.post_json(
703727 destination=destination,
713737 """Delete a room entry in a group summary
714738 """
715739 if category_id:
716 path = PREFIX + "/groups/%s/summary/categories/%s/rooms/%s" % (
740 path = _create_path(
741 PREFIX, "/groups/%s/summary/categories/%s/rooms/%s",
717742 group_id, category_id, room_id,
718743 )
719744 else:
720 path = PREFIX + "/groups/%s/summary/rooms/%s" % (group_id, room_id,)
745 path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
721746
722747 return self.client.delete_json(
723748 destination=destination,
730755 def get_group_categories(self, destination, group_id, requester_user_id):
731756 """Get all categories in a group
732757 """
733 path = PREFIX + "/groups/%s/categories" % (group_id,)
758 path = _create_path(PREFIX, "/groups/%s/categories", group_id,)
734759
735760 return self.client.get_json(
736761 destination=destination,
743768 def get_group_category(self, destination, group_id, requester_user_id, category_id):
744769 """Get category info in a group
745770 """
746 path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
771 path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
747772
748773 return self.client.get_json(
749774 destination=destination,
757782 content):
758783 """Update a category in a group
759784 """
760 path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
785 path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
761786
762787 return self.client.post_json(
763788 destination=destination,
772797 category_id):
773798 """Delete a category in a group
774799 """
775 path = PREFIX + "/groups/%s/categories/%s" % (group_id, category_id,)
800 path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
776801
777802 return self.client.delete_json(
778803 destination=destination,
785810 def get_group_roles(self, destination, group_id, requester_user_id):
786811 """Get all roles in a group
787812 """
788 path = PREFIX + "/groups/%s/roles" % (group_id,)
813 path = _create_path(PREFIX, "/groups/%s/roles", group_id,)
789814
790815 return self.client.get_json(
791816 destination=destination,
798823 def get_group_role(self, destination, group_id, requester_user_id, role_id):
798823 """Get a role's info
800825 """
801 path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
826 path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
802827
803828 return self.client.get_json(
804829 destination=destination,
812837 content):
813838 """Update a role in a group
814839 """
815 path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
840 path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
816841
817842 return self.client.post_json(
818843 destination=destination,
826851 def delete_group_role(self, destination, group_id, requester_user_id, role_id):
827852 """Delete a role in a group
828853 """
829 path = PREFIX + "/groups/%s/roles/%s" % (group_id, role_id,)
854 path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
830855
831856 return self.client.delete_json(
832857 destination=destination,
841866 """Update a users entry in a group
842867 """
843868 if role_id:
844 path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
869 path = _create_path(
870 PREFIX, "/groups/%s/summary/roles/%s/users/%s",
845871 group_id, role_id, user_id,
846872 )
847873 else:
848 path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
849
850 return self.client.post_json(
874 path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
875
876 return self.client.post_json(
877 destination=destination,
878 path=path,
879 args={"requester_user_id": requester_user_id},
880 data=content,
881 ignore_backoff=True,
882 )
883
884 @log_function
885 def set_group_join_policy(self, destination, group_id, requester_user_id,
886 content):
887 """Sets the join policy for a group
888 """
889 path = _create_path(PREFIX, "/groups/%s/settings/m.join_policy", group_id,)
890
891 return self.client.put_json(
851892 destination=destination,
852893 path=path,
853894 args={"requester_user_id": requester_user_id},
861902 """Delete a users entry in a group
862903 """
863904 if role_id:
864 path = PREFIX + "/groups/%s/summary/roles/%s/users/%s" % (
905 path = _create_path(
906 PREFIX, "/groups/%s/summary/roles/%s/users/%s",
865907 group_id, role_id, user_id,
866908 )
867909 else:
868 path = PREFIX + "/groups/%s/summary/users/%s" % (group_id, user_id,)
910 path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
869911
870912 return self.client.delete_json(
871913 destination=destination,
888930 data=content,
889931 ignore_backoff=True,
890932 )
933
934
935 def _create_path(prefix, path, *args):
936 """Creates a path from the prefix, path template and args. Ensures that
937 all args are url encoded.
938
939 Example:
940
941 _create_path(PREFIX, "/event/%s/", event_id)
942
943 Args:
944 prefix (str)
945 path (str): String template for the path
946 args ([str]): Args to insert into path. Each arg will be url encoded
947
948 Returns:
949 str
950 """
951 return prefix + path % tuple(urllib.quote(arg, "") for arg in args)
00 # -*- coding: utf-8 -*-
11 # Copyright 2014-2016 OpenMarket Ltd
2 # Copyright 2018 New Vector Ltd
23 #
34 # Licensed under the Apache License, Version 2.0 (the "License");
45 # you may not use this file except in compliance with the License.
801802 defer.returnValue((200, new_content))
802803
803804
805 class FederationGroupsJoinServlet(BaseFederationServlet):
806 """Attempt to join a group
807 """
808 PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join$"
809
810 @defer.inlineCallbacks
811 def on_POST(self, origin, content, query, group_id, user_id):
812 if get_domain_from_id(user_id) != origin:
813 raise SynapseError(403, "user_id doesn't match origin")
814
815 new_content = yield self.handler.join_group(
816 group_id, user_id, content,
817 )
818
819 defer.returnValue((200, new_content))
820
821
804822 class FederationGroupsRemoveUserServlet(BaseFederationServlet):
805823 """Leave or kick a user from the group
806824 """
11211139 )
11221140
11231141 defer.returnValue((200, resp))
1142
1143
1144 class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
1145 """Sets whether a group is joinable without an invite or knock
1146 """
1147 PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy$"
1148
1149 @defer.inlineCallbacks
1150 def on_PUT(self, origin, content, query, group_id):
1151 requester_user_id = parse_string_from_args(query, "requester_user_id")
1152 if get_domain_from_id(requester_user_id) != origin:
1153 raise SynapseError(403, "requester_user_id doesn't match origin")
1154
1155 new_content = yield self.handler.set_group_join_policy(
1156 group_id, requester_user_id, content
1157 )
1158
1159 defer.returnValue((200, new_content))
11241160
11251161
11261162 FEDERATION_SERVLET_CLASSES = (
11621198 FederationGroupsInvitedUsersServlet,
11631199 FederationGroupsInviteServlet,
11641200 FederationGroupsAcceptInviteServlet,
1201 FederationGroupsJoinServlet,
11651202 FederationGroupsRemoveUserServlet,
11661203 FederationGroupsSummaryRoomsServlet,
11671204 FederationGroupsCategoriesServlet,
11711208 FederationGroupsSummaryUsersServlet,
11721209 FederationGroupsAddRoomsServlet,
11731210 FederationGroupsAddRoomsConfigServlet,
1211 FederationGroupsSettingJoinPolicyServlet,
11741212 )
11751213
11761214
00 # -*- coding: utf-8 -*-
11 # Copyright 2017 Vector Creations Ltd
2 # Copyright 2018 New Vector Ltd
23 #
34 # Licensed under the Apache License, Version 2.0 (the "License");
45 # you may not use this file except in compliance with the License.
205206 defer.returnValue({})
206207
207208 @defer.inlineCallbacks
209 def set_group_join_policy(self, group_id, requester_user_id, content):
210 """Sets the group join policy.
211
212 Currently supported policies are:
213 - "invite": an invite must be received and accepted in order to join.
214 - "open": anyone can join.
215 """
216 yield self.check_group_is_ours(
217 group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
218 )
219
220 join_policy = _parse_join_policy_from_contents(content)
221 if join_policy is None:
222 raise SynapseError(
223 400, "No value specified for 'm.join_policy'"
224 )
225
226 yield self.store.set_group_join_policy(group_id, join_policy=join_policy)
227
228 defer.returnValue({})
229
230 @defer.inlineCallbacks
208231 def get_group_categories(self, group_id, requester_user_id):
209232 """Get all categories in a group (as seen by user)
210233 """
380403
381404 yield self.check_group_is_ours(group_id, requester_user_id)
382405
383 group_description = yield self.store.get_group(group_id)
384
385 if group_description:
406 group = yield self.store.get_group(group_id)
407
408 if group:
409 cols = [
410 "name", "short_description", "long_description",
411 "avatar_url", "is_public",
412 ]
413 group_description = {key: group[key] for key in cols}
414 group_description["is_openly_joinable"] = group["join_policy"] == "open"
415
386416 defer.returnValue(group_description)
387417 else:
388418 raise SynapseError(404, "Unknown group")
654684 raise SynapseError(502, "Unknown state returned by HS")
655685
656686 @defer.inlineCallbacks
657 def accept_invite(self, group_id, requester_user_id, content):
658 """User tries to accept an invite to the group.
659
660 This is different from them asking to join, and so should error if no
661 invite exists (and they're not a member of the group)
662 """
663
664 yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
665
666 is_invited = yield self.store.is_user_invited_to_local_group(
667 group_id, requester_user_id,
668 )
669 if not is_invited:
670 raise SynapseError(403, "User not invited to group")
671
672 if not self.hs.is_mine_id(requester_user_id):
687 def _add_user(self, group_id, user_id, content):
688 """Add a user to a group based on a content dict.
689
690 See accept_invite, join_group.
691 """
692 if not self.hs.is_mine_id(user_id):
673693 local_attestation = self.attestations.create_attestation(
674 group_id, requester_user_id,
675 )
694 group_id, user_id,
695 )
696
676697 remote_attestation = content["attestation"]
677698
678699 yield self.attestations.verify_attestation(
679700 remote_attestation,
680 user_id=requester_user_id,
701 user_id=user_id,
681702 group_id=group_id,
682703 )
683704 else:
687708 is_public = _parse_visibility_from_contents(content)
688709
689710 yield self.store.add_user_to_group(
690 group_id, requester_user_id,
711 group_id, user_id,
691712 is_admin=False,
692713 is_public=is_public,
693714 local_attestation=local_attestation,
694715 remote_attestation=remote_attestation,
695716 )
717
718 defer.returnValue(local_attestation)
719
720 @defer.inlineCallbacks
721 def accept_invite(self, group_id, requester_user_id, content):
722 """User tries to accept an invite to the group.
723
724 This is different from them asking to join, and so should error if no
725 invite exists (and they're not a member of the group)
726 """
727
728 yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
729
730 is_invited = yield self.store.is_user_invited_to_local_group(
731 group_id, requester_user_id,
732 )
733 if not is_invited:
734 raise SynapseError(403, "User not invited to group")
735
736 local_attestation = yield self._add_user(group_id, requester_user_id, content)
737
738 defer.returnValue({
739 "state": "join",
740 "attestation": local_attestation,
741 })
742
743 @defer.inlineCallbacks
744 def join_group(self, group_id, requester_user_id, content):
745 """User tries to join the group.
746
747 This will error if the group requires an invite/knock to join
748 """
749
750 group_info = yield self.check_group_is_ours(
751 group_id, requester_user_id, and_exists=True
752 )
753 if group_info['join_policy'] != "open":
754 raise SynapseError(403, "Group is not publicly joinable")
755
756 local_attestation = yield self._add_user(group_id, requester_user_id, content)
696757
697758 defer.returnValue({
698759 "state": "join",
834895 })
835896
836897
898 def _parse_join_policy_from_contents(content):
899 """Given a content for a request, return the specified join policy or None
900 """
901
902 join_policy_dict = content.get("m.join_policy")
903 if join_policy_dict:
904 return _parse_join_policy_dict(join_policy_dict)
905 else:
906 return None
907
908
909 def _parse_join_policy_dict(join_policy_dict):
910 """Given a dict for the "m.join_policy" config return the join policy specified
911 """
912 join_policy_type = join_policy_dict.get("type")
913 if not join_policy_type:
914 return "invite"
915
916 if join_policy_type not in ("invite", "open"):
917 raise SynapseError(
918 400, "Synapse only supports 'invite'/'open' join rule"
919 )
920 return join_policy_type
921
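The parsing rules above — return `None` when no `m.join_policy` is supplied, default the type to `"invite"`, and reject anything other than `"invite"` or `"open"` — can be exercised with a small standalone sketch (combining the two helpers into one function, with `ValueError` standing in for `SynapseError`):

```python
def parse_join_policy(content):
    # Mirrors _parse_join_policy_from_contents/_parse_join_policy_dict:
    # absent (or empty) "m.join_policy" -> None, missing "type" -> "invite",
    # unknown type -> error.
    join_policy_dict = content.get("m.join_policy")
    if not join_policy_dict:
        return None
    join_policy_type = join_policy_dict.get("type")
    if not join_policy_type:
        return "invite"
    if join_policy_type not in ("invite", "open"):
        raise ValueError("only 'invite'/'open' join rules are supported")
    return join_policy_type

print(parse_join_policy({"m.join_policy": {"type": "open"}}))  # open
print(parse_join_policy({}))                                   # None
```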
922
837923 def _parse_visibility_from_contents(content):
838924 """Given a content for a request parse out whether the entity should be
839925 public or not
154154
155155 try:
156156 yield self.store.delete_device(user_id, device_id)
157 except errors.StoreError, e:
157 except errors.StoreError as e:
158158 if e.code == 404:
159159 # no match
160160 pass
203203
204204 try:
205205 yield self.store.delete_devices(user_id, device_ids)
206 except errors.StoreError, e:
206 except errors.StoreError as e:
207207 if e.code == 404:
208208 # no match
209209 pass
242242 new_display_name=content.get("display_name")
243243 )
244244 yield self.notify_device_update(user_id, [device_id])
245 except errors.StoreError, e:
245 except errors.StoreError as e:
246246 if e.code == 404:
247247 raise errors.NotFoundError()
248248 else:
00 # -*- coding: utf-8 -*-
11 # Copyright 2016 OpenMarket Ltd
2 # Copyright 2018 New Vector Ltd
23 #
34 # Licensed under the Apache License, Version 2.0 (the "License");
45 # you may not use this file except in compliance with the License.
133134 if user_id in destination_query:
134135 results[user_id] = keys
135136
136 except CodeMessageException as e:
137 failures[destination] = {
138 "status": e.code, "message": e.message
139 }
140 except NotRetryingDestination as e:
141 failures[destination] = {
142 "status": 503, "message": "Not ready for retry",
143 }
144 except FederationDeniedError as e:
145 failures[destination] = {
146 "status": 403, "message": "Federation Denied",
147 }
148137 except Exception as e:
149 # include ConnectionRefused and other errors
150 failures[destination] = {
151 "status": 503, "message": e.message
152 }
138 failures[destination] = _exception_to_failure(e)
153139
154140 yield make_deferred_yieldable(defer.gatherResults([
155141 preserve_fn(do_remote_query)(destination)
251237 for user_id, keys in remote_result["one_time_keys"].items():
252238 if user_id in device_keys:
253239 json_result[user_id] = keys
254 except CodeMessageException as e:
255 failures[destination] = {
256 "status": e.code, "message": e.message
257 }
258 except NotRetryingDestination as e:
259 failures[destination] = {
260 "status": 503, "message": "Not ready for retry",
261 }
262240 except Exception as e:
263 # include ConnectionRefused and other errors
264 failures[destination] = {
265 "status": 503, "message": e.message
266 }
241 failures[destination] = _exception_to_failure(e)
267242
268243 yield make_deferred_yieldable(defer.gatherResults([
269244 preserve_fn(claim_client_keys)(destination)
361336 )
362337
363338
339 def _exception_to_failure(e):
340 if isinstance(e, CodeMessageException):
341 return {
342 "status": e.code, "message": e.message,
343 }
344
345 if isinstance(e, NotRetryingDestination):
346 return {
347 "status": 503, "message": "Not ready for retry",
348 }
349
350 if isinstance(e, FederationDeniedError):
351 return {
352 "status": 403, "message": "Federation Denied",
353 }
354
355 # include ConnectionRefused and other errors
356 #
357 # Note that some Exceptions (notably twisted's ResponseFailed etc) don't
358 # give a string for e.message, which simplejson then fails to serialize.
359 return {
360 "status": 503, "message": str(e.message),
361 }
362
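The refactored `_exception_to_failure` centralises the mapping from exception type to a JSON-serialisable failure dict, replacing the four copy-pasted `except` blocks. A standalone sketch with stand-in exception classes (the real `CodeMessageException` and `NotRetryingDestination` live in `synapse.api.errors` and `synapse.util.retryutils`; on Python 3 there is no `e.message`, so this sketch stringifies the exception itself):

```python
class CodeMessageException(Exception):
    """Stand-in for synapse's exception carrying an HTTP status code."""
    def __init__(self, code, message):
        super().__init__(message)
        self.code = code
        self.message = message

class NotRetryingDestination(Exception):
    pass

def exception_to_failure(e):
    # Known types map to a specific status; everything else becomes a
    # generic 503. The message is forced to a string because some
    # exceptions carry a payload that simplejson cannot serialise.
    if isinstance(e, CodeMessageException):
        return {"status": e.code, "message": e.message}
    if isinstance(e, NotRetryingDestination):
        return {"status": 503, "message": "Not ready for retry"}
    return {"status": 503, "message": str(e)}

print(exception_to_failure(CodeMessageException(404, "Not found")))
print(exception_to_failure(ConnectionRefusedError("refused")))
```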
363
364364 def _one_time_keys_match(old_key_json, new_key):
365365 old_key = json.loads(old_key_json)
366366
00 # -*- coding: utf-8 -*-
11 # Copyright 2017 Vector Creations Ltd
2 # Copyright 2018 New Vector Ltd
23 #
34 # Licensed under the Apache License, Version 2.0 (the "License");
45 # you may not use this file except in compliance with the License.
8889 delete_group_role = _create_rerouter("delete_group_role")
8990 get_group_role = _create_rerouter("get_group_role")
9091 get_group_roles = _create_rerouter("get_group_roles")
92
93 set_group_join_policy = _create_rerouter("set_group_join_policy")
9194
9295 @defer.inlineCallbacks
9396 def get_group_summary(self, group_id, requester_user_id):
225228 def join_group(self, group_id, user_id, content):
226229 """Request to join a group
227230 """
228 raise NotImplementedError() # TODO
231 if self.is_mine_id(group_id):
232 yield self.groups_server_handler.join_group(
233 group_id, user_id, content
234 )
235 local_attestation = None
236 remote_attestation = None
237 else:
238 local_attestation = self.attestations.create_attestation(group_id, user_id)
239 content["attestation"] = local_attestation
240
241 res = yield self.transport_client.join_group(
242 get_domain_from_id(group_id), group_id, user_id, content,
243 )
244
245 remote_attestation = res["attestation"]
246
247 yield self.attestations.verify_attestation(
248 remote_attestation,
249 group_id=group_id,
250 user_id=user_id,
251 server_name=get_domain_from_id(group_id),
252 )
253
254 # TODO: Check that the group is public and we're being added publicly
255 is_publicised = content.get("publicise", False)
256
257 token = yield self.store.register_user_group_membership(
258 group_id, user_id,
259 membership="join",
260 is_admin=False,
261 local_attestation=local_attestation,
262 remote_attestation=remote_attestation,
263 is_publicised=is_publicised,
264 )
265 self.notifier.on_new_event(
266 "groups_key", token, users=[user_id],
267 )
268
269 defer.returnValue({})
229270
230271 @defer.inlineCallbacks
231272 def accept_invite(self, group_id, user_id, content):
1414 # limitations under the License.
1515
1616 """Utilities for interacting with Identity Servers"""
17
18 import logging
19
20 import simplejson as json
21
1722 from twisted.internet import defer
1823
1924 from synapse.api.errors import (
2227 from ._base import BaseHandler
2328 from synapse.util.async import run_on_reactor
2429 from synapse.api.errors import SynapseError, Codes
25
26 import json
27 import logging
2830
2931 logger = logging.getLogger(__name__)
3032
2626 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
2727 from synapse.util.logcontext import preserve_fn, run_in_background
2828 from synapse.util.metrics import measure_func
29 from synapse.util.frozenutils import unfreeze
29 from synapse.util.frozenutils import frozendict_json_encoder
3030 from synapse.util.stringutils import random_string
3131 from synapse.visibility import filter_events_for_client
3232 from synapse.replication.http.send_event import send_event_to_master
677677
678678 # Ensure that we can round trip before trying to persist in db
679679 try:
680 dump = simplejson.dumps(unfreeze(event.content))
680 dump = frozendict_json_encoder.encode(event.content)
681681 simplejson.loads(dump)
682682 except Exception:
683683 logger.exception("Failed to encode content: %r", event.content)
2323 from synapse.http.client import CaptchaServerHttpClient
2424 from synapse import types
2525 from synapse.types import UserID
26 from synapse.util.async import run_on_reactor
26 from synapse.util.async import run_on_reactor, Linearizer
2727 from synapse.util.threepids import check_3pid_allowed
2828 from ._base import BaseHandler
2929
4444 self._next_generated_user_id = None
4545
4646 self.macaroon_gen = hs.get_macaroon_generator()
47
48 self._generate_user_id_linearizer = Linearizer(
49 name="_generate_user_id_linearizer",
50 )
4751
4852 @defer.inlineCallbacks
4953 def check_username(self, localpart, guest_access_token=None,
344348 @defer.inlineCallbacks
345349 def _generate_user_id(self, reseed=False):
346350 if reseed or self._next_generated_user_id is None:
347 self._next_generated_user_id = (
348 yield self.store.find_next_generated_user_id_localpart()
349 )
351 with (yield self._generate_user_id_linearizer.queue(())):
352 if reseed or self._next_generated_user_id is None:
353 self._next_generated_user_id = (
354 yield self.store.find_next_generated_user_id_localpart()
355 )
350356
351357 id = self._next_generated_user_id
352358 self._next_generated_user_id += 1
285285 headers_dict[b"Authorization"] = auth_headers
286286
287287 @defer.inlineCallbacks
288 def put_json(self, destination, path, data={}, json_data_callback=None,
288 def put_json(self, destination, path, args={}, data={},
289 json_data_callback=None,
289290 long_retries=False, timeout=None,
290291 ignore_backoff=False,
291292 backoff_on_404=False):
295296 destination (str): The remote server to send the HTTP request
296297 to.
297298 path (str): The HTTP path.
299 args (dict): query params
298300 data (dict): A dict containing the data that will be used as
299301 the request body. This will be encoded as JSON.
300302 json_data_callback (callable): A callable returning the dict to
341343 path,
342344 body_callback=body_callback,
343345 headers_dict={"Content-Type": ["application/json"]},
346 query_bytes=encode_query_args(args),
344347 long_retries=long_retries,
345348 timeout=timeout,
346349 ignore_backoff=ignore_backoff,
372375 giving up. None indicates no timeout.
373376 ignore_backoff (bool): true to ignore the historical backoff data and
374377 try the request anyway.
378 args (dict): query params
375379 Returns:
376380 Deferred: Succeeds when we get a 2xx HTTP response. The result
377381 will be the decoded JSON body.
110110 # seconds spent waiting for a db connection, when processing this request
111111 response_db_sched_duration = metrics.register_counter(
112112 "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
113 )
114
115 # size in bytes of the response written
116 response_size = metrics.register_counter(
117 "response_size", labels=["method", "servlet", "tag"]
113118 )
114119
115120 _next_request_id = 0
425430 context.db_sched_duration_ms / 1000., request.method, self.name, tag
426431 )
427432
433 response_size.inc_by(request.sentLength, request.method, self.name, tag)
434
428435
429436 class RootRedirect(resource.Resource):
430437 """Redirects the root '/' path to another path."""
487494 request.setHeader(b"Content-Type", b"application/json")
488495 request.setHeader(b"Server", version_string)
489496 request.setHeader(b"Content-Length", b"%d" % (len(json_bytes),))
497 request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate")
490498
491499 if send_cors:
492500 set_cors_headers(request)
3333 "bcrypt": ["bcrypt>=3.1.0"],
3434 "pillow": ["PIL"],
3535 "pydenticon": ["pydenticon"],
36 "ujson": ["ujson"],
3736 "blist": ["blist"],
3837 "pysaml2>=3.0.0": ["saml2>=3.0.0"],
3938 "pymacaroons-pynacl": ["pymacaroons"],
2323
2424 logger = logging.getLogger(__name__)
2525
26 _json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
27
2628
2729 class Command(object):
2830 """The base command class.
106108 return " ".join((
107109 self.stream_name,
108110 str(self.token) if self.token is not None else "batch",
109 simplejson.dumps(self.row, namedtuple_as_object=False),
111 _json_encoder.encode(self.row),
110112 ))
111113
112114
301303
302304 def to_line(self):
303305 return " ".join((
304 self.cache_func, simplejson.dumps(self.keys, namedtuple_as_object=False)
306 self.cache_func, _json_encoder.encode(self.keys),
305307 ))
306308
307309
333335 )
334336
335337 def to_line(self):
336 return self.user_id + " " + simplejson.dumps((
338 return self.user_id + " " + _json_encoder.encode((
337339 self.access_token, self.ip, self.user_agent, self.device_id,
338340 self.last_seen,
339341 ))
654654 content=event_content,
655655 )
656656
657 defer.returnValue((200, {}))
657 return_value = {}
658
659 if membership_action == "join":
660 return_value["room_id"] = room_id
661
662 defer.returnValue((200, return_value))
658663
659664 def _has_3pid_invite_keys(self, content):
660665 for key in {"id_server", "medium", "address"}:
00 # -*- coding: utf-8 -*-
11 # Copyright 2017 Vector Creations Ltd
2 # Copyright 2018 New Vector Ltd
23 #
34 # Licensed under the Apache License, Version 2.0 (the "License");
45 # you may not use this file except in compliance with the License.
400401 defer.returnValue((200, result))
401402
402403
404 class GroupSettingJoinPolicyServlet(RestServlet):
405 """Set group join policy
406 """
407 PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/settings/m.join_policy$")
408
409 def __init__(self, hs):
410 super(GroupSettingJoinPolicyServlet, self).__init__()
411 self.auth = hs.get_auth()
412 self.groups_handler = hs.get_groups_local_handler()
413
414 @defer.inlineCallbacks
415 def on_PUT(self, request, group_id):
416 requester = yield self.auth.get_user_by_req(request)
417 requester_user_id = requester.user.to_string()
418
419 content = parse_json_object_from_request(request)
420
421 result = yield self.groups_handler.set_group_join_policy(
422 group_id,
423 requester_user_id,
424 content,
425 )
426
427 defer.returnValue((200, result))
428
429
403430 class GroupCreateServlet(RestServlet):
404431 """Create a group
405432 """
737764 GroupInvitedUsersServlet(hs).register(http_server)
738765 GroupUsersServlet(hs).register(http_server)
739766 GroupRoomServlet(hs).register(http_server)
767 GroupSettingJoinPolicyServlet(hs).register(http_server)
740768 GroupCreateServlet(hs).register(http_server)
741769 GroupAdminRoomsServlet(hs).register(http_server)
742770 GroupAdminRoomsConfigServlet(hs).register(http_server)
131131
132132 state_map = yield self.store.get_events(state.values(), get_prev_content=False)
133133 state = {
134 key: state_map[e_id] for key, e_id in state.items() if e_id in state_map
134 key: state_map[e_id] for key, e_id in state.iteritems() if e_id in state_map
135135 }
136136
137137 defer.returnValue(state)
377377 new_state = resolve_events_with_state_map(state_set_ids, state_map)
378378
379379 new_state = {
380 key: state_map[ev_id] for key, ev_id in new_state.items()
380 key: state_map[ev_id] for key, ev_id in new_state.iteritems()
381381 }
382382
383383 return new_state
457457 # build a map from state key to the event_ids which set that state.
458458 # dict[(str, str), set[str])
459459 state = {}
460 for st in state_groups_ids.values():
461 for key, e_id in st.items():
460 for st in state_groups_ids.itervalues():
461 for key, e_id in st.iteritems():
462462 state.setdefault(key, set()).add(e_id)
463463
464464 # build a map from state key to the event_ids which set that state,
465465 # including only those where there are state keys in conflict.
466466 conflicted_state = {
467467 k: list(v)
468 for k, v in state.items()
468 for k, v in state.iteritems()
469469 if len(v) > 1
470470 }
471471
479479 )
480480 else:
481481 new_state = {
482 key: e_ids.pop() for key, e_ids in state.items()
482 key: e_ids.pop() for key, e_ids in state.iteritems()
483483 }
484484
485 # if the new state matches any of the input state groups, we can
486 # use that state group again. Otherwise we will generate a state_id
487 # which will be used as a cache key for future resolutions, but
488 # not get persisted.
489 state_group = None
490 new_state_event_ids = frozenset(new_state.values())
491 for sg, events in state_groups_ids.items():
492 if new_state_event_ids == frozenset(e_id for e_id in events):
493 state_group = sg
494 break
495
496 # TODO: We want to create a state group for this set of events, to
497 # increase cache hits, but we need to make sure that it doesn't
498 # end up as a prev_group without being added to the database
499
500 prev_group = None
501 delta_ids = None
502 for old_group, old_ids in state_groups_ids.iteritems():
503 if not set(new_state) - set(old_ids):
504 n_delta_ids = {
505 k: v
506 for k, v in new_state.iteritems()
507 if old_ids.get(k) != v
508 }
509 if not delta_ids or len(n_delta_ids) < len(delta_ids):
510 prev_group = old_group
511 delta_ids = n_delta_ids
485 with Measure(self.clock, "state.create_group_ids"):
486 # if the new state matches any of the input state groups, we can
487 # use that state group again. Otherwise we will generate a state_id
488 # which will be used as a cache key for future resolutions, but
489 # not get persisted.
490 state_group = None
491 new_state_event_ids = frozenset(new_state.itervalues())
492 for sg, events in state_groups_ids.iteritems():
493 if new_state_event_ids == frozenset(e_id for e_id in events):
494 state_group = sg
495 break
496
497 # TODO: We want to create a state group for this set of events, to
498 # increase cache hits, but we need to make sure that it doesn't
499 # end up as a prev_group without being added to the database
500
501 prev_group = None
502 delta_ids = None
503 for old_group, old_ids in state_groups_ids.iteritems():
504 if not set(new_state) - set(old_ids):
505 n_delta_ids = {
506 k: v
507 for k, v in new_state.iteritems()
508 if old_ids.get(k) != v
509 }
510 if not delta_ids or len(n_delta_ids) < len(delta_ids):
511 prev_group = old_group
512 delta_ids = n_delta_ids
512513
513514 cache = _StateCacheEntry(
514515 state=new_state,
701702
702703 auth_events = {
703704 key: state_map[ev_id]
704 for key, ev_id in auth_event_ids.items()
705 for key, ev_id in auth_event_ids.iteritems()
705706 if ev_id in state_map
706707 }
707708
739740
740741 auth_events.update(resolved_state)
741742
742 for key, events in conflicted_state.items():
743 for key, events in conflicted_state.iteritems():
743744 if key[0] == EventTypes.JoinRules:
744745 logger.debug("Resolving conflicted join rules %r", events)
745746 resolved_state[key] = _resolve_auth_events(
749750
750751 auth_events.update(resolved_state)
751752
752 for key, events in conflicted_state.items():
753 for key, events in conflicted_state.iteritems():
753754 if key[0] == EventTypes.Member:
754755 logger.debug("Resolving conflicted member lists %r", events)
755756 resolved_state[key] = _resolve_auth_events(
759760
760761 auth_events.update(resolved_state)
761762
762 for key, events in conflicted_state.items():
763 for key, events in conflicted_state.iteritems():
763764 if key not in resolved_state:
764765 logger.debug("Resolving conflicted state %r:%r", key, events)
765766 resolved_state[key] = _resolve_normal_events(
1212 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313 # See the License for the specific language governing permissions and
1414 # limitations under the License.
15
16 from twisted.internet import defer
1715
1816 from synapse.storage.devices import DeviceStore
1917 from .appservice import (
243241
244242 return [UserPresenceState(**row) for row in rows]
245243
246 @defer.inlineCallbacks
247244 def count_daily_users(self):
248245 """
249246 Counts the number of users who used this homeserver in the last 24 hours.
250247 """
251248 def _count_users(txn):
252 yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),
249 yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
253250
254251 sql = """
255252 SELECT COALESCE(count(*), 0) FROM (
263260 count, = txn.fetchone()
264261 return count
265262
266 ret = yield self.runInteraction("count_users", _count_users)
267 defer.returnValue(ret)
263 return self.runInteraction("count_users", _count_users)
264
265 def count_r30_users(self):
266 """
267 Counts the number of 30 day retained users, defined as:-
268 * Users who have created their accounts more than 30 days
269 * Where last seen at most 30 days ago
270 * Where account creation and last_seen are > 30 days
271
272 Returns counts globaly for a given user as well as breaking
273 by platform
274 """
275 def _count_r30_users(txn):
276 thirty_days_in_secs = 86400 * 30
277 now = int(self._clock.time())
278 thirty_days_ago_in_secs = now - thirty_days_in_secs
279
280 sql = """
281 SELECT platform, COALESCE(count(*), 0) FROM (
282 SELECT
283 users.name, platform, users.creation_ts * 1000,
284 MAX(uip.last_seen)
285 FROM users
286 INNER JOIN (
287 SELECT
288 user_id,
289 last_seen,
290 CASE
291 WHEN user_agent LIKE '%%Android%%' THEN 'android'
292 WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
293 WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
294 WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
295 WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
296 ELSE 'unknown'
297 END
298 AS platform
299 FROM user_ips
300 ) uip
301 ON users.name = uip.user_id
302 AND users.appservice_id is NULL
303 AND users.creation_ts < ?
304 AND uip.last_seen/1000 > ?
305 AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
306 GROUP BY users.name, platform, users.creation_ts
307 ) u GROUP BY platform
308 """
309
310 results = {}
311 txn.execute(sql, (thirty_days_ago_in_secs,
312 thirty_days_ago_in_secs))
313
314 for row in txn:
315 if row[0] == 'unknown':
316 continue
317 results[row[0]] = row[1]
318
319 sql = """
320 SELECT COALESCE(count(*), 0) FROM (
321 SELECT users.name, users.creation_ts * 1000,
322 MAX(uip.last_seen)
323 FROM users
324 INNER JOIN (
325 SELECT
326 user_id,
327 last_seen
328 FROM user_ips
329 ) uip
330 ON users.name = uip.user_id
331 AND appservice_id is NULL
332 AND users.creation_ts < ?
333 AND uip.last_seen/1000 > ?
334 AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
335 GROUP BY users.name, users.creation_ts
336 ) u
337 """
338
339 txn.execute(sql, (thirty_days_ago_in_secs,
340 thirty_days_ago_in_secs))
341
342 count, = txn.fetchone()
343 results['all'] = count
344
345 return results
346
347 return self.runInteraction("count_r30_users", _count_r30_users)
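The retention predicate the SQL above encodes can be sketched as a small pure-Python helper (hypothetical, not part of this change), with `creation_ts` in seconds and `last_seen` in milliseconds as in the `users` and `user_ips` tables:

```python
# Sketch of the r30 predicate from the SQL above. Assumptions:
# creation_ts is in seconds, last_seen in milliseconds.
THIRTY_DAYS_IN_SECS = 86400 * 30

def is_r30_user(creation_ts, last_seen_ms, now):
    """Return True if the user counts as 30 day retained."""
    last_seen = last_seen_ms / 1000
    return (
        # account created more than 30 days ago
        creation_ts < now - THIRTY_DAYS_IN_SECS
        # seen within the last 30 days
        and last_seen > now - THIRTY_DAYS_IN_SECS
        # creation and last_seen more than 30 days apart
        and last_seen - creation_ts > THIRTY_DAYS_IN_SECS
    )
```

Each of the three conditions maps directly to one of the `AND` clauses in the inner join of the query.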
268348
269349 def get_users(self):
270350 """Function to retrieve a list of users in the users table.
4545 index_name="user_ips_device_id",
4646 table="user_ips",
4747 columns=["user_id", "device_id", "last_seen"],
48 )
49
50 self.register_background_index_update(
51 "user_ips_last_seen_index",
52 index_name="user_ips_last_seen",
53 table="user_ips",
54 columns=["user_id", "last_seen"],
4855 )
4956
5057 # (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
1313 # See the License for the specific language governing permissions and
1414 # limitations under the License.
1515
16 from collections import OrderedDict, deque, namedtuple
17 from functools import wraps
18 import logging
19
20 import simplejson as json
21 from twisted.internet import defer
22
23
1624 from synapse.storage.events_worker import EventsWorkerStore
17
18 from twisted.internet import defer
19
20 from synapse.events import USE_FROZEN_DICTS
21
2225 from synapse.util.async import ObservableDeferred
26 from synapse.util.frozenutils import frozendict_json_encoder
2327 from synapse.util.logcontext import (
24 PreserveLoggingContext, make_deferred_yieldable
28 PreserveLoggingContext, make_deferred_yieldable,
2529 )
2630 from synapse.util.logutils import log_function
2731 from synapse.util.metrics import Measure
2933 from synapse.api.errors import SynapseError
3034 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
3135 from synapse.types import get_domain_from_id
32
33 from canonicaljson import encode_canonical_json
34 from collections import deque, namedtuple, OrderedDict
35 from functools import wraps
36
3736 import synapse.metrics
38
39 import logging
40 import simplejson as json
4137
4238 # these are only included to make the type annotations work
4339 from synapse.events import EventBase # noqa: F401
5248 "persisted_events_sep", labels=["type", "origin_type", "origin_entity"]
5349 )
5450
51 # The number of times we are recalculating the current state
52 state_delta_counter = metrics.register_counter(
53 "state_delta",
54 )
55 # The number of times we are recalculating state when there is only a
56 # single forward extremity
57 state_delta_single_event_counter = metrics.register_counter(
58 "state_delta_single_event",
59 )
60 # The number of times we are recalculating state when we could have reasonably
61 # calculated the delta when we calculated the state for an event we were
62 # persisting.
63 state_delta_reuse_delta_counter = metrics.register_counter(
64 "state_delta_reuse_delta",
65 )
66
5567
5668 def encode_json(json_object):
57 if USE_FROZEN_DICTS:
58 return encode_canonical_json(json_object)
59 else:
60 return json.dumps(json_object, ensure_ascii=False)
69 return frozendict_json_encoder.encode(json_object)
6170
6271
6372 class _EventPeristenceQueue(object):
367376 room_id, ev_ctx_rm, latest_event_ids
368377 )
369378
370 if new_latest_event_ids == set(latest_event_ids):
379 latest_event_ids = set(latest_event_ids)
380 if new_latest_event_ids == latest_event_ids:
371381 # No change in extremities, so no change in state
372382 continue
373383
387397 # a long chain of single ancestor non-state events.
388398 if all_single_prev_not_state:
389399 continue
400
401 state_delta_counter.inc()
402 if len(new_latest_event_ids) == 1:
403 state_delta_single_event_counter.inc()
404
405 # This is a fairly handwavey check to see if we could
406 # have guessed what the delta would have been when
407 # processing one of these events.
408 # What we're interested in is if the latest extremities
409 # were the same when we created the event as they are
410 # now. When this server creates a new event (as opposed
411 # to receiving it over federation) it will use the
412 # forward extremities as the prev_events, so we can
413 # guess this by looking at the prev_events and checking
414 # if they match the current forward extremities.
415 for ev, _ in ev_ctx_rm:
416 prev_event_ids = set(e for e, _ in ev.prev_events)
417 if latest_event_ids == prev_event_ids:
418 state_delta_reuse_delta_counter.inc()
419 break
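The heuristic in the loop above reduces to a set comparison; a minimal standalone sketch (with a simplified, hypothetical signature taking prev_events lists directly):

```python
def could_reuse_delta(latest_event_ids, events_prev_events):
    """Sketch of the heuristic above: if any event being persisted has
    prev_events equal to the current forward extremities, this server
    could have computed the state delta when it created that event.
    `events_prev_events` is a list of prev_events lists, each a list of
    (event_id, hash) pairs as in the loop above."""
    latest = set(latest_event_ids)
    for prev_events in events_prev_events:
        prev_event_ids = set(e for e, _ in prev_events)
        if latest == prev_event_ids:
            return True
    return False
```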
390420
391421 logger.info(
392422 "Calculating state delta for room %s", room_id,
00 # -*- coding: utf-8 -*-
11 # Copyright 2017 Vector Creations Ltd
2 # Copyright 2018 New Vector Ltd
23 #
34 # Licensed under the Apache License, Version 2.0 (the "License");
45 # you may not use this file except in compliance with the License.
1819
1920 from ._base import SQLBaseStore
2021
21 import ujson as json
22 import simplejson as json
2223
2324
2425 # The category ID for the "default" category. We don't store as null in the
2829
2930
3031 class GroupServerStore(SQLBaseStore):
32 def set_group_join_policy(self, group_id, join_policy):
33 """Set the join policy of a group.
34
35 join_policy can be one of:
36 * "invite"
37 * "open"
38 """
39 return self._simple_update_one(
40 table="groups",
41 keyvalues={
42 "group_id": group_id,
43 },
44 updatevalues={
45 "join_policy": join_policy,
46 },
47 desc="set_group_join_policy",
48 )
49
3150 def get_group(self, group_id):
3251 return self._simple_select_one(
3352 table="groups",
3554 "group_id": group_id,
3655 },
3756 retcols=(
38 "name", "short_description", "long_description", "avatar_url", "is_public"
57 "name", "short_description", "long_description",
58 "avatar_url", "is_public", "join_policy",
3959 ),
4060 allow_none=True,
41 desc="is_user_in_group",
61 desc="get_group",
4262 )
4363
4464 def get_users_in_group(self, group_id, include_private=False):
00 # -*- coding: utf-8 -*-
11 # Copyright 2014 - 2016 OpenMarket Ltd
2 # Copyright 2018 New Vector Ltd
23 #
34 # Licensed under the Apache License, Version 2.0 (the "License");
45 # you may not use this file except in compliance with the License.
2425
2526 # Remember to update this number every time a change is made to database
2627 # schema files, so the users will be informed on server restarts.
27 SCHEMA_VERSION = 47
28 SCHEMA_VERSION = 48
2829
2930 dir_path = os.path.abspath(os.path.dirname(__file__))
3031
459459 """
460460 def _find_next_generated_user_id(txn):
461461 txn.execute("SELECT name FROM users")
462 rows = self.cursor_to_dict(txn)
463462
464463 regex = re.compile("^@(\d+):")
465464
466465 found = set()
467466
468 for r in rows:
469 user_id = r["name"]
467 for user_id, in txn:
470468 match = regex.search(user_id)
471469 if match:
472470 found.add(int(match.group(1)))
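The scan above can be sketched end-to-end as follows. The tail of the function is outside this hunk, so the "first unused positive integer" policy here is an assumption:

```python
import re

def find_next_generated_user_id(user_ids):
    """Collect numeric localparts like '@1234:server' and return the
    first unused positive integer (assumed policy; the real function's
    tail is not shown in this hunk)."""
    regex = re.compile(r"^@(\d+):")
    found = set()
    for user_id in user_ids:
        match = regex.search(user_id)
        if match:
            found.add(int(match.group(1)))
    i = 1
    while i in found:
        i += 1
    return i
```

Note the diff also drops the `cursor_to_dict` copy in favour of iterating the cursor directly, which avoids materialising every row as a dict.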
593593
594594 while next_token:
595595 sql = """
596 SELECT stream_ordering, content FROM events
596 SELECT stream_ordering, json FROM events
597 JOIN event_json USING (event_id)
597598 WHERE room_id = ?
598599 AND stream_ordering < ?
599600 AND contains_url = ? AND outlier = ?
605606 next_token = None
606607 for stream_ordering, content_json in txn:
607608 next_token = stream_ordering
608 content = json.loads(content_json)
609
609 event_json = json.loads(content_json)
610 content = event_json["content"]
610611 content_url = content.get("url")
611612 thumbnail_url = content.get("info", {}).get("thumbnail_url")
612613
644644
645645 def add_membership_profile_txn(txn):
646646 sql = ("""
647 SELECT stream_ordering, event_id, events.room_id, content
647 SELECT stream_ordering, event_id, events.room_id, event_json.json
648648 FROM events
649 INNER JOIN event_json USING (event_id)
649650 INNER JOIN room_memberships USING (event_id)
650651 WHERE ? <= stream_ordering AND stream_ordering < ?
651652 AND type = 'm.room.member'
666667 event_id = row["event_id"]
667668 room_id = row["room_id"]
668669 try:
669 content = json.loads(row["content"])
670 event_json = json.loads(row["json"])
671 content = event_json['content']
670672 except Exception:
671673 continue
672674
1111 # See the License for the specific language governing permissions and
1212 # limitations under the License.
1313
14 import json
1514 import logging
15
16 import simplejson as json
1617
1718 logger = logging.getLogger(__name__)
1819
0 /* Copyright 2018 New Vector Ltd
1 *
2 * Licensed under the Apache License, Version 2.0 (the "License");
3 * you may not use this file except in compliance with the License.
4 * You may obtain a copy of the License at
5 *
6 * http://www.apache.org/licenses/LICENSE-2.0
7 *
8 * Unless required by applicable law or agreed to in writing, software
9 * distributed under the License is distributed on an "AS IS" BASIS,
10 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 * See the License for the specific language governing permissions and
12 * limitations under the License.
13 */
14
15 INSERT into background_updates (update_name, progress_json)
16 VALUES ('user_ips_last_seen_index', '{}');
0 /* Copyright 2018 New Vector Ltd
1 *
2 * Licensed under the Apache License, Version 2.0 (the "License");
3 * you may not use this file except in compliance with the License.
4 * You may obtain a copy of the License at
5 *
6 * http://www.apache.org/licenses/LICENSE-2.0
7 *
8 * Unless required by applicable law or agreed to in writing, software
9 * distributed under the License is distributed on an "AS IS" BASIS,
10 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 * See the License for the specific language governing permissions and
12 * limitations under the License.
13 */
14
15 /*
16 * This isn't a real ENUM because sqlite doesn't support it
17 * and we use a default of NULL for inserted rows and interpret
18 * NULL at the python store level as necessary so that existing
19 * rows are given the correct default policy.
20 */
21 ALTER TABLE groups ADD COLUMN join_policy TEXT NOT NULL DEFAULT 'invite';
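Per the comment above, NULL (or any unexpected value) is interpreted at the Python store level; a minimal sketch of that normalisation (hypothetical helper name, not part of this change):

```python
VALID_JOIN_POLICIES = ("invite", "open")

def normalise_join_policy(value):
    """Map a stored join_policy to one of the two allowed values,
    treating NULL/unknown as the historical default of 'invite'."""
    if value in VALID_JOIN_POLICIES:
        return value
    return "invite"
```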
7474
7575 def reindex_search_txn(txn):
7676 sql = (
77 "SELECT stream_ordering, event_id, room_id, type, content, "
77 "SELECT stream_ordering, event_id, room_id, type, json, "
7878 " origin_server_ts FROM events"
79 " JOIN event_json USING (event_id)"
7980 " WHERE ? <= stream_ordering AND stream_ordering < ?"
8081 " AND (%s)"
8182 " ORDER BY stream_ordering DESC"
103104 stream_ordering = row["stream_ordering"]
104105 origin_server_ts = row["origin_server_ts"]
105106 try:
106 content = json.loads(row["content"])
107 event_json = json.loads(row["json"])
108 content = event_json["content"]
107109 except Exception:
108110 continue
109111
666666 # The array of numbers are the weights for the various part of the
667667 # search: (domain, _, display name, localpart)
668668 sql = """
669 SELECT d.user_id, display_name, avatar_url
669 SELECT d.user_id AS user_id, display_name, avatar_url
670670 FROM user_directory_search
671671 INNER JOIN user_directory AS d USING (user_id)
672672 %s
701701 search_query = _parse_query_sqlite(search_term)
702702
703703 sql = """
704 SELECT d.user_id, display_name, avatar_url
704 SELECT d.user_id AS user_id, display_name, avatar_url
705705 FROM user_directory_search
706706 INNER JOIN user_directory AS d USING (user_id)
707707 %s
00 # -*- coding: utf-8 -*-
11 # Copyright 2015, 2016 OpenMarket Ltd
2 # Copyright 2018 New Vector Ltd
23 #
34 # Licensed under the Apache License, Version 2.0 (the "License");
45 # you may not use this file except in compliance with the License.
3839
3940 class CacheEntry(object):
4041 __slots__ = [
41 "deferred", "sequence", "callbacks", "invalidated"
42 "deferred", "callbacks", "invalidated"
4243 ]
4344
44 def __init__(self, deferred, sequence, callbacks):
45 def __init__(self, deferred, callbacks):
4546 self.deferred = deferred
46 self.sequence = sequence
4747 self.callbacks = set(callbacks)
4848 self.invalidated = False
4949
6161 "max_entries",
6262 "name",
6363 "keylen",
64 "sequence",
6564 "thread",
6665 "metrics",
6766 "_pending_deferred_cache",
7978
8079 self.name = name
8180 self.keylen = keylen
82 self.sequence = 0
8381 self.thread = None
8482 self.metrics = register_cache(name, self.cache)
8583
112110 callbacks = [callback] if callback else []
113111 val = self._pending_deferred_cache.get(key, _CacheSentinel)
114112 if val is not _CacheSentinel:
115 if val.sequence == self.sequence:
116 val.callbacks.update(callbacks)
117 if update_metrics:
118 self.metrics.inc_hits()
119 return val.deferred
113 val.callbacks.update(callbacks)
114 if update_metrics:
115 self.metrics.inc_hits()
116 return val.deferred
120117
121118 val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
122119 if val is not _CacheSentinel:
136133 self.check_thread()
137134 entry = CacheEntry(
138135 deferred=value,
139 sequence=self.sequence,
140136 callbacks=callbacks,
141137 )
142
143 entry.callbacks.update(callbacks)
144138
145139 existing_entry = self._pending_deferred_cache.pop(key, None)
146140 if existing_entry:
149143 self._pending_deferred_cache[key] = entry
150144
151145 def shuffle(result):
152 if self.sequence == entry.sequence:
153 existing_entry = self._pending_deferred_cache.pop(key, None)
154 if existing_entry is entry:
155 self.cache.set(key, result, entry.callbacks)
156 else:
157 entry.invalidate()
146 existing_entry = self._pending_deferred_cache.pop(key, None)
147 if existing_entry is entry:
148 self.cache.set(key, result, entry.callbacks)
158149 else:
150 # oops, the _pending_deferred_cache has been updated since
151 # we started our query, so we are out of date.
152 #
153 # Better put back whatever we took out. (We do it this way
154 # round, rather than peeking into the _pending_deferred_cache
155 # and then removing on a match, to make the common case faster)
156 if existing_entry is not None:
157 self._pending_deferred_cache[key] = existing_entry
158
159 # we're not going to put this entry into the cache, so need
160 # to make sure that the invalidation callbacks are called.
161 # That was probably done when _pending_deferred_cache was
162 # updated, but it's possible that `set` was called without
163 # `invalidate` being previously called, in which case it may
164 # not have been. Either way, let's double-check now.
159165 entry.invalidate()
160166 return result
161167
167173
168174 def invalidate(self, key):
169175 self.check_thread()
170
171 # Increment the sequence number so that any SELECT statements that
172 # raced with the INSERT don't update the cache (SYN-369)
173 self.sequence += 1
176 self.cache.pop(key, None)
177
178 # if we have a pending lookup for this key, remove it from the
179 # _pending_deferred_cache, which will (a) stop it being returned
180 # for future queries and (b) stop it being persisted as a proper entry
181 # in self.cache.
174182 entry = self._pending_deferred_cache.pop(key, None)
183
184 # run the invalidation callbacks now, rather than waiting for the
185 # deferred to resolve.
175186 if entry:
176187 entry.invalidate()
177
178 self.cache.pop(key, None)
179188
180189 def invalidate_many(self, key):
181190 self.check_thread()
183192 raise TypeError(
184193 "The cache key must be a tuple not %r" % (type(key),)
185194 )
186 self.sequence += 1
187195 self.cache.del_multi(key)
188196
197 # if we have a pending lookup for this key, remove it from the
198 # _pending_deferred_cache, as above
189199 entry_dict = self._pending_deferred_cache.pop(key, None)
190200 if entry_dict is not None:
191201 for entry in iterate_tree_cache_entry(entry_dict):
193203
194204 def invalidate_all(self):
195205 self.check_thread()
196 self.sequence += 1
197206 self.cache.clear()
207 for entry in self._pending_deferred_cache.itervalues():
208 entry.invalidate()
209 self._pending_deferred_cache.clear()
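The pattern this diff moves to can be sketched in isolation: instead of a global sequence number, a completing lookup is only promoted to the real cache if it is still the same object in the pending map. A minimal sketch with assumed names, ignoring Deferreds:

```python
class PendingCache:
    """Pop-and-compare promotion: invalidate() removes pending entries,
    so a stale completion sees a different (or missing) entry and does
    not write through."""

    def __init__(self):
        self.cache = {}
        self.pending = {}

    def set_pending(self, key, entry):
        self.pending[key] = entry

    def complete(self, key, entry, result):
        # pop-and-compare: a racing invalidate() removed our entry
        existing = self.pending.pop(key, None)
        if existing is entry:
            self.cache[key] = result
        elif existing is not None:
            # a newer lookup started since ours; put it back
            self.pending[key] = existing

    def invalidate(self, key):
        self.cache.pop(key, None)
        self.pending.pop(key, None)
```

As the comments in the diff note, popping first and putting back on mismatch keeps the common (no-race) case fast.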
198210
199211
200212 class _CacheDescriptorBase(object):
1313 # limitations under the License.
1414
1515 from frozendict import frozendict
16 import simplejson as json
1617
1718
1819 def freeze(o):
4849 pass
4950
5051 return o
52
53
54 def _handle_frozendict(obj):
55 """Helper for EventEncoder. Makes frozendicts serializable by returning
56 the underlying dict
57 """
58 if type(obj) is frozendict:
59 # fishing the protected dict out of the object is a bit nasty,
60 # but we don't really want the overhead of copying the dict.
61 return obj._dict
62 raise TypeError('Object of type %s is not JSON serializable' %
63 obj.__class__.__name__)
64
65
66 # A JSONEncoder which is capable of encoding frozendicts without barfing
67 frozendict_json_encoder = json.JSONEncoder(
68 default=_handle_frozendict,
69 )
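The encoder above relies on `json.JSONEncoder`'s `default` hook. A self-contained usage sketch, with a stand-in `FrozenDict` class standing in for the real frozendict library (the `_dict` attribute is an assumption mirroring the helper above):

```python
import json

class FrozenDict:
    """Stand-in for the frozendict type; stores data in _dict as the
    helper above relies on."""
    def __init__(self, d):
        self._dict = dict(d)

def _handle_frozendict(obj):
    # mirror of the helper above: expose the underlying dict
    if type(obj) is FrozenDict:
        return obj._dict
    raise TypeError(
        'Object of type %s is not JSON serializable' % obj.__class__.__name__
    )

frozendict_json_encoder = json.JSONEncoder(default=_handle_frozendict)
```

The `default` callable is only invoked for objects the encoder cannot serialise natively, so plain dicts, lists and strings pass through untouched.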
1111 # See the License for the specific language governing permissions and
1212 # limitations under the License.
1313
14 from twisted.web.resource import Resource
14 from twisted.web.resource import NoResource
1515
1616 import logging
1717
4444 for path_seg in full_path.split('/')[1:-1]:
4545 if path_seg not in last_resource.listNames():
4646 # resource doesn't exist, so make a "dummy resource"
47 child_resource = Resource()
47 child_resource = NoResource()
4848 last_resource.putChild(path_seg, child_resource)
4949 res_id = _resource_id(last_resource, path_seg)
5050 resource_mappings[res_id] = child_resource
00 # -*- coding: utf-8 -*-
11 # Copyright 2016 OpenMarket Ltd
2 # Copyright 2018 New Vector Ltd
23 #
34 # Licensed under the Apache License, Version 2.0 (the "License");
45 # you may not use this file except in compliance with the License.
1112 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1213 # See the License for the specific language governing permissions and
1314 # limitations under the License.
15 from functools import partial
1416 import logging
1517
1618 import mock
2224 from tests import unittest
2325
2426 logger = logging.getLogger(__name__)
27
28
29 class CacheTestCase(unittest.TestCase):
30 def test_invalidate_all(self):
31 cache = descriptors.Cache("testcache")
32
33 callback_record = [False, False]
34
35 def record_callback(idx):
36 callback_record[idx] = True
37
38 # add a couple of pending entries
39 d1 = defer.Deferred()
40 cache.set("key1", d1, partial(record_callback, 0))
41
42 d2 = defer.Deferred()
43 cache.set("key2", d2, partial(record_callback, 1))
44
45 # lookup should return the deferreds
46 self.assertIs(cache.get("key1"), d1)
47 self.assertIs(cache.get("key2"), d2)
48
49 # let one of the lookups complete
50 d2.callback("result2")
51 self.assertEqual(cache.get("key2"), "result2")
52
53 # now do the invalidation
54 cache.invalidate_all()
55
56 # lookup should return none
57 self.assertIsNone(cache.get("key1", None))
58 self.assertIsNone(cache.get("key2", None))
59
60 # both callbacks should have been callbacked
61 self.assertTrue(
62 callback_record[0], "Invalidation callback for key1 not called",
63 )
64 self.assertTrue(
65 callback_record[1], "Invalidation callback for key2 not called",
66 )
67
68 # letting the other lookup complete should do nothing
69 d1.callback("result1")
70 self.assertIsNone(cache.get("key1", None))
2571
2672
2773 class DescriptorTestCase(unittest.TestCase):