Source code for analog.formats
"""Analog log format definitions."""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from collections import namedtuple
import re
import weakref
from analog.exceptions import InvalidFormatExpressionError
class LogFormat:
    """Log format definition.

    Represents log format recognition patterns by name.

    A name:format mapping of all defined log format patterns can be retrieved
    using :py:meth:`analog.formats.LogFormat.all_formats`.

    Each log format should at least define the following match groups:

    * ``timestamp``: Local time.
    * ``verb``: HTTP verb (GET, POST, PUT, ...).
    * ``path``: Request path.
    * ``status``: Response status code.
    * ``body_bytes_sent``: Body size in bytes.
    * ``request_time``: Request time.
    * ``upstream_response_time``: Upstream response time.

    """

    #: pool of all predefined log formats (name -> weak reference to instance)
    __formats_ = {}

    #: required pattern groups
    _required_attributes = ('timestamp', 'verb', 'path', 'status',
                            'body_bytes_sent', 'request_time',
                            'upstream_response_time')

    def __init__(self, name, pattern, time_format):
        """Describe log format.

        The format ``pattern`` is a (verbose) regex pattern string specifying
        the log entry attributes as named groups that is compiled into a
        :py:class:`re.Pattern` object.

        All pattern group names are available as attributes of log entries
        when using :py:meth:`analog.formats.LogFormat.entry`.

        The instance is added to the pool of known formats only after the
        pattern has been validated, so a failed definition never replaces an
        already registered format of the same name.

        :param name: log format name. Non-word characters are stripped when
                     deriving the log entry class name.
        :type name: ``str``
        :param pattern: regular expression pattern string.
        :type pattern: raw ``str``
        :param time_format: timestamp parsing pattern.
        :type time_format: ``str``
        :raises: :py:class:`analog.exceptions.InvalidFormatExpressionError` if
                 missing required format pattern groups or the pattern is not
                 a valid regular expression.

        """
        try:
            compiled = re.compile(pattern, re.UNICODE | re.VERBOSE)
        except re.error:
            raise InvalidFormatExpressionError("Invalid regex in format.")

        groups = compiled.groupindex
        if any(attr not in groups for attr in self._required_attributes):
            raise InvalidFormatExpressionError(
                "Format pattern must at least define the groups: "
                "{0}.".format(", ".join(self._required_attributes)))

        self.name = name
        self.pattern = compiled
        self.time_format = time_format

        # namedtuple requires a valid identifier as type name; format names
        # such as "my-format" would otherwise fail with a bare ValueError.
        type_suffix = re.sub(r'\W', '', name.title())
        # Fields are ordered by group index, i.e. by position in the pattern.
        self._entry = namedtuple(
            'LogEntry{0}'.format(type_suffix),
            sorted(groups, key=groups.get))

        # Register last: only fully initialised formats enter the pool. The
        # weak reference lets unused formats be garbage collected.
        self.__formats_[name] = weakref.ref(self)

    def entry(self, match):
        """Convert regex match object to log entry object.

        :param match: regex match object from ``pattern`` match.
        :type match: :py:class:`re.MatchObject`
        :returns: log entry object with all pattern keys as attributes.
        :rtype: :py:class:`collections.namedtuple`

        """
        return self._entry(**match.groupdict())

    @classmethod
    def all_formats(cls):
        """Mapping of all defined log format patterns.

        Formats whose instances have been garbage collected are omitted.

        :returns: dictionary of name:``LogFormat`` instances.
        :rtype: ``dict``

        """
        formats = {}
        for name, ref in cls.__formats_.items():
            # Dereference once and keep the strong reference, so the instance
            # cannot vanish between the check and the assignment.
            instance = ref()
            if instance is not None:
                formats[name] = instance
        return formats
# Predefined nginx access-log format. LogFormat compiles the pattern with
# re.VERBOSE, so whitespace and the trailing ``# ...`` notes inside the raw
# string are regex comments, not part of the match.
# NOTE(review): time_format ends in a literal "+0000", so it presumably only
# parses timestamps logged with a UTC offset -- confirm against the consumer.
NGINX = LogFormat('nginx', r'''
^(?P<remote_addr>\S+)\s-\s # Remote address
(?P<remote_user>\S+)\s # Remote user
\[(?P<timestamp>.*?)\]\s # Local time
" # Request
(?P<verb>[A-Z]+)\s # HTTP verb (GET, POST, PUT, ...)
(?P<path>[^?]+) # Request path
(?:\?.+)? # Query string
\sHTTP/(?:[\d.]+) # HTTP/x.x protocol
"\s # /Request
(?P<status>\d+?)\s # Response status code
(?P<body_bytes_sent>\d+?)\s # Body size in bytes
"(?P<http_referer>[^"]+?)"\s # Referer header
"(?P<http_user_agent>[^"]+?)"\s # User-Agent header
"(?P<http_x_forwarded_for>[^"]+?)"\s # X-Forwarded-For header
(?P<request_time>[\d\.]+)\s # Request time
(?P<upstream_response_time>[\d\.]+)\s? # Upstream response time
(?P<pipe>\S+)?$ # Pipelined request
''', time_format='%d/%b/%Y:%H:%M:%S +0000')
"""Nginx ``combined_timed`` format::
'$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for" '
'$request_time $upstream_response_time $pipe';
"""