Top

data_sources.syslog_data module

module that aggregates data from the syslog and provides a set of data tables

# Copyright 2020 James P Goodwin data table package to manage sparse columnar data
""" module that aggregates data from the syslog and provides a set of data tables """
import locale
locale.setlocale(locale.LC_ALL,'')
import sys
import os
import glob
import gzip
import re
from datetime import datetime,timedelta
from data_sources.data_table import DataTable,Column,Cell,blank_type,string_type,float_type,int_type,date_type,format_string,format_float,format_date,format_int,synchronized

class SyslogDataTable( DataTable ):
    """ class that collects a time based aggregation of data from the syslog into a data_table """
    def __init__(self,syslog_glob="/var/log/syslog*",num_hours=24,bucket_hours=1,refresh_minutes=10,start_time=None):
        """ Initialize the SyslogDataTable.

        syslog_glob     glob pattern matching the syslog files to aggregate (gzip rotations included)
        num_hours       total reporting window in hours, ending at "now" (or start_time)
        bucket_hours    width of each aggregation bucket in hours
        refresh_minutes how often the background refresh rebuilds the table
        start_time      optional (year,month,day,hour,minute,second) tuple pinning the window end; useful for tests
        """
        self.syslog_glob = syslog_glob
        self.num_hours = num_hours
        self.bucket_hours = bucket_hours
        self.start_time = start_time
        DataTable.__init__(self,None,
            "Syslog Data: %s for the last %d hours in %d hour buckets, refreshed every %d minutes"%(
            self.syslog_glob,
            self.num_hours,
            self.bucket_hours,
            refresh_minutes),refresh_minutes)
        # populate the table immediately instead of waiting for the first scheduled refresh
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh or rebuild tables """
        # the window end is either a fixed start_time (reproducible runs) or "now"
        if self.start_time:
            year,month,day,hour,minute,second = self.start_time
            current_time = datetime(year,month,day,hour,minute,second)
        else:
            current_time = datetime.now()
        start_time = current_time - timedelta( hours = self.num_hours )
        syslog_files = glob.glob(self.syslog_glob)

        # one timestamp per bucket boundary; the final cell is current_time so
        # the last (possibly partial) bucket is closed off
        time_column = Column(name="Time Stamps")
        bucket_time = start_time
        idx = 0
        while bucket_time < current_time:
            time_column.put(idx,Cell(date_type,bucket_time,format_date))
            bucket_time = bucket_time + timedelta( hours = self.bucket_hours )
            idx += 1
        time_column.put(idx,Cell(date_type,current_time,format_date))

        def bucket_idx( timestamp ):
            """ index of the first bucket boundary >= timestamp, or -1 if outside the window """
            if timestamp < start_time or timestamp > current_time:
                return -1
            for idx in range(time_column.size()):
                if time_column.get(idx).get_value() >= timestamp:
                    return idx
            return -1

        errors_column = Column(name="Errors by Time")
        warnings_column = Column(name="Warnings by Time")
        messages_column = Column(name="Messages by Time")

        services_column = Column(name="Services")
        errors_service_column = Column(name="Errors by Service")
        warnings_service_column = Column(name="Warnings by Service")
        messages_service_column = Column(name="Messages by Service")

        # service name -> row index; O(1) lookup instead of the original
        # linear scan of services_column for every log line
        service_rows = {}

        def service_idx( service ):
            """ row index of service in services_column, or -1 if not yet seen """
            return service_rows.get(service,-1)

        def put_or_sum( column, idx, value ):
            """ add value into the integer cell at idx, treating blank/missing cells as zero """
            current_value = 0
            if idx < column.size():
                c = column.get(idx)
                if c.get_type() != blank_type:
                    current_value = int(c.get_value())
            column.put(idx,Cell(int_type,current_value+value,format_int))

        # compile once, outside the per-line loop
        # syslog line shape: "Mon DD HH:MM:SS host process[pid]: message"
        line_re = re.compile(r"(\w\w\w\s+\d+\s\d\d:\d\d:\d\d)\s[a-z0-9\-]*\s([a-zA-Z0-9\-\_\.]*)[\[\]0-9]*:\s*(.*)")
        error_re = re.compile(r"[Ee]rror|ERROR")
        warning_re = re.compile(r"[Ww]arning|WARNING")

        for slf in syslog_files:
            if slf.endswith(".gz"):
                slf_f = gzip.open(slf,"rt",encoding="utf-8")
            else:
                slf_f = open(slf,"r",encoding="utf-8")
            # close each handle even if parsing raises; the original version
            # never closed the files it opened
            with slf_f:
                for line in slf_f:
                    line = line.strip()
                    m = line_re.match(line)
                    if m:
                        # syslog timestamps carry no year, so stamp with the current
                        # year; NOTE(review): lines written in the previous calendar
                        # year (window spanning New Year) get dated incorrectly
                        log_date = re.sub(r"\s+"," ","%d "%current_time.year + m.group(1))
                        log_process = m.group(2)
                        log_message = m.group(3)
                        log_datetime = datetime.strptime(log_date,"%Y %b %d %H:%M:%S")
                        b_idx = bucket_idx( log_datetime )
                        if b_idx >= 0:
                            s_idx = service_idx( log_process )
                            if s_idx < 0:
                                s_idx = services_column.size()
                                services_column.put(s_idx,Cell(string_type,log_process,format_string))
                                service_rows[log_process] = s_idx
                            put_or_sum(messages_column,b_idx,1)
                            put_or_sum(messages_service_column,s_idx,1)
                            is_error = error_re.search(log_message)
                            is_warning = warning_re.search(log_message)
                            error_count = 0
                            warning_count = 0
                            # a message containing both words counts as a warning only
                            if is_error and not is_warning:
                                error_count = 1
                            elif is_warning:
                                warning_count = 1
                            put_or_sum(errors_column,b_idx,error_count)
                            put_or_sum(errors_service_column,s_idx,error_count)
                            put_or_sum(warnings_column,b_idx,warning_count)
                            put_or_sum(warnings_service_column,s_idx,warning_count)

        columns = [time_column,errors_column,warnings_column,messages_column,services_column,
                    errors_service_column,warnings_service_column,messages_service_column]

        # overwrite columns left from a previous refresh, add them the first time
        for c in columns:
            if self.has_column(c.get_name()):
                self.replace_column(self.map_column(c.get_name()),c)
            else:
                self.add_column(c)

        self.changed()

        DataTable.refresh(self)

Module variables

var blank_type

var date_type

var float_type

var int_type

var string_type

Classes

class SyslogDataTable

class that collects a time based aggregation of data from the syslog into a data_table

class SyslogDataTable( DataTable ):
    """ class that collects a time based aggregation of data from the syslog into a data_table """
    def __init__(self,syslog_glob="/var/log/syslog*",num_hours=24,bucket_hours=1,refresh_minutes=10,start_time=None):
        """ Initialize the SyslogDataTable.

        syslog_glob     glob pattern matching the syslog files to aggregate (gzip rotations included)
        num_hours       total reporting window in hours, ending at "now" (or start_time)
        bucket_hours    width of each aggregation bucket in hours
        refresh_minutes how often the background refresh rebuilds the table
        start_time      optional (year,month,day,hour,minute,second) tuple pinning the window end; useful for tests
        """
        self.syslog_glob = syslog_glob
        self.num_hours = num_hours
        self.bucket_hours = bucket_hours
        self.start_time = start_time
        DataTable.__init__(self,None,
            "Syslog Data: %s for the last %d hours in %d hour buckets, refreshed every %d minutes"%(
            self.syslog_glob,
            self.num_hours,
            self.bucket_hours,
            refresh_minutes),refresh_minutes)
        # populate the table immediately instead of waiting for the first scheduled refresh
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh or rebuild tables """
        # the window end is either a fixed start_time (reproducible runs) or "now"
        if self.start_time:
            year,month,day,hour,minute,second = self.start_time
            current_time = datetime(year,month,day,hour,minute,second)
        else:
            current_time = datetime.now()
        start_time = current_time - timedelta( hours = self.num_hours )
        syslog_files = glob.glob(self.syslog_glob)

        # one timestamp per bucket boundary; the final cell is current_time so
        # the last (possibly partial) bucket is closed off
        time_column = Column(name="Time Stamps")
        bucket_time = start_time
        idx = 0
        while bucket_time < current_time:
            time_column.put(idx,Cell(date_type,bucket_time,format_date))
            bucket_time = bucket_time + timedelta( hours = self.bucket_hours )
            idx += 1
        time_column.put(idx,Cell(date_type,current_time,format_date))

        def bucket_idx( timestamp ):
            """ index of the first bucket boundary >= timestamp, or -1 if outside the window """
            if timestamp < start_time or timestamp > current_time:
                return -1
            for idx in range(time_column.size()):
                if time_column.get(idx).get_value() >= timestamp:
                    return idx
            return -1

        errors_column = Column(name="Errors by Time")
        warnings_column = Column(name="Warnings by Time")
        messages_column = Column(name="Messages by Time")

        services_column = Column(name="Services")
        errors_service_column = Column(name="Errors by Service")
        warnings_service_column = Column(name="Warnings by Service")
        messages_service_column = Column(name="Messages by Service")

        # service name -> row index; O(1) lookup instead of the original
        # linear scan of services_column for every log line
        service_rows = {}

        def service_idx( service ):
            """ row index of service in services_column, or -1 if not yet seen """
            return service_rows.get(service,-1)

        def put_or_sum( column, idx, value ):
            """ add value into the integer cell at idx, treating blank/missing cells as zero """
            current_value = 0
            if idx < column.size():
                c = column.get(idx)
                if c.get_type() != blank_type:
                    current_value = int(c.get_value())
            column.put(idx,Cell(int_type,current_value+value,format_int))

        # compile once, outside the per-line loop
        # syslog line shape: "Mon DD HH:MM:SS host process[pid]: message"
        line_re = re.compile(r"(\w\w\w\s+\d+\s\d\d:\d\d:\d\d)\s[a-z0-9\-]*\s([a-zA-Z0-9\-\_\.]*)[\[\]0-9]*:\s*(.*)")
        error_re = re.compile(r"[Ee]rror|ERROR")
        warning_re = re.compile(r"[Ww]arning|WARNING")

        for slf in syslog_files:
            if slf.endswith(".gz"):
                slf_f = gzip.open(slf,"rt",encoding="utf-8")
            else:
                slf_f = open(slf,"r",encoding="utf-8")
            # close each handle even if parsing raises; the original version
            # never closed the files it opened
            with slf_f:
                for line in slf_f:
                    line = line.strip()
                    m = line_re.match(line)
                    if m:
                        # syslog timestamps carry no year, so stamp with the current
                        # year; NOTE(review): lines written in the previous calendar
                        # year (window spanning New Year) get dated incorrectly
                        log_date = re.sub(r"\s+"," ","%d "%current_time.year + m.group(1))
                        log_process = m.group(2)
                        log_message = m.group(3)
                        log_datetime = datetime.strptime(log_date,"%Y %b %d %H:%M:%S")
                        b_idx = bucket_idx( log_datetime )
                        if b_idx >= 0:
                            s_idx = service_idx( log_process )
                            if s_idx < 0:
                                s_idx = services_column.size()
                                services_column.put(s_idx,Cell(string_type,log_process,format_string))
                                service_rows[log_process] = s_idx
                            put_or_sum(messages_column,b_idx,1)
                            put_or_sum(messages_service_column,s_idx,1)
                            is_error = error_re.search(log_message)
                            is_warning = warning_re.search(log_message)
                            error_count = 0
                            warning_count = 0
                            # a message containing both words counts as a warning only
                            if is_error and not is_warning:
                                error_count = 1
                            elif is_warning:
                                warning_count = 1
                            put_or_sum(errors_column,b_idx,error_count)
                            put_or_sum(errors_service_column,s_idx,error_count)
                            put_or_sum(warnings_column,b_idx,warning_count)
                            put_or_sum(warnings_service_column,s_idx,warning_count)

        columns = [time_column,errors_column,warnings_column,messages_column,services_column,
                    errors_service_column,warnings_service_column,messages_service_column]

        # overwrite columns left from a previous refresh, add them the first time
        for c in columns:
            if self.has_column(c.get_name()):
                self.replace_column(self.map_column(c.get_name()),c)
            else:
                self.add_column(c)

        self.changed()

        DataTable.refresh(self)

