Top

data_sources.elastic_data module

Module that runs an Elasticsearch query and builds a table of data from the response.

# Copyright 2020 James P Goodwin data table package to manage sparse columnar data
""" module that does an Elasticsearch query and creates a table of data based on the response """
import locale
locale.setlocale(locale.LC_ALL,'')
import sys
import os
import glob
import gzip
import re
from elasticsearch import Elasticsearch
from datetime import datetime,timedelta
from data_sources.data_table import DataTable,Column,Cell,blank_type,string_type,float_type,int_type,date_type,format_string,format_float,format_date,format_int,synchronized

class ElasticsearchDataTable( DataTable ):
    """Data table whose columns are filled from the response to an
    Elasticsearch query, using a field map of (json_path, field_name,
    field_type) tuples to pull values out of the response JSON."""

    def __init__(self,refresh_minutes=1,es_index_pattern=None,es_query_body=None,es_field_map=None):
        """Initialize the table.

        refresh_minutes  -- minutes between automatic refreshes
        es_index_pattern -- index pattern handed to Elasticsearch.search
        es_query_body    -- dict holding the query JSON
        es_field_map     -- list of (json_path, field_name, field_type) tuples
        """
        self.es_query_body = es_query_body
        self.es_field_map = es_field_map
        self.es_index_pattern = es_index_pattern
        # The base class wires up the table name, refresh thread and locking.
        DataTable.__init__(self,None,
            "Elasticsearch query:%s,index:%s,fieldmap:%s,refreshed every %d minutes"%(
                es_query_body,es_index_pattern,es_field_map,refresh_minutes),
            refresh_minutes)
        self.refresh()

    @synchronized
    def get_es_parameters( self ):
        """Return the current (es_query_body, es_field_map, es_index_pattern)."""
        return self.es_query_body, self.es_field_map, self.es_index_pattern

    @synchronized
    def set_es_parameters( self, es_query_body,es_field_map,es_index_pattern ):
        """Install new query parameters and rebuild the table right away."""
        self.es_query_body = es_query_body
        self.es_field_map = es_field_map
        self.es_index_pattern = es_index_pattern
        self.refresh()

    @synchronized
    def refresh( self ):
        """Run the query and rebuild this table's columns from the response."""
        client = Elasticsearch()
        response = client.search(index=self.es_index_pattern,body=self.es_query_body)

        def walk( prefix, node ):
            """Depth-first scan of *node*; collect (field_name, field_type, value)
            for every dotted path that appears in the field map.  Lists do not
            extend the dotted path."""
            found = []
            if isinstance(node,dict):
                for key,child in node.items():
                    path = prefix+"."+key if prefix else key
                    for json_path,field_name,field_type in self.es_field_map:
                        if path == json_path:
                            found.append((field_name,field_type,child))
                    if isinstance(child,(dict,list)):
                        found += walk(path,child)
            elif isinstance(node,list):
                for child in node:
                    found += walk(prefix,child)
            return found

        new_columns = {}
        for field_name,field_type,value in walk("",response):
            column = new_columns.get(field_name)
            if column is None:
                column = new_columns[field_name] = Column(name=field_name)
            if field_type == "date":
                # Elasticsearch date fields arrive as epoch milliseconds.
                cell = Cell(date_type,datetime.fromtimestamp(value/1000),format_date)
            elif field_type == "int":
                cell = Cell(int_type,value,format_int)
            elif field_type == "float":
                cell = Cell(float_type,value,format_float)
            elif field_type == "str":
                cell = Cell(string_type,value,format_string)
            else:
                # Unknown mapped type: fall back to the string form of the value.
                cell = Cell(string_type,str(value),format_string)
            column.put(column.size(),cell)

        # Swap the freshly built columns into the table, replacing any that
        # already exist and appending the new ones.
        for field_name,column in new_columns.items():
            if self.has_column(field_name):
                self.replace_column(self.map_column(field_name),column)
            else:
                self.add_column(column)

        self.changed()

        DataTable.refresh(self)

Module variables

var blank_type

var date_type

var float_type

var int_type

var string_type

Classes

class ElasticsearchDataTable

class that collects data from the response to a specific elasticsearch query and populates tables based on a field map

