Closed knopki closed 1 year ago
Example of usage:
#!/usr/bin/env python3
import signal
import aiohttp
from anyio import CancelScope, run, open_signal_receiver, create_task_group
from yarl import URL
from metis_client import MetisAPI, MetisLocalUserAuth, MetisTokenAuth
from metis_client.models.event import MetisCalculationsEvent, MetisDatasourcesEvent
API_URL = URL("http://localhost:3000")
async def on_request_start(session, trace_config_ctx, params):
pass
# print("Starting request")
# print(params)
async def on_request_end(session, trace_config_ctx, params):
pass
# print("Ending request")
# print(params)
# print(await params.response.text())
trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(on_request_start)
trace_config.on_request_end.append(on_request_end)
async def signal_handler(scope: CancelScope):
with open_signal_receiver(signal.SIGINT, signal.SIGTERM) as signals:
async for signum in signals:
if signum == signal.SIGINT:
print("Ctrl+C pressed!")
else:
print("Terminated!")
scope.cancel()
return
########################## SEE BELOW ##########################
async def job():
async with MetisAPI(
API_URL,
MetisLocalUserAuth("member@test.com", "123123"),
# MetisTokenAuth("admin@test.com"),
trace_configs=[trace_config],
) as client:
with open("../metis-backend/tests/data/Au.optimade") as fd:
data = fd.read()
async with client.stream.subscribe() as sub:
await client.v0.ping()
print(await client.v0.auth.whoami())
r = await client.v0.datasources.create(data)
answer = None
async for msg in sub:
if (
isinstance(msg, MetisDatasourcesEvent)
and msg.request_id == r.request_id
):
answer = msg
break
if not answer or not answer.data:
return None
data_id = sorted(answer.data, key=lambda x: x.created_at)[-1].id
r = await client.v0.calculations.create(data_id)
async for msg in sub:
if (
isinstance(msg, MetisCalculationsEvent)
and msg.request_id == r.request_id
):
answer = msg
break
print(answer)
###############################################################
async def main():
async with create_task_group() as tg:
tg.start_soon(signal_handler, tg.cancel_scope)
await job()
tg.cancel_scope.cancel()
run(main)
@blokhin @pyoner This is a very early draft version for review. Only minimum implemented: authentication, work with stream, data source and calculation creation + ping + "me" endpoint. Also, some models implemented, but it's hard without docs.
pip install -e .
for installation