Ancestors (in MRO)

Static methods

def __init__(

self, syslog_glob='/var/log/syslog*', num_hours=24, bucket_hours=1, refresh_minutes=10, start_time=None)

Initialize the SyslogDataTable with a file glob pattern to collect the syslogs on this machine, a timespan to aggregate for, aggregation bucket in hours, a refresh interval for updating in minutes

def __init__(self,syslog_glob="/var/log/syslog*",num_hours=24,bucket_hours=1,refresh_minutes=10,start_time=None):
    """ Set up a table over the files matching syslog_glob, covering num_hours hours in bucket_hours-hour buckets, refreshed every refresh_minutes minutes; start_time optionally pins the window end, then perform the first refresh """
    self.start_time = start_time
    self.bucket_hours = bucket_hours
    self.num_hours = num_hours
    self.syslog_glob = syslog_glob
    description = "Syslog Data: %s for the last %d hours in %d hour buckets, refreshed every %d minutes"%(
        syslog_glob,num_hours,bucket_hours,refresh_minutes)
    DataTable.__init__(self,None,description,refresh_minutes)
    # build the table right away rather than waiting for the first refresh cycle
    self.refresh()

def acquire_refresh_lock(

self)

acquire the refresh lock before reading/writing the table state

def acquire_refresh_lock(self):
    """ Take the refresh lock; hold it while reading or writing table state. """
    lock = self.refresh_lock
    lock.acquire()

