data_sources.elastic_data module
Module that performs an Elasticsearch query and builds a table of data from the response.
# Copyright 2020 James P Goodwin data table package to manage sparse columnar data
""" module that does an Elasticsearch query and creates a table of data based on the response """
import locale
locale.setlocale(locale.LC_ALL,'')
import sys
import os
import glob
import gzip
import re
from elasticsearch import Elasticsearch
from datetime import datetime,timedelta
from data_sources.data_table import DataTable,Column,Cell,blank_type,string_type,float_type,int_type,date_type,format_string,format_float,format_date,format_int,synchronized


class ElasticsearchDataTable( DataTable ):
    """ Collects data from the response to a specific Elasticsearch query and
    populates table columns according to a field map of dotted JSON paths. """

    def __init__(self,refresh_minutes=1,es_index_pattern=None,es_query_body=None,es_field_map=None):
        """ Initialize the ElasticsearchDataTable.

        Args:
            refresh_minutes: background refresh interval in minutes
            es_index_pattern: index pattern string passed to Elasticsearch
            es_query_body: dict representing the query JSON
            es_field_map: list of tuples [(json_path, field_name, field_type), ...]
                mapping dotted JSON paths in the response to column names/types
        """
        self.es_query_body = es_query_body
        self.es_field_map = es_field_map
        self.es_index_pattern = es_index_pattern
        DataTable.__init__(self,None,
            "Elasticsearch query:%s,index:%s,fieldmap:%s,refreshed every %d minutes"%(
            es_query_body,es_index_pattern,es_field_map,refresh_minutes),
            refresh_minutes)
        self.refresh()

    @synchronized
    def get_es_parameters( self ):
        """ fetch the elasticsearch parameters for this table as a tuple (es_query_body,es_field_map,es_index_pattern) """
        return (self.es_query_body,self.es_field_map,self.es_index_pattern)

    @synchronized
    def set_es_parameters( self, es_query_body,es_field_map,es_index_pattern ):
        """ set the elasticsearch parameters for this table and refresh immediately """
        self.es_query_body = es_query_body
        self.es_field_map = es_field_map
        self.es_index_pattern = es_index_pattern
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh or rebuild tables by re-running the query and rebuilding columns """
        # NOTE(review): Elasticsearch() connects to the client's default node;
        # confirm that is intended rather than a configured host list.
        es = Elasticsearch()
        result = es.search(index=self.es_index_pattern,body=self.es_query_body)

        def match_fields( name, result ):
            """ Walk the response recursively and collect (field_name, field_type, value)
            for every dotted path that matches an entry of self.es_field_map. """
            matches = []
            if isinstance(result,dict):
                for k in result:
                    full_name = (name+"." if name else "")+k
                    item = result[k]
                    for json_path,field_name,field_type in self.es_field_map:
                        if full_name == json_path:
                            matches.append((field_name,field_type,item))
                    # idiom fix: single isinstance test with a tuple of types
                    if isinstance(item,(dict,list)):
                        matches += match_fields(full_name,item)
            elif isinstance(result,list):
                # list elements keep the parent's dotted path
                for k in result:
                    matches += match_fields(name,k)
            return matches

        matches = match_fields( "",result)
        new_columns = {}
        for column_name,column_type,value in matches:
            # idiom fix: "not in" instead of "not x in"
            if column_name not in new_columns:
                new_columns[column_name] = Column(name=column_name)
            c = new_columns[column_name]
            if column_type == "date":
                # assumes value is an epoch timestamp in milliseconds — TODO confirm
                cc = Cell(date_type,datetime.fromtimestamp(value/1000),format_date)
            elif column_type == "int":
                cc = Cell(int_type,value,format_int)
            elif column_type == "float":
                cc = Cell(float_type,value,format_float)
            elif column_type == "str":
                cc = Cell(string_type,value,format_string)
            else:
                # unknown type: fall back to the string representation
                cc = Cell(string_type,str(value),format_string)
            c.put(c.size(),cc)
        # install the rebuilt columns, replacing existing ones by name
        for column_name in new_columns:
            if self.has_column(column_name):
                self.replace_column(self.map_column(column_name),new_columns[column_name])
            else:
                self.add_column(new_columns[column_name])
        self.changed()
        DataTable.refresh(self)
Module variables
var blank_type
var date_type
var float_type
var int_type
var string_type
Classes
class ElasticsearchDataTable
Class that collects data from the response to a specific Elasticsearch query and populates tables based on a field map.
class ElasticsearchDataTable( DataTable ):
    """ Table built from the response to a specific Elasticsearch query, with
    columns populated according to a field map of dotted JSON paths. """

    def __init__(self,refresh_minutes=1,es_index_pattern=None,es_query_body=None,es_field_map=None):
        """ Set up the table: a refresh interval in minutes, the es_query_body dict
        holding the query JSON, and a field map list of tuples
        [(json path, field name, field type), ...]. """
        self.es_query_body = es_query_body
        self.es_field_map = es_field_map
        self.es_index_pattern = es_index_pattern
        description = "Elasticsearch query:%s,index:%s,fieldmap:%s,refreshed every %d minutes"%(
            es_query_body,es_index_pattern,es_field_map,refresh_minutes)
        DataTable.__init__(self,None,description,refresh_minutes)
        self.refresh()

    @synchronized
    def get_es_parameters( self ):
        """ Return (es_query_body, es_field_map, es_index_pattern) for this table. """
        return (self.es_query_body,self.es_field_map,self.es_index_pattern)

    @synchronized
    def set_es_parameters( self, es_query_body,es_field_map,es_index_pattern ):
        """ Replace the Elasticsearch parameters for this table and refresh at once. """
        self.es_query_body = es_query_body
        self.es_field_map = es_field_map
        self.es_index_pattern = es_index_pattern
        self.refresh()

    @synchronized
    def refresh( self ):
        """ Re-run the query and rebuild the table's columns from the response. """
        client = Elasticsearch()
        response = client.search(index=self.es_index_pattern,body=self.es_query_body)

        def collect( prefix, node ):
            """ Gather (field name, field type, value) triples for every dotted
            path in the response that matches an entry of the field map. """
            found = []
            if isinstance(node,dict):
                for key in node:
                    path = prefix+"."+key if prefix else key
                    value = node[key]
                    for json_path,field_name,field_type in self.es_field_map:
                        if path == json_path:
                            found.append((field_name,field_type,value))
                    if isinstance(value,dict) or isinstance(value,list):
                        found += collect(path,value)
            elif isinstance(node,list):
                for element in node:
                    found += collect(prefix,element)
            return found

        new_columns = {}
        for column_name,column_type,value in collect("",response):
            if column_name not in new_columns:
                new_columns[column_name] = Column(name=column_name)
            col = new_columns[column_name]
            if column_type == "date":
                cell = Cell(date_type,datetime.fromtimestamp(value/1000),format_date)
            elif column_type == "int":
                cell = Cell(int_type,value,format_int)
            elif column_type == "float":
                cell = Cell(float_type,value,format_float)
            elif column_type == "str":
                cell = Cell(string_type,value,format_string)
            else:
                cell = Cell(string_type,str(value),format_string)
            col.put(col.size(),cell)
        for column_name in new_columns:
            if self.has_column(column_name):
                self.replace_column(self.map_column(column_name),new_columns[column_name])
            else:
                self.add_column(new_columns[column_name])
        self.changed()
        DataTable.refresh(self)
