Open danielballan opened 4 years ago
To get the same `list_logbooks` function:

Using requests:
from datetime import datetime
import requests
import requests.auth
# Public names exported by this module.
__all__ = ['Client']
# JSON content-negotiation headers; Client passes them along with every request.
headers = {'content-type': 'application/json', 'accept': 'application/json'}
class Client:
    """Synchronous Olog client built on a ``requests`` session."""

    def __init__(self, url, user, password):
        """
        url : string
            base URL, such as ``'https://some_host:port/Olog'``
        user : string
        password : string
        """
        # Drop a single trailing slash so endpoint paths can be appended
        # with an explicit '/' separator.
        base = url[:-1] if url.endswith('/') else url
        credentials = requests.auth.HTTPBasicAuth(user, password)

        self._url = base
        self._session = requests.Session()
        # Certificate verification is switched off as a stop-gap: properly
        # configured certificates have been requested for the server, and
        # until they are in place this workaround is unfortunately needed.
        self._session.verify = False
        # Keyword arguments supplied with every request.
        self._kwargs = dict(headers=headers, auth=credentials)

    def list_logbooks(self):
        """
        GET ``<base URL>/resources/logbooks`` and return the decoded JSON.

        An HTTP error status is raised as an exception by
        ``raise_for_status()``.
        """
        response = self._session.get(
            f'{self._url}/resources/logbooks', **self._kwargs
        )
        response.raise_for_status()
        return response.json()
Using httpx:
from datetime import datetime
from httpx import AsyncClient
import asyncio
# Public names exported by this module.
__all__ = ['Client']
# JSON content-negotiation headers handed to the AsyncClient used by Client.
headers = {'content-type': 'application/json', 'accept': 'application/json'}
class Client:
    """
    Olog client backed by ``httpx.AsyncClient`` with a blocking wrapper.

    A fresh ``AsyncClient`` is opened for every call instead of being stored
    on the instance. Leaving an ``async with`` block closes the client, and a
    closed httpx client cannot be entered again, so a single shared client
    would only ever serve the first request. A per-call client also avoids
    carrying connections across the separate event loops that
    ``asyncio.run()`` creates for each blocking call.
    """

    def __init__(self, url, user, password):
        """
        url : string
            base URL, such as ``'https://some_host:port/Olog'``
        user : string
        password : string
        """
        # Make sure the base URL ends with a slash before relative paths are
        # joined to it. ``endswith`` (rather than ``url[-1]``) also copes
        # with an empty string.
        if not url.endswith('/'):
            url = url + '/'
        self._base_url = url
        self._auth = (user, password)

    def _new_session(self):
        """Build an unopened ``AsyncClient`` configured for this server."""
        return AsyncClient(
            base_url=self._base_url, headers=headers, auth=self._auth
        )

    async def async_list_logbooks(self):
        """
        GET ``logbooks`` relative to the base URL and return the decoded JSON.

        An HTTP error status is raised as an exception by
        ``raise_for_status()``.
        """
        async with self._new_session() as api:
            res = await api.get('logbooks')
            res.raise_for_status()
            return res.json()

    def list_logbooks(self):
        """
        Blocking counterpart of ``async_list_logbooks``.

        Uses ``asyncio.run()``, which cannot be called while another event
        loop is running in the same thread; from asynchronous code, await
        ``async_list_logbooks()`` directly instead.
        """
        return asyncio.run(self.async_list_logbooks())
@danielballan @shroffk
Looks promising. Adding async to a library like this, which makes a ton of network calls, makes perfect sense to me... it might make life easier for pyolog users.
The Java clients for Olog already use executors and jobs.
Looks perfect. Thanks for the write-up, @ke-zhang-rd.
https://github.com/encode/httpx