Skip to content

Instantly share code, notes, and snippets.

@georgevreilly
Last active June 15, 2023 14:45
Show Gist options
  • Save georgevreilly/5762777 to your computer and use it in GitHub Desktop.
Save georgevreilly/5762777 to your computer and use it in GitHub Desktop.

Revisions

  1. georgevreilly revised this gist Oct 6, 2019. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions sessionrecorder.py
    Original file line number Diff line number Diff line change
    @@ -90,6 +90,7 @@ def parse_request_headers(self, environ):

    def request_body(self, environ):
    content_length = environ.get('CONTENT_LENGTH')
    body = ''
    if content_length:
    if content_length == '-1':
    # This is a special case, where the content length is basically undetermined
  2. georgevreilly created this gist Jun 12, 2013.
    133 changes: 133 additions & 0 deletions sessionrecorder.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,133 @@
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-

    """
    WSGI middleware to record requests and responses.
    """

    from __future__ import print_function, unicode_literals

    import io
    import itertools
    import logging
    import time

    # Required to be loaded early to avoid hitting deadlock situation when processing requests
    # See http://code.google.com/p/modwsgi/wiki/ApplicationIssues (at the bottom, under Non Blocking Module Imports)
    import _strptime

    from cStringIO import StringIO

    class RequestResponseState(object):
        """Mutable record of one request and the response produced for it."""

        def __init__(self, id, method, url, request_headers, content_length, request_body):
            """Store the request details and note the time the request started.

            The response-side attributes hold placeholder values until
            ``start_response`` and ``finish_response`` are called.
            """
            # Response side: placeholders, filled in later.
            self.status = -1
            self.response_headers = None
            self.response_chunks = None
            self.duration_msecs = 0

            # Request side: stored exactly as supplied by the caller.
            self.request_id = id
            self.method, self.url = method, url
            self.request_headers = request_headers
            self.content_length = content_length
            self.request_body = request_body

            # Reference point for the duration computed in finish_response().
            self.started_at = time.time()

        def start_response(self, status, response_headers):
            """Remember the status and headers of the response."""
            self.status, self.response_headers = status, response_headers

        def finish_response(self, response_chunks):
            """Store the response chunks and the elapsed time in milliseconds.

            Returns ``response_chunks`` unchanged so the call can be chained.
            """
            elapsed_secs = time.time() - self.started_at
            self.duration_msecs = 1000.0 * elapsed_secs
            self.response_chunks = response_chunks
            return response_chunks

    class SessionRecorderMiddleware(object):
        """WSGI middleware that records each request/response pair.

        For every request a ``RequestResponseState`` is built from the WSGI
        environ, the wrapped application is called, and the state is passed
        to ``recorder`` before the response iterable is returned to the server.
        """

        # CGI variables whose header name is not obtained by stripping the
        # ``HTTP_`` prefix and re-capitalising the remainder.
        _parse_headers_special = {
            'HTTP_CGI_AUTHORIZATION': 'Authorization',
            'CONTENT_LENGTH': 'Content-Length',
            'CONTENT_TYPE': 'Content-Type',
        }

        def __init__(self, app, recorder):
            """Wrap ``app``; ``recorder`` is called with the state of every request."""
            self.app = app
            self.recorder = recorder
            counter = itertools.count()
            # Threadsafe counter. The builtin next() works on Python 2.6+ and
            # Python 3, whereas the bound ``.next`` method exists on Python 2 only.
            self.request_counter = lambda: next(counter)

        def __call__(self, environ, start_response):
            """Handle one WSGI request, recording it along the way.

            Returns whatever iterable the wrapped application returned.
            """
            state = RequestResponseState(
                self.request_counter(),
                environ['REQUEST_METHOD'],
                self.request_url(environ),
                list(self.parse_request_headers(environ)),
                *self.request_body(environ)
            )

            def _start_response(status, response_headers, *args):
                # Capture status and response_headers for later processing
                state.start_response(status, response_headers)
                return start_response(status, response_headers, *args)

            # NOTE(review): the recorder runs before the server iterates the
            # response, so an app that only calls start_response while being
            # iterated would presumably be recorded with the placeholder
            # status -- confirm whether that matters for your apps.
            response_chunks = state.finish_response(self.app(environ, _start_response))
            self.recorder(state)

            # return data to WSGI server
            return response_chunks

        def request_url(self, environ):
            """Return SCRIPT_NAME + PATH_INFO, plus '?' + QUERY_STRING when non-empty."""
            query = environ.get('QUERY_STRING')
            return '{0}{1}{2}'.format(
                environ.get('SCRIPT_NAME', ''),
                environ.get('PATH_INFO', ''),
                '?' + query if query else '',
            )

        def parse_request_headers(self, environ):
            """Yield ``(header_name, value)`` pairs for the header-like CGI variables.

            Variables listed in ``_parse_headers_special`` use the mapped name;
            other ``HTTP_*`` variables are converted, e.g. ``HTTP_X_FOO`` becomes
            ``X-Foo``. All remaining environ entries are ignored.
            """
            # items() (rather than the Python 2-only iteritems()) keeps this
            # usable on both Python 2 and Python 3.
            for cgi_var, value in environ.items():
                if cgi_var in self._parse_headers_special:
                    yield self._parse_headers_special[cgi_var], value
                elif cgi_var.startswith('HTTP_'):
                    yield cgi_var[5:].title().replace('_', '-'), value

        def request_body(self, environ):
            """Read the request body and make it readable again for the nested app.

            Returns a ``(content_length, body)`` tuple. When ``CONTENT_LENGTH``
            is missing or empty, nothing is read, ``wsgi.input`` is left
            untouched and ``(0, b'')`` is returned.

            Raises ``ValueError`` if ``CONTENT_LENGTH`` is not an integer string.
            """
            raw_length = environ.get('CONTENT_LENGTH')
            if not raw_length:
                # Previously ``body`` was never assigned on this path, so every
                # request without a Content-Length raised UnboundLocalError.
                return 0, b''

            stream = environ['wsgi.input']
            if raw_length == '-1':
                # This is a special case, where the content length is basically undetermined
                body = stream.read(-1)
                content_length = len(body)
            else:
                content_length = int(raw_length)
                body = stream.read(content_length)

            # reset request body for the nested app
            environ['wsgi.input'] = io.BytesIO(body)
            return content_length, body

    def is_binary_content_type(content_type):
        """Return True when ``content_type`` does not name a known textual type.

        ``content_type`` is a Content-Type header value such as
        ``'text/html; charset=utf-8'``. Parameters after the first ``;`` are
        ignored and the comparison is case-insensitive. Every ``text/*`` type
        and a fixed list of ``application/*`` subtypes count as textual;
        anything else, including an empty or malformed value, is reported as
        binary.
        """
        # Keep only the media type. The original code called .split('/') on
        # the list returned by .split(';'), which always raised AttributeError.
        media_type = content_type.split(';')[0].strip().lower()
        # partition() tolerates a value without '/', where unpacking the
        # result of split('/') would raise ValueError.
        _type, _, subtype = media_type.partition('/')
        if _type == 'text':
            return False
        if _type == 'application':
            return subtype not in (
                'atom+xml', 'ecmascript', 'json', 'javascript', 'rss+xml', 'soap+xml', 'xhtml+xml')
        return True

    def log_results(state):
        """Log one recorded request/response pair as a single INFO message.

        ``state`` supplies ``request_id``, ``method``, ``url``,
        ``request_headers``, ``content_length``, ``request_body``, ``status``,
        ``duration_msecs``, ``response_headers`` and ``response_chunks``.

        The response body is logged only when ``response_chunks`` is a list or
        tuple. Any other value (a generator, some other iterable, or ``None``)
        is replaced by a placeholder line: iterating it here would consume the
        response before the server could send it, and concatenating it to a
        list raised TypeError in the original code.
        """
        # TODO: create an HttpArchive
        chunks = state.response_chunks
        if isinstance(chunks, (list, tuple)):
            # TODO: sanitize binary response body => look at response Content-Type
            # Byte chunks are decoded so they can be joined with the text lines.
            body_lines = [
                chunk.decode('utf-8', 'replace') if isinstance(chunk, bytes) else chunk
                for chunk in chunks
            ]
        else:
            body_lines = ['<response body not captured>']

        data = [
            'SR: {0}'.format(state.request_id),
            '{0} {1}'.format(state.method, state.url),
            str(state.request_headers),
            # TODO: sanitize binary request body => look at request Content-Type
            '{0} bytes: {1}'.format(state.content_length, state.request_body or '<EMPTY>'),
            '=> {0} :: {1:.3f} ms :: {2}'.format(
                state.status, state.duration_msecs, str(state.response_headers)),
        ] + body_lines + ['========']
        logging.info('\n'.join(data))

    # TODO: unit tests