innovationOUtside / ipython_binder_magic

Run commands on a remote MyBinder kernel from a local notebook
MIT License
2 stars 0 forks source link

Updated script suggestion by claude.ai #2

Open psychemedia opened 3 days ago

psychemedia commented 3 days ago

Updated script suggestion from Clause.ai.

Also incoroprates ideas from heartbeat pinger:

Untested.

import asyncio
import aiohttp
import json
import random
import warnings
from uuid import uuid4
import time
from datetime import datetime, timedelta
from IPython.display import display, HTML
from IPython.core.magic import magics_class, line_cell_magic, Magics
from IPython.core.magic_arguments import argument, magic_arguments, parse_argstring

class MyBinderCell:
    def __init__(self, binder_url, timeout=10, message='step'):
        self.binder_url = binder_url
        self.timeout = timeout
        self.message = message
        self.ws = None
        self.kernel_url = None
        self.token = None
        self.shell_messages = []
        self.iopub_messages = []
        self.keep_running = True
        self.last_activity = None
        self.last_ping = None
        self.activity_url = None
        self.server_name = None

    async def connect(self):
        async with aiohttp.ClientSession() as session:
            async with session.get(self.binder_url) as response:
                async for line in response.content:
                    try:
                        msg = json.loads(line.decode('utf-8').split('data: ')[1])
                        if self.message == 'full':
                            print(json.dumps(msg, indent=2))
                        elif self.message == 'step' and msg['phase'].startswith(('built', 'launching', 'ready')):
                            print(f"{msg['phase']}: {msg['message']}")
                        if msg['phase'] == 'ready':
                            self.token = msg['token']
                            self.kernel_url = f"wss://{msg['url'].split('//')[1]}ws/kernel"
                            self.activity_url = f"https://{msg['url'].split('//')[1]}hub/api/users/jovyan/activity"
                            self.server_name = 'jupyter-jovyan'  # This is typically the default for Binder
                            break
                    except Exception as e:
                        print(f"Error processing message: {e}")

        if not self.kernel_url:
            raise Exception("Failed to obtain kernel URL")

        kernel_id = await self._start_kernel()
        self.ws = await aiohttp.ClientSession().ws_connect(
            f"{self.kernel_url}/{kernel_id}?token={self.token}"
        )
        asyncio.create_task(self._heartbeat())
        asyncio.create_task(self._activity_ping())

    async def _start_kernel(self):
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"https://{self.kernel_url.split('//')[1].split('/')[0]}/api/kernels?token={self.token}"
            ) as response:
                data = await response.json()
                return data['id']

    async def _heartbeat(self):
        while self.keep_running:
            if self.ws and not self.ws.closed:
                await self.ws.ping()
            await asyncio.sleep(30)

    async def _activity_ping(self):
        while self.keep_running:
            await asyncio.sleep(60)  # Wait for 60 seconds between pings
            if self.last_activity and (datetime.utcnow() - self.last_activity) < timedelta(seconds=120):
                if not self.last_ping or (datetime.utcnow() - self.last_ping) > timedelta(seconds=59):
                    await self._send_activity_ping()

    async def _send_activity_ping(self):
        if not self.activity_url or not self.token or not self.server_name:
            return

        timestamp = datetime.utcnow().isoformat() + 'Z'
        data = {
            'servers': {self.server_name: {'last_activity': timestamp}},
            'last_activity': timestamp
        }
        headers = {'Authorization': f'token {self.token}'}

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(self.activity_url, headers=headers, json=data) as response:
                    if response.status == 200:
                        self.last_ping = datetime.utcnow()
                        print("Activity ping sent successfully")
                    else:
                        print(f"Failed to send activity ping. Status: {response.status}")
        except Exception as e:
            print(f"Error sending activity ping: {e}")

    async def execute_request(self, code):
        self.shell_messages = []
        self.iopub_messages = []
        self.last_activity = datetime.utcnow()

        msg = self._make_execute_request(code)
        await self.ws.send_str(json.dumps(msg))

        return await self._wait_on_response('execute_reply')

    async def _wait_on_response(self, response):
        got_execute_reply = False
        got_idle_status = False
        while not (got_execute_reply and got_idle_status):
            msg = await self.ws.receive_json()
            if msg['channel'] == 'shell':
                self.shell_messages.append(msg)
                if msg['header']['msg_type'] == response:
                    got_execute_reply = True
            elif msg['channel'] == 'iopub':
                self.iopub_messages.append(msg)
                if (msg['header']['msg_type'] == 'status' and
                    msg['content']['execution_state'] == 'idle'):
                    got_idle_status = True

        return {'shell': self.shell_messages, 'iopub': self.iopub_messages}

    def _make_execute_request(self, code):
        return {
            'channel': 'shell',
            'header': {
                'msg_type': 'execute_request',
                'msg_id': str(uuid4()),
                'username': '',
                'session': str(uuid4()),
            },
            'parent_header': {},
            'metadata': {},
            'content': {
                'code': code,
                'silent': False,
                'store_history': True,
                'user_expressions': {},
                'allow_stdin': True,
            }
        }

    async def close(self):
        self.keep_running = False
        if self.ws:
            await self.ws.close()

def binder_url(repo):
    repo = repo.replace('https://github.com/', '').rstrip('/')
    return f'https://mybinder.org/build/gh/{repo}/master'

def set_background(color='honeydew'):
    script = (
        "var cell = this.closest('.code_cell');"
        "var editor = cell.querySelector('.input_area');"
        f"editor.style.background='{color}';"
        "this.parentNode.removeChild(this);"
    )
    display(HTML(f'<img src onerror="{script}">'))

@magics_class
class BinderMagic(Magics):
    def __init__(self, shell):
        super().__init__(shell)
        self.repo = None
        self.binder_cell = None

    @line_cell_magic
    @magic_arguments()
    @argument('--repo', '-r', default=None, help='Github repo URL')
    def binder(self, line, cell=''):
        args = parse_argstring(self.binder, line)
        if args.repo:
            self.repo = args.repo
            binder_url_str = binder_url(self.repo)
            self.binder_cell = MyBinderCell(binder_url_str, message='step')
            asyncio.get_event_loop().run_until_complete(self.binder_cell.connect())

        if self.binder_cell is None:
            print('No connection...')
            return

        if not cell:
            print('Use block magic')
        else:
            set_background()
            result = asyncio.get_event_loop().run_until_complete(self.binder_cell.execute_request(cell))
            for item in result['iopub']:
                if item['msg_type'] == 'execute_result':
                    return item['content']['data']
                elif item['msg_type'] == 'stream':
                    print(item['content']['text'])

# Register the magic command
get_ipython().register_magics(BinderMagic)

Then: %load_ext mybinder_client

And:

%%binder --repo https://github.com/username/repo
print("Hello from Binder!")
psychemedia commented 3 days ago

Tha above doesn't work - asyncio event loop issue. Fix is:

#pip install nest_asyncio
import nest_asyncio
import asyncio

# Apply the nest_asyncio patch
nest_asyncio.apply()

magic does do something and appears to launch server but errors; maybe worth pursuing / fixing?