Source code for applicationinsights.requests.WSGIApplication

import datetime
import re
import uuid
import applicationinsights


[docs]class WSGIApplication(object): """ This class represents a WSGI wrapper that enables request telemetry for existing WSGI applications. The request telemetry will be sent to Application Insights service using the supplied instrumentation key. .. code:: python from applicationinsights.requests import WSGIApplication from paste.httpserver import serve from pyramid.response import Response from pyramid.view import view_config @view_config() def hello(request): return Response('Hello') if __name__ == '__main__': from pyramid.config import Configurator config = Configurator() config.scan() app = config.make_wsgi_app() # Enable Application Insights middleware app = WSGIApplication('<YOUR INSTRUMENTATION KEY GOES HERE>', app, common_properties={'service': 'hello_world_service'}) serve(app, host='0.0.0.0') """ def __init__(self, instrumentation_key, wsgi_application, *args, **kwargs): """ Initialize a new instance of the class. Args: instrumentation_key (str). the instrumentation key to use while sending telemetry to the service.\n wsgi_application (func). the WSGI application that we're wrapping. """ if not instrumentation_key: raise Exception( 'Instrumentation key was required but not provided') if not wsgi_application: raise Exception('WSGI application was required but not provided') telemetry_channel = kwargs.pop('telemetry_channel', None) if not telemetry_channel: sender = applicationinsights.channel.AsynchronousSender() queue = applicationinsights.channel.AsynchronousQueue(sender) telemetry_channel = applicationinsights.channel.TelemetryChannel( None, queue) self.client = applicationinsights.TelemetryClient( instrumentation_key, telemetry_channel) self.client.context.device.type = "PC" self._wsgi_application = wsgi_application self._common_properties = kwargs.pop('common_properties', {})
[docs] def flush(self): """Flushes the queued up telemetry to the service. """ self.client.flush()
def __call__(self, environ, start_response): """Callable implementation for WSGI middleware. Args: environ (dict). a dictionary containing all WSGI environment properties for this request.\n start_response (func). a function used to store the status, HTTP headers to be sent to the client and optional exception information. Returns: (obj). the response to send back to the client. """ start_time = datetime.datetime.utcnow() name = environ.get('PATH_INFO') or '/' closure = {'status': '200 OK'} http_method = environ.get('REQUEST_METHOD', 'GET') self.client.context.operation.id = str(uuid.uuid4()) # operation.parent_id ought to be the request id (not the operation id, but we don't have it yet) self.client.context.operation.name = http_method + ' ' + name try: self.client.context.location.ip = environ['HTTP_X_FORWARDED_FOR'].split(',')[-1].strip() except KeyError: try: self.client.context.location.ip = environ['REMOTE_ADDR'] except KeyError: pass def status_interceptor(status_string, headers_array, exc_info=None): closure['status'] = status_string start_response(status_string, headers_array, exc_info) for part in self._wsgi_application(environ, status_interceptor): yield part success = True response_match = re.match(r'\s*(?P<code>\d+)', closure['status']) if response_match: response_code = response_match.group('code') if int(response_code) >= 400: success = False else: response_code = closure['status'] success = False url = name query_string = environ.get('QUERY_STRING') if query_string: url += '?' + query_string scheme = environ.get('wsgi.url_scheme', 'http') host = environ.get('HTTP_HOST', environ.get('SERVER_NAME', 'unknown')) url = scheme + '://' + host + url end_time = datetime.datetime.utcnow() duration = int((end_time - start_time).total_seconds() * 1000) self.client.track_request(name, url, success, start_time.isoformat( ) + 'Z', duration, response_code, http_method, self._common_properties)