mwvgroup / Pitt-Google-Broker

A Google Cloud-based alert broker for LSST and ZTF
https://pitt-broker.readthedocs.io/en/latest/index.html

Connect to LIGO/Virgo stream #121

Open troyraen opened 2 years ago

troyraen commented 2 years ago

Related to #126

LIGO/Virgo alerts are distributed through GCN as VOEvents (XML documents).
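As a rough illustration of what those documents look like to a parser, here is a minimal sketch that inspects a VOEvent using the standard library; the file name is a placeholder and the Param layout is assumed from the VOEvent 2.0 schema.

import xml.etree.ElementTree as ET

# Minimal sketch: inspect a GCN VOEvent XML document.
# "voevent.xml" is a placeholder file name.
root = ET.parse("voevent.xml").getroot()
print("IVORN:", root.attrib.get("ivorn"))
print("Role:", root.attrib.get("role"))  # e.g., 'observation' or 'test'

# Most payload values live in <Param name="..." value="..."/> elements.
for elem in root.iter():
    if elem.tag.endswith("Param"):
        print(elem.attrib.get("name"), "=", elem.attrib.get("value"))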

wmwv commented 1 year ago

GCN runs a Kafka server and supplies alerts for LIGO, IceCube, Swift, Fermi, ...

The data rate is ~10 alerts/day (astrophysical events; ignoring Swift slews).

What's the next step to get things set up?

troyraen commented 1 year ago

The next steps are to connect our consumer to their Kafka server, then update our pipeline to handle those alerts. It would help to merge #123 and #194 first, so I will move them up on my priority list.

troyraen commented 1 year ago

#123 is now merged and #194 is ready for review.

hernandezc1 commented 10 months ago

There are a few prerequisites for receiving LIGO/Virgo/KAGRA alerts as GCN Notices over Kafka. A user must first be authenticated, which is done by signing up for an account using the link provided. An authenticated user is then issued client_id and client_secret values, as well as a sample Python script for receiving and parsing notices.

Users can choose to receive either JSON-serialized or Avro-serialized notices over Kafka (both options are recommended over receiving VOEvent Notices via GCN Classic). To consume notices over Kafka, a user will need one of the following two clients:

  1. gcn-kafka, for connecting to GCN Kafka and receiving JSON-serialized notices
  2. hop-client, for connecting to HOPSKOTCH and receiving Avro-serialized notices (see the sketch after the JSON example below)

Below is an example of a Python script that can be used to receive and parse JSON-serialized LIGO/Virgo/KAGRA notices:

from base64 import b64decode
from io import BytesIO
import json
from pprint import pprint

from astropy.table import Table
import astropy_healpix as ah
from gcn_kafka import Consumer
import numpy as np

def parse_notice(record):
    record = json.loads(record)

    # Only respond to mock events. Real events have GraceDB IDs like
    # S1234567, mock events have GraceDB IDs like M1234567.
    # NOTE NOTE NOTE replace the conditional below with this commented out
    # conditional to only parse real events.
    # if record['superevent_id'][0] != 'S':
    #    return
    if record['superevent_id'][0] != 'M':
        return

    if record['alert_type'] == 'RETRACTION':
        print(record['superevent_id'], 'was retracted')
        return

    # Respond only to 'CBC' events. Change 'CBC' to 'Burst' to respond to
    # only unmodeled burst events.
    if record['event']['group'] != 'CBC':
        return

    # Parse sky map
    skymap_str = record.get('event', {}).pop('skymap')
    if skymap_str:
        # Decode, parse skymap, and print most probable sky location
        skymap_bytes = b64decode(skymap_str)
        skymap = Table.read(BytesIO(skymap_bytes))

        level, ipix = ah.uniq_to_level_ipix(
            skymap[np.argmax(skymap['PROBDENSITY'])]['UNIQ']
        )
        ra, dec = ah.healpix_to_lonlat(ipix, ah.level_to_nside(level),
                                       order='nested')
        print(f'Most probable sky location (RA, Dec) = ({ra.deg}, {dec.deg})')

        # Print some information from FITS header
        print(f'Distance = {skymap.meta["DISTMEAN"]} +/- {skymap.meta["DISTSTD"]}')

    # Print remaining fields
    print('Record:')
    pprint(record)

consumer = Consumer(client_id='fill me in', client_secret='fill me in')
consumer.subscribe(['igwn.gwalert'])

while True:
    for message in consumer.consume():
        parse_notice(message.value())

The contents (schema) of the notices are described here. The two formats (JSON & Avro) contain the same information about the candidate but carry different metadata and follow different schemas.
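For option 2 (Avro-serialized notices via HOPSKOTCH), a rough sketch with hop-client could look like the following. The broker/topic URL is my assumption based on the Hopskotch quickstart, and it presumes SCiMMA credentials have already been configured (e.g., via `hop auth add`).

from hop import stream

# Rough sketch: read IGWN alerts from HOPSKOTCH with hop-client.
# The broker/topic URL below is an assumption; credentials are expected to
# have been set up beforehand (e.g., `hop auth add`).
with stream.open("kafka://kafka.scimma.org/igwn.gwalert", "r") as src:
    for message in src:
        print(message)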

hernandezc1 commented 10 months ago

It's important to note that a new way of receiving notices (alerts) is expected to arrive soon via GCN Kafka. From the infographic below and the Road Map outlined on their website, it appears that the General Coordinates Network (GCN) is planning to move toward a unified schema described in JSON format.

[Screenshot: infographic of the GCN road map]

troyraen commented 10 months ago

The Python script above does two tasks that we need to separate: consuming and message parsing. It will be much better for us if we can do the consuming with the same consumer module we use for ZTF and ELAsTiCC. That would use Confluent Kafka Connect to connect to the GCN server. I looked briefly at their website and saw several connection options -- none looked like exactly what we need, but they did make me think it should be possible. Really, what we need to do is track down a few configuration settings. Here's a starter list off the top of my head:

hernandezc1 commented 10 months ago
  • IP address and port to connect to the GCN server

What ports do I need to open in order to receive or send GCN notices with Kafka? Clients connecting to GCN only need to be able to make outbound (egress) TCP connections. The client connects to the following hosts and ports (the default hostname is kafka.gcn.nasa.gov):

[Screenshot: table of GCN Kafka hostnames and ports]
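As a quick sanity check that our environment allows the required egress, a simple TCP connection test like the one below should succeed; note that port 9092 is my assumption (the standard Kafka broker port), so it should be checked against the table above.

import socket

# Quick egress check against the GCN Kafka broker.
# NOTE: port 9092 is an assumption (standard Kafka broker port); confirm it
# against GCN's documented host/port table above.
with socket.create_connection(("kafka.gcn.nasa.gov", 9092), timeout=10) as sock:
    print("TCP connection succeeded:", sock.getpeername())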

  • Find out if the client ID and secret that we get when signing up can be plugged directly into the analogous fields we used for ZTF. In particular, I don't know if the GCN secret can be handled the same way as the ZTF keytab.

I'm hopeful that the answer is yes. I spent some time reviewing the source code of their GCN Kafka Client for Python, and learned that the authentication process uses the OAuth 2 Authorization Framework. We'll have to update the configuration files (as you've mentioned) to authenticate our consumer.

If you're interested in taking a look at the source code of their authentication process, click here.
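To make that concrete, here is a sketch of the consumer settings I would expect us to need. The OAUTHBEARER/OIDC keys are standard librdkafka configuration options, but the token endpoint URL and group ID are my assumptions and should be verified against the gcn-kafka client source.

from confluent_kafka import Consumer

# Sketch of the connection settings a confluent-kafka (or Kafka Connect)
# consumer would likely need for GCN. The token endpoint URL and group.id
# are assumptions; verify them against the gcn-kafka client source.
config = {
    "bootstrap.servers": "kafka.gcn.nasa.gov",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "OAUTHBEARER",
    "sasl.oauthbearer.method": "oidc",
    "sasl.oauthbearer.client.id": "fill me in",      # from GCN sign-up
    "sasl.oauthbearer.client.secret": "fill me in",  # from GCN sign-up
    "sasl.oauthbearer.token.endpoint.url": "https://auth.gcn.nasa.gov/oauth2/token",
    "group.id": "pitt-google-broker-test",  # hypothetical group name
}

consumer = Consumer(config)
consumer.subscribe(["igwn.gwalert"])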

  • Look through the connector configuration files in the consumer directory (I think there are three) for any settings we might want or need to change.

I will start a PR in the next few days to address this.