Ancestors (in MRO)
- ElasticsearchDataTable
- data_sources.data_table.DataTable
- builtins.object
Static methods
def __init__(
self, refresh_minutes=1, es_index_pattern=None, es_query_body=None, es_field_map=None)
Initialize the ElasticsearchDataTable. Pass in a refresh interval in minutes, the es_query_body dict representing the query JSON, and the field map: a list of tuples [(json path, field name, field type), ...].
def __init__(self,refresh_minutes=1,es_index_pattern=None,es_query_body=None,es_field_map=None):
    """ Build the table from an Elasticsearch query.

    refresh_minutes -- background refresh interval in minutes
    es_index_pattern -- index pattern to query
    es_query_body -- dict representing the query JSON
    es_field_map -- list of tuples [(json path, field name, field type), ...]
    """
    self.es_query_body = es_query_body
    self.es_field_map = es_field_map
    self.es_index_pattern = es_index_pattern
    description = "Elasticsearch query:%s,index:%s,fieldmap:%s,refreshed every %d minutes"%(
        es_query_body,es_index_pattern,es_field_map,refresh_minutes)
    DataTable.__init__(self,None,description,refresh_minutes)
    self.refresh()
def acquire_refresh_lock(
self)
acquire the refresh lock before reading/writing the table state
def acquire_refresh_lock(self):
    """ Take the refresh lock; hold it while reading or writing the table state. """
    self.refresh_lock.acquire()
def add_column(
self, *args, **kwargs)
@synchronized
def add_column(self,column):
    """ Append a column to the table: assign its index, give it a default name
    when it has none, and register it in the name map. """
    position = len(self.columns)
    column.set_idx(position)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,position))
    self.columns.append(column)
    self.cnames[column.get_name()] = column
    column.set_table(self)
def changed(
self)
notify listeners that this table has been changed
def changed(self):
    """ Notify every registered listener that this table has changed. """
    for callback in self.listeners:
        callback(self)
