Source code for applicationinsights.channel.QueueBase
# The standard-library queue module was renamed between Python 2 and 3;
# try the Python 2 name first and fall back to the Python 3 name.
try:
    # Python 2.x
    from Queue import Queue, Empty
except ImportError:
    # Python 3.x
    from queue import Queue, Empty

# persist-queue is an optional dependency used for on-disk queues.
# When it is missing, PersistQueue is None (checked by QueueBase.__init__)
# and PersistEmpty aliases the in-memory Empty so that
# ``except (Empty, PersistEmpty)`` remains valid.
try:
    from persistqueue import Empty as PersistEmpty
    from persistqueue import Queue as PersistQueue
except ImportError:
    PersistEmpty = Empty
    PersistQueue = None
class QueueBase(object):
    """The base class for all types of queues for use in conjunction with an implementation of :class:`SenderBase`.

    Items are buffered in an in-memory queue, or in an on-disk queue when a persistence path is given.
    :func:`put` calls :func:`flush` once the number of buffered items reaches :func:`max_queue_length`;
    consumers may also call :func:`flush` directly. In this base class :func:`flush` does nothing, so
    concrete queues are expected to override it.
    """

    def __init__(self, sender, persistence_path=''):
        """Initializes a new instance of the class.

        Args:
            sender (:class:`SenderBase`). the sender object that will be used in conjunction with this queue.
                If it is truthy, its ``queue`` attribute is set to this queue.
            persistence_path (str). if set, persist the queue on disk into the provided directory.

        Raises:
            ValueError. if persistence_path is set but the persist-queue dependency is not installed.
        """
        if persistence_path and PersistQueue is None:
            raise ValueError(
                'persistence_path argument requires persist-queue dependency to be installed')
        elif persistence_path:
            self._queue = PersistQueue(persistence_path)
        else:
            self._queue = Queue()

        self._persistence_path = persistence_path
        self._max_queue_length = 500
        self._sender = sender
        if sender:
            # Back-reference so the sender can reach the queue it is paired with.
            self._sender.queue = self

    @property
    def max_queue_length(self):
        """The maximum number of items that will be held by the queue before the queue will call the :func:`flush`
        method.

        Returns:
            int. the maximum queue size. (defaults to: 500)
        """
        return self._max_queue_length

    @max_queue_length.setter
    def max_queue_length(self, value):
        """Sets the maximum number of items that will be held by the queue before the queue will call the
        :func:`flush` method.

        Args:
            value (int). the maximum queue length. Values below 1 are replaced by 1.
        """
        if value < 1:
            value = 1
        self._max_queue_length = value

    @property
    def sender(self):
        """The sender that is associated with this queue that this queue will use to send data to the service.

        Returns:
            :class:`SenderBase`. the sender object.
        """
        return self._sender

    def put(self, item):
        """Adds the passed in item object to the queue and calls :func:`flush` if the size of the queue has reached
        :func:`max_queue_length`. This method does nothing if the passed in item is None (or any other falsy value).

        Args:
            item (:class:`contracts.Envelope`). the telemetry envelope object to send to the service.
        """
        if not item:
            return
        self._queue.put(item)
        if self._queue.qsize() >= self._max_queue_length:
            self.flush()

    def get(self):
        """Gets a single item from the queue and returns it. If the queue is empty, this method will return None.

        Returns:
            :class:`contracts.Envelope`. a telemetry envelope object or None if the queue is empty.
        """
        try:
            item = self._queue.get_nowait()
        except (Empty, PersistEmpty):
            return None

        if self._persistence_path:
            # NOTE(review): presumably the persistent queue needs task_done() to
            # record the removal on disk -- confirm against the persist-queue docs.
            self._queue.task_done()
        return item

    def flush(self):
        """Flushes the current queue by notifying the sender. This method needs to be overridden by concrete
        implementations of the queue class; the base implementation does nothing.
        """
        pass