"""Analog analysis module."""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import datetime
from analog.exceptions import MissingFormatError
from analog.formats import LogFormat
from analog.report import Report
# NOTE: the three lists below are used directly as default argument values and
# are stored by reference on ``Analyzer`` instances; treat them as read-only.

#: Default verbs to monitor if unconfigured.
DEFAULT_VERBS = ['DELETE', 'GET', 'PATCH', 'POST', 'PUT']

#: Default status codes to monitor if unconfigured.
DEFAULT_STATUS_CODES = [1, 2, 3, 4, 5]

#: Default paths (all) to monitor if unconfigured.
DEFAULT_PATHS = []


class Analyzer(object):

    """Log analysis utility.

    Scan a logfile for logged requests and calculate statistical metrics in a
    :py:class:`analog.report.Report`.

    """

    def __init__(self, log, format, pattern=None, time_format=None,
                 verbs=DEFAULT_VERBS, status_codes=DEFAULT_STATUS_CODES,
                 paths=DEFAULT_PATHS, max_age=None, path_stats=False):
        """Configure log analyzer.

        :param log: handle on logfile to read and analyze.
        :type log: :py:class:`io.TextIOWrapper`
        :param format: log format identifier or 'custom'.
        :type format: ``str``
        :param pattern: custom log format pattern expression.
        :type pattern: ``str``
        :param time_format: log entry timestamp format (strftime compatible).
        :type time_format: ``str``
        :param verbs: HTTP verbs to be tracked.
            Defaults to :py:data:`analog.analyzer.DEFAULT_VERBS`.
        :type verbs: ``list``
        :param status_codes: status_codes to be tracked. May be prefixes,
            e.g. ["100", "2", "3", "4", "404" ].
            Defaults to :py:data:`analog.analyzer.DEFAULT_STATUS_CODES`.
        :type status_codes: ``list``
        :param paths: Paths to explicitly analyze. If not defined, paths are
            detected automatically.
            Defaults to :py:data:`analog.analyzer.DEFAULT_PATHS`.
        :type paths: ``list`` of ``str``
        :param max_age: Max. age of log entries to analyze in minutes.
            Unlimited by default.
        :type max_age: ``int``
        :param path_stats: accepted so the signature matches
            :py:func:`analyze`; the analyzer itself does not use it.
        :type path_stats: ``bool``
        :raises: :py:class:`analog.exceptions.MissingFormatError` if
            ``format`` is neither a known format name nor ``'custom'``.

        """
        self._log = log

        # Resolve the log format: a predefined one by name, or a custom one
        # built from the given pattern and timestamp format.
        formats = LogFormat.all_formats()
        if format in formats:
            self._format = formats[format]
        elif format == 'custom':
            self._format = LogFormat('custom',
                                     pattern=pattern,
                                     time_format=time_format)
        else:
            raise MissingFormatError(
                "Require log format. Specify format name or custom regex "
                "pattern and timestamp format.")

        self._verbs = verbs
        self._status_codes = status_codes
        self._pathconf = paths
        self._max_age = max_age

        # Bounds of the analysis window; computed per run in ``__call__`` and
        # only when ``max_age`` is set.
        self._now = None
        self._min_time = None

        # execution time (not updated by this class; ``analyze`` reads the
        # timing from the report instead)
        self.execution_time = None

    def _monitor_path(self, path):
        """Convert full request path to monitored path.

        If no path groups are configured to be monitored, all full paths are.
        Otherwise the first configured entry that ``path`` starts with wins.

        :param path: the full request path.
        :type path: ``str``
        :returns: the monitored path (part of ``path``) or ``None`` if not
            monitored.
        :rtype: ``str`` or ``None``

        """
        if not self._pathconf:
            return path
        for monitored in self._pathconf:
            if path.startswith(monitored):
                return monitored
        return None

    def _timestamp(self, time_str):
        """Convert a log entry timestamp string to a naive datetime object.

        The string is parsed with the ``time_format`` of the configured log
        format, e.g. "15/Jan/2014:14:12:50 +0000".

        If the format yields an offset-aware value (``%z``), it is converted
        to naive local time. The result is compared against bounds derived
        from the naive ``datetime.datetime.now()``, and Python refuses to
        compare naive with aware datetimes.

        :param time_str: timestamp as found in the log entry.
        :type time_str: ``str``
        :returns: request timestamp datetime (always naive).
        :rtype: :py:class:`datetime.datetime`

        """
        parsed = datetime.datetime.strptime(
            time_str, self._format.time_format)
        if parsed.tzinfo is not None:
            # Only reachable where ``strptime`` supports ``%z`` (Python 3),
            # which also provides the argument-less ``astimezone()``.
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed

    def __call__(self):
        """Analyze defined logfile.

        :returns: log analysis report object.
        :rtype: :py:class:`analog.report.Report`

        """
        if self._max_age is not None:
            # Window is [now - max_age, now], with now truncated to the minute.
            self._now = datetime.datetime.now()
            self._now = self._now.replace(second=0, microsecond=0)
            self._min_time = (
                self._now - datetime.timedelta(minutes=self._max_age))

        report = Report(self._verbs, self._status_codes)

        # read lines from logfile for the last max_age minutes
        for line in self._log:
            # parse line; lines not matching the format are skipped
            match = self._format.pattern.search(line)
            if match is None:
                continue
            log_entry = self._format.entry(match)

            if self._max_age is not None:
                # don't process anything older than max_age
                timestamp = self._timestamp(log_entry.timestamp)
                if timestamp < self._min_time:
                    continue
                # stop processing when now was reached
                if timestamp > self._now:
                    break

            # parse request
            path = self._monitor_path(log_entry.path)
            if path is None:
                continue

            # collect the numbers
            report.add(
                path=path,
                verb=log_entry.verb,
                status=int(log_entry.status),
                time=float(log_entry.request_time),
                upstream_time=float(log_entry.upstream_response_time),
                body_bytes=int(log_entry.body_bytes_sent))

        # end timestamp
        report.finish()
        return report
