scimma / hop-client

A pub-sub client library for Multimessenger Astrophysics.
BSD 3-Clause "New" or "Revised" License

OAUTHBEARER compatibility with GCN #203

Open joshuarwood opened 1 year ago

joshuarwood commented 1 year ago

Description

Some of my collaborators tried to use a hop consumer client to consume messages from the new GCN Kafka servers. In theory this should work, because both hop-client and the gcn_kafka clients are based on confluent_kafka. However, they were unable to establish a connection using the standard stream example:

from hop import stream

with stream.open("kafka://kafka.gcn.nasa.gov:9092/igwn.gwalert", "r") as s:
    for message in s:
        print(message.content)

After some digging, it looks like the Stream class doesn't let users set up the OAUTHBEARER authorization method with the client_id and client_secret needed to connect to the GCN servers. This should be possible, because I am able to create a hop Consumer as long as I manually pass the appropriate configuration settings:

from adc import auth, kafka
from hop.auth import Auth
from hop.io import Consumer

from authlib.integrations.requests_client import OAuth2Session

# credentials
client_id = "client_id_hash"
client_secret = "client_secret_hash"

# broker addresses and topic from url
domain = "gcn.nasa.gov"
topic = "igwn.gwalert"
url = f"kafka://kafka.{domain}:9092/{topic}"
_, broker_addresses, topics = kafka.parse_kafka_url(url)

# create authorization used to establish connection with GCN servers
a = Auth(None, None, method=auth.SASLMethod.OAUTHBEARER)
# remove config fields not used by OAUTHBEARER authorization
a._config.pop("sasl.username")
a._config.pop("sasl.password")
# add config fields used by GCN kafka clients
a._config["sasl.oauthbearer.method"] = "oidc"
a._config["sasl.oauthbearer.client.id"] = client_id
a._config["sasl.oauthbearer.client.secret"] = client_secret
a._config["sasl.oauthbearer.token.endpoint.url"] = f"https://auth.{domain}/oauth2/token"

session = OAuth2Session(client_id, client_secret, scope=None)
token_endpoint = a._config["sasl.oauthbearer.token.endpoint.url"]

# function to perform the GCN session authorization
def oauth_cb(*_, **__):
    token = session.fetch_token(token_endpoint, grant_type="client_credentials")
    return token["access_token"], token["expires_at"]

a._config["oauth_cb"] = oauth_cb

# create a consumer instance
c = Consumer(
    group_id="test",
    broker_addresses=broker_addresses,
    topics=topics,
    auth=a)

# read messages
for message in c:
    print(message.content)

I used the igwn.gwalert topic here because it is also available from the SCIMMA servers, so it can be used to check both systems. My collaborators are reading igwn.gwalert messages from the SCIMMA servers and need to compare them to the notices sent through the GCN servers. They would like to use hop clients for both types of notices to streamline their code.
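To make the comparison use case concrete, here is a minimal sketch of how alerts read from the two brokers might be matched once their contents are in hand. It assumes each message's content is a dict carrying superevent_id, alert_type and time_created on both systems; the helper names alert_key and compare_alerts are hypothetical and are not part of hop-client.

```python
def alert_key(content):
    """Identify an alert by fields assumed to match across both brokers."""
    return (content["superevent_id"], content["alert_type"], content["time_created"])


def compare_alerts(scimma_alerts, gcn_alerts):
    """Return the alert keys seen only on SCIMMA, only on GCN, and on both."""
    scimma_keys = {alert_key(a) for a in scimma_alerts}
    gcn_keys = {alert_key(a) for a in gcn_alerts}
    return {
        "scimma_only": scimma_keys - gcn_keys,
        "gcn_only": gcn_keys - scimma_keys,
        "both": scimma_keys & gcn_keys,
    }
```

The two input lists would be filled by collecting message.content from a consumer attached to each broker; anything that ends up under "scimma_only" or "gcn_only" is a notice that arrived through one system but not the other.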

Definition of Done

The Stream interface should offer a more straightforward way to connect to GCN servers. Users should not need to pop the username and password fields to suppress warnings in the authorization config, and there should be a high-level method that sets up the full GCN server config without requiring every setting to be entered manually.
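As a rough illustration of the kind of high-level helper meant here, the sketch below builds the OAUTHBEARER settings from just a client ID and secret. The function name gcn_oauth_config is hypothetical and does not exist in hop-client; the security.protocol and sasl.mechanisms entries are assumptions about what a GCN connection needs, while the sasl.oauthbearer.* keys mirror the ones set manually in the workaround.

```python
def gcn_oauth_config(client_id, client_secret, domain="gcn.nasa.gov"):
    """Build librdkafka-style OAUTHBEARER settings for a GCN broker.

    Hypothetical helper: no sasl.username / sasl.password fields are
    created, so nothing has to be popped afterwards.
    """
    return {
        # Assumed transport settings, not taken from the workaround.
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "OAUTHBEARER",
        # Same keys the manual workaround adds to the auth config.
        "sasl.oauthbearer.method": "oidc",
        "sasl.oauthbearer.client.id": client_id,
        "sasl.oauthbearer.client.secret": client_secret,
        "sasl.oauthbearer.token.endpoint.url": f"https://auth.{domain}/oauth2/token",
    }
```

Deriving the token endpoint from the domain keeps the caller's input down to the two credentials, which is the level of convenience being asked for.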


cnweaver commented 1 year ago

This is supposed to work out of the box, and it seems to for me. Setting up my credential and subscribing from the shell:

$ hop version
hop-client==0.8.0
adc_streaming==2.3.1
confluent_kafka==1.9.2
librdkafka==1.9.2
$ hop auth add
2023-07-13 01:04:04,091 | hop : INFO : Generating configuration with user-specified username + password
Username: <client id>
Password: <client secret>
Hostname (may be empty): kafka.gcn.nasa.gov
Token endpoint (empty if not applicable): https://auth.gcn.nasa.gov/oauth2/token
2023-07-13 01:04:41,833 | hop : INFO : Wrote configuration to: /Users/christopher/.config/hop/auth.toml
$ hop subscribe -s EARLIEST kafka://kafka.gcn.nasa.gov/igwn.gwalert
2023-07-13 01:06:36,475 | hop : INFO : group ID not specified, generating a random group ID: 3l6ihk9icb23b1lqq7uhe955fu-2IDFM2ERI6
2023-07-13 01:06:36,475 | hop : INFO : connecting to kafka://kafka.gcn.nasa.gov
2023-07-13 01:06:36,815 | hop : INFO : subscribing to topics: ['igwn.gwalert']
2023-07-13 01:06:37,288 | hop : INFO : processing messages from stream
{'alert_type': 'PRELIMINARY', 'time_created': '2023-06-29T19:00:49Z', 'superevent_id': 'MS230629s', 'urls': {'gracedb': 'https://gracedb.ligo.org/superevents/MS230629s/view/'}, 'event': {'significant': True, 'time': '2023-06-29T18:53:22.304Z', 'far': 9.110699364861297e-14, 'instruments': ['H1', 'L1'], 'group': 'CBC', 'pipeline': 'gstlal', 'search': 'MDC', 'properties': {'HasNS': 1.0, 'HasRemnant': 1.0, 'HasMassGap': 0.0}, 'classification': {'BNS': 0.9999968446698687, 'NSBH': 0.0, 'BBH': 0.0, 'Terrestrial': 3.1553301313647424e-06}, 'duration': None, 'central_frequency': None, 'skymap':. . . 

Equivalents as python scripts:

import hop

# With the previous credential pulled from the standard credential store:
with hop.stream.open("kafka://kafka.gcn.nasa.gov/igwn.gwalert", "r") as s:
    for message in s:
        print(message.content)

# With a directly constructed credential:
my_auth = hop.auth.Auth("<client id>",
                        "<client secret>",
                        method=hop.auth.SASLMethod.OAUTHBEARER,
                        token_endpoint="https://auth.gcn.nasa.gov/oauth2/token")
with hop.Stream(auth=my_auth).open("kafka://kafka.gcn.nasa.gov/igwn.gwalert", "r") as s:
    for message in s:
        print(message.content)

Do one or more of these methods not work for you, and if so, what output do you get from hop version?
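If the hop command is not on the path in the environment where the script runs, roughly the same version information can be gathered from Python. This is only a sketch: it assumes the packages are installed under the distribution names hop-client, adc-streaming and confluent-kafka, and it does not report the librdkafka version that hop version prints.

```python
from importlib import metadata


def installed_versions(packages=("hop-client", "adc-streaming", "confluent-kafka")):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for name, version in installed_versions().items():
    print(f"{name}=={version}")
```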