harupy / mlflow

Open source platform for the machine learning lifecycle
https://mlflow.org
Apache License 2.0

a #63

Open harupy opened 1 year ago

harupy commented 1 year ago
import pandas as pd
from pyspark.sql.session import SparkSession
from pyspark.sql.types import ArrayType, LongType, MapType, StringType
from pyspark.sql.pandas.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()

@pandas_udf(ArrayType(MapType(StringType(), LongType())))
def array_map(s: pd.Series) -> pd.Series:
    return pd.Series(
        [[{x: idx}] for idx, x in enumerate(s)],
    )

df = spark.createDataFrame([("Foo",), ("Bar",)], ("name",))
df.show()
df.select(array_map("name")).show()
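For reference, the UDF wraps each name in a single-element array mapping it to its index within the pandas batch, so (assuming both rows land in the same batch, giving enumerate indices 0 and 1) the result should look roughly like this:

df.select(array_map("name")).collect()
# [Row(array_map(name)=[{'Foo': 0}]), Row(array_map(name)=[{'Bar': 1}])]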
harupy commented 1 year ago
from __future__ import annotations

import openai
import ast
import pathlib
import random
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import NamedTuple

def should_ignore(node: ast.FunctionDef) -> bool:
    # Drop the docstring node from the body if it exists
    body = node.body
    if body and isinstance(body[0], ast.Expr) and isinstance(body[0].value, ast.Str):
        body = body[1:]

    # Skip property getters and stubs whose body is just `pass` or `raise`
    is_property = any(isinstance(d, ast.Name) and d.id == "property" for d in node.decorator_list)
    return is_property or (
        len(body) == 1 and (isinstance(body[0], ast.Pass) or isinstance(body[0], ast.Raise))
    )

class Location(NamedTuple):
    row: int
    col: int

    def __repr__(self) -> str:
        return f"{self.row}:{self.col}"

class Func:
    def __init__(self, path: pathlib.Path, name: str, code: str, start: Location, end: Location):
        self.name = name
        self.path = path
        self.start = start
        self.end = end
        self.code = code

    def __repr__(self) -> str:
        return f"{self.path}:{self.start}"

def parse_functions(path: pathlib.Path) -> list[Func]:
    code = path.read_text()
    tree = ast.parse(code)
    functions = []
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef):
            if should_ignore(node):
                continue
            start_row = node.lineno
            start_col = node.col_offset
            end_row = node.end_lineno
            end_col = node.end_col_offset
            func_code = "\n".join(code.splitlines()[start_row - 1 : end_row])
            functions.append(
                Func(
                    path,
                    node.name,
                    func_code,
                    Location(start_row, start_col),
                    Location(end_row, end_col),
                )
            )
    return functions

def chat_complete(func: Func) -> tuple[Func, str]:
    prompt = f"""
Review the following code and find apparent bugs. If you find them, say "FOUND BUG" and describe the bug. If you don't find any bugs, say "NO BUGS". Ignore undefined functions and variables.

```python
{func.code}
```
"""
    resp = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "user",
                "content": prompt,
            },
        ],
    )
    return func, resp.choices[0].message.content

def main():
    python_files = subprocess.check_output(
        [
            "git",
            "ls-files",
            "mlflow/**/*.py",
        ],
        text=True,
    ).splitlines()
    funcs = []
    for p in map(pathlib.Path, python_files):
        funcs.extend(parse_functions(p))

    random.shuffle(funcs)

    result_markdown = pathlib.Path("a.md")
    result_markdown.open("w").close()  # clear the file
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = []
        for f in funcs:
            futures.append(executor.submit(chat_complete, f))
        for future in as_completed(futures):
            try:
                func, resp = future.result()
                with pathlib.Path("a.md").open("a") as f:
                    f.write(f"# {func}\n\n")
                    f.write(f"```python\n{func.code}\n```\n\n")
                    f.write("### Suggestion\n\n")
                    f.write(resp)
                    f.write("\n\n")
            except Exception as e:
                print("Error", e)

if __name__ == "__main__":
    main()
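As a quicker sanity check before scanning the whole repository, parse_functions can also be exercised on a single file (a minimal sketch; the path below is only an illustrative example):

# Hypothetical path; any tracked Python file works
for func in parse_functions(pathlib.Path("mlflow/utils/file_utils.py"))[:5]:
    print(func, func.name)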

harupy commented 1 year ago

https://spark.apache.org/docs/latest/spark-connect-overview.html

wget https://dlcdn.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
tar -xvf spark-3.4.1-bin-hadoop3.tgz --directory spark-3.4.1-bin-hadoop3
./spark-3.4.1-bin-hadoop3/spark-3.4.1-bin-hadoop3/sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0
export SPARK_REMOTE="sc://localhost"
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
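Equivalently (a minimal sketch, assuming pyspark 3.4+ with the Spark Connect client installed), the remote URL can be passed to the builder directly instead of going through SPARK_REMOTE:

from pyspark.sql import SparkSession

# Point the builder at the Spark Connect server started above
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()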
import mlflow
from mlflow.models import infer_signature
from sklearn import datasets
from sklearn.neighbors import KNeighborsClassifier
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
print(spark)

X, y = datasets.load_iris(as_frame=True, return_X_y=True)
model = KNeighborsClassifier()
model.fit(X, y)
predictions = model.predict(X)
signature = infer_signature(X, predictions)

with mlflow.start_run():
    model_info = mlflow.sklearn.log_model(model, "model", signature=signature)

infer_spark_df = spark.createDataFrame(X)

pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_info.model_uri, env_manager="conda")
result = infer_spark_df.select(pyfunc_udf(*X.columns).alias("predictions")).toPandas()

print(result)