Sample code below provided under MIT and Apache 2.0 licenses. This runs and works on your computer if you install the gcloud cli and run gcloud auth login
import asyncio
import threading
import google.auth.transport.requests
import google.auth
import httpx
class GoogleAuth(httpx.Auth):
"""Adds required authorization for requests to Google Cloud Platform.
This gets the default credentials for the running user, and uses them to
generate valid tokens to attach to requests.
"""
def __init__(self, scopes=("https://www.googleapis.com/auth/cloud-platform",)):
self._sync_lock = threading.RLock()
self._async_lock = asyncio.Lock()
self.scopes = scopes
self.creds = None
def _refresh_creds(self):
# Must only be called with a lock.
if self.creds is None:
self.creds, _ = google.auth.default(scopes=self.scopes)
auth_req = google.auth.transport.requests.Request()
self.creds.refresh(auth_req)
def sync_auth_flow(self, request: httpx.Request):
if self.creds is None or self.creds.expired:
with self._sync_lock:
self._refresh_creds()
request.headers["Authorization"] = "Bearer " + self.creds.token
yield request
async def async_auth_flow(self, request: httpx.Request):
if self.creds is None or self.creds.expired:
async with self._async_lock:
await asyncio.to_thread(self._refresh_creds)
request.headers["Authorization"] = "Bearer " + self.creds.token
yield request
client = httpx.Client(auth=GoogleAuth())
response = client.get("https://cloudresourcemanager.googleapis.com/v1/projects")
print(response.json())
Sample code below provided under MIT and Apache 2.0 licenses. This runs and works on your computer if you install the gcloud cli and run
gcloud auth login