Source code for applicationinsights.channel.TelemetryChannel
import datetime
import sys
from .SynchronousQueue import SynchronousQueue
from .SynchronousSender import SynchronousSender
from .TelemetryContext import TelemetryContext
from applicationinsights.channel import contracts
platform_moniker = 'py2'
if sys.version_info >= (3, 0):
platform_moniker = 'py3'
# set up internal context
internal_context = contracts.Internal()
internal_context.sdk_version = platform_moniker + ':0.12.0'
[docs]class TelemetryChannel(object):
"""The telemetry channel is responsible for constructing a :class:`contracts.Envelope` object from the passed in
data and specified telemetry context.
.. code:: python
from application_insights.channel import TelemetryChannel, contracts
channel = TelemetryChannel()
event = contracts.EventData()
event.name = 'My event'
channel.write(event)
"""
def __init__(self, context=None, queue=None):
"""Initializes a new instance of the class.
Args:
context (:class:`TelemetryContext') the telemetry context to use when sending telemetry data.\n
queue (:class:`QueueBase`) the queue to enqueue the resulting :class:`contracts.Envelope` to.
"""
self._context = context or TelemetryContext()
self._queue = queue or SynchronousQueue(SynchronousSender())
@property
def context(self):
"""The context associated with this channel. All :class:`contracts.Envelope` objects created by this channel
will use this value if it's present or if none is specified as part of the :func:`write` call.
Returns:
(:class:`TelemetryContext`). the context instance (defaults to: TelemetryContext())
"""
return self._context
@property
def queue(self):
"""The queue associated with this channel. All :class:`contracts.Envelope` objects created by this channel
will be pushed to this queue.
Returns:
(:class:`QueueBase`). the queue instance (defaults to: SynchronousQueue())
"""
return self._queue
@property
def sender(self):
"""The sender associated with this channel. This instance will be used to transmit telemetry to the service.
Returns:
(:class:`SenderBase`). the sender instance (defaults to: SynchronousSender())
"""
return self._queue.sender
[docs] def flush(self):
"""Flushes the enqueued data by calling :func:`flush` on :func:`queue`.
"""
self._queue.flush()
[docs] def write(self, data, context=None):
"""Enqueues the passed in data to the :func:`queue`. If the caller specifies a context as well, it will
take precedence over the instance in :func:`context`.
Args:
data (object). data the telemetry data to send. This will be wrapped in an :class:`contracts.Envelope`
before being enqueued to the :func:`queue`.
context (:class:`TelemetryContext`). context the override context to use when constructing the
:class:`contracts.Envelope`.
"""
local_context = context or self._context
if not local_context:
raise Exception('Context was required but not provided')
if not data:
raise Exception('Data was required but not provided')
envelope = contracts.Envelope()
envelope.name = data.ENVELOPE_TYPE_NAME
envelope.time = datetime.datetime.utcnow().isoformat() + 'Z'
envelope.ikey = local_context.instrumentation_key
tags = envelope.tags
for prop_context in [self._context, context]:
if not prop_context:
continue
for key, value in self._write_tags(prop_context):
tags[key] = value
envelope.data = contracts.Data()
envelope.data.base_type = data.DATA_TYPE_NAME
for prop_context in [context, self._context]:
if not prop_context:
continue
if hasattr(data, 'properties') and prop_context.properties:
properties = data.properties
for key in prop_context.properties:
if key not in properties:
properties[key] = prop_context.properties[key]
envelope.data.base_data = data
self._queue.put(envelope)
def _write_tags(self, context):
for item in [internal_context,
context.device, context.cloud, context.application, context.user,
context.session, context.location, context.operation]:
if not item:
continue
for pair in item.write().items():
yield pair