def add_column(

self, *args, **kwargs)

@synchronized
def add_column(self,column):
    """ Append column at the end of the table, assigning its index and, when it has no name, a generated "<table>_<idx>" name; registers it by name and links it back to this table. """
    new_idx = len(self.columns)
    column.set_idx(new_idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,new_idx))
    self.columns.append(column)
    self.cnames[column.get_name()] = column
    column.set_table(self)

def changed(

self)

notify listeners that this table has been changed

def changed(self):
    """ Fire a change event: invoke every registered listener with this table. """
    for callback in self.listeners:
        callback(self)

def get(

self, *args, **kwargs)

@synchronized
def get(self, row, reference ):
    """ Return the cell at row in the column identified by reference (a name or an index). """
    col_idx = self.map_column(reference)
    return self.columns[col_idx].get(row)

def get_bounds(

self, *args, **kwargs)

return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols

@synchronized
def get_bounds(self):
    """ Return (rows,cols): the largest column size and the number of columns; rows is -1 when the table has no columns. """
    sizes = [c.size() for c in self.columns]
    rows = max(sizes) if sizes else -1
    return (rows,len(self.columns))

def get_column(

self, *args, **kwargs)

@synchronized
def get_column(self, reference):
    """ Return the Column object identified by reference (a name or an index). """
    col_idx = self.map_column(reference)
    return self.columns[col_idx]

