Source code for applicationinsights.channel.AsynchronousSender
from .SenderBase import SenderBase, DEFAULT_ENDPOINT_URL
from threading import Lock, Thread
class AsynchronousSender(SenderBase):
    """An asynchronous sender that works in conjunction with the :class:`AsynchronousQueue`.

    The sender object will start a worker thread that pulls items from the :func:`queue`. The thread is
    created when the client calls :func:`start` and checks for queue items every :func:`send_interval`
    seconds. The worker thread can also be forced to check the queue by setting the
    :func:`flush_notification` event.

    - If no items are found, the thread will go back to sleep.
    - If items are found, the worker thread will send items to the specified service in batches of
      :func:`send_buffer_size`.

    If no queue items are found for :func:`send_time` seconds, the worker thread will shut down (and
    :func:`start` will need to be called again).
    """

    def __init__(self, service_endpoint_uri=None):
        """Initializes a new instance of the class.

        Args:
            service_endpoint_uri (str). the address of the service to send telemetry data to. When
                omitted (or otherwise falsy), DEFAULT_ENDPOINT_URL is used.
        """
        self._send_interval = 1.0
        # Seconds the worker may still idle before exiting; a value <= 0 means "no worker running".
        self._send_remaining_time = 0
        self._send_time = 3.0
        self._lock_send_remaining_time = Lock()
        SenderBase.__init__(self, service_endpoint_uri or DEFAULT_ENDPOINT_URL)

    @property
    def send_interval(self):
        """The time span in seconds at which the worker thread will check the :func:`queue` for items
        (defaults to: 1.0). Values below 0.1 are treated as 0.1 by the worker.

        Returns:
            number. the interval in seconds.
        """
        return self._send_interval

    @send_interval.setter
    def send_interval(self, value):
        """Sets the time span in seconds at which the worker thread will check the :func:`queue` for items.

        Args:
            value (number). the interval in seconds.
        """
        self._send_interval = value

    @property
    def send_time(self):
        """The time span in seconds that the worker thread stays alive without finding any items in the
        :func:`queue` before it shuts down (defaults to: 3.0). Values below the effective
        :func:`send_interval` are raised to that interval.

        Returns:
            number. the time span in seconds.
        """
        return self._send_time

    @send_time.setter
    def send_time(self, value):
        """Sets the time span in seconds that the worker thread stays alive without finding any items.

        Args:
            value (number). the time span in seconds.
        """
        self._send_time = value

    def _effective_timings(self):
        """Returns the (send_interval, send_time) pair the worker actually uses.

        The interval cannot be lower than 100ms, and the idle time cannot be lower than the interval.
        """
        interval = max(self._send_interval, 0.1)
        return interval, max(self._send_time, interval)

    def start(self):
        """Starts a new sender thread if one is not already running."""
        with self._lock_send_remaining_time:
            # A positive remaining time means a worker is still alive; do not start a second one.
            if self._send_remaining_time <= 0.0:
                _, idle_time = self._effective_timings()
                self._send_remaining_time = idle_time
                thread = Thread(target=self._run)
                thread.daemon = True
                thread.start()

    def stop(self):
        """Gracefully stops the sender thread if one is there.

        The remaining idle time is set to zero; the worker exits once its current wait times out,
        unless it sends another batch first (which resets the remaining time).
        """
        with self._lock_send_remaining_time:
            self._send_remaining_time = 0.0

    def _send_pending(self, local_queue, idle_time):
        """Sends queued items in batches of at most send_buffer_size until the queue yields nothing."""
        while True:
            # get at most send_buffer_size items from the queue
            budget = self._send_buffer_size
            batch = []
            while budget > 0:
                item = local_queue.get()
                if not item:
                    break
                batch.append(item)
                budget -= 1

            # if we didn't get any items from the queue, we're done here
            if not batch:
                return

            # there was work to do, so the idle countdown starts over
            with self._lock_send_remaining_time:
                self._send_remaining_time = idle_time

            # finally send the data
            self.send(batch)

    def _run(self):
        """Worker thread body: sends queued items until the queue has been idle for send_time seconds."""
        # save the queue locally
        local_queue = self._queue
        if not local_queue:
            self.stop()
            return

        local_send_interval, local_send_time = self._effective_timings()

        try:
            while True:
                self._send_pending(local_queue, local_send_time)

                # wait at most send_interval (or until we get signalled)
                if local_queue.flush_notification.wait(local_send_interval):
                    local_queue.flush_notification.clear()
                    continue

                # decrement the remaining time
                with self._lock_send_remaining_time:
                    self._send_remaining_time -= local_send_interval
                    local_remaining_time = self._send_remaining_time

                if local_remaining_time <= 0:
                    break
        except Exception:
            # The thread is about to die; without this reset the remaining time would stay
            # positive and every later start() call would wrongly assume a worker is running.
            self.stop()
            raise