move-coop / parsons

A python library of connectors for the progressive community.
https://www.parsonsproject.org/
Other
257 stars 125 forks source link

[Feature/Addition] Adding a connector for the LegiScan API #935

Open matthewkrausse opened 7 months ago

matthewkrausse commented 7 months ago

I'm working on a python library to wrap the Legiscan API. This allows you to get data on legislators, bills, votes, etc. Wanted to see if other people would be interested in this. There is not that much work to be done on it but would be happy to have someone pair on it.

https://legiscan.com/

Priority

low

lkesich commented 7 months ago

It's not being updated anymore, but I still use the pylegiscan library (https://github.com/poliquin/pylegiscan). I use it pretty regularly and would be happy to help test a new Parsons legiscan class.

matthewkrausse commented 7 months ago

@lkesich What are some of the methods you primarily use? I wanted to create a priority order of me working on it. Also how are you working with the data? A lot of information is nested and I would like to provide some same use cases for being able to extract the data in a useful way.

Thanks!

lkesich commented 7 months ago

@matthewkrausse Honestly, I mostly just use the getDatsetList and getDataset methods. I have redshift tables for people, bills, votes etc that I overwrite once a day with fresh data from those bulk imports. My application doesn't require real-time updates, so this was simplest for me.

I've also used getMasterListRaw and getBill for comparing change hashes and replacing stale data, but I found this was actually slower than just overwriting the current session completely. It also creates many more API calls than the bulk import methods. A more advanced Python user probably could have optimized my process here, but I ended up just going back to what worked for me.

The nesting is definitely the most difficult part of working with Legiscan data. I use the long_table() method from Parsons.Table to expand the fields I'm interested in as separate tables, then drop the other nested columns before loading to the data warehouse. I don't like to work with semi-structured data in redshift, so all the data I load is fully unpacked. When I was using a postgres-based data warehouse, I loaded the data basically as-is.

My redshift schema has the following tables:

sponsors and subjects are extracted from the nested structure of the bill JSONs prior to loading. votes and roll_calls are both extracted from the vote JSONs (in my data model, votes has one row per legislator per roll_call, roll_calls has one row per roll_call).

I use this data to fulfill one-off research requests (e.g. "which current Dem legislators are most likely to break with their party on gun control votes"), and to support applications that allied organizations use for opposition research and end-of-year scorecard building.

I can imagine that other people might want to use Legiscan for real time bill monitoring, so I'm not sure how representative my use case is. If it's helpful, I can share more details about it and attach my Python script.

matthewkrausse commented 7 months ago

Thanks for taking the time to respond. That would be helpful if you would share any resources you use it for and that structure. I envision a lot of other people having that similar use case. When we release the new connector, it would be great to have some sample script to show how to handle the data.

lkesich commented 7 months ago

This is the script for creating or overwriting my database. The functions in the Legiscan class are from poliquin's pylegiscan library. There's probably more elegant ways of doing a lot of this, but at some point I had to get it working and move on to the actual project :)

import os
import pandas as pd
import numpy as np
from parsons import Table, Redshift
import json
import requests
import base64
import zipfile
import io
from urllib.parse import urlencode, quote_plus

rs = Redshift()
table = Table()

class LegiScanError(Exception):
    pass

class LegiScan(object):
    BASE_URL = 'http://api.legiscan.com/?key={0}&op={1}&{2}'

    def __init__(self, apikey=None):
        """LegiScan API.  State parameters should always be passed as
           USPS abbreviations.  Bill numbers and abbreviations are case
           insensitive.  Register for API at http://legiscan.com/legiscan
        """
        # see if API key available as environment variable
        if apikey is None:
            apikey = os.getenv('LEGISCAN_API_KEY')
        self.key = apikey.strip()

    def _url(self, operation, params=None):
        """Build a URL for querying the API."""
        if not isinstance(params, str) and params is not None:
            params = urlencode(params)
        elif params is None:
            params = ''
        return self.BASE_URL.format(self.key, operation, params)

    def _get(self, url):
        """Get and parse JSON from API for a url."""
        req = requests.get(url)
        if not req.ok:
            raise LegiScanError('Request returned {0}: {1}'\
                    .format(req.status_code, url))
        data = json.loads(req.content)
        if data['status'] == "ERROR":
            raise LegiScanError(data['alert']['message'])
        return data

    def get_dataset_list(self, state=None, year=None):
        """Get list of available datasets, with optional state and year filtering.
        """
        if state is not None:
            url = self._url('getDatasetList', {'state': state})
        elif year is not None:
            url = self._url('getDatasetList', {'year': year})
        else:
            url = self._url('getDatasetList')
        data = self._get(url)
        return data['datasetlist']

    def get_dataset(self, id, access_key):
        """Get list of available datasets, with optional state and year filtering.
        """
        url = self._url('getDataset', {'id': id, 'access_key': access_key})
        data = self._get(url)
        return data['dataset']

