Closed sfc-gh-ygupta closed 11 months ago
indented and edited slightly:
import asyncio
from okta.client import Client as OktaClient
async def get_applications():
okta_client = OktaClient()
filter = {'filter': 'status eq "ACTIVE"', 'limit': '5'}
apps, resp, err = await okta_client.list_applications(filter)
if err:
print(f"Error : {err}")
else:
for app in apps:
print(f"Application Name: {app.name}")
print(f"Application ID: {app.id}")
# print(app.list_application_users())
asyncio.run(get_applications())
i'm assuming u want this in the for
loop:
okta_client.list_application_users(app.id)
you'll probably need to paginate both list_applications()
and list_application_users()
. see the readme.
i also recommend async with
for the client. see the readme.
here's a not-great solution that paginates and uses async with
:
import asyncio
from okta.client import Client as OktaClient
async def get_applications():
async with OktaClient() as okta_client:
filter = {'filter': 'status eq "ACTIVE"'}
apps, resp, err = await okta_client.list_applications(filter)
while apps:
if err:
print(f"Error : {err}")
else:
for app in apps:
print(f"Application Name: {app.name}")
print(f"Application ID: {app.id}")
app_users, respau, err = await okta_client.list_application_users(app.id)
while app_users:
for app_user in app_users:
print(app_user.id)
app_users, err = await respau.next() if respau.has_next() else (None, None)
print()
apps, err = await resp.next() if resp.has_next() else (None, None)
asyncio.run(get_applications())
EDITED: this one's a little better
import asyncio
from okta.client import Client as OktaClient
async def get_applications():
async with OktaClient() as okta_client:
filter = {'filter': 'status eq "ACTIVE"'}
async for app in get_objects(okta_client.list_applications(filter)):
print(f"Application Name: {app.name}")
print(f"Application ID: {app.id}")
async for app_user in get_objects(okta_client.list_application_users(app.id)):
print(app_user.id)
print()
async def get_objects(coro):
objects, resp, _ = await coro
while objects:
for object in objects:
yield object
objects, _ = await resp.next() if resp.has_next() else (None, None)
asyncio.run(get_applications())
I am keep getting this error with below python code
import os import okta import asyncio from dotenv import load_dotenv from okta.client import Client as OktaClient
async def get_applications(): load_dotenv() config= { 'orgUrl' : os.getenv('okta_server'), 'token' : os.getenv('api_key') } okta_client = OktaClient(config) filter = {'filter' : 'status eq "ACTIVE"', 'limit' : '5'} apps, resp, err = await okta_client.list_applications(filter) if err is not None: print(f"Error : {err}") else: for app in apps: print(f"Application Name: {app.name}") print(f"Application ID: {app.id}") print(app.list_application_users())
asyncio.run(get_applications())
pip freeze | grep okta okta==2.9.2