data_sources.proc_data module
Module that aggregates data from system information using psutil and provides a set of data tables.
# Copyright 2020 James P Goodwin data table package to manage sparse columnar data
""" module that aggregates data from system information using psutil and provides a set of data tables """
import locale
locale.setlocale(locale.LC_ALL,'')
import sys
import os
import glob
import gzip
import re
import psutil
from datetime import datetime,timedelta
from data_sources.data_table import DataTable,Column,Cell,blank_type,string_type,float_type,int_type,date_type,format_string,format_float,format_date,format_int,synchronized

class AverageCell(Cell):
    def __init__(self,type,value,format):
        self.type = type
        self.value = value
        self.format = format
        self.values = [ value ]

    def put_value(self,value):
        self.values.append(value)
        self.value = sum(self.values)/len(self.values)

class ProcDataTable( DataTable ):
    """ class that collects a time based aggregation of data from system process information into a data_table """
    def __init__(self,num_hours=24,bucket_hours=1,refresh_minutes=10):
        """ Initialize the ProcDataTable to collect system process information, a timespan to aggregate for, aggregation bucket in hours, a refresh interval for updating in minutes """
        self.num_hours = num_hours
        self.bucket_hours = bucket_hours
        DataTable.__init__(self,None,
            "Proc Data: for the last %d hours in %d hour buckets, refreshed every %d minutes"%(
            self.num_hours, self.bucket_hours, refresh_minutes),refresh_minutes)
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh or rebuild tables """
        current_time = datetime.now()
        column_names = [ "Time Stamps", "CPU Percent", "Load Avg", "Total Virtual Memory",
            "Available Virtual Memory", "Filesystem Percent Full", "Filesystem Read Bytes",
            "Filesystem Write Bytes", "Network Sent Bytes", "Network Received Bytes", "Network Connections" ]

        def bucket_idx( timestamp, column ):
            for idx in range(column.size()):
                if column.get(idx).get_value() >= timestamp:
                    return idx
            else:
                return -1

        def append_bucket( timestamp, column ):
            column.put(column.size(),Cell(date_type,timestamp+timedelta( hours=self.bucket_hours),format_date))
            if column.size() > self.num_hours/self.bucket_hours:
                for cn in column_names:
                    self.get_column(cn).delete(0)
            return column.size()-1

        def add_average( column_name, idx, value ):
            column = self.get_column(column_name)
            if not column.size() or idx >= column.size():
                column.put(idx, AverageCell(float_type,value,format_float))
            else:
                column.get(idx).put_value(value)

        bidx = 0
        for cn in column_names:
            if not self.has_column(cn):
                self.add_column(Column(name=cn))
            if cn == "Time Stamps":
                bidx = bucket_idx( current_time, self.get_column(cn))
                if bidx < 0:
                    bidx = append_bucket( current_time, self.get_column(cn))
            elif cn == "CPU Percent":
                add_average(cn,bidx,psutil.cpu_percent())
            elif cn == "Load Avg":
                add_average(cn,bidx,psutil.getloadavg()[2])
            elif cn == "Total Virtual Memory":
                add_average(cn,bidx,psutil.virtual_memory().total)
            elif cn == "Available Virtual Memory":
                add_average(cn,bidx,psutil.virtual_memory().available)
            elif cn == "Filesystem Percent Full":
                add_average(cn,bidx,psutil.disk_usage("/").percent)
            elif cn == "Filesystem Read Bytes":
                add_average(cn,bidx,psutil.disk_io_counters().read_bytes)
            elif cn == "Filesystem Write Bytes":
                add_average(cn,bidx,psutil.disk_io_counters().write_bytes)
            elif cn == "Network Sent Bytes":
                add_average(cn,bidx,psutil.net_io_counters().bytes_sent)
            elif cn == "Network Received Bytes":
                add_average(cn,bidx,psutil.net_io_counters().bytes_recv)
            elif cn == "Network Connections":
                add_average(cn,bidx,float(len(psutil.net_connections())))
        self.changed()
        DataTable.refresh(self)
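A minimal usage sketch, assuming the data_sources package is importable; the column names and refresh behavior follow the source above:

from data_sources.proc_data import ProcDataTable

# Collect the last 12 hours of samples in 1-hour buckets, refreshing every 5 minutes.
table = ProcDataTable(num_hours=12, bucket_hours=1, refresh_minutes=5)
table.start_refresh()                      # spawn the background refresh thread

# ... later: read the newest bucket (the row accessors are @synchronized).
rows, cols = table.get_bounds()
for name in ("Time Stamps", "CPU Percent", "Load Avg"):
    print(name, table.get(rows - 1, name).get_value())

table.stop_refresh()                       # signal the worker thread and join it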
Module variables
var blank_type
var date_type
var float_type
var int_type
var string_type
Classes
class AverageCell
class AverageCell(Cell):
    def __init__(self,type,value,format):
        self.type = type
        self.value = value
        self.format = format
        self.values = [ value ]

    def put_value(self,value):
        self.values.append(value)
        self.value = sum(self.values)/len(self.values)
Ancestors (in MRO)
- AverageCell
- data_sources.data_table.Cell
- builtins.object
Methods
def __init__(self, type, value, format)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self,type,value,format):
    self.type = type
    self.value = value
    self.format = format
    self.values = [ value ]
def get_float_value(self)
def get_float_value(self):
    if self.type in [float_type,int_type]:
        return float(self.value)
    elif self.type == date_type:
        return self.value.timestamp()
    else:
        return 0.0
def get_format(self)
def get_format(self):
    return self.format
def get_type(self)
def get_type(self):
    return self.type
def get_value(self)
def get_value(self):
    return self.value
def put_value(self, value)
def put_value(self,value):
    self.values.append(value)
    self.value = sum(self.values)/len(self.values)
def set_format(self, format)
def set_format(self,format):
    self.format = format
Instance variables
var format
var type
var value
var values
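A short sketch of the running-average behavior; the imported types and formats follow the module's own import list above:

from data_sources.data_table import float_type, format_float
from data_sources.proc_data import AverageCell

cell = AverageCell(float_type, 10.0, format_float)
cell.put_value(20.0)
cell.put_value(30.0)
print(cell.get_value())        # 20.0 -- the mean of [10.0, 20.0, 30.0]
print(cell.get_float_value())  # 20.0 -- float_type values convert directly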
class ProcDataTable
Class that collects a time-based aggregation of data from system process information into a data_table.
class ProcDataTable( DataTable ):
    """ class that collects a time based aggregation of data from system process information into a data_table """
    def __init__(self,num_hours=24,bucket_hours=1,refresh_minutes=10):
        """ Initialize the ProcDataTable to collect system process information, a timespan to aggregate for, aggregation bucket in hours, a refresh interval for updating in minutes """
        self.num_hours = num_hours
        self.bucket_hours = bucket_hours
        DataTable.__init__(self,None,
            "Proc Data: for the last %d hours in %d hour buckets, refreshed every %d minutes"%(
            self.num_hours, self.bucket_hours, refresh_minutes),refresh_minutes)
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh or rebuild tables """
        current_time = datetime.now()
        column_names = [ "Time Stamps", "CPU Percent", "Load Avg", "Total Virtual Memory",
            "Available Virtual Memory", "Filesystem Percent Full", "Filesystem Read Bytes",
            "Filesystem Write Bytes", "Network Sent Bytes", "Network Received Bytes", "Network Connections" ]

        def bucket_idx( timestamp, column ):
            for idx in range(column.size()):
                if column.get(idx).get_value() >= timestamp:
                    return idx
            else:
                return -1

        def append_bucket( timestamp, column ):
            column.put(column.size(),Cell(date_type,timestamp+timedelta( hours=self.bucket_hours),format_date))
            if column.size() > self.num_hours/self.bucket_hours:
                for cn in column_names:
                    self.get_column(cn).delete(0)
            return column.size()-1

        def add_average( column_name, idx, value ):
            column = self.get_column(column_name)
            if not column.size() or idx >= column.size():
                column.put(idx, AverageCell(float_type,value,format_float))
            else:
                column.get(idx).put_value(value)

        bidx = 0
        for cn in column_names:
            if not self.has_column(cn):
                self.add_column(Column(name=cn))
            if cn == "Time Stamps":
                bidx = bucket_idx( current_time, self.get_column(cn))
                if bidx < 0:
                    bidx = append_bucket( current_time, self.get_column(cn))
            elif cn == "CPU Percent":
                add_average(cn,bidx,psutil.cpu_percent())
            elif cn == "Load Avg":
                add_average(cn,bidx,psutil.getloadavg()[2])
            elif cn == "Total Virtual Memory":
                add_average(cn,bidx,psutil.virtual_memory().total)
            elif cn == "Available Virtual Memory":
                add_average(cn,bidx,psutil.virtual_memory().available)
            elif cn == "Filesystem Percent Full":
                add_average(cn,bidx,psutil.disk_usage("/").percent)
            elif cn == "Filesystem Read Bytes":
                add_average(cn,bidx,psutil.disk_io_counters().read_bytes)
            elif cn == "Filesystem Write Bytes":
                add_average(cn,bidx,psutil.disk_io_counters().write_bytes)
            elif cn == "Network Sent Bytes":
                add_average(cn,bidx,psutil.net_io_counters().bytes_sent)
            elif cn == "Network Received Bytes":
                add_average(cn,bidx,psutil.net_io_counters().bytes_recv)
            elif cn == "Network Connections":
                add_average(cn,bidx,float(len(psutil.net_connections())))
        self.changed()
        DataTable.refresh(self)
Ancestors (in MRO)
- ProcDataTable
- data_sources.data_table.DataTable
- builtins.object
Methods
def __init__(self, num_hours=24, bucket_hours=1, refresh_minutes=10)
Initialize the ProcDataTable to collect system process information, given the timespan to aggregate for (num_hours), the aggregation bucket size in hours (bucket_hours), and the refresh interval in minutes (refresh_minutes).
def __init__(self,num_hours=24,bucket_hours=1,refresh_minutes=10):
    """ Initialize the ProcDataTable to collect system process information, a timespan to aggregate for, aggregation bucket in hours, a refresh interval for updating in minutes """
    self.num_hours = num_hours
    self.bucket_hours = bucket_hours
    DataTable.__init__(self,None,
        "Proc Data: for the last %d hours in %d hour buckets, refreshed every %d minutes"%(
        self.num_hours, self.bucket_hours, refresh_minutes),refresh_minutes)
    self.refresh()
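The retained history is num_hours / bucket_hours buckets; append_bucket() in refresh() deletes the oldest row from every column once that count is exceeded. A sketch of two configurations (parameter values are illustrative):

# Defaults: 24 one-hour buckets, refreshed every 10 minutes.
hourly = ProcDataTable()

# 48 hours of history in 2-hour buckets: still 48 / 2 = 24 rows retained.
two_hourly = ProcDataTable(num_hours=48, bucket_hours=2, refresh_minutes=15)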
def acquire_refresh_lock(self)
acquire the refresh lock before reading/writing the table state
def acquire_refresh_lock(self):
    """ acquire the refresh lock before reading/writing the table state """
    self.refresh_lock.acquire()
def add_column(self, *args, **kwargs)
@synchronized
def add_column(self,column):
    idx = len(self.columns)
    column.set_idx(idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    self.columns.append(column)
    self.cnames[column.get_name()] = column
    column.set_table(self)
def changed(self)
notify listeners that this table has been changed
def changed(self):
    """ notify listeners that this table has been changed """
    for f in self.listeners:
        f(self)
def get(self, *args, **kwargs)
@synchronized
def get(self, row, reference ):
    return self.columns[self.map_column(reference)].get(row)
def get_bounds(self, *args, **kwargs)
return a tuple (rows,cols) where rows is the length of the longest column and cols is the number of columns
@synchronized
def get_bounds(self):
    """ return a tuple (rows,cols) where rows is the length of the longest column and cols is the number of columns """
    cols = len(self.columns)
    rows = -1
    for c in self.columns:
        size = c.size()
        if rows < 0 or size > rows:
            rows = size
    return (rows,cols)
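Because columns may be ragged, rows reflects only the longest column. A sketch of a full scan over a ProcDataTable instance named table, guarding against shorter columns (the size check is illustrative):

rows, cols = table.get_bounds()
for r in range(rows):
    for c in range(cols):
        column = table.get_column(c)     # integer references map by index
        if r < column.size():            # skip holes in shorter columns
            print(column.get_name(), table.get(r, c).get_value())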
def get_column(self, *args, **kwargs)
@synchronized
def get_column(self, reference):
    return self.columns[self.map_column(reference)]
def get_columns(self, *args, **kwargs)
return the list of columns
@synchronized
def get_columns(self):
    """ return the list of columns """
    return self.columns
def get_name(self)
return the name of the table
def get_name(self):
    """ return the name of the table """
    return self.name
def get_names(self, *args, **kwargs)
return a list of the names of the columns in order
@synchronized
def get_names(self):
    """ return a list of the names of the columns in order """
    return [c.get_name() for c in self.columns]
def get_refresh_timestamp(self)
get the time that the table was last refreshed
def get_refresh_timestamp( self ):
    """ get the time that the table was last refreshed """
    return self.refresh_timestamp
def has_column(self, *args, **kwargs)
@synchronized
def has_column(self, reference ):
    if type(reference) == str:
        return reference in self.cnames
    elif type(reference) == int:
        # an integer reference is valid when it falls within the current column count
        return reference < len(self.columns)
    else:
        return False
def insert_column(self, *args, **kwargs)
@synchronized
def insert_column(self,idx,column):
    while idx > len(self.columns):
        self.add_column(blank_column)
    if idx == len(self.columns):
        self.add_column(column)
    else:
        if not column.get_name():
            column.set_name("%s_%d"%(self.name,idx))
        self.columns.insert(idx,column)
        self.cnames[column.get_name()] = column
        column.set_table(self)
        while idx < len(self.columns):
            if column.get_name() == "%s_%d"%(self.name,idx-1):
                column.set_name("%s_%d"%(self.name,idx))
                self.cnames[column.get_name()] = column
            self.columns[idx].set_idx(idx)
            idx += 1
def listen(self, listen_func)
register for notifications when a change event is raised on this table
def listen(self,listen_func):
    """ register for notifications when a change event is raised on this table """
    self.listeners.append(listen_func)
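A sketch of change notification; per changed() above, each listener is called with the table itself as its only argument:

def on_change(tbl):
    rows, _ = tbl.get_bounds()
    print("%s now has %d rows" % (tbl.get_name(), rows))

table.listen(on_change)      # invoked on every changed() call
# ... later, to stop receiving notifications:
table.unlisten(on_change)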
def map_column(self, *args, **kwargs)
@synchronized
def map_column(self, reference ):
    if type(reference) == str:
        return self.cnames[reference].get_idx()
    elif type(reference) == int:
        return reference
    else:
        raise TypeError("wrong type in mapping")
def perform_refresh(self)
Thread worker that sleeps and refreshes the data on a schedule
def perform_refresh( self ):
    """ Thread worker that sleeps and refreshes the data on a schedule """
    start_time = time.time()
    while not self.refresh_thread_stop:
        if time.time() - start_time >= self.refresh_minutes*60.0:
            self.refresh()
            start_time = time.time()
        time.sleep(1)
def put(self, *args, **kwargs)
@synchronized
def put(self, row, reference, value):
    self.columns[self.map_column(reference)].put(row,value)
def refresh(self, *args, **kwargs)
refresh or rebuild tables
@synchronized
def refresh( self ):
    """ refresh or rebuild tables """
    current_time = datetime.now()
    column_names = [ "Time Stamps", "CPU Percent", "Load Avg", "Total Virtual Memory",
        "Available Virtual Memory", "Filesystem Percent Full", "Filesystem Read Bytes",
        "Filesystem Write Bytes", "Network Sent Bytes", "Network Received Bytes", "Network Connections" ]

    def bucket_idx( timestamp, column ):
        for idx in range(column.size()):
            if column.get(idx).get_value() >= timestamp:
                return idx
        else:
            return -1

    def append_bucket( timestamp, column ):
        column.put(column.size(),Cell(date_type,timestamp+timedelta( hours=self.bucket_hours),format_date))
        if column.size() > self.num_hours/self.bucket_hours:
            for cn in column_names:
                self.get_column(cn).delete(0)
        return column.size()-1

    def add_average( column_name, idx, value ):
        column = self.get_column(column_name)
        if not column.size() or idx >= column.size():
            column.put(idx, AverageCell(float_type,value,format_float))
        else:
            column.get(idx).put_value(value)

    bidx = 0
    for cn in column_names:
        if not self.has_column(cn):
            self.add_column(Column(name=cn))
        if cn == "Time Stamps":
            bidx = bucket_idx( current_time, self.get_column(cn))
            if bidx < 0:
                bidx = append_bucket( current_time, self.get_column(cn))
        elif cn == "CPU Percent":
            add_average(cn,bidx,psutil.cpu_percent())
        elif cn == "Load Avg":
            add_average(cn,bidx,psutil.getloadavg()[2])
        elif cn == "Total Virtual Memory":
            add_average(cn,bidx,psutil.virtual_memory().total)
        elif cn == "Available Virtual Memory":
            add_average(cn,bidx,psutil.virtual_memory().available)
        elif cn == "Filesystem Percent Full":
            add_average(cn,bidx,psutil.disk_usage("/").percent)
        elif cn == "Filesystem Read Bytes":
            add_average(cn,bidx,psutil.disk_io_counters().read_bytes)
        elif cn == "Filesystem Write Bytes":
            add_average(cn,bidx,psutil.disk_io_counters().write_bytes)
        elif cn == "Network Sent Bytes":
            add_average(cn,bidx,psutil.net_io_counters().bytes_sent)
        elif cn == "Network Received Bytes":
            add_average(cn,bidx,psutil.net_io_counters().bytes_recv)
        elif cn == "Network Connections":
            add_average(cn,bidx,float(len(psutil.net_connections())))
    self.changed()
    DataTable.refresh(self)
def release_refresh_lock(self)
release the refresh lock after reading/writing the table state
def release_refresh_lock(self):
    """ release the refresh lock after reading/writing the table state """
    self.refresh_lock.release()
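A sketch of the intended acquire/release pairing for a consistent multi-step read; this assumes the underlying lock is reentrant, since the accessors called while holding it are themselves @synchronized:

table.acquire_refresh_lock()
try:
    # No refresh can run between these reads.
    rows, _ = table.get_bounds()
    cpu  = table.get(rows - 1, "CPU Percent").get_value()
    load = table.get(rows - 1, "Load Avg").get_value()
finally:
    table.release_refresh_lock()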
def replace_column(self, *args, **kwargs)
@synchronized
def replace_column(self,idx,column):
    column.set_idx(idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    if idx == len(self.columns):
        self.columns.append(column)
    else:
        del self.cnames[self.columns[idx].get_name()]
        self.columns[idx] = column
    self.cnames[column.get_name()] = column
    column.set_table(self)
def start_refresh(self)
Start the background refresh thread
def start_refresh( self ):
    """ Start the background refresh thread """
    self.stop_refresh()
    self.refresh_thread = threading.Thread(target=self.perform_refresh)
    self.refresh_thread.start()
def stop_refresh(self)
Stop the background refresh thread
def stop_refresh( self ):
    """ Stop the background refresh thread """
    self.refresh_thread_stop = True
    if self.refresh_thread and self.refresh_thread.is_alive():
        self.refresh_thread.join()
    self.refresh_thread = None
    self.refresh_thread_stop = False
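A sketch of the worker lifecycle; stop_refresh() sets the stop flag, joins the thread, and resets the flag so the table can be restarted:

table = ProcDataTable(refresh_minutes=1)
table.start_refresh()        # start_refresh() first stops any prior worker
try:
    run_application(table)   # hypothetical main loop that reads the table
finally:
    table.stop_refresh()     # worker exits within ~1s (its polling interval)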
def unlisten(self, listen_func)
unregister for notifications when a change event is raised on this table
def unlisten(self,listen_func):
    """ unregister for notifications when a change event is raised on this table """
    self.listeners.remove(listen_func)
Instance variables
var bucket_hours
var num_hours