AlienVault-OTX / OTX-Python-SDK

The Python SDK for AlienVault OTX
Other
358 stars 162 forks source link

get_my_pulses() function forms invalid API call URL, returns empty list. #64

Open w4rc0n opened 2 years ago

w4rc0n commented 2 years ago

In trying to use some simple python like this:

from OTXv2 import OTXv2

try:
   with open('config.json', 'r') as file:
      config = json.load(file)
except:
   quit('config.json not found, or unreadable.')

otx = OTXv2(config['otx_api_key'])

pulses = otx.get_my_pulses(max_items=200)
print(pulses)

The result would always be an empty list: [] No matter what arguments I passed to the get_my_pulses function.

I started digging into the source OTXv2 library. And hacked together the equivalent necessary functions using the requests library

from requests import Session
from requests.packages.urllib3.util import Retry
from requests.adapters import HTTPAdapter
import json

try:
   with open('config.json', 'r') as file:
      config = json.load(file)
except:
   quit('config.json not found, or unreadable.')

api = "https://otx.alienvault.com"
headers = {
            'X-OTX-API-KEY': config['otx_api_key'],
            'Content-Type': 'application/json'
        }

session = Session()
session.mount('https://', HTTPAdapter(
                max_retries=Retry(
                    total=5,
                    status_forcelist=[429, 500, 502, 503, 504],
                    backoff_factor=1,
                )
            ))

response = session.get(api+'/api/v1/pulses/my', headers=headers).json()
print(response['count'])

Which yielded a more expected response 214 which is accurate.

Upon digging more into the OTXv2 library functions, I hacked this together to give me just the necessary bits so that I could place some print statements and find what URLs were being formed.

import requests
try:
    from urllib.parse import urlencode
except ImportError:
    from urllib import urlencode
import json

API_V1_ROOT = "/api/v1"
MY_PULSES = "{}/pulses/my".format(API_V1_ROOT)
server="https://otx.alienvault.com"
headers = {
            'X-OTX-API-KEY': '<redacted>',
            'Content-Type': 'application/json'
        }

def post(url, body=None, headers=None, files=None, **kwargs):
    """
    Internal API for POST request on a OTX URL
    :param url: URL to retrieve
    :param body: HTTP Body to send in request
    :param headers: (optional) dict of headers to use, instead of default headers
    :param files: (optional) list of file tuples, if posting multipart form data
    :return: response as dict
    """
    response = requests.session().post(
        create_url(url, **kwargs),
        data=json.dumps(body) if body else None,
        files=files,
        headers=headers
    )
    print(json.dumps(response.json(), indent=2))
    return response.json()

def get(url, **kwargs):
    """
    Internal API for GET request on a OTX URL
    :param url: URL to retrieve
    :return: response in JSON object form
    """
    response = requests.session().get(create_url(url, **kwargs), headers=headers)
    print(json.dumps(response.json(), indent=2))
    return response.json()

def walkapi_iter(url, max_page=None, max_items=None, method='GET', body=None):
    next_page_url = url
    print('next_page_url', next_page_url)
    count = 0
    item_count = 0
    while next_page_url:
        count += 1
        if max_page and count > max_page:
            break
        if method == 'GET':
            data = get(next_page_url)
        elif method == 'POST':
            data = post(next_page_url, body=body)
        else:
            raise Exception("Unsupported method type: {}".format(method))
        for el in data['results']:
            item_count += 1
            if max_items and item_count > max_items:
                break
            yield el
        next_page_url = data["next"]

def create_url(url_path, **kwargs):
    """ Turn a path into a valid fully formatted URL. Supports query parameter formatting as well.
    :param url_path: Request path (i.e. "/search/pulses")
    :param kwargs: key value pairs to be added as query parameters (i.e. limit=10, page=5)
    :return: a formatted url (i.e. "/search/pulses")
    """
    print('----start create url----')
    uri = url_path.format(server)
    print(uri)
    uri = uri if uri.startswith("http") else server.rstrip('/') + uri
    print(uri)
    if kwargs:
        uri += "?" + urlencode(kwargs)
    print(uri)
    print('----stop create url---')
    return uri

def walkapi(url, iter=False, max_page=None, max_items=None, method='GET', body=None):
    if iter:
        print(walkapi_iter(url, max_page=max_page, max_items=max_items, method=method, body=body))
        return walkapi_iter(url, max_page=max_page, max_items=max_items, method=method, body=body)
    else:
        print(list(walkapi_iter(url, max_page=max_page, max_items=max_items, method=method, body=body)))
        return list(walkapi_iter(url, max_page=max_page, max_items=max_items, method=method, body=body))

def get_my_pulses(query=None, max_items=200):
    print('pulse?', walkapi(create_url(MY_PULSES, limit=50, q=query), max_items=max_items))
    #print(walkapi(create_url(MY_PULSES, limit=50)))
    return walkapi(create_url(MY_PULSES, limit=50, q=query), max_items=max_items)

This yielded URLs consistently like this, uring the create_url() function:

----start create url----
https://otx.alienvault.com/api/v1/pulses/my?limit=50&q=None
https://otx.alienvault.com/api/v1/pulses/my?limit=50&q=None
https://otx.alienvault.com/api/v1/pulses/my?limit=50&q=None
----stop create url---

q= is not a valid API query parameter according to the OTX API documentation: https://otx.alienvault.com/api

only limit, page, 'since`.

I'm haven't completely learned the OTXv2 code base, but I don't see anywhere were q= is supposed to be formatted into something the API accepts out of the above list of valid queries.

If you provide any sort of data by passing it in with get_my_pulses(query=<anything>) it always formats the URL as https://otx.alienvault.com/api/v1/pulses/my?limit=50&q=<anything> which the API responds to with 0 results.

Am I missing something?

Note: I saw similar issues with get_user_pulses() but it appears to be virtually identical just with a different API endpoint URL.

jaehwanjoa commented 10 months ago

from OTXv2 import OTXv2

try: with open('config.json', 'r') as file: config = json.load(file) except: quit('config.json not found, or unreadable.')

otx = OTXv2(config['otx_api_key'])

pulses = otx.get_my_pulses(max_items=200) -> pulses = otx.get_my_pulses(query='', max_items=200) print(pulses)