fermi-ad / acsys-python

Python module to access the Fermilab Control System
MIT License

Missing data #73

Closed: gopikaops closed this issue 4 months ago

gopikaops commented 5 months ago

Hi team, we are collecting VIMIN data for EDA. We collected B:VIMIN data from June 15, 2023, 4 PM to 10 PM. Via acsys-python, we got 487 data points. This is our DRF request: `B:VIMIN@E,10,E,0`

However, on the console, there are 209,370 points for the same DRF. Here are the screenshots: [two console screenshots attached]

Why is so much data missing?

beauremus commented 5 months ago

Can you share your acsys-python requests? I'm thinking timestamps are the issue at the moment.

gopikaops commented 5 months ago

`B:VIMIN<-LOGGER:1686862800000:1686866400000@E,10,E,0`

beauremus commented 5 months ago

I'm surprised that works! 😲

Do you mind trying `B:VIMIN@E,10,E,0<-LOGGER:1686862800000:1686866400000`?

Also, I think there's only an hour between these stamps.

gopikaops commented 5 months ago

Oops, it's this one:

`drf=f"{parameter}<-LOGGER:1686862800000:1686866400000{node}"`

I take either parameter (which contains the event) or node (which contains the logger node). In the case of B:VIMIN@E,10,E,0, the DRF is `B:VIMIN@E,10,E,0<-LOGGER:1686862800000:1686866400000`, and in the case of B:LINFRQ,Bstr, the DRF is `B:LINFRQ<-LOGGER:1686862800000:1686866400000:Bstr`.
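
To spell out how that template expands for the two entry styles, here is a plain-Python illustration of the two strings above (illustration only, not taken from the script):

```python
# Illustration: how the DRF template expands for the two entry styles.
template = "{parameter}<-LOGGER:1686862800000:1686866400000{node}"

# Event-style entry: the event lives in `parameter`, `node` is empty.
print(template.format(parameter="B:VIMIN@E,10,E,0", node=""))
# -> B:VIMIN@E,10,E,0<-LOGGER:1686862800000:1686866400000

# Node-style entry: the logger node is appended as `:Bstr`.
print(template.format(parameter="B:LINFRQ", node=":Bstr"))
# -> B:LINFRQ<-LOGGER:1686862800000:1686866400000:Bstr
```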

beauremus commented 5 months ago

I don't think this will solve your whole problem, but your second timestamp should be 1686884400000.
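
As a sanity check on the stamps (a quick sketch, assuming Fermilab-local Central time and Python 3.9+ for `zoneinfo`):

```python
from datetime import datetime
from zoneinfo import ZoneInfo  # Python 3.9+

central = ZoneInfo("America/Chicago")

# June 15, 2023, 4 PM and 10 PM local time, as epoch milliseconds.
start = datetime(2023, 6, 15, 16, 0, tzinfo=central)
end = datetime(2023, 6, 15, 22, 0, tzinfo=central)

print(int(start.timestamp() * 1000))  # 1686862800000
print(int(end.timestamp() * 1000))    # 1686884400000
```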

gopikaops commented 5 months ago

I can confirm that I can retrieve smaller datasets, like 1 second, up to a maximum of 1 minute 36 seconds. Why am I unable to get 6 hours, or even 1 hour, of data?

We tried another date, Jan 15, 4 PM to 10 PM, and we still only got 487 data points.

beauremus commented 4 months ago

Here's an example that might help you make sense of this. If it doesn't help you find the issue, maybe you can share a snippet from your code?

Jupyter Python notebook showing data logger data acquisition

gopikaops commented 4 months ago

We are getting 487 data points for all our DRF requests, no matter the time range. I can show you in a Zoom call.

```python
import logging
from sys import stdout
from time import sleep
import csv
import acsys.dpm
import acsys.sync
from acsys.dpm import ItemData
from datetime import datetime
acsys_logger = logging.getLogger("acsys")
logger = logging.getLogger(__name__)
acsys_logger.setLevel("WARN")
logger.setLevel("INFO")
handler = logging.StreamHandler(stdout)
acsys_logger.addHandler(handler)
logger.addHandler(handler)

field_names = ["micro_timestamp","data"]
results = []

def general_getter(parameter, node):
    async def wrapper(con):
        async with acsys.dpm.DPMContext(con) as dpm:
            print("This code is running for RPOS")
            sleep(1)
            await dpm.add_entry(0, drf=f"{parameter}<-LOGGER:1706724495000:1706731695000{node}")
            await dpm.start()
            async for evt_res in dpm:
                if isinstance(evt_res, ItemData):
                    print(evt_res)
                for index, data in enumerate(evt_res.data):
                        results.append({
                            'micro_timestamp': evt_res.micros[index],
                            'data': data
                        })
                print("!")
                print(results)
                return results

    return acsys.run_client(wrapper)

if __name__ == "__main__":
    devices_file = 'devices.txt'
    with open(devices_file) as infile:
        for line in infile:
            parameter = line.rstrip()
            print(parameter)
            if parameter.__contains__('@'):
                general_getter(parameter, "")
                csv_file='data/'+parameter[0:parameter.find("@")]+'.csv'
                with open(csv_file, 'w') as csvfile:
                    writer = csv.DictWriter(csvfile, fieldnames=field_names)
                    writer.writeheader()
                    writer.writerows(results)
                results = []

            else:
                device = parameter[0:parameter.find(",")]
                node = parameter[parameter.find(",")+1:len(parameter)]
                print(device)
                print(node)
                general_getter(device, ':'+node)
                csv_file = 'data/'+device+'.csv'
                with open(csv_file, 'w') as csvfile:
                    writer = csv.DictWriter(csvfile, fieldnames=field_names)
                    writer.writeheader()
                    writer.writerows(results)
                results = []
```

This is my devices.txt: `B:VIMIN@E,10,E,0`

beauremus commented 4 months ago

> 487 data points

This is a good clue! I think this is the size of a single response. A large request like this is delivered in chunks, and I think you're just reading the first chunk/response.
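
A minimal sketch of the accumulation (assuming every reply is an `ItemData` carrying `micros` and `data` lists, as in your snippet):

```python
# Sketch: logger replies arrive in chunks; keep consuming until an
# empty `data` list signals the end of the requested history.
async def collect_all(dpm):
    all_points = []
    async for evt_res in dpm:
        if not evt_res.data:
            break  # empty chunk: history is complete
        all_points.extend(zip(evt_res.micros, evt_res.data))
    return all_points
```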

beauremus commented 4 months ago
```python
            async for evt_res in dpm:
                if isinstance(evt_res, ItemData):
                    print(evt_res)
                for index, data in enumerate(evt_res.data):
                        results.append({
                            'micro_timestamp': evt_res.micros[index],
                            'data': data
                        })
                print("!")
                print(results)
                return results
```

The `return` at the end means you are breaking out of the loop on the first pass. You need to check that you have all the data before leaving the loop.

Notice where I break in my code. We send back an empty list when there's no more data. So, you can check for this condition before breaking or returning.

```python
# If there's no more data, leave the `for` loop.
if evt_res.data == []:
    break
```
gopikaops commented 4 months ago

I fixed it

Could you please review my code?

```python
import logging
from sys import stdout
from time import sleep
import csv
import acsys.dpm
import acsys.sync
from acsys.dpm import ItemData
from datetime import datetime
acsys_logger = logging.getLogger("acsys")
logger = logging.getLogger(__name__)
acsys_logger.setLevel("WARN")
logger.setLevel("INFO")
handler = logging.StreamHandler(stdout)
acsys_logger.addHandler(handler)
logger.addHandler(handler)

field_names = ["micro_timestamp","data"]
results = []

def general_getter(parameter, node):
    async def wrapper(con):
        async with acsys.dpm.DPMContext(con) as dpm:
            print("Feb 1, 2024 13:37")
            sleep(1)
            await dpm.add_entry(0, drf=f"{parameter}<-LOGGER:1686862800000:1686884400000{node}")
            await dpm.start()
            async for evt_res in dpm:
                for index, data in enumerate(evt_res.data):
                        results.append({
                            'micro_timestamp': evt_res.micros[index],
                            'data': data
                        })
                if (evt_res.data) == []:
                    break
            print("!")
            print(results)

    return acsys.run_client(wrapper)

if __name__ == "__main__":
    devices_file = 'devices.txt'
    with open(devices_file) as infile:
        for line in infile:
            parameter = line.rstrip()
            print(parameter)
            if parameter.__contains__('@'):
                general_getter(parameter, "")
                csv_file='data/'+parameter[0:parameter.find("@")]+'.csv'
                with open(csv_file, 'w') as csvfile:
                    writer = csv.DictWriter(csvfile, fieldnames=field_names)
                    writer.writeheader()
                    writer.writerows(results)
                results = []

            else:
                device = parameter[0:parameter.find(",")]
                node = parameter[parameter.find(",")+1:len(parameter)]
                print(device)
                print(node)
                general_getter(device, ':'+node)
                csv_file = 'data/'+device+'.csv'
                with open(csv_file, 'w') as csvfile:
                    writer = csv.DictWriter(csvfile, fieldnames=field_names)
                    writer.writeheader()
                    writer.writerows(results)
                results = []
```

Our devices.txt accommodates both collecting at events and collecting from nodes, like so:

```
B:LINFRQ,Bstr
B:VIMIN@E,10,E,0
B:VIMAX,Bstr
```

EDIT: Added the python qualifier to the code block for highlighting. - beau

beauremus commented 4 months ago

The only potential bug I see is here:

```python
            async for evt_res in dpm:
                for index, data in enumerate(evt_res.data):
```

You should check that the response is data and not a status message. Without this guard, if you get a status message, you'll get a stack trace saying that `evt_res` doesn't have a `data` property.

```python
            async for evt_res in dpm:
                if evt_res.isReading:
                    for index, data in enumerate(evt_res.data):
```

If you are running this by hand and are willing to just rerun it when it fails, you don't need to do this.

If you want feedback beyond bugs, I'd prefer a pull request so that I can add comments to explain when and why I would make changes.

EDIT: Remove the parenthesis bug that @gopikaops mentions below. - beau

gopikaops commented 4 months ago

I have a git repo - let me send a pull request for this

gopikaops commented 4 months ago

That broke the code:

```
AD142074-MLT:gmps-ml gopikab$ python3 collection_gmps_ml_project_parameters.py
B:VIMIN@E,10,E,0
Feb 1, 2024 13:37
Traceback (most recent call last):
  File "/Users/gopikab/Documents/vimin-ml/gmps-ml/collection_gmps_ml_project_parameters.py", line 55, in <module>
    general_getter(parameter, "")
  File "/Users/gopikab/Documents/vimin-ml/gmps-ml/collection_gmps_ml_project_parameters.py", line 43, in general_getter
    return acsys.run_client(wrapper)
  File "/Users/gopikab/Library/Python/3.9/lib/python/site-packages/acsys/__init__.py", line 824, in run_client
    return loop.run_until_complete(client_fut)
  File "/Users/gopikab/Library/Python/3.9/lib/python/site-packages/nest_asyncio.py", line 90, in run_until_complete
    return f.result()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/Users/gopikab/Library/Python/3.9/lib/python/site-packages/acsys/__init__.py", line 800, in __client_main
    result = (await main(con, **kwargs))
  File "/Users/gopikab/Documents/vimin-ml/gmps-ml/collection_gmps_ml_project_parameters.py", line 30, in wrapper
    if evt_res.isReading():
TypeError: 'bool' object is not callable
```

This is the code: https://github.com/fermi-ad/gmps-ml/blob/main/collection.py

Feel free to make a PR.

beauremus commented 4 months ago

I apologize for the error. `.isReading` is a property, not a function; that's why it says "not callable". If you remove the parentheses, that should fix this error: `if evt_res.isReading:`.
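
(A generic Python illustration of why the parentheses fail, not specific to acsys:)

```python
class Example:
    @property
    def isReading(self):
        return True

e = Example()
print(e.isReading)  # True -- property access takes no parentheses
e.isReading()       # raises TypeError: 'bool' object is not callable
```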

gopikaops commented 4 months ago

Thanks!