def get_columns(

self, *args, **kwargs)

return the list of columns

@synchronized
def get_columns(self):
    """ return the list of columns """
    # returns the live internal list, not a copy -- callers must not mutate it
    return self.columns

def get_name(

self)

return the name of the table

def get_name(self):
    """ Return this table's name. """
    table_name = self.name
    return table_name

def get_names(

self, *args, **kwargs)

return a list of the names of the columns in order

@synchronized
def get_names(self):
    """ Return the column names, in column order. """
    names = []
    for column in self.columns:
        names.append(column.get_name())
    return names

def get_refresh_timestamp(

self)

get the time that the table was last refreshed

def get_refresh_timestamp( self ):
    """ Return the time at which this table's data was last rebuilt. """
    timestamp = self.refresh_timestamp
    return timestamp

def has_column(

self, *args, **kwargs)

@synchronized
def has_column(self, reference ):
    """ Return True when reference (a column name or an integer index) identifies a column in this table, False otherwise. """
    # the original tested `type(reference) == str` twice (a Python-2
    # str/unicode leftover) and, on the int branch, compared the undefined
    # name `idx` instead of `reference`, raising NameError for any int
    if isinstance(reference,str):
        return reference in self.cnames
    elif isinstance(reference,int):
        return reference < len(self.columns)
    else:
        return False

def insert_column(

self, *args, **kwargs)

@synchronized
def insert_column(self,idx,column):
    # Insert column at position idx, padding with blank columns when idx is
    # past the end, and renumbering the columns that follow the insertion.
    # NOTE(review): blank_column is not defined anywhere in this file --
    # presumably a module-level placeholder Column; confirm it exists at runtime
    while idx > len(self.columns):
        self.add_column(blank_column)
    if idx == len(self.columns):
        # inserting exactly at the end is a plain append
        self.add_column(column)
    else:
        if not column.get_name():
            column.set_name("%s_%d"%(self.name,idx))
        self.columns.insert(idx,column)
        self.cnames[column.get_name()] = column
        column.set_table(self)
        # re-stamp indexes of all columns from the insertion point onward
        while idx < len(self.columns):
            # NOTE(review): this renames `column` (the inserted one) when its
            # auto-generated name matches the previous slot, not the shifted
            # column at self.columns[idx] -- looks suspicious, verify intent
            if column.get_name() == "%s_%d"%(self.name,idx-1):
                column.set_name("%s_%d"%(self.name,idx))
                self.cnames[column.get_name()] = column
            self.columns[idx].set_idx(idx)
            idx += 1

