lsst-sims / legacy_sims_catUtils

LSST Simulations package for catalog utilities
3 stars 4 forks source link

Regarding Parallelizing CatSim #18

Open JPGlaser opened 9 years ago

JPGlaser commented 9 years ago

Hey All,

This is Joe again. So I have been working on creating a bulk python script that can compute instance catalogs through an array of observations, hopefully through parallel processing. I have included the current state of the script below.

However, I am running into a problem with the SQL galaxy database portion of my instance catalog writer. When I run the code on two processors, I am returned an error:

Traceback (most recent call last):
  File "genLSSTAGNData_Par.py", line 161, in <module>
    pool.map(CreateCatParallel, Pointing)
  File "/Users/jglaser/lsst/DarwinX86/anaconda/master-g68783b1848/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/jglaser/lsst/DarwinX86/anaconda/master-g68783b1848/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
sqlalchemy.exc.OperationalError: (OperationalError) (20020, 'DB-Lib error message 20020, severity 9:\nBad token from the server: Datastream processing out of sync\n') "EXECUTE [LSST].[dbo].[GalaxySearch2015]\n                   @ApertureStr = 'REGION CIRCLE J2000 25.052981 -26.341448 3.000000', @ColumnNames = 'av_b as internalAvBulge,sedname_agn as sedFilenameAgn,dec as decJ2000,redshift as redshift,galid as galid,AGNID as AGNID,varParamStr as varParamStr,ra as raJ2000,av_d as internalAvDisk,magnorm_bulge as magNormBulge,magnorm_agn as magNormAgn,magnorm_disk as magNormDisk',\n                   @ComponentSubset = 'ALL' , @WhereClause = '(sedname_agn IS NOT NULL) AND (magnorm_agn < 24.0)'" {}

Since my personal experience with SQL is quite limited, I was wondering if any of you guys/gals know what this error is being caused by. A google search is rather inconclusive other than a possibility of too many connections being generated (though the limit should be somewhere around 32K concurrent connections). Do we know if the InstanceCatalog class closes its SQL connection each time it is ran?

~ Joe


import numpy as np
import scipy
import matplotlib.pyplot as plt
import time
import sys

###############################################################################
#                      Set Up Observation Variables                           #
###############################################################################
global AperatureRadius

AperatureType = 'circle'
AperatureRadius = 0.1 #FoV in degrees
#global AperatureRadius = 1.75 #LSST's Actual FoV in degrees
SearchRegionRA = (25.0,30.0)
SearchRegionDec = (-30.0,-25.0)
SearchAirmass = (1.0,1.5)
DesiredFilter = None

###############################################################################
#                     Search DB for the Observations                          #
###############################################################################
import eups
import os
from lsst.sims.catUtils.utils import ObservationMetaDataGenerator
from lsst.sims.catalogs.generation.db import ObservationMetaData
#help(ObservationMetaDataGenerator)

opsimdb = os.path.join(eups.productDir('sims_data'),'OpSimData','enigma_1189_sqlite.db')
gen = ObservationMetaDataGenerator(driver='sqlite', database=opsimdb)
SimObData = gen.getObservationMetaData(boundType=AperatureType, boundLength=AperatureRadius, 
                                       fieldRA=SearchRegionRA, fieldDec=SearchRegionDec,
                                       airmass=SearchAirmass, telescopeFilter=DesiredFilter)
#print SimsObData[0].__dict__

###############################################################################
#                  Output Basic Table of Returned Query                       #
###############################################################################
from prettytable import PrettyTable

NumOfObservations = len(SimObData)
UniquePointings = list({(o.unrefractedRA,o.unrefractedDec) for o in SimObData})
NumOfPointings = len(UniquePointings)
print 'Number of Unique Pointings:', NumOfPointings,'\n'
table2 = PrettyTable(["Pointing RA","Pointing Dec"])
for x in UniquePointings:
    table2.add_row([x[0], x[1]])
print table2

###############################################################################
#              Restructure Returned Query to Correct Format                   #
###############################################################################
ObMetaData = [[] for _ in xrange(NumOfPointings)]
for i in xrange(NumOfPointings):
    for o in SimObData:
        if UniquePointings[i][0] == o.unrefractedRA and UniquePointings[i][1] == o.unrefractedDec:
            ObMetaData[i].append(o)

