data_sources.syslog_data module
Module that aggregates data from the syslog and provides a set of data tables.
    # Copyright 2020 James P Goodwin data table package to manage sparse columnar data
    """ module that aggregates data from the syslog and provides a set of data tables """
    import locale
    locale.setlocale(locale.LC_ALL,'')
    import sys
    import os
    import glob
    import gzip
    import re
    from datetime import datetime,timedelta
    from data_sources.data_table import DataTable,Column,Cell,blank_type,string_type,float_type,int_type,date_type,format_string,format_float,format_date,format_int,synchronized

    class SyslogDataTable( DataTable ):
        """ class that collects a time based aggregation of data from the syslog into a data_table """

        def __init__(self,syslog_glob="/var/log/syslog*",num_hours=24,bucket_hours=1,refresh_minutes=10,start_time=None):
            """ Initialize the SyslogDataTable with a file glob pattern to collect the syslogs on this machine,
            a timespan to aggregate for, aggregation bucket in hours, a refresh interval for updating in minutes """
            self.syslog_glob = syslog_glob
            self.num_hours = num_hours
            self.bucket_hours = bucket_hours
            self.start_time = start_time
            DataTable.__init__(self,None,
                "Syslog Data: %s for the last %d hours in %d hour buckets, refreshed every %d minutes"%(
                self.syslog_glob,
                self.num_hours,
                self.bucket_hours,
                refresh_minutes),refresh_minutes)
            self.refresh()

        @synchronized
        def refresh( self ):
            """ refresh or rebuild tables """
            if self.start_time:
                year,month,day,hour,minute,second = self.start_time
                current_time = datetime(year,month,day,hour,minute,second)
            else:
                current_time = datetime.now()
            start_time = current_time - timedelta( hours = self.num_hours )
            syslog_files = glob.glob(self.syslog_glob)

            # build the column of bucket boundary timestamps
            time_column = Column(name="Time Stamps")
            bucket_time = start_time
            idx = 0
            while bucket_time < current_time:
                time_column.put(idx,Cell(date_type,bucket_time,format_date))
                bucket_time = bucket_time + timedelta( hours = self.bucket_hours )
                idx += 1
            time_column.put(idx,Cell(date_type,current_time,format_date))

            def bucket_idx( timestamp ):
                """ map a timestamp to the index of its time bucket, or -1 if out of range """
                if timestamp < start_time or timestamp > current_time:
                    return -1
                for idx in range(time_column.size()):
                    if time_column.get(idx).get_value() >= timestamp:
                        return idx
                else:
                    return -1

            errors_column = Column(name="Errors by Time")
            warnings_column = Column(name="Warnings by Time")
            messages_column = Column(name="Messages by Time")
            services_column = Column(name="Services")
            errors_service_column = Column(name="Errors by Service")
            warnings_service_column = Column(name="Warnings by Service")
            messages_service_column = Column(name="Messages by Service")

            def service_idx( service ):
                """ map a service name to its row in the services column, or -1 if not seen yet """
                for idx in range(services_column.size()):
                    if services_column.get(idx).get_value() == service:
                        return idx
                else:
                    return -1

            def put_or_sum( column, idx, value ):
                """ add value to the integer cell at idx, treating blank/missing cells as zero """
                current_value = 0
                if idx < column.size():
                    c = column.get(idx)
                    if c.get_type() != blank_type:
                        current_value = int(c.get_value())
                column.put(idx,Cell(int_type,current_value+value,format_int))

            for slf in syslog_files:
                if slf.endswith(".gz"):
                    slf_f = gzip.open(slf,"rt",encoding="utf-8")
                else:
                    slf_f = open(slf,"r",encoding="utf-8")
                for line in slf_f:
                    line = line.strip()
                    m = re.match(r"(\w\w\w\s+\d+\s\d\d:\d\d:\d\d)\s[a-z0-9\-]*\s([a-zA-Z0-9\-\_\.]*)[\[\]0-9]*:\s*(.*)",line)
                    if m:
                        log_date = re.sub(r"\s+"," ","%d "%current_time.year + m.group(1))
                        log_process = m.group(2)
                        log_message = m.group(3)
                        log_datetime = datetime.strptime(log_date,"%Y %b %d %H:%M:%S")
                        b_idx = bucket_idx( log_datetime )
                        if b_idx >= 0:
                            s_idx = service_idx( log_process )
                            if s_idx < 0:
                                s_idx = services_column.size()
                                services_column.put(s_idx,Cell(string_type,log_process,format_string))
                            put_or_sum(messages_column,b_idx,1)
                            put_or_sum(messages_service_column,s_idx,1)
                            is_error = re.search(r"[Ee]rror|ERROR",log_message)
                            is_warning = re.search(r"[Ww]arning|WARNING",log_message)
                            error_count = 0
                            warning_count = 0
                            if is_error and not is_warning:
                                error_count = 1
                            elif is_warning:
                                warning_count = 1
                            put_or_sum(errors_column,b_idx,error_count)
                            put_or_sum(errors_service_column,s_idx,error_count)
                            put_or_sum(warnings_column,b_idx,warning_count)
                            put_or_sum(warnings_service_column,s_idx,warning_count)

            columns = [time_column,errors_column,warnings_column,messages_column,services_column,
                       errors_service_column,warnings_service_column,messages_service_column]
            for c in columns:
                if self.has_column(c.get_name()):
                    self.replace_column(self.map_column(c.get_name()),c)
                else:
                    self.add_column(c)
            self.changed()
            DataTable.refresh(self)
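A minimal usage sketch, assuming the data_sources package is on the import path; the column names are the ones built by refresh() above:

    from data_sources.syslog_data import SyslogDataTable

    # build the table; the initial refresh() runs inside __init__
    table = SyslogDataTable(syslog_glob="/var/log/syslog*", num_hours=24, bucket_hours=1)

    print(table.get_name())      # table title string
    print(table.get_names())     # column names built by refresh()
    rows, cols = table.get_bounds()
    print("%d rows x %d columns" % (rows, cols))

    # cells are Cell objects; get(row, reference) accepts a column name or index
    cell = table.get(0, "Time Stamps")
    print(cell.get_value())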
Module variables
var blank_type
var date_type
var float_type
var int_type
var string_type
Classes
class SyslogDataTable
Class that collects a time-based aggregation of data from the syslog into a data_table.
    class SyslogDataTable( DataTable ):
        """ class that collects a time based aggregation of data from the syslog into a data_table """

        def __init__(self,syslog_glob="/var/log/syslog*",num_hours=24,bucket_hours=1,refresh_minutes=10,start_time=None):
            """ Initialize the SyslogDataTable with a file glob pattern to collect the syslogs on this machine,
            a timespan to aggregate for, aggregation bucket in hours, a refresh interval for updating in minutes """
            self.syslog_glob = syslog_glob
            self.num_hours = num_hours
            self.bucket_hours = bucket_hours
            self.start_time = start_time
            DataTable.__init__(self,None,
                "Syslog Data: %s for the last %d hours in %d hour buckets, refreshed every %d minutes"%(
                self.syslog_glob,
                self.num_hours,
                self.bucket_hours,
                refresh_minutes),refresh_minutes)
            self.refresh()

        @synchronized
        def refresh( self ):
            """ refresh or rebuild tables """
            if self.start_time:
                year,month,day,hour,minute,second = self.start_time
                current_time = datetime(year,month,day,hour,minute,second)
            else:
                current_time = datetime.now()
            start_time = current_time - timedelta( hours = self.num_hours )
            syslog_files = glob.glob(self.syslog_glob)

            time_column = Column(name="Time Stamps")
            bucket_time = start_time
            idx = 0
            while bucket_time < current_time:
                time_column.put(idx,Cell(date_type,bucket_time,format_date))
                bucket_time = bucket_time + timedelta( hours = self.bucket_hours )
                idx += 1
            time_column.put(idx,Cell(date_type,current_time,format_date))

            def bucket_idx( timestamp ):
                if timestamp < start_time or timestamp > current_time:
                    return -1
                for idx in range(time_column.size()):
                    if time_column.get(idx).get_value() >= timestamp:
                        return idx
                else:
                    return -1

            errors_column = Column(name="Errors by Time")
            warnings_column = Column(name="Warnings by Time")
            messages_column = Column(name="Messages by Time")
            services_column = Column(name="Services")
            errors_service_column = Column(name="Errors by Service")
            warnings_service_column = Column(name="Warnings by Service")
            messages_service_column = Column(name="Messages by Service")

            def service_idx( service ):
                for idx in range(services_column.size()):
                    if services_column.get(idx).get_value() == service:
                        return idx
                else:
                    return -1

            def put_or_sum( column, idx, value ):
                current_value = 0
                if idx < column.size():
                    c = column.get(idx)
                    if c.get_type() != blank_type:
                        current_value = int(c.get_value())
                column.put(idx,Cell(int_type,current_value+value,format_int))

            for slf in syslog_files:
                if slf.endswith(".gz"):
                    slf_f = gzip.open(slf,"rt",encoding="utf-8")
                else:
                    slf_f = open(slf,"r",encoding="utf-8")
                for line in slf_f:
                    line = line.strip()
                    m = re.match(r"(\w\w\w\s+\d+\s\d\d:\d\d:\d\d)\s[a-z0-9\-]*\s([a-zA-Z0-9\-\_\.]*)[\[\]0-9]*:\s*(.*)",line)
                    if m:
                        log_date = re.sub(r"\s+"," ","%d "%current_time.year + m.group(1))
                        log_process = m.group(2)
                        log_message = m.group(3)
                        log_datetime = datetime.strptime(log_date,"%Y %b %d %H:%M:%S")
                        b_idx = bucket_idx( log_datetime )
                        if b_idx >= 0:
                            s_idx = service_idx( log_process )
                            if s_idx < 0:
                                s_idx = services_column.size()
                                services_column.put(s_idx,Cell(string_type,log_process,format_string))
                            put_or_sum(messages_column,b_idx,1)
                            put_or_sum(messages_service_column,s_idx,1)
                            is_error = re.search(r"[Ee]rror|ERROR",log_message)
                            is_warning = re.search(r"[Ww]arning|WARNING",log_message)
                            error_count = 0
                            warning_count = 0
                            if is_error and not is_warning:
                                error_count = 1
                            elif is_warning:
                                warning_count = 1
                            put_or_sum(errors_column,b_idx,error_count)
                            put_or_sum(errors_service_column,s_idx,error_count)
                            put_or_sum(warnings_column,b_idx,warning_count)
                            put_or_sum(warnings_service_column,s_idx,warning_count)

            columns = [time_column,errors_column,warnings_column,messages_column,services_column,
                       errors_service_column,warnings_service_column,messages_service_column]
            for c in columns:
                if self.has_column(c.get_name()):
                    self.replace_column(self.map_column(c.get_name()),c)
                else:
                    self.add_column(c)
            self.changed()
            DataTable.refresh(self)
Ancestors (in MRO)
- SyslogDataTable
- data_sources.data_table.DataTable
- builtins.object
Methods
def __init__(
self, syslog_glob='/var/log/syslog*', num_hours=24, bucket_hours=1, refresh_minutes=10, start_time=None)
Initialize the SyslogDataTable with a file glob pattern for collecting the syslogs on this machine, a timespan to aggregate over, an aggregation bucket size in hours, and a refresh interval in minutes.
    def __init__(self,syslog_glob="/var/log/syslog*",num_hours=24,bucket_hours=1,refresh_minutes=10,start_time=None):
        """ Initialize the SyslogDataTable with a file glob pattern to collect the syslogs on this machine,
        a timespan to aggregate for, aggregation bucket in hours, a refresh interval for updating in minutes """
        self.syslog_glob = syslog_glob
        self.num_hours = num_hours
        self.bucket_hours = bucket_hours
        self.start_time = start_time
        DataTable.__init__(self,None,
            "Syslog Data: %s for the last %d hours in %d hour buckets, refreshed every %d minutes"%(
            self.syslog_glob,
            self.num_hours,
            self.bucket_hours,
            refresh_minutes),refresh_minutes)
        self.refresh()
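When given, start_time is unpacked in refresh() as a (year, month, day, hour, minute, second) tuple that pins the end of the aggregation window. A sketch with illustrative values:

    from data_sources.syslog_data import SyslogDataTable

    # aggregate the 6 hours leading up to midnight on 2020-05-01,
    # in 1-hour buckets, refreshing every 5 minutes
    table = SyslogDataTable(
        syslog_glob="/var/log/syslog*",
        num_hours=6,
        bucket_hours=1,
        refresh_minutes=5,
        start_time=(2020, 5, 1, 0, 0, 0))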
def acquire_refresh_lock(
self)
acquire the refresh lock before reading/writing the table state
    def acquire_refresh_lock(self):
        """ acquire the refresh lock before reading/writing the table state """
        self.refresh_lock.acquire()
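A minimal sketch of guarding a multi-step read so a background refresh() cannot swap columns between the calls, given a table constructed as in the module example above. This assumes the same lock serializes refresh() and that it is reentrant, which the acquire/release naming and the @synchronized accessors suggest:

    table.acquire_refresh_lock()
    try:
        names = table.get_names()
        rows, cols = table.get_bounds()
    finally:
        table.release_refresh_lock()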
def add_column(
self, *args, **kwargs)
    @synchronized
    def add_column(self,column):
        idx = len(self.columns)
        column.set_idx(idx)
        if not column.get_name():
            column.set_name("%s_%d"%(self.name,idx))
        self.columns.append(column)
        self.cnames[column.get_name()] = column
        column.set_table(self)
def changed(
self)
notify listeners that this table has been changed
    def changed(self):
        """ notify listeners that this table has been changed """
        for f in self.listeners:
            f(self)
def get(
self, *args, **kwargs)
    @synchronized
    def get(self, row, reference ):
        return self.columns[self.map_column(reference)].get(row)
def get_bounds(
self, *args, **kwargs)
Return a tuple (rows, cols), where rows is the maximum number of rows across all columns and cols is the number of columns.
    @synchronized
    def get_bounds(self):
        """ return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols """
        cols = len(self.columns)
        rows = -1
        for c in self.columns:
            size = c.size()
            if rows < 0 or size > rows:
                rows = size
        return (rows,cols)
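A sketch that walks the table with these bounds, given a table constructed as in the module example above; since columns are sparse, each column's own size() is the safe per-column row limit:

    rows, cols = table.get_bounds()
    for col_idx in range(cols):
        column = table.get_column(col_idx)
        print(column.get_name())
        for row in range(column.size()):
            print("  ", column.get(row).get_value())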
def get_column(
self, *args, **kwargs)
    @synchronized
    def get_column(self, reference):
        return self.columns[self.map_column(reference)]
def get_columns(
self, *args, **kwargs)
return the list of columns
    @synchronized
    def get_columns(self):
        """ return the list of columns """
        return self.columns
def get_name(
self)
return the name of the table
    def get_name(self):
        """ return the name of the table """
        return self.name
def get_names(
self, *args, **kwargs)
return a list of the names of the columns in order
    @synchronized
    def get_names(self):
        """ return a list of the names of the columns in order """
        return [c.get_name() for c in self.columns]
def get_refresh_timestamp(
self)
get the time that the table was last refreshed
    def get_refresh_timestamp( self ):
        """ get the time that the table was last refreshed """
        return self.refresh_timestamp
def has_column(
self, *args, **kwargs)
    @synchronized
    def has_column(self, reference ):
        if type(reference) == str:
            return reference in self.cnames
        elif type(reference) == int:
            # an integer reference is valid if it falls within the current column count
            return reference < len(self.columns)
        else:
            return False
def insert_column(
self, *args, **kwargs)
    @synchronized
    def insert_column(self,idx,column):
        while idx > len(self.columns):
            self.add_column(blank_column)
        if idx == len(self.columns):
            self.add_column(column)
        else:
            if not column.get_name():
                column.set_name("%s_%d"%(self.name,idx))
            self.columns.insert(idx,column)
            self.cnames[column.get_name()] = column
            column.set_table(self)
            while idx < len(self.columns):
                if column.get_name() == "%s_%d"%(self.name,idx-1):
                    column.set_name("%s_%d"%(self.name,idx))
                    self.cnames[column.get_name()] = column
                self.columns[idx].set_idx(idx)
                idx += 1
def listen(
self, listen_func)
register for notifications when a change event is raised on this table
    def listen(self,listen_func):
        """ register for notifications when a change event is raised on this table """
        self.listeners.append(listen_func)
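changed() invokes each registered callable with the table itself as the single argument, so a listener is just a one-argument function; a sketch:

    def on_table_changed(table):
        # typically invoked at the end of refresh(), possibly from the refresh thread
        print("refreshed at", table.get_refresh_timestamp())

    table.listen(on_table_changed)
    # ... later, to stop receiving notifications:
    table.unlisten(on_table_changed)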
def map_column(
self, *args, **kwargs)
    @synchronized
    def map_column(self, reference ):
        if type(reference) == str:
            return self.cnames[reference].get_idx()
        elif type(reference) == int:
            return reference
        else:
            raise TypeError("wrong type in mapping")
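A string reference is resolved through the column-name map while an integer passes through unchanged, so both forms below are equivalent ways to address the same column:

    idx = table.map_column("Errors by Time")   # column name -> integer index
    assert table.map_column(idx) == idx        # integers pass through unchanged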
def perform_refresh(
self)
Thread worker that sleeps and refreshes the data on a schedule
    def perform_refresh( self ):
        """ Thread worker that sleeps and refreshes the data on a schedule """
        start_time = time.time()
        while not self.refresh_thread_stop:
            if time.time() - start_time >= self.refresh_minutes*60.0:
                self.refresh()
                start_time = time.time()
            time.sleep(1)
def put(
self, *args, **kwargs)
    @synchronized
    def put(self, row, reference, value):
        self.columns[self.map_column(reference)].put(row,value)
def refresh(
self, *args, **kwargs)
refresh or rebuild tables
    @synchronized
    def refresh( self ):
        """ refresh or rebuild tables """
        if self.start_time:
            year,month,day,hour,minute,second = self.start_time
            current_time = datetime(year,month,day,hour,minute,second)
        else:
            current_time = datetime.now()
        start_time = current_time - timedelta( hours = self.num_hours )
        syslog_files = glob.glob(self.syslog_glob)

        time_column = Column(name="Time Stamps")
        bucket_time = start_time
        idx = 0
        while bucket_time < current_time:
            time_column.put(idx,Cell(date_type,bucket_time,format_date))
            bucket_time = bucket_time + timedelta( hours = self.bucket_hours )
            idx += 1
        time_column.put(idx,Cell(date_type,current_time,format_date))

        def bucket_idx( timestamp ):
            if timestamp < start_time or timestamp > current_time:
                return -1
            for idx in range(time_column.size()):
                if time_column.get(idx).get_value() >= timestamp:
                    return idx
            else:
                return -1

        errors_column = Column(name="Errors by Time")
        warnings_column = Column(name="Warnings by Time")
        messages_column = Column(name="Messages by Time")
        services_column = Column(name="Services")
        errors_service_column = Column(name="Errors by Service")
        warnings_service_column = Column(name="Warnings by Service")
        messages_service_column = Column(name="Messages by Service")

        def service_idx( service ):
            for idx in range(services_column.size()):
                if services_column.get(idx).get_value() == service:
                    return idx
            else:
                return -1

        def put_or_sum( column, idx, value ):
            current_value = 0
            if idx < column.size():
                c = column.get(idx)
                if c.get_type() != blank_type:
                    current_value = int(c.get_value())
            column.put(idx,Cell(int_type,current_value+value,format_int))

        for slf in syslog_files:
            if slf.endswith(".gz"):
                slf_f = gzip.open(slf,"rt",encoding="utf-8")
            else:
                slf_f = open(slf,"r",encoding="utf-8")
            for line in slf_f:
                line = line.strip()
                m = re.match(r"(\w\w\w\s+\d+\s\d\d:\d\d:\d\d)\s[a-z0-9\-]*\s([a-zA-Z0-9\-\_\.]*)[\[\]0-9]*:\s*(.*)",line)
                if m:
                    log_date = re.sub(r"\s+"," ","%d "%current_time.year + m.group(1))
                    log_process = m.group(2)
                    log_message = m.group(3)
                    log_datetime = datetime.strptime(log_date,"%Y %b %d %H:%M:%S")
                    b_idx = bucket_idx( log_datetime )
                    if b_idx >= 0:
                        s_idx = service_idx( log_process )
                        if s_idx < 0:
                            s_idx = services_column.size()
                            services_column.put(s_idx,Cell(string_type,log_process,format_string))
                        put_or_sum(messages_column,b_idx,1)
                        put_or_sum(messages_service_column,s_idx,1)
                        is_error = re.search(r"[Ee]rror|ERROR",log_message)
                        is_warning = re.search(r"[Ww]arning|WARNING",log_message)
                        error_count = 0
                        warning_count = 0
                        if is_error and not is_warning:
                            error_count = 1
                        elif is_warning:
                            warning_count = 1
                        put_or_sum(errors_column,b_idx,error_count)
                        put_or_sum(errors_service_column,s_idx,error_count)
                        put_or_sum(warnings_column,b_idx,warning_count)
                        put_or_sum(warnings_service_column,s_idx,warning_count)

        columns = [time_column,errors_column,warnings_column,messages_column,services_column,
                   errors_service_column,warnings_service_column,messages_service_column]
        for c in columns:
            if self.has_column(c.get_name()):
                self.replace_column(self.map_column(c.get_name()),c)
            else:
                self.add_column(c)
        self.changed()
        DataTable.refresh(self)
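For reference, here is how the parsing regex in refresh() decomposes a typical syslog line; the hostname and PID below are invented for illustration. Because the message text contains "error", this line would also increment "Errors by Time" and "Errors by Service":

    import re

    line = "May  1 00:13:27 myhost sshd[1234]: error: PAM: Authentication failure"
    m = re.match(r"(\w\w\w\s+\d+\s\d\d:\d\d:\d\d)\s[a-z0-9\-]*\s([a-zA-Z0-9\-\_\.]*)[\[\]0-9]*:\s*(.*)", line)
    print(m.group(1))  # "May  1 00:13:27" -> timestamp (the year is taken from current_time)
    print(m.group(2))  # "sshd"            -> service name
    print(m.group(3))  # "error: PAM: Authentication failure" -> message text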
def release_refresh_lock(
self)
release the refresh lock after reading/writing the table state
    def release_refresh_lock(self):
        """ release the refresh lock after reading/writing the table state """
        self.refresh_lock.release()
def replace_column(
self, *args, **kwargs)
    @synchronized
    def replace_column(self,idx,column):
        column.set_idx(idx)
        if not column.get_name():
            column.set_name("%s_%d"%(self.name,idx))
        if idx == len(self.columns):
            self.columns.append(column)
        else:
            del self.cnames[self.columns[idx].get_name()]
            self.columns[idx] = column
        self.cnames[column.get_name()] = column
        column.set_table(self)
def start_refresh(
self)
Start the background refresh thread
    def start_refresh( self ):
        """ Start the background refresh thread """
        self.stop_refresh()
        self.refresh_thread = threading.Thread(target=self.perform_refresh)
        self.refresh_thread.start()
def stop_refresh(
self)
Stop the background refresh thread
    def stop_refresh( self ):
        """ Stop the background refresh thread """
        self.refresh_thread_stop = True
        if self.refresh_thread and self.refresh_thread.is_alive():
            self.refresh_thread.join()
        self.refresh_thread = None
        self.refresh_thread_stop = False
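Taken together with start_refresh() and perform_refresh() above, the background-refresh lifecycle looks like this sketch:

    from data_sources.syslog_data import SyslogDataTable

    table = SyslogDataTable(refresh_minutes=10)  # initial refresh() happens in __init__
    table.start_refresh()        # spawn the worker; it re-runs refresh() every 10 minutes
    try:
        pass                     # serve queries against the table here
    finally:
        table.stop_refresh()     # signal the worker thread and join it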
def unlisten(
self, listen_func)
unregister for notifications when a change event is raised on this table
    def unlisten(self,listen_func):
        """ unregister for notifications when a change event is raised on this table """
        self.listeners.remove(listen_func)
Instance variables
var bucket_hours
var num_hours
var start_time
var syslog_glob