def listen(

self, listen_func)

register for notifications when a change event is raised on this table

def listen(self,listen_func):
    """ Add listen_func to the callbacks fired when this table raises a change event. """
    callbacks = self.listeners
    callbacks.append(listen_func)

def map_column(

self, *args, **kwargs)

@synchronized
def map_column(self, reference ):
    """ Resolve reference to a column index: a str is looked up by column name, an int is returned unchanged; any other type raises TypeError. """
    # the original tested `type(reference) == str` twice -- a leftover of a
    # Python-2 str/unicode check; one isinstance check covers it
    if isinstance(reference,str):
        return self.cnames[reference].get_idx()
    elif isinstance(reference,int):
        return reference
    else:
        raise TypeError("wrong type in mapping")

def perform_refresh(

self)

Thread worker that sleeps and refreshes the data on a schedule

def perform_refresh( self ):
    """ Background worker: wake once per second and rebuild the table whenever refresh_minutes have elapsed since the last rebuild, until refresh_thread_stop is set. """
    last_refresh = time.time()
    while not self.refresh_thread_stop:
        elapsed = time.time() - last_refresh
        if elapsed >= self.refresh_minutes*60.0:
            self.refresh()
            last_refresh = time.time()
        time.sleep(1)

def put(

self, *args, **kwargs)

@synchronized
def put(self, row, reference, value):
    """ Store value at row in the column identified by reference (a name or an index). """
    target = self.columns[self.map_column(reference)]
    target.put(row,value)

def refresh(

self, *args, **kwargs)

refresh or rebuild tables

