Fix origin handling for pushed transactions
Use the actual origin for push transactions, rather than whatever the remote
server claimed.
Richard van der Hoff authored 5 years ago
Andrej Shadura committed 5 years ago
98 | 98 | |
99 | 99 | @defer.inlineCallbacks |
100 | 100 | @log_function |
101 | def on_incoming_transaction(self, transaction_data): | |
101 | def on_incoming_transaction(self, origin, transaction_data): | |
102 | 102 | # keep this as early as possible to make the calculated origin ts as |
103 | 103 | # accurate as possible. |
104 | 104 | request_time = self._clock.time_msec() |
107 | 107 | |
108 | 108 | if not transaction.transaction_id: |
109 | 109 | raise Exception("Transaction missing transaction_id") |
110 | if not transaction.origin: | |
111 | raise Exception("Transaction missing origin") | |
112 | 110 | |
113 | 111 | logger.debug("[%s] Got transaction", transaction.transaction_id) |
114 | 112 | |
115 | 113 | # use a linearizer to ensure that we don't process the same transaction |
116 | 114 | # multiple times in parallel. |
117 | 115 | with (yield self._transaction_linearizer.queue( |
118 | (transaction.origin, transaction.transaction_id), | |
116 | (origin, transaction.transaction_id), | |
119 | 117 | )): |
120 | 118 | result = yield self._handle_incoming_transaction( |
121 | transaction, request_time, | |
119 | origin, transaction, request_time, | |
122 | 120 | ) |
123 | 121 | |
124 | 122 | defer.returnValue(result) |
125 | 123 | |
126 | 124 | @defer.inlineCallbacks |
127 | def _handle_incoming_transaction(self, transaction, request_time): | |
125 | def _handle_incoming_transaction(self, origin, transaction, request_time): | |
128 | 126 | """ Process an incoming transaction and return the HTTP response |
129 | 127 | |
130 | 128 | Args: |
129 | origin (unicode): the server making the request | |
131 | 130 | transaction (Transaction): incoming transaction |
132 | 131 | request_time (int): timestamp that the HTTP request arrived at |
133 | 132 | |
134 | 133 | Returns: |
135 | 134 | Deferred[(int, object)]: http response code and body |
136 | 135 | """ |
137 | response = yield self.transaction_actions.have_responded(transaction) | |
136 | response = yield self.transaction_actions.have_responded(origin, transaction) | |
138 | 137 | |
139 | 138 | if response: |
140 | 139 | logger.debug( |
148 | 147 | |
149 | 148 | received_pdus_counter.inc(len(transaction.pdus)) |
150 | 149 | |
151 | origin_host, _ = parse_server_name(transaction.origin) | |
150 | origin_host, _ = parse_server_name(origin) | |
152 | 151 | |
153 | 152 | pdus_by_room = {} |
154 | 153 | |
189 | 188 | event_id = pdu.event_id |
190 | 189 | try: |
191 | 190 | yield self._handle_received_pdu( |
192 | transaction.origin, pdu | |
191 | origin, pdu | |
193 | 192 | ) |
194 | 193 | pdu_results[event_id] = {} |
195 | 194 | except FederationError as e: |
211 | 210 | if hasattr(transaction, "edus"): |
212 | 211 | for edu in (Edu(**x) for x in transaction.edus): |
213 | 212 | yield self.received_edu( |
214 | transaction.origin, | |
213 | origin, | |
215 | 214 | edu.edu_type, |
216 | 215 | edu.content |
217 | 216 | ) |
223 | 222 | logger.debug("Returning: %s", str(response)) |
224 | 223 | |
225 | 224 | yield self.transaction_actions.set_response( |
225 | origin, | |
226 | 226 | transaction, |
227 | 227 | 200, response |
228 | 228 | ) |
35 | 35 | self.store = datastore |
36 | 36 | |
37 | 37 | @log_function |
38 | def have_responded(self, transaction): | |
38 | def have_responded(self, origin, transaction): | |
39 | 39 | """ Have we already responded to a transaction with the same id and |
40 | 40 | origin? |
41 | 41 | |
49 | 49 | "transaction_id") |
50 | 50 | |
51 | 51 | return self.store.get_received_txn_response( |
52 | transaction.transaction_id, transaction.origin | |
52 | transaction.transaction_id, origin | |
53 | 53 | ) |
54 | 54 | |
55 | 55 | @log_function |
56 | def set_response(self, transaction, code, response): | |
56 | def set_response(self, origin, transaction, code, response): | |
57 | 57 | """ Persist how we responded to a transaction. |
58 | 58 | |
59 | 59 | Returns: |
65 | 65 | |
66 | 66 | return self.store.set_received_txn_response( |
67 | 67 | transaction.transaction_id, |
68 | transaction.origin, | |
68 | origin, | |
69 | 69 | code, |
70 | 70 | response, |
71 | 71 | ) |
352 | 352 | |
353 | 353 | try: |
354 | 354 | code, response = yield self.handler.on_incoming_transaction( |
355 | transaction_data | |
355 | origin, transaction_data, | |
356 | 356 | ) |
357 | 357 | except Exception: |
358 | 358 | logger.exception("on_incoming_transaction failed") |
32 | 32 | ) |
33 | 33 | |
34 | 34 | |
35 | def _expect_edu(destination, edu_type, content, origin="test"): | |
35 | def _expect_edu_transaction(edu_type, content, origin="test"): | |
36 | 36 | return { |
37 | 37 | "origin": origin, |
38 | 38 | "origin_server_ts": 1000000, |
41 | 41 | } |
42 | 42 | |
43 | 43 | |
44 | def _make_edu_json(origin, edu_type, content): | |
45 | return json.dumps(_expect_edu("test", edu_type, content, origin=origin)).encode( | |
44 | def _make_edu_transaction_json(edu_type, content): | |
45 | return json.dumps(_expect_edu_transaction(edu_type, content)).encode( | |
46 | 46 | 'utf8' |
47 | 47 | ) |
48 | 48 | |
189 | 189 | call( |
190 | 190 | "farm", |
191 | 191 | path="/_matrix/federation/v1/send/1000000/", |
192 | data=_expect_edu( | |
193 | "farm", | |
192 | data=_expect_edu_transaction( | |
194 | 193 | "m.typing", |
195 | 194 | content={ |
196 | 195 | "room_id": self.room_id, |
220 | 219 | |
221 | 220 | self.assertEquals(self.event_source.get_current_key(), 0) |
222 | 221 | |
223 | yield self.mock_federation_resource.trigger( | |
222 | (code, response) = yield self.mock_federation_resource.trigger( | |
224 | 223 | "PUT", |
225 | 224 | "/_matrix/federation/v1/send/1000000/", |
226 | _make_edu_json( | |
227 | "farm", | |
225 | _make_edu_transaction_json( | |
228 | 226 | "m.typing", |
229 | 227 | content={ |
230 | 228 | "room_id": self.room_id, |
232 | 230 | "typing": True, |
233 | 231 | }, |
234 | 232 | ), |
235 | federation_auth=True, | |
233 | federation_auth_origin=b'farm', | |
236 | 234 | ) |
237 | 235 | |
238 | 236 | self.on_new_event.assert_has_calls( |
263 | 261 | call( |
264 | 262 | "farm", |
265 | 263 | path="/_matrix/federation/v1/send/1000000/", |
266 | data=_expect_edu( | |
267 | "farm", | |
264 | data=_expect_edu_transaction( | |
268 | 265 | "m.typing", |
269 | 266 | content={ |
270 | 267 | "room_id": self.room_id, |
305 | 305 | |
306 | 306 | @patch('twisted.web.http.Request') |
307 | 307 | @defer.inlineCallbacks |
308 | def trigger(self, http_method, path, content, mock_request, federation_auth=False): | |
308 | def trigger( | |
309 | self, http_method, path, content, mock_request, | |
310 | federation_auth_origin=None, | |
311 | ): | |
309 | 312 | """ Fire an HTTP event. |
310 | 313 | |
311 | 314 | Args: |
314 | 317 | content : The HTTP body |
315 | 318 | mock_request : Mocked request to pass to the event so it can get |
316 | 319 | content. |
320 | federation_auth_origin (bytes|None): domain to authenticate as, for federation | |
317 | 321 | Returns: |
318 | 322 | A tuple of (code, response) |
319 | 323 | Raises: |
334 | 338 | mock_request.getClientIP.return_value = "-" |
335 | 339 | |
336 | 340 | headers = {} |
337 | if federation_auth: | |
338 | headers[b"Authorization"] = [b"X-Matrix origin=test,key=,sig="] | |
341 | if federation_auth_origin is not None: | |
342 | headers[b"Authorization"] = [ | |
343 | b"X-Matrix origin=%s,key=,sig=" % (federation_auth_origin, ) | |
344 | ] | |
339 | 345 | mock_request.requestHeaders.getRawHeaders = mock_getRawHeaders(headers) |
340 | 346 | |
341 | 347 | # return the right path if the event requires it |