###############################################################################
#                   Connect to the UoW Galaxy Database                        #
###############################################################################
from lsst.sims.catUtils.baseCatalogModels import GalaxyTileObj
galaxyDB = GalaxyTileObj()
#galaxyDB.show_mapped_columns()

###############################################################################
#                 Define Instance Catalog Class for AGNs                      #
###############################################################################
from lsst.sims.catalogs.measures.instance import InstanceCatalog, compound
from lsst.sims.photUtils import PhotometryGalaxies, VariabilityGalaxies

class FastVarAgnCat(InstanceCatalog, PhotometryGalaxies, VariabilityGalaxies):
    cannot_be_null = ['uAgn'] #Locating only the AGNs in the FoV.
    column_outputs = ['AGNID', 'redshift', 'raJ2000', 'decJ2000', 'ObsAgn', 'sigma_ObsAgn']
    transformations = {'raJ2000':np.degrees, 'decJ2000':np.degrees}

    #Only Append the Column of AGN Magnitudes in the Observed Bandpass
    @compound('ObsAgn', 'sigma_ObsAgn')
    def get_PhotometryCols(self):
        mag = None
        sigma = None
        if self.obs_metadata.bandpass is not None:
            if not hasattr(self.obs_metadata.bandpass, '__iter__'):
                Filter = self.obs_metadata.bandpass
                mag = self.column_by_name(Filter+'Agn')
                sigma = self.column_by_name('sigma_'+Filter+'Agn')
        return np.array([mag, sigma])

    #Don't Calculate Photometry for the Galaxy Bulge or Disk
    @compound('sedFilenameBulge', 'sedFilenameDisk')
    def get_QuickSED(self):
        ra = self.column_by_name('raJ2000') #Finding how many objects are in the column.
        names = []
        for r in ra:
            names.append('None') #Tricking the catalog into thinking these galaxies don't have bulges or disks.
        return np.array([names, names])

###############################################################################
#                  Define the Observation Header Writer                       #
###############################################################################
def ObsHeader(IC, file_handle):
    ObsHeaderTransformations = {'Unrefracted_RA':np.degrees, 'Unrefracted_Dec':np.degrees,
                               'Opsim_moonra':np.degrees, 'Opsim_moondec':np.degrees,
                               'Opsim_rotskypos':np.degrees, 'Opsim_rottelpos':np.degrees,
                               'Opsim_sunalt':np.degrees, 'Opsim_moonalt':np.degrees,
                               'Opsim_dist2moon':np.degrees, 'Opsim_altitude':np.degrees,
                               'Opsim_azimuth':np.degrees
                               }
    md = IC.obs_metadata.phoSimMetaData
    for k in md:
        if k in ObsHeaderTransformations.keys():
            file_handle.write(str(k)+" "+str(ObsHeaderTransformations[k](md[k][0]))+"\n")
        else:
            file_handle.write(str(k)+" "+str(md[k][0])+"\n")

###############################################################################
#            Define the Catalog Writer for Parallel Processes                 #
###############################################################################
def CreateCatParallel(Observation):
    PointingDir = 'RA[%.2f]_DEC[%.2f]' %(Observation.unrefractedRA, Observation.unrefractedDec)
    WorkingDir = PrimaryDir+'/'+CatDir+'/'+PointingDir
    if not os.path.exists(WorkingDir):
        os.makedirs(WorkingDir)
    CatFileName = 'LSSTAGN_%.5f.txt' %(Observation.mjd)
    if os.path.isfile(WorkingDir+'/'+CatFileName) == False:
        SQLRules = '(sedname_agn IS NOT NULL) AND (magnorm_agn < 24.0)'
        variableAgn = FastVarAgnCat(galaxyDB, obs_metadata=Observation, constraint=SQLRules)
        # Writing the Observational Meta-Data Header
        with open(WorkingDir+'/'+CatFileName, "w") as fh:
            ObsHeader(variableAgn, fh)
            fh.write("\n")
            fh.close()
        variableAgn.write_catalog(WorkingDir+'/'+CatFileName, write_mode='a')

###############################################################################
#              Create the Catalogs with Parallel Processing                   #
###############################################################################
from joblib import Parallel, delayed  
import multiprocessing