# NOTE(review): this is a second, rendered copy of ElasticsearchDataTable —
# the documentation extractor duplicated the module source above; the code
# is intentionally identical to the primary definition.
class ElasticsearchDataTable( DataTable ):
    """ class that collects data from the response to a specific elasticsearch query and populates tables based on a field map """
    def __init__(self,refresh_minutes=1,es_index_pattern=None,es_query_body=None,es_field_map=None):
        """ Initialize the ElasticsearchQueryTable pass in a refresh interval in minutes, the es_query_body dict representing the query json and the field map list of tuples [( json path, field name, field type )...]"""
        self.es_query_body = es_query_body
        self.es_field_map = es_field_map
        self.es_index_pattern = es_index_pattern
        # Base class sets up name, refresh scheduling and locking.
        DataTable.__init__(self,None,
            "Elasticsearch query:%s,index:%s,fieldmap:%s,refreshed every %d minutes"%(
                es_query_body,es_index_pattern,es_field_map,refresh_minutes),
            refresh_minutes)
        self.refresh()

    @synchronized
    def get_es_parameters( self ):
        """ fetch the elasticsearch parameters for this table as a tuple (es_query_body,es_field_map,es_index_pattern) """
        return (self.es_query_body,self.es_field_map,self.es_index_pattern)

    @synchronized
    def set_es_parameters( self, es_query_body,es_field_map,es_index_pattern ):
        """ set the elasticsearch parameters for this table """
        self.es_query_body = es_query_body
        self.es_field_map = es_field_map
        self.es_index_pattern = es_index_pattern
        self.refresh()

    @synchronized
    def refresh( self ):
        """ refresh or rebuild tables """

        # A new client is constructed on every refresh; connection settings
        # are the elasticsearch package defaults.
        es = Elasticsearch()

        result = es.search(index=self.es_index_pattern,body=self.es_query_body)

        def match_fields( name, result ):
            # Recursive depth-first walk of the response JSON; "name" is the
            # dotted path to "result".  Lists do not extend the dotted path.
            matches = []
            if isinstance(result,dict):
                for k in result:
                    full_name = (name+"." if name else "")+k
                    item = result[k]
                    for json_path,field_name,field_type  in self.es_field_map:
                        if full_name == json_path:
                            matches.append((field_name,field_type,item))
                    if isinstance(item,dict) or isinstance(item,list):
                        matches += match_fields(full_name,item)
            elif isinstance(result,list):
                for k in result:
                    matches += match_fields(name,k)
            return matches

        matches = match_fields( "",result)

        new_columns = {}
        for column_name,column_type,value in matches:
            if not column_name in new_columns:
                new_columns[column_name] = Column(name=column_name)
            c = new_columns[column_name]
            if column_type == "date":
                # date values arrive as epoch milliseconds
                cc = Cell(date_type,datetime.fromtimestamp(value/1000),format_date)
            elif column_type == "int":
                cc = Cell(int_type,value,format_int)
            elif column_type == "float":
                cc = Cell(float_type,value,format_float)
            elif column_type == "str":
                cc = Cell(string_type,value,format_string)
            else:
                # unmapped field types fall back to str(value)
                cc = Cell(string_type,str(value),format_string)
            c.put(c.size(),cc)

        # install the rebuilt columns, replacing existing ones by name
        for column_name in new_columns:
            if self.has_column(column_name):
                self.replace_column(self.map_column(column_name),new_columns[column_name])
            else:
                self.add_column(new_columns[column_name])

        self.changed()

        DataTable.refresh(self)

Ancestors (in MRO)

Static methods

def __init__(

self, refresh_minutes=1, es_index_pattern=None, es_query_body=None, es_field_map=None)

Initialize the ElasticsearchQueryTable pass in a refresh interval in minutes, the es_query_body dict representing the query json and the field map list of tuples [( json path, field name, field type )...]

def __init__(self,refresh_minutes=1,es_index_pattern=None,es_query_body=None,es_field_map=None):
    """Initialize the table: store the query settings, let DataTable set up
    the refresh machinery, then run a first refresh so data is available."""
    self.es_index_pattern = es_index_pattern
    self.es_field_map = es_field_map
    self.es_query_body = es_query_body
    description = "Elasticsearch query:%s,index:%s,fieldmap:%s,refreshed every %d minutes"%(
        es_query_body,es_index_pattern,es_field_map,refresh_minutes)
    DataTable.__init__(self,None,description,refresh_minutes)
    self.refresh()

def acquire_refresh_lock(

self)

acquire the refresh lock before reading/writing the table state

def acquire_refresh_lock(self):
    """Take the refresh lock; callers must pair this with release_refresh_lock."""
    lock = self.refresh_lock
    lock.acquire()

def add_column(

self, *args, **kwargs)

@synchronized
def add_column(self,column):
    """Append *column* to the table, assigning it the next free index and a
    generated "<table>_<idx>" name when it has none."""
    position = len(self.columns)
    column.set_idx(position)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,position))
    self.cnames[column.get_name()] = column
    self.columns.append(column)
    column.set_table(self)

def changed(

self)

notify listeners that this table has been changed

def changed(self):
    """Invoke every registered listener to signal that this table changed."""
    for notify in self.listeners:
        notify(self)

