i4Ds / STIXCore

STIX Core functionalities
BSD 3-Clause "New" or "Revised" License
3 stars 3 forks source link

Really hard to override IDB version #366

Open samaloney opened 11 months ago

samaloney commented 11 months ago

When trying to process some data from the AX with the script below, I had to make several attempts to get the right IDB version, as the OBT time in the AX data is way in the past. Setting the version didn't work as I immediately expected; maybe this needs to be documented better or something.

# Create the IDB manager on the local IDB directory and request IDB version 2.26.36.
# NOTE(review): force=True presumably re-downloads even if the version is already
# present locally - confirm against IDBManager.download_version.
idbm = IDBManager(Path('/Users/sm/Projects/STIX/STIXCore/stixcore/data/idb'))
idbm.download_version("2.26.36", force=True)

git diff --patch


 class IDBManager(metaclass=Singleton):
@@ -152,6 +153,7 @@ class IDBManager(metaclass=Singleton):
         `str`
             a version label
         """
+        return '2.26.36'
         try:
             if not obt:
                 return next(iter(self.history.at(self.history.begin()))).data
diff --git a/stixcore/tmtc/packets.py b/stixcore/tmtc/packets.py
index ffb6d17..b928df5 100644
--- a/stixcore/tmtc/packets.py
+++ b/stixcore/tmtc/packets.py
@@ -273,8 +273,8 @@ class TMPacket(GenericPacket):

         self.idb = idb
         if not idb:
-            self.idb = IDBManager.instance.get_idb(obt=self.data_header.datetime)
-            # self.idb = IDBManager.instance.get_idb('2.26.35')
+            # self.idb = IDBManager.instance.get_idb(obt=self.data_header.datetime)
+            self.idb = IDBManager.instance.get_idb('2.26.36')

     @property
     def key(self):
@@ -520,8 +520,8 @@ class GenericTMPacket:
         self.pi1_val = getattr(data, 'pi1_val', None)

         if isinstance(self.idb, IDBManager):
-            idb = self.idb.get_idb(obt=self.data_header.datetime)
-            # idb = self.idb.get_idb('2.26.35')
+            #idb = self.idb.get_idb(obt=self.data_header.datetime)
+            idb = self.idb.get_idb('2.26.36')

         packet_info = idb.get_packet_type_info(self.data_header.service_type,
                                                self.data_header.service_subtype,
import logging
from binascii import unhexlify
from collections import defaultdict
from pathlib import Path
from xml.etree import ElementTree as Et

import numpy as np
from astropy.table.table import Table
from bitstring import ConstBitStream, ReadError

from stixcore.config.config import CONFIG
from stixcore.ephemeris.manager import SpiceKernelManager, Spice
from stixcore.idb.manager import IDBManager
from stixcore.io.fits.processors import FitsL0Processor
from stixcore.products.levelb.binary import LevelB
from stixcore.products.product import Product
from stixcore.tmtc.packet_factory import Packet
from stixcore.tmtc.packets import SourcePacketHeader, TMPacket, GenericTMPacket

# The IDB version to pin is named once so the download, the forced version and
# the lookup below cannot drift apart.
_IDB_VERSION = '2.26.36'
_IDB_DIR = Path('/Users/sm/Projects/STIX/STIXCore/stixcore/data/idb')
_SPICE_KERNEL_DIR = Path(CONFIG.get("Paths", "spice_kernels"))

# Install a Spice instance built from the latest meta kernel the manager reports.
_spm = SpiceKernelManager(_SPICE_KERNEL_DIR)
_latest_mk = _spm.get_latest_mk()
Spice.instance = Spice(_latest_mk)

# Fetch the pinned IDB, set it as the manager's forced version, and keep a
# handle to it (`idb`) for explicit use when parsing packets.
idbm = IDBManager(_IDB_DIR)
idbm.download_version(_IDB_VERSION, force=True)
idbm.force_version = _IDB_VERSION
idb = idbm.get_idb(_IDB_VERSION)

# bdata = b''
# tree = Et.parse('/Users/shane/Projects/STIX/dataview/data/tm_test/LTP03_morning_req_BatchRequest.PktTmRaw.SOL.0.2021.071.15.31.29.346.FelM@2021.083.06.00.01.912.1.xml')
# root = tree.getroot()
# res = defaultdict(list)
# for i, node in enumerate(root.iter('Packet')):
#     packet_binary = unhexlify(node.text)
#     # Not sure why guess and extra moc header
#     #packet_data.append(packet_binary[76:])
#     bdata += packet_binary[76:]
#     # res[(p.data_header.service_type, p.data_header.service_subtype, p.pi1_val)].append(p)

# with open('/Users/shane/Downloads/ASV_v181_data_requests.ascii') as f:
# with open('/Users/shane/Downloads/20210406-1725.bsd_data_export.asc') as f:
#     hex_data = f.readlines()

# bdata = [unhexlify(h.split()[-1].strip()) for h in hex_data]

# # with open('/Users/shane/Downloads/BSD_Test_New_238-1_Sizing.bin', 'rb') as f:
# Load the whole binary TM dump into memory and expose it as a read-only bit
# stream so packets can be sliced out by bit offset.
bdata = Path('/Users/sm/Downloads/20231018_trigger_scaling_samples.bin').read_bytes()

bsdata = ConstBitStream(bdata)

# Parsed packets grouped by their `pi1_val`.
res = defaultdict(list)

# error = []
# for i, bd in enumerate(bdata):
#     try:
#         tm = TMPacket(ConstBitStream(bd), idb=idb)
#         res[tm.key].append(tm)
#         print('Parsed line ', i)
#     except Exception:
#         print('Error with line ', i)
#         error.append(i)
#         continue

# Walk the bit stream packet by packet. `start`/`end` are bit offsets into
# `bsdata`; `i` is the running packet index (used for progress output only).
start = 0
i = 0
# Bounding the loop by the stream length means an empty input simply yields no
# packets, and the loop ends once the last packet has been consumed.
while start < bsdata.len:
    sph = SourcePacketHeader(bsdata[start:])
    # Total packet size in bits from the header's data_length field.
    # NOTE(review): the "+ 7" presumably covers the 6-byte primary header plus
    # the "length minus one" convention of the data_length field - confirm.
    end = start + (sph.data_length + 7) * 8
    print(i, start, end)

    try:
        tm = TMPacket(bsdata[start:end].bytes, idb=idb)
        res[tm.pi1_val].append(tm)
    except Exception:
        # Still stop at the first packet that cannot be parsed (the following
        # offsets cannot be trusted), but report why instead of ending silently.
        logging.exception('Failed to parse packet %d (bits %d-%d); stopping', i, start, end)
        break

    i += 1
    start = end

print(bsdata.len)

fits_writer = FitsL0Processor(Path('/Users/sm/Downloads/20231018_trigger_scaling_samples'))

# Only packet groups with one of these keys are converted; built once rather
# than on every loop iteration.
_SSIDS_TO_PROCESS = {21, 22, 23, 24, 30, 31, 32, 33, 34, 43}

# For every selected group: assemble a LevelB product from the raw packets,
# split it into sequences, convert each complete sequence to level 0 and write
# it out as FITS.
for ssid, packets in res.items():
    # t, st, ssid = ssid
    # if t == 21 and ssid in {20, 21, 22, 23, 24, 30, 31, 32, 33, 34, 41, 42}:
    if ssid not in _SSIDS_TO_PROCESS:
        continue

    headers = []
    hex_data = []
    for packet in packets:
        # vars() returns the object's live __dict__, so pop from a copy;
        # popping in place would delete `bitstream` / `datetime` from the
        # packet's own header objects.
        sh = dict(vars(packet.source_packet_header))
        bs = sh.pop('bitstream')
        hex_data.append(bs.hex)
        dh = dict(vars(packet.data_header))
        dh.pop('datetime')
        headers.append({**sh, **dh})

    # One control row per packet, built from the merged header fields.
    control = Table(headers)
    control['index'] = np.arange(len(control), dtype=np.int64)
    control['raw_file'] = ''
    control['packet'] = 0

    # One data row per packet: the raw packet as hex, linked to its control row.
    data = Table()
    data['control_index'] = np.array(control['index'], dtype=np.int64)
    data['data'] = hex_data
    # `dh` is the data header of the last packet in the group; its service
    # type/subtype are used for the whole product.
    product = LevelB(service_type=dh['service_type'], service_subtype=dh['service_subtype'],
                     ssid=ssid, control=control, data=data)

    cur_complete, cur_incomplete = product.extract_sequences()

    # Look up the L0 product class registered for this type/subtype/ssid.
    tmp = Product._check_registered_widget(
        level='L0', service_type=product.service_type,
        service_subtype=product.service_subtype, ssid=product.ssid, data=None,
        control=None)

    for c in cur_complete:
        level0 = tmp.from_levelb(c, parent='')
        fits_writer.write_fits(level0)
        # try:
        #     level0 = tmp.from_levelb(c, parent='')
        #
        # except (AttributeError, ReadError) as e:
        #     logging.error(e, stack_info=True)
    # Incomplete sequences are currently not written:
    # for c in cur_incomplete:
    #     level0 = tmp.from_levelb(c)
    #     fits_writer.write_fits(level0)