global PrimaryDir, CatDir
PrimaryDir = 'TestCats/QuickSurvey_R%.2f' %(AperatureRadius)
CatDir = 'Catalogs'

if not os.path.exists(PrimaryDir+'/'+CatDir):
        os.makedirs(PrimaryDir+'/'+CatDir)
start = time.clock()
for Pointing in ObMetaData:
    #NumUsedCores = multiprocessing.cpu_count()
    NumUsedCores = 2
    #Parallel(n_jobs=NumUsedCores, verbose=5)(delayed(CreateCatParallel)(o) for o in Pointing)
    pool = multiprocessing.Pool(processes=NumUsedCores)
    pool.map(CreateCatParallel, Pointing)
    pool.close()
end = time.clock()
print "Creating the catalogs took", end - start, "seconds."
JPGlaser commented 9 years ago

A small additional note: running the process with a single threat seems to not raise this error. Thus, I assume it likely has something to do with it trying to access the same DB on two different threads.

~ Joe

EDIT: It appears by editing the CreateCatParallel() Function to the following solves the bug, though I am unsure if it is being properly parallelized as only one file is written at a time ...

from lsst.sims.catUtils.baseCatalogModels import GalaxyTileObj
def CreateCatParallel(Observation):
    galaxyDB = GalaxyTileObj()
    #galaxyDB.show_mapped_columns()
    PointingDir = 'RA[%.2f]_DEC[%.2f]' %(Observation.unrefractedRA, Observation.unrefractedDec)
    WorkingDir = PrimaryDir+'/'+CatDir+'/'+PointingDir
    if not os.path.exists(WorkingDir):
        os.makedirs(WorkingDir)
    CatFileName = 'LSSTAGN_%.5f.txt' %(Observation.mjd)
    if os.path.isfile(WorkingDir+'/'+CatFileName) == False:
        SQLRules = '(sedname_agn IS NOT NULL) AND (magnorm_agn < 24.0)'
        variableAgn = FastVarAgnCat(galaxyDB, obs_metadata=Observation, constraint=SQLRules)
        # Writing the Observational Meta-Data Header
        with open(WorkingDir+'/'+CatFileName, "w") as fh:
            ObsHeader(variableAgn, fh)
            fh.write("\n")
            fh.close()
        variableAgn.write_catalog(WorkingDir+'/'+CatFileName, write_mode='a')

EDIT 2: So it seems that when the script runs on the Dirac cluster, it is generating the required number of processes and working its tail off. However, I cannot tell if the processes are indeed writing multiple files at a time as hoped. I have timers in the script, so we will have to wait for it to finish to find out. In the mean time, suggestions would be helpful.

yalsayyad commented 9 years ago

Hi Joe, Each GalaxyTileObj() instance is its own database connection. Two processes can't access data through the same database connection. You ever try to make two processes append to the same file when using the mulitprocessing module? It's like that.

Instead, you should have each process instantiate a new GalaxyTileObj(). It looks like you can do this by putting 'galaxyDB = GalaxyTileObj()' inside CreateCatParallel()

JPGlaser commented 9 years ago

Ah, I see. I was unsure if it was the InstanceCatalog class or the GalaxyTileObj class that was starting the SQL connection. If the case is that the GalaxyTileObj creates the new connection, then yeah adding it there fixes the issue. I will report my findings on speed saving when it finishes and then close the issue. Thanks @yalsayyad!

~ Joe

JPGlaser commented 9 years ago

Alright so the results are in! Running the code for all observations taken at the unique pointing of (26.1754049832, -23.3006465419) for a ten year span takes the following amount of time:

Thus we have a speed-up factor of 6.1609 by just using the Python multiprocessing command. I will include the finalized code below for those who wish to test it out at a later point!

The only additional question now is if writing the catalogs as ASCII text files is indeed the fastest way to output the results. Thoughts on this, @danielsf and @yalsayyad?

~ Joe

EDIT: Here is Version 0.95 of the parallelized bulk run script. https://github.com/JPGlaser/LSST_CatGen/blob/master/genLSSTAGNData_Par.py

You can get more info on how to run it with options by running:

python -W ignore genLSSTAGNData_Par.py -h