def get(
self, *args, **kwargs)
@synchronized
def get(self, row, reference ):
    """ Return the cell at row in the column identified by reference (name or index). """
    column = self.columns[self.map_column(reference)]
    return column.get(row)
def get_bounds(
self, *args, **kwargs)
return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols
@synchronized
def get_bounds(self):
    """ Return (rows, cols): cols is the number of columns, rows is the size of
    the largest column, or -1 when the table has no columns. """
    sizes = [column.size() for column in self.columns]
    rows = max(sizes) if sizes else -1
    return (rows,len(self.columns))
def get_column(
self, *args, **kwargs)
@synchronized
def get_column(self, reference):
    """ Return the Column identified by reference (name or integer index). """
    idx = self.map_column(reference)
    return self.columns[idx]
def get_columns(
self, *args, **kwargs)
return the list of columns
@synchronized
def get_columns(self):
    """ Return the table's list of columns in index order. """
    return self.columns
def get_es_parameters(
self, *args, **kwargs)
fetch the elasticsearch parameters for this table as a tuple (es_query_body,es_field_map,es_index_pattern)
@synchronized
def get_es_parameters( self ):
    """ Return the Elasticsearch parameters for this table as the tuple
    (es_query_body, es_field_map, es_index_pattern). """
    return (self.es_query_body,self.es_field_map,self.es_index_pattern)
def get_name(
self)
return the name of the table
def get_name(self):
    """ Return the table's name. """
    return self.name
def get_names(
self, *args, **kwargs)
return a list of the names of the columns in order
@synchronized
def get_names(self):
    """ Return the column names, ordered by column index. """
    return [column.get_name() for column in self.columns]
def get_refresh_timestamp(
self)
get the time that the table was last refreshed
def get_refresh_timestamp( self ):
    """ Return the time at which the table was last refreshed. """
    return self.refresh_timestamp
def has_column(
self, *args, **kwargs)
@synchronized
def has_column(self, reference ):
    """ Return True if reference (a column name or an integer index) refers to
    an existing column, False otherwise. """
    if isinstance(reference, str):
        return reference in self.cnames
    elif isinstance(reference, int):
        # bug fix: the original returned `idx < len(self.columns)` where `idx`
        # is undefined (NameError); test the reference itself, bounded at 0
        return 0 <= reference < len(self.columns)
    else:
        return False
def insert_column(
self, *args, **kwargs)
@synchronized
def insert_column(self,idx,column):
    # Insert column at index idx, padding with blank columns when idx is past the
    # end; columns at and after idx are shifted right and re-indexed.
    # NOTE(review): the doc generator flattened the original indentation; the
    # renumbering loop is placed inside the else branch here — confirm against
    # the real source file.
    while idx > len(self.columns):
        # pad out to idx; blank_column is presumably a module-level constant
        # from data_sources.data_table — verify it is in scope
        self.add_column(blank_column)
    if idx == len(self.columns):
        # inserting at the end is just an append
        self.add_column(column)
    else:
        if not column.get_name():
            column.set_name("%s_%d"%(self.name,idx))
        self.columns.insert(idx,column)
        self.cnames[column.get_name()] = column
        column.set_table(self)
        while idx < len(self.columns):
            # NOTE(review): this tests and renames `column` (the newly inserted
            # one) rather than self.columns[idx]; it looks like auto-named
            # columns after the insertion point were meant to be renumbered —
            # verify, likely a latent bug. The stale cnames entry for the old
            # name is also never deleted.
            if column.get_name() == "%s_%d"%(self.name,idx-1):
                column.set_name("%s_%d"%(self.name,idx))
                self.cnames[column.get_name()] = column
            self.columns[idx].set_idx(idx)
            idx += 1
def listen(
self, listen_func)
register for notifications when a change event is raised on this table
def listen(self,listen_func):
    """ Register listen_func to be called with this table on every change event. """
    self.listeners.append(listen_func)
def map_column(
self, *args, **kwargs)
@synchronized
def map_column(self, reference ):
    """ Map a column reference (name or integer index) to the column's integer
    index. Raises KeyError for an unknown name and TypeError for any other
    reference type. """
    # idiom fix: the original tested `type(reference) == str` twice (a Python 2
    # str/unicode leftover); use isinstance instead of type comparison
    if isinstance(reference, str):
        return self.cnames[reference].get_idx()
    elif isinstance(reference, int):
        return reference
    else:
        raise TypeError("wrong type in mapping")
def perform_refresh(
self)
Thread worker that sleeps and refreshes the data on a schedule
def perform_refresh( self ):
    """ Worker loop for the refresh thread: wake once a second and refresh the
    data whenever refresh_minutes have elapsed; exit when refresh_thread_stop
    is set. """
    last_refresh = time.time()
    while not self.refresh_thread_stop:
        elapsed = time.time() - last_refresh
        if elapsed >= self.refresh_minutes*60.0:
            self.refresh()
            last_refresh = time.time()
        time.sleep(1)
