def onScheduledEvent():
date = system.date.now()
Interface.Compressors.runDailyEquipmentUpdate(date)
Function Source Code
class Compressor:
logger = system.util.getLogger('CompressorRuntimeReport')
runTimeTag = 'Run Time Yesterday (Hours)'
jobName = 'Compressor Runtime'
def collectMerrickCompressors():
"""
Query to collect a list of compressor merrickid's and tagpath's
Parameters:
NA
Returns:
dict[merrickId (int), tagpath (str)]
Raises:
NA
"""
query = system.db.runQuery("""
select tagpathfull, me.merrickid from meta_equipment me
join meta_site ms on ms.id = me.siteid
where me.type = 'Gas Compressor'
and me.subtype = 'Gas Compressor'
and me.merrickId > 0
and me.enabled = 1
and ms.enabled = 1
""")
return {item[1]: item[0] for item in query}
def collectCurrentDayRunTime(compressorDict, date=system.date.now()):
"""
Collect run time values from compressor tags
Parameters:
compressorDict (dict[merrickId (int), tagpath (str)]): dict of compressor equipment tagpaths and merrickIds
Returns:
dict[merrickId (int), [runtime (float), date (datetime)]]
Raises:
NA
"""
currentDate = system.date.getDate(
system.date.getYear(date),
system.date.getMonth(date),
system.date.getDayOfMonth(date)
)
mids, tags = compressorDict.keys(), compressorDict.values()
tagReads = system.tag.readBlocking([tag + '/' + Compressor.runTimeTag for tag in tags])
compressorQuality = [val.quality for val in tagReads]
compressorRunTimes = {mids[idx]: [[val.value, currentDate]] for idx, val in enumerate(tagReads) if val.quality.toString() == 'Good'}
droppedTags = {str(mids[i]) + ': ' + tags[i] for i, qual in enumerate(compressorQuality) if qual.toString() != 'Good'}
for droppedTag in droppedTags:
Compressor.logger.debug('%s was removed from the compressor list, as the quality of \'%s\' was not \'Good\''%(droppedTag, Compressor.runTimeTag))
return compressorRunTimes
def createP2Job(datetime):
"""
Generate a job in p2_jobs and returns the job id
Parameters:
None
Returns:
jobId (int): jobid from p2_jobs table
"""
jobId = getP2Job(datetime)
if jobId != 'null':
Compressor.logger.info('Job ID: %d found when attempting to create new job ID'%jobId)
return jobId
category = Compressor.jobName
active = 0
parameters = {
'date': datetime,
'category': category,
'active': active,
}
Compressor.logger.info('Creating new P2 Job Record')
system.db.runNamedQuery('Interface/Add Job', parameters)
jobId = getP2Job(datetime)
Compressor.logger.info('Created new P2 Job Record: %d'%jobId)
return jobId
def getP2Job(date=system.date.now()):
"""
Returns:
jobId (int): jobId from p2_jobs table
"""
category = Compressor.jobName
parameters = {
'category': category,
'date': date,
}
return system.db.runNamedQuery('Interface/Get Job ID', parameters)
def collectMultipleDayRunTimes(compressorDict, days, endDate=system.date.now()):
"""
Collect run time values from compressor tags for multiple days
Parameters:
compressorDict (dict[merrickId (int), tagpath (str)]): dict of compressor equipment tagpaths and merrickIds
days (int): number of days to pull run time historically
(Optional) endDate (datetime): Last date of runtimes | Default: system.date.now()
Returns:
dict[merrickId (int), list[list[runtime (float), date (datetime)]]]
"""
recordHour = 2
start = system.date.addDays(endDate, -days)
end = endDate
endDate = system.date.addHours(system.date.getDate(
system.date.getYear(end),
system.date.getMonth(end),
system.date.getDayOfMonth(end)
),
recordHour
)
startDate = system.date.addHours(system.date.getDate(
system.date.getYear(start),
system.date.getMonth(start),
system.date.getDayOfMonth(start)
),
recordHour
)
mids, tags = compressorDict.keys(), compressorDict.values()
tagHistory = system.tag.queryTagHistory([tag + '/' + Compressor.runTimeTag for tag in tags], startDate, endDate, returnSize=days, aggregationMode="LastValue")
compressorRunTimes = {
mid: [[tagHistory.getValueAt(i,j+1), system.date.addHours(tagHistory.getValueAt(i, 0), -recordHour)] for i in range(days)] for j, mid in enumerate(mids)
}
return compressorRunTimes
def createEquipmentRecords(compressorRunTimes, jobID):
"""
Add compressor run times to a daily equipment record in the interfaces database (p2_equipment_daily)
Parameters:
compressorRunTimes (dict[merrickId (int), list[list[runtime (float), date (datetime)]]]): dict of runtimes and dates
jobID (int): jobId number from the p2_jobs table
Returns:
(int) number of records written
"""
if not compressorRunTimes:
Compressor.logger.debug('No compressor run time records processed')
return 0
entryGen = '(' + '), ('.join(
[
','.join(
(str(jobID),
str(mid),
'1',
'\''+system.date.format(entry[1], "yyyy-MM-dd HH:mm:ss")+'\'',
str(entry[0]))
)
for mid, entries in compressorRunTimes.items()
for entry in entries
]) + ')'
# return entryGen
query = """
insert into Interfaces01.dbo.p2_equipment_daily (jobId, equipId, type, date, hoursOn)
values
%s
"""%entryGen
return system.db.runUpdateQuery(query)
def removePostedEquipment(compressorDict, date=system.date.now()):
"""
Remove merrickIds from runtime dict if they exist in the interfaces table already
Parameters:
compressorDict (dict[merrickId (int), list[list[runtime (float), date (datetime)]]]): dict of merrickIds for posting to interfaces table
date (datetime): dates of records to remove
Returns:
compressorDict (dict[merrickId (int), list[list[runtime (float), date (datetime)]]])
"""
if not compressorDict:
Compressor.logger.debug('CompressorDict provided was empty or missing.')
return {}
path = 'Interface/Get Daily Compressor Interfaces Records'
category = Compressor.jobName
parameters = {
'category': category,
'date': date,
}
existing_records = system.db.runNamedQuery(path, parameters=parameters)
existing_records_list = [existing_records.getValueAt(idx, 0) for idx in range(existing_records.getRowCount())]
return {key: val for key, val in compressorDict.items() if key not in existing_records_list}
def runDailyEquipmentUpdate(datetime):
"""
Process to add daily compressor entries to the interface table
"""
Compressor.logger.info('Running runDailyEquipmentUpdate')
jobId = createP2Job(datetime) # Previously updated to inject datetime
merrickCompressors = collectMerrickCompressors()
runtimeDict = collectCurrentDayRunTime(merrickCompressors)
filteredRuntimeDict = removePostedEquipment(runtimeDict)
records_added = createEquipmentRecords(filteredRuntimeDict, jobId)
Compressor.logger.info('Finished running runDailyEquipmentUpdate. %d items added to interfaces table'%records_added)
Log Result on scheduled run
Log result on run after updating schedule time
Code update
def runDailyEquipmentUpdate(datetime):
"""
Process to add daily compressor entries to the interface table
"""
Compressor.logger.info('Running runDailyEquipmentUpdate')
jobId = createP2Job(datetime)
merrickCompressors = collectMerrickCompressors()
runtimeDict = collectCurrentDayRunTime(merrickCompressors)
filteredRuntimeDict = removePostedEquipment(runtimeDict, datetime) # updated to inject datetime
records_added = createEquipmentRecords(filteredRuntimeDict, jobId)
Compressor.logger.info('Finished running runDailyEquipmentUpdate. %d items added to interfaces table'%records_added)
Gateway Event Script
Function Source Code
Log Result on scheduled run
Log result on run after updating schedule time
Code update