archiver-appliance / epicsarchiverap

This is an implementation of an archiver for EPICS control systems that aims to archive millions of PVs.

Corrupted chunks in HTTP/PB #97

Open emanuelelaface opened 4 years ago

emanuelelaface commented 4 years ago

I have an Archiver Appliance and I am trying to get data out of it in Python via the raw (PB over HTTP) interface. My code is:

import requests
import EPICSEvent_pb2
import collections
import datetime

ESC_BYTE = b'\x1B'
NL_BYTE = b'\x0A'
CR_BYTE = b'\x0D'
PB_REPLACEMENTS = collections.OrderedDict([
    (ESC_BYTE + b'\x01', ESC_BYTE),
    (ESC_BYTE + b'\x02', NL_BYTE),
    (ESC_BYTE + b'\x03', CR_BYTE),
])
TYPE_MAPPINGS = {
    0: EPICSEvent_pb2.ScalarString,
    1: EPICSEvent_pb2.ScalarShort,
    2: EPICSEvent_pb2.ScalarFloat,
    3: EPICSEvent_pb2.ScalarEnum,
    4: EPICSEvent_pb2.ScalarByte,
    5: EPICSEvent_pb2.ScalarInt,
    6: EPICSEvent_pb2.ScalarDouble,
    7: EPICSEvent_pb2.VectorString,
    8: EPICSEvent_pb2.VectorShort,
    9: EPICSEvent_pb2.VectorFloat,
    10: EPICSEvent_pb2.VectorEnum,
    11: EPICSEvent_pb2.VectorChar,
    12: EPICSEvent_pb2.VectorInt,
    13: EPICSEvent_pb2.VectorDouble,
    14: EPICSEvent_pb2.V4GenericBytes
}
def unescape_bytes(byte_seq):
    for key, value in PB_REPLACEMENTS.items():
        byte_seq = byte_seq.replace(key, value)
    return byte_seq

PV='ISrc-010:PBI-BCM-001:AI4-IPCH-RBV'
start_date='2019-04-10T00:00:00.000Z'
end_date='2019-04-21T00:00:00.000Z'
raw_url = 'http://archiver-01.tn.esss.lu.se:17668/retrieval/data/getData.raw?pv={}&from={}&to={}'.format(PV,start_date,end_date)
pv_page = requests.get(raw_url)
chunks = pv_page.content.split(b'\n\n')
raw_data=[]
for chunk in chunks:
    meta={}
    chunk_rows = chunk.split(b'\n')
    header = chunk_rows[0]
    info = EPICSEvent_pb2.PayloadInfo()
    info.ParseFromString(unescape_bytes(header))
    timestamp = datetime.datetime(info.year,1,1,0,0).timestamp()
    meta['name']=info.pvname
    for attr in info.headers:
        meta[attr.name]=attr.val
    event = TYPE_MAPPINGS[info.type]()
    data=[]
    for data_row in chunk_rows[1:]:
        try:
            event.ParseFromString(unescape_bytes(data_row))  # each line after the header is one escaped event
            event_dict={}
            event_dict['secs']=int(event.secondsintoyear+timestamp)
            event_dict['val']=event.val
            event_dict['severity']=event.severity
            event_dict['nanos']=event.nano
            event_dict['status']=event.status
            data.append(event_dict)
        except Exception:
            print('ERROR', data_row)
    if len(data)>0:
        raw_data.append({'meta':meta, 'data':data})

This code works quite well, but from time to time (about 1 in every 20,000 entries) it falls into the exception branch because event.ParseFromString fails with DecodeError: Error parsing message.

I checked whether those entries are corrupted, but if I get the same data from the JSON interface I can see them. Moreover, if I manually append one byte to the broken string I am able to decode it, even though not all the values inside are correct: secondsintoyear and nano are fine, the value is not, but I guess that is because I am appending an arbitrary byte at the end of the string (see the sketch after the examples below).

These are examples of the rows that fail to decode:

b'\x08\xf5\xa1\x8a\x04\x10\xea\xc7\xdf\x8b\x01\x19\xbf`7l\x1b\x01\x02\xb7?'
b'\x08\x98\xbf\x8a\x04\x10\xf7\xb5\xf3\xe9\x02\x19\xc0`7l\x1b\x01\x03\xbd?'
b'\x08\xfc\xa0\x8b\x04\x10\xf0\xec\xc6\x0c\x19EdX\xc5\x1b\x01\x03\xb9?'
b'\x08\xe5\xb4\x8b\x04\x10\xa8\xb5\xfd\xe4\x01\x19rP\xc2L\x1b\x01\x03\xbd?'
b'\x08\xd0\xbd\x8b\x04\x10\xd4\xb2\xb9\xae\x02\x19\x84\xbb\xb3v\x1b\x01\x02\xb6?'
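
A minimal sketch of the append-one-byte workaround mentioned above, reusing unescape_bytes and EPICSEvent_pb2 from the code above and assuming the PV stores ScalarDouble events; the first row listed is used as the example:

# Hypothetical sketch of the workaround: the broken row fails to parse, but
# appending an arbitrary byte lets ParseFromString succeed; val then comes out
# wrong while secondsintoyear and nano stay correct.
broken_row = b'\x08\xf5\xa1\x8a\x04\x10\xea\xc7\xdf\x8b\x01\x19\xbf`7l\x1b\x01\x02\xb7?'
event = EPICSEvent_pb2.ScalarDouble()
try:
    event.ParseFromString(unescape_bytes(broken_row))
except Exception as err:
    print('parse failed:', err)          # DecodeError: Error parsing message
event.ParseFromString(unescape_bytes(broken_row) + b'\x00')
print(event.secondsintoyear, event.nano, event.val)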
aawdls commented 4 years ago

Hi @emanuelelaface

I have started some work recently to look for problems in our PB data files, as we have occasional corruptions for different reasons: for example, some data which we converted using ca2aa ended up with the wrong data type in the PayloadInfo, so none of the event values in that file can be decoded. I have seen something similar to the problem you describe in some of my files, i.e. occasional events which cannot be decoded at all. I haven't yet taken the time to check whether retrieving them via JSON returns the events which I can't read directly from the files. I am currently preparing to run my checks more systematically, so I will let you know if I find anything relevant to this issue.

Do the retrieval logs say anything interesting when you fetch this PV via PB versus JSON?

By the way, we have a python library for interacting with the archiver appliance which you might find useful: https://github.com/dls-controls/aapy

emanuelelaface commented 4 years ago

I tried your library (the unescape function actually comes from there), but it crashes with the same error parsing the message, so I decided to do the decoding myself; at least that way I can handle the exception and skip the "wrong" values.

The JSON works fine. I do not have access to the logs (I am just a user of the Archiver), but I will ask our sysadmin if he can check.

slacmshankar commented 4 years ago

If possible, send me the PB file itself and I can try this out here and help out. Also, Mike has some utils here - https://github.com/epicsdeb/carchivetools which may be useful.

emanuelelaface commented 4 years ago

I just tried with the PB file directly and I discovered that I have the same problem. The data in the file are exactly the same as what I get from the web server, so the problem is not in the storage but in the Python parser. I say Python because the JSON output of the Archiver Appliance does not have this issue, so the Archiver's own parser processes all the data correctly. I wonder if it is an issue in the way the protobuf bindings are generated for Python. I will try another language (Java and/or C) and see how it behaves. If you want to try with the file I can pass it to you somehow, but it is 2 GB, so please contact me privately and I will give you a link to access it.
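
For reference, a rough sketch of reading such a file directly; the path pv_file.pb is hypothetical, unescape_bytes / TYPE_MAPPINGS / EPICSEvent_pb2 are reused from the original code above, and an Archiver Appliance PB file is assumed to hold one escaped PayloadInfo header line followed by one escaped event per line:

# Hypothetical sketch: parse a local PB file with the same per-line logic used
# for the HTTP payload and count the rows that fail to decode.
with open('pv_file.pb', 'rb') as f:   # file path is an assumption
    lines = f.read().split(b'\n')

info = EPICSEvent_pb2.PayloadInfo()
info.ParseFromString(unescape_bytes(lines[0]))

bad = 0
for line in lines[1:]:
    if not line:
        continue
    event = TYPE_MAPPINGS[info.type]()
    try:
        event.ParseFromString(unescape_bytes(line))
    except Exception:
        bad += 1                      # same DecodeError as seen over HTTP
print(bad, 'events could not be decoded')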

slacmshankar commented 4 years ago

Since you think the file is OK, I'll try first with data from my production boxes...

emanuelelaface commented 4 years ago

OK, the issue is quite silly and it is in the unescaping function in Python. All the implementations I have seen are equivalent to:

def unescape_bytes(byte_seq):
    byte_seq = byte_seq.replace(b'\x1b\x01',b'\x1b') 
    byte_seq = byte_seq.replace(b'\x1b\x02',b'\x0a')
    byte_seq = byte_seq.replace(b'\x1b\x03',b'\x0d')
    return byte_seq

while it should be:

def unescape_bytes(byte_seq):
    byte_seq = byte_seq.replace(b'\x1b\x02',b'\x0a')
    byte_seq = byte_seq.replace(b'\x1b\x03',b'\x0d')
    byte_seq = byte_seq.replace(b'\x1b\x01',b'\x1b')    
    return byte_seq

The first one performs an extra replacement on sequences like \x1b\x01\x02: the correct result is \x1b\x02 (a literal ESC followed by a literal 0x02 byte), but after the first replace produces \x1b\x02 the second replace collapses it to \x0a, removing one more byte, and that missing byte is what causes the decoding error.
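
A short demonstration of the difference, using the first failing row posted earlier (a sketch for clarity; ScalarDouble is assumed as the event type for this PV):

# EPICSEvent_pb2 is the module generated from EPICSEvent.proto, as above.
import EPICSEvent_pb2

raw_row = b'\x08\xf5\xa1\x8a\x04\x10\xea\xc7\xdf\x8b\x01\x19\xbf`7l\x1b\x01\x02\xb7?'

def unescape_wrong(b):
    # \x1b\x01\x02 -> \x1b\x02 -> \x0a : replaced twice, one byte too few
    return b.replace(b'\x1b\x01', b'\x1b').replace(b'\x1b\x02', b'\x0a').replace(b'\x1b\x03', b'\x0d')

def unescape_right(b):
    # \x1b\x01\x02 -> \x1b\x02 : the literal ESC is restored last, \x02 is untouched
    return b.replace(b'\x1b\x02', b'\x0a').replace(b'\x1b\x03', b'\x0d').replace(b'\x1b\x01', b'\x1b')

print(len(unescape_wrong(raw_row)), len(unescape_right(raw_row)))  # 19 vs 20 bytes

event = EPICSEvent_pb2.ScalarDouble()
event.ParseFromString(unescape_right(raw_row))    # parses fine
# event.ParseFromString(unescape_wrong(raw_row))  # raises DecodeError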

So for me the issue is solved, thanks to everyone for the help.

aawdls commented 4 years ago

Well spotted. I have reproduced this on one of my own files. Fixing the unescape order does make this problem go away. I will fix it in aapy.