def put(
self, *args, **kwargs)
@synchronized
def put(self, row, reference, value):
    """ Store value at row in the column identified by reference (name or index). """
    idx = self.map_column(reference)
    self.columns[idx].put(row,value)
def refresh(
self, *args, **kwargs)
refresh or rebuild tables
@synchronized
def refresh( self ):
    """ Refresh or rebuild the table: re-run the Elasticsearch query and rebuild
    every mapped column from the response. """
    # NOTE(review): Elasticsearch() connects to the client's default node;
    # confirm that is intended rather than a configured host list.
    es = Elasticsearch()
    result = es.search(index=self.es_index_pattern,body=self.es_query_body)

    def match_fields( name, result ):
        """ Walk the response recursively and collect (field_name, field_type, value)
        for every dotted path that matches an entry of self.es_field_map. """
        matches = []
        if isinstance(result,dict):
            for k in result:
                full_name = (name+"." if name else "")+k
                item = result[k]
                for json_path,field_name,field_type in self.es_field_map:
                    if full_name == json_path:
                        matches.append((field_name,field_type,item))
                # idiom fix: single isinstance test with a tuple of types
                if isinstance(item,(dict,list)):
                    matches += match_fields(full_name,item)
        elif isinstance(result,list):
            # list elements keep the parent's dotted path
            for k in result:
                matches += match_fields(name,k)
        return matches

    matches = match_fields( "",result)
    new_columns = {}
    for column_name,column_type,value in matches:
        # idiom fix: "not in" instead of "not x in"
        if column_name not in new_columns:
            new_columns[column_name] = Column(name=column_name)
        c = new_columns[column_name]
        if column_type == "date":
            # assumes value is an epoch timestamp in milliseconds — TODO confirm
            cc = Cell(date_type,datetime.fromtimestamp(value/1000),format_date)
        elif column_type == "int":
            cc = Cell(int_type,value,format_int)
        elif column_type == "float":
            cc = Cell(float_type,value,format_float)
        elif column_type == "str":
            cc = Cell(string_type,value,format_string)
        else:
            # unknown type: fall back to the string representation
            cc = Cell(string_type,str(value),format_string)
        c.put(c.size(),cc)
    # install the rebuilt columns, replacing existing ones by name
    for column_name in new_columns:
        if self.has_column(column_name):
            self.replace_column(self.map_column(column_name),new_columns[column_name])
        else:
            self.add_column(new_columns[column_name])
    self.changed()
    DataTable.refresh(self)
def release_refresh_lock(
self)
release the refresh lock after reading/writing the table state
def release_refresh_lock(self):
    """ Release the refresh lock once done reading or writing the table state. """
    self.refresh_lock.release()
def replace_column(
self, *args, **kwargs)
@synchronized
def replace_column(self,idx,column):
    """ Install column at index idx, appending when idx is one past the end; an
    unnamed column gets a default name and the name map is kept in sync. """
    column.set_idx(idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    if idx == len(self.columns):
        self.columns.append(column)
    else:
        old_name = self.columns[idx].get_name()
        del self.cnames[old_name]
        self.columns[idx] = column
    self.cnames[column.get_name()] = column
    column.set_table(self)
def set_es_parameters(
self, *args, **kwargs)
set the elasticsearch parameters for this table
@synchronized
def set_es_parameters( self, es_query_body,es_field_map,es_index_pattern ):
    """ Replace the Elasticsearch query, field map and index pattern, then
    refresh the table immediately. """
    self.es_index_pattern = es_index_pattern
    self.es_field_map = es_field_map
    self.es_query_body = es_query_body
    self.refresh()
def start_refresh(
self)
Start the background refresh thread
def start_refresh( self ):
    """ Stop any running refresh thread, then start a fresh background one. """
    self.stop_refresh()
    worker = threading.Thread(target=self.perform_refresh)
    self.refresh_thread = worker
    worker.start()
def stop_refresh(
self)
Stop the background refresh thread
def stop_refresh( self ):
    """ Signal the refresh thread to stop, wait for it to exit, then clear the
    thread reference and the stop flag. """
    self.refresh_thread_stop = True
    worker = self.refresh_thread
    if worker and worker.is_alive():
        worker.join()
    self.refresh_thread = None
    self.refresh_thread_stop = False
def unlisten(
self, listen_func)
unregister for notifications when a change event is raised on this table
def unlisten(self,listen_func):
    """ Remove listen_func from the change-event listeners. """
    self.listeners.remove(listen_func)
Instance variables
var es_field_map
var es_index_pattern
var es_query_body