def get(

self, *args, **kwargs)

@synchronized
def get(self, row, reference ):
    """Return the cell at *row* in the column identified by *reference*
    (a column name or an integer index)."""
    col = self.columns[self.map_column(reference)]
    return col.get(row)

def get_bounds(

self, *args, **kwargs)

return a tuple (rows,cols) where rows is the maximum number of rows and cols is the maximum number of cols

@synchronized
def get_bounds(self):
    """Return (rows, cols): the largest column size and the column count.

    rows is -1 when the table has no columns."""
    rows = -1
    for col in self.columns:
        rows = max(rows, col.size())
    return (rows, len(self.columns))

def get_column(

self, *args, **kwargs)

@synchronized
def get_column(self, reference):
    """Return the Column object for *reference* (name or integer index)."""
    idx = self.map_column(reference)
    return self.columns[idx]

def get_columns(

self, *args, **kwargs)

return the list of columns

@synchronized
def get_columns(self):
    """Return the table's live list of Column objects (not a copy)."""
    return self.columns

def get_es_parameters(

self, *args, **kwargs)

fetch the elasticsearch parameters for this table as a tuple (es_query_body,es_field_map,es_index_pattern)

@synchronized
def get_es_parameters( self ):
    """Return the tuple (es_query_body, es_field_map, es_index_pattern)."""
    return self.es_query_body, self.es_field_map, self.es_index_pattern

def get_name(

self)

return the name of the table

def get_name(self):
    """Return this table's name."""
    return self.name

def get_names(

self, *args, **kwargs)

return a list of the names of the columns in order

@synchronized
def get_names(self):
    """Return the column names as a list, in column order."""
    names = []
    for col in self.columns:
        names.append(col.get_name())
    return names

def get_refresh_timestamp(

self)

get the time that the table was last refreshed

def get_refresh_timestamp( self ):
    """Return the time at which this table's data was last refreshed."""
    return self.refresh_timestamp

def has_column(

self, *args, **kwargs)

@synchronized
def has_column(self, reference ):
    """Return True if *reference* identifies an existing column.

    reference -- a column name (str) or an integer column index.

    Fixes two defects in the original: the integer branch compared an
    undefined name ``idx`` (always raising NameError), and the string
    branch tested ``type(reference) == str`` twice — a Python 2
    str/unicode leftover.
    """
    if isinstance(reference, str):
        return reference in self.cnames
    if isinstance(reference, int):
        # a valid index must fall inside the current column list
        return 0 <= reference < len(self.columns)
    return False

def insert_column(

self, *args, **kwargs)

@synchronized
def insert_column(self,idx,column):
    """Insert *column* at index *idx*, padding with blank columns when *idx*
    is past the end, and renumbering the columns that follow."""
    # NOTE(review): "blank_column" is not defined in this snippet —
    # presumably a module-level placeholder in data_table.py; confirm.
    while idx > len(self.columns):
        self.add_column(blank_column)
    if idx == len(self.columns):
        # inserting at the end is just an append
        self.add_column(column)
    else:
        if not column.get_name():
            column.set_name("%s_%d"%(self.name,idx))
        self.columns.insert(idx,column)
        self.cnames[column.get_name()] = column
        column.set_table(self)
        # re-index every column from the insertion point rightwards
        while idx < len(self.columns):
            # NOTE(review): this rename test reads "column" (the inserted
            # one) rather than self.columns[idx]; it looks like the shifted
            # auto-named columns were meant to be renamed — confirm intent.
            if column.get_name() == "%s_%d"%(self.name,idx-1):
                column.set_name("%s_%d"%(self.name,idx))
                self.cnames[column.get_name()] = column
            self.columns[idx].set_idx(idx)
            idx += 1

def listen(

self, listen_func)

register for notifications when a change event is raised on this table

def listen(self,listen_func):
    """Subscribe *listen_func* to change notifications for this table."""
    self.listeners.append(listen_func)

def map_column(

self, *args, **kwargs)

@synchronized
def map_column(self, reference ):
    """Map *reference* (a column name or an integer index) to an integer
    column index.

    Raises TypeError for any other reference type.  The original condition
    tested ``type(reference) == str`` twice (a Python 2 str/unicode
    leftover); a single isinstance check is sufficient and also accepts
    str subclasses.
    """
    if isinstance(reference, str):
        return self.cnames[reference].get_idx()
    if isinstance(reference, int):
        return reference
    raise TypeError("wrong type in mapping")

def perform_refresh(

self)

Thread worker that sleeps and refreshes the data on a schedule

