# Source code for the module that defines the AsynchronousQueue class.

from .QueueBase import QueueBase
from threading import Event

class AsynchronousQueue(QueueBase):
    """Queue meant to be paired with an :class:`AsynchronousSender`.

    Every call to :func:`put` or :func:`flush` asks the attached sender (when
    there is one) to start. :func:`flush` additionally sets the
    :func:`flush_notification` event so that a sender waiting on it can tell
    that a flush was requested. Item storage is inherited from
    :class:`QueueBase`.
    """

    def __init__(self, *args, **kwargs):
        """Create the flush event, then run the :class:`QueueBase` initializer.

        Args:
            *args: positional arguments forwarded unchanged to
                :class:`QueueBase`.
            **kwargs: keyword arguments forwarded unchanged to
                :class:`QueueBase`.
        """
        # The event is created before the base initializer runs, so the
        # attribute already exists while the base class sets itself up.
        self._flush_notification = Event()
        QueueBase.__init__(self, *args, **kwargs)

    @property
    def flush_notification(self):
        """The :class:`Event` that :func:`flush` sets to request a flush.

        Returns:
            :class:`Event`. the event object the :func:`sender` can wait on.
        """
        return self._flush_notification

    def _start_sender(self):
        """Call :func:`start` on the attached sender; no-op without one."""
        if not self.sender:
            return
        self.sender.start()

    def put(self, item):
        """Enqueue an item and ask the :func:`sender` to start sending.

        The item is stored through :func:`QueueBase.put`; afterwards the
        sender, if attached, has :func:`start` called on it.

        Args:
            item (:class:`contracts.Envelope`): the telemetry envelope object
                to send to the service.
        """
        QueueBase.put(self, item)
        self._start_sender()

    def flush(self):
        """Request a flush of the queue.

        Sets the :func:`flush_notification` event and then asks the
        :func:`sender`, if attached, to start.
        """
        self._flush_notification.set()
        self._start_sender()