Source code for nti.zodb.tokenbucket

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Implementations of the token bucket algorithm.

"""

from __future__ import print_function, absolute_import, division
__docformat__ = "restructuredtext en"

logger = __import__('logging').getLogger(__name__)

from time import time

from zope import interface

try:
    from gevent import sleep
except ImportError: # pragma: no cover
    from time import sleep

from nti.zodb.interfaces import ITokenBucket

from nti.zodb.minmax import NumericMaximum
from nti.zodb.minmax import NumericMinimum


[docs] @interface.implementer(ITokenBucket) class PersistentTokenBucket(object): """ Persistent implementation of the token bucket algorithm. If the ZODB is used from multiple machines, relies on their clocks being relatively synchronized to be effective. Initially based on `an ActiveState recipe <http://code.activestate.com/recipes/511490-implementation-of-the-token-bucket-algorithm/>`_ (For efficiency, this object itself isn't persistent, but the objects it holds are.) """ def __init__(self, capacity, fill_rate=1.0): """ Creates a new token bucket, initially full. :param capacity: The max number of tokens in the bucket (also the initial number of tokens in the bucket. :keyword fill_rate: The rate in fractional tokens per second that the bucket will fill. The default is to add one token per second. """ self.capacity = float(capacity) self.fill_rate = float(fill_rate) # Conflict resolution: the tokens in the bucket is always # taken as the smallest. Time, of course, marches ever upwards # TODO: This could probably be better self._timestamp = NumericMaximum(time()) self._tokens = NumericMinimum(self.capacity)
[docs] def consume(self, tokens=1): """ Consume one or more tokens from the bucket. Returns True if there were sufficient tokens otherwise False. """ if tokens <= self.tokens: self._tokens -= tokens return True return False
[docs] def wait_for_token(self): """ Consume a single token from the bucket, blocking until one is available if need be. """ while not self.consume(): needed_token_count = 1.0 - self._tokens.value # How long will it take to get that token? how_long = needed_token_count * self.fill_rate sleep(how_long) return True
@property def tokens(self): """ The fractional number of tokens currently in the bucket. """ now = time() if self._tokens.value < self.capacity: delta = self.fill_rate * (now - self._timestamp) self._tokens.set(min(self.capacity, self._tokens + delta)) self._timestamp.set(now) return self._tokens.value def __repr__(self): return "%s(%s,%s)" % (type(self).__name__, self.capacity, self.fill_rate)