# Source code for analog.renderers

"""Analog log report renderers."""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
import abc
import csv
try:
    from cStringIO import StringIO
except ImportError:
    from io import StringIO
import textwrap

from tabulate import tabulate

from analog.exceptions import UnknownRendererError
from analog.utils import PrefixMatchingCounter


def find_subclasses(cls, _seen=None):
    """Yield every direct and indirect subclass of ``cls`` exactly once.

    Subclasses are visited depth-first: a class is yielded before its own
    subclasses, and a class reachable through several bases is only
    reported the first time it is encountered.

    :param cls: class object whose subclass tree is walked.
    :param _seen: set of classes already yielded; only passed on recursive
        calls so that all recursion levels share one registry.
    :returns: generator of ``cls`` subclasses.
    :rtype: ``generator``

    """
    visited = set() if _seen is None else _seen
    for child in cls.__subclasses__():
        if child in visited:
            continue
        visited.add(child)
        yield child
        # descend into the subtree of the class that was just reported
        for descendant in find_subclasses(child, visited):
            yield descendant


def add_metaclass(metaclass):
    """From six: Class decorator for creating a class with a metaclass.

    The decorated class is re-created by calling ``metaclass`` with the
    original name, bases and a copy of the class namespace.

    :param metaclass: metaclass used to re-create the decorated class.
    :returns: class decorator returning the re-created class.
    :rtype: ``function``

    """
    def wrapper(cls):
        orig_vars = cls.__dict__.copy()
        # Slot descriptors created for the original class must not be passed
        # on: re-creating the class with both ``__slots__`` and the matching
        # descriptors raises "conflicts with class variable".
        slots = orig_vars.get('__slots__')
        if slots is not None:
            if isinstance(slots, str):
                slots = [slots]
            for slots_var in slots:
                orig_vars.pop(slots_var, None)
        # These descriptors are generated anew for the re-created class.
        orig_vars.pop('__dict__', None)
        orig_vars.pop('__weakref__', None)
        # ``__qualname__`` is not part of ``cls.__dict__``; hand it over
        # explicitly so nested classes keep their qualified name.
        if hasattr(cls, '__qualname__'):
            orig_vars['__qualname__'] = cls.__qualname__
        return metaclass(cls.__name__, cls.__bases__, orig_vars)
    return wrapper


@add_metaclass(abc.ABCMeta)
class Renderer(object):

    """Abstract interface shared by all report renderers."""

    #: name under which a concrete renderer is registered; renderers that
    #: leave this at ``None`` are not listed by ``all_renderers``.
    name = None

    @abc.abstractmethod
    def render(self, report, path_stats=False):
        """Render report statistics.

        :param report: log analysis report object.
        :type report: :py:class:`analog.report.Report`
        :param path_stats: include per path statistics in output.
        :type path_stats: ``bool``
        :returns: output string
        :rtype: `str`

        """

    @classmethod
    def all_renderers(cls):
        """Collect all named renderer classes derived from ``cls``.

        :returns: dictionary of name to renderer class.
        :rtype: ``dict``

        """
        registry = {}
        for renderer_class in find_subclasses(cls):
            # classes without a name are skipped
            if renderer_class.name is not None:
                registry[renderer_class.name] = renderer_class
        return registry

    @classmethod
    def by_name(cls, name):
        """Instantiate the ``Renderer`` subclass registered as ``name``.

        :param name: name of subclass.
        :type name: ``str``
        :returns: ``Renderer`` subclass instance.
        :rtype: :py:class:`analog.renderers.Renderer`
        :raises: :py:class:`analog.exceptions.UnknownRendererError` for
            unknown subclass names.

        """
        renderer_class = cls.all_renderers().get(name)
        if renderer_class is None:
            raise UnknownRendererError(name)
        return renderer_class()
