# Import data from the new DIMM into ElasticSearch

We load the modules and define the other variables. The downloaded file (`data_new_dimm.csv`) has to be cleaned up when it is read, since it contains some text towards the end which cannot be parsed.

In [3]:
from elasticsearch import Elasticsearch
import pandas as pd
import os
import numpy as np
import subprocess
# Directory in which the downloaded ASM data file is stored.
path_data = '/data/datalake/asm'
# Name of the file the DIMM query result is written to (the query URL requests CSV output).
new_dimm_filename = 'data_new_dimm.csv'

We initialise the Elasticsearch client. Replace `servername` with the name of your server.

In [4]:
# Client used for all inserts below; 'servername' is a placeholder for the real host.
# NOTE(review): `bulk_size` does not look like a documented Elasticsearch client option — confirm it has any effect.
es = Elasticsearch('http://servername:9200', timeout=20.0, bulk_size=100000)

We perform the query

In [11]:
# Time window requested from the archive.
start_date_asm_str = '2017-04-28T00:00:00.00'
end_date_asm_str = '2017-05-01T12:00:00.00'

# wget is started from an argument list (no shell involved), so the '&'
# characters of the URL need no quoting; -O sets the output file.
request_asm_str = [
    'wget',
    '-O',
    os.path.join(path_data, new_dimm_filename),
    'http://archive.eso.org/wdb/wdb/asm/dimm_paranal/query?wdbo=csv&start_date={0:s}..{1:s}&tab_fwhm=1&tab_rfl=0&tab_rfl_time=0&top=1000000'.format(
        start_date_asm_str, end_date_asm_str),
]

# stderr is merged into stdout, so `error` is always None and `output`
# carries the complete wget log.
wget_process = subprocess.Popen(
    request_asm_str,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)
output, error = wget_process.communicate()

print(' '.join(request_asm_str))
print(output.decode('UTF8'))

wget -O /data/datalake/asm/data_new_dimm.txt http://archive.eso.org/wdb/wdb/asm/dimm_paranal/query?wdbo=csv&start_date=2017-04-28T00:00:00.00..2017-05-01T12:00:00.00&tab_fwhm=1&tab_rfl=0&tab_rfl_time=0&top=1000000
--2017-05-26 16:39:34--  http://archive.eso.org/wdb/wdb/asm/dimm_paranal/query?wdbo=csv&start_date=2017-04-28T00:00:00.00..2017-05-01T12:00:00.00&tab_fwhm=1&tab_rfl=0&tab_rfl_time=0&top=1000000
Resolving archive.eso.org (archive.eso.org)... 134.171.46.246
Connecting to archive.eso.org (archive.eso.org)|134.171.46.246|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/plain]
Saving to: ‘/data/datalake/asm/data_new_dimm.txt’

     0K .......... .......... .......... .......... .......... 19.0K
    50K                                                        42.4K=2.6s

2017-05-26 16:39:38 (19.1 KB/s) - ‘/data/datalake/asm/data_new_dimm.txt’ saved [51693]




We read the CSV file into a pandas DataFrame.

In [12]:
# skiprows=1 drops the first line of the file; skipfooter=5 drops the last
# five lines (the trailing text that cannot be parsed).
# skipfooter is not supported by the default C parser: without an explicit
# engine pandas emits a ParserWarning and falls back to the python engine,
# so we select that engine directly.
new_dimm_df = pd.read_csv(
    os.path.join(path_data, new_dimm_filename),
    skiprows=1,
    skipfooter=5,
    engine='python',
)
print(len(new_dimm_df))

1984


  if __name__ == '__main__':


Let's see what it looks like.

In [13]:
# Preview the first rows to check that the columns were parsed as expected.
new_dimm_df.head()

Unnamed: 0,Date time,"DIMM Seeing [""]"
0,2017-04-28T00:01:01,0.401
1,2017-04-28T00:02:20,0.412
2,2017-04-28T00:03:40,0.457
3,2017-04-28T00:04:59,0.496
4,2017-04-28T00:06:18,0.493


We rename the columns and create a list of dictionaries (one per row) from the data frame.

In [16]:
# Map the archive's column headers onto the field names stored in the documents.
column_renames = {
    'Date time': '@timestamp',
    'DIMM Seeing ["]': 'dimm_seeing',
}
new_dimm_df.rename(columns=column_renames, inplace=True)

# One dictionary per row: despite its name, new_dimm_dict is a list of dicts.
new_dimm_dict = new_dimm_df.to_dict(orient='records')
print(len(new_dimm_dict))
print(new_dimm_dict[0])

128166
{'@timestamp': '2016-04-05T23:50:34', 'mass_GLfrac': 0.47100000000000003, 'mass_dimm_tau0': 0.002679, 'mass_tau0_RMS': 0.023, 'mass_tau0': 0.003039}


We now also need to filter out all the NaN values.

In [17]:
def clean_dico(dico,inline=True):
    """
    Strip every entry whose value contains a NaN from a dictionary.

    Input:
    - dico: the dictionary to clean
    - inline: if True, the entries are removed from dico itself; if False, dico is left untouched and a cleaned shallow copy is built instead.

    Returns the cleaned dictionary (dico itself when inline is True).
    """
    target = dico if inline else dico.copy()

    nan_keys = []
    for name, value in dico.items():
        try:
            has_nan = np.any(np.isnan(value))
        except TypeError:
            # np.isnan rejects non-numeric values such as strings; those
            # entries are kept as they are.
            continue
        if has_nan:
            nan_keys.append(name)

    # Removal happens only after the scan, so the dictionary is never
    # modified while it is being iterated.
    for name in nan_keys:
        target.pop(name)
    return target

def clean_dico_list(dico_list,inline=True):
    """
    Apply clean_dico to every dictionary of a list.

    Input:
    - dico_list: the list of dictionaries to clean
    - inline: forwarded to clean_dico; if True the dictionaries themselves are modified, otherwise cleaned copies are produced.

    Returns a new list holding the cleaned dictionaries in their original order; the list object passed in is never modified.
    """
    return [clean_dico(entry, inline=inline) for entry in dico_list]

In [18]:
# Drop the NaN-valued keys from every record; with the default inline=True
# the dictionaries are modified in place.
new_dimm_dict = clean_dico_list(new_dimm_dict)

We insert the entries into the Elasticsearch database.

In [19]:
# Index the records one at a time: a document that fails is reported
# (with the exception and its content) and the loop carries on with the next.
for i, doc in enumerate(new_dimm_dict):
    try:
        res = es.index(index='asm', doc_type='dimm', body=doc)
    except Exception as e:
        print('Error with document number {0:d}'.format(i))
        print('The error occurred while trying to insert the following dictionary:')
        print(doc)
        print(e)
    # Progress report every 300 documents.
    if i % 300 == 0:
        print('Inserting {0:d}th document to the asm new dimm elastic search...'.format(i))

Inserting 0th document to the asm slodar elastic search...
Inserting 300th document to the asm slodar elastic search...
Inserting 600th document to the asm slodar elastic search...
Inserting 900th document to the asm slodar elastic search...