def perform_refresh( self ):
    """Background worker: wake once a second and call refresh() whenever
    refresh_minutes have elapsed; exit when refresh_thread_stop is set."""
    last_refresh = time.time()
    while not self.refresh_thread_stop:
        if time.time() - last_refresh >= self.refresh_minutes*60.0:
            self.refresh()
            # restart the interval from the moment the refresh finished
            last_refresh = time.time()
        time.sleep(1)

def put(

self, *args, **kwargs)

@synchronized
def put(self, row, reference, value):
    """Store *value* at *row* in the column identified by *reference*."""
    idx = self.map_column(reference)
    self.columns[idx].put(row,value)

def refresh(

self, *args, **kwargs)

refresh or rebuild tables

@synchronized
def refresh( self ):
    """ refresh or rebuild tables """
    # NOTE(review): rendered copy of ElasticsearchDataTable.refresh, shown
    # at top-level indentation by the documentation extractor.
    es = Elasticsearch()
    result = es.search(index=self.es_index_pattern,body=self.es_query_body)
    def match_fields( name, result ):
        # depth-first walk of the response JSON; "name" is the dotted path
        # to "result"; lists do not extend the dotted path
        matches = []
        if isinstance(result,dict):
            for k in result:
                full_name = (name+"." if name else "")+k
                item = result[k]
                for json_path,field_name,field_type  in self.es_field_map:
                    if full_name == json_path:
                        matches.append((field_name,field_type,item))
                if isinstance(item,dict) or isinstance(item,list):
                    matches += match_fields(full_name,item)
        elif isinstance(result,list):
            for k in result:
                matches += match_fields(name,k)
        return matches
    matches = match_fields( "",result)
    new_columns = {}
    for column_name,column_type,value in matches:
        if not column_name in new_columns:
            new_columns[column_name] = Column(name=column_name)
        c = new_columns[column_name]
        if column_type == "date":
            # date values arrive as epoch milliseconds
            cc = Cell(date_type,datetime.fromtimestamp(value/1000),format_date)
        elif column_type == "int":
            cc = Cell(int_type,value,format_int)
        elif column_type == "float":
            cc = Cell(float_type,value,format_float)
        elif column_type == "str":
            cc = Cell(string_type,value,format_string)
        else:
            # unmapped field types fall back to str(value)
            cc = Cell(string_type,str(value),format_string)
        c.put(c.size(),cc)
    # install the rebuilt columns, replacing existing ones by name
    for column_name in new_columns:
        if self.has_column(column_name):
            self.replace_column(self.map_column(column_name),new_columns[column_name])
        else:
            self.add_column(new_columns[column_name])
    self.changed()
    DataTable.refresh(self)

def release_refresh_lock(

self)

release the refresh lock after reading/writing the table state

def release_refresh_lock(self):
    """Release the refresh lock taken by acquire_refresh_lock."""
    lock = self.refresh_lock
    lock.release()

def replace_column(

self, *args, **kwargs)

@synchronized
def replace_column(self,idx,column):
    """Install *column* at index *idx*: appends when idx is one past the
    end, otherwise replaces the existing column and drops its name from
    the name map."""
    column.set_idx(idx)
    if not column.get_name():
        column.set_name("%s_%d"%(self.name,idx))
    if idx == len(self.columns):
        self.columns.append(column)
    else:
        outgoing = self.columns[idx]
        del self.cnames[outgoing.get_name()]
        self.columns[idx] = column
    self.cnames[column.get_name()] = column
    column.set_table(self)

def set_es_parameters(

self, *args, **kwargs)

set the elasticsearch parameters for this table

@synchronized
def set_es_parameters( self, es_query_body,es_field_map,es_index_pattern ):
    """Store new Elasticsearch query settings and rebuild the table."""
    self.es_index_pattern = es_index_pattern
    self.es_query_body = es_query_body
    self.es_field_map = es_field_map
    self.refresh()

def start_refresh(

self)

Start the background refresh thread

def start_refresh( self ):
    """Start the background refresh thread, stopping any previous one first."""
    self.stop_refresh()
    worker = threading.Thread(target=self.perform_refresh)
    self.refresh_thread = worker
    worker.start()

def stop_refresh(

self)

Stop the background refresh thread

def stop_refresh( self ):
    """Stop the background refresh thread and reset the stop flag."""
    self.refresh_thread_stop = True
    worker = self.refresh_thread
    if worker and worker.is_alive():
        worker.join()
    self.refresh_thread = None
    self.refresh_thread_stop = False

def unlisten(

self, listen_func)

unregister for notifications when a change event is raised on this table

def unlisten(self,listen_func):
    """Remove *listen_func* from this table's change listeners."""
    self.listeners.remove(listen_func)

Instance variables

var es_field_map

var es_index_pattern

var es_query_body