class PlainTextRenderer(Renderer):

    """Renderer producing the default plain text, list style output."""

    name = "plain"

    def render(self, report, path_stats=False):
        """Render the overall analysis summary of ``report``.

        :param report: log analysis report object.
        :param path_stats: append the per path summaries to the output.
        :returns: output string
        :rtype: `str`

        """
        template = textwrap.dedent("""\
            Requests: {self.requests}

            HTTP Verbs:
                {verbs}

            Status Codes:
                {status}

            Path Requests:
                {paths}

            Times [s]:
                {times}

            Upstream Times [s]:
                {upstream_times}

            Body Bytes Sent [B]:
                {body_bytes}
            """)
        summary = template.format(
            self=report,
            verbs=self._indent(self._str_path_counts(report.verbs)),
            status=self._indent(self._str_path_counts(report.status)),
            paths=self._indent(self._str_path_counts(report.path_requests)),
            times=self._indent(self._render_list_stats(report.times)),
            upstream_times=self._indent(
                self._render_list_stats(report.upstream_times)),
            body_bytes=self._indent(
                self._render_list_stats(report.body_bytes)))
        if not path_stats:
            return summary
        return summary + "\n" + self._render_path_stats(report)

    def _render_path_stats(self, report):
        """Render one summary section per request path.

        The per path mappings of ``report`` are walked in lockstep, so the
        n-th entry of each mapping is rendered together.

        :param report: log analysis report object.
        :returns: output string
        :rtype: `str`

        """
        template = textwrap.dedent("""\
            {path}

                HTTP Verbs:
                    {verbs}

                Status Codes:
                    {status}

                Times [s]:
                    {times}

                Upstream Times [s]:
                    {upstream_times}

                Body Bytes Sent [B]:
                    {body_bytes}
            """)
        per_path = zip(
            report.path_verbs.keys(),
            report.path_verbs.values(),
            report.path_status.values(),
            report.path_times.values(),
            report.path_upstream_times.values(),
            report.path_body_bytes.values())
        sections = []
        for path, verbs, status, times, upstream_times, body_bytes in per_path:
            sections.append(template.format(
                path=path,
                verbs=self._indent(self._str_path_counts(verbs), 8),
                status=self._indent(self._str_path_counts(status), 8),
                times=self._indent(self._render_list_stats(times), 8),
                upstream_times=self._indent(
                    self._render_list_stats(upstream_times), 8),
                body_bytes=self._indent(
                    self._render_list_stats(body_bytes), 8)))
        return "\n".join(sections)

    def _render_list_stats(self, list_stats):
        """Format the mean and median of a list statistics object.

        :param list_stats: ``ListStats`` instance.
        :returns: statistic report.
        :rtype: ``str``

        """
        template = textwrap.dedent("""\
            {stats.mean:>10.3f} mean
            {stats.median:>10.3f} median
            """)
        return template.format(stats=list_stats)

    def _str_path_counts(self, path_counts):
        """Format ``(key, count)`` pairs as right-aligned count lines.

        :param path_counts: iterable of ``(key, count)`` pairs.
        :returns: output string
        :rtype: `str`

        """
        lines = ["{count:>10,} {key}".format(key=key, count=count)
                 for key, count in path_counts]
        return "\n".join(lines)

    def _indent(self, text, indent=4):
        """Indent every line of ``text`` except the first one.

        Example::

            line1
                line2
                line3

        :param text: possibly multi-line string.
        :param indent: number of spaces put in front of continuation lines.
        :returns: output string
        :rtype: `str`

        """
        lines = text.splitlines()
        padding = " " * indent
        return "\n".join(lines[:1] + [padding + line for line in lines[1:]])
@add_metaclass(abc.ABCMeta)
class TabularDataRenderer(Renderer):

    """Base renderer for report output in any tabular form."""

    #: field names for ``ListStats`` attributes
    _stats_fields = ('times', 'upstream_times', 'body_bytes')

    #: attribute names of ``ListStats`` attributes
    _list_stats_keys = ("mean", "median")

    def _list_stats(self, list_stats):
        """Get list of (key,value) tuples for each attribute of ``list_stats``.

        The attributes are taken from ``_list_stats_keys`` so that keys and
        values cannot get out of sync. A real list is returned (not a
        one-shot iterator), so the result can be iterated more than once.

        :param list_stats: list statistics object.
        :type list_stats: :py:class:`analog.report.ListStats`
        :returns: (key, value) tuples for each ``ListStats`` attribute.
        :rtype: ``list`` of ``tuple``

        """
        return [(key, getattr(list_stats, key))
                for key in self._list_stats_keys]

    def _tabular_data(self, report, path_stats):
        """Prepare tabular data for output.

        Generate a list of header fields, a list of total values for each
        field and a list of the same values per path. The row of totals is
        always the last row.

        :param report: log analysis report object.
        :type report: :py:class:`analog.report.Report`
        :param path_stats: include per path statistics in output.
        :type path_stats: ``bool``
        :returns: tuple of table (headers, rows).
        :rtype: ``tuple``

        """
        # Sorted HTTP verbs of this report and their counts. Names and counts
        # are split without ``zip(*...)`` unpacking, which would fail for a
        # report that has no entries at all.
        verbs = sorted((verb, count) for (verb, count) in report.verbs)
        verb_names = tuple(verb for (verb, _) in verbs)
        verb_counts = tuple(count for (_, count) in verbs)

        # sorted status codes (as strings) of this report and their counts
        statuses = sorted(
            (str(status), count) for (status, count) in report.status)
        status_names = tuple(status for (status, _) in statuses)
        status_counts = tuple(count for (_, count) in statuses)

        # all statistical attributes of the report, e.g. ``times_mean``
        stats_names = []
        stats_values = []
        for stats_field in self._stats_fields:
            field_stats = self._list_stats(getattr(report, stats_field))
            for (analysis, value) in field_stats:
                stats_names.append('{0}_{1}'.format(stats_field, analysis))
                stats_values.append(value)

        # status codes shorter than three characters are padded with "x"
        status_headers = tuple("status_{code:x<3}".format(code=code)
                               for code in status_names)

        headers = (("path", "requests") + verb_names + status_headers +
                   tuple(stats_names))
        total = (("total", report.requests) + verb_counts + status_counts +
                 tuple(stats_values))

        rows = []
        # include path statistics?
        if path_stats:
            # get per path values from report, ordered by path
            for (path, verbs, status, times, utimes, body_bytes) in zip(
                    report.path_verbs.keys(),
                    report.path_verbs.values(),
                    report.path_status.values(),
                    report.path_times.values(),
                    report.path_upstream_times.values(),
                    report.path_body_bytes.values()):
                requests = report._path_requests[path]
                path_verb_counts = dict(verbs)
                # NOTE(review): presumably resolves a status column name via
                # prefix matching -- confirm against PrefixMatchingCounter.
                path_status_counts = PrefixMatchingCounter(dict(status))
                row = [path, requests]
                # verbs and status codes not seen for this path count as 0
                row += [path_verb_counts.get(name, 0) for name in verb_names]
                row += [path_status_counts.get(name, 0)
                        for name in status_names]
                row += [value for (_, value) in self._list_stats(times)]
                row += [value for (_, value) in self._list_stats(utimes)]
                row += [value for (_, value) in self._list_stats(body_bytes)]
                rows.append(row)
        rows.append(total)
        return (list(headers), rows)
@add_metaclass(abc.ABCMeta)
class ASCIITableRenderer(TabularDataRenderer):

    """Base class of all renderers that draw the report as an ascii table."""

    #: ``tablefmt`` value passed to ``tabulate``; set by subclasses.
    tabulate_format = None

    def render(self, report, path_stats=False):
        """Render report statistics using ``tabulate``.

        :param report: log analysis report object.
        :type report: :py:class:`analog.report.Report`
        :param path_stats: include per path statistics in output.
        :type path_stats: ``bool``
        :returns: output string
        :rtype: `str`

        """
        column_names, table_rows = self._tabular_data(report, path_stats)
        options = dict(headers=column_names,
                       tablefmt=self.tabulate_format,
                       floatfmt='.3f')
        return tabulate(table_rows, **options)
class SimpleTableRenderer(ASCIITableRenderer):

    """Tabular report renderer using the ``rst`` format of ``tabulate``."""

    name = "table"
    tabulate_format = 'rst'
class GridTableRenderer(ASCIITableRenderer):

    """Tabular report renderer using the ``grid`` format of ``tabulate``."""

    name = "grid"
    tabulate_format = 'grid'
@add_metaclass(abc.ABCMeta)
class SeparatedValuesRenderer(TabularDataRenderer):

    """Base renderer for report output in delimiter-separated values format."""

    #: value delimiter. E.g. comma or tab.
    delimiter = None

    def render(self, report, path_stats=False):
        """Render report statistics using a CSV writer.

        ``path_stats`` defaults to ``False`` like in the other renderers, so
        that ``render(report)`` works for every renderer.

        :param report: log analysis report object.
        :type report: :py:class:`analog.report.Report`
        :param path_stats: include per path statistics in output.
        :type path_stats: ``bool``
        :returns: output string
        :rtype: `str`

        """
        headers, rows = self._tabular_data(report, path_stats)
        try:
            stream = StringIO(newline='')
        except TypeError:
            stream = StringIO()  # Python 2.7 does not support newline arg
        # ``str()`` yields the native string type for the delimiter, also
        # when ``unicode_literals`` is in effect.
        writer = csv.writer(stream, delimiter=str(self.delimiter),
                            lineterminator='\n')
        writer.writerow(headers)
        writer.writerows(rows)
        return stream.getvalue()[:-1]  # Do not return last newline
class CSVRenderer(SeparatedValuesRenderer):

    """Report renderer emitting comma separated values."""

    name = 'csv'
    delimiter = ','
class TSVRenderer(SeparatedValuesRenderer):

    """Report renderer emitting tab separated values."""

    name = 'tsv'
    delimiter = '\t'