Codebase list django-axes / upstream/5.24.0
New upstream version 5.24.0 Sunil Mohan Adapa 2 years ago
83 changed file(s) with 4065 addition(s) and 2729 deletion(s). Raw diff Collapse all Expand all
+0
-3
.coveragerc less more
0 [run]
1 branch = True
2 source = axes
0 version: 2
1 updates:
2 - package-ecosystem: "pip"
3 directory: "/"
4 schedule:
5 interval: "daily"
6 time: "12:00"
7 open-pull-requests-limit: 10
0 name: Release
1
2 on:
3 push:
4 tags:
5 - '*'
6
7 jobs:
8 build:
9 if: github.repository == 'jazzband/django-axes'
10 runs-on: ubuntu-latest
11
12 steps:
13 - uses: actions/checkout@v2
14 with:
15 fetch-depth: 0
16
17 - name: Set up Python
18 uses: actions/setup-python@v2
19 with:
20 python-version: 3.8
21
22 - name: Install dependencies
23 run: |
24 python -m pip install -U pip
25 python -m pip install -U setuptools twine wheel
26
27 - name: Build package
28 run: |
29 python setup.py --version
30 python setup.py sdist --format=gztar bdist_wheel
31 twine check dist/*
32
33 - name: Upload packages to Jazzband
34 if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags')
35 uses: pypa/gh-action-pypi-publish@master
36 with:
37 user: jazzband
38 password: ${{ secrets.JAZZBAND_RELEASE_KEY }}
39 repository_url: https://jazzband.co/projects/django-axes/upload
0 name: Test
1
2 on: [push, pull_request]
3
4 jobs:
5 build:
6 name: build (Python ${{ matrix.python-version }}, Django ${{ matrix.django-version }})
7 runs-on: ubuntu-latest
8 strategy:
9 fail-fast: false
10 matrix:
11 python-version: ['3.6', '3.7', '3.8', '3.9', 'pypy3']
12 django-version: ['2.2', '3.1', '3.2']
13 # Tox configuration for QA environment
14 include:
15 - python-version: '3.8'
16 django-version: 'qa'
17 # Django > 3.2 only supports >= Python 3.8
18 - python-version: '3.8'
19 django-version: 'main'
20 experimental: true
21 - python-version: '3.9'
22 django-version: 'main'
23 experimental: true
24 - python-version: 'pypy3'
25 django-version: 'main'
26 experimental: true
27
28 steps:
29 - uses: actions/checkout@v2
30
31 - name: Set up Python ${{ matrix.python-version }}
32 uses: actions/setup-python@v2
33 with:
34 python-version: ${{ matrix.python-version }}
35
36 - name: Get pip cache dir
37 id: pip-cache
38 run: |
39 echo "::set-output name=dir::$(pip cache dir)"
40
41 - name: Cache
42 uses: actions/cache@v2
43 with:
44 path: ${{ steps.pip-cache.outputs.dir }}
45 key:
46 ${{ matrix.python-version }}-v1-${{ hashFiles('**/setup.py') }}
47 restore-keys: |
48 ${{ matrix.python-version }}-v1-
49
50 - name: Install dependencies
51 run: |
52 python -m pip install --upgrade pip
53 python -m pip install --upgrade tox tox-gh-actions
54
55 - name: Tox tests
56 run: |
57 tox -v
58 env:
59 DJANGO: ${{ matrix.django-version }}
60
61 - name: Upload coverage
62 uses: codecov/codecov-action@v1
63 with:
64 name: Python ${{ matrix.python-version }}
11 *.pyc
22 *.swp
33 .coverage
4 coverage.xml
45 .DS_Store
56 .idea
67 .mypy_cache/
1415 test.db
1516 .eggs
1617 pip-wheel-metadata
18 .vscode/
+0
-40
.travis.yml less more
0 dist: xenial
1 language: python
2 cache: pip
3 python:
4 - 3.6
5 - 3.7
6 - 3.8
7 - pypy3
8 env:
9 - DJANGO=2.2
10 - DJANGO=3.0
11 - DJANGO=3.1
12 - DJANGO=master
13 jobs:
14 fast_finish: true
15 allow_failures:
16 - env: DJANGO=master
17 include:
18 - python: 3.6
19 env: TOXENV=qa
20 - stage: deploy
21 env:
22 python: 3.6
23 script: skip
24 deploy:
25 provider: pypi
26 user: jazzband
27 server: https://jazzband.co/projects/django-axes/upload
28 distributions: sdist bdist_wheel
29 password:
30 secure: TCH5tGIggL2wsWce2svMwpEpPiwVOYqq1R3uSBTexszleP0OafNq/wZk2KZEReR5w1Aq68qp5F5Eeh2ZjJTq4f9M4LtTvqQzrmyNP55DYk/uB1rBJm9b4gBgMtAknxdI2g7unkhQEDo4suuPCVofM7rrDughySNpmvlUQYDttHQ=
31 skip_existing: true
32 on:
33 tags: true
34 repo: jazzband/django-axes
35 python: 3.6
36 install: pip install tox-travis codecov
37 script: tox
38 after_success:
39 - codecov
00
11 Changes
22 =======
3
4
5 5.24.0 (2021-09-09)
6 -------------------
7
8 - Use atomic transaction for updating AccessAttempts in database handler.
9 [okapies]
10
11
12 5.23.0 (2021-09-02)
13 -------------------
14
15 - Pass ``request`` as argument to ``AXES_CLIENT_STR_CALLABLE``.
16 [sarahboyce]
17
18
19 5.22.0 (2021-08-31)
20 -------------------
21
22 - Improve ``failures_since_start`` handling by moving the counter incrementation
23 from non-atomic Python code call to atomic database function.
24 [okapies]
25 - Add publicly available ``request.axes_failures_since_start`` attribute.
26 [okapies]
27
28
29 5.21.0 (2021-08-19)
30 -------------------
31
32 - Add configurable lockout HTTP status code responses
33 with the new ``AXES_HTTP_RESPONSE_CODE`` setting.
34 [phil-bell]
35
36
37 5.20.0 (2021-06-29)
38 -------------------
39
40 - Improve race condition handling in e.g. multi-process environments by using
41 ``get_or_create`` for access attempt fetching and updates.
42 [uli-klank]
43
44
45 5.19.0 (2021-06-16)
46 -------------------
47
48 - Add Polish locale.
49 [Quadric]
50
51
52 5.18.0 (2021-06-09)
53 -------------------
54
55 - Fix ``default_auto_field`` warning.
56 [zkanda]
57
58
59 5.17.0 (2021-06-05)
60 -------------------
61
62 - Fix ``default_app_config`` deprecation.
63 Django 3.2 automatically detects ``AppConfig`` and therefore this setting is no longer required.
64 [nikolaik]
65
66
67 5.16.0 (2021-05-19)
68 -------------------
69
70 - Add ``AXES_CLIENT_STR_CALLABLE`` setting.
71 [smtydn]
72
73
74 5.15.0 (2021-05-03)
75 -------------------
76
77 - Add option to cleanse sensitive GET and POST params in database handler
78 with the ``AXES_SENSITIVE_PARAMETERS`` setting.
79 [mcoconnor]
80
81
82 5.14.0 (2021-04-06)
83 -------------------
84
85 - Improve message formatting for lockout message and translations.
86 [ashokdelphia]
87 - Remove support for Django 3.0.
88 [hramezani]
89 - Add support for Django 3.2.
90 [hramezani]
91
92
93 5.13.1 (2021-02-22)
94 -------------------
95
96 - Default ``AXES_VERBOSE`` to ``AXES_ENABLED`` configuration setting,
97 disabling verbose startup logging when Axes itself is disabled.
98 [christianbundy]
99 - Update documentation.
100 [KStenK]
101
102
103 5.13.0 (2021-02-15)
104 -------------------
105
106 - Add support for resetting attempts with cache backend.
107 [nattyg93]
108
109
110 5.12.0 (2021-01-07)
111 -------------------
112
113 - Clean up test structure and migrate tests outside
114 the main package for a smaller wheel distributions.
115 [aleksihakli]
116 - Move configuration to pyproject.toml for cleaner layout.
117 [aleksihakli]
118 - Clean up test settings override configuration.
119 [hramezani]
120
121
122 5.11.1 (2021-01-06)
123 -------------------
124
125 - Fix cache entry creations for None username.
126 [cabarnes]
127
128
129 5.11.0 (2021-01-05)
130 -------------------
131
132 - Add lockout view CORS support with ``AXES_ALLOWED_CORS_ORIGINS`` configuration flag.
133 [vladox]
134 - Add missing ``@wraps`` decorator to ``axes.decorators.axes_dispatch``.
135 [aleksihakli]
136
137
138 5.10.1 (2021-01-04)
139 -------------------
140
141 - Add ``DEFAULT_AUTO_FIELD`` to test settings.
142 [hramezani]
143 - Fix documentation language.
144 [danielquinn]
145 - Fix Python package version specifiers and remove redundant imports.
146 [aleksihakli]
147
148
149 5.10.0 (2020-12-18)
150 -------------------
151
152 - Deprecate stock DRF support from 5.8.0,
153 require users to set it up per project.
154 Check the documentation for more information.
155 [aleksihakli]
156
157
158 5.9.1 (2020-12-02)
159 ------------------
160
161 - Move tests to GitHub Actions
162 [jezdez]
163 - Fix running Axes code in middleware when ``AXES_ENABLED`` is ``False``.
164 [ashokdelphia]
165
166
167 5.9.0 (2020-11-05)
168 ------------------
169
170 - Add Python 3.9 support.
171 [hramezani]
172 - Prevent ``AccessAttempt`` creation with database handler when
173 username is not set and ``AXES_ONLY_USER_FAILURES`` setting is not set.
174 [hramezani]
175
176
177 5.8.0 (2020-10-16)
178 ------------------
179
180 - Improve Django REST Framework (DRF) integration.
181 [Anatoly]
182
183
184 5.7.1 (2020-09-27)
185 ------------------
186
187 - Adjust settings import and handling chain
188 for cleaner module import and invocation order.
189 [aleksihakli]
190 - Adjust the use of ``AXES_ENABLED`` flag so that
191 imports are always done the same way and initial log
192 is written regardless of the setting and it only affects
193 code that is decorated or wrapped with ``toggleable``.
194 [alekshakli]
195
196
197 5.7.0 (2020-09-26)
198 ------------------
199
200 - Deprecate ``AXES_LOGGER`` Axes setting and move to ``__name__``
201 based logging and fully qualified Python module name log identifiers.
202 [aleksihakli]
203
204
205 5.6.2 (2020-09-20)
206 ------------------
207
208 - Fix regression in ``axes_reset_user`` management command.
209 [aleksihakli]
210
211
212 5.6.1 (2020-09-17)
213 ------------------
214
215 - Improve test dependency management and upgrade black code formatter.
216 [smithdc1]
217
218
219 5.6.0 (2020-09-12)
220 ------------------
221
222 - Add proper development ``subTest`` support via ``pytest-subtests`` package.
223 [smithdc1]
224 - Deprecate ``django-appconf`` and use plain settings for Axes.
225 [aleksihakli]
226
227
228 5.5.2 (2020-09-11)
229 ------------------
230
231 - Update deprecating use of the ``request.is_ajax`` method.
232 [smithdc1]
233
234
235 5.5.1 (2020-09-10)
236 ------------------
237
238 - Update deprecated uses of Django modules and members.
239 [smithdc1]
240
241
242 5.5.0 (2020-08-21)
243 ------------------
244
245 - Add support for locking requests based on
246 username OR IP address with inclusive or
247 using the ``LOCK_OUT_BY_USER_OR_IP`` flag.
248 [PetrDlouhy]
249 - Deprecate Signal ``providing_args`` for Django 3.1 support.
250 [coredumperror]
3251
4252
5253 5.4.3 (2020-08-06)
+0
-4
MANIFEST.in less more
0 include LICENSE README.rst CHANGES.rst
1 recursive-include axes *.py
2 recursive-include axes/locale *.mo *.po
3 recursive-include docs *.rst
1313 :target: https://pypi.org/project/django-axes/
1414 :alt: PyPI release
1515
16 .. image:: https://img.shields.io/pypi/pyversions/django-axes.svg
17 :target: https://pypi.org/project/django-axes/
18 :alt: Supported Python versions
19
20 .. image:: https://img.shields.io/pypi/djversions/django-axes.svg
21 :target: https://pypi.org/project/django-axes/
22 :alt: Supported Django versions
23
1624 .. image:: https://img.shields.io/readthedocs/django-axes.svg
1725 :target: https://django-axes.readthedocs.io/
1826 :alt: Documentation
1927
20 .. image:: https://secure.travis-ci.org/jazzband/django-axes.svg?branch=master
21 :target: http://travis-ci.org/jazzband/django-axes
22 :alt: Build Status
28 .. image:: https://github.com/jazzband/django-axes/workflows/Test/badge.svg
29 :target: https://github.com/jazzband/django-axes/actions
30 :alt: GitHub Actions
2331
2432 .. image:: https://codecov.io/gh/jazzband/django-axes/branch/master/graph/badge.svg
2533 :target: https://codecov.io/gh/jazzband/django-axes
2634 :alt: Coverage
2735
2836
29 Axes is a very simple way for you to keep track of failed
30 authentication attempts for your login views.
37 Axes is a Django plugin for keeping track of suspicious
38 login attempts for your Django based website
39 and implementing simple brute-force attack blocking.
3140
3241 The name is sort of a geeky pun, since it can be interpreted as:
3342
3443 * ``access``, as in monitoring access attempts, or
3544 * ``axes``, as in tools you can use to hack (generally on wood).
3645
37 In this case, however, the hacking part of it can be taken a bit further:
38 **Axes is intended to help you stop people from brute forcing your Django site**.
39
4046
4147 Functionality
4248 -------------
4349
4450 Axes records login attempts to your Django powered site and prevents attackers
45 from brute forcing the site when they exceed the configured attempt limit.
51 from attempting further logins to your site when they exceed the configured attempt limit.
4652
4753 Axes can track the attempts and persist them in the database indefinitely,
4854 or alternatively use a fast and DDoS resistant cache implementation.
7379 Contributions
7480 -------------
7581
76 This is a `Jazzband <https://jazzband.co>`_ project.
82 All contributions are welcome!
83
84 It is best to separate proposed changes and PRs into small, distinct patches
85 by type so that they can be merged faster into upstream and released quicker.
86
87 One way to organize contributions would be to separate PRs for e.g.
88
89 * bugfixes,
90 * new features,
91 * code and design improvements,
92 * documentation improvements, or
93 * tooling and CI improvements.
94
95 Merging contributions requires passing the checks configured
96 with the CI. This includes running tests and linters successfully
97 on the currently officially supported Python and Django versions.
98
99 The test automation is run automatically with GitHub Actions, but you can
100 run it locally with the ``tox`` command before pushing commits.
101
102 Please note that this is a `Jazzband <https://jazzband.co>`_ project.
77103 By contributing you agree to abide by the
78104 `Contributor Code of Conduct <https://jazzband.co/about/conduct>`_
79105 and follow the `guidelines <https://jazzband.co/about/guidelines>`_.
80
81 It is best to separate proposed changes and PRs into small, distinct patches
82 by type so that they can be merged faster into upstream and released quicker:
83
84 * features,
85 * bugfixes,
86 * code style improvements, and
87 * documentation improvements.
88
89 All contributions are required to pass the quality gates configured
90 with the CI. This includes running tests and linters successfully
91 on the currently officially supported Python and Django versions.
92
93 The test automation is run automatically by Travis CI, but you can
94 run it locally with the ``tox`` command before pushing commits.
00 from pkg_resources import get_distribution
11
2 default_app_config = "axes.apps.AppConfig"
2 import django
3
4 if django.VERSION < (3, 2):
5 default_app_config = "axes.apps.AppConfig"
36
47 __version__ = get_distribution("django-axes").version
0 from django.conf import settings
10 from django.contrib import admin
21 from django.utils.translation import gettext_lazy as _
32
3 from axes.conf import settings
44 from axes.models import AccessAttempt, AccessLog
55
66
00 from logging import getLogger
1
2 from django import apps
13 from pkg_resources import get_distribution
24
3 from django import apps
4
5 from axes.conf import settings
6
7 log = getLogger(settings.AXES_LOGGER)
5 log = getLogger(__name__)
86
97
108 class AppConfig(apps.AppConfig):
9 default_auto_field = "django.db.models.AutoField"
1110 name = "axes"
12 logging_initialized = False
11 initialized = False
1312
1413 @classmethod
1514 def initialize(cls):
2019 It displays version information exactly once at application startup.
2120 """
2221
23 if not settings.AXES_ENABLED:
22 if cls.initialized:
2423 return
24 cls.initialized = True
2525
26 if not settings.AXES_VERBOSE:
27 return
26 # Only import settings, checks, and signals one time after Django has been initialized
27 from axes.conf import settings # noqa
28 from axes import checks, signals # noqa
2829
29 if cls.logging_initialized:
30 return
31 cls.logging_initialized = True
30 # Skip startup log messages if Axes is not set to verbose
31 if settings.AXES_VERBOSE:
32 log.info("AXES: BEGIN LOG")
33 log.info(
34 "AXES: Using django-axes version %s",
35 get_distribution("django-axes").version,
36 )
3237
33 log.info("AXES: BEGIN LOG")
34 log.info(
35 "AXES: Using django-axes version %s",
36 get_distribution("django-axes").version,
37 )
38
39 if settings.AXES_ONLY_USER_FAILURES:
40 log.info("AXES: blocking by username only.")
41 elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
42 log.info("AXES: blocking by combination of username and IP.")
43 else:
44 log.info("AXES: blocking by IP only.")
38 if settings.AXES_ONLY_USER_FAILURES:
39 log.info("AXES: blocking by username only.")
40 elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
41 log.info("AXES: blocking by combination of username and IP.")
42 elif settings.AXES_LOCK_OUT_BY_USER_OR_IP:
43 log.info("AXES: blocking by username or IP.")
44 else:
45 log.info("AXES: blocking by IP only.")
4546
4647 def ready(self):
4748 self.initialize()
48
49 from axes import checks, signals # noqa
00 from logging import getLogger
1 from typing import List
12
23 from django.db.models import QuerySet
34 from django.utils.timezone import datetime, now
45
56 from axes.conf import settings
7 from axes.helpers import get_client_username, get_client_parameters, get_cool_off
68 from axes.models import AccessAttempt
7 from axes.helpers import get_client_username, get_client_parameters, get_cool_off
89
9 log = getLogger(settings.AXES_LOGGER)
10 log = getLogger(__name__)
1011
1112
1213 def get_cool_off_threshold(attempt_time: datetime = None) -> datetime:
2526 return attempt_time - cool_off
2627
2728
28 def filter_user_attempts(request, credentials: dict = None) -> QuerySet:
29 def filter_user_attempts(request, credentials: dict = None) -> List[QuerySet]:
2930 """
30 Return a queryset of AccessAttempts that match the given request and credentials.
31 Return a list querysets of AccessAttempts that match the given request and credentials.
3132 """
3233
3334 username = get_client_username(request, credentials)
3435
35 filter_kwargs = get_client_parameters(
36 filter_kwargs_list = get_client_parameters(
3637 username, request.axes_ip_address, request.axes_user_agent
3738 )
38
39 return AccessAttempt.objects.filter(**filter_kwargs)
39 attempts_list = [
40 AccessAttempt.objects.filter(**filter_kwargs)
41 for filter_kwargs in filter_kwargs_list
42 ]
43 return attempts_list
4044
4145
42 def get_user_attempts(request, credentials: dict = None) -> QuerySet:
46 def get_user_attempts(request, credentials: dict = None) -> List[QuerySet]:
4347 """
44 Get valid user attempts that match the given request and credentials.
48 Get list of querysets with valid user attempts that match the given request and credentials.
4549 """
4650
47 attempts = filter_user_attempts(request, credentials)
51 attempts_list = filter_user_attempts(request, credentials)
4852
4953 if settings.AXES_COOLOFF_TIME is None:
5054 log.debug(
5155 "AXES: Getting all access attempts from database because no AXES_COOLOFF_TIME is configured"
5256 )
53 return attempts
57 return attempts_list
5458
5559 threshold = get_cool_off_threshold(request.axes_attempt_time)
5660 log.debug("AXES: Getting access attempts that are newer than %s", threshold)
57 return attempts.filter(attempt_time__gte=threshold)
61 return [attempts.filter(attempt_time__gte=threshold) for attempts in attempts_list]
5862
5963
6064 def clean_expired_user_attempts(attempt_time: datetime = None) -> int:
8387 Reset all user attempts that match the given request and credentials.
8488 """
8589
86 attempts = filter_user_attempts(request, credentials)
90 attempts_list = filter_user_attempts(request, credentials)
8791
88 count, _ = attempts.delete()
92 count = 0
93 for attempts in attempts_list:
94 _count, _ = attempts.delete()
95 count += _count
8996 log.info("AXES: Reset %s access attempts from database.", count)
9097
9198 return count
120120 def axes_deprecation_check(app_configs, **kwargs): # pylint: disable=unused-argument
121121 warnings = []
122122
123 deprecated_settings = ["AXES_DISABLE_SUCCESS_ACCESS_LOG"]
123 deprecated_settings = [
124 "AXES_DISABLE_SUCCESS_ACCESS_LOG",
125 "AXES_LOGGER",
126 ]
124127
125128 for deprecated_setting in deprecated_settings:
126129 try:
00 from django.conf import settings
11 from django.utils.translation import gettext_lazy as _
22
3 from appconf import AppConf
43
4 # disable plugin when set to False
5 settings.AXES_ENABLED = getattr(settings, "AXES_ENABLED", True)
56
6 class AxesAppConf(AppConf):
7 class Meta:
8 prefix = "axes"
7 # see if the user has overridden the failure limit
8 settings.AXES_FAILURE_LIMIT = getattr(settings, "AXES_FAILURE_LIMIT", 3)
99
10 # disable plugin when set to False
11 ENABLED = True
10 # see if the user has set axes to lock out logins after failure limit
11 settings.AXES_LOCK_OUT_AT_FAILURE = getattr(settings, "AXES_LOCK_OUT_AT_FAILURE", True)
1212
13 # see if the user has overridden the failure limit
14 FAILURE_LIMIT = 3
13 # lock out with the combination of username and IP address
14 settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP = getattr(
15 settings, "AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP", False
16 )
1517
16 # see if the user has set axes to lock out logins after failure limit
17 LOCK_OUT_AT_FAILURE = True
18 # lock out with the username or IP address
19 settings.AXES_LOCK_OUT_BY_USER_OR_IP = getattr(
20 settings, "AXES_LOCK_OUT_BY_USER_OR_IP", False
21 )
1822
19 # lock out with the combination of username and IP address
20 LOCK_OUT_BY_COMBINATION_USER_AND_IP = False
23 # lock out with username and never the IP or user agent
24 settings.AXES_ONLY_USER_FAILURES = getattr(settings, "AXES_ONLY_USER_FAILURES", False)
2125
22 # lock out with username and never the IP or user agent
23 ONLY_USER_FAILURES = False
26 # lock out just for admin site
27 settings.AXES_ONLY_ADMIN_SITE = getattr(settings, "AXES_ONLY_ADMIN_SITE", False)
2428
25 # lock out just for admin site
26 ONLY_ADMIN_SITE = False
29 # show Axes logs in admin
30 settings.AXES_ENABLE_ADMIN = getattr(settings, "AXES_ENABLE_ADMIN", True)
2731
28 # show Axes logs in admin
29 ENABLE_ADMIN = True
32 # lock out with the user agent, has no effect when ONLY_USER_FAILURES is set
33 settings.AXES_USE_USER_AGENT = getattr(settings, "AXES_USE_USER_AGENT", False)
3034
31 # lock out with the user agent, has no effect when ONLY_USER_FAILURES is set
32 USE_USER_AGENT = False
35 # use a specific username field to retrieve from login POST data
36 settings.AXES_USERNAME_FORM_FIELD = getattr(
37 settings, "AXES_USERNAME_FORM_FIELD", "username"
38 )
3339
34 # use a specific username field to retrieve from login POST data
35 USERNAME_FORM_FIELD = "username"
40 # use a specific password field to retrieve from login POST data
41 settings.AXES_PASSWORD_FORM_FIELD = getattr(
42 settings, "AXES_PASSWORD_FORM_FIELD", "password"
43 ) # noqa
3644
37 # use a specific password field to retrieve from login POST data
38 PASSWORD_FORM_FIELD = "password" # noqa
45 # use a provided callable to transform the POSTed username into the one used in credentials
46 settings.AXES_USERNAME_CALLABLE = getattr(settings, "AXES_USERNAME_CALLABLE", None)
3947
40 # use a provided callable to transform the POSTed username into the one used in credentials
41 USERNAME_CALLABLE = None
48 # determine if given user should be always allowed to attempt authentication
49 settings.AXES_WHITELIST_CALLABLE = getattr(settings, "AXES_WHITELIST_CALLABLE", None)
4250
43 # determine if given user should be always allowed to attempt authentication
44 WHITELIST_CALLABLE = None
51 # return custom lockout response if configured
52 settings.AXES_LOCKOUT_CALLABLE = getattr(settings, "AXES_LOCKOUT_CALLABLE", None)
4553
46 # return custom lockout response if configured
47 LOCKOUT_CALLABLE = None
54 # reset the number of failed attempts after one successful attempt
55 settings.AXES_RESET_ON_SUCCESS = getattr(settings, "AXES_RESET_ON_SUCCESS", False)
4856
49 # reset the number of failed attempts after one successful attempt
50 RESET_ON_SUCCESS = False
57 settings.AXES_DISABLE_ACCESS_LOG = getattr(settings, "AXES_DISABLE_ACCESS_LOG", False)
5158
52 DISABLE_ACCESS_LOG = False
59 settings.AXES_HANDLER = getattr(
60 settings, "AXES_HANDLER", "axes.handlers.database.AxesDatabaseHandler"
61 )
5362
54 HANDLER = "axes.handlers.database.AxesDatabaseHandler"
63 settings.AXES_LOCKOUT_TEMPLATE = getattr(settings, "AXES_LOCKOUT_TEMPLATE", None)
5564
56 LOGGER = "axes.watch_login"
65 settings.AXES_LOCKOUT_URL = getattr(settings, "AXES_LOCKOUT_URL", None)
5766
58 LOCKOUT_TEMPLATE = None
67 settings.AXES_COOLOFF_TIME = getattr(settings, "AXES_COOLOFF_TIME", None)
5968
60 LOCKOUT_URL = None
69 settings.AXES_VERBOSE = getattr(settings, "AXES_VERBOSE", settings.AXES_ENABLED)
6170
62 COOLOFF_TIME = None
71 # whitelist and blacklist
72 settings.AXES_NEVER_LOCKOUT_WHITELIST = getattr(
73 settings, "AXES_NEVER_LOCKOUT_WHITELIST", False
74 )
6375
64 VERBOSE = True
76 settings.AXES_NEVER_LOCKOUT_GET = getattr(settings, "AXES_NEVER_LOCKOUT_GET", False)
6577
66 # whitelist and blacklist
67 NEVER_LOCKOUT_WHITELIST = False
78 settings.AXES_ONLY_WHITELIST = getattr(settings, "AXES_ONLY_WHITELIST", False)
6879
69 NEVER_LOCKOUT_GET = False
80 settings.AXES_IP_WHITELIST = getattr(settings, "AXES_IP_WHITELIST", None)
7081
71 ONLY_WHITELIST = False
82 settings.AXES_IP_BLACKLIST = getattr(settings, "AXES_IP_BLACKLIST", None)
7283
73 IP_WHITELIST = None
84 # message to show when locked out and have cooloff enabled
85 settings.AXES_COOLOFF_MESSAGE = getattr(
86 settings,
87 "AXES_COOLOFF_MESSAGE",
88 _("Account locked: too many login attempts. Please try again later."),
89 )
7490
75 IP_BLACKLIST = None
91 # message to show when locked out and have cooloff disabled
92 settings.AXES_PERMALOCK_MESSAGE = getattr(
93 settings,
94 "AXES_PERMALOCK_MESSAGE",
95 _(
96 "Account locked: too many login attempts. Contact an admin to unlock your account."
97 ),
98 )
7699
77 # message to show when locked out and have cooloff enabled
78 COOLOFF_MESSAGE = _(
79 "Account locked: too many login attempts. Please try again later"
80 )
100 # if your deployment is using reverse proxies, set this value to 'left-most' or 'right-most' per your configuration
101 settings.AXES_PROXY_ORDER = getattr(settings, "AXES_PROXY_ORDER", "left-most")
81102
82 # message to show when locked out and have cooloff disabled
83 PERMALOCK_MESSAGE = _(
84 "Account locked: too many login attempts. Contact an admin to unlock your account."
85 )
103 # if your deployment is using reverse proxies, set this value to the number of proxies in front of Django
104 settings.AXES_PROXY_COUNT = getattr(settings, "AXES_PROXY_COUNT", None)
86105
87 # if your deployment is using reverse proxies, set this value to 'left-most' or 'right-most' per your configuration
88 PROXY_ORDER = "left-most"
106 # if your deployment is using reverse proxies, set to your trusted proxy IP addresses prefixes if needed
107 settings.AXES_PROXY_TRUSTED_IPS = getattr(settings, "AXES_PROXY_TRUSTED_IPS", None)
89108
90 # if your deployment is using reverse proxies, set this value to the number of proxies in front of Django
91 PROXY_COUNT = None
109 # set to the names of request.META attributes that should be checked for the IP address of the client
110 # if your deployment is using reverse proxies, ensure that the header attributes are securely set by the proxy
111 # ensure that the client can not spoof the headers by setting them and sending them through the proxy
112 settings.AXES_META_PRECEDENCE_ORDER = getattr(
113 settings,
114 "AXES_META_PRECEDENCE_ORDER",
115 getattr(settings, "IPWARE_META_PRECEDENCE_ORDER", ("REMOTE_ADDR",)),
116 )
92117
93 # if your deployment is using reverse proxies, set to your trusted proxy IP addresses prefixes if needed
94 PROXY_TRUSTED_IPS = None
118 # set CORS allowed origins when calling authentication over ajax
119 settings.AXES_ALLOWED_CORS_ORIGINS = getattr(settings, "AXES_ALLOWED_CORS_ORIGINS", "*")
95120
96 # set to the names of request.META attributes that should be checked for the IP address of the client
97 # if your deployment is using reverse proxies, ensure that the header attributes are securely set by the proxy
98 # ensure that the client can not spoof the headers by setting them and sending them through the proxy
99 META_PRECEDENCE_ORDER = getattr(
100 settings,
101 "AXES_META_PRECEDENCE_ORDER",
102 getattr(settings, "IPWARE_META_PRECEDENCE_ORDER", ("REMOTE_ADDR",)),
103 )
121 # set the list of sensitive parameters to cleanse from get/post data before logging
122 settings.AXES_SENSITIVE_PARAMETERS = getattr(
123 settings,
124 "AXES_SENSITIVE_PARAMETERS",
125 [],
126 )
104127
105 # set to `True` if using with Django REST Framework
106 REST_FRAMEWORK_ACTIVE = False
128 # set the callable for the readable string that can be used in
129 # e.g. logging to distinguish client requests
130 settings.AXES_CLIENT_STR_CALLABLE = getattr(settings, "AXES_CLIENT_STR_CALLABLE", None)
131
132 # set the HTTP response code given by too many requests
133 settings.AXES_HTTP_RESPONSE_CODE = getattr(settings, "AXES_HTTP_RESPONSE_CODE", 403)
44
55
66 def axes_dispatch(func):
7 @wraps(func)
78 def inner(request, *args, **kwargs):
89 if AxesProxyHandler.is_allowed(request):
910 return func(request, *args, **kwargs)
0 import re
01 from abc import ABC, abstractmethod
1 import re
22
33 from django.urls import reverse
44 from django.urls.exceptions import NoReverseMatch
145145
146146 return False
147147
148 def reset_attempts(self, *, ip_address: str = None, username: str = None) -> int:
148 def reset_attempts(
149 self,
150 *,
151 ip_address: str = None,
152 username: str = None,
153 ip_or_username: bool = False,
154 ) -> int:
149155 """
150156 Resets access attempts that match the given IP address or username.
151157
11
22 from axes.conf import settings
33 from axes.handlers.base import AxesBaseHandler, AbstractAxesHandler
4 from axes.signals import user_locked_out
54 from axes.helpers import (
65 get_cache,
76 get_cache_timeout,
1110 get_credentials,
1211 get_failure_limit,
1312 )
13 from axes.models import AccessAttempt
14 from axes.signals import user_locked_out
1415
15 log = getLogger(settings.AXES_LOGGER)
16 log = getLogger(__name__)
1617
1718
1819 class AxesCacheHandler(AbstractAxesHandler, AxesBaseHandler):
2425 self.cache = get_cache()
2526 self.cache_timeout = get_cache_timeout()
2627
28 def reset_attempts(
29 self,
30 *,
31 ip_address: str = None,
32 username: str = None,
33 ip_or_username: bool = False,
34 ) -> int:
35 cache_keys: list = []
36 count = 0
37
38 if ip_address is None and username is None:
39 raise NotImplementedError("Cannot clear all entries from cache")
40 if ip_or_username:
41 raise NotImplementedError(
42 "Due to the cache key ip_or_username=True is not supported"
43 )
44
45 cache_keys.extend(
46 get_client_cache_key(
47 AccessAttempt(username=username, ip_address=ip_address)
48 )
49 )
50
51 for cache_key in cache_keys:
52 deleted = self.cache.delete(cache_key)
53 count += int(deleted) if deleted is not None else 1
54
55 log.info("AXES: Reset %d access attempts from database.", count)
56
57 return count
58
2759 def get_failures(self, request, credentials: dict = None) -> int:
28 cache_key = get_client_cache_key(request, credentials)
29 return self.cache.get(cache_key, default=0)
60 cache_keys = get_client_cache_key(request, credentials)
61 failure_count = max(
62 self.cache.get(cache_key, default=0) for cache_key in cache_keys
63 )
64 return failure_count
3065
3166 def user_login_failed(
3267 self, sender, credentials: dict, request=None, **kwargs
4479 return
4580
4681 username = get_client_username(request, credentials)
82 if settings.AXES_ONLY_USER_FAILURES and username is None:
83 log.warning(
84 "AXES: Username is None and AXES_ONLY_USER_FAILURES is enabled, new record will NOT be created."
85 )
86 return
87
4788 client_str = get_client_str(
4889 username,
4990 request.axes_ip_address,
5091 request.axes_user_agent,
5192 request.axes_path_info,
93 request,
5294 )
5395
5496 if self.is_whitelisted(request, credentials):
5698 return
5799
58100 failures_since_start = 1 + self.get_failures(request, credentials)
101 request.axes_failures_since_start = failures_since_start
59102
60103 if failures_since_start > 1:
61104 log.warning(
70113 client_str,
71114 )
72115
73 cache_key = get_client_cache_key(request, credentials)
74 self.cache.set(cache_key, failures_since_start, self.cache_timeout)
116 cache_keys = get_client_cache_key(request, credentials)
117 for cache_key in cache_keys:
118 failures = self.cache.get(cache_key, default=0)
119 self.cache.set(cache_key, failures + 1, self.cache_timeout)
75120
76121 if (
77122 settings.AXES_LOCK_OUT_AT_FAILURE
103148 request.axes_ip_address,
104149 request.axes_user_agent,
105150 request.axes_path_info,
151 request,
106152 )
107153
108154 log.info("AXES: Successful login by %s.", client_str)
109155
110156 if settings.AXES_RESET_ON_SUCCESS:
111 cache_key = get_client_cache_key(request, credentials)
112 failures_since_start = self.cache.get(cache_key, default=0)
113 self.cache.delete(cache_key)
114 log.info(
115 "AXES: Deleted %d failed login attempts by %s from cache.",
116 failures_since_start,
117 client_str,
118 )
157 cache_keys = get_client_cache_key(request, credentials)
158 for cache_key in cache_keys:
159 failures_since_start = self.cache.get(cache_key, default=0)
160 self.cache.delete(cache_key)
161 log.info(
162 "AXES: Deleted %d failed login attempts by %s from cache.",
163 failures_since_start,
164 client_str,
165 )
119166
120167 def user_logged_out(self, sender, request, user, **kwargs):
121168 username = user.get_username() if user else None
124171 request.axes_ip_address,
125172 request.axes_user_agent,
126173 request.axes_path_info,
174 request,
127175 )
128176
129177 log.info("AXES: Successful logout by %s.", client_str)
00 from logging import getLogger
11
2 from django.db.models import Max, Value
2 from django.db import transaction
3 from django.db.models import F, Sum, Value, Q
34 from django.db.models.functions import Concat
45 from django.utils import timezone
56
1011 )
1112 from axes.conf import settings
1213 from axes.handlers.base import AxesBaseHandler, AbstractAxesHandler
13 from axes.models import AccessLog, AccessAttempt
14 from axes.signals import user_locked_out
1514 from axes.helpers import (
1615 get_client_str,
1716 get_client_username,
1918 get_failure_limit,
2019 get_query_str,
2120 )
22
23
24 log = getLogger(settings.AXES_LOGGER)
21 from axes.models import AccessLog, AccessAttempt
22 from axes.signals import user_locked_out
23
24 log = getLogger(__name__)
2525
2626
2727 class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler):
3232 process, caching its output can be dangerous.
3333 """
3434
35 def reset_attempts(self, *, ip_address: str = None, username: str = None) -> int:
35 def reset_attempts(
36 self,
37 *,
38 ip_address: str = None,
39 username: str = None,
40 ip_or_username: bool = False,
41 ) -> int:
3642 attempts = AccessAttempt.objects.all()
3743
38 if ip_address:
39 attempts = attempts.filter(ip_address=ip_address)
40 if username:
41 attempts = attempts.filter(username=username)
44 if ip_or_username:
45 attempts = attempts.filter(Q(ip_address=ip_address) | Q(username=username))
46 else:
47 if ip_address:
48 attempts = attempts.filter(ip_address=ip_address)
49 if username:
50 attempts = attempts.filter(username=username)
4251
4352 count, _ = attempts.delete()
4453 log.info("AXES: Reset %d access attempts from database.", count)
6170 return count
6271
6372 def get_failures(self, request, credentials: dict = None) -> int:
64 attempts = get_user_attempts(request, credentials)
65 return (
66 attempts.aggregate(Max("failures_since_start"))["failures_since_start__max"]
67 or 0
68 )
73 attempts_list = get_user_attempts(request, credentials)
74 attempt_count = max(
75 (
76 attempts.aggregate(Sum("failures_since_start"))[
77 "failures_since_start__sum"
78 ]
79 or 0
80 )
81 for attempts in attempts_list
82 )
83 return attempt_count
6984
7085 def user_login_failed(
7186 self, sender, credentials: dict, request=None, **kwargs
7287 ): # pylint: disable=too-many-locals
7388 """
74 When user login fails, save AccessAttempt record in database and lock user out if necessary.
75
76 :raises AxesSignalPermissionDenied: if user should be locked out.
77 """
89 When user login fails, save AccessAttempt record in database, mark request with lockout attribute and emit lockout signal.
90 """
91
92 log.info("AXES: User login failed, running database handler for failure.")
7893
7994 if request is None:
8095 log.error(
91106 request.axes_ip_address,
92107 request.axes_user_agent,
93108 request.axes_path_info,
94 )
95
96 # This replaces null byte chars that crash saving failures, meaning an attacker doesn't get locked out.
109 request,
110 )
111
112 # This replaces null byte chars that crash saving failures.
97113 get_data = get_query_str(request.GET).replace("\0", "0x00")
98114 post_data = get_query_str(request.POST).replace("\0", "0x00")
99115
101117 log.info("AXES: Login failed from whitelisted client %s.", client_str)
102118 return
103119
104 # 2. database query: Calculate the current maximum failure number from the existing attempts
105 failures_since_start = 1 + self.get_failures(request, credentials)
106
107 # 3. database query: Insert or update access records with the new failure data
108 if failures_since_start > 1:
109 # Update failed attempt information but do not touch the username, IP address, or user agent fields,
110 # because attackers can request the site with multiple different configurations
111 # in order to bypass the defense mechanisms that are used by the site.
112
120 # 2. database query: Get or create access record with the new failure data
121 if settings.AXES_ONLY_USER_FAILURES and username is None:
113122 log.warning(
114 "AXES: Repeated login failure by %s. Count = %d of %d. Updating existing record in the database.",
115 client_str,
116 failures_since_start,
117 get_failure_limit(request, credentials),
118 )
119
120 separator = "\n---------\n"
121
122 attempts = get_user_attempts(request, credentials)
123 attempts.update(
124 get_data=Concat("get_data", Value(separator + get_data)),
125 post_data=Concat("post_data", Value(separator + post_data)),
126 http_accept=request.axes_http_accept,
127 path_info=request.axes_path_info,
128 failures_since_start=failures_since_start,
129 attempt_time=request.axes_attempt_time,
130 username=username,
123 "AXES: Username is None and AXES_ONLY_USER_FAILURES is enabled, new record will NOT be created."
131124 )
132125 else:
133 # Record failed attempt with all the relevant information.
134 # Filtering based on username, IP address and user agent handled elsewhere,
135 # and this handler just records the available information for further use.
136
137 log.warning(
138 "AXES: New login failure by %s. Creating new record in the database.",
139 client_str,
140 )
141
142 AccessAttempt.objects.create(
143 username=username,
144 ip_address=request.axes_ip_address,
145 user_agent=request.axes_user_agent,
146 get_data=get_data,
147 post_data=post_data,
148 http_accept=request.axes_http_accept,
149 path_info=request.axes_path_info,
150 failures_since_start=failures_since_start,
151 attempt_time=request.axes_attempt_time,
152 )
126 with transaction.atomic():
127 (
128 attempt,
129 created,
130 ) = AccessAttempt.objects.select_for_update().get_or_create(
131 username=username,
132 ip_address=request.axes_ip_address,
133 user_agent=request.axes_user_agent,
134 defaults={
135 "get_data": get_data,
136 "post_data": post_data,
137 "http_accept": request.axes_http_accept,
138 "path_info": request.axes_path_info,
139 "failures_since_start": 1,
140 "attempt_time": request.axes_attempt_time,
141 },
142 )
143
144 # Record failed attempt with all the relevant information.
145 # Filtering based on username, IP address and user agent handled elsewhere,
146 # and this handler just records the available information for further use.
147 if created:
148 log.warning(
149 "AXES: New login failure by %s. Created new record in the database.",
150 client_str,
151 )
152
153 # 3. database query if there were previous attempts in the database
154 # Update failed attempt information but do not touch the username, IP address, or user agent fields,
155 # because attackers can request the site with multiple different configurations
156 # in order to bypass the defense mechanisms that are used by the site.
157 else:
158 separator = "\n---------\n"
159
160 attempt.get_data = Concat("get_data", Value(separator + get_data))
161 attempt.post_data = Concat(
162 "post_data", Value(separator + post_data)
163 )
164 attempt.http_accept = request.axes_http_accept
165 attempt.path_info = request.axes_path_info
166 attempt.failures_since_start = F("failures_since_start") + 1
167 attempt.attempt_time = request.axes_attempt_time
168 attempt.save()
169
170 log.warning(
171 "AXES: Repeated login failure by %s. Updated existing record in the database.",
172 client_str,
173 )
174
175 # 3. or 4. database query: Calculate the current maximum failure number from the existing attempts
176 failures_since_start = self.get_failures(request, credentials)
177 request.axes_failures_since_start = failures_since_start
153178
154179 if (
155180 settings.AXES_LOCK_OUT_AT_FAILURE
160185 )
161186
162187 request.axes_locked_out = True
163
164188 user_locked_out.send(
165189 "axes",
166190 request=request,
185209 request.axes_ip_address,
186210 request.axes_user_agent,
187211 request.axes_path_info,
212 request,
188213 )
189214
190215 log.info("AXES: Successful login by %s.", client_str)
225250 request.axes_ip_address,
226251 request.axes_user_agent,
227252 request.axes_path_info,
253 request,
228254 )
229255
230256 log.info("AXES: Successful logout by %s.", client_str)
1212 toggleable,
1313 )
1414
15 log = getLogger(settings.AXES_LOGGER)
15 log = getLogger(__name__)
1616
1717
1818 class AxesProxyHandler(AbstractAxesHandler, AxesBaseHandler):
4242 return cls.implementation
4343
4444 @classmethod
45 def reset_attempts(cls, *, ip_address: str = None, username: str = None) -> int:
45 def reset_attempts(
46 cls,
47 *,
48 ip_address: str = None,
49 username: str = None,
50 ip_or_username: bool = False,
51 ) -> int:
4652 return cls.get_implementation().reset_attempts(
47 ip_address=ip_address, username=username
53 ip_address=ip_address, username=username, ip_or_username=ip_or_username
4854 )
4955
5056 @classmethod
6975 request.axes_user_agent = get_client_user_agent(request)
7076 request.axes_path_info = get_client_path_info(request)
7177 request.axes_http_accept = get_client_http_accept(request)
78 request.axes_failures_since_start = None
7279 request.axes_updated = True
7380
7481 @classmethod
112119 return cls.get_implementation().post_save_access_attempt(instance, **kwargs)
113120
114121 @classmethod
122 @toggleable
115123 def post_delete_access_attempt(cls, instance, **kwargs):
116124 return cls.get_implementation().post_delete_access_attempt(instance, **kwargs)
55 Signal handler implementation that does nothing, ideal for a test suite.
66 """
77
8 def reset_attempts(self, *, ip_address: str = None, username: str = None) -> int:
8 def reset_attempts(
9 self,
10 *,
11 ip_address: str = None,
12 username: str = None,
13 ip_or_username: bool = False,
14 ) -> int:
915 return 0
1016
1117 def reset_logs(self, *, age_days: int = None) -> int:
44 from typing import Callable, Optional, Type, Union
55 from urllib.parse import urlencode
66
7 import ipware.ip
78 from django.core.cache import caches, BaseCache
89 from django.http import HttpRequest, HttpResponse, JsonResponse, QueryDict
910 from django.shortcuts import render, redirect
1011 from django.utils.module_loading import import_string
11
12 import ipware.ip
1312
1413 from axes.conf import settings
1514 from axes.models import AccessBase
173172 return request.META.get("HTTP_ACCEPT", "<unknown>")[:1025]
174173
175174
176 def get_client_parameters(username: str, ip_address: str, user_agent: str) -> dict:
175 def get_client_parameters(username: str, ip_address: str, user_agent: str) -> list:
177176 """
178177 Get query parameters for filtering AccessAttempt queryset.
179178
180179 This method returns a dict that guarantees iteration order for keys and values,
181180 and can so be used in e.g. the generation of hash keys or other deterministic functions.
182 """
183
184 filter_kwargs = dict()
181
182 Returns list of dict, every item of list are separate parameters
183 """
185184
186185 if settings.AXES_ONLY_USER_FAILURES:
187186 # 1. Only individual usernames can be tracked with parametrization
188 filter_kwargs["username"] = username
187 filter_query = [{"username": username}]
189188 else:
190 if settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
189 if settings.AXES_LOCK_OUT_BY_USER_OR_IP:
190 # One of `username` or `IP address` is used
191 filter_query = [{"username": username}, {"ip_address": ip_address}]
192 elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
191193 # 2. A combination of username and IP address can be used as well
192 filter_kwargs["username"] = username
193 filter_kwargs["ip_address"] = ip_address
194 filter_query = [{"username": username, "ip_address": ip_address}]
194195 else:
195196 # 3. Default case is to track the IP address only, which is the most secure option
196 filter_kwargs["ip_address"] = ip_address
197 filter_query = [{"ip_address": ip_address}]
197198
198199 if settings.AXES_USE_USER_AGENT:
199200 # 4. The HTTP User-Agent can be used to track e.g. one browser
200 filter_kwargs["user_agent"] = user_agent
201
202 return filter_kwargs
201 filter_query.append({"user_agent": user_agent})
202
203 return filter_query
204
205
206 def make_cache_key_list(filter_kwargs_list):
207 cache_keys = []
208 for filter_kwargs in filter_kwargs_list:
209 cache_key_components = "".join(
210 value for value in filter_kwargs.values() if value
211 )
212 cache_key_digest = md5(cache_key_components.encode()).hexdigest()
213 cache_keys.append(f"axes-{cache_key_digest}")
214 return cache_keys
203215
204216
205217 def get_client_cache_key(
222234 ip_address = get_client_ip_address(request_or_attempt)
223235 user_agent = get_client_user_agent(request_or_attempt)
224236
225 filter_kwargs = get_client_parameters(username, ip_address, user_agent)
226
227 cache_key_components = "".join(value for value in filter_kwargs.values() if value)
228 cache_key_digest = md5(cache_key_components.encode()).hexdigest()
229 cache_key = f"axes-{cache_key_digest}"
230
231 return cache_key
237 filter_kwargs_list = get_client_parameters(username, ip_address, user_agent)
238
239 return make_cache_key_list(filter_kwargs_list)
232240
233241
234242 def get_client_str(
235 username: str, ip_address: str, user_agent: str, path_info: str
243 username: str,
244 ip_address: str,
245 user_agent: str,
246 path_info: str,
247 request: HttpRequest,
236248 ) -> str:
237249 """
238250 Get a readable string that can be used in e.g. logging to distinguish client requests.
240252 Example log format would be
241253 ``{username: "example", ip_address: "127.0.0.1", path_info: "/example/"}``
242254 """
255
256 if settings.AXES_CLIENT_STR_CALLABLE:
257 log.debug("Using settings.AXES_CLIENT_STR_CALLABLE to get client string.")
258
259 if callable(settings.AXES_CLIENT_STR_CALLABLE):
260 return settings.AXES_CLIENT_STR_CALLABLE(
261 username, ip_address, user_agent, path_info, request
262 )
263 if isinstance(settings.AXES_CLIENT_STR_CALLABLE, str):
264 return import_string(settings.AXES_CLIENT_STR_CALLABLE)(
265 username, ip_address, user_agent, path_info, request
266 )
267 raise TypeError(
268 "settings.AXES_CLIENT_STR_CALLABLE needs to be a string, callable or None."
269 )
243270
244271 client_dict = dict()
245272
250277 client_dict["user_agent"] = user_agent
251278 else:
252279 # Other modes initialize the attributes that are used for the actual lockouts
253 client_dict = get_client_parameters(username, ip_address, user_agent)
280 client_list = get_client_parameters(username, ip_address, user_agent)
281 client_dict = {}
282 for client in client_list:
283 client_dict.update(client)
254284
255285 # Path info is always included as last component in the client string for traceability purposes
256286 if path_info and isinstance(path_info, (tuple, list)):
265295 return client_str
266296
267297
298 def cleanse_parameters(params: dict) -> dict:
299 """
300 Replace sensitive parameter values in a parameter dict with
301 a safe placeholder value.
302
303 Parameters name ``'password'`` will always be cleansed. Additionally,
304 parameters named in ``settings.AXES_SENSITIVE_PARAMETERS`` and
305 ``settings.AXES_PASSWORD_FORM_FIELD will be cleansed.
306
307 This is used to prevent passwords and similar values from
308 being logged in cleartext.
309 """
310 sensitive_parameters = ["password"] + settings.AXES_SENSITIVE_PARAMETERS
311 if settings.AXES_PASSWORD_FORM_FIELD:
312 sensitive_parameters.append(settings.AXES_PASSWORD_FORM_FIELD)
313
314 if sensitive_parameters:
315 cleansed = params.copy()
316 for param in sensitive_parameters:
317 if param in cleansed:
318 cleansed[param] = "********************"
319 return cleansed
320 return params
321
322
268323 def get_query_str(query: Type[QueryDict], max_length: int = 1024) -> str:
269324 """
270325 Turns a query dictionary into an easy-to-read list of key-value pairs.
271326
272 If a field is called either ``'password'`` or ``settings.AXES_PASSWORD_FORM_FIELD`` it will be excluded.
327 If a field is called either ``'password'`` or ``settings.AXES_PASSWORD_FORM_FIELD`` or if the fieldname is included
328 in ``settings.AXES_SENSITIVE_PARAMETERS`` its value will be masked.
273329
274330 The length of the output is limited to max_length to avoid a DoS attack via excessively large payloads.
275331 """
276332
277 query_dict = query.copy()
278 query_dict.pop("password", None)
279 query_dict.pop(settings.AXES_PASSWORD_FORM_FIELD, None)
333 query_dict = cleanse_parameters(query.copy())
280334
281335 template = Template("$key=$value")
282336 items = [{"key": k, "value": v} for k, v in query_dict.items()]
311365 "settings.AXES_LOCKOUT_CALLABLE needs to be a string, callable, or None."
312366 )
313367
314 status = 403
368 status = settings.AXES_HTTP_RESPONSE_CODE
315369 context = {
316370 "failure_limit": get_failure_limit(request, credentials),
317371 "username": get_client_username(request, credentials) or "",
328382 }
329383 )
330384
331 if request.is_ajax():
332 return JsonResponse(context, status=status)
385 if request.META.get("HTTP_X_REQUESTED_WITH") == "XMLHttpRequest":
386 json_response = JsonResponse(context, status=status)
387 json_response[
388 "Access-Control-Allow-Origin"
389 ] = settings.AXES_ALLOWED_CORS_ORIGINS
390 json_response["Access-Control-Allow-Methods"] = "POST, OPTIONS"
391 json_response[
392 "Access-Control-Allow-Headers"
393 ] = "Origin, Content-Type, Accept, Authorization, x-requested-with"
394 return json_response
333395
334396 if settings.AXES_LOCKOUT_TEMPLATE:
335397 return render(request, settings.AXES_LOCKOUT_TEMPLATE, context, status=status)
2626 msgstr "Meta-Daten"
2727
2828 #: axes/conf.py:58
29 msgid "Account locked: too many login attempts. Please try again later"
29 msgid "Account locked: too many login attempts. Please try again later."
3030 msgstr ""
3131 "Zugang gesperrt: zu viele fehlgeschlagene Anmeldeversuche. Bitte versuchen "
3232 "Sie es später erneut."
0 # SOME DESCRIPTIVE TITLE.
1 # Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER
2 # This file is distributed under the same license as the PACKAGE package.
3 # FIRST AUTHOR <EMAIL@ADDRESS>, YEAR.
4 #
5 msgid ""
6 msgstr ""
7 "Project-Id-Version: \n"
8 "Report-Msgid-Bugs-To: \n"
9 "POT-Creation-Date: 2021-06-11 23:36+0200\n"
10 "PO-Revision-Date: 2021-06-16 10:51+0300\n"
11 "Language: pl\n"
12 "MIME-Version: 1.0\n"
13 "Content-Type: text/plain; charset=UTF-8\n"
14 "Content-Transfer-Encoding: 8bit\n"
15 "Plural-Forms: nplurals=4; plural=(n==1 ? 0 : (n%10>=2 && n%10<=4) && (n"
16 "%100<12 || n%100>14) ? 1 : n!=1 && (n%10>=0 && n%10<=1) || (n%10>=5 && n"
17 "%10<=9) || (n%100>=12 && n%100<=14) ? 2 : 3);\n"
18 "Last-Translator: \n"
19 "Language-Team: \n"
20 "X-Generator: Poedit 3.0\n"
21
22 #: .\axes\admin.py:26
23 msgid "Form Data"
24 msgstr "Dane formularza"
25
26 #: .\axes\admin.py:27 .\axes\admin.py:64
27 msgid "Meta Data"
28 msgstr "Metadane"
29
30 #: .\axes\conf.py:89
31 msgid "Account locked: too many login attempts. Please try again later."
32 msgstr ""
33 "Konto zablokowane: zbyt wiele prób logowania. Spróbuj ponownie później."
34
35 #: .\axes\conf.py:97
36 msgid ""
37 "Account locked: too many login attempts. Contact an admin to unlock your "
38 "account."
39 msgstr ""
40 "Konto zablokowane: zbyt wiele prób logowania. Skontaktuj się z "
41 "administratorem, aby odblokować swoje konto."
42
43 #: .\axes\models.py:6
44 #, fuzzy
45 msgid "User Agent"
46 msgstr "User Agent"
47
48 #: .\axes\models.py:8
49 msgid "IP Address"
50 msgstr "Adres IP"
51
52 #: .\axes\models.py:10
53 msgid "Username"
54 msgstr "Nazwa Użytkownika"
55
56 #: .\axes\models.py:12
57 #, fuzzy
58 msgid "HTTP Accept"
59 msgstr "HTTP Accept"
60
61 #: .\axes\models.py:14
62 msgid "Path"
63 msgstr "Ścieżka"
64
65 #: .\axes\models.py:16
66 msgid "Attempt Time"
67 msgstr "Czas wystąpienia"
68
69 #: .\axes\models.py:25
70 msgid "GET Data"
71 msgstr "Dane GET"
72
73 #: .\axes\models.py:27
74 msgid "POST Data"
75 msgstr "Dane POST"
76
77 #: .\axes\models.py:29
78 msgid "Failed Logins"
79 msgstr "Nieudane logowania"
80
81 #: .\axes\models.py:35
82 msgid "access attempt"
83 msgstr "próba dostępu"
84
85 #: .\axes\models.py:36
86 msgid "access attempts"
87 msgstr "próby dostępu"
88
89 #: .\axes\models.py:40
90 msgid "Logout Time"
91 msgstr "Czas wylogowania"
92
93 #: .\axes\models.py:46
94 msgid "access log"
95 msgstr "dziennik logowania"
96
97 #: .\axes\models.py:47
98 msgid "access logs"
99 msgstr "dzienniki logowania"
2626 msgstr "Метаданные"
2727
2828 #: axes/conf.py:58
29 msgid "Account locked: too many login attempts. Please try again later"
29 msgid "Account locked: too many login attempts. Please try again later."
3030 msgstr ""
3131 "Учетная запись заблокирована: слишком много попыток входа. "
3232 "Повторите попытку позже."
2626 msgstr "Meta-Verisi"
2727
2828 #: axes/conf.py:58
29 msgid "Account locked: too many login attempts. Please try again later"
29 msgid "Account locked: too many login attempts. Please try again later."
3030 msgstr ""
31 "Hesap kilitlendi: cok fazla erişim denemesi. Lütfen daha sonra tekrar deneyiniz"
31 "Hesap kilitlendi: cok fazla erişim denemesi. Lütfen daha sonra tekrar deneyiniz."
3232
3333 #: axes/conf.py:61
3434 msgid ""
3636 "account."
3737 msgstr ""
3838 "Hesap kilitlendi: cok fazla erişim denemesi. Hesabını açtırmak için yöneticiyle iletişime"
39 "geçin"
39 "geçin."
4040
4141 #: axes/models.py:9
4242 msgid "User Agent"
00 from typing import Callable
1
2 from django.conf import settings
13
24 from axes.helpers import get_lockout_response
35
68 """
79 Middleware that calculates necessary HTTP request attributes for attempt monitoring
810 and maps lockout signals into readable HTTP 403 Forbidden responses.
11
12 If a project uses ``django rest framework`` then the middleware updates the
13 request and checks whether the limit has been exceeded. It's needed only
14 for integration with DRF because it uses its own request object.
915
1016 This middleware recognizes a logout monitoring flag in the request and
1117 and uses the ``axes.helpers.get_lockout_response`` handler for returning
2834 def __call__(self, request):
2935 response = self.get_response(request)
3036
31 if getattr(request, "axes_locked_out", None):
32 response = get_lockout_response(request) # type: ignore
37 if settings.AXES_ENABLED:
38 if getattr(request, "axes_locked_out", None):
39 response = get_lockout_response(request) # type: ignore
3340
3441 return response
66 )
77 from django.core.signals import setting_changed
88 from django.db.models.signals import post_save, post_delete
9 from django.dispatch import Signal
910 from django.dispatch import receiver
10 from django.dispatch import Signal
1111
12 from axes.conf import settings
12 from axes.handlers.proxy import AxesProxyHandler
1313 from axes.models import AccessAttempt
14 from axes.handlers.proxy import AxesProxyHandler
1514
16 log = getLogger(settings.AXES_LOGGER)
15 log = getLogger(__name__)
1716
1817
19 user_locked_out = Signal(providing_args=["request", "username", "ip_address"])
18 # This signal provides the following arguments to any listeners:
19 # request - The current Request object.
20 # username - The username of the User who has been locked out.
21 # ip_address - The IP of the user who has been locked out.
22 user_locked_out = Signal()
2023
2124
2225 @receiver(user_login_failed)
+0
-0
axes/tests/__init__.py less more
(Empty file)
+0
-185
axes/tests/base.py less more
0 from random import choice
1 from string import ascii_letters, digits
2 from time import sleep
3
4 from django.contrib.auth import get_user_model
5 from django.http import HttpRequest
6 from django.test import TestCase
7 from django.urls import reverse
8 from django.utils.timezone import now
9
10 from axes.utils import reset
11 from axes.conf import settings
12 from axes.helpers import (
13 get_cache,
14 get_client_http_accept,
15 get_client_ip_address,
16 get_client_path_info,
17 get_client_user_agent,
18 get_cool_off,
19 get_credentials,
20 get_failure_limit,
21 )
22 from axes.models import AccessAttempt, AccessLog
23
24
25 def custom_failure_limit(request, credentials):
26 return 3
27
28
29 class AxesTestCase(TestCase):
30 """
31 Test case using custom settings for testing.
32 """
33
34 VALID_USERNAME = "axes-valid-username"
35 VALID_PASSWORD = "axes-valid-password"
36 VALID_EMAIL = "axes-valid-email@example.com"
37 VALID_USER_AGENT = "axes-user-agent"
38 VALID_IP_ADDRESS = "127.0.0.1"
39
40 INVALID_USERNAME = "axes-invalid-username"
41 INVALID_PASSWORD = "axes-invalid-password"
42 INVALID_EMAIL = "axes-invalid-email@example.com"
43
44 LOCKED_MESSAGE = "Account locked: too many login attempts."
45 LOGOUT_MESSAGE = "Logged out"
46 LOGIN_FORM_KEY = '<input type="submit" value="Log in" />'
47
48 STATUS_SUCCESS = 200
49 ALLOWED = 302
50 BLOCKED = 403
51
52 def setUp(self):
53 """
54 Create a valid user for login.
55 """
56
57 self.username = self.VALID_USERNAME
58 self.password = self.VALID_PASSWORD
59 self.email = self.VALID_EMAIL
60
61 self.ip_address = self.VALID_IP_ADDRESS
62 self.user_agent = self.VALID_USER_AGENT
63 self.path_info = reverse("admin:login")
64
65 self.user = get_user_model().objects.create_superuser(
66 username=self.username, password=self.password, email=self.email
67 )
68
69 self.request = HttpRequest()
70 self.request.method = "POST"
71 self.request.META["REMOTE_ADDR"] = self.ip_address
72 self.request.META["HTTP_USER_AGENT"] = self.user_agent
73 self.request.META["PATH_INFO"] = self.path_info
74
75 self.request.axes_attempt_time = now()
76 self.request.axes_ip_address = get_client_ip_address(self.request)
77 self.request.axes_user_agent = get_client_user_agent(self.request)
78 self.request.axes_path_info = get_client_path_info(self.request)
79 self.request.axes_http_accept = get_client_http_accept(self.request)
80
81 self.credentials = get_credentials(self.username)
82
83 def tearDown(self):
84 get_cache().clear()
85
86 def get_kwargs_with_defaults(self, **kwargs):
87 defaults = {
88 "user_agent": self.user_agent,
89 "ip_address": self.ip_address,
90 "username": self.username,
91 }
92
93 defaults.update(kwargs)
94 return defaults
95
96 def create_attempt(self, **kwargs):
97 kwargs = self.get_kwargs_with_defaults(**kwargs)
98 kwargs.setdefault("failures_since_start", 1)
99 return AccessAttempt.objects.create(**kwargs)
100
101 def create_log(self, **kwargs):
102 return AccessLog.objects.create(**self.get_kwargs_with_defaults(**kwargs))
103
104 def reset(self, ip=None, username=None):
105 return reset(ip, username)
106
107 def login(self, is_valid_username=False, is_valid_password=False, **kwargs):
108 """
109 Login a user.
110
111 A valid credential is used when is_valid_username is True,
112 otherwise it will use a random string to make a failed login.
113 """
114
115 if is_valid_username:
116 username = self.VALID_USERNAME
117 else:
118 username = "".join(choice(ascii_letters + digits) for _ in range(10))
119
120 if is_valid_password:
121 password = self.VALID_PASSWORD
122 else:
123 password = self.INVALID_PASSWORD
124
125 post_data = {"username": username, "password": password, **kwargs}
126
127 return self.client.post(
128 reverse("admin:login"),
129 post_data,
130 REMOTE_ADDR=self.ip_address,
131 HTTP_USER_AGENT=self.user_agent,
132 )
133
134 def logout(self):
135 return self.client.post(
136 reverse("admin:logout"),
137 REMOTE_ADDR=self.ip_address,
138 HTTP_USER_AGENT=self.user_agent,
139 )
140
141 def check_login(self):
142 response = self.login(is_valid_username=True, is_valid_password=True)
143 self.assertNotContains(
144 response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True
145 )
146
147 def almost_lockout(self):
148 for _ in range(1, get_failure_limit(None, None)):
149 response = self.login()
150 self.assertContains(response, self.LOGIN_FORM_KEY, html=True)
151
152 def lockout(self):
153 self.almost_lockout()
154 return self.login()
155
156 def check_lockout(self):
157 response = self.lockout()
158 if settings.AXES_LOCK_OUT_AT_FAILURE == True:
159 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
160 else:
161 self.assertNotContains(
162 response, self.LOCKED_MESSAGE, status_code=self.STATUS_SUCCESS
163 )
164
165 def cool_off(self):
166 sleep(get_cool_off().total_seconds())
167
168 def check_logout(self):
169 response = self.logout()
170 self.assertContains(
171 response, self.LOGOUT_MESSAGE, status_code=self.STATUS_SUCCESS
172 )
173
174 def check_handler(self):
175 """
176 Check a handler and its basic functionality with lockouts, cool offs, login, and logout.
177
178 This is a check that is intended to successfully run for each and every new handler.
179 """
180
181 self.check_lockout()
182 self.cool_off()
183 self.check_login()
184 self.check_logout()
+0
-74
axes/tests/settings.py less more
0 DATABASES = {"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:"}}
1
2 CACHES = {
3 "default": {
4 # This cache backend is OK to use in development and testing
5 # but has the potential to break production setups with more than on process
6 # due to each process having their own local memory based cache
7 "BACKEND": "django.core.cache.backends.locmem.LocMemCache"
8 }
9 }
10
11 SITE_ID = 1
12
13 MIDDLEWARE = [
14 "django.middleware.common.CommonMiddleware",
15 "django.contrib.sessions.middleware.SessionMiddleware",
16 "django.contrib.auth.middleware.AuthenticationMiddleware",
17 "django.contrib.messages.middleware.MessageMiddleware",
18 "axes.middleware.AxesMiddleware",
19 ]
20
21 AUTHENTICATION_BACKENDS = [
22 "axes.backends.AxesBackend",
23 "django.contrib.auth.backends.ModelBackend",
24 ]
25
26 PASSWORD_HASHERS = ["django.contrib.auth.hashers.MD5PasswordHasher"]
27
28 ROOT_URLCONF = "axes.tests.urls"
29
30 INSTALLED_APPS = [
31 "django.contrib.auth",
32 "django.contrib.contenttypes",
33 "django.contrib.sessions",
34 "django.contrib.sites",
35 "django.contrib.messages",
36 "django.contrib.admin",
37 "axes",
38 ]
39
40 TEMPLATES = [
41 {
42 "BACKEND": "django.template.backends.django.DjangoTemplates",
43 "DIRS": [],
44 "APP_DIRS": True,
45 "OPTIONS": {
46 "context_processors": [
47 "django.template.context_processors.debug",
48 "django.template.context_processors.request",
49 "django.contrib.auth.context_processors.auth",
50 "django.contrib.messages.context_processors.messages",
51 ]
52 },
53 }
54 ]
55
56 LOGGING = {
57 "version": 1,
58 "disable_existing_loggers": False,
59 "handlers": {"console": {"class": "logging.StreamHandler"}},
60 "loggers": {"axes": {"handlers": ["console"], "level": "INFO", "propagate": False}},
61 }
62
63 SECRET_KEY = "too-secret-for-test"
64
65 USE_I18N = False
66
67 USE_L10N = False
68
69 USE_TZ = False
70
71 LOGIN_REDIRECT_URL = "/admin/"
72
73 AXES_FAILURE_LIMIT = 10
+0
-28
axes/tests/test_admin.py less more
0 from contextlib import suppress
1 from importlib import reload
2
3 from django.contrib import admin
4 from django.test import override_settings
5
6 import axes.admin
7 from axes.models import AccessAttempt, AccessLog
8 from axes.tests.base import AxesTestCase
9
10
11 class AxesEnableAdminFlag(AxesTestCase):
12 def setUp(self):
13 with suppress(admin.sites.NotRegistered):
14 admin.site.unregister(AccessAttempt)
15 with suppress(admin.sites.NotRegistered):
16 admin.site.unregister(AccessLog)
17
18 @override_settings(AXES_ENABLE_ADMIN=False)
19 def test_disable_admin(self):
20 reload(axes.admin)
21 self.assertFalse(admin.site.is_registered(AccessAttempt))
22 self.assertFalse(admin.site.is_registered(AccessLog))
23
24 def test_enable_admin_by_default(self):
25 reload(axes.admin)
26 self.assertTrue(admin.site.is_registered(AccessAttempt))
27 self.assertTrue(admin.site.is_registered(AccessLog))
+0
-46
axes/tests/test_attempts.py less more
0 from unittest.mock import patch
1
2 from django.test import override_settings
3 from django.utils.timezone import now
4
5 from axes.attempts import get_cool_off_threshold
6 from axes.models import AccessAttempt
7 from axes.tests.base import AxesTestCase
8 from axes.utils import reset
9
10
11 class GetCoolOffThresholdTestCase(AxesTestCase):
12 @override_settings(AXES_COOLOFF_TIME=42)
13 def test_get_cool_off_threshold(self):
14 timestamp = now()
15
16 with patch("axes.attempts.now", return_value=timestamp):
17 attempt_time = timestamp
18 threshold_now = get_cool_off_threshold(attempt_time)
19
20 attempt_time = None
21 threshold_none = get_cool_off_threshold(attempt_time)
22
23 self.assertEqual(threshold_now, threshold_none)
24
25 @override_settings(AXES_COOLOFF_TIME=None)
26 def test_get_cool_off_threshold_error(self):
27 with self.assertRaises(TypeError):
28 get_cool_off_threshold()
29
30
31 class ResetTestCase(AxesTestCase):
32 def test_reset(self):
33 self.create_attempt()
34 reset()
35 self.assertFalse(AccessAttempt.objects.count())
36
37 def test_reset_ip(self):
38 self.create_attempt(ip_address=self.ip_address)
39 reset(ip=self.ip_address)
40 self.assertFalse(AccessAttempt.objects.count())
41
42 def test_reset_username(self):
43 self.create_attempt(username=self.username)
44 reset(username=self.username)
45 self.assertFalse(AccessAttempt.objects.count())
+0
-23
axes/tests/test_backends.py less more
0 from unittest.mock import patch, MagicMock
1
2 from axes.backends import AxesBackend
3 from axes.exceptions import (
4 AxesBackendRequestParameterRequired,
5 AxesBackendPermissionDenied,
6 )
7 from axes.tests.base import AxesTestCase
8
9
10 class BackendTestCase(AxesTestCase):
11 def test_authenticate_raises_on_missing_request(self):
12 request = None
13
14 with self.assertRaises(AxesBackendRequestParameterRequired):
15 AxesBackend().authenticate(request)
16
17 @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=False)
18 def test_authenticate_raises_on_locked_request(self, _):
19 request = MagicMock()
20
21 with self.assertRaises(AxesBackendPermissionDenied):
22 AxesBackend().authenticate(request)
+0
-110
axes/tests/test_checks.py less more
0 from django.core.checks import run_checks, Warning # pylint: disable=redefined-builtin
1 from django.test import override_settings, modify_settings
2
3 from axes.backends import AxesBackend
4 from axes.checks import Messages, Hints, Codes
5 from axes.tests.base import AxesTestCase
6
7
8 class CacheCheckTestCase(AxesTestCase):
9 @override_settings(
10 AXES_HANDLER="axes.handlers.cache.AxesCacheHandler",
11 CACHES={
12 "default": {
13 "BACKEND": "django.core.cache.backends.db.DatabaseCache",
14 "LOCATION": "axes_cache",
15 }
16 },
17 )
18 def test_cache_check(self):
19 warnings = run_checks()
20 self.assertEqual(warnings, [])
21
22 @override_settings(
23 AXES_HANDLER="axes.handlers.cache.AxesCacheHandler",
24 CACHES={
25 "default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}
26 },
27 )
28 def test_cache_check_warnings(self):
29 warnings = run_checks()
30 warning = Warning(
31 msg=Messages.CACHE_INVALID, hint=Hints.CACHE_INVALID, id=Codes.CACHE_INVALID
32 )
33
34 self.assertEqual(warnings, [warning])
35
36 @override_settings(
37 AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler",
38 CACHES={
39 "default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}
40 },
41 )
42 def test_cache_check_does_not_produce_check_warnings_with_database_handler(self):
43 warnings = run_checks()
44 self.assertEqual(warnings, [])
45
46
47 class MiddlewareCheckTestCase(AxesTestCase):
48 @modify_settings(MIDDLEWARE={"remove": ["axes.middleware.AxesMiddleware"]})
49 def test_cache_check_warnings(self):
50 warnings = run_checks()
51 warning = Warning(
52 msg=Messages.MIDDLEWARE_INVALID,
53 hint=Hints.MIDDLEWARE_INVALID,
54 id=Codes.MIDDLEWARE_INVALID,
55 )
56
57 self.assertEqual(warnings, [warning])
58
59
60 class AxesSpecializedBackend(AxesBackend):
61 pass
62
63
64 class BackendCheckTestCase(AxesTestCase):
65 @modify_settings(AUTHENTICATION_BACKENDS={"remove": ["axes.backends.AxesBackend"]})
66 def test_backend_missing(self):
67 warnings = run_checks()
68 warning = Warning(
69 msg=Messages.BACKEND_INVALID,
70 hint=Hints.BACKEND_INVALID,
71 id=Codes.BACKEND_INVALID,
72 )
73
74 self.assertEqual(warnings, [warning])
75
76 @override_settings(
77 AUTHENTICATION_BACKENDS=["axes.tests.test_checks.AxesSpecializedBackend"]
78 )
79 def test_specialized_backend(self):
80 warnings = run_checks()
81 self.assertEqual(warnings, [])
82
83 @override_settings(
84 AUTHENTICATION_BACKENDS=["axes.tests.test_checks.AxesNotDefinedBackend"]
85 )
86 def test_import_error(self):
87 with self.assertRaises(ImportError):
88 run_checks()
89
90 @override_settings(AUTHENTICATION_BACKENDS=["module.not_defined"])
91 def test_module_not_found_error(self):
92 with self.assertRaises(ModuleNotFoundError):
93 run_checks()
94
95
96 class DeprecatedSettingsTestCase(AxesTestCase):
97 def setUp(self):
98 self.disable_success_access_log_warning = Warning(
99 msg=Messages.SETTING_DEPRECATED.format(
100 deprecated_setting="AXES_DISABLE_SUCCESS_ACCESS_LOG"
101 ),
102 hint=Hints.SETTING_DEPRECATED,
103 id=Codes.SETTING_DEPRECATED,
104 )
105
106 @override_settings(AXES_DISABLE_SUCCESS_ACCESS_LOG=True)
107 def test_deprecated_success_access_log_flag(self):
108 warnings = run_checks()
109 self.assertEqual(warnings, [self.disable_success_access_log_warning])
+0
-40
axes/tests/test_decorators.py less more
0 from unittest.mock import MagicMock, patch
1
2 from django.http import HttpResponse
3
4 from axes.decorators import axes_dispatch, axes_form_invalid
5 from axes.tests.base import AxesTestCase
6
7
8 class DecoratorTestCase(AxesTestCase):
9 SUCCESS_RESPONSE = HttpResponse(status=200, content="Dispatched")
10 LOCKOUT_RESPONSE = HttpResponse(status=403, content="Locked out")
11
12 def setUp(self):
13 self.request = MagicMock()
14 self.cls = MagicMock(return_value=self.request)
15 self.func = MagicMock(return_value=self.SUCCESS_RESPONSE)
16
17 @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=False)
18 @patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE)
19 def test_axes_dispatch_locks_out(self, _, __):
20 response = axes_dispatch(self.func)(self.request)
21 self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content)
22
23 @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=True)
24 @patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE)
25 def test_axes_dispatch_dispatches(self, _, __):
26 response = axes_dispatch(self.func)(self.request)
27 self.assertEqual(response.content, self.SUCCESS_RESPONSE.content)
28
29 @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=False)
30 @patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE)
31 def test_axes_form_invalid_locks_out(self, _, __):
32 response = axes_form_invalid(self.func)(self.cls)
33 self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content)
34
35 @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=True)
36 @patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE)
37 def test_axes_form_invalid_dispatches(self, _, __):
38 response = axes_form_invalid(self.func)(self.cls)
39 self.assertEqual(response.content, self.SUCCESS_RESPONSE.content)
+0
-293
axes/tests/test_handlers.py less more
0 from unittest.mock import MagicMock, patch
1
2 from django.test import override_settings
3 from django.urls import reverse
4 from django.utils import timezone
5 from django.utils.timezone import timedelta
6
7 from axes.conf import settings
8 from axes.handlers.proxy import AxesProxyHandler
9 from axes.helpers import get_client_str
10 from axes.models import AccessAttempt, AccessLog
11 from axes.tests.base import AxesTestCase
12
13
14 @override_settings(AXES_HANDLER="axes.handlers.base.AxesHandler")
15 class AxesHandlerTestCase(AxesTestCase):
16 @override_settings(AXES_IP_BLACKLIST=["127.0.0.1"])
17 def test_is_allowed_with_blacklisted_ip_address(self):
18 self.assertFalse(AxesProxyHandler.is_allowed(self.request))
19
20 @override_settings(
21 AXES_NEVER_LOCKOUT_WHITELIST=True, AXES_IP_WHITELIST=["127.0.0.1"]
22 )
23 def test_is_allowed_with_whitelisted_ip_address(self):
24 self.assertTrue(AxesProxyHandler.is_allowed(self.request))
25
26 @override_settings(AXES_NEVER_LOCKOUT_GET=True)
27 def test_is_allowed_with_whitelisted_method(self):
28 self.request.method = "GET"
29 self.assertTrue(AxesProxyHandler.is_allowed(self.request))
30
31 @override_settings(AXES_LOCK_OUT_AT_FAILURE=False)
32 def test_is_allowed_no_lock_out(self):
33 self.assertTrue(AxesProxyHandler.is_allowed(self.request))
34
35 @override_settings(AXES_ONLY_ADMIN_SITE=True)
36 def test_only_admin_site(self):
37 request = MagicMock()
38 request.path = "/test/"
39 self.assertTrue(AxesProxyHandler.is_allowed(self.request))
40
41 def test_is_admin_site(self):
42 request = MagicMock()
43 tests = ( # (AXES_ONLY_ADMIN_SITE, URL, Expected)
44 (True, "/test/", True),
45 (True, reverse("admin:index"), False),
46 (False, "/test/", False),
47 (False, reverse("admin:index"), False),
48 )
49
50 for setting_value, url, expected in tests:
51 with override_settings(AXES_ONLY_ADMIN_SITE=setting_value):
52 request.path = url
53 self.assertEqual(AxesProxyHandler().is_admin_site(request), expected)
54
55 @override_settings(ROOT_URLCONF="axes.tests.urls_empty")
56 @override_settings(AXES_ONLY_ADMIN_SITE=True)
57 def test_is_admin_site_no_admin_site(self):
58 request = MagicMock()
59 request.path = "/admin/"
60 self.assertTrue(AxesProxyHandler().is_admin_site(self.request))
61
62
63 class AxesProxyHandlerTestCase(AxesTestCase):
64 def setUp(self):
65 self.sender = MagicMock()
66 self.credentials = MagicMock()
67 self.request = MagicMock()
68 self.user = MagicMock()
69 self.instance = MagicMock()
70
71 @patch("axes.handlers.proxy.AxesProxyHandler.implementation", None)
72 def test_setting_changed_signal_triggers_handler_reimport(self):
73 self.assertIsNone(AxesProxyHandler.implementation)
74
75 with self.settings(AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler"):
76 self.assertIsNotNone(AxesProxyHandler.implementation)
77
78 @patch("axes.handlers.proxy.AxesProxyHandler.implementation")
79 def test_user_login_failed(self, handler):
80 self.assertFalse(handler.user_login_failed.called)
81 AxesProxyHandler.user_login_failed(self.sender, self.credentials, self.request)
82 self.assertTrue(handler.user_login_failed.called)
83
84 @patch("axes.handlers.proxy.AxesProxyHandler.implementation")
85 def test_user_logged_in(self, handler):
86 self.assertFalse(handler.user_logged_in.called)
87 AxesProxyHandler.user_logged_in(self.sender, self.request, self.user)
88 self.assertTrue(handler.user_logged_in.called)
89
90 @patch("axes.handlers.proxy.AxesProxyHandler.implementation")
91 def test_user_logged_out(self, handler):
92 self.assertFalse(handler.user_logged_out.called)
93 AxesProxyHandler.user_logged_out(self.sender, self.request, self.user)
94 self.assertTrue(handler.user_logged_out.called)
95
96 @patch("axes.handlers.proxy.AxesProxyHandler.implementation")
97 def test_post_save_access_attempt(self, handler):
98 self.assertFalse(handler.post_save_access_attempt.called)
99 AxesProxyHandler.post_save_access_attempt(self.instance)
100 self.assertTrue(handler.post_save_access_attempt.called)
101
102 @patch("axes.handlers.proxy.AxesProxyHandler.implementation")
103 def test_post_delete_access_attempt(self, handler):
104 self.assertFalse(handler.post_delete_access_attempt.called)
105 AxesProxyHandler.post_delete_access_attempt(self.instance)
106 self.assertTrue(handler.post_delete_access_attempt.called)
107
108
109 class AxesHandlerBaseTestCase(AxesTestCase):
110 def check_whitelist(self, log):
111 with override_settings(
112 AXES_NEVER_LOCKOUT_WHITELIST=True, AXES_IP_WHITELIST=[self.ip_address]
113 ):
114 AxesProxyHandler.user_login_failed(
115 sender=None, request=self.request, credentials=self.credentials
116 )
117 client_str = get_client_str(
118 self.username, self.ip_address, self.user_agent, self.path_info
119 )
120 log.info.assert_called_with(
121 "AXES: Login failed from whitelisted client %s.", client_str
122 )
123
124 def check_empty_request(self, log, handler):
125 AxesProxyHandler.user_login_failed(sender=None, credentials={}, request=None)
126 log.error.assert_called_with(
127 f"AXES: {handler}.user_login_failed does not function without a request."
128 )
129
130
131 @override_settings(
132 AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler",
133 AXES_COOLOFF_TIME=timedelta(seconds=1),
134 AXES_RESET_ON_SUCCESS=True,
135 )
136 class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase):
137 def test_handler_reset_attempts(self):
138 self.create_attempt()
139 self.assertEqual(1, AxesProxyHandler.reset_attempts())
140 self.assertFalse(AccessAttempt.objects.count())
141
142 def test_handler_reset_logs(self):
143 self.create_log()
144 self.assertEqual(1, AxesProxyHandler.reset_logs())
145 self.assertFalse(AccessLog.objects.count())
146
147 def test_handler_reset_logs_older_than_42_days(self):
148 self.create_log()
149
150 then = timezone.now() - timezone.timedelta(days=90)
151 with patch("django.utils.timezone.now", return_value=then):
152 self.create_log()
153
154 self.assertEqual(AccessLog.objects.count(), 2)
155 self.assertEqual(1, AxesProxyHandler.reset_logs(age_days=42))
156 self.assertEqual(AccessLog.objects.count(), 1)
157
158 @override_settings(AXES_RESET_ON_SUCCESS=True)
159 def test_handler(self):
160 self.check_handler()
161
162 @override_settings(AXES_RESET_ON_SUCCESS=False)
163 def test_handler_without_reset(self):
164 self.check_handler()
165
166 @override_settings(AXES_FAILURE_LIMIT=lambda *args: 3)
167 def test_handler_callable_failure_limit(self):
168 self.check_handler()
169
170 @override_settings(AXES_FAILURE_LIMIT="axes.tests.base.custom_failure_limit")
171 def test_handler_str_failure_limit(self):
172 self.check_handler()
173
174 @override_settings(AXES_FAILURE_LIMIT=None)
175 def test_handler_invalid_failure_limit(self):
176 with self.assertRaises(TypeError):
177 self.check_handler()
178
179 @override_settings(AXES_LOCK_OUT_AT_FAILURE=False)
180 def test_handler_without_lockout(self):
181 self.check_handler()
182
183 @patch("axes.handlers.database.log")
184 def test_empty_request(self, log):
185 self.check_empty_request(log, "AxesDatabaseHandler")
186
187 @patch("axes.handlers.database.log")
188 def test_whitelist(self, log):
189 self.check_whitelist(log)
190
191 def test_user_login_failed_multiple_username(self):
192 configurations = (
193 (1, 2, {}, ["admin", "admin1"]),
194 (1, 2, {"AXES_USE_USER_AGENT": True}, ["admin", "admin1"]),
195 (2, 1, {"AXES_ONLY_USER_FAILURES": True}, ["admin", "admin1"]),
196 (
197 2,
198 1,
199 {"AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP": True},
200 ["admin", "admin1"],
201 ),
202 (
203 1,
204 2,
205 {"AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP": True},
206 ["admin", "admin"],
207 ),
208 )
209
210 for (
211 total_attempts_count,
212 failures_since_start,
213 overrides,
214 usernames,
215 ) in configurations:
216 with self.settings(**overrides):
217 with self.subTest(
218 total_attempts_count=total_attempts_count,
219 failures_since_start=failures_since_start,
220 settings=overrides,
221 ):
222 self.login(username=usernames[0])
223 attempt = AccessAttempt.objects.get(username=usernames[0])
224 self.assertEqual(1, attempt.failures_since_start)
225
226 # check the number of failures associated to the attempt
227 self.login(username=usernames[1])
228 attempt = AccessAttempt.objects.get(username=usernames[1])
229 self.assertEqual(failures_since_start, attempt.failures_since_start)
230
231 # check the number of distinct attempts
232 self.assertEqual(
233 total_attempts_count, AccessAttempt.objects.count()
234 )
235
236 AccessAttempt.objects.all().delete()
237
238
239 @override_settings(
240 AXES_HANDLER="axes.handlers.cache.AxesCacheHandler",
241 AXES_COOLOFF_TIME=timedelta(seconds=1),
242 )
243 class AxesCacheHandlerTestCase(AxesHandlerBaseTestCase):
244 @override_settings(AXES_RESET_ON_SUCCESS=True)
245 def test_handler(self):
246 self.check_handler()
247
248 @override_settings(AXES_RESET_ON_SUCCESS=False)
249 def test_handler_without_reset(self):
250 self.check_handler()
251
252 @override_settings(AXES_LOCK_OUT_AT_FAILURE=False)
253 def test_handler_without_lockout(self):
254 self.check_handler()
255
256 @patch("axes.handlers.cache.log")
257 def test_empty_request(self, log):
258 self.check_empty_request(log, "AxesCacheHandler")
259
260 @patch("axes.handlers.cache.log")
261 def test_whitelist(self, log):
262 self.check_whitelist(log)
263
264
265 @override_settings(AXES_HANDLER="axes.handlers.dummy.AxesDummyHandler")
266 class AxesDummyHandlerTestCase(AxesHandlerBaseTestCase):
267 def test_handler(self):
268 for _ in range(settings.AXES_FAILURE_LIMIT):
269 self.login()
270
271 self.check_login()
272
273 def test_handler_is_allowed(self):
274 self.assertEqual(True, AxesProxyHandler.is_allowed(self.request, {}))
275
276 def test_handler_get_failures(self):
277 self.assertEqual(0, AxesProxyHandler.get_failures(self.request, {}))
278
279
280 @override_settings(AXES_HANDLER="axes.handlers.test.AxesTestHandler")
281 class AxesTestHandlerTestCase(AxesHandlerBaseTestCase):
282 def test_handler_reset_attempts(self):
283 self.assertEqual(0, AxesProxyHandler.reset_attempts())
284
285 def test_handler_reset_logs(self):
286 self.assertEqual(0, AxesProxyHandler.reset_logs())
287
288 def test_handler_is_allowed(self):
289 self.assertEqual(True, AxesProxyHandler.is_allowed(self.request, {}))
290
291 def test_handler_get_failures(self):
292 self.assertEqual(0, AxesProxyHandler.get_failures(self.request, {}))
+0
-93
axes/tests/test_helpers.py less more
0 from datetime import timedelta
1
2 from django.contrib.auth import get_user_model
3 from django.http import HttpRequest, HttpResponse
4 from django.test import override_settings
5
6 from axes.helpers import get_cool_off, get_lockout_response, is_user_attempt_whitelisted
7 from axes.tests.base import AxesTestCase
8
9
10 def mock_get_cool_off_str():
11 return timedelta(seconds=30)
12
13
14 class AxesCoolOffTestCase(AxesTestCase):
15 @override_settings(AXES_COOLOFF_TIME=None)
16 def test_get_cool_off_none(self):
17 self.assertIsNone(get_cool_off())
18
19 @override_settings(AXES_COOLOFF_TIME=2)
20 def test_get_cool_off_int(self):
21 self.assertEqual(get_cool_off(), timedelta(hours=2))
22
23 @override_settings(AXES_COOLOFF_TIME=lambda: timedelta(seconds=30))
24 def test_get_cool_off_callable(self):
25 self.assertEqual(get_cool_off(), timedelta(seconds=30))
26
27 @override_settings(
28 AXES_COOLOFF_TIME="axes.tests.test_helpers.mock_get_cool_off_str"
29 )
30 def test_get_cool_off_path(self):
31 self.assertEqual(get_cool_off(), timedelta(seconds=30))
32
33
34 def mock_is_whitelisted(request, credentials):
35 return True
36
37
38 class AxesWhitelistTestCase(AxesTestCase):
39 def setUp(self):
40 self.user_model = get_user_model()
41 self.user = self.user_model.objects.create(username="jane.doe")
42 self.request = HttpRequest()
43 self.credentials = dict()
44
45 def test_is_whitelisted(self):
46 self.assertFalse(is_user_attempt_whitelisted(self.request, self.credentials))
47
48 @override_settings(AXES_WHITELIST_CALLABLE=mock_is_whitelisted)
49 def test_is_whitelisted_override_callable(self):
50 self.assertTrue(is_user_attempt_whitelisted(self.request, self.credentials))
51
52 @override_settings(
53 AXES_WHITELIST_CALLABLE="axes.tests.test_helpers.mock_is_whitelisted"
54 )
55 def test_is_whitelisted_override_path(self):
56 self.assertTrue(is_user_attempt_whitelisted(self.request, self.credentials))
57
58 @override_settings(AXES_WHITELIST_CALLABLE=42)
59 def test_is_whitelisted_override_invalid(self):
60 with self.assertRaises(TypeError):
61 is_user_attempt_whitelisted(self.request, self.credentials)
62
63
64 def mock_get_lockout_response(request, credentials):
65 return HttpResponse(status=400)
66
67
68 class AxesLockoutTestCase(AxesTestCase):
69 def setUp(self):
70 self.request = HttpRequest()
71 self.credentials = dict()
72
73 def test_get_lockout_response(self):
74 response = get_lockout_response(self.request, self.credentials)
75 self.assertEqual(403, response.status_code)
76
77 @override_settings(AXES_LOCKOUT_CALLABLE=mock_get_lockout_response)
78 def test_get_lockout_response_override_callable(self):
79 response = get_lockout_response(self.request, self.credentials)
80 self.assertEqual(400, response.status_code)
81
82 @override_settings(
83 AXES_LOCKOUT_CALLABLE="axes.tests.test_helpers.mock_get_lockout_response"
84 )
85 def test_get_lockout_response_override_path(self):
86 response = get_lockout_response(self.request, self.credentials)
87 self.assertEqual(400, response.status_code)
88
89 @override_settings(AXES_LOCKOUT_CALLABLE=42)
90 def test_get_lockout_response_override_invalid(self):
91 with self.assertRaises(TypeError):
92 get_lockout_response(self.request, self.credentials)
+0
-116
axes/tests/test_logging.py less more
0 from unittest.mock import patch
1
2 from django.test import override_settings
3 from django.urls import reverse
4
5 from axes.apps import AppConfig
6 from axes.models import AccessAttempt, AccessLog
7 from axes.tests.base import AxesTestCase
8
9
10 @patch("axes.apps.AppConfig.logging_initialized", False)
11 @patch("axes.apps.log")
12 class AppsTestCase(AxesTestCase):
13 def test_axes_config_log_re_entrant(self, log):
14 """
15 Test that initialize call count does not increase on repeat calls.
16 """
17
18 AppConfig.initialize()
19 calls = log.info.call_count
20
21 AppConfig.initialize()
22 self.assertTrue(
23 calls == log.info.call_count and calls > 0,
24 "AxesConfig.initialize needs to be re-entrant",
25 )
26
27 @override_settings(AXES_VERBOSE=False)
28 def test_axes_config_log_not_verbose(self, log):
29 AppConfig.initialize()
30 self.assertFalse(log.info.called)
31
32 @override_settings(AXES_ONLY_USER_FAILURES=True)
33 def test_axes_config_log_user_only(self, log):
34 AppConfig.initialize()
35 log.info.assert_called_with("AXES: blocking by username only.")
36
37 @override_settings(AXES_ONLY_USER_FAILURES=False)
38 def test_axes_config_log_ip_only(self, log):
39 AppConfig.initialize()
40 log.info.assert_called_with("AXES: blocking by IP only.")
41
42 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
43 def test_axes_config_log_user_ip(self, log):
44 AppConfig.initialize()
45 log.info.assert_called_with("AXES: blocking by combination of username and IP.")
46
47
48 class AccessLogTestCase(AxesTestCase):
49 def test_access_log_on_logout(self):
50 """
51 Test a valid logout and make sure the logout_time is updated.
52 """
53
54 self.login(is_valid_username=True, is_valid_password=True)
55 self.assertIsNone(AccessLog.objects.latest("id").logout_time)
56
57 response = self.client.get(reverse("admin:logout"))
58 self.assertContains(response, "Logged out")
59
60 self.assertIsNotNone(AccessLog.objects.latest("id").logout_time)
61
62 def test_log_data_truncated(self):
63 """
64 Test that get_query_str properly truncates data to the max_length (default 1024).
65 """
66
67 # An impossibly large post dict
68 extra_data = {"a" * x: x for x in range(1024)}
69 self.login(**extra_data)
70 self.assertEqual(len(AccessAttempt.objects.latest("id").post_data), 1024)
71
72 @override_settings(AXES_DISABLE_ACCESS_LOG=True)
73 def test_valid_logout_without_success_log(self):
74 AccessLog.objects.all().delete()
75
76 response = self.login(is_valid_username=True, is_valid_password=True)
77 response = self.client.get(reverse("admin:logout"))
78
79 self.assertEqual(AccessLog.objects.all().count(), 0)
80 self.assertContains(response, "Logged out", html=True)
81
82 @override_settings(AXES_DISABLE_ACCESS_LOG=True)
83 def test_valid_login_without_success_log(self):
84 """
85 Test that a valid login does not generate an AccessLog when DISABLE_SUCCESS_ACCESS_LOG is True.
86 """
87
88 AccessLog.objects.all().delete()
89
90 response = self.login(is_valid_username=True, is_valid_password=True)
91
92 self.assertEqual(response.status_code, 302)
93 self.assertEqual(AccessLog.objects.all().count(), 0)
94
95 @override_settings(AXES_DISABLE_ACCESS_LOG=True)
96 def test_valid_logout_without_log(self):
97 AccessLog.objects.all().delete()
98
99 response = self.login(is_valid_username=True, is_valid_password=True)
100 response = self.client.get(reverse("admin:logout"))
101
102 self.assertEqual(AccessLog.objects.count(), 0)
103 self.assertContains(response, "Logged out", html=True)
104
105 @override_settings(AXES_DISABLE_ACCESS_LOG=True)
106 def test_non_valid_login_without_log(self):
107 """
108 Test that a non-valid login does generate an AccessLog when DISABLE_ACCESS_LOG is True.
109 """
110 AccessLog.objects.all().delete()
111
112 response = self.login(is_valid_username=True, is_valid_password=False)
113 self.assertEqual(response.status_code, 200)
114
115 self.assertEqual(AccessLog.objects.all().count(), 0)
+0
-491
axes/tests/test_login.py less more
0 """
1 Integration tests for the login handling.
2
3 TODO: Clean up the tests in this module.
4 """
5
6 from importlib import import_module
7
8 from django.http import HttpRequest
9 from django.test import override_settings, TestCase
10 from django.urls import reverse
11 from django.contrib.auth import get_user_model, login, logout
12
13 from axes.conf import settings
14 from axes.models import AccessAttempt
15 from axes.tests.base import AxesTestCase
16
17
18 class DjangoLoginTestCase(TestCase):
19 def setUp(self):
20 engine = import_module(settings.SESSION_ENGINE)
21
22 self.request = HttpRequest()
23 self.request.session = engine.SessionStore()
24
25 self.username = "john.doe"
26 self.password = "hunter2"
27
28 self.user = get_user_model().objects.create(username=self.username)
29 self.user.set_password(self.password)
30 self.user.save()
31 self.user.backend = "django.contrib.auth.backends.ModelBackend"
32
33
34 class DjangoContribAuthLoginTestCase(DjangoLoginTestCase):
35 def test_login(self):
36 login(self.request, self.user)
37
38 def test_logout(self):
39 login(self.request, self.user)
40 logout(self.request)
41
42
43 @override_settings(AXES_ENABLED=False)
44 class DjangoTestClientLoginTestCase(DjangoLoginTestCase):
45 def test_client_login(self):
46 self.client.login(username=self.username, password=self.password)
47
48 def test_client_logout(self):
49 self.client.login(username=self.username, password=self.password)
50 self.client.logout()
51
52 def test_client_force_login(self):
53 self.client.force_login(self.user)
54
55
56 class LoginTestCase(AxesTestCase):
57 """
58 Test for lockouts under different configurations and circumstances to prevent false positives and false negatives.
59
60 Always block attempted logins for the same user from the same IP.
61 Always allow attempted logins for a different user from a different IP.
62 """
63
64 IP_1 = "10.1.1.1"
65 IP_2 = "10.2.2.2"
66 USER_1 = "valid-user-1"
67 USER_2 = "valid-user-2"
68 EMAIL_1 = "valid-email-1@example.com"
69 EMAIL_2 = "valid-email-2@example.com"
70
71 VALID_USERNAME = USER_1
72 VALID_EMAIL = EMAIL_1
73 VALID_PASSWORD = "valid-password"
74
75 VALID_IP_ADDRESS = IP_1
76
77 WRONG_PASSWORD = "wrong-password"
78 LOCKED_MESSAGE = "Account locked: too many login attempts."
79 LOGIN_FORM_KEY = '<input type="submit" value="Log in" />'
80 ALLOWED = 302
81 BLOCKED = 403
82
83 def _login(self, username, password, ip_addr="127.0.0.1", **kwargs):
84 """
85 Login a user and get the response.
86
87 IP address can be configured to test IP blocking functionality.
88 """
89
90 post_data = {"username": username, "password": password}
91
92 post_data.update(kwargs)
93
94 return self.client.post(
95 reverse("admin:login"),
96 post_data,
97 REMOTE_ADDR=ip_addr,
98 HTTP_USER_AGENT="test-browser",
99 )
100
101 def _lockout_user_from_ip(self, username, ip_addr):
102 for _ in range(settings.AXES_FAILURE_LIMIT):
103 response = self._login(
104 username=username, password=self.WRONG_PASSWORD, ip_addr=ip_addr
105 )
106 return response
107
108 def _lockout_user1_from_ip1(self):
109 return self._lockout_user_from_ip(username=self.USER_1, ip_addr=self.IP_1)
110
111 def setUp(self):
112 """
113 Create two valid users for authentication.
114 """
115
116 super().setUp()
117
118 self.user2 = get_user_model().objects.create_superuser(
119 username=self.USER_2,
120 email=self.EMAIL_2,
121 password=self.VALID_PASSWORD,
122 is_staff=True,
123 is_superuser=True,
124 )
125
126 def test_login(self):
127 """
128 Test a valid login for a real username.
129 """
130
131 response = self._login(self.username, self.password)
132 self.assertNotContains(
133 response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True
134 )
135
136 def test_lockout_limit_once(self):
137 """
138 Test the login lock trying to login one more time than failure limit.
139 """
140
141 response = self.lockout()
142 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
143
144 def test_lockout_limit_many(self):
145 """
146 Test the login lock trying to login a lot of times more than failure limit.
147 """
148
149 self.lockout()
150
151 for _ in range(settings.AXES_FAILURE_LIMIT):
152 response = self.login()
153 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
154
155 @override_settings(AXES_RESET_ON_SUCCESS=False)
156 def test_reset_on_success_false(self):
157 self.almost_lockout()
158 self.login(is_valid_username=True, is_valid_password=True)
159
160 response = self.login()
161 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
162 self.assertTrue(AccessAttempt.objects.count())
163
164 @override_settings(AXES_RESET_ON_SUCCESS=True)
165 def test_reset_on_success_true(self):
166 self.almost_lockout()
167 self.assertTrue(AccessAttempt.objects.count())
168
169 self.login(is_valid_username=True, is_valid_password=True)
170 self.assertFalse(AccessAttempt.objects.count())
171
172 response = self.lockout()
173 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
174 self.assertTrue(AccessAttempt.objects.count())
175
176 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
177 def test_lockout_by_combination_user_and_ip(self):
178 """
179 Test login failure when AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP is True.
180 """
181
182 # test until one try before the limit
183 for _ in range(1, settings.AXES_FAILURE_LIMIT):
184 response = self.login(is_valid_username=True, is_valid_password=False)
185 # Check if we are in the same login page
186 self.assertContains(response, self.LOGIN_FORM_KEY, html=True)
187
188 # So, we shouldn't have gotten a lock-out yet.
189 # But we should get one now
190 response = self.login(is_valid_username=True, is_valid_password=False)
191 self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
192
193 @override_settings(AXES_ONLY_USER_FAILURES=True)
194 def test_lockout_by_only_user_failures(self):
195 """
196 Test login failure when AXES_ONLY_USER_FAILURES is True.
197 """
198
199 # test until one try before the limit
200 for _ in range(1, settings.AXES_FAILURE_LIMIT):
201 response = self._login(self.username, self.WRONG_PASSWORD)
202
203 # Check if we are in the same login page
204 self.assertContains(response, self.LOGIN_FORM_KEY, html=True)
205
206 # So, we shouldn't have gotten a lock-out yet.
207 # But we should get one now
208 response = self._login(self.username, self.WRONG_PASSWORD)
209 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
210
211 # reset the username only and make sure we can log in now even though our IP has failed each time
212 self.reset(username=self.username)
213
214 response = self._login(self.username, self.password)
215
216 # Check if we are still in the login page
217 self.assertNotContains(
218 response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True
219 )
220
221 # now create failure_limit + 1 failed logins and then we should still
222 # be able to login with valid_username
223 for _ in range(settings.AXES_FAILURE_LIMIT):
224 response = self._login(self.username, self.password)
225
226 # Check if we can still log in with valid user
227 response = self._login(self.username, self.password)
228 self.assertNotContains(
229 response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True
230 )
231
232 # Test for true and false positives when blocking by IP *OR* user (default)
233 # Cache disabled. Default settings.
234 def test_lockout_by_ip_blocks_when_same_user_same_ip_without_cache(self):
235 # User 1 is locked out from IP 1.
236 self._lockout_user1_from_ip1()
237
238 # User 1 is still blocked from IP 1.
239 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
240 self.assertEqual(response.status_code, self.BLOCKED)
241
242 def test_lockout_by_ip_allows_when_same_user_diff_ip_without_cache(self):
243 # User 1 is locked out from IP 1.
244 self._lockout_user1_from_ip1()
245
246 # User 1 can still login from IP 2.
247 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
248 self.assertEqual(response.status_code, self.ALLOWED)
249
250 def test_lockout_by_ip_blocks_when_diff_user_same_ip_without_cache(self):
251 # User 1 is locked out from IP 1.
252 self._lockout_user1_from_ip1()
253
254 # User 2 is also locked out from IP 1.
255 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
256 self.assertEqual(response.status_code, self.BLOCKED)
257
258 def test_lockout_by_ip_allows_when_diff_user_diff_ip_without_cache(self):
259 # User 1 is locked out from IP 1.
260 self._lockout_user1_from_ip1()
261
262 # User 2 can still login from IP 2.
263 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
264 self.assertEqual(response.status_code, self.ALLOWED)
265
266 # Test for true and false positives when blocking by user only.
267 # Cache disabled. When AXES_ONLY_USER_FAILURES = True
268 @override_settings(AXES_ONLY_USER_FAILURES=True)
269 def test_lockout_by_user_blocks_when_same_user_same_ip_without_cache(self):
270 # User 1 is locked out from IP 1.
271 self._lockout_user1_from_ip1()
272
273 # User 1 is still blocked from IP 1.
274 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
275 self.assertEqual(response.status_code, self.BLOCKED)
276
277 @override_settings(AXES_ONLY_USER_FAILURES=True)
278 def test_lockout_by_user_blocks_when_same_user_diff_ip_without_cache(self):
279 # User 1 is locked out from IP 1.
280 self._lockout_user1_from_ip1()
281
282 # User 1 is also locked out from IP 2.
283 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
284 self.assertEqual(response.status_code, self.BLOCKED)
285
286 @override_settings(AXES_ONLY_USER_FAILURES=True)
287 def test_lockout_by_user_allows_when_diff_user_same_ip_without_cache(self):
288 # User 1 is locked out from IP 1.
289 self._lockout_user1_from_ip1()
290
291 # User 2 can still login from IP 1.
292 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
293 self.assertEqual(response.status_code, self.ALLOWED)
294
295 @override_settings(AXES_ONLY_USER_FAILURES=True)
296 def test_lockout_by_user_allows_when_diff_user_diff_ip_without_cache(self):
297 # User 1 is locked out from IP 1.
298 self._lockout_user1_from_ip1()
299
300 # User 2 can still login from IP 2.
301 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
302 self.assertEqual(response.status_code, self.ALLOWED)
303
304 @override_settings(AXES_ONLY_USER_FAILURES=True)
305 def test_lockout_by_user_with_empty_username_allows_other_users_without_cache(self):
306 # User with empty username is locked out from IP 1.
307 self._lockout_user_from_ip(username="", ip_addr=self.IP_1)
308
309 # Still possible to access the login page
310 response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1)
311 self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True)
312
313 # Test for true and false positives when blocking by user and IP together.
314 # Cache disabled. When LOCK_OUT_BY_COMBINATION_USER_AND_IP = True
315 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
316 def test_lockout_by_user_and_ip_blocks_when_same_user_same_ip_without_cache(self):
317 # User 1 is locked out from IP 1.
318 self._lockout_user1_from_ip1()
319
320 # User 1 is still blocked from IP 1.
321 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
322 self.assertEqual(response.status_code, self.BLOCKED)
323
324 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
325 def test_lockout_by_user_and_ip_allows_when_same_user_diff_ip_without_cache(self):
326 # User 1 is locked out from IP 1.
327 self._lockout_user1_from_ip1()
328
329 # User 1 can still login from IP 2.
330 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
331 self.assertEqual(response.status_code, self.ALLOWED)
332
333 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
334 def test_lockout_by_user_and_ip_allows_when_diff_user_same_ip_without_cache(self):
335 # User 1 is locked out from IP 1.
336 self._lockout_user1_from_ip1()
337
338 # User 2 can still login from IP 1.
339 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
340 self.assertEqual(response.status_code, self.ALLOWED)
341
342 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
343 def test_lockout_by_user_and_ip_allows_when_diff_user_diff_ip_without_cache(self):
344 # User 1 is locked out from IP 1.
345 self._lockout_user1_from_ip1()
346
347 # User 2 can still login from IP 2.
348 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
349 self.assertEqual(response.status_code, self.ALLOWED)
350
351 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
352 def test_lockout_by_user_and_ip_with_empty_username_allows_other_users_without_cache(
353 self,
354 ):
355 # User with empty username is locked out from IP 1.
356 self._lockout_user_from_ip(username="", ip_addr=self.IP_1)
357
358 # Still possible to access the login page
359 response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1)
360 self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True)
361
362 # Test for true and false positives when blocking by IP *OR* user (default)
363 # With cache enabled. Default criteria.
364 def test_lockout_by_ip_blocks_when_same_user_same_ip_using_cache(self):
365 # User 1 is locked out from IP 1.
366 self._lockout_user1_from_ip1()
367
368 # User 1 is still blocked from IP 1.
369 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
370 self.assertEqual(response.status_code, self.BLOCKED)
371
372 def test_lockout_by_ip_allows_when_same_user_diff_ip_using_cache(self):
373 # User 1 is locked out from IP 1.
374 self._lockout_user1_from_ip1()
375
376 # User 1 can still login from IP 2.
377 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
378 self.assertEqual(response.status_code, self.ALLOWED)
379
380 def test_lockout_by_ip_blocks_when_diff_user_same_ip_using_cache(self):
381 # User 1 is locked out from IP 1.
382 self._lockout_user1_from_ip1()
383
384 # User 2 is also locked out from IP 1.
385 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
386 self.assertEqual(response.status_code, self.BLOCKED)
387
388 def test_lockout_by_ip_allows_when_diff_user_diff_ip_using_cache(self):
389 # User 1 is locked out from IP 1.
390 self._lockout_user1_from_ip1()
391
392 # User 2 can still login from IP 2.
393 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
394 self.assertEqual(response.status_code, self.ALLOWED)
395
396 @override_settings(AXES_ONLY_USER_FAILURES=True)
397 def test_lockout_by_user_with_empty_username_allows_other_users_using_cache(self):
398 # User with empty username is locked out from IP 1.
399 self._lockout_user_from_ip(username="", ip_addr=self.IP_1)
400
401 # Still possible to access the login page
402 response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1)
403 self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True)
404
405 # Test for true and false positives when blocking by user only.
406 # With cache enabled. When AXES_ONLY_USER_FAILURES = True
407 @override_settings(AXES_ONLY_USER_FAILURES=True)
408 def test_lockout_by_user_blocks_when_same_user_same_ip_using_cache(self):
409 # User 1 is locked out from IP 1.
410 self._lockout_user1_from_ip1()
411
412 # User 1 is still blocked from IP 1.
413 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
414 self.assertEqual(response.status_code, self.BLOCKED)
415
416 @override_settings(AXES_ONLY_USER_FAILURES=True)
417 def test_lockout_by_user_blocks_when_same_user_diff_ip_using_cache(self):
418 # User 1 is locked out from IP 1.
419 self._lockout_user1_from_ip1()
420
421 # User 1 is also locked out from IP 2.
422 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
423 self.assertEqual(response.status_code, self.BLOCKED)
424
425 @override_settings(AXES_ONLY_USER_FAILURES=True)
426 def test_lockout_by_user_allows_when_diff_user_same_ip_using_cache(self):
427 # User 1 is locked out from IP 1.
428 self._lockout_user1_from_ip1()
429
430 # User 2 can still login from IP 1.
431 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
432 self.assertEqual(response.status_code, self.ALLOWED)
433
434 @override_settings(AXES_ONLY_USER_FAILURES=True)
435 def test_lockout_by_user_allows_when_diff_user_diff_ip_using_cache(self):
436 # User 1 is locked out from IP 1.
437 self._lockout_user1_from_ip1()
438
439 # User 2 can still login from IP 2.
440 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
441 self.assertEqual(response.status_code, self.ALLOWED)
442
443 # Test for true and false positives when blocking by user and IP together.
444 # With cache enabled. When LOCK_OUT_BY_COMBINATION_USER_AND_IP = True
445 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
446 def test_lockout_by_user_and_ip_blocks_when_same_user_same_ip_using_cache(self):
447 # User 1 is locked out from IP 1.
448 self._lockout_user1_from_ip1()
449
450 # User 1 is still blocked from IP 1.
451 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
452 self.assertEqual(response.status_code, self.BLOCKED)
453
454 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
455 def test_lockout_by_user_and_ip_allows_when_same_user_diff_ip_using_cache(self):
456 # User 1 is locked out from IP 1.
457 self._lockout_user1_from_ip1()
458
459 # User 1 can still login from IP 2.
460 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
461 self.assertEqual(response.status_code, self.ALLOWED)
462
463 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
464 def test_lockout_by_user_and_ip_allows_when_diff_user_same_ip_using_cache(self):
465 # User 1 is locked out from IP 1.
466 self._lockout_user1_from_ip1()
467
468 # User 2 can still login from IP 1.
469 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
470 self.assertEqual(response.status_code, self.ALLOWED)
471
472 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
473 def test_lockout_by_user_and_ip_allows_when_diff_user_diff_ip_using_cache(self):
474 # User 1 is locked out from IP 1.
475 self._lockout_user1_from_ip1()
476
477 # User 2 can still login from IP 2.
478 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
479 self.assertEqual(response.status_code, self.ALLOWED)
480
481 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
482 def test_lockout_by_user_and_ip_with_empty_username_allows_other_users_using_cache(
483 self,
484 ):
485 # User with empty username is locked out from IP 1.
486 self._lockout_user_from_ip(username="", ip_addr=self.IP_1)
487
488 # Still possible to access the login page
489 response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1)
490 self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True)
+0
-109
axes/tests/test_management.py less more
0 from io import StringIO
1 from unittest.mock import patch, Mock
2
3 from django.core.management import call_command
4 from django.utils import timezone
5
6 from axes.models import AccessAttempt, AccessLog
7 from axes.tests.base import AxesTestCase
8
9
10 class ResetAccessLogsManagementCommandTestCase(AxesTestCase):
11 def setUp(self):
12 self.msg_not_found = "No logs found.\n"
13 self.msg_num_found = "{} logs removed.\n"
14
15 days_3 = timezone.now() - timezone.timedelta(days=3)
16 with patch("django.utils.timezone.now", Mock(return_value=days_3)):
17 AccessLog.objects.create()
18
19 days_13 = timezone.now() - timezone.timedelta(days=9)
20 with patch("django.utils.timezone.now", Mock(return_value=days_13)):
21 AccessLog.objects.create()
22
23 days_30 = timezone.now() - timezone.timedelta(days=27)
24 with patch("django.utils.timezone.now", Mock(return_value=days_30)):
25 AccessLog.objects.create()
26
27 def test_axes_delete_access_logs_default(self):
28 out = StringIO()
29 call_command("axes_reset_logs", stdout=out)
30 self.assertEqual(self.msg_not_found, out.getvalue())
31
32 def test_axes_delete_access_logs_older_than_2_days(self):
33 out = StringIO()
34 call_command("axes_reset_logs", age=2, stdout=out)
35 self.assertEqual(self.msg_num_found.format(3), out.getvalue())
36
37 def test_axes_delete_access_logs_older_than_4_days(self):
38 out = StringIO()
39 call_command("axes_reset_logs", age=4, stdout=out)
40 self.assertEqual(self.msg_num_found.format(2), out.getvalue())
41
42 def test_axes_delete_access_logs_older_than_16_days(self):
43 out = StringIO()
44 call_command("axes_reset_logs", age=16, stdout=out)
45 self.assertEqual(self.msg_num_found.format(1), out.getvalue())
46
47
48 class ManagementCommandTestCase(AxesTestCase):
49 def setUp(self):
50 AccessAttempt.objects.create(
51 username="jane.doe", ip_address="10.0.0.1", failures_since_start="4"
52 )
53
54 AccessAttempt.objects.create(
55 username="john.doe", ip_address="10.0.0.2", failures_since_start="15"
56 )
57
58 def test_axes_list_attempts(self):
59 out = StringIO()
60 call_command("axes_list_attempts", stdout=out)
61
62 expected = "10.0.0.1\tjane.doe\t4\n10.0.0.2\tjohn.doe\t15\n"
63 self.assertEqual(expected, out.getvalue())
64
65 def test_axes_reset(self):
66 out = StringIO()
67 call_command("axes_reset", stdout=out)
68
69 expected = "2 attempts removed.\n"
70 self.assertEqual(expected, out.getvalue())
71
72 def test_axes_reset_not_found(self):
73 out = StringIO()
74 call_command("axes_reset", stdout=out)
75
76 out = StringIO()
77 call_command("axes_reset", stdout=out)
78
79 expected = "No attempts found.\n"
80 self.assertEqual(expected, out.getvalue())
81
82 def test_axes_reset_ip(self):
83 out = StringIO()
84 call_command("axes_reset_ip", "10.0.0.1", stdout=out)
85
86 expected = "1 attempts removed.\n"
87 self.assertEqual(expected, out.getvalue())
88
89 def test_axes_reset_ip_not_found(self):
90 out = StringIO()
91 call_command("axes_reset_ip", "10.0.0.3", stdout=out)
92
93 expected = "No attempts found.\n"
94 self.assertEqual(expected, out.getvalue())
95
96 def test_axes_reset_username(self):
97 out = StringIO()
98 call_command("axes_reset_username", "john.doe", stdout=out)
99
100 expected = "1 attempts removed.\n"
101 self.assertEqual(expected, out.getvalue())
102
103 def test_axes_reset_username_not_found(self):
104 out = StringIO()
105 call_command("axes_reset_username", "ivan.renko", stdout=out)
106
107 expected = "No attempts found.\n"
108 self.assertEqual(expected, out.getvalue())
+0
-28
axes/tests/test_middleware.py less more
0 from django.http import HttpResponse, HttpRequest
1
2 from axes.middleware import AxesMiddleware
3 from axes.tests.base import AxesTestCase
4
5
6 class MiddlewareTestCase(AxesTestCase):
7 STATUS_SUCCESS = 200
8 STATUS_LOCKOUT = 403
9
10 def setUp(self):
11 self.request = HttpRequest()
12
13 def test_success_response(self):
14 def get_response(request):
15 request.axes_locked_out = False
16 return HttpResponse()
17
18 response = AxesMiddleware(get_response)(self.request)
19 self.assertEqual(response.status_code, self.STATUS_SUCCESS)
20
21 def test_lockout_response(self):
22 def get_response(request):
23 request.axes_locked_out = True
24 return HttpResponse()
25
26 response = AxesMiddleware(get_response)(self.request)
27 self.assertEqual(response.status_code, self.STATUS_LOCKOUT)
+0
-36
axes/tests/test_models.py less more
0 from django.apps.registry import apps
1 from django.db import connection
2 from django.db.migrations.autodetector import MigrationAutodetector
3 from django.db.migrations.executor import MigrationExecutor
4 from django.db.migrations.state import ProjectState
5
6 from axes.models import AccessAttempt, AccessLog
7 from axes.tests.base import AxesTestCase
8
9
10 class ModelsTestCase(AxesTestCase):
11 def setUp(self):
12 self.failures_since_start = 42
13
14 self.access_attempt = AccessAttempt(
15 failures_since_start=self.failures_since_start
16 )
17 self.access_log = AccessLog()
18
19 def test_access_attempt_str(self):
20 self.assertIn("Access", str(self.access_attempt))
21
22 def test_access_log_str(self):
23 self.assertIn("Access", str(self.access_log))
24
25
26 class MigrationsTestCase(AxesTestCase):
27 def test_missing_migrations(self):
28 executor = MigrationExecutor(connection)
29 autodetector = MigrationAutodetector(
30 executor.loader.project_state(), ProjectState.from_apps(apps)
31 )
32
33 changes = autodetector.changes(graph=executor.loader.graph)
34
35 self.assertEqual({}, changes)
+0
-18
axes/tests/test_signals.py less more
0 from unittest.mock import MagicMock
1
2 from axes.tests.base import AxesTestCase
3 from axes.signals import user_locked_out
4
5
6 class SignalTestCase(AxesTestCase):
7 def test_send_lockout_signal(self):
8 """
9 Test if the lockout signal is correctly emitted when user is locked out.
10 """
11
12 handler = MagicMock()
13 user_locked_out.connect(handler)
14
15 self.assertEqual(0, handler.call_count)
16 self.lockout()
17 self.assertEqual(1, handler.call_count)
+0
-599
axes/tests/test_utils.py less more
0 from datetime import timedelta
1 from hashlib import md5
2 from unittest.mock import patch
3
4 from django.http import JsonResponse, HttpResponseRedirect, HttpResponse, HttpRequest
5 from django.test import override_settings, RequestFactory
6
7 from axes.apps import AppConfig
8 from axes.models import AccessAttempt
9 from axes.tests.base import AxesTestCase
10 from axes.helpers import (
11 get_cache_timeout,
12 get_client_str,
13 get_client_username,
14 get_client_cache_key,
15 get_client_parameters,
16 get_cool_off_iso8601,
17 get_lockout_response,
18 is_client_ip_address_blacklisted,
19 is_client_ip_address_whitelisted,
20 is_ip_address_in_blacklist,
21 is_ip_address_in_whitelist,
22 is_client_method_whitelisted,
23 toggleable,
24 )
25
26
27 @override_settings(AXES_ENABLED=False)
28 class AxesDisabledTestCase(AxesTestCase):
29 def test_initialize(self):
30 AppConfig.logging_initialized = False
31 AppConfig.initialize()
32 self.assertFalse(AppConfig.logging_initialized)
33
34 def test_toggleable(self):
35 def is_true():
36 return True
37
38 self.assertTrue(is_true())
39 self.assertIsNone(toggleable(is_true)())
40
41
42 class CacheTestCase(AxesTestCase):
43 @override_settings(AXES_COOLOFF_TIME=3) # hours
44 def test_get_cache_timeout_integer(self):
45 timeout_seconds = float(60 * 60 * 3)
46 self.assertEqual(get_cache_timeout(), timeout_seconds)
47
48 @override_settings(AXES_COOLOFF_TIME=timedelta(seconds=420))
49 def test_get_cache_timeout_timedelta(self):
50 self.assertEqual(get_cache_timeout(), 420)
51
52 @override_settings(AXES_COOLOFF_TIME=None)
53 def test_get_cache_timeout_none(self):
54 self.assertEqual(get_cache_timeout(), None)
55
56
57 class TimestampTestCase(AxesTestCase):
58 def test_iso8601(self):
59 """
60 Test get_cool_off_iso8601 correctly translates datetime.timedelta to ISO 8601 formatted duration.
61 """
62
63 expected = {
64 timedelta(days=1, hours=25, minutes=42, seconds=8): "P2DT1H42M8S",
65 timedelta(days=7, seconds=342): "P7DT5M42S",
66 timedelta(days=0, hours=2, minutes=42): "PT2H42M",
67 timedelta(hours=20, seconds=42): "PT20H42S",
68 timedelta(seconds=300): "PT5M",
69 timedelta(seconds=9005): "PT2H30M5S",
70 timedelta(minutes=9005): "P6DT6H5M",
71 timedelta(days=15): "P15D",
72 }
73
74 for delta, iso_duration in expected.items():
75 with self.subTest(iso_duration):
76 self.assertEqual(get_cool_off_iso8601(delta), iso_duration)
77
78
79 class ClientStringTestCase(AxesTestCase):
80 @staticmethod
81 def get_expected_client_str(*args, **kwargs):
82 client_str_template = '{{username: "{0}", ip_address: "{1}", user_agent: "{2}", path_info: "{3}"}}'
83 return client_str_template.format(*args, **kwargs)
84
85 @override_settings(AXES_VERBOSE=True)
86 def test_verbose_ip_only_client_details(self):
87 username = "test@example.com"
88 ip_address = "127.0.0.1"
89 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
90 path_info = "/admin/"
91
92 expected = self.get_expected_client_str(
93 username, ip_address, user_agent, path_info
94 )
95 actual = get_client_str(username, ip_address, user_agent, path_info)
96
97 self.assertEqual(expected, actual)
98
99 @override_settings(AXES_VERBOSE=True)
100 def test_imbalanced_quotes(self):
101 username = "butterfly.. },,,"
102 ip_address = "127.0.0.1"
103 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
104 path_info = "/admin/"
105
106 expected = self.get_expected_client_str(
107 username, ip_address, user_agent, path_info
108 )
109 actual = get_client_str(username, ip_address, user_agent, path_info)
110
111 self.assertEqual(expected, actual)
112
113 @override_settings(AXES_VERBOSE=True)
114 def test_verbose_ip_only_client_details_tuple(self):
115 username = "test@example.com"
116 ip_address = "127.0.0.1"
117 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
118 path_info = ("admin", "login")
119
120 expected = self.get_expected_client_str(
121 username, ip_address, user_agent, path_info[0]
122 )
123 actual = get_client_str(username, ip_address, user_agent, path_info)
124
125 self.assertEqual(expected, actual)
126
127 @override_settings(AXES_VERBOSE=False)
128 def test_non_verbose_ip_only_client_details(self):
129 username = "test@example.com"
130 ip_address = "127.0.0.1"
131 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
132 path_info = "/admin/"
133
134 expected = '{ip_address: "127.0.0.1", path_info: "/admin/"}'
135 actual = get_client_str(username, ip_address, user_agent, path_info)
136
137 self.assertEqual(expected, actual)
138
139 @override_settings(AXES_ONLY_USER_FAILURES=True)
140 @override_settings(AXES_VERBOSE=True)
141 def test_verbose_user_only_client_details(self):
142 username = "test@example.com"
143 ip_address = "127.0.0.1"
144 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
145 path_info = "/admin/"
146
147 expected = self.get_expected_client_str(
148 username, ip_address, user_agent, path_info
149 )
150 actual = get_client_str(username, ip_address, user_agent, path_info)
151
152 self.assertEqual(expected, actual)
153
154 @override_settings(AXES_ONLY_USER_FAILURES=True)
155 @override_settings(AXES_VERBOSE=False)
156 def test_non_verbose_user_only_client_details(self):
157 username = "test@example.com"
158 ip_address = "127.0.0.1"
159 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
160 path_info = "/admin/"
161
162 expected = '{username: "test@example.com", path_info: "/admin/"}'
163 actual = get_client_str(username, ip_address, user_agent, path_info)
164
165 self.assertEqual(expected, actual)
166
167 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
168 @override_settings(AXES_VERBOSE=True)
169 def test_verbose_user_ip_combo_client_details(self):
170 username = "test@example.com"
171 ip_address = "127.0.0.1"
172 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
173 path_info = "/admin/"
174
175 expected = self.get_expected_client_str(
176 username, ip_address, user_agent, path_info
177 )
178 actual = get_client_str(username, ip_address, user_agent, path_info)
179
180 self.assertEqual(expected, actual)
181
182 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
183 @override_settings(AXES_VERBOSE=False)
184 def test_non_verbose_user_ip_combo_client_details(self):
185 username = "test@example.com"
186 ip_address = "127.0.0.1"
187 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
188 path_info = "/admin/"
189
190 expected = '{username: "test@example.com", ip_address: "127.0.0.1", path_info: "/admin/"}'
191 actual = get_client_str(username, ip_address, user_agent, path_info)
192
193 self.assertEqual(expected, actual)
194
195 @override_settings(AXES_USE_USER_AGENT=True)
196 @override_settings(AXES_VERBOSE=True)
197 def test_verbose_user_agent_client_details(self):
198 username = "test@example.com"
199 ip_address = "127.0.0.1"
200 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
201 path_info = "/admin/"
202
203 expected = self.get_expected_client_str(
204 username, ip_address, user_agent, path_info
205 )
206 actual = get_client_str(username, ip_address, user_agent, path_info)
207
208 self.assertEqual(expected, actual)
209
210 @override_settings(AXES_USE_USER_AGENT=True)
211 @override_settings(AXES_VERBOSE=False)
212 def test_non_verbose_user_agent_client_details(self):
213 username = "test@example.com"
214 ip_address = "127.0.0.1"
215 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
216 path_info = "/admin/"
217
218 expected = '{ip_address: "127.0.0.1", user_agent: "Googlebot/2.1 (+http://www.googlebot.com/bot.html)", path_info: "/admin/"}'
219 actual = get_client_str(username, ip_address, user_agent, path_info)
220
221 self.assertEqual(expected, actual)
222
223
224 class ClientParametersTestCase(AxesTestCase):
225 @override_settings(AXES_ONLY_USER_FAILURES=True)
226 def test_get_filter_kwargs_user(self):
227 self.assertEqual(
228 dict(
229 get_client_parameters(self.username, self.ip_address, self.user_agent)
230 ),
231 {"username": self.username},
232 )
233
234 @override_settings(
235 AXES_ONLY_USER_FAILURES=False,
236 AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=False,
237 AXES_USE_USER_AGENT=False,
238 )
239 def test_get_filter_kwargs_ip(self):
240 self.assertEqual(
241 dict(
242 get_client_parameters(self.username, self.ip_address, self.user_agent)
243 ),
244 {"ip_address": self.ip_address},
245 )
246
247 @override_settings(
248 AXES_ONLY_USER_FAILURES=False,
249 AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True,
250 AXES_USE_USER_AGENT=False,
251 )
252 def test_get_filter_kwargs_user_and_ip(self):
253 self.assertEqual(
254 dict(
255 get_client_parameters(self.username, self.ip_address, self.user_agent)
256 ),
257 {"username": self.username, "ip_address": self.ip_address},
258 )
259
260 @override_settings(
261 AXES_ONLY_USER_FAILURES=False,
262 AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=False,
263 AXES_USE_USER_AGENT=True,
264 )
265 def test_get_filter_kwargs_ip_and_agent(self):
266 self.assertEqual(
267 dict(
268 get_client_parameters(self.username, self.ip_address, self.user_agent)
269 ),
270 {"ip_address": self.ip_address, "user_agent": self.user_agent},
271 )
272
273 @override_settings(
274 AXES_ONLY_USER_FAILURES=False,
275 AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True,
276 AXES_USE_USER_AGENT=True,
277 )
278 def test_get_filter_kwargs_user_ip_agent(self):
279 self.assertEqual(
280 dict(
281 get_client_parameters(self.username, self.ip_address, self.user_agent)
282 ),
283 {
284 "username": self.username,
285 "ip_address": self.ip_address,
286 "user_agent": self.user_agent,
287 },
288 )
289
290
291 class ClientCacheKeyTestCase(AxesTestCase):
292 def test_get_cache_key(self):
293 """
294 Test the cache key format.
295 """
296
297 cache_hash_digest = md5(self.ip_address.encode()).hexdigest()
298 cache_hash_key = f"axes-{cache_hash_digest}"
299
300 # Getting cache key from request
301 request_factory = RequestFactory()
302 request = request_factory.post(
303 "/admin/login/", data={"username": self.username, "password": "test"}
304 )
305
306 self.assertEqual(cache_hash_key, get_client_cache_key(request))
307
308 # Getting cache key from AccessAttempt Object
309 attempt = AccessAttempt(
310 user_agent="<unknown>",
311 ip_address=self.ip_address,
312 username=self.username,
313 get_data="",
314 post_data="",
315 http_accept=request.META.get("HTTP_ACCEPT", "<unknown>"),
316 path_info=request.META.get("PATH_INFO", "<unknown>"),
317 failures_since_start=0,
318 )
319
320 self.assertEqual(cache_hash_key, get_client_cache_key(attempt))
321
322 def test_get_cache_key_empty_ip_address(self):
323 """
324 Simulate an empty IP address in the request.
325 """
326
327 empty_ip_address = ""
328
329 cache_hash_digest = md5(empty_ip_address.encode()).hexdigest()
330 cache_hash_key = f"axes-{cache_hash_digest}"
331
332 # Getting cache key from request
333 request_factory = RequestFactory()
334 request = request_factory.post(
335 "/admin/login/",
336 data={"username": self.username, "password": "test"},
337 REMOTE_ADDR=empty_ip_address,
338 )
339
340 self.assertEqual(cache_hash_key, get_client_cache_key(request))
341
342 # Getting cache key from AccessAttempt Object
343 attempt = AccessAttempt(
344 user_agent="<unknown>",
345 ip_address=empty_ip_address,
346 username=self.username,
347 get_data="",
348 post_data="",
349 http_accept=request.META.get("HTTP_ACCEPT", "<unknown>"),
350 path_info=request.META.get("PATH_INFO", "<unknown>"),
351 failures_since_start=0,
352 )
353
354 self.assertEqual(cache_hash_key, get_client_cache_key(attempt))
355
356 def test_get_cache_key_credentials(self):
357 """
358 Test the cache key format.
359 """
360
361 ip_address = self.ip_address
362 cache_hash_digest = md5(ip_address.encode()).hexdigest()
363 cache_hash_key = f"axes-{cache_hash_digest}"
364
365 # Getting cache key from request
366 request_factory = RequestFactory()
367 request = request_factory.post(
368 "/admin/login/", data={"username": self.username, "password": "test"}
369 )
370
371 # Difference between the upper test: new call signature with credentials
372 credentials = {"username": self.username}
373
374 self.assertEqual(cache_hash_key, get_client_cache_key(request, credentials))
375
376 # Getting cache key from AccessAttempt Object
377 attempt = AccessAttempt(
378 user_agent="<unknown>",
379 ip_address=ip_address,
380 username=self.username,
381 get_data="",
382 post_data="",
383 http_accept=request.META.get("HTTP_ACCEPT", "<unknown>"),
384 path_info=request.META.get("PATH_INFO", "<unknown>"),
385 failures_since_start=0,
386 )
387 self.assertEqual(cache_hash_key, get_client_cache_key(attempt))
388
389
390 class UsernameTestCase(AxesTestCase):
391 @override_settings(AXES_USERNAME_FORM_FIELD="username")
392 def test_default_get_client_username(self):
393 expected = "test-username"
394
395 request = HttpRequest()
396 request.POST["username"] = expected
397
398 actual = get_client_username(request)
399
400 self.assertEqual(expected, actual)
401
402 def test_default_get_client_username_drf(self):
403 class DRFRequest:
404 def __init__(self):
405 self.data = {}
406 self.POST = {}
407
408 expected = "test-username"
409
410 request = DRFRequest()
411 request.data["username"] = expected
412
413 actual = get_client_username(request)
414
415 self.assertEqual(expected, actual)
416
417 @override_settings(AXES_USERNAME_FORM_FIELD="username")
418 def test_default_get_client_username_credentials(self):
419 expected = "test-username"
420 expected_in_credentials = "test-credentials-username"
421
422 request = HttpRequest()
423 request.POST["username"] = expected
424 credentials = {"username": expected_in_credentials}
425
426 actual = get_client_username(request, credentials)
427
428 self.assertEqual(expected_in_credentials, actual)
429
430 def sample_customize_username(request, credentials):
431 return "prefixed-" + request.POST.get("username")
432
433 @override_settings(AXES_USERNAME_FORM_FIELD="username")
434 @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username)
435 def test_custom_get_client_username_from_request(self):
436 provided = "test-username"
437 expected = "prefixed-" + provided
438 provided_in_credentials = "test-credentials-username"
439
440 request = HttpRequest()
441 request.POST["username"] = provided
442 credentials = {"username": provided_in_credentials}
443
444 actual = get_client_username(request, credentials)
445
446 self.assertEqual(expected, actual)
447
448 def sample_customize_username_credentials(request, credentials):
449 return "prefixed-" + credentials.get("username")
450
451 @override_settings(AXES_USERNAME_FORM_FIELD="username")
452 @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username_credentials)
453 def test_custom_get_client_username_from_credentials(self):
454 provided = "test-username"
455 provided_in_credentials = "test-credentials-username"
456 expected_in_credentials = "prefixed-" + provided_in_credentials
457
458 request = HttpRequest()
459 request.POST["username"] = provided
460 credentials = {"username": provided_in_credentials}
461
462 actual = get_client_username(request, credentials)
463
464 self.assertEqual(expected_in_credentials, actual)
465
466 @override_settings(
467 AXES_USERNAME_CALLABLE=lambda request, credentials: "example"
468 ) # pragma: no cover
469 def test_get_client_username(self):
470 self.assertEqual(get_client_username(HttpRequest(), {}), "example")
471
472 @override_settings(AXES_USERNAME_CALLABLE=lambda request: None) # pragma: no cover
473 def test_get_client_username_invalid_callable_too_few_arguments(self):
474 with self.assertRaises(TypeError):
475 get_client_username(HttpRequest(), {})
476
477 @override_settings(
478 AXES_USERNAME_CALLABLE=lambda request, credentials, extra: None
479 ) # pragma: no cover
480 def test_get_client_username_invalid_callable_too_many_arguments(self):
481 with self.assertRaises(TypeError):
482 get_client_username(HttpRequest(), {})
483
484 @override_settings(AXES_USERNAME_CALLABLE=True)
485 def test_get_client_username_not_callable(self):
486 with self.assertRaises(TypeError):
487 get_client_username(HttpRequest(), {})
488
489 @override_settings(AXES_USERNAME_CALLABLE="axes.tests.test_utils.get_username")
490 def test_get_client_username_str(self):
491 self.assertEqual(get_client_username(HttpRequest(), {}), "username")
492
493
494 def get_username(request, credentials: dict) -> str:
495 return "username"
496
497
498 class IPWhitelistTestCase(AxesTestCase):
499 def setUp(self):
500 self.request = HttpRequest()
501 self.request.method = "POST"
502 self.request.META["REMOTE_ADDR"] = "127.0.0.1"
503 self.request.axes_ip_address = "127.0.0.1"
504
505 @override_settings(AXES_IP_WHITELIST=None)
506 def test_ip_in_whitelist_none(self):
507 self.assertFalse(is_ip_address_in_whitelist("127.0.0.2"))
508
509 @override_settings(AXES_IP_WHITELIST=["127.0.0.1"])
510 def test_ip_in_whitelist(self):
511 self.assertTrue(is_ip_address_in_whitelist("127.0.0.1"))
512 self.assertFalse(is_ip_address_in_whitelist("127.0.0.2"))
513
514 @override_settings(AXES_IP_BLACKLIST=None)
515 def test_ip_in_blacklist_none(self):
516 self.assertFalse(is_ip_address_in_blacklist("127.0.0.2"))
517
518 @override_settings(AXES_IP_BLACKLIST=["127.0.0.1"])
519 def test_ip_in_blacklist(self):
520 self.assertTrue(is_ip_address_in_blacklist("127.0.0.1"))
521 self.assertFalse(is_ip_address_in_blacklist("127.0.0.2"))
522
523 @override_settings(AXES_IP_BLACKLIST=["127.0.0.1"])
524 def test_is_client_ip_address_blacklisted_ip_in_blacklist(self):
525 self.assertTrue(is_client_ip_address_blacklisted(self.request))
526
527 @override_settings(AXES_IP_BLACKLIST=["127.0.0.2"])
528 def test_is_is_client_ip_address_blacklisted_ip_not_in_blacklist(self):
529 self.assertFalse(is_client_ip_address_blacklisted(self.request))
530
531 @override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True)
532 @override_settings(AXES_IP_WHITELIST=["127.0.0.1"])
533 def test_is_client_ip_address_blacklisted_ip_in_whitelist(self):
534 self.assertFalse(is_client_ip_address_blacklisted(self.request))
535
536 @override_settings(AXES_ONLY_WHITELIST=True)
537 @override_settings(AXES_IP_WHITELIST=["127.0.0.2"])
538 def test_is_already_locked_ip_not_in_whitelist(self):
539 self.assertTrue(is_client_ip_address_blacklisted(self.request))
540
541 @override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True)
542 @override_settings(AXES_IP_WHITELIST=["127.0.0.1"])
543 def test_is_client_ip_address_whitelisted_never_lockout(self):
544 self.assertTrue(is_client_ip_address_whitelisted(self.request))
545
546 @override_settings(AXES_ONLY_WHITELIST=True)
547 @override_settings(AXES_IP_WHITELIST=["127.0.0.1"])
548 def test_is_client_ip_address_whitelisted_only_allow(self):
549 self.assertTrue(is_client_ip_address_whitelisted(self.request))
550
551 @override_settings(AXES_ONLY_WHITELIST=True)
552 @override_settings(AXES_IP_WHITELIST=["127.0.0.2"])
553 def test_is_client_ip_address_whitelisted_not(self):
554 self.assertFalse(is_client_ip_address_whitelisted(self.request))
555
556
557 class MethodWhitelistTestCase(AxesTestCase):
558 def setUp(self):
559 self.request = HttpRequest()
560 self.request.method = "GET"
561
562 @override_settings(AXES_NEVER_LOCKOUT_GET=True)
563 def test_is_client_method_whitelisted(self):
564 self.assertTrue(is_client_method_whitelisted(self.request))
565
566 @override_settings(AXES_NEVER_LOCKOUT_GET=False)
567 def test_is_client_method_whitelisted_not(self):
568 self.assertFalse(is_client_method_whitelisted(self.request))
569
570
571 class LockoutResponseTestCase(AxesTestCase):
572 def setUp(self):
573 self.request = HttpRequest()
574
575 @override_settings(AXES_COOLOFF_TIME=42)
576 def test_get_lockout_response_cool_off(self):
577 get_lockout_response(request=self.request)
578
579 @override_settings(AXES_LOCKOUT_TEMPLATE="example.html")
580 @patch("axes.helpers.render")
581 def test_get_lockout_response_lockout_template(self, render):
582 self.assertFalse(render.called)
583 get_lockout_response(request=self.request)
584 self.assertTrue(render.called)
585
586 @override_settings(AXES_LOCKOUT_URL="https://example.com")
587 def test_get_lockout_response_lockout_url(self):
588 response = get_lockout_response(request=self.request)
589 self.assertEqual(type(response), HttpResponseRedirect)
590
591 def test_get_lockout_response_lockout_json(self):
592 self.request.is_ajax = lambda: True
593 response = get_lockout_response(request=self.request)
594 self.assertEqual(type(response), JsonResponse)
595
596 def test_get_lockout_response_lockout_response(self):
597 response = get_lockout_response(request=self.request)
598 self.assertEqual(type(response), HttpResponse)
+0
-5
axes/tests/urls.py less more
0 from django.conf.urls import url
1 from django.contrib import admin
2
3
4 urlpatterns = [url(r"^admin/", admin.site.urls)]
+0
-1
axes/tests/urls_empty.py less more
0 urlpatterns: list = []
55 """
66
77 from logging import getLogger
8 from typing import Optional
89
10 from django.http import HttpRequest
11
12 from axes.conf import settings
913 from axes.handlers.proxy import AxesProxyHandler
14 from axes.helpers import get_client_ip_address
1015
1116 log = getLogger(__name__)
1217
1318
14 def reset(ip: str = None, username: str = None) -> int:
19 def reset(ip: str = None, username: str = None, ip_or_username=False) -> int:
1520 """
1621 Reset records that match IP or username, and return the count of removed attempts.
1722
1823 This utility method is meant to be used from the CLI or via Python API.
1924 """
2025
21 return AxesProxyHandler.reset_attempts(ip_address=ip, username=username)
26 return AxesProxyHandler.reset_attempts(
27 ip_address=ip, username=username, ip_or_username=ip_or_username
28 )
29
30
31 def reset_request(request: HttpRequest) -> int:
32 """
33 Reset records that match IP or username, and return the count of removed attempts.
34
35 This utility method is meant to be used from the CLI or via Python API.
36 """
37
38 ip: Optional[str] = get_client_ip_address(request)
39 username = request.GET.get("username", None)
40
41 ip_or_username = settings.AXES_LOCK_OUT_BY_USER_OR_IP
42 if settings.AXES_ONLY_USER_FAILURES:
43 ip = None
44 elif not (
45 settings.AXES_LOCK_OUT_BY_USER_OR_IP
46 or settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP
47 ):
48 username = None
49
50 if not ip and not username:
51 return 0
52 # We don't want to reset everything, if there is some wrong request parameter
53
54 # if settings.AXES_USE_USER_AGENT:
55 # TODO: reset based on user_agent?
56 return reset(ip, username, ip_or_username)
66
77 Refer to the project source code repository in
88 `GitHub <https://github.com/jazzband/django-axes/>`_ and see the
9 `Travis CI configuration <https://github.com/jazzband/django-axes/blob/master/.travis.yml>`_ and
9 `Tox configuration <https://github.com/jazzband/django-axes/blob/master/tox.ini>`_ and
1010 `Python package definition <https://github.com/jazzband/django-axes/blob/master/setup.py>`_
1111 to check if your Django and Python version are supported.
1212
13 The `Travis CI builds <https://travis-ci.org/jazzband/django-axes>`_
13 The `GitHub Actions builds <https://github.com/jazzband/django-axes/actions>`_
1414 test Axes compatibility with the Django master branch for future compatibility as well.
7272 ----------------------------
7373
7474 If you are implementing custom authentication, request middleware, or signal handlers
75 the Axes checks system might false positives in the Django checks framework.
75 the Axes checks system might generate false positives in the Django checks framework.
7676
7777 You can silence the unnecessary warnings by using the following Django settings::
7878
103103
104104 This disables the Axes middleware, authentication backend and signal receivers,
105105 which might fix errors with incompatible test configurations.
106
107
108 Disabling atomic requests
109 -------------------------
110
111 Django offers atomic database transactions that are tied to HTTP requests
112 and toggled on and off with the ``ATOMIC_REQUESTS`` configuration.
113
114 When ``ATOMIC_REQUESTS`` is set to ``True`` Django will always either perform
115 all database read and write operations in one successful atomic transaction
116 or in a case of failure roll them back, leaving no trace of the failed
117 request in the database.
118
119 However, sometimes Axes or another plugin can misbehave or not act correctly with
120 other code, preventing the login mechanisms from working due to e.g. exception
121 being thrown in some part of the code, preventing access attempts being logged
122 to database with Axes or causing similar problems.
123
124 If new attempts or log objects are not being correctly written to the Axes tables,
125 it is possible to configure Django ``ATOMIC_REQUESTS`` setting to to ``False``::
126
127 ATOMIC_REQUESTS = False
128
129 Please note that atomic requests are usually desirable when writing e.g. RESTful APIs,
130 but sometimes it can be problematic and warrant a disable.
131
132 Before disabling atomic requests or configuring them please read the relevant
133 Django documentation and make sure you know what you are configuring
134 rather than just toggling the flag on and off for testing.
135
136 Also note that the cache backend can provide correct functionality with
137 Memcached or Redis caches even with exceptions being thrown in the stack.
22 Usage
33 =====
44
5 Once Axes is is installed and configured, you can login and logout
5 Once Axes is installed and configured, you can login and logout
66 of your application via the ``django.contrib.auth`` views.
77 The attempts will be logged and visible in the Access Attempts section in admin.
88
4646 from IP under a particular username if the attempt limit has been exceeded,
4747 otherwise lock out based on IP.
4848 Default: ``False``
49 * ``AXES_LOCK_OUT_BY_USER_OR_IP``: If ``True``, prevent login
50 from if the attempt limit has been exceeded for IP or username.
51 Default: ``False``
4952 * ``AXES_USE_USER_AGENT``: If ``True``, lock out and log based on the IP address
5053 and the user agent. This means requests from different user agents but from
5154 the same IP are treated differently. This settings has no effect if the
5255 ``AXES_ONLY_USER_FAILURES`` setting is active.
5356 Default: ``False``
54 * ``AXES_LOGGER``: If set, specifies a logging mechanism for Axes to use.
55 Default: ``'axes.watch_login'``
5657 * ``AXES_HANDLER``: The path to the handler class to use.
5758 If set, overrides the default signal handler backend.
58 Default: ``'axes.handlers.database.DatabaseHandler'``
59 Default: ``'axes.handlers.database.AxesDatabaseHandler'``
5960 * ``AXES_CACHE``: The name of the cache for Axes to use.
6061 Default: ``'default'``
6162 * ``AXES_LOCKOUT_TEMPLATE``: If set, specifies a template to render when a
62 user is locked out. Template receives ``cooloff_time`` and ``failure_limit`` as
63 user is locked out. Template receives ``cooloff_timedelta``, ``cooloff_time``, ``username`` and ``failure_limit`` as
6364 context variables.
6465 Default: ``None``
6566 * ``AXES_LOCKOUT_URL``: If set, specifies a URL to redirect to on lockout. If both
9293 Default: ``None``
9394 * ``AXES_PASSWORD_FORM_FIELD``: the name of the form or credentials field that contains your users password.
9495 Default: ``password``
96 * ``AXES_SENSITIVE_PARAMETERS``: Configures POST and GET parameter values (in addition to the value of
97 ``AXES_PASSWORD_FORM_FIELD``) to mask in login attempt logging.
98 Default: ``[]``
9599 * ``AXES_NEVER_LOCKOUT_GET``: If ``True``, Axes will never lock out HTTP GET requests.
96100 Default: ``False``
97101 * ``AXES_NEVER_LOCKOUT_WHITELIST``: If ``True``, users can always login from whitelisted IP addresses.
107111 Default: ``False``
108112 * ``AXES_RESET_ON_SUCCESS``: If ``True``, a successful login will reset the number of failed logins.
109113 Default: ``False``
114 * ``AXES_ALLOWED_CORS_ORIGINS``: Configures lockout response CORS headers for XHR requests.
115 Default: ``*``
116 * ``AXES_HTTP_RESPONSE_CODE``: Sets the http response code returned when ``AXES_FAILURE_LIMIT`` is
117 reached.
118 For example: ``AXES_HTTP_RESPONSE_CODE = 429``
119 Default: ``403``
110120
111121 The configuration option precedences for the access attempt monitoring are:
112122
153153 authenticate. If you want to re-use the same function for consistency, that's
154154 fine, but Axes does not inject these changes into the authentication flow
155155 for you.
156
157
158 Customizing lockout responses
159 -----------------------------
160
161 Axes can be configured with ``AXES_LOCKOUT_CALLABLE`` to return a custom lockout response when using the plugin with e.g. DRF (Django REST Framework) or other third party libraries which require specialized formats such as JSON or XML response formats or customized response status codes.
162
163 An example of usage could be e.g. a custom view for processing lockouts.
164
165 ``example/views.py``::
166
167 from django.http import JsonResponse
168
169 def lockout(request, credentials, *args, **kwargs):
170 return JsonResponse({"status": "Locked out due to too many login failures"}, status=403)
171
172 ``settings.py``::
173
174 AXES_LOCKOUT_CALLABLE = "example.views.lockout"
88
99 In the following table
1010 **Compatible** means that a component should be fully compatible out-of-the-box,
11 **Functional** means that a component should be functional after customization, and
11 **Functional** means that a component should be functional after configuration, and
1212 **Incompatible** means that a component has been reported as non-functional with Axes.
1313
1414 ======================= ============= ============ ============ ==============
1515 Project Version Compatible Functional Incompatible
1616 ======================= ============= ============ ============ ==============
17 Django REST Framework |gte| 3.7.0 |check|
18 Django REST Framework |lt| 3.7.0 |check|
17 Django REST Framework |check|
1918 Django Allauth |check|
2019 Django Simple Captcha |check|
2120 Django OAuth Toolkit |check|
9089
9190 urlpatterns = [
9291 # Override allauth default login view with a patched view
93 url(r'^accounts/login/$', LoginView.as_view(form_class=AxesLoginForm), name='account_login'),
94 url(r'^accounts/', include('allauth.urls')),
92 path('accounts/login/', LoginView.as_view(form_class=AxesLoginForm), name='account_login'),
93 path('accounts/', include('allauth.urls')),
9594 ]
9695
9796
9897 Integration with Django REST Framework
9998 --------------------------------------
10099
101 .. note::
102 Modern versions of Django REST Framework after 3.7.0 work normally with Axes
103 out-of-the-box and require no customization in DRF.
104
105
106 Django REST Framework versions prior to 3.7.0
107 require the request object to be passed for authentication
108 by a customized DRF authentication class::
109
110 from rest_framework.authentication import BasicAuthentication
111
112 class AxesBasicAuthentication(BasicAuthentication):
113 """
114 Extended basic authentication backend class that supplies the
115 request object into the authentication call for Axes compatibility.
116
117 NOTE: This patch is only needed for DRF versions < 3.7.0.
118 """
119
120 def authenticate(self, request):
121 # NOTE: Request is added as an instance attribute in here
122 self._current_request = request
123 return super().authenticate(request)
124
125 def authenticate_credentials(self, userid, password, request=None):
126 credentials = {
127 get_user_model().USERNAME_FIELD: userid,
128 'password': password
129 }
130
131 # NOTE: Request is added as an argument to the authenticate call here
132 user = authenticate(request=request or self._current_request, **credentials)
133
134 if user is None:
135 raise exceptions.AuthenticationFailed(_('Invalid username/password.'))
136
137 if not user.is_active:
138 raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
139
140 return (user, None)
100 Django Axes requires REST Framework to be connected
101 via lockout signals for correct functionality.
102
103 You can use the following snippet in your project signals such as ``example/signals.py``::
104
105 from django.dispatch import receiver
106
107 from axes.signals import user_locked_out
108 from rest_framework.exceptions import PermissionDenied
109
110
111 @receiver(user_locked_out)
112 def raise_permission_denied(*args, **kwargs):
113 raise PermissionDenied("Too many failed login attempts")
114
115 And then configure your application to load it in ``examples/apps.py``::
116
117 from django import apps
118
119
120 class AppConfig(apps.AppConfig):
121 name = "example"
122
123 def ready(self):
124 from example import signals # noqa
125
126 Please check the Django signals documentation for more information:
127
128 https://docs.djangoproject.com/en/3.1/topics/signals/
129
130 When a user login fails a signal is emitted and PermissionDenied
131 raises a HTTP 403 reply which interrupts the login process.
132
133 This functionality was handled in the middleware for a time,
134 but that resulted in extra database requests being made for
135 each and every web request, and was migrated to signals.
141136
142137
143138 Integration with Django Simple Captcha
160155
161156 ``example/views.py``::
162157
163 from axes.utils import reset
164 from axes.helpers import get_client_ip_address
158 from axes.utils import reset_request
165159 from django.http.response import HttpResponseRedirect
166160 from django.shortcuts import render
167161 from django.urls import reverse_lazy
173167 if request.POST:
174168 form = AxesCaptchaForm(request.POST)
175169 if form.is_valid():
176 ip = get_client_ip_address(request)
177 reset(ip=ip)
170 reset_request(request)
178171 return HttpResponseRedirect(reverse_lazy('auth_login'))
179172 else:
180173 form = AxesCaptchaForm()
3636
3737 $ tox
3838
39 Tox runs the same test set that is run by Travis, and your code should be good to go if it passes.
39 Tox runs the same test set that is run by GitHub Actions, and your code should be good to go if it passes.
4040
4141 If you wish to limit the testing to specific environment(s), you can parametrize the tox run::
4242
43 $ tox -e py37-django21
43 $ tox -e py39-django22
4444
4545 After you have pushed your changes, open a pull request on GitHub for getting your code upstreamed.
55 http://www.sphinx-doc.org/en/master/usage/configuration.html
66 """
77
8 from os import environ
8 import sphinx_rtd_theme
99 from pkg_resources import get_distribution
1010
1111 import django
12 import sphinx_rtd_theme
12 from django.conf import settings
1313
14 environ.setdefault("DJANGO_SETTINGS_MODULE", "axes.tests.settings")
14 settings.configure(INSTALLED_APPS=["django", "django.contrib.auth", "axes"], DEBUG=True)
1515 django.setup()
16
1617
1718 # -- Extra custom configuration ------------------------------------------
1819
33 import sys
44
55 if __name__ == "__main__":
6 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "axes.tests.settings")
6 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "tests.settings")
77
88 from django.core.management import execute_from_command_line
99
00 [build-system]
11 requires = ["setuptools>=30.3.0", "wheel", "setuptools_scm"]
2
3 [tool.pytest.ini_options]
4 testpaths = "tests"
5 addopts = "--cov axes --cov-append --cov-branch --cov-report term-missing --cov-report=xml"
6 DJANGO_SETTINGS_MODULE = "tests.settings"
7
8 [tool.tox]
9 legacy_tox_ini = """
10 [tox]
11 envlist =
12 py{36,37,38,39,py3}-dj{22,31,32}
13 py{38,39}-djmain
14 py38-djqa
15
16 [gh-actions]
17 python =
18 3.6: py36
19 3.7: py37
20 3.8: py38
21 3.9: py39
22 pypy3: pypy3
23
24 [gh-actions:env]
25 DJANGO =
26 2.2: dj22
27 3.1: dj31
28 3.2: dj32
29 main: djmain
30 qa: djqa
31
32 # Normal test environment runs pytest which orchestrates other tools
33 [testenv]
34 deps =
35 -r requirements-test.txt
36 dj22: django>=2.2,<2.3
37 dj31: django>=3.1,<3.2
38 dj32: django>=3.2,<3.3
39 djmain: https://github.com/django/django/archive/main.tar.gz
40 usedevelop = true
41 commands = pytest
42 setenv =
43 PYTHONDONTWRITEBYTECODE=1
44
45 # Django development version is allowed to fail the test matrix
46 [testenv:py{38,39,py3}-djmain]
47 ignore_errors = true
48 ignore_outcome = true
49
50 # QA runs type checks, linting, and code formatting checks
51 [testenv:py38-djqa]
52 deps = -r requirements-qa.txt
53 commands =
54 mypy axes
55 prospector
56 black -t py36 --check --diff axes
57 """
+0
-4
pytest.ini less more
0 [pytest]
1 addopts = --cov axes --cov-config .coveragerc --cov-append --cov-report term-missing
2 python_files = tests.py test_*.py tests_*.py *_tests.py *_test.py
3 DJANGO_SETTINGS_MODULE = axes.tests.settings
0 black==21.8b0
1 mypy==0.910
2 prospector==1.3.1
3 types-pkg_resources # Type stub
0 -e .
1 coverage==5.5
2 pytest==6.2.5
3 pytest-cov==2.12.1
4 pytest-django==4.4.0
5 pytest-subtests==0.5.0
00 -e .
1 black==19.3b0
2 coverage==5.2.1
3 mypy==0.761; python_version < '3.8' and python_implementation != 'PyPy'
4 prospector==1.2.0; python_version < '3.8' and python_implementation != 'PyPy'
5 pytest==6.0.1
6 pytest-cov==2.10.0
7 pytest-django==3.9.0
8 sphinx_rtd_theme==0.5.0
9 tox==3.18.1
1 -r requirements-qa.txt
2 -r requirements-test.txt
3 sphinx_rtd_theme==0.5.2
4 tox==3.24.3
3535 use_scm_version=True,
3636 setup_requires=["setuptools_scm"],
3737 python_requires="~=3.6",
38 install_requires=["django>=2.0", "django-appconf>=1.0.3", "django-ipware>=3,<4"],
38 install_requires=["django>=2.2", "django-ipware>=3,<5"],
3939 include_package_data=True,
40 packages=find_packages(),
40 packages=find_packages(exclude=["tests"]),
4141 classifiers=[
4242 "Development Status :: 5 - Production/Stable",
4343 "Environment :: Web Environment",
4444 "Environment :: Plugins",
4545 "Framework :: Django",
4646 "Framework :: Django :: 2.2",
47 "Framework :: Django :: 3.0",
4847 "Framework :: Django :: 3.1",
48 "Framework :: Django :: 3.2",
4949 "Intended Audience :: Developers",
5050 "Intended Audience :: System Administrators",
5151 "License :: OSI Approved :: MIT License",
5555 "Programming Language :: Python :: 3.6",
5656 "Programming Language :: Python :: 3.7",
5757 "Programming Language :: Python :: 3.8",
58 "Programming Language :: Python :: 3.9",
5859 "Programming Language :: Python :: Implementation :: CPython",
5960 "Programming Language :: Python :: Implementation :: PyPy",
6061 "Topic :: Internet :: Log Analysis",
(New empty file)
0 from random import choice
1 from string import ascii_letters, digits
2 from time import sleep
3
4 from django.contrib.auth import get_user_model
5 from django.http import HttpRequest
6 from django.test import TestCase
7 from django.urls import reverse
8 from django.utils.timezone import now
9
10 from axes.conf import settings
11 from axes.helpers import (
12 get_cache,
13 get_client_http_accept,
14 get_client_ip_address,
15 get_client_path_info,
16 get_client_user_agent,
17 get_cool_off,
18 get_credentials,
19 get_failure_limit,
20 )
21 from axes.models import AccessAttempt, AccessLog
22 from axes.utils import reset
23
24
25 def custom_failure_limit(request, credentials):
26 return 3
27
28
29 class AxesTestCase(TestCase):
30 """
31 Test case using custom settings for testing.
32 """
33
34 VALID_USERNAME = "axes-valid-username"
35 VALID_PASSWORD = "axes-valid-password"
36 VALID_EMAIL = "axes-valid-email@example.com"
37 VALID_USER_AGENT = "axes-user-agent"
38 VALID_IP_ADDRESS = "127.0.0.1"
39
40 INVALID_USERNAME = "axes-invalid-username"
41 INVALID_PASSWORD = "axes-invalid-password"
42 INVALID_EMAIL = "axes-invalid-email@example.com"
43
44 LOCKED_MESSAGE = "Account locked: too many login attempts."
45 LOGOUT_MESSAGE = "Logged out"
46 LOGIN_FORM_KEY = '<input type="submit" value="Log in" />'
47
48 STATUS_SUCCESS = 200
49 ALLOWED = 302
50 BLOCKED = 403
51
52 def setUp(self):
53 """
54 Create a valid user for login.
55 """
56
57 self.username = self.VALID_USERNAME
58 self.password = self.VALID_PASSWORD
59 self.email = self.VALID_EMAIL
60
61 self.ip_address = self.VALID_IP_ADDRESS
62 self.user_agent = self.VALID_USER_AGENT
63 self.path_info = reverse("admin:login")
64
65 self.user = get_user_model().objects.create_superuser(
66 username=self.username, password=self.password, email=self.email
67 )
68
69 self.request = HttpRequest()
70 self.request.method = "POST"
71 self.request.META["REMOTE_ADDR"] = self.ip_address
72 self.request.META["HTTP_USER_AGENT"] = self.user_agent
73 self.request.META["PATH_INFO"] = self.path_info
74
75 self.request.axes_attempt_time = now()
76 self.request.axes_ip_address = get_client_ip_address(self.request)
77 self.request.axes_user_agent = get_client_user_agent(self.request)
78 self.request.axes_path_info = get_client_path_info(self.request)
79 self.request.axes_http_accept = get_client_http_accept(self.request)
80 self.request.axes_failures_since_start = None
81
82 self.credentials = get_credentials(self.username)
83
84 def tearDown(self):
85 get_cache().clear()
86
87 def get_kwargs_with_defaults(self, **kwargs):
88 defaults = {
89 "user_agent": self.user_agent,
90 "ip_address": self.ip_address,
91 "username": self.username,
92 }
93
94 defaults.update(kwargs)
95 return defaults
96
97 def create_attempt(self, **kwargs):
98 kwargs = self.get_kwargs_with_defaults(**kwargs)
99 kwargs.setdefault("failures_since_start", 1)
100 return AccessAttempt.objects.create(**kwargs)
101
102 def create_log(self, **kwargs):
103 return AccessLog.objects.create(**self.get_kwargs_with_defaults(**kwargs))
104
105 def reset(self, ip=None, username=None):
106 return reset(ip, username)
107
108 def login(
109 self,
110 is_valid_username=False,
111 is_valid_password=False,
112 remote_addr=None,
113 **kwargs
114 ):
115 """
116 Login a user.
117
118 A valid credential is used when is_valid_username is True,
119 otherwise it will use a random string to make a failed login.
120 """
121
122 if is_valid_username:
123 username = self.VALID_USERNAME
124 else:
125 username = "".join(choice(ascii_letters + digits) for _ in range(10))
126
127 if is_valid_password:
128 password = self.VALID_PASSWORD
129 else:
130 password = self.INVALID_PASSWORD
131
132 post_data = {"username": username, "password": password, **kwargs}
133
134 return self.client.post(
135 reverse("admin:login"),
136 post_data,
137 REMOTE_ADDR=remote_addr or self.ip_address,
138 HTTP_USER_AGENT=self.user_agent,
139 )
140
141 def logout(self):
142 return self.client.post(
143 reverse("admin:logout"),
144 REMOTE_ADDR=self.ip_address,
145 HTTP_USER_AGENT=self.user_agent,
146 )
147
148 def check_login(self):
149 response = self.login(is_valid_username=True, is_valid_password=True)
150 self.assertNotContains(
151 response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True
152 )
153
154 def almost_lockout(self):
155 for _ in range(1, get_failure_limit(None, None)):
156 response = self.login()
157 self.assertContains(response, self.LOGIN_FORM_KEY, html=True)
158
159 def lockout(self):
160 self.almost_lockout()
161 return self.login()
162
163 def check_lockout(self):
164 response = self.lockout()
165 if settings.AXES_LOCK_OUT_AT_FAILURE == True:
166 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
167 else:
168 self.assertNotContains(
169 response, self.LOCKED_MESSAGE, status_code=self.STATUS_SUCCESS
170 )
171
172 def cool_off(self):
173 sleep(get_cool_off().total_seconds())
174
175 def check_logout(self):
176 response = self.logout()
177 self.assertContains(
178 response, self.LOGOUT_MESSAGE, status_code=self.STATUS_SUCCESS
179 )
180
181 def check_handler(self):
182 """
183 Check a handler and its basic functionality with lockouts, cool offs, login, and logout.
184
185 This is a check that is intended to successfully run for each and every new handler.
186 """
187
188 self.check_lockout()
189 self.cool_off()
190 self.check_login()
191 self.check_logout()
0 DATABASES = {"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:"}}
1
2 CACHES = {
3 "default": {
4 # This cache backend is OK to use in development and testing
5 # but has the potential to break production setups with more than on process
6 # due to each process having their own local memory based cache
7 "BACKEND": "django.core.cache.backends.locmem.LocMemCache"
8 }
9 }
10
11 SITE_ID = 1
12
13 MIDDLEWARE = [
14 "django.middleware.common.CommonMiddleware",
15 "django.contrib.sessions.middleware.SessionMiddleware",
16 "django.contrib.auth.middleware.AuthenticationMiddleware",
17 "django.contrib.messages.middleware.MessageMiddleware",
18 "axes.middleware.AxesMiddleware",
19 ]
20
21 AUTHENTICATION_BACKENDS = [
22 "axes.backends.AxesBackend",
23 "django.contrib.auth.backends.ModelBackend",
24 ]
25
26 PASSWORD_HASHERS = ["django.contrib.auth.hashers.MD5PasswordHasher"]
27
28 ROOT_URLCONF = "tests.urls"
29
30 INSTALLED_APPS = [
31 "django.contrib.auth",
32 "django.contrib.contenttypes",
33 "django.contrib.sessions",
34 "django.contrib.sites",
35 "django.contrib.messages",
36 "django.contrib.admin",
37 "axes",
38 ]
39
40 TEMPLATES = [
41 {
42 "BACKEND": "django.template.backends.django.DjangoTemplates",
43 "DIRS": [],
44 "APP_DIRS": True,
45 "OPTIONS": {
46 "context_processors": [
47 "django.template.context_processors.debug",
48 "django.template.context_processors.request",
49 "django.contrib.auth.context_processors.auth",
50 "django.contrib.messages.context_processors.messages",
51 ]
52 },
53 }
54 ]
55
56 LOGGING = {
57 "version": 1,
58 "disable_existing_loggers": False,
59 "handlers": {"console": {"class": "logging.StreamHandler"}},
60 "loggers": {"axes": {"handlers": ["console"], "level": "INFO", "propagate": False}},
61 }
62
63 SECRET_KEY = "too-secret-for-test"
64
65 USE_I18N = False
66
67 USE_L10N = False
68
69 USE_TZ = False
70
71 LOGIN_REDIRECT_URL = "/admin/"
72
73 AXES_FAILURE_LIMIT = 10
74
75 DEFAULT_AUTO_FIELD = "django.db.models.AutoField"
0 from contextlib import suppress
1 from importlib import reload
2
3 from django.contrib import admin
4 from django.test import override_settings
5
6 import axes.admin
7 from axes.models import AccessAttempt, AccessLog
8 from tests.base import AxesTestCase
9
10
11 class AxesEnableAdminFlag(AxesTestCase):
12 def setUp(self):
13 with suppress(admin.sites.NotRegistered):
14 admin.site.unregister(AccessAttempt)
15 with suppress(admin.sites.NotRegistered):
16 admin.site.unregister(AccessLog)
17
18 @override_settings(AXES_ENABLE_ADMIN=False)
19 def test_disable_admin(self):
20 reload(axes.admin)
21 self.assertFalse(admin.site.is_registered(AccessAttempt))
22 self.assertFalse(admin.site.is_registered(AccessLog))
23
24 def test_enable_admin_by_default(self):
25 reload(axes.admin)
26 self.assertTrue(admin.site.is_registered(AccessAttempt))
27 self.assertTrue(admin.site.is_registered(AccessLog))
0 from unittest.mock import patch
1
2 from django.http import HttpRequest
3 from django.test import override_settings
4 from django.utils.timezone import now
5
6 from axes.attempts import get_cool_off_threshold
7 from axes.models import AccessAttempt
8 from axes.utils import reset, reset_request
9 from tests.base import AxesTestCase
10
11
12 class GetCoolOffThresholdTestCase(AxesTestCase):
13 @override_settings(AXES_COOLOFF_TIME=42)
14 def test_get_cool_off_threshold(self):
15 timestamp = now()
16
17 with patch("axes.attempts.now", return_value=timestamp):
18 attempt_time = timestamp
19 threshold_now = get_cool_off_threshold(attempt_time)
20
21 attempt_time = None
22 threshold_none = get_cool_off_threshold(attempt_time)
23
24 self.assertEqual(threshold_now, threshold_none)
25
26 @override_settings(AXES_COOLOFF_TIME=None)
27 def test_get_cool_off_threshold_error(self):
28 with self.assertRaises(TypeError):
29 get_cool_off_threshold()
30
31
32 class ResetTestCase(AxesTestCase):
33 def test_reset(self):
34 self.create_attempt()
35 reset()
36 self.assertFalse(AccessAttempt.objects.count())
37
38 def test_reset_ip(self):
39 self.create_attempt(ip_address=self.ip_address)
40 reset(ip=self.ip_address)
41 self.assertFalse(AccessAttempt.objects.count())
42
43 def test_reset_username(self):
44 self.create_attempt(username=self.username)
45 reset(username=self.username)
46 self.assertFalse(AccessAttempt.objects.count())
47
48
49 class ResetResponseTestCase(AxesTestCase):
50 USERNAME_1 = "foo_username"
51 USERNAME_2 = "bar_username"
52 IP_1 = "127.1.0.1"
53 IP_2 = "127.1.0.2"
54
55 def setUp(self):
56 super().setUp()
57 self.create_attempt()
58 self.create_attempt(username=self.USERNAME_1, ip_address=self.IP_1)
59 self.create_attempt(username=self.USERNAME_1, ip_address=self.IP_2)
60 self.create_attempt(username=self.USERNAME_2, ip_address=self.IP_1)
61 self.create_attempt(username=self.USERNAME_2, ip_address=self.IP_2)
62 self.request = HttpRequest()
63
64 def test_reset(self):
65 reset_request(self.request)
66 self.assertEqual(AccessAttempt.objects.count(), 5)
67
68 def test_reset_ip(self):
69 self.request.META["REMOTE_ADDR"] = self.IP_1
70 reset_request(self.request)
71 self.assertEqual(AccessAttempt.objects.count(), 3)
72
73 def test_reset_username(self):
74 self.request.GET["username"] = self.USERNAME_1
75 reset_request(self.request)
76 self.assertEqual(AccessAttempt.objects.count(), 5)
77
78 def test_reset_ip_username(self):
79 self.request.GET["username"] = self.USERNAME_1
80 self.request.META["REMOTE_ADDR"] = self.IP_1
81 reset_request(self.request)
82 self.assertEqual(AccessAttempt.objects.count(), 3)
83
84 @override_settings(AXES_ONLY_USER_FAILURES=True)
85 def test_reset_user_failures(self):
86 reset_request(self.request)
87 self.assertEqual(AccessAttempt.objects.count(), 5)
88
89 @override_settings(AXES_ONLY_USER_FAILURES=True)
90 def test_reset_ip_user_failures(self):
91 self.request.META["REMOTE_ADDR"] = self.IP_1
92 reset_request(self.request)
93 self.assertEqual(AccessAttempt.objects.count(), 5)
94
95 @override_settings(AXES_ONLY_USER_FAILURES=True)
96 def test_reset_username_user_failures(self):
97 self.request.GET["username"] = self.USERNAME_1
98 reset_request(self.request)
99 self.assertEqual(AccessAttempt.objects.count(), 3)
100
101 @override_settings(AXES_ONLY_USER_FAILURES=True)
102 def test_reset_ip_username_user_failures(self):
103 self.request.GET["username"] = self.USERNAME_1
104 self.request.META["REMOTE_ADDR"] = self.IP_1
105 reset_request(self.request)
106 self.assertEqual(AccessAttempt.objects.count(), 3)
107
108 @override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True)
109 def test_reset_user_or_ip(self):
110 reset_request(self.request)
111 self.assertEqual(AccessAttempt.objects.count(), 5)
112
113 @override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True)
114 def test_reset_ip_user_or_ip(self):
115 self.request.META["REMOTE_ADDR"] = self.IP_1
116 reset_request(self.request)
117 self.assertEqual(AccessAttempt.objects.count(), 3)
118
119 @override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True)
120 def test_reset_username_user_or_ip(self):
121 self.request.GET["username"] = self.USERNAME_1
122 reset_request(self.request)
123 self.assertEqual(AccessAttempt.objects.count(), 3)
124
125 @override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True)
126 def test_reset_ip_username_user_or_ip(self):
127 self.request.GET["username"] = self.USERNAME_1
128 self.request.META["REMOTE_ADDR"] = self.IP_1
129 reset_request(self.request)
130 self.assertEqual(AccessAttempt.objects.count(), 2)
131
132 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
133 def test_reset_user_and_ip(self):
134 reset_request(self.request)
135 self.assertEqual(AccessAttempt.objects.count(), 5)
136
137 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
138 def test_reset_ip_user_and_ip(self):
139 self.request.META["REMOTE_ADDR"] = self.IP_1
140 reset_request(self.request)
141 self.assertEqual(AccessAttempt.objects.count(), 3)
142
143 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
144 def test_reset_username_user_and_ip(self):
145 self.request.GET["username"] = self.USERNAME_1
146 reset_request(self.request)
147 self.assertEqual(AccessAttempt.objects.count(), 3)
148
149 @override_settings(AXES_LOCK_OUT_BY_USER_OR_AND=True)
150 def test_reset_ip_username_user_and_ip(self):
151 self.request.GET["username"] = self.USERNAME_1
152 self.request.META["REMOTE_ADDR"] = self.IP_1
153 reset_request(self.request)
154 self.assertEqual(AccessAttempt.objects.count(), 3)
0 from unittest.mock import patch, MagicMock
1
2 from axes.backends import AxesBackend
3 from axes.exceptions import (
4 AxesBackendRequestParameterRequired,
5 AxesBackendPermissionDenied,
6 )
7 from tests.base import AxesTestCase
8
9
10 class BackendTestCase(AxesTestCase):
11 def test_authenticate_raises_on_missing_request(self):
12 request = None
13
14 with self.assertRaises(AxesBackendRequestParameterRequired):
15 AxesBackend().authenticate(request)
16
17 @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=False)
18 def test_authenticate_raises_on_locked_request(self, _):
19 request = MagicMock()
20
21 with self.assertRaises(AxesBackendPermissionDenied):
22 AxesBackend().authenticate(request)
0 from django.core.checks import run_checks, Warning # pylint: disable=redefined-builtin
1 from django.test import override_settings, modify_settings
2
3 from axes.backends import AxesBackend
4 from axes.checks import Messages, Hints, Codes
5 from tests.base import AxesTestCase
6
7
8 class CacheCheckTestCase(AxesTestCase):
9 @override_settings(
10 AXES_HANDLER="axes.handlers.cache.AxesCacheHandler",
11 CACHES={
12 "default": {
13 "BACKEND": "django.core.cache.backends.db.DatabaseCache",
14 "LOCATION": "axes_cache",
15 }
16 },
17 )
18 def test_cache_check(self):
19 warnings = run_checks()
20 self.assertEqual(warnings, [])
21
22 @override_settings(
23 AXES_HANDLER="axes.handlers.cache.AxesCacheHandler",
24 CACHES={
25 "default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}
26 },
27 )
28 def test_cache_check_warnings(self):
29 warnings = run_checks()
30 warning = Warning(
31 msg=Messages.CACHE_INVALID, hint=Hints.CACHE_INVALID, id=Codes.CACHE_INVALID
32 )
33
34 self.assertEqual(warnings, [warning])
35
36 @override_settings(
37 AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler",
38 CACHES={
39 "default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}
40 },
41 )
42 def test_cache_check_does_not_produce_check_warnings_with_database_handler(self):
43 warnings = run_checks()
44 self.assertEqual(warnings, [])
45
46
47 class MiddlewareCheckTestCase(AxesTestCase):
48 @modify_settings(MIDDLEWARE={"remove": ["axes.middleware.AxesMiddleware"]})
49 def test_cache_check_warnings(self):
50 warnings = run_checks()
51 warning = Warning(
52 msg=Messages.MIDDLEWARE_INVALID,
53 hint=Hints.MIDDLEWARE_INVALID,
54 id=Codes.MIDDLEWARE_INVALID,
55 )
56
57 self.assertEqual(warnings, [warning])
58
59
60 class AxesSpecializedBackend(AxesBackend):
61 pass
62
63
64 class BackendCheckTestCase(AxesTestCase):
65 @modify_settings(AUTHENTICATION_BACKENDS={"remove": ["axes.backends.AxesBackend"]})
66 def test_backend_missing(self):
67 warnings = run_checks()
68 warning = Warning(
69 msg=Messages.BACKEND_INVALID,
70 hint=Hints.BACKEND_INVALID,
71 id=Codes.BACKEND_INVALID,
72 )
73
74 self.assertEqual(warnings, [warning])
75
76 @override_settings(
77 AUTHENTICATION_BACKENDS=["tests.test_checks.AxesSpecializedBackend"]
78 )
79 def test_specialized_backend(self):
80 warnings = run_checks()
81 self.assertEqual(warnings, [])
82
83 @override_settings(
84 AUTHENTICATION_BACKENDS=["tests.test_checks.AxesNotDefinedBackend"]
85 )
86 def test_import_error(self):
87 with self.assertRaises(ImportError):
88 run_checks()
89
90 @override_settings(AUTHENTICATION_BACKENDS=["module.not_defined"])
91 def test_module_not_found_error(self):
92 with self.assertRaises(ModuleNotFoundError):
93 run_checks()
94
95
96 class DeprecatedSettingsTestCase(AxesTestCase):
97 def setUp(self):
98 self.disable_success_access_log_warning = Warning(
99 msg=Messages.SETTING_DEPRECATED.format(
100 deprecated_setting="AXES_DISABLE_SUCCESS_ACCESS_LOG"
101 ),
102 hint=Hints.SETTING_DEPRECATED,
103 id=Codes.SETTING_DEPRECATED,
104 )
105
106 @override_settings(AXES_DISABLE_SUCCESS_ACCESS_LOG=True)
107 def test_deprecated_success_access_log_flag(self):
108 warnings = run_checks()
109 self.assertEqual(warnings, [self.disable_success_access_log_warning])
0 from unittest.mock import MagicMock, patch
1
2 from django.http import HttpResponse
3
4 from axes.decorators import axes_dispatch, axes_form_invalid
5 from tests.base import AxesTestCase
6
7
8 class DecoratorTestCase(AxesTestCase):
9 SUCCESS_RESPONSE = HttpResponse(status=200, content="Dispatched")
10 LOCKOUT_RESPONSE = HttpResponse(status=403, content="Locked out")
11
12 def setUp(self):
13 self.request = MagicMock()
14 self.cls = MagicMock(return_value=self.request)
15 self.func = MagicMock(return_value=self.SUCCESS_RESPONSE)
16
17 @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=False)
18 @patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE)
19 def test_axes_dispatch_locks_out(self, _, __):
20 response = axes_dispatch(self.func)(self.request)
21 self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content)
22
23 @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=True)
24 @patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE)
25 def test_axes_dispatch_dispatches(self, _, __):
26 response = axes_dispatch(self.func)(self.request)
27 self.assertEqual(response.content, self.SUCCESS_RESPONSE.content)
28
29 @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=False)
30 @patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE)
31 def test_axes_form_invalid_locks_out(self, _, __):
32 response = axes_form_invalid(self.func)(self.cls)
33 self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content)
34
35 @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=True)
36 @patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE)
37 def test_axes_form_invalid_dispatches(self, _, __):
38 response = axes_form_invalid(self.func)(self.cls)
39 self.assertEqual(response.content, self.SUCCESS_RESPONSE.content)
0 from platform import python_implementation
1 from unittest.mock import MagicMock, patch
2
3 from pytest import mark
4
5 from django.core.cache import cache
6 from django.test import override_settings
7 from django.urls import reverse
8 from django.utils import timezone
9 from django.utils.timezone import timedelta
10
11 from axes.conf import settings
12 from axes.handlers.proxy import AxesProxyHandler
13 from axes.helpers import get_client_str
14 from axes.models import AccessAttempt, AccessLog
15 from tests.base import AxesTestCase
16
17
18 @override_settings(AXES_HANDLER="axes.handlers.base.AxesHandler")
19 class AxesHandlerTestCase(AxesTestCase):
20 @override_settings(AXES_IP_BLACKLIST=["127.0.0.1"])
21 def test_is_allowed_with_blacklisted_ip_address(self):
22 self.assertFalse(AxesProxyHandler.is_allowed(self.request))
23
24 @override_settings(
25 AXES_NEVER_LOCKOUT_WHITELIST=True, AXES_IP_WHITELIST=["127.0.0.1"]
26 )
27 def test_is_allowed_with_whitelisted_ip_address(self):
28 self.assertTrue(AxesProxyHandler.is_allowed(self.request))
29
30 @override_settings(AXES_NEVER_LOCKOUT_GET=True)
31 def test_is_allowed_with_whitelisted_method(self):
32 self.request.method = "GET"
33 self.assertTrue(AxesProxyHandler.is_allowed(self.request))
34
35 @override_settings(AXES_LOCK_OUT_AT_FAILURE=False)
36 def test_is_allowed_no_lock_out(self):
37 self.assertTrue(AxesProxyHandler.is_allowed(self.request))
38
39 @override_settings(AXES_ONLY_ADMIN_SITE=True)
40 def test_only_admin_site(self):
41 request = MagicMock()
42 request.path = "/test/"
43 self.assertTrue(AxesProxyHandler.is_allowed(self.request))
44
45 def test_is_admin_site(self):
46 request = MagicMock()
47 tests = ( # (AXES_ONLY_ADMIN_SITE, URL, Expected)
48 (True, "/test/", True),
49 (True, reverse("admin:index"), False),
50 (False, "/test/", False),
51 (False, reverse("admin:index"), False),
52 )
53
54 for setting_value, url, expected in tests:
55 with override_settings(AXES_ONLY_ADMIN_SITE=setting_value):
56 request.path = url
57 self.assertEqual(AxesProxyHandler().is_admin_site(request), expected)
58
59 @override_settings(ROOT_URLCONF="tests.urls_empty")
60 @override_settings(AXES_ONLY_ADMIN_SITE=True)
61 def test_is_admin_site_no_admin_site(self):
62 request = MagicMock()
63 request.path = "/admin/"
64 self.assertTrue(AxesProxyHandler().is_admin_site(self.request))
65
66
67 class AxesProxyHandlerTestCase(AxesTestCase):
68 def setUp(self):
69 self.sender = MagicMock()
70 self.credentials = MagicMock()
71 self.request = MagicMock()
72 self.user = MagicMock()
73 self.instance = MagicMock()
74
75 @patch("axes.handlers.proxy.AxesProxyHandler.implementation", None)
76 def test_setting_changed_signal_triggers_handler_reimport(self):
77 self.assertIsNone(AxesProxyHandler.implementation)
78
79 with self.settings(AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler"):
80 self.assertIsNotNone(AxesProxyHandler.implementation)
81
82 @patch("axes.handlers.proxy.AxesProxyHandler.implementation")
83 def test_user_login_failed(self, handler):
84 self.assertFalse(handler.user_login_failed.called)
85 AxesProxyHandler.user_login_failed(self.sender, self.credentials, self.request)
86 self.assertTrue(handler.user_login_failed.called)
87
88 @patch("axes.handlers.proxy.AxesProxyHandler.implementation")
89 def test_user_logged_in(self, handler):
90 self.assertFalse(handler.user_logged_in.called)
91 AxesProxyHandler.user_logged_in(self.sender, self.request, self.user)
92 self.assertTrue(handler.user_logged_in.called)
93
94 @patch("axes.handlers.proxy.AxesProxyHandler.implementation")
95 def test_user_logged_out(self, handler):
96 self.assertFalse(handler.user_logged_out.called)
97 AxesProxyHandler.user_logged_out(self.sender, self.request, self.user)
98 self.assertTrue(handler.user_logged_out.called)
99
100 @patch("axes.handlers.proxy.AxesProxyHandler.implementation")
101 def test_post_save_access_attempt(self, handler):
102 self.assertFalse(handler.post_save_access_attempt.called)
103 AxesProxyHandler.post_save_access_attempt(self.instance)
104 self.assertTrue(handler.post_save_access_attempt.called)
105
106 @patch("axes.handlers.proxy.AxesProxyHandler.implementation")
107 def test_post_delete_access_attempt(self, handler):
108 self.assertFalse(handler.post_delete_access_attempt.called)
109 AxesProxyHandler.post_delete_access_attempt(self.instance)
110 self.assertTrue(handler.post_delete_access_attempt.called)
111
112
113 class AxesHandlerBaseTestCase(AxesTestCase):
114 def check_whitelist(self, log):
115 with override_settings(
116 AXES_NEVER_LOCKOUT_WHITELIST=True, AXES_IP_WHITELIST=[self.ip_address]
117 ):
118 AxesProxyHandler.user_login_failed(
119 sender=None, request=self.request, credentials=self.credentials
120 )
121 client_str = get_client_str(
122 self.username,
123 self.ip_address,
124 self.user_agent,
125 self.path_info,
126 self.request,
127 )
128 log.info.assert_called_with(
129 "AXES: Login failed from whitelisted client %s.", client_str
130 )
131
132 def check_empty_request(self, log, handler):
133 AxesProxyHandler.user_login_failed(sender=None, credentials={}, request=None)
134 log.error.assert_called_with(
135 f"AXES: {handler}.user_login_failed does not function without a request."
136 )
137
138
139 @override_settings(AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler")
140 class ResetAttemptsTestCase(AxesHandlerBaseTestCase):
141 """Resetting attempts is currently implemented only for database handler"""
142
143 USERNAME_1 = "foo_username"
144 USERNAME_2 = "bar_username"
145 IP_1 = "127.1.0.1"
146 IP_2 = "127.1.0.2"
147
148 def setUp(self):
149 super().setUp()
150 self.create_attempt()
151 self.create_attempt(username=self.USERNAME_1, ip_address=self.IP_1)
152 self.create_attempt(username=self.USERNAME_1, ip_address=self.IP_2)
153 self.create_attempt(username=self.USERNAME_2, ip_address=self.IP_1)
154 self.create_attempt(username=self.USERNAME_2, ip_address=self.IP_2)
155
156 def test_handler_reset_attempts(self):
157 self.assertEqual(5, AxesProxyHandler.reset_attempts())
158 self.assertFalse(AccessAttempt.objects.count())
159
160 def test_handler_reset_attempts_username(self):
161 self.assertEqual(2, AxesProxyHandler.reset_attempts(username=self.USERNAME_1))
162 self.assertEqual(AccessAttempt.objects.count(), 3)
163 self.assertEqual(
164 AccessAttempt.objects.filter(ip_address=self.USERNAME_1).count(), 0
165 )
166
167 def test_handler_reset_attempts_ip(self):
168 self.assertEqual(2, AxesProxyHandler.reset_attempts(ip_address=self.IP_1))
169 self.assertEqual(AccessAttempt.objects.count(), 3)
170 self.assertEqual(AccessAttempt.objects.filter(ip_address=self.IP_1).count(), 0)
171
172 def test_handler_reset_attempts_ip_and_username(self):
173 self.assertEqual(
174 1,
175 AxesProxyHandler.reset_attempts(
176 ip_address=self.IP_1, username=self.USERNAME_1
177 ),
178 )
179 self.assertEqual(AccessAttempt.objects.count(), 4)
180 self.assertEqual(AccessAttempt.objects.filter(ip_address=self.IP_1).count(), 1)
181
182 self.create_attempt(username=self.USERNAME_1, ip_address=self.IP_1)
183 self.assertEqual(
184 1,
185 AxesProxyHandler.reset_attempts(
186 ip_address=self.IP_1, username=self.USERNAME_2
187 ),
188 )
189 self.assertEqual(
190 1,
191 AxesProxyHandler.reset_attempts(
192 ip_address=self.IP_2, username=self.USERNAME_1
193 ),
194 )
195
196 def test_handler_reset_attempts_ip_or_username(self):
197 self.assertEqual(
198 3,
199 AxesProxyHandler.reset_attempts(
200 ip_address=self.IP_1, username=self.USERNAME_1, ip_or_username=True
201 ),
202 )
203 self.assertEqual(AccessAttempt.objects.count(), 2)
204 self.assertEqual(AccessAttempt.objects.filter(ip_address=self.IP_1).count(), 0)
205 self.assertEqual(
206 AccessAttempt.objects.filter(ip_address=self.USERNAME_1).count(), 0
207 )
208
209
210 @override_settings(
211 AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler",
212 AXES_COOLOFF_TIME=timedelta(seconds=2),
213 AXES_RESET_ON_SUCCESS=True,
214 )
215 @mark.xfail(
216 python_implementation() == "PyPy",
217 reason="PyPy implementation is flaky for this test",
218 strict=False,
219 )
220 class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase):
221 def test_handler_reset_attempts(self):
222 self.create_attempt()
223 self.assertEqual(1, AxesProxyHandler.reset_attempts())
224 self.assertFalse(AccessAttempt.objects.count())
225
226 def test_handler_reset_logs(self):
227 self.create_log()
228 self.assertEqual(1, AxesProxyHandler.reset_logs())
229 self.assertFalse(AccessLog.objects.count())
230
231 def test_handler_reset_logs_older_than_42_days(self):
232 self.create_log()
233
234 then = timezone.now() - timezone.timedelta(days=90)
235 with patch("django.utils.timezone.now", return_value=then):
236 self.create_log()
237
238 self.assertEqual(AccessLog.objects.count(), 2)
239 self.assertEqual(1, AxesProxyHandler.reset_logs(age_days=42))
240 self.assertEqual(AccessLog.objects.count(), 1)
241
242 @override_settings(AXES_RESET_ON_SUCCESS=True)
243 def test_handler(self):
244 self.check_handler()
245
246 @override_settings(AXES_RESET_ON_SUCCESS=False)
247 def test_handler_without_reset(self):
248 self.check_handler()
249
250 @override_settings(AXES_FAILURE_LIMIT=lambda *args: 3)
251 def test_handler_callable_failure_limit(self):
252 self.check_handler()
253
254 @override_settings(AXES_FAILURE_LIMIT="tests.base.custom_failure_limit")
255 def test_handler_str_failure_limit(self):
256 self.check_handler()
257
258 @override_settings(AXES_FAILURE_LIMIT=None)
259 def test_handler_invalid_failure_limit(self):
260 with self.assertRaises(TypeError):
261 self.check_handler()
262
263 @override_settings(AXES_LOCK_OUT_AT_FAILURE=False)
264 def test_handler_without_lockout(self):
265 self.check_handler()
266
267 @patch("axes.handlers.database.log")
268 def test_empty_request(self, log):
269 self.check_empty_request(log, "AxesDatabaseHandler")
270
271 @patch("axes.handlers.database.log")
272 def test_whitelist(self, log):
273 self.check_whitelist(log)
274
275 @override_settings(AXES_ONLY_USER_FAILURES=True)
276 @patch("axes.handlers.database.log")
277 def test_user_login_failed_only_user_failures_with_none_username(self, log):
278 credentials = {"username": None, "password": "test"}
279 sender = MagicMock()
280 AxesProxyHandler.user_login_failed(sender, credentials, self.request)
281 attempt = AccessAttempt.objects.all()
282 self.assertEqual(0, AccessAttempt.objects.count())
283 log.warning.assert_called_with(
284 "AXES: Username is None and AXES_ONLY_USER_FAILURES is enabled, new record will NOT be created."
285 )
286
287 def test_user_login_failed_with_none_username(self):
288 credentials = {"username": None, "password": "test"}
289 sender = MagicMock()
290 AxesProxyHandler.user_login_failed(sender, credentials, self.request)
291 attempt = AccessAttempt.objects.all()
292 self.assertEqual(1, AccessAttempt.objects.filter(username__isnull=True).count())
293
294 def test_user_login_failed_multiple_username(self):
295 configurations = (
296 (2, 1, {}, ["admin", "admin1"]),
297 (2, 1, {"AXES_USE_USER_AGENT": True}, ["admin", "admin1"]),
298 (2, 1, {"AXES_ONLY_USER_FAILURES": True}, ["admin", "admin1"]),
299 (
300 2,
301 1,
302 {"AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP": True},
303 ["admin", "admin1"],
304 ),
305 (
306 1,
307 2,
308 {"AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP": True},
309 ["admin", "admin"],
310 ),
311 (1, 2, {"AXES_LOCK_OUT_BY_USER_OR_IP": True}, ["admin", "admin"]),
312 (2, 1, {"AXES_LOCK_OUT_BY_USER_OR_IP": True}, ["admin", "admin1"]),
313 )
314
315 for (
316 total_attempts_count,
317 failures_since_start,
318 overrides,
319 usernames,
320 ) in configurations:
321 with self.settings(**overrides):
322 with self.subTest(
323 total_attempts_count=total_attempts_count,
324 failures_since_start=failures_since_start,
325 settings=overrides,
326 ):
327 self.login(username=usernames[0])
328 attempt = AccessAttempt.objects.get(username=usernames[0])
329 self.assertEqual(1, attempt.failures_since_start)
330
331 # check the number of failures associated to the attempt
332 self.login(username=usernames[1])
333 attempt = AccessAttempt.objects.get(username=usernames[1])
334 self.assertEqual(failures_since_start, attempt.failures_since_start)
335
336 # check the number of distinct attempts
337 self.assertEqual(
338 total_attempts_count, AccessAttempt.objects.count()
339 )
340
341 AccessAttempt.objects.all().delete()
342
343
344 @override_settings(AXES_HANDLER="axes.handlers.cache.AxesCacheHandler")
345 class ResetAttemptsCacheHandlerTestCase(AxesHandlerBaseTestCase):
346 """Test reset attempts for the cache handler"""
347
348 USERNAME_1 = "foo_username"
349 USERNAME_2 = "bar_username"
350 IP_1 = "127.1.0.1"
351 IP_2 = "127.1.0.2"
352
353 def set_up_login_attemtps(self):
354 """Set up the login attempts."""
355 self.login(username=self.USERNAME_1, remote_addr=self.IP_1)
356 self.login(username=self.USERNAME_1, remote_addr=self.IP_2)
357 self.login(username=self.USERNAME_2, remote_addr=self.IP_1)
358 self.login(username=self.USERNAME_2, remote_addr=self.IP_2)
359
360 def check_failures(self, failures, username=None, ip_address=None):
361 if ip_address is None and username is None:
362 raise NotImplementedError("Must supply ip_address or username")
363 try:
364 prev_ip = self.request.META["REMOTE_ADDR"]
365 credentials = {"username": username} if username else {}
366 if ip_address is not None:
367 self.request.META["REMOTE_ADDR"] = ip_address
368 self.assertEqual(
369 failures,
370 AxesProxyHandler.get_failures(self.request, credentials=credentials),
371 )
372 finally:
373 self.request.META["REMOTE_ADDR"] = prev_ip
374
375 def test_handler_reset_attempts(self):
376 with self.assertRaises(NotImplementedError):
377 AxesProxyHandler.reset_attempts()
378
379 @override_settings(AXES_ONLY_USER_FAILURES=True)
380 def test_handler_reset_attempts_username(self):
381 self.set_up_login_attemtps()
382 self.assertEqual(
383 2,
384 AxesProxyHandler.get_failures(
385 self.request, credentials={"username": self.USERNAME_1}
386 ),
387 )
388 self.assertEqual(
389 2,
390 AxesProxyHandler.get_failures(
391 self.request, credentials={"username": self.USERNAME_2}
392 ),
393 )
394 self.assertEqual(1, AxesProxyHandler.reset_attempts(username=self.USERNAME_1))
395 self.assertEqual(
396 0,
397 AxesProxyHandler.get_failures(
398 self.request, credentials={"username": self.USERNAME_1}
399 ),
400 )
401 self.assertEqual(
402 2,
403 AxesProxyHandler.get_failures(
404 self.request, credentials={"username": self.USERNAME_2}
405 ),
406 )
407
408 def test_handler_reset_attempts_ip(self):
409 self.set_up_login_attemtps()
410 self.check_failures(2, ip_address=self.IP_1)
411 self.assertEqual(1, AxesProxyHandler.reset_attempts(ip_address=self.IP_1))
412 self.check_failures(0, ip_address=self.IP_1)
413 self.check_failures(2, ip_address=self.IP_2)
414
415 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
416 def test_handler_reset_attempts_ip_and_username(self):
417 self.set_up_login_attemtps()
418 self.check_failures(1, username=self.USERNAME_1, ip_address=self.IP_1)
419 self.check_failures(1, username=self.USERNAME_2, ip_address=self.IP_1)
420 self.check_failures(1, username=self.USERNAME_1, ip_address=self.IP_2)
421 self.assertEqual(
422 1,
423 AxesProxyHandler.reset_attempts(
424 ip_address=self.IP_1, username=self.USERNAME_1
425 ),
426 )
427 self.check_failures(0, username=self.USERNAME_1, ip_address=self.IP_1)
428 self.check_failures(1, username=self.USERNAME_2, ip_address=self.IP_1)
429 self.check_failures(1, username=self.USERNAME_1, ip_address=self.IP_2)
430
431 def test_handler_reset_attempts_ip_or_username(self):
432 with self.assertRaises(NotImplementedError):
433 AxesProxyHandler.reset_attempts()
434
435
436 @override_settings(
437 AXES_HANDLER="axes.handlers.cache.AxesCacheHandler",
438 AXES_COOLOFF_TIME=timedelta(seconds=1),
439 )
440 class AxesCacheHandlerTestCase(AxesHandlerBaseTestCase):
441 @override_settings(AXES_RESET_ON_SUCCESS=True)
442 def test_handler(self):
443 self.check_handler()
444
445 @override_settings(AXES_RESET_ON_SUCCESS=False)
446 def test_handler_without_reset(self):
447 self.check_handler()
448
449 @override_settings(AXES_LOCK_OUT_AT_FAILURE=False)
450 def test_handler_without_lockout(self):
451 self.check_handler()
452
453 @patch("axes.handlers.cache.log")
454 def test_empty_request(self, log):
455 self.check_empty_request(log, "AxesCacheHandler")
456
457 @patch("axes.handlers.cache.log")
458 def test_whitelist(self, log):
459 self.check_whitelist(log)
460
461 @override_settings(AXES_ONLY_USER_FAILURES=True)
462 @patch.object(cache, "set")
463 @patch("axes.handlers.cache.log")
464 def test_user_login_failed_only_user_failures_with_none_username(
465 self, log, cache_set
466 ):
467 credentials = {"username": None, "password": "test"}
468 sender = MagicMock()
469 AxesProxyHandler.user_login_failed(sender, credentials, self.request)
470 self.assertFalse(cache_set.called)
471 log.warning.assert_called_with(
472 "AXES: Username is None and AXES_ONLY_USER_FAILURES is enabled, new record will NOT be created."
473 )
474
475 @patch.object(cache, "set")
476 def test_user_login_failed_with_none_username(self, cache_set):
477 credentials = {"username": None, "password": "test"}
478 sender = MagicMock()
479 AxesProxyHandler.user_login_failed(sender, credentials, self.request)
480 self.assertTrue(cache_set.called)
481
482
483 @override_settings(AXES_HANDLER="axes.handlers.dummy.AxesDummyHandler")
484 class AxesDummyHandlerTestCase(AxesHandlerBaseTestCase):
485 def test_handler(self):
486 for _ in range(settings.AXES_FAILURE_LIMIT):
487 self.login()
488
489 self.check_login()
490
491 def test_handler_is_allowed(self):
492 self.assertEqual(True, AxesProxyHandler.is_allowed(self.request, {}))
493
494 def test_handler_get_failures(self):
495 self.assertEqual(0, AxesProxyHandler.get_failures(self.request, {}))
496
497
498 @override_settings(AXES_HANDLER="axes.handlers.test.AxesTestHandler")
499 class AxesTestHandlerTestCase(AxesHandlerBaseTestCase):
500 def test_handler_reset_attempts(self):
501 self.assertEqual(0, AxesProxyHandler.reset_attempts())
502
503 def test_handler_reset_logs(self):
504 self.assertEqual(0, AxesProxyHandler.reset_logs())
505
506 def test_handler_is_allowed(self):
507 self.assertEqual(True, AxesProxyHandler.is_allowed(self.request, {}))
508
509 def test_handler_get_failures(self):
510 self.assertEqual(0, AxesProxyHandler.get_failures(self.request, {}))
0 from datetime import timedelta
1 from hashlib import md5
2 from unittest.mock import patch
3
4 from django.contrib.auth import get_user_model
5 from django.http import JsonResponse, HttpResponseRedirect, HttpResponse, HttpRequest
6 from django.test import override_settings, RequestFactory
7
8 from axes.apps import AppConfig
9 from axes.helpers import (
10 get_cache_timeout,
11 get_client_str,
12 get_client_username,
13 get_client_cache_key,
14 get_client_parameters,
15 get_cool_off,
16 get_cool_off_iso8601,
17 get_lockout_response,
18 is_client_ip_address_blacklisted,
19 is_client_ip_address_whitelisted,
20 is_client_method_whitelisted,
21 is_ip_address_in_blacklist,
22 is_ip_address_in_whitelist,
23 is_user_attempt_whitelisted,
24 toggleable,
25 cleanse_parameters,
26 )
27 from axes.models import AccessAttempt
28 from tests.base import AxesTestCase
29
30
31 @override_settings(AXES_ENABLED=False)
32 class AxesDisabledTestCase(AxesTestCase):
33 def test_initialize(self):
34 AppConfig.logging_initialized = False
35 AppConfig.initialize()
36 self.assertFalse(AppConfig.logging_initialized)
37
38 def test_toggleable(self):
39 def is_true():
40 return True
41
42 self.assertTrue(is_true())
43 self.assertIsNone(toggleable(is_true)())
44
45
46 class CacheTestCase(AxesTestCase):
47 @override_settings(AXES_COOLOFF_TIME=3) # hours
48 def test_get_cache_timeout_integer(self):
49 timeout_seconds = float(60 * 60 * 3)
50 self.assertEqual(get_cache_timeout(), timeout_seconds)
51
52 @override_settings(AXES_COOLOFF_TIME=timedelta(seconds=420))
53 def test_get_cache_timeout_timedelta(self):
54 self.assertEqual(get_cache_timeout(), 420)
55
56 @override_settings(AXES_COOLOFF_TIME=None)
57 def test_get_cache_timeout_none(self):
58 self.assertEqual(get_cache_timeout(), None)
59
60
61 class TimestampTestCase(AxesTestCase):
62 def test_iso8601(self):
63 """
64 Test get_cool_off_iso8601 correctly translates datetime.timedelta to ISO 8601 formatted duration.
65 """
66
67 expected = {
68 timedelta(days=1, hours=25, minutes=42, seconds=8): "P2DT1H42M8S",
69 timedelta(days=7, seconds=342): "P7DT5M42S",
70 timedelta(days=0, hours=2, minutes=42): "PT2H42M",
71 timedelta(hours=20, seconds=42): "PT20H42S",
72 timedelta(seconds=300): "PT5M",
73 timedelta(seconds=9005): "PT2H30M5S",
74 timedelta(minutes=9005): "P6DT6H5M",
75 timedelta(days=15): "P15D",
76 }
77
78 for delta, iso_duration in expected.items():
79 with self.subTest(iso_duration):
80 self.assertEqual(get_cool_off_iso8601(delta), iso_duration)
81
82
83 class ClientStringTestCase(AxesTestCase):
84 @staticmethod
85 def get_expected_client_str(*args, **kwargs):
86 client_str_template = '{{username: "{0}", ip_address: "{1}", user_agent: "{2}", path_info: "{3}"}}'
87 return client_str_template.format(*args, **kwargs)
88
89 @override_settings(AXES_VERBOSE=True)
90 def test_verbose_ip_only_client_details(self):
91 username = "test@example.com"
92 ip_address = "127.0.0.1"
93 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
94 path_info = "/admin/"
95
96 expected = self.get_expected_client_str(
97 username, ip_address, user_agent, path_info, self.request
98 )
99 actual = get_client_str(
100 username, ip_address, user_agent, path_info, self.request
101 )
102
103 self.assertEqual(expected, actual)
104
105 @override_settings(AXES_VERBOSE=True)
106 def test_imbalanced_quotes(self):
107 username = "butterfly.. },,,"
108 ip_address = "127.0.0.1"
109 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
110 path_info = "/admin/"
111
112 expected = self.get_expected_client_str(
113 username, ip_address, user_agent, path_info, self.request
114 )
115 actual = get_client_str(
116 username, ip_address, user_agent, path_info, self.request
117 )
118
119 self.assertEqual(expected, actual)
120
121 @override_settings(AXES_VERBOSE=True)
122 def test_verbose_ip_only_client_details_tuple(self):
123 username = "test@example.com"
124 ip_address = "127.0.0.1"
125 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
126 path_info = ("admin", "login")
127
128 expected = self.get_expected_client_str(
129 username, ip_address, user_agent, path_info[0], self.request
130 )
131 actual = get_client_str(
132 username, ip_address, user_agent, path_info, self.request
133 )
134
135 self.assertEqual(expected, actual)
136
137 @override_settings(AXES_VERBOSE=False)
138 def test_non_verbose_ip_only_client_details(self):
139 username = "test@example.com"
140 ip_address = "127.0.0.1"
141 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
142 path_info = "/admin/"
143
144 expected = '{ip_address: "127.0.0.1", path_info: "/admin/"}'
145 actual = get_client_str(
146 username, ip_address, user_agent, path_info, self.request
147 )
148
149 self.assertEqual(expected, actual)
150
151 @override_settings(AXES_ONLY_USER_FAILURES=True)
152 @override_settings(AXES_VERBOSE=True)
153 def test_verbose_user_only_client_details(self):
154 username = "test@example.com"
155 ip_address = "127.0.0.1"
156 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
157 path_info = "/admin/"
158
159 expected = self.get_expected_client_str(
160 username, ip_address, user_agent, path_info, self.request
161 )
162 actual = get_client_str(
163 username, ip_address, user_agent, path_info, self.request
164 )
165
166 self.assertEqual(expected, actual)
167
168 @override_settings(AXES_ONLY_USER_FAILURES=True)
169 @override_settings(AXES_VERBOSE=False)
170 def test_non_verbose_user_only_client_details(self):
171 username = "test@example.com"
172 ip_address = "127.0.0.1"
173 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
174 path_info = "/admin/"
175
176 expected = '{username: "test@example.com", path_info: "/admin/"}'
177 actual = get_client_str(
178 username, ip_address, user_agent, path_info, self.request
179 )
180
181 self.assertEqual(expected, actual)
182
183 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
184 @override_settings(AXES_VERBOSE=True)
185 def test_verbose_user_ip_combo_client_details(self):
186 username = "test@example.com"
187 ip_address = "127.0.0.1"
188 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
189 path_info = "/admin/"
190
191 expected = self.get_expected_client_str(
192 username, ip_address, user_agent, path_info, self.request
193 )
194 actual = get_client_str(
195 username, ip_address, user_agent, path_info, self.request
196 )
197
198 self.assertEqual(expected, actual)
199
200 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
201 @override_settings(AXES_VERBOSE=False)
202 def test_non_verbose_user_ip_combo_client_details(self):
203 username = "test@example.com"
204 ip_address = "127.0.0.1"
205 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
206 path_info = "/admin/"
207
208 expected = '{username: "test@example.com", ip_address: "127.0.0.1", path_info: "/admin/"}'
209 actual = get_client_str(
210 username, ip_address, user_agent, path_info, self.request
211 )
212
213 self.assertEqual(expected, actual)
214
215 @override_settings(AXES_USE_USER_AGENT=True)
216 @override_settings(AXES_VERBOSE=True)
217 def test_verbose_user_agent_client_details(self):
218 username = "test@example.com"
219 ip_address = "127.0.0.1"
220 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
221 path_info = "/admin/"
222
223 expected = self.get_expected_client_str(
224 username, ip_address, user_agent, path_info, self.request
225 )
226 actual = get_client_str(
227 username, ip_address, user_agent, path_info, self.request
228 )
229
230 self.assertEqual(expected, actual)
231
232 @override_settings(AXES_USE_USER_AGENT=True)
233 @override_settings(AXES_VERBOSE=False)
234 def test_non_verbose_user_agent_client_details(self):
235 username = "test@example.com"
236 ip_address = "127.0.0.1"
237 user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
238 path_info = "/admin/"
239
240 expected = '{ip_address: "127.0.0.1", user_agent: "Googlebot/2.1 (+http://www.googlebot.com/bot.html)", path_info: "/admin/"}'
241 actual = get_client_str(
242 username, ip_address, user_agent, path_info, self.request
243 )
244
245 self.assertEqual(expected, actual)
246
247 @override_settings(
248 AXES_CLIENT_STR_CALLABLE="tests.test_helpers.get_dummy_client_str"
249 )
250 def test_get_client_str_callable(self):
251 self.assertEqual(
252 get_client_str(
253 "username", "ip_address", "user_agent", "path_info", self.request
254 ),
255 "client string",
256 )
257
258 @override_settings(
259 AXES_CLIENT_STR_CALLABLE="tests.test_helpers.get_dummy_client_str_using_request"
260 )
261 def test_get_client_str_callable(self):
262 self.request.user = self.user
263 self.assertEqual(
264 get_client_str(
265 "username", "ip_address", "user_agent", "path_info", self.request
266 ),
267 self.email,
268 )
269
270
271 def get_dummy_client_str(username, ip_address, user_agent, path_info):
272 return "client string"
273
274
275 def get_dummy_client_str_using_request(
276 username, ip_address, user_agent, path_info, request
277 ):
278 return f"{request.user.email}"
279
280
281 class ClientParametersTestCase(AxesTestCase):
282 @override_settings(AXES_ONLY_USER_FAILURES=True)
283 def test_get_filter_kwargs_user(self):
284 self.assertEqual(
285 get_client_parameters(self.username, self.ip_address, self.user_agent),
286 [{"username": self.username}],
287 )
288
289 @override_settings(
290 AXES_ONLY_USER_FAILURES=False,
291 AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=False,
292 AXES_USE_USER_AGENT=False,
293 )
294 def test_get_filter_kwargs_ip(self):
295 self.assertEqual(
296 get_client_parameters(self.username, self.ip_address, self.user_agent),
297 [{"ip_address": self.ip_address}],
298 )
299
300 @override_settings(
301 AXES_ONLY_USER_FAILURES=False,
302 AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True,
303 AXES_USE_USER_AGENT=False,
304 )
305 def test_get_filter_kwargs_user_and_ip(self):
306 self.assertEqual(
307 get_client_parameters(self.username, self.ip_address, self.user_agent),
308 [{"username": self.username, "ip_address": self.ip_address}],
309 )
310
311 @override_settings(
312 AXES_ONLY_USER_FAILURES=False,
313 AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=False,
314 AXES_LOCK_OUT_BY_USER_OR_IP=True,
315 AXES_USE_USER_AGENT=False,
316 )
317 def test_get_filter_kwargs_user_or_ip(self):
318 self.assertEqual(
319 get_client_parameters(self.username, self.ip_address, self.user_agent),
320 [{"username": self.username}, {"ip_address": self.ip_address}],
321 )
322
323 @override_settings(
324 AXES_ONLY_USER_FAILURES=False,
325 AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=False,
326 AXES_USE_USER_AGENT=True,
327 )
328 def test_get_filter_kwargs_ip_and_agent(self):
329 self.assertEqual(
330 get_client_parameters(self.username, self.ip_address, self.user_agent),
331 [{"ip_address": self.ip_address}, {"user_agent": self.user_agent}],
332 )
333
334 @override_settings(
335 AXES_ONLY_USER_FAILURES=False,
336 AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True,
337 AXES_USE_USER_AGENT=True,
338 )
339 def test_get_filter_kwargs_user_ip_agent(self):
340 self.assertEqual(
341 get_client_parameters(self.username, self.ip_address, self.user_agent),
342 [
343 {"username": self.username, "ip_address": self.ip_address},
344 {"user_agent": self.user_agent},
345 ],
346 )
347
348
349 class ClientCacheKeyTestCase(AxesTestCase):
350 def test_get_cache_key(self):
351 """
352 Test the cache key format.
353 """
354
355 cache_hash_digest = md5(self.ip_address.encode()).hexdigest()
356 cache_hash_key = f"axes-{cache_hash_digest}"
357
358 # Getting cache key from request
359 request_factory = RequestFactory()
360 request = request_factory.post(
361 "/admin/login/", data={"username": self.username, "password": "test"}
362 )
363
364 self.assertEqual([cache_hash_key], get_client_cache_key(request))
365
366 # Getting cache key from AccessAttempt Object
367 attempt = AccessAttempt(
368 user_agent="<unknown>",
369 ip_address=self.ip_address,
370 username=self.username,
371 get_data="",
372 post_data="",
373 http_accept=request.META.get("HTTP_ACCEPT", "<unknown>"),
374 path_info=request.META.get("PATH_INFO", "<unknown>"),
375 failures_since_start=0,
376 )
377
378 self.assertEqual([cache_hash_key], get_client_cache_key(attempt))
379
380 def test_get_cache_key_empty_ip_address(self):
381 """
382 Simulate an empty IP address in the request.
383 """
384
385 empty_ip_address = ""
386
387 cache_hash_digest = md5(empty_ip_address.encode()).hexdigest()
388 cache_hash_key = f"axes-{cache_hash_digest}"
389
390 # Getting cache key from request
391 request_factory = RequestFactory()
392 request = request_factory.post(
393 "/admin/login/",
394 data={"username": self.username, "password": "test"},
395 REMOTE_ADDR=empty_ip_address,
396 )
397
398 self.assertEqual([cache_hash_key], get_client_cache_key(request))
399
400 # Getting cache key from AccessAttempt Object
401 attempt = AccessAttempt(
402 user_agent="<unknown>",
403 ip_address=empty_ip_address,
404 username=self.username,
405 get_data="",
406 post_data="",
407 http_accept=request.META.get("HTTP_ACCEPT", "<unknown>"),
408 path_info=request.META.get("PATH_INFO", "<unknown>"),
409 failures_since_start=0,
410 )
411
412 self.assertEqual([cache_hash_key], get_client_cache_key(attempt))
413
414 def test_get_cache_key_credentials(self):
415 """
416 Test the cache key format.
417 """
418
419 ip_address = self.ip_address
420 cache_hash_digest = md5(ip_address.encode()).hexdigest()
421 cache_hash_key = f"axes-{cache_hash_digest}"
422
423 # Getting cache key from request
424 request_factory = RequestFactory()
425 request = request_factory.post(
426 "/admin/login/", data={"username": self.username, "password": "test"}
427 )
428
429 # Difference between the upper test: new call signature with credentials
430 credentials = {"username": self.username}
431
432 self.assertEqual([cache_hash_key], get_client_cache_key(request, credentials))
433
434 # Getting cache key from AccessAttempt Object
435 attempt = AccessAttempt(
436 user_agent="<unknown>",
437 ip_address=ip_address,
438 username=self.username,
439 get_data="",
440 post_data="",
441 http_accept=request.META.get("HTTP_ACCEPT", "<unknown>"),
442 path_info=request.META.get("PATH_INFO", "<unknown>"),
443 failures_since_start=0,
444 )
445 self.assertEqual([cache_hash_key], get_client_cache_key(attempt))
446
447
448 class UsernameTestCase(AxesTestCase):
449 @override_settings(AXES_USERNAME_FORM_FIELD="username")
450 def test_default_get_client_username(self):
451 expected = "test-username"
452
453 request = HttpRequest()
454 request.POST["username"] = expected
455
456 actual = get_client_username(request)
457
458 self.assertEqual(expected, actual)
459
460 def test_default_get_client_username_drf(self):
461 class DRFRequest:
462 def __init__(self):
463 self.data = {}
464 self.POST = {}
465
466 expected = "test-username"
467
468 request = DRFRequest()
469 request.data["username"] = expected
470
471 actual = get_client_username(request)
472
473 self.assertEqual(expected, actual)
474
475 @override_settings(AXES_USERNAME_FORM_FIELD="username")
476 def test_default_get_client_username_credentials(self):
477 expected = "test-username"
478 expected_in_credentials = "test-credentials-username"
479
480 request = HttpRequest()
481 request.POST["username"] = expected
482 credentials = {"username": expected_in_credentials}
483
484 actual = get_client_username(request, credentials)
485
486 self.assertEqual(expected_in_credentials, actual)
487
488 def sample_customize_username(request, credentials):
489 return "prefixed-" + request.POST.get("username")
490
491 @override_settings(AXES_USERNAME_FORM_FIELD="username")
492 @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username)
493 def test_custom_get_client_username_from_request(self):
494 provided = "test-username"
495 expected = "prefixed-" + provided
496 provided_in_credentials = "test-credentials-username"
497
498 request = HttpRequest()
499 request.POST["username"] = provided
500 credentials = {"username": provided_in_credentials}
501
502 actual = get_client_username(request, credentials)
503
504 self.assertEqual(expected, actual)
505
506 def sample_customize_username_credentials(request, credentials):
507 return "prefixed-" + credentials.get("username")
508
509 @override_settings(AXES_USERNAME_FORM_FIELD="username")
510 @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username_credentials)
511 def test_custom_get_client_username_from_credentials(self):
512 provided = "test-username"
513 provided_in_credentials = "test-credentials-username"
514 expected_in_credentials = "prefixed-" + provided_in_credentials
515
516 request = HttpRequest()
517 request.POST["username"] = provided
518 credentials = {"username": provided_in_credentials}
519
520 actual = get_client_username(request, credentials)
521
522 self.assertEqual(expected_in_credentials, actual)
523
524 @override_settings(
525 AXES_USERNAME_CALLABLE=lambda request, credentials: "example"
526 ) # pragma: no cover
527 def test_get_client_username(self):
528 self.assertEqual(get_client_username(HttpRequest(), {}), "example")
529
530 @override_settings(AXES_USERNAME_CALLABLE=lambda request: None) # pragma: no cover
531 def test_get_client_username_invalid_callable_too_few_arguments(self):
532 with self.assertRaises(TypeError):
533 get_client_username(HttpRequest(), {})
534
535 @override_settings(
536 AXES_USERNAME_CALLABLE=lambda request, credentials, extra: None
537 ) # pragma: no cover
538 def test_get_client_username_invalid_callable_too_many_arguments(self):
539 with self.assertRaises(TypeError):
540 get_client_username(HttpRequest(), {})
541
542 @override_settings(AXES_USERNAME_CALLABLE=True)
543 def test_get_client_username_not_callable(self):
544 with self.assertRaises(TypeError):
545 get_client_username(HttpRequest(), {})
546
547 @override_settings(AXES_USERNAME_CALLABLE="tests.test_helpers.get_username")
548 def test_get_client_username_str(self):
549 self.assertEqual(get_client_username(HttpRequest(), {}), "username")
550
551
552 def get_username(request, credentials: dict) -> str:
553 return "username"
554
555
556 class IPWhitelistTestCase(AxesTestCase):
557 def setUp(self):
558 self.request = HttpRequest()
559 self.request.method = "POST"
560 self.request.META["REMOTE_ADDR"] = "127.0.0.1"
561 self.request.axes_ip_address = "127.0.0.1"
562
563 @override_settings(AXES_IP_WHITELIST=None)
564 def test_ip_in_whitelist_none(self):
565 self.assertFalse(is_ip_address_in_whitelist("127.0.0.2"))
566
567 @override_settings(AXES_IP_WHITELIST=["127.0.0.1"])
568 def test_ip_in_whitelist(self):
569 self.assertTrue(is_ip_address_in_whitelist("127.0.0.1"))
570 self.assertFalse(is_ip_address_in_whitelist("127.0.0.2"))
571
572 @override_settings(AXES_IP_BLACKLIST=None)
573 def test_ip_in_blacklist_none(self):
574 self.assertFalse(is_ip_address_in_blacklist("127.0.0.2"))
575
576 @override_settings(AXES_IP_BLACKLIST=["127.0.0.1"])
577 def test_ip_in_blacklist(self):
578 self.assertTrue(is_ip_address_in_blacklist("127.0.0.1"))
579 self.assertFalse(is_ip_address_in_blacklist("127.0.0.2"))
580
581 @override_settings(AXES_IP_BLACKLIST=["127.0.0.1"])
582 def test_is_client_ip_address_blacklisted_ip_in_blacklist(self):
583 self.assertTrue(is_client_ip_address_blacklisted(self.request))
584
585 @override_settings(AXES_IP_BLACKLIST=["127.0.0.2"])
586 def test_is_is_client_ip_address_blacklisted_ip_not_in_blacklist(self):
587 self.assertFalse(is_client_ip_address_blacklisted(self.request))
588
589 @override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True)
590 @override_settings(AXES_IP_WHITELIST=["127.0.0.1"])
591 def test_is_client_ip_address_blacklisted_ip_in_whitelist(self):
592 self.assertFalse(is_client_ip_address_blacklisted(self.request))
593
594 @override_settings(AXES_ONLY_WHITELIST=True)
595 @override_settings(AXES_IP_WHITELIST=["127.0.0.2"])
596 def test_is_already_locked_ip_not_in_whitelist(self):
597 self.assertTrue(is_client_ip_address_blacklisted(self.request))
598
599 @override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True)
600 @override_settings(AXES_IP_WHITELIST=["127.0.0.1"])
601 def test_is_client_ip_address_whitelisted_never_lockout(self):
602 self.assertTrue(is_client_ip_address_whitelisted(self.request))
603
604 @override_settings(AXES_ONLY_WHITELIST=True)
605 @override_settings(AXES_IP_WHITELIST=["127.0.0.1"])
606 def test_is_client_ip_address_whitelisted_only_allow(self):
607 self.assertTrue(is_client_ip_address_whitelisted(self.request))
608
609 @override_settings(AXES_ONLY_WHITELIST=True)
610 @override_settings(AXES_IP_WHITELIST=["127.0.0.2"])
611 def test_is_client_ip_address_whitelisted_not(self):
612 self.assertFalse(is_client_ip_address_whitelisted(self.request))
613
614
615 class MethodWhitelistTestCase(AxesTestCase):
616 def setUp(self):
617 self.request = HttpRequest()
618 self.request.method = "GET"
619
620 @override_settings(AXES_NEVER_LOCKOUT_GET=True)
621 def test_is_client_method_whitelisted(self):
622 self.assertTrue(is_client_method_whitelisted(self.request))
623
624 @override_settings(AXES_NEVER_LOCKOUT_GET=False)
625 def test_is_client_method_whitelisted_not(self):
626 self.assertFalse(is_client_method_whitelisted(self.request))
627
628
629 class LockoutResponseTestCase(AxesTestCase):
630 def setUp(self):
631 self.request = HttpRequest()
632
633 @override_settings(AXES_COOLOFF_TIME=42)
634 def test_get_lockout_response_cool_off(self):
635 get_lockout_response(request=self.request)
636
637 @override_settings(AXES_LOCKOUT_TEMPLATE="example.html")
638 @patch("axes.helpers.render")
639 def test_get_lockout_response_lockout_template(self, render):
640 self.assertFalse(render.called)
641 get_lockout_response(request=self.request)
642 self.assertTrue(render.called)
643
644 @override_settings(AXES_LOCKOUT_URL="https://example.com")
645 def test_get_lockout_response_lockout_url(self):
646 response = get_lockout_response(request=self.request)
647 self.assertEqual(type(response), HttpResponseRedirect)
648
649 def test_get_lockout_response_lockout_json(self):
650 self.request.META["HTTP_X_REQUESTED_WITH"] = "XMLHttpRequest"
651 response = get_lockout_response(request=self.request)
652 self.assertEqual(type(response), JsonResponse)
653
654 def test_get_lockout_response_lockout_response(self):
655 response = get_lockout_response(request=self.request)
656 self.assertEqual(type(response), HttpResponse)
657
658
659 def mock_get_cool_off_str():
660 return timedelta(seconds=30)
661
662
663 class AxesCoolOffTestCase(AxesTestCase):
664 @override_settings(AXES_COOLOFF_TIME=None)
665 def test_get_cool_off_none(self):
666 self.assertIsNone(get_cool_off())
667
668 @override_settings(AXES_COOLOFF_TIME=2)
669 def test_get_cool_off_int(self):
670 self.assertEqual(get_cool_off(), timedelta(hours=2))
671
672 @override_settings(AXES_COOLOFF_TIME=lambda: timedelta(seconds=30))
673 def test_get_cool_off_callable(self):
674 self.assertEqual(get_cool_off(), timedelta(seconds=30))
675
676 @override_settings(AXES_COOLOFF_TIME="tests.test_helpers.mock_get_cool_off_str")
677 def test_get_cool_off_path(self):
678 self.assertEqual(get_cool_off(), timedelta(seconds=30))
679
680
681 def mock_is_whitelisted(request, credentials):
682 return True
683
684
685 class AxesWhitelistTestCase(AxesTestCase):
686 def setUp(self):
687 self.user_model = get_user_model()
688 self.user = self.user_model.objects.create(username="jane.doe")
689 self.request = HttpRequest()
690 self.credentials = dict()
691
692 def test_is_whitelisted(self):
693 self.assertFalse(is_user_attempt_whitelisted(self.request, self.credentials))
694
695 @override_settings(AXES_WHITELIST_CALLABLE=mock_is_whitelisted)
696 def test_is_whitelisted_override_callable(self):
697 self.assertTrue(is_user_attempt_whitelisted(self.request, self.credentials))
698
699 @override_settings(AXES_WHITELIST_CALLABLE="tests.test_helpers.mock_is_whitelisted")
700 def test_is_whitelisted_override_path(self):
701 self.assertTrue(is_user_attempt_whitelisted(self.request, self.credentials))
702
703 @override_settings(AXES_WHITELIST_CALLABLE=42)
704 def test_is_whitelisted_override_invalid(self):
705 with self.assertRaises(TypeError):
706 is_user_attempt_whitelisted(self.request, self.credentials)
707
708
709 def mock_get_lockout_response(request, credentials):
710 return HttpResponse(status=400)
711
712
713 class AxesLockoutTestCase(AxesTestCase):
714 def setUp(self):
715 self.request = HttpRequest()
716 self.credentials = dict()
717
718 def test_get_lockout_response(self):
719 response = get_lockout_response(self.request, self.credentials)
720 self.assertEqual(403, response.status_code)
721
722 @override_settings(AXES_HTTP_RESPONSE_CODE=429)
723 def test_get_lockout_response_with_custom_http_response_code(self):
724 response = get_lockout_response(self.request, self.credentials)
725 self.assertEqual(429, response.status_code)
726
727 @override_settings(AXES_LOCKOUT_CALLABLE=mock_get_lockout_response)
728 def test_get_lockout_response_override_callable(self):
729 response = get_lockout_response(self.request, self.credentials)
730 self.assertEqual(400, response.status_code)
731
732 @override_settings(
733 AXES_LOCKOUT_CALLABLE="tests.test_helpers.mock_get_lockout_response"
734 )
735 def test_get_lockout_response_override_path(self):
736 response = get_lockout_response(self.request, self.credentials)
737 self.assertEqual(400, response.status_code)
738
739 @override_settings(AXES_LOCKOUT_CALLABLE=42)
740 def test_get_lockout_response_override_invalid(self):
741 with self.assertRaises(TypeError):
742 get_lockout_response(self.request, self.credentials)
743
744
745 class AxesCleanseParamsTestCase(AxesTestCase):
746 def setUp(self):
747 self.parameters = {
748 "username": "test_user",
749 "password": "test_password",
750 "other_sensitive_data": "sensitive",
751 }
752
753 def test_cleanse_parameters(self):
754 cleansed = cleanse_parameters(self.parameters)
755 self.assertEqual("test_user", cleansed["username"])
756 self.assertEqual("********************", cleansed["password"])
757 self.assertEqual("sensitive", cleansed["other_sensitive_data"])
758
759 @override_settings(AXES_SENSITIVE_PARAMETERS=["other_sensitive_data"])
760 def test_cleanse_parameters_override_sensitive(self):
761 cleansed = cleanse_parameters(self.parameters)
762 self.assertEqual("test_user", cleansed["username"])
763 self.assertEqual("********************", cleansed["password"])
764 self.assertEqual("********************", cleansed["other_sensitive_data"])
765
766 @override_settings(AXES_SENSITIVE_PARAMETERS=["other_sensitive_data"])
767 @override_settings(AXES_PASSWORD_FORM_FIELD="username")
768 def test_cleanse_parameters_override_both(self):
769 cleansed = cleanse_parameters(self.parameters)
770 self.assertEqual("********************", cleansed["username"])
771 self.assertEqual("********************", cleansed["password"])
772 self.assertEqual("********************", cleansed["other_sensitive_data"])
773
774 @override_settings(AXES_PASSWORD_FORM_FIELD=None)
775 def test_cleanse_parameters_override_empty(self):
776 cleansed = cleanse_parameters(self.parameters)
777 self.assertEqual("test_user", cleansed["username"])
778 self.assertEqual("********************", cleansed["password"])
779 self.assertEqual("sensitive", cleansed["other_sensitive_data"])
0 from unittest.mock import patch
1
2 from django.test import override_settings
3 from django.urls import reverse
4
5 from axes.apps import AppConfig
6 from axes.models import AccessAttempt, AccessLog
7 from tests.base import AxesTestCase
8
9
10 @patch("axes.apps.AppConfig.initialized", False)
11 @patch("axes.apps.log")
12 class AppsTestCase(AxesTestCase):
13 def test_axes_config_log_re_entrant(self, log):
14 """
15 Test that initialize call count does not increase on repeat calls.
16 """
17
18 AppConfig.initialize()
19 calls = log.info.call_count
20
21 AppConfig.initialize()
22 self.assertTrue(
23 calls == log.info.call_count and calls > 0,
24 "AxesConfig.initialize needs to be re-entrant",
25 )
26
27 @override_settings(AXES_VERBOSE=False)
28 def test_axes_config_log_not_verbose(self, log):
29 AppConfig.initialize()
30 self.assertFalse(log.info.called)
31
32 @override_settings(AXES_ONLY_USER_FAILURES=True)
33 def test_axes_config_log_user_only(self, log):
34 AppConfig.initialize()
35 log.info.assert_called_with("AXES: blocking by username only.")
36
37 @override_settings(AXES_ONLY_USER_FAILURES=False)
38 def test_axes_config_log_ip_only(self, log):
39 AppConfig.initialize()
40 log.info.assert_called_with("AXES: blocking by IP only.")
41
42 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
43 def test_axes_config_log_user_ip(self, log):
44 AppConfig.initialize()
45 log.info.assert_called_with("AXES: blocking by combination of username and IP.")
46
47 @override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True)
48 def test_axes_config_log_user_or_ip(self, log):
49 AppConfig.initialize()
50 log.info.assert_called_with("AXES: blocking by username or IP.")
51
52
53 class AccessLogTestCase(AxesTestCase):
54 def test_access_log_on_logout(self):
55 """
56 Test a valid logout and make sure the logout_time is updated.
57 """
58
59 self.login(is_valid_username=True, is_valid_password=True)
60 self.assertIsNone(AccessLog.objects.latest("id").logout_time)
61
62 response = self.client.get(reverse("admin:logout"))
63 self.assertContains(response, "Logged out")
64
65 self.assertIsNotNone(AccessLog.objects.latest("id").logout_time)
66
67 def test_log_data_truncated(self):
68 """
69 Test that get_query_str properly truncates data to the max_length (default 1024).
70 """
71
72 # An impossibly large post dict
73 extra_data = {"a" * x: x for x in range(1024)}
74 self.login(**extra_data)
75 self.assertEqual(len(AccessAttempt.objects.latest("id").post_data), 1024)
76
77 @override_settings(AXES_DISABLE_ACCESS_LOG=True)
78 def test_valid_logout_without_success_log(self):
79 AccessLog.objects.all().delete()
80
81 response = self.login(is_valid_username=True, is_valid_password=True)
82 response = self.client.get(reverse("admin:logout"))
83
84 self.assertEqual(AccessLog.objects.all().count(), 0)
85 self.assertContains(response, "Logged out", html=True)
86
87 @override_settings(AXES_DISABLE_ACCESS_LOG=True)
88 def test_valid_login_without_success_log(self):
89 """
90 Test that a valid login does not generate an AccessLog when DISABLE_SUCCESS_ACCESS_LOG is True.
91 """
92
93 AccessLog.objects.all().delete()
94
95 response = self.login(is_valid_username=True, is_valid_password=True)
96
97 self.assertEqual(response.status_code, 302)
98 self.assertEqual(AccessLog.objects.all().count(), 0)
99
100 @override_settings(AXES_DISABLE_ACCESS_LOG=True)
101 def test_valid_logout_without_log(self):
102 AccessLog.objects.all().delete()
103
104 response = self.login(is_valid_username=True, is_valid_password=True)
105 response = self.client.get(reverse("admin:logout"))
106
107 self.assertEqual(AccessLog.objects.count(), 0)
108 self.assertContains(response, "Logged out", html=True)
109
110 @override_settings(AXES_DISABLE_ACCESS_LOG=True)
111 def test_non_valid_login_without_log(self):
112 """
113 Test that a non-valid login does generate an AccessLog when DISABLE_ACCESS_LOG is True.
114 """
115 AccessLog.objects.all().delete()
116
117 response = self.login(is_valid_username=True, is_valid_password=False)
118 self.assertEqual(response.status_code, 200)
119
120 self.assertEqual(AccessLog.objects.all().count(), 0)
0 """
1 Integration tests for the login handling.
2
3 TODO: Clean up the tests in this module.
4 """
5
6 from importlib import import_module
7
8 from django.contrib.auth import get_user_model, login, logout
9 from django.http import HttpRequest
10 from django.test import override_settings, TestCase
11 from django.urls import reverse
12
13 from axes.conf import settings
14 from axes.helpers import get_cache, make_cache_key_list
15 from axes.models import AccessAttempt
16 from tests.base import AxesTestCase
17
18
19 class DjangoLoginTestCase(TestCase):
20 def setUp(self):
21 engine = import_module(settings.SESSION_ENGINE)
22
23 self.request = HttpRequest()
24 self.request.session = engine.SessionStore()
25
26 self.username = "john.doe"
27 self.password = "hunter2"
28
29 self.user = get_user_model().objects.create(username=self.username)
30 self.user.set_password(self.password)
31 self.user.save()
32 self.user.backend = "django.contrib.auth.backends.ModelBackend"
33
34
35 class DjangoContribAuthLoginTestCase(DjangoLoginTestCase):
36 def test_login(self):
37 login(self.request, self.user)
38
39 def test_logout(self):
40 login(self.request, self.user)
41 logout(self.request)
42
43
44 @override_settings(AXES_ENABLED=False)
45 class DjangoTestClientLoginTestCase(DjangoLoginTestCase):
46 def test_client_login(self):
47 self.client.login(username=self.username, password=self.password)
48
49 def test_client_logout(self):
50 self.client.login(username=self.username, password=self.password)
51 self.client.logout()
52
53 def test_client_force_login(self):
54 self.client.force_login(self.user)
55
56
57 class DatabaseLoginTestCase(AxesTestCase):
58 """
59 Test for lockouts under different configurations and circumstances to prevent false positives and false negatives.
60
61 Always block attempted logins for the same user from the same IP.
62 Always allow attempted logins for a different user from a different IP.
63 """
64
65 IP_1 = "10.1.1.1"
66 IP_2 = "10.2.2.2"
67 IP_3 = "10.2.2.3"
68 USER_1 = "valid-user-1"
69 USER_2 = "valid-user-2"
70 USER_3 = "valid-user-3"
71 EMAIL_1 = "valid-email-1@example.com"
72 EMAIL_2 = "valid-email-2@example.com"
73
74 VALID_USERNAME = USER_1
75 VALID_EMAIL = EMAIL_1
76 VALID_PASSWORD = "valid-password"
77
78 VALID_IP_ADDRESS = IP_1
79
80 WRONG_PASSWORD = "wrong-password"
81 LOCKED_MESSAGE = "Account locked: too many login attempts."
82 LOGIN_FORM_KEY = '<input type="submit" value="Log in" />'
83 ATTEMPT_NOT_BLOCKED = 200
84 ALLOWED = 302
85 BLOCKED = 403
86
87 def _login(self, username, password, ip_addr="127.0.0.1", **kwargs):
88 """
89 Login a user and get the response.
90
91 IP address can be configured to test IP blocking functionality.
92 """
93
94 post_data = {"username": username, "password": password}
95
96 post_data.update(kwargs)
97
98 return self.client.post(
99 reverse("admin:login"),
100 post_data,
101 REMOTE_ADDR=ip_addr,
102 HTTP_USER_AGENT="test-browser",
103 )
104
105 def _lockout_user_from_ip(self, username, ip_addr):
106 for _ in range(settings.AXES_FAILURE_LIMIT):
107 response = self._login(
108 username=username, password=self.WRONG_PASSWORD, ip_addr=ip_addr
109 )
110 return response
111
112 def _lockout_user1_from_ip1(self):
113 return self._lockout_user_from_ip(username=self.USER_1, ip_addr=self.IP_1)
114
115 def setUp(self):
116 """
117 Create two valid users for authentication.
118 """
119
120 super().setUp()
121
122 self.user2 = get_user_model().objects.create_superuser(
123 username=self.USER_2,
124 email=self.EMAIL_2,
125 password=self.VALID_PASSWORD,
126 is_staff=True,
127 is_superuser=True,
128 )
129
130 def test_login(self):
131 """
132 Test a valid login for a real username.
133 """
134
135 response = self._login(self.username, self.password)
136 self.assertNotContains(
137 response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True
138 )
139
140 def test_lockout_limit_once(self):
141 """
142 Test the login lock trying to login one more time than failure limit.
143 """
144
145 response = self.lockout()
146 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
147
148 def test_lockout_limit_many(self):
149 """
150 Test the login lock trying to login a lot of times more than failure limit.
151 """
152
153 self.lockout()
154
155 for _ in range(settings.AXES_FAILURE_LIMIT):
156 response = self.login()
157 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
158
159 def attempt_count(self):
160 return AccessAttempt.objects.count()
161
162 @override_settings(AXES_RESET_ON_SUCCESS=False)
163 def test_reset_on_success_false(self):
164 self.almost_lockout()
165 self.login(is_valid_username=True, is_valid_password=True)
166
167 response = self.login()
168 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
169 self.assertTrue(self.attempt_count())
170
171 @override_settings(AXES_RESET_ON_SUCCESS=True)
172 def test_reset_on_success_true(self):
173 self.almost_lockout()
174 self.assertTrue(self.attempt_count())
175
176 self.login(is_valid_username=True, is_valid_password=True)
177 self.assertFalse(self.attempt_count())
178
179 response = self.lockout()
180 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
181 self.assertTrue(self.attempt_count())
182
183 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
184 def test_lockout_by_combination_user_and_ip(self):
185 """
186 Test login failure when AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP is True.
187 """
188
189 # test until one try before the limit
190 for _ in range(1, settings.AXES_FAILURE_LIMIT):
191 response = self.login(is_valid_username=True, is_valid_password=False)
192 # Check if we are in the same login page
193 self.assertContains(response, self.LOGIN_FORM_KEY, html=True)
194
195 # So, we shouldn't have gotten a lock-out yet.
196 # But we should get one now
197 response = self.login(is_valid_username=True, is_valid_password=False)
198 self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
199
200 @override_settings(AXES_ONLY_USER_FAILURES=True)
201 def test_lockout_by_only_user_failures(self):
202 """
203 Test login failure when AXES_ONLY_USER_FAILURES is True.
204 """
205
206 # test until one try before the limit
207 for _ in range(1, settings.AXES_FAILURE_LIMIT):
208 response = self._login(self.username, self.WRONG_PASSWORD)
209
210 # Check if we are in the same login page
211 self.assertContains(response, self.LOGIN_FORM_KEY, html=True)
212
213 # So, we shouldn't have gotten a lock-out yet.
214 # But we should get one now
215 response = self._login(self.username, self.WRONG_PASSWORD)
216 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
217
218 # reset the username only and make sure we can log in now even though our IP has failed each time
219 self.reset(username=self.username)
220
221 response = self._login(self.username, self.password)
222
223 # Check if we are still in the login page
224 self.assertNotContains(
225 response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True
226 )
227
228 # now create failure_limit + 1 failed logins and then we should still
229 # be able to login with valid_username
230 for _ in range(settings.AXES_FAILURE_LIMIT):
231 response = self._login(self.username, self.password)
232
233 # Check if we can still log in with valid user
234 response = self._login(self.username, self.password)
235 self.assertNotContains(
236 response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True
237 )
238
239 # Test for true and false positives when blocking by IP *OR* user (default)
240 # Cache disabled. Default settings.
241 def test_lockout_by_ip_blocks_when_same_user_same_ip_without_cache(self):
242 # User 1 is locked out from IP 1.
243 self._lockout_user1_from_ip1()
244
245 # User 1 is still blocked from IP 1.
246 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
247 self.assertEqual(response.status_code, self.BLOCKED)
248
249 def test_lockout_by_ip_allows_when_same_user_diff_ip_without_cache(self):
250 # User 1 is locked out from IP 1.
251 self._lockout_user1_from_ip1()
252
253 # User 1 can still login from IP 2.
254 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
255 self.assertEqual(response.status_code, self.ALLOWED)
256
257 def test_lockout_by_ip_blocks_when_diff_user_same_ip_without_cache(self):
258 # User 1 is locked out from IP 1.
259 self._lockout_user1_from_ip1()
260
261 # User 2 is also locked out from IP 1.
262 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
263 self.assertEqual(response.status_code, self.BLOCKED)
264
265 def test_lockout_by_ip_allows_when_diff_user_diff_ip_without_cache(self):
266 # User 1 is locked out from IP 1.
267 self._lockout_user1_from_ip1()
268
269 # User 2 can still login from IP 2.
270 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
271 self.assertEqual(response.status_code, self.ALLOWED)
272
273 # Test for true and false positives when blocking by user only.
274 # Cache disabled. When AXES_ONLY_USER_FAILURES = True
275 @override_settings(AXES_ONLY_USER_FAILURES=True)
276 def test_lockout_by_user_blocks_when_same_user_same_ip_without_cache(self):
277 # User 1 is locked out from IP 1.
278 self._lockout_user1_from_ip1()
279
280 # User 1 is still blocked from IP 1.
281 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
282 self.assertEqual(response.status_code, self.BLOCKED)
283
284 @override_settings(AXES_ONLY_USER_FAILURES=True)
285 def test_lockout_by_user_blocks_when_same_user_diff_ip_without_cache(self):
286 # User 1 is locked out from IP 1.
287 self._lockout_user1_from_ip1()
288
289 # User 1 is also locked out from IP 2.
290 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
291 self.assertEqual(response.status_code, self.BLOCKED)
292
293 @override_settings(AXES_ONLY_USER_FAILURES=True)
294 def test_lockout_by_user_allows_when_diff_user_same_ip_without_cache(self):
295 # User 1 is locked out from IP 1.
296 self._lockout_user1_from_ip1()
297
298 # User 2 can still login from IP 1.
299 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
300 self.assertEqual(response.status_code, self.ALLOWED)
301
302 @override_settings(AXES_ONLY_USER_FAILURES=True)
303 def test_lockout_by_user_allows_when_diff_user_diff_ip_without_cache(self):
304 # User 1 is locked out from IP 1.
305 self._lockout_user1_from_ip1()
306
307 # User 2 can still login from IP 2.
308 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
309 self.assertEqual(response.status_code, self.ALLOWED)
310
311 @override_settings(AXES_ONLY_USER_FAILURES=True)
312 def test_lockout_by_user_with_empty_username_allows_other_users_without_cache(self):
313 # User with empty username is locked out from IP 1.
314 self._lockout_user_from_ip(username="", ip_addr=self.IP_1)
315
316 # Still possible to access the login page
317 response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1)
318 self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True)
319
320 # Test for true and false positives when blocking by user and IP together.
321 # Cache disabled. When LOCK_OUT_BY_COMBINATION_USER_AND_IP = True
322 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
323 def test_lockout_by_user_and_ip_blocks_when_same_user_same_ip_without_cache(self):
324 # User 1 is locked out from IP 1.
325 self._lockout_user1_from_ip1()
326
327 # User 1 is still blocked from IP 1.
328 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
329 self.assertEqual(response.status_code, self.BLOCKED)
330
331 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
332 def test_lockout_by_user_and_ip_allows_when_same_user_diff_ip_without_cache(self):
333 # User 1 is locked out from IP 1.
334 self._lockout_user1_from_ip1()
335
336 # User 1 can still login from IP 2.
337 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
338 self.assertEqual(response.status_code, self.ALLOWED)
339
340 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
341 def test_lockout_by_user_and_ip_allows_when_diff_user_same_ip_without_cache(self):
342 # User 1 is locked out from IP 1.
343 self._lockout_user1_from_ip1()
344
345 # User 2 can still login from IP 1.
346 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
347 self.assertEqual(response.status_code, self.ALLOWED)
348
349 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
350 def test_lockout_by_user_and_ip_allows_when_diff_user_diff_ip_without_cache(self):
351 # User 1 is locked out from IP 1.
352 self._lockout_user1_from_ip1()
353
354 # User 2 can still login from IP 2.
355 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
356 self.assertEqual(response.status_code, self.ALLOWED)
357
358 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
359 def test_lockout_by_user_and_ip_with_empty_username_allows_other_users_without_cache(
360 self,
361 ):
362 # User with empty username is locked out from IP 1.
363 self._lockout_user_from_ip(username="", ip_addr=self.IP_1)
364
365 # Still possible to access the login page
366 response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1)
367 self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True)
368
369 # Test for true and false positives when blocking by IP *OR* user (default)
370 # With cache enabled. Default criteria.
371 def test_lockout_by_ip_blocks_when_same_user_same_ip_using_cache(self):
372 # User 1 is locked out from IP 1.
373 self._lockout_user1_from_ip1()
374
375 # User 1 is still blocked from IP 1.
376 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
377 self.assertEqual(response.status_code, self.BLOCKED)
378
379 def test_lockout_by_ip_allows_when_same_user_diff_ip_using_cache(self):
380 # User 1 is locked out from IP 1.
381 self._lockout_user1_from_ip1()
382
383 # User 1 can still login from IP 2.
384 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
385 self.assertEqual(response.status_code, self.ALLOWED)
386
387 def test_lockout_by_ip_blocks_when_diff_user_same_ip_using_cache(self):
388 # User 1 is locked out from IP 1.
389 self._lockout_user1_from_ip1()
390
391 # User 2 is also locked out from IP 1.
392 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
393 self.assertEqual(response.status_code, self.BLOCKED)
394
395 def test_lockout_by_ip_allows_when_diff_user_diff_ip_using_cache(self):
396 # User 1 is locked out from IP 1.
397 self._lockout_user1_from_ip1()
398
399 # User 2 can still login from IP 2.
400 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
401 self.assertEqual(response.status_code, self.ALLOWED)
402
403 @override_settings(AXES_ONLY_USER_FAILURES=True)
404 def test_lockout_by_user_with_empty_username_allows_other_users_using_cache(self):
405 # User with empty username is locked out from IP 1.
406 self._lockout_user_from_ip(username="", ip_addr=self.IP_1)
407
408 # Still possible to access the login page
409 response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1)
410 self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True)
411
412 # Test for true and false positives when blocking by user only.
413 # With cache enabled. When AXES_ONLY_USER_FAILURES = True
414 @override_settings(AXES_ONLY_USER_FAILURES=True)
415 def test_lockout_by_user_blocks_when_same_user_same_ip_using_cache(self):
416 # User 1 is locked out from IP 1.
417 self._lockout_user1_from_ip1()
418
419 # User 1 is still blocked from IP 1.
420 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
421 self.assertEqual(response.status_code, self.BLOCKED)
422
423 @override_settings(AXES_ONLY_USER_FAILURES=True)
424 def test_lockout_by_user_blocks_when_same_user_diff_ip_using_cache(self):
425 # User 1 is locked out from IP 1.
426 self._lockout_user1_from_ip1()
427
428 # User 1 is also locked out from IP 2.
429 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
430 self.assertEqual(response.status_code, self.BLOCKED)
431
432 @override_settings(AXES_ONLY_USER_FAILURES=True)
433 def test_lockout_by_user_allows_when_diff_user_same_ip_using_cache(self):
434 # User 1 is locked out from IP 1.
435 self._lockout_user1_from_ip1()
436
437 # User 2 can still login from IP 1.
438 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
439 self.assertEqual(response.status_code, self.ALLOWED)
440
441 @override_settings(AXES_ONLY_USER_FAILURES=True)
442 def test_lockout_by_user_allows_when_diff_user_diff_ip_using_cache(self):
443 # User 1 is locked out from IP 1.
444 self._lockout_user1_from_ip1()
445
446 # User 2 can still login from IP 2.
447 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
448 self.assertEqual(response.status_code, self.ALLOWED)
449
450 # Test for true and false positives when blocking by user and IP together.
451 # With cache enabled. When LOCK_OUT_BY_COMBINATION_USER_AND_IP = True
452 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
453 def test_lockout_by_user_and_ip_blocks_when_same_user_same_ip_using_cache(self):
454 # User 1 is locked out from IP 1.
455 self._lockout_user1_from_ip1()
456
457 # User 1 is still blocked from IP 1.
458 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
459 self.assertEqual(response.status_code, self.BLOCKED)
460
461 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
462 def test_lockout_by_user_and_ip_allows_when_same_user_diff_ip_using_cache(self):
463 # User 1 is locked out from IP 1.
464 self._lockout_user1_from_ip1()
465
466 # User 1 can still login from IP 2.
467 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
468 self.assertEqual(response.status_code, self.ALLOWED)
469
470 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
471 def test_lockout_by_user_and_ip_allows_when_diff_user_same_ip_using_cache(self):
472 # User 1 is locked out from IP 1.
473 self._lockout_user1_from_ip1()
474
475 # User 2 can still login from IP 1.
476 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
477 self.assertEqual(response.status_code, self.ALLOWED)
478
479 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
480 def test_lockout_by_user_and_ip_allows_when_diff_user_diff_ip_using_cache(self):
481 # User 1 is locked out from IP 1.
482 self._lockout_user1_from_ip1()
483
484 # User 2 can still login from IP 2.
485 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
486 self.assertEqual(response.status_code, self.ALLOWED)
487
488 @override_settings(
489 AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True, AXES_FAILURE_LIMIT=2
490 )
491 def test_lockout_by_user_and_ip_allows_when_diff_user_same_ip_using_cache_multiple_attempts(
492 self,
493 ):
494 # User 1 is locked out from IP 1.
495 response = self._login(self.USER_1, self.WRONG_PASSWORD, self.IP_1)
496 self.assertEqual(response.status_code, self.ATTEMPT_NOT_BLOCKED)
497
498 # Second attempt from different IP
499 response = self._login(self.USER_1, self.WRONG_PASSWORD, self.IP_2)
500 self.assertEqual(response.status_code, self.ATTEMPT_NOT_BLOCKED)
501
502 # Second attempt from same IP, different username
503 response = self._login(self.USER_2, self.WRONG_PASSWORD, self.IP_1)
504 self.assertEqual(response.status_code, self.ATTEMPT_NOT_BLOCKED)
505
506 # User 1 is blocked from IP 1
507 response = self._login(self.USER_1, self.WRONG_PASSWORD, ip_addr=self.IP_1)
508 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
509
510 # User 1 is blocked from IP 2
511 response = self._login(self.USER_1, self.WRONG_PASSWORD, ip_addr=self.IP_2)
512 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
513
514 # User 2 can still login from IP 2, only he has 1 attempt left
515 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
516 self.assertEqual(response.status_code, self.ALLOWED)
517
518 @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
519 def test_lockout_by_user_and_ip_with_empty_username_allows_other_users_using_cache(
520 self,
521 ):
522 # User with empty username is locked out from IP 1.
523 self._lockout_user_from_ip(username="", ip_addr=self.IP_1)
524
525 # Still possible to access the login page
526 response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1)
527 self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True)
528
529 # Test for true and false positives when blocking by user or IP together.
530 # With cache enabled. When AXES_LOCK_OUT_BY_USER_OR_IP = True
531 @override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True)
532 def test_lockout_by_user_or_ip_blocks_when_same_user_same_ip_using_cache(self):
533 # User 1 is locked out from IP 1.
534 self._lockout_user1_from_ip1()
535
536 # User 1 is still blocked from IP 1.
537 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
538 self.assertEqual(response.status_code, self.BLOCKED)
539
540 @override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True)
541 def test_lockout_by_user_or_ip_allows_when_same_user_diff_ip_using_cache(self):
542 # User 1 is locked out from IP 1.
543 self._lockout_user1_from_ip1()
544
545 # User 1 is blocked out from IP 1
546 response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
547 self.assertEqual(response.status_code, self.BLOCKED)
548
549 @override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True)
550 def test_lockout_by_user_or_ip_allows_when_diff_user_same_ip_using_cache(self):
551 # User 1 is locked out from IP 1.
552 self._lockout_user1_from_ip1()
553
554 # User 2 can still login from IP 1.
555 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
556 self.assertEqual(response.status_code, self.BLOCKED)
557
558 @override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True, AXES_FAILURE_LIMIT=3)
559 def test_lockout_by_user_or_ip_allows_when_diff_user_same_ip_using_cache_multiple_attempts(
560 self,
561 ):
562 # User 1 is locked out from IP 1.
563 response = self._login(self.USER_1, self.WRONG_PASSWORD, self.IP_1)
564 self.assertEqual(response.status_code, self.ATTEMPT_NOT_BLOCKED)
565
566 # Second attempt from different IP
567 response = self._login(self.USER_1, self.WRONG_PASSWORD, self.IP_2)
568 self.assertEqual(response.status_code, self.ATTEMPT_NOT_BLOCKED)
569
570 # User 1 is blocked on all IPs, he reached 2 attempts
571 response = self._login(self.USER_1, self.WRONG_PASSWORD, ip_addr=self.IP_2)
572 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
573 response = self._login(self.USER_1, self.WRONG_PASSWORD, ip_addr=self.IP_3)
574 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
575
576 # IP 1 has still one attempt left
577 response = self._login(self.USER_2, self.WRONG_PASSWORD, self.IP_1)
578 self.assertEqual(response.status_code, self.ATTEMPT_NOT_BLOCKED)
579
580 # But now IP 1 is blocked for all attempts
581 response = self._login(self.USER_1, self.WRONG_PASSWORD, ip_addr=self.IP_1)
582 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
583 response = self._login(self.USER_2, self.WRONG_PASSWORD, ip_addr=self.IP_1)
584 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
585 response = self._login(self.USER_3, self.WRONG_PASSWORD, ip_addr=self.IP_1)
586 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
587
588 @override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True, AXES_FAILURE_LIMIT=3)
589 def test_lockout_by_user_or_ip_allows_when_diff_user_same_ip_using_cache_multiple_failed_attempts(
590 self,
591 ):
592 """Test, if the failed attempts make also impact on the attempt count"""
593 # User 1 is locked out from IP 1.
594 response = self._login(self.USER_1, self.WRONG_PASSWORD, self.IP_1)
595 self.assertEqual(response.status_code, self.ATTEMPT_NOT_BLOCKED)
596
597 # Second attempt from different IP
598 response = self._login(self.USER_1, self.WRONG_PASSWORD, self.IP_2)
599 self.assertEqual(response.status_code, self.ATTEMPT_NOT_BLOCKED)
600
601 # Second attempt from same IP, different username
602 response = self._login(self.USER_2, self.WRONG_PASSWORD, self.IP_1)
603 self.assertEqual(response.status_code, self.ATTEMPT_NOT_BLOCKED)
604
605 # User 1 is blocked from IP 2
606 response = self._login(self.USER_1, self.WRONG_PASSWORD, ip_addr=self.IP_2)
607 self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
608
609 # On IP 2 it is only 2. attempt, for user 2 it is also 2. attempt -> allow log in
610 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
611 self.assertEqual(response.status_code, self.ALLOWED)
612
613 @override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True)
614 def test_lockout_by_user_or_ip_allows_when_diff_user_diff_ip_using_cache(self):
615 # User 1 is locked out from IP 1.
616 self._lockout_user1_from_ip1()
617
618 # User 2 can still login from IP 2.
619 response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
620 self.assertEqual(response.status_code, self.ALLOWED)
621
622 @override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True)
623 def test_lockout_by_user_or_ip_with_empty_username_allows_other_users_using_cache(
624 self,
625 ):
626 # User with empty username is locked out from IP 1.
627 self._lockout_user_from_ip(username="", ip_addr=self.IP_1)
628
629 # Still possible to access the login page
630 response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1)
631 self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True)
632
633
634 # Test the same logic with cache handler
635 @override_settings(AXES_HANDLER="axes.handlers.cache.AxesCacheHandler")
636 class CacheLoginTestCase(DatabaseLoginTestCase):
637 def attempt_count(self):
638 cache = get_cache()
639 keys = cache._cache
640 return len(keys)
641
642 def reset(self, **kwargs):
643 get_cache().delete(make_cache_key_list([kwargs])[0])
0 from io import StringIO
1 from unittest.mock import patch, Mock
2
3 from django.core.management import call_command
4 from django.utils import timezone
5
6 from axes.models import AccessAttempt, AccessLog
7 from tests.base import AxesTestCase
8
9
10 class ResetAccessLogsManagementCommandTestCase(AxesTestCase):
11 def setUp(self):
12 self.msg_not_found = "No logs found.\n"
13 self.msg_num_found = "{} logs removed.\n"
14
15 days_3 = timezone.now() - timezone.timedelta(days=3)
16 with patch("django.utils.timezone.now", Mock(return_value=days_3)):
17 AccessLog.objects.create()
18
19 days_13 = timezone.now() - timezone.timedelta(days=9)
20 with patch("django.utils.timezone.now", Mock(return_value=days_13)):
21 AccessLog.objects.create()
22
23 days_30 = timezone.now() - timezone.timedelta(days=27)
24 with patch("django.utils.timezone.now", Mock(return_value=days_30)):
25 AccessLog.objects.create()
26
27 def test_axes_delete_access_logs_default(self):
28 out = StringIO()
29 call_command("axes_reset_logs", stdout=out)
30 self.assertEqual(self.msg_not_found, out.getvalue())
31
32 def test_axes_delete_access_logs_older_than_2_days(self):
33 out = StringIO()
34 call_command("axes_reset_logs", age=2, stdout=out)
35 self.assertEqual(self.msg_num_found.format(3), out.getvalue())
36
37 def test_axes_delete_access_logs_older_than_4_days(self):
38 out = StringIO()
39 call_command("axes_reset_logs", age=4, stdout=out)
40 self.assertEqual(self.msg_num_found.format(2), out.getvalue())
41
42 def test_axes_delete_access_logs_older_than_16_days(self):
43 out = StringIO()
44 call_command("axes_reset_logs", age=16, stdout=out)
45 self.assertEqual(self.msg_num_found.format(1), out.getvalue())
46
47
48 class ManagementCommandTestCase(AxesTestCase):
49 def setUp(self):
50 AccessAttempt.objects.create(
51 username="jane.doe", ip_address="10.0.0.1", failures_since_start="4"
52 )
53
54 AccessAttempt.objects.create(
55 username="john.doe", ip_address="10.0.0.2", failures_since_start="15"
56 )
57
58 def test_axes_list_attempts(self):
59 out = StringIO()
60 call_command("axes_list_attempts", stdout=out)
61
62 expected = "10.0.0.1\tjane.doe\t4\n10.0.0.2\tjohn.doe\t15\n"
63 self.assertEqual(expected, out.getvalue())
64
65 def test_axes_reset(self):
66 out = StringIO()
67 call_command("axes_reset", stdout=out)
68
69 expected = "2 attempts removed.\n"
70 self.assertEqual(expected, out.getvalue())
71
72 def test_axes_reset_not_found(self):
73 out = StringIO()
74 call_command("axes_reset", stdout=out)
75
76 out = StringIO()
77 call_command("axes_reset", stdout=out)
78
79 expected = "No attempts found.\n"
80 self.assertEqual(expected, out.getvalue())
81
82 def test_axes_reset_ip(self):
83 out = StringIO()
84 call_command("axes_reset_ip", "10.0.0.1", stdout=out)
85
86 expected = "1 attempts removed.\n"
87 self.assertEqual(expected, out.getvalue())
88
89 def test_axes_reset_ip_not_found(self):
90 out = StringIO()
91 call_command("axes_reset_ip", "10.0.0.3", stdout=out)
92
93 expected = "No attempts found.\n"
94 self.assertEqual(expected, out.getvalue())
95
96 def test_axes_reset_username(self):
97 out = StringIO()
98 call_command("axes_reset_username", "john.doe", stdout=out)
99
100 expected = "1 attempts removed.\n"
101 self.assertEqual(expected, out.getvalue())
102
103 def test_axes_reset_username_not_found(self):
104 out = StringIO()
105 call_command("axes_reset_username", "ivan.renko", stdout=out)
106
107 expected = "No attempts found.\n"
108 self.assertEqual(expected, out.getvalue())
0 from django.http import HttpResponse, HttpRequest
1 from django.test import override_settings
2
3 from axes.middleware import AxesMiddleware
4 from tests.base import AxesTestCase
5
6
7 class MiddlewareTestCase(AxesTestCase):
8 STATUS_SUCCESS = 200
9 STATUS_LOCKOUT = 403
10
11 def setUp(self):
12 self.request = HttpRequest()
13
14 def test_success_response(self):
15 def get_response(request):
16 request.axes_locked_out = False
17 return HttpResponse()
18
19 response = AxesMiddleware(get_response)(self.request)
20 self.assertEqual(response.status_code, self.STATUS_SUCCESS)
21
22 def test_lockout_response(self):
23 def get_response(request):
24 request.axes_locked_out = True
25 return HttpResponse()
26
27 response = AxesMiddleware(get_response)(self.request)
28 self.assertEqual(response.status_code, self.STATUS_LOCKOUT)
29
30 @override_settings(AXES_ENABLED=False)
31 def test_respects_enabled_switch(self):
32 def get_response(request):
33 request.axes_locked_out = True
34 return HttpResponse()
35
36 response = AxesMiddleware(get_response)(self.request)
37 self.assertEqual(response.status_code, self.STATUS_SUCCESS)
0 from django.apps.registry import apps
1 from django.db import connection
2 from django.db.migrations.autodetector import MigrationAutodetector
3 from django.db.migrations.executor import MigrationExecutor
4 from django.db.migrations.state import ProjectState
5
6 from axes.models import AccessAttempt, AccessLog
7 from tests.base import AxesTestCase
8
9
10 class ModelsTestCase(AxesTestCase):
11 def setUp(self):
12 self.failures_since_start = 42
13
14 self.access_attempt = AccessAttempt(
15 failures_since_start=self.failures_since_start
16 )
17 self.access_log = AccessLog()
18
19 def test_access_attempt_str(self):
20 self.assertIn("Access", str(self.access_attempt))
21
22 def test_access_log_str(self):
23 self.assertIn("Access", str(self.access_log))
24
25
26 class MigrationsTestCase(AxesTestCase):
27 def test_missing_migrations(self):
28 executor = MigrationExecutor(connection)
29 autodetector = MigrationAutodetector(
30 executor.loader.project_state(), ProjectState.from_apps(apps)
31 )
32
33 changes = autodetector.changes(graph=executor.loader.graph)
34
35 self.assertEqual({}, changes)
0 from unittest.mock import MagicMock
1
2 from axes.signals import user_locked_out
3 from tests.base import AxesTestCase
4
5
6 class SignalTestCase(AxesTestCase):
7 def test_send_lockout_signal(self):
8 """
9 Test if the lockout signal is correctly emitted when user is locked out.
10 """
11
12 handler = MagicMock()
13 user_locked_out.connect(handler)
14
15 self.assertEqual(0, handler.call_count)
16 self.lockout()
17 self.assertEqual(1, handler.call_count)
0 from django.contrib import admin
1 from django.urls import path
2
3
4 urlpatterns = [path("admin/", admin.site.urls)]
0 urlpatterns: list = []
+0
-37
tox.ini less more
0 [tox]
1 envlist =
2 qa
3 py{36,37,38,py3}-django{22,30,31,master}
4
5 [travis]
6 python =
7 3.6: py36
8 3.7: py37
9 3.8: py38
10 pypy3: pypy3
11
12 [travis:env]
13 DJANGO =
14 2.2: django22
15 3.0: django30
16 3.1: django31
17 master: djangomaster
18
19 [testenv]
20 deps =
21 -r requirements.txt
22 django22: django>=2.2,<2.3
23 django30: django>=3.0,<3.1
24 django31: django>=3.1,<3.2
25 djangomaster: https://github.com/django/django/archive/master.tar.gz
26 usedevelop = True
27 commands =
28 pytest
29 setenv =
30 PYTHONDONTWRITEBYTECODE=1
31
32 [testenv:qa]
33 commands =
34 mypy axes
35 prospector
36 black -t py36 --check --diff axes