@synchronized
def refresh( self ):
    """ refresh or rebuild tables """
    # the window end is either a fixed start_time (reproducible runs) or "now"
    if self.start_time:
        year,month,day,hour,minute,second = self.start_time
        current_time = datetime(year,month,day,hour,minute,second)
    else:
        current_time = datetime.now()
    start_time = current_time - timedelta( hours = self.num_hours )
    syslog_files = glob.glob(self.syslog_glob)
    # one timestamp per bucket boundary; the final cell is current_time so
    # the last (possibly partial) bucket is closed off
    time_column = Column(name="Time Stamps")
    bucket_time = start_time
    idx = 0
    while bucket_time < current_time:
        time_column.put(idx,Cell(date_type,bucket_time,format_date))
        bucket_time = bucket_time + timedelta( hours = self.bucket_hours )
        idx += 1
    time_column.put(idx,Cell(date_type,current_time,format_date))
    def bucket_idx( timestamp ):
        """ index of the first bucket boundary >= timestamp, or -1 if outside the window """
        if timestamp < start_time or timestamp > current_time:
            return -1
        for idx in range(time_column.size()):
            if time_column.get(idx).get_value() >= timestamp:
                return idx
        return -1
    errors_column = Column(name="Errors by Time")
    warnings_column = Column(name="Warnings by Time")
    messages_column = Column(name="Messages by Time")
    services_column = Column(name="Services")
    errors_service_column = Column(name="Errors by Service")
    warnings_service_column = Column(name="Warnings by Service")
    messages_service_column = Column(name="Messages by Service")
    # service name -> row index; O(1) lookup instead of the original
    # linear scan of services_column for every log line
    service_rows = {}
    def service_idx( service ):
        """ row index of service in services_column, or -1 if not yet seen """
        return service_rows.get(service,-1)
    def put_or_sum( column, idx, value ):
        """ add value into the integer cell at idx, treating blank/missing cells as zero """
        current_value = 0
        if idx < column.size():
            c = column.get(idx)
            if c.get_type() != blank_type:
                current_value = int(c.get_value())
        column.put(idx,Cell(int_type,current_value+value,format_int))
    # compile once, outside the per-line loop
    # syslog line shape: "Mon DD HH:MM:SS host process[pid]: message"
    line_re = re.compile(r"(\w\w\w\s+\d+\s\d\d:\d\d:\d\d)\s[a-z0-9\-]*\s([a-zA-Z0-9\-\_\.]*)[\[\]0-9]*:\s*(.*)")
    error_re = re.compile(r"[Ee]rror|ERROR")
    warning_re = re.compile(r"[Ww]arning|WARNING")
    for slf in syslog_files:
        if slf.endswith(".gz"):
            slf_f = gzip.open(slf,"rt",encoding="utf-8")
        else:
            slf_f = open(slf,"r",encoding="utf-8")
        # close each handle even if parsing raises; the original version
        # never closed the files it opened
        with slf_f:
            for line in slf_f:
                line = line.strip()
                m = line_re.match(line)
                if m:
                    # syslog timestamps carry no year, so stamp with the current
                    # year; NOTE(review): lines written in the previous calendar
                    # year (window spanning New Year) get dated incorrectly
                    log_date = re.sub(r"\s+"," ","%d "%current_time.year + m.group(1))
                    log_process = m.group(2)
                    log_message = m.group(3)
                    log_datetime = datetime.strptime(log_date,"%Y %b %d %H:%M:%S")
                    b_idx = bucket_idx( log_datetime )
                    if b_idx >= 0:
                        s_idx = service_idx( log_process )
                        if s_idx < 0:
                            s_idx = services_column.size()
                            services_column.put(s_idx,Cell(string_type,log_process,format_string))
                            service_rows[log_process] = s_idx
                        put_or_sum(messages_column,b_idx,1)
                        put_or_sum(messages_service_column,s_idx,1)
                        is_error = error_re.search(log_message)
                        is_warning = warning_re.search(log_message)
                        error_count = 0
                        warning_count = 0
                        # a message containing both words counts as a warning only
                        if is_error and not is_warning:
                            error_count = 1
                        elif is_warning:
                            warning_count = 1
                        put_or_sum(errors_column,b_idx,error_count)
                        put_or_sum(errors_service_column,s_idx,error_count)
                        put_or_sum(warnings_column,b_idx,warning_count)
                        put_or_sum(warnings_service_column,s_idx,warning_count)
    columns = [time_column,errors_column,warnings_column,messages_column,services_column,
                errors_service_column,warnings_service_column,messages_service_column]
    # overwrite columns left from a previous refresh, add them the first time
    for c in columns:
        if self.has_column(c.get_name()):
            self.replace_column(self.map_column(c.get_name()),c)
        else:
            self.add_column(c)
    self.changed()
    DataTable.refresh(self)

def release_refresh_lock(

self)

release the refresh lock after reading/writing the table state

def release_refresh_lock(self):
    """ Release the refresh lock taken by acquire_refresh_lock. """
    lock = self.refresh_lock
    lock.release()

def replace_column(

self, *args, **kwargs)

@synchronized
def replace_column(self,idx,column):
    """ Place column at position idx: appended when idx is one past the end, otherwise replacing (and unregistering) the column currently there. """
    column.set_idx(idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    if idx == len(self.columns):
        self.columns.append(column)
    else:
        # drop the outgoing column's name registration before overwriting it
        old_name = self.columns[idx].get_name()
        del self.cnames[old_name]
        self.columns[idx] = column
    self.cnames[column.get_name()] = column
    column.set_table(self)

def start_refresh(

self)

Start the background refresh thread

def start_refresh( self ):
    """ Spawn the background refresh thread, stopping any previous one first. """
    # ensure at most one worker is running
    self.stop_refresh()
    worker = threading.Thread(target=self.perform_refresh)
    self.refresh_thread = worker
    worker.start()

def stop_refresh(

self)

Stop the background refresh thread

def stop_refresh( self ):
    """ Signal the background refresh thread to exit, wait for it, and reset the stop flag. """
    # raise the flag so perform_refresh's loop terminates on its next wake-up
    self.refresh_thread_stop = True
    thread = self.refresh_thread
    if thread and thread.is_alive():
        thread.join()
    self.refresh_thread = None
    self.refresh_thread_stop = False

def unlisten(

self, listen_func)

unregister for notifications when a change event is raised on this table

def unlisten(self,listen_func):
    """ Remove listen_func from this table's change-event callbacks. """
    callbacks = self.listeners
    callbacks.remove(listen_func)

Instance variables

var bucket_hours

var num_hours

var start_time

var syslog_glob