legis = LegiScan()

""" Warning: these lines will create one API call per session + one more API call. """

datasets = legis.get_dataset_list(state = 'my_state')

votes = []
bills = []
people = []

for dataset in datasets:
    session_id = dataset['session_id']
    access_key = dataset['access_key']   
    api_output = legis.get_dataset(session_id, access_key)
    encoded = base64.b64decode(api_output['zip'])
    zipped = zipfile.ZipFile(io.BytesIO(encoded))
    files = zipped.namelist()

    for file in files:
        content = zipped.read(file).decode("utf-8")   
        try:
            data = json.loads(content)
            if '/bill/' in file:
                bills.append(data['bill'])
            elif '/vote/' in file:
                votes.append(data['roll_call'])
            elif '/people/' in file:
                data['person']['session_id'] = session_id
                people.append(data['person'])       
        except:
            pass

bill_df = pd.json_normalize(bills)
vote_df = pd.json_normalize(votes)
people_df = pd.json_normalize(people)

""" Unpack nested columns into separate tables + clean column names """

def simplify_df(df):

    nested_cols = []

    for col in df.columns:
        if isinstance(df[col][0],list):
            nested_cols.append(col)

    new_names = []
    old_names = df.columns

    for old_name in old_names:
        new_name = old_name.split('.')[-1]
        new_names.append(new_name)

    name_dict = dict(zip(old_names, new_names))

    output = df.rename(columns = name_dict)

    output.drop(columns = nested_cols, axis = 1, inplace = True)

    output.mask(output.applymap(type).eq(list) & ~output.astype(bool), inplace = True)
    output.replace({np.nan: None}, inplace = True)

    return output

def clean_names(tbl, prefix = ''):
    old_names = tbl.columns

    for old_name in tbl.columns:
        new_name = old_name.replace(prefix,'')
        try:
            tbl.rename_column(old_name, new_name)
        except:
            pass

    return tbl

# roll call
roll_call_tbl = table.from_dataframe(bill_df).long_table('bill_id','votes')
clean_names(roll_call_tbl, 'votes_')

# subjects
subjects_tbl = table.from_dataframe(bill_df).long_table('bill_id','subjects')
clean_names(subjects_tbl, 'subjects_')

# sponsors
sponsors_tbl = table.from_dataframe(bill_df).long_table('bill_id','sponsors')
clean_names(sponsors_tbl, 'sponsors_')

# vote
vote_tbl = table.from_dataframe(vote_df).long_table(['roll_call_id','bill_id'],'votes')
clean_names(vote_tbl, 'votes_')

# bill
simplified_bill_df = simplify_df(bill_df)
bill_tbl = table.from_dataframe(simplified_bill_df)

# person
simplified_people_df = simplify_df(people_df)
people_tbl = table.from_dataframe(simplified_people_df)

""" Warning: this will completely overwrite all legiscan tables in redshift. """

legiscan_tables = [
    {
        'table': 'legiscan_people',
        'schema': 'my_schema',
        'sortkey': 'people_id',
        'distkey': 'people_id',
        'tbl': people_tbl
    },
    {
        'table': 'legiscan_roll_calls',
        'schema': 'my_schema',
        'sortkey': 'date',
        'distkey': 'roll_call_id',
        'tbl': roll_call_tbl
    },
    {
        'table': 'legiscan_bills',
        'schema': 'my_schema',
        'sortkey': 'session_id',
        'distkey': 'bill_id',
        'tbl': bill_tbl
    },
    {
        'table': 'legiscan_votes',
        'schema': 'my_schema',
        'sortkey': 'roll_call_id, people_id',
        'distkey': 'roll_call_id',
        'tbl': vote_tbl
    },
    {
        'table': 'legiscan_subjects',
        'schema': 'my_schema',
        'sortkey': 'subject_id',
        'distkey': 'bill_id',
        'tbl': subjects_tbl
    },
    {
        'table': 'legiscan_sponsors',
        'schema': 'my_schema',
        'sortkey': 'people_id',
        'distkey': 'bill_id',
        'tbl': sponsors_tbl
    }
]

""" Send to redshift. If you have not defined a distkey or sortkey, 
    you will get a warning that can be safely ignored."""

for key in legiscan_tables:
    destination = key['schema'] + '.' + key['table']
    tbl = key['tbl']
    dist = key['distkey']
    sort = key['sortkey']
    tbl.to_redshift(
        destination, 
        distkey = dist, 
        sortkey = sort, 
        if_exists = 'drop')
matthewkrausse commented 7 months ago

This is great. Thanks! I went ahead and used this for testing the new LegiScan connector and also rewrote it to send the data to BigQuery. Once I get it cleaned up, I'll add it to the a sample use case script to share when the new connector is ready.

lkesich commented 7 months ago

I look forward to trying the new connector! Thanks for your work on this.