Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added throttling to token generation (Closes #48) #122

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/source/extend.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ functionality.
.. autoclass:: django_otp.models.ThrottlingMixin
:members: get_throttle_factor, verify_is_allowed, throttle_reset, throttle_increment

.. autoclass:: django_otp.models.CooldownMixin
:members: generate_is_allowed, cooldown_reset
.. _utilities:

Utilities
Expand Down
8 changes: 8 additions & 0 deletions docs/source/overview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,15 @@ This controls the rate of throttling. The sequence of 1, 2, 4, 8… seconds is
multiplied by this factor to define the delay imposed after 1, 2, 3, 4…
successive failures. Set to 0 to disable throttling completely.

.. setting:: OTP_EMAIL_COOLDOWN_DURATION

**OTP_EMAIL_COOLDOWN_DURATION**

Default: ``60``

This controls the cooldown period after a successful generation. The next generation
can only be initiated after the designated time period has fully elapsed. Set to 0
to disable cooldown completely.

.. _other-plugins:

Expand Down
96 changes: 95 additions & 1 deletion src/django_otp/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,19 @@ def is_interactive(self):
"""
return not hasattr(self.generate_challenge, 'stub')

def generate_is_allowed(self):
"""
Checks whether it is permissible to call :meth:`generate_challenge`. If
it is allowed, returns ``(True, None)``. Otherwise returns ``(False,
data_dict)``, where ``data_dict`` contains extra information, defined
by the implementation.

This method can be used to implement throttling of token generation for
interactive devices. Client code should check this method before calling
:meth:`generate_challenge` and report problems to the user.
"""
return (True, None)

def generate_challenge(self):
"""
Generates a challenge value that the user will need to produce a token.
Expand Down Expand Up @@ -293,6 +306,84 @@ class VerifyNotAllowed:
N_FAILED_ATTEMPTS = 'N_FAILED_ATTEMPTS'


class CooldownMixin(models.Model):
"""
Mixin class for models requiring a cooldown duration between challenge generations.

Subclass must implement :meth:`get_cooldown_duration`, and must use the
:meth:`generate_is_allowed` method from within their generate_challenge() method.

see the implementation of
:class:`~django_otp.plugins.otp_email.models.EmailDevice` for an example.

"""

last_generated_timestamp = models.DateTimeField(
null=True,
blank=True,
help_text="The last time a token was generated for this device.",
)

def generate_is_allowed(self):
"""
Checks if the time since the last token generation is greater than
configured (in seconds).
"""
dt_now = timezone.now()
if (
not self.last_generated_timestamp
or (dt_now - self.last_generated_timestamp).total_seconds()
> self.get_cooldown_duration()
):
return super().generate_is_allowed()
else:
return False, {
'reason': "COOLDOWN_DURATION_PENDING",
'next_generation_at': self.last_generated_timestamp
+ timedelta(seconds=self.get_cooldown_duration()),
}

def cooldown_reset(self, commit=True):
"""
Call this method to reset cooldown (normally when a successful
verification).

Pass 'commit=False' to avoid calling self.save().
"""
self.last_generated_timestamp = None

if commit:
self.save()

def verify_token(self, token):
"""
Reset the throttle if the token is valid.
"""
verified = super().verify_token(token)
if verified:
self.cooldown_reset()
return verified

@cached_property
def cooldown_enabled(self):
return self.get_cooldown_duration() > 0

def get_cooldown_duration(self):
"""
This must be implemented to return the cooldown duration in seconds.

A duration of 0 disables the cooldown.

Normally this is just a wrapper for a plugin-specific setting like
:setting:`OTP_EMAIL_COOLDOWN_DURATION`.

"""
raise NotImplementedError()

class Meta:
abstract = True


class ThrottlingMixin(models.Model):
"""
Mixin class for models that want throttling behaviour.
Expand All @@ -311,7 +402,10 @@ class ThrottlingMixin(models.Model):
null=True,
blank=True,
default=None,
help_text="A timestamp of the last failed verification attempt. Null if last attempt succeeded.",
help_text=(
"A timestamp of the last failed verification attempt. Null if last attempt"
" succeeded."
),
)

throttling_failure_count = models.PositiveIntegerField(
Expand Down
1 change: 1 addition & 0 deletions src/django_otp/plugins/otp_email/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class OTPEmailSettings:
'OTP_EMAIL_BODY_HTML_TEMPLATE_PATH': None,
'OTP_EMAIL_TOKEN_VALIDITY': 300,
'OTP_EMAIL_THROTTLE_FACTOR': 1,
'OTP_EMAIL_COOLDOWN_DURATION': 60,
}

def __getattr__(self, name):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.16 on 2023-05-12 11:37

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('otp_email', '0004_throttling'),
]

operations = [
migrations.AddField(
model_name='emaildevice',
name='last_generated_timestamp',
field=models.DateTimeField(blank=True, help_text='The last time a token was generated for this device.', null=True),
),
]
29 changes: 27 additions & 2 deletions src/django_otp/plugins/otp_email/models.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from django.contrib.humanize.templatetags.humanize import naturaltime
from django.core.mail import send_mail
from django.db import models
from django.template import Context, Template
from django.template.loader import get_template
from django.utils import timezone

from django_otp.models import SideChannelDevice, ThrottlingMixin
from django_otp.models import CooldownMixin, SideChannelDevice, ThrottlingMixin
from django_otp.util import hex_validator, random_hex

from .conf import settings
Expand All @@ -19,7 +21,7 @@ def key_validator(value): # pragma: no cover
return hex_validator()(value)


class EmailDevice(ThrottlingMixin, SideChannelDevice):
class EmailDevice(CooldownMixin, ThrottlingMixin, SideChannelDevice):
"""
A :class:`~django_otp.models.SideChannelDevice` that delivers a token to
the email address saved in this object or alternatively to the user's
Expand Down Expand Up @@ -57,7 +59,27 @@ def generate_challenge(self, extra_context=None):
:type extra_context: dict

"""
generate_allowed, data_dict = self.generate_is_allowed()
if generate_allowed:
message = self._deliver_token(extra_context)

else:
if data_dict['reason'] == 'COOLDOWN_DURATION_PENDING':
next_generation_naturaltime = naturaltime(
data_dict['next_generation_at']
)
message = (
"Token generation cooldown period has not expired yet. Next"
f" generation allowed {next_generation_naturaltime}."
)
else:
message = "Token generation is not allowed at this time"

return message

def _deliver_token(self, extra_context):
self.generate_token(valid_secs=settings.OTP_EMAIL_TOKEN_VALIDITY)
self.last_generated_timestamp = timezone.now()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This never gets saved. As importantly, the tests aren't catching this (see companion note in the tests).

(Hint: try swapping these two lines. Adding the technically redundant commit=True will help to highlight the fact that you're relying on generate_token to save the model instance for you.)


context = {'token': self.token, **(extra_context or {})}
if settings.OTP_EMAIL_BODY_TEMPLATE:
Expand Down Expand Up @@ -101,3 +123,6 @@ def verify_token(self, token):
verified = False

return verified

def get_cooldown_duration(self):
return settings.OTP_EMAIL_COOLDOWN_DURATION
32 changes: 31 additions & 1 deletion src/django_otp/plugins/otp_email/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from django.test.utils import override_settings

from django_otp.forms import OTPAuthenticationForm
from django_otp.tests import TestCase, ThrottlingTestMixin
from django_otp.tests import CooldownTestMixin, TestCase, ThrottlingTestMixin

from .models import EmailDevice

Expand Down Expand Up @@ -235,3 +235,33 @@ def valid_token(self):

def invalid_token(self):
return -1


@override_settings(
OTP_EMAIL_COOLDOWN_DURATION=10,
)
class CooldownTestCase(EmailDeviceMixin, CooldownTestMixin, TestCase):
def valid_token(self):
if self.device.token is None:
self.device.generate_token()

return self.device.token

def invalid_token(self):
return -1

def test_cooldown_imposed_message(self):
message = self.device.generate_challenge()
message = self.device.generate_challenge()
self.assertTrue(
message.startswith(
'Token generation cooldown period has not expired yet. Next generation allowed'
)
)

def test_cooldown_imposed_expiration_message(self):
with freeze_time() as frozen_time:
self.device.generate_challenge()
frozen_time.tick(delta=timedelta(seconds=5))
message = self.device.generate_challenge()
self.assertIn("Next generation allowed 5\xa0seconds from now.", message)
72 changes: 72 additions & 0 deletions src/django_otp/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,78 @@ def test_verify_is_allowed(self):
self.assertEqual(data3, None)


class CooldownTestMixin:
def setUp(self):
self.device = None

def valid_token(self):
"""Returns a valid token to pass to our device under test."""
raise NotImplementedError()

def invalid_token(self):
"""Returns an invalid token to pass to our device under test."""
raise NotImplementedError()

#
# Tests
#

def test_generate_is_allowed_on_first_try(self):
"""Token generation should be allowed on first try."""
allowed, _ = self.device.generate_is_allowed()
self.assertTrue(allowed)

def test_cooldown_imposed_after_successful_generation(self):
"""
Token generation before cooldown should not be allowed
and the relevant reason should be returned.
"""
with freeze_time():
self.device.generate_challenge()
allowed, details = self.device.generate_is_allowed()
psagers marked this conversation as resolved.
Show resolved Hide resolved

self.assertFalse(allowed)
self.assertEqual(details['reason'], 'COOLDOWN_DURATION_PENDING')

def test_cooldown_expire_time(self):
"""
When token generation is not allowed, the cooldown expire time
should be returned.
"""
with freeze_time():
self.device.generate_challenge()
_, details = self.device.generate_is_allowed()
self.assertEqual(
details['next_generation_at'], timezone.now() + timedelta(seconds=10)
)

def test_cooldown_reset(self):
"""Cooldown can be reset and allow token generation again before the initial period expires."""
with freeze_time():
self.device.generate_is_allowed()
self.device.cooldown_reset()
allowed, _ = self.device.generate_is_allowed()
self.assertTrue(allowed)

def test_valid_token_verification_resets_cooldown(self):
"""When the token is verified, the cooldown period is reset."""
with freeze_time():
self.device.generate_challenge()
verified = self.device.verify_token(self.valid_token())
self.assertTrue(verified)
allowed, _ = self.device.generate_is_allowed()
self.assertTrue(allowed)

def test_invalid_token_verification_does_not_reset_cooldown(self):
"""When the token is not verified, the cooldown period is not reset."""
with freeze_time():
self.device.generate_challenge()
verified = self.device.verify_token(self.invalid_token())
self.assertFalse(verified)
allowed, _ = self.device.generate_is_allowed()
self.assertFalse(allowed)


@override_settings(OTP_STATIC_THROTTLE_FACTOR=0)
class APITestCase(TestCase):
def setUp(self):
Expand Down