Source code for applicationinsights.logging.LoggingHandler

import logging
import applicationinsights
from weakref import WeakValueDictionary
from applicationinsights.channel import AsynchronousSender, AsynchronousQueue
from applicationinsights.channel import SynchronousSender, SynchronousQueue
from applicationinsights.channel import TelemetryChannel

enabled_instrumentation_keys = WeakValueDictionary()


[docs]def enable(instrumentation_key, *args, **kwargs): """Enables the Application Insights logging handler for the root logger for the supplied instrumentation key. Multiple calls to this function with different instrumentation keys result in multiple handler instances. .. code:: python import logging from applicationinsights.logging import enable # set up logging enable('<YOUR INSTRUMENTATION KEY GOES HERE>') # log something (this will be sent to the Application Insights service as a trace) logging.info('This is a message') # logging shutdown will cause a flush of all un-sent telemetry items # alternatively set up an async channel via enable('<YOUR INSTRUMENTATION KEY GOES HERE>', async_=True) Args: instrumentation_key (str). the instrumentation key to use while sending telemetry to the service. Keyword Args: async_ (bool): Whether to use an async channel for the telemetry. Defaults to False. endpoint (str): The custom endpoint to which to send the telemetry. Defaults to None. level (Union[int, str]): The level to set for the logger. Defaults to INFO. Returns: :class:`ApplicationInsightsHandler`. the newly created or existing handler. """ if not instrumentation_key: raise Exception('Instrumentation key was required but not provided') if instrumentation_key in enabled_instrumentation_keys: logging.getLogger().removeHandler( enabled_instrumentation_keys[instrumentation_key]) async_ = kwargs.pop('async_', False) endpoint = kwargs.pop('endpoint', None) telemetry_channel = kwargs.get('telemetry_channel') if telemetry_channel and async_: raise Exception('Incompatible arguments async_ and telemetry_channel') if telemetry_channel and endpoint: raise Exception( 'Incompatible arguments endpoint and telemetry_channel') if not telemetry_channel: if async_: sender, queue = AsynchronousSender, AsynchronousQueue else: sender, queue = SynchronousSender, SynchronousQueue kwargs['telemetry_channel'] = TelemetryChannel( queue=queue(sender(endpoint))) log_level = kwargs.pop('level', logging.INFO) handler = LoggingHandler(instrumentation_key, *args, **kwargs) handler.setLevel(log_level) enabled_instrumentation_keys[instrumentation_key] = handler logging.getLogger().addHandler(handler) return handler
[docs]class LoggingHandler(logging.Handler): """This class represents an integration point between Python's logging framework and the Application Insights service. Logging records are sent to the service either as simple Trace telemetry or as Exception telemetry (in the case of exception information being available). .. code:: python import logging from applicationinsights.logging import ApplicationInsightsHandler # set up logging handler = ApplicationInsightsHandler('<YOUR INSTRUMENTATION KEY GOES HERE>') logging.basicConfig(handlers=[ handler ], format='%(levelname)s: %(message)s', level=logging.DEBUG) # log something (this will be sent to the Application Insights service as a trace) logging.info('This is a message') # logging shutdown will cause a flush of all un-sent telemetry items # alternatively flush manually via handler.flush() """ def __init__(self, instrumentation_key, *args, **kwargs): """ Initialize a new instance of the class. Args: instrumentation_key (str). the instrumentation key to use while sending telemetry to the service. """ if not instrumentation_key: raise Exception( 'Instrumentation key was required but not provided') telemetry_channel = kwargs.get('telemetry_channel') if 'telemetry_channel' in kwargs: del kwargs['telemetry_channel'] self.client = applicationinsights.TelemetryClient( instrumentation_key, telemetry_channel) super(LoggingHandler, self).__init__(*args, **kwargs)
[docs] def flush(self): """Flushes the queued up telemetry to the service. """ self.client.flush() return super(LoggingHandler, self).flush()
[docs] def emit(self, record): """Emit a record. If a formatter is specified, it is used to format the record. If exception information is present, an Exception telemetry object is sent instead of a Trace telemetry object. Args: record (:class:`logging.LogRecord`). the record to format and send. """ # the set of properties that will ride with the record properties = { 'process': record.processName, 'module': record.module, 'fileName': record.filename, 'lineNumber': record.lineno, 'level': record.levelname, } # if we have exec_info, we will use it as an exception if record.exc_info: self.client.track_exception( *record.exc_info, properties=properties) return # if we don't simply format the message and send the trace formatted_message = self.format(record) self.client.track_trace( formatted_message, properties=properties, severity=record.levelname)