def analyze(log, format, pattern=None, time_format=None,
            verbs=DEFAULT_VERBS, status_codes=DEFAULT_STATUS_CODES,
            paths=DEFAULT_PATHS, max_age=None, path_stats=False, timing=False,
            output_format=None):
    """Run an :py:class:`analog.analyzer.Analyzer` and print its report.

    Builds an analyzer from the given options, runs it once, optionally
    prints the execution time, prints the rendered report and returns the
    report object.

    :param log: handle on logfile to read and analyze.
    :type log: :py:class:`io.TextIOWrapper`
    :param format: log format identifier or 'custom'.
    :type format: ``str``
    :param pattern: custom log format pattern expression.
    :type pattern: ``str``
    :param time_format: log entry timestamp format (strftime compatible).
    :type time_format: ``str``
    :param verbs: HTTP verbs to be tracked.
        Defaults to :py:data:`analog.analyzer.DEFAULT_VERBS`.
    :type verbs: ``list``
    :param status_codes: status_codes to be tracked. May be prefixes,
        e.g. ["100", "2", "3", "4", "404" ].
        Defaults to :py:data:`analog.analyzer.DEFAULT_STATUS_CODES`.
    :type status_codes: ``list``
    :param paths: Paths to explicitly analyze. If not defined, paths are
        detected automatically.
        Defaults to :py:data:`analog.analyzer.DEFAULT_PATHS`.
    :type paths: ``list`` of ``str``
    :param max_age: Max. age of log entries to analyze in minutes.
        Unlimited by default.
    :type max_age: ``int``
    :param path_stats: Print per-path analysis report. Default off.
    :type path_stats: ``bool``
    :param timing: print analysis timing information?
    :type timing: ``bool``
    :param output_format: report output format.
    :type output_format: ``str``
    :returns: log analysis report object.
    :rtype: :py:class:`analog.report.Report`

    """
    # Everything except the log handle and format name is optional tuning.
    settings = dict(
        pattern=pattern,
        time_format=time_format,
        verbs=verbs,
        status_codes=status_codes,
        paths=paths,
        max_age=max_age,
        path_stats=path_stats,
    )
    run_analysis = Analyzer(log=log, format=format, **settings)
    report = run_analysis()

    # Timing line is printed only on request and only for a truthy duration.
    if timing:
        if report.execution_time:
            timing_line = "Analyzed logs in {:.3f}s.\n".format(
                report.execution_time)
            print(timing_line)

    # Report goes to stdout in the requested output format.
    rendered = report.render(path_stats=path_stats,
                             output_format=output_format)
    print(rendered)
    return report