ds-oliver closed this issue 1 month ago.
Which function specifically are you using to scrape player data?
import pandas as pd
import numpy as np
from unidecode import unidecode
import json
import ast
import ScraperFC as sfc
import soccerdata as sd
from datetime import datetime
# Constants and initialization
LEAGUES = [
"EPL",
"La Liga",
"Bundesliga",
"Serie A",
"Ligue 1",
"Champions League"
]
SEASONS = [
"24/25",
# "23/24", "22/23",
# "21/22", "20/21", "18/19", "17/18"
]
SD_LEAGUES = [
"ENG-Premier League",
"ESP-La Liga",
"ITA-Serie A",
"GER-Bundesliga",
"FRA-Ligue 1",
]
SD_SEASONS = [
"2425",
# "2324", "2223",
# "2122", "2120", "2021", "1819", "1718"
]
# Initialize Sofascore scraper
ss = sfc.Sofascore()
fbref = sd.FBref(leagues=SD_LEAGUES, seasons=SD_SEASONS)
replacement_groups = [
(["Man Utd", "Manchester Utd", "Man United"], "Manchester United"),
(["Man City"], "Manchester City"),
(["Newcastle United", "Newcastle Utd"], "Newcastle"),
(["Wolverhampton Wanderers", "Wolverhampton"], "Wolves"),
(["Sheff Utd", "Sheffield Utd"], "Sheffield United"),
(["Cardiff", "Cardiff City"], "Cardiff"),
(["Leicester", "Leicester City"], "Leicester"),
(["Hull", "Hull City"], "Hull"),
(["Swansea", "Swansea City"], "Swansea"),
(["Leeds", "Leeds United"], "Leeds"),
(["Wigan"], "Wigan"),
(["Norwich", "Norwich City"], "Norwich"),
(["QPR"], "QPR"),
(["Bolton"], "Bolton"),
(["Reading"], "Reading"),
(["Nott'm Forest", "Nott'ham Forest", "Forest"], "Nottingham Forest"),
(["Blackpool"], "Blackpool"),
(["Birmingham"], "Birmingham"),
(["Stoke", "Stoke City"], "Stoke"),
(["Luton", "Luton Town"], "Luton"),
(["Blackburn"], "Blackburn"),
(["Huddersfield Town"], "Huddersfield"),
(["West Bromwich", "West Bromwich Albion"], "West Brom"),
(["West Ham", "West Ham United"], "West Ham"),
(["Brighton", "Brighton & Hove Albion"], "Brighton"),
(["Bournemouth", "AFC Bournemouth"], "Bournemouth"),
(["Burnley"], "Burnley"),
(["Watford"], "Watford"),
(["Fulham"], "Fulham"),
(["Aston Villa"], "Aston Villa"),
(["Crystal Palace"], "Crystal Palace"),
(["Everton"], "Everton"),
(["Arsenal"], "Arsenal"),
(["Tottenham", "Tottenham Hotspur"], "Tottenham"),
(["Chelsea"], "Chelsea"),
(["Liverpool"], "Liverpool"),
(["Ipswich", "Ipswich Town"], "Ipswich"),
]
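# Fetch Sofascore match dicts for one league/season, normalise team names, flatten
# the score/round/injury-time fields, and save both raw and cleaned CSVs.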
def get_match_ids(league: str, year: str) -> tuple:
mds = ss.get_match_dicts(league=league, year=year)
match_ids = [md["id"] for md in mds]
mds_df = pd.DataFrame(mds)
savepath = (
f"/Users/hogan/scrapeFC/{league}_{year[:2]}_{year[3:]}_raw_match_dicts.csv"
)
mds_df.to_csv(savepath, index=False)
mds_df["homeTeamName"] = mds_df["homeTeam"].apply(lambda x: unidecode(x["name"]))
mds_df["awayTeamName"] = mds_df["awayTeam"].apply(lambda x: unidecode(x["name"]))
replacement_dict = {old: new for group, new in replacement_groups for old in group}
columns_to_replace = ["teamName", "homeTeamName", "awayTeamName"]
mds_df.replace(
{col: replacement_dict for col in columns_to_replace if col in mds_df.columns},
inplace=True,
)
mds_df["homeTeamCode"] = mds_df["homeTeam"].apply(
lambda x: unidecode(x.get("nameCode", ""))
)
mds_df["awayTeamCode"] = mds_df["awayTeam"].apply(
lambda x: unidecode(x.get("nameCode", ""))
)
mds_df["homeTotalGoals"] = mds_df["homeScore"].apply(lambda x: x.get("current", 0))
mds_df["homeFirstHalfGoals"] = mds_df["homeScore"].apply(
lambda x: x.get("period1", 0)
)
mds_df["homeSecondHalfGoals"] = mds_df["homeScore"].apply(
lambda x: x.get("period2", 0)
)
mds_df["awayTotalGoals"] = mds_df["awayScore"].apply(lambda x: x.get("current", 0))
mds_df["awayFirstHalfGoals"] = mds_df["awayScore"].apply(
lambda x: x.get("period1", 0)
)
mds_df["awaySecondHalfGoals"] = mds_df["awayScore"].apply(
lambda x: x.get("period2", 0)
)
mds_df["totalGoals"] = mds_df["homeTotalGoals"] + mds_df["awayTotalGoals"]
mds_df["gw"] = mds_df["roundInfo"].apply(lambda x: x.get("round", ""))
mds_df["firstHalfInjuryTime"] = mds_df["time"].apply(
lambda x: x.get("injuryTime1", None)
)
mds_df["secondHalfInjuryTime"] = mds_df["time"].apply(
lambda x: x.get("injuryTime2", None)
)
mds_df["totalInjuryTime"] = (
mds_df["firstHalfInjuryTime"] + mds_df["secondHalfInjuryTime"]
)
mds_df["match_id"] = mds_df["id"]
mds_df["season"] = year.replace("/", "")
mds_df["match_key_string"] = (
mds_df["homeTeamName"] + "_" + mds_df["awayTeamName"] + "_" + mds_df["season"]
)
mds_df["datetime"] = mds_df["startTimestamp"].apply(
lambda x: datetime.fromtimestamp(x).strftime("%Y-%m-%d %H:%M:%S")
)
mds_df = mds_df[
[
"match_id",
"homeTeamName",
"awayTeamName",
"homeTeamCode",
"awayTeamCode",
"homeTotalGoals",
"homeFirstHalfGoals",
"homeSecondHalfGoals",
"awayTotalGoals",
"awayFirstHalfGoals",
"awaySecondHalfGoals",
"totalGoals",
"gw",
"firstHalfInjuryTime",
"secondHalfInjuryTime",
"totalInjuryTime",
"season",
"datetime",
"match_key_string",
]
]
savepath = f"/Users/hogan/scrapeFC/{league}_{year[:2]}_{year[3:]}_match_dicts.csv"
mds_df.to_csv(savepath, index=False)
print(f"Match dictionaries saved to {league}_{year}_match_dicts.csv")
return match_ids, mds_df
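# Scrape per-player stats for a single match; an empty DataFrame is returned on
# failure so the calling loop can keep going.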
def get_player_match_stats(match_id: int) -> pd.DataFrame:
try:
player_stats_df = ss.scrape_player_match_stats(match_id)
player_stats_df["match_id"] = match_id
return player_stats_df
except Exception as e:
print(
f"Error occurred while fetching player match stats for match {match_id}: {e}"
)
return pd.DataFrame()
def rename_duplicates(original_columns):
seen = set()
for i, column in enumerate(original_columns):
if column in seen:
original_columns[i] = f"{column}_{i}"
else:
seen.add(column)
return original_columns
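# Build the season-level player table. Note that every match is scraped twice:
# once to discover the full column set, once to collect and reindex the stats.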
def fetch_and_process_stats(league: str, year: str) -> tuple:
match_ids, matches_df = get_match_ids(league, year)
print(f"Found {len(match_ids)} matches for {league} {year}")
all_player_match_stats_df = pd.DataFrame()
all_columns = set()
for match_id in match_ids:
match_stats_df = get_player_match_stats(match_id)
if not match_stats_df.empty:
all_columns.update(match_stats_df.columns)
all_columns.update(["match_id", "name"])
all_columns = rename_duplicates(list(set(all_columns)))
all_columns = sorted(map(str, all_columns))
print("Sorted unique column names:", all_columns)
for match_id in match_ids:
match_stats_df = get_player_match_stats(match_id)
if not match_stats_df.empty:
match_stats_df.columns = rename_duplicates(list(match_stats_df.columns))
match_stats_df = match_stats_df.reindex(columns=all_columns, fill_value=0)
all_player_match_stats_df = pd.concat(
[all_player_match_stats_df, match_stats_df], ignore_index=True
)
print(f"Player match statistics for {league} {year}:")
print(all_player_match_stats_df.head())
all_player_match_stats_df = all_player_match_stats_df[
all_player_match_stats_df["minutesPlayed"] > 0
]
all_player_match_stats_df["name"] = all_player_match_stats_df["name"].apply(
unidecode
)
all_player_match_stats_df = all_player_match_stats_df.merge(
matches_df, on="match_id", how="left", suffixes=("", "_match")
)
all_player_match_stats_df = all_player_match_stats_df[
[col for col in all_player_match_stats_df.columns if "_match" not in col]
]
return all_player_match_stats_df, matches_df
def extract_country(col):
    try:
        if isinstance(col, str):
            # The country field may arrive as a Python-dict-style string (e.g. after
            # a CSV round-trip); literal_eval parses it even when names contain apostrophes.
            col = ast.literal_eval(col)
        return col.get("name", None), col.get("alpha3", None)
    except (ValueError, SyntaxError):
        print(f"Error parsing country value: {col}")
        return None, None
def process_country_data(df):
try:
df[["countryLong", "countryShort"]] = df["country"].apply(
lambda x: pd.Series(extract_country(x))
)
except KeyError as e:
print(f"KeyError encountered: {e}")
print("Columns in DataFrame:", df.columns.tolist())
raise e
return df
def reorder_columns(df):
base_columns = [
"name",
"teamName",
"position",
"rating_permatch",
"rating",
"gs_ratio",
"starts",
"minutesPlayed_permatch",
"GI_permatch",
"GI",
"xGI_permatch",
"xGI",
"contests_permatch",
"expectedAssists_permatch",
"expectedGoals_permatch",
"AT_permatch",
"goalAssist_permatch",
"penaltyWon_permatch",
"goals_permatch",
"bigChanceCreated_permatch",
"bigChanceMissed_permatch",
"keyPass_permatch",
"interceptionWon_permatch",
"wonContest_permatch",
"apps",
"minutesPlayed",
"duelWon",
"expectedAssists",
"expectedGoals",
"goalAssist",
"penaltyWon",
"goals",
"bigChanceCreated",
"bigChanceMissed",
"keyPass",
"interceptionWon",
"wonContest",
"countryLong",
"countryShort",
]
base_columns = [col for col in base_columns if col in df.columns]
remaining_columns = [col for col in df.columns if col not in base_columns]
new_column_order = base_columns + remaining_columns
return df[new_column_order]
def drop_unnecessary_columns(df):
drop_cols = [
"fieldTranslations",
"firstName",
"lastName",
"country",
"marketValueCurrency",
"slug",
"ratingVersions",
"userCount",
"dateOfBirthTimestamp",
"jerseyNumber",
"shirtNumber",
"player",
"id",
"captain",
"shortName",
]
drop_cols = [col for col in drop_cols if col in df.columns]
return df.drop(columns=drop_cols)
def clean_data_carefully(df):
    # Fill missing values in numeric columns only; text columns are left untouched.
    numeric_columns = df.select_dtypes(include=[np.number]).columns
    df[numeric_columns] = df[numeric_columns].fillna(0)
    return df
def feature_engineering(df):
df["AT"] = df["penaltyWon"] + df["goalAssist"]
df["GI"] = df["goals"] + df["goalAssist"]
df["xGI"] = df["expectedGoals"] + df["expectedAssists"]
df["contests"] = df["duelWon"] + df["interceptionWon"] + df["wonContest"]
return df
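# Per-player averages across all appearances, plus composite per-match metrics
# (AT, GI, xGI, contests), merged back onto the match-level rows.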
def calculate_permatch_statistics(df):
permatch_columns = [
col
for col in df.columns
if col
not in [
"starts",
"name",
"teamName",
"countryLong",
"countryShort",
"apps",
"position",
"match_id",
"substitute",
"league",
"season",
"homeTeamName",
"awayTeamName",
"homeTeamCode",
"awayTeamCode",
"homeTotalGoals",
"homeFirstHalfGoals",
"homeSecondHalfGoals",
"awayTotalGoals",
"awayFirstHalfGoals",
"awaySecondHalfGoals",
"totalGoals",
"gw",
"firstHalfInjuryTime",
"secondHalfInjuryTime",
]
]
numeric_columns = df[permatch_columns].select_dtypes(include=[np.number]).columns
print("Numeric columns for per match statistics:")
print(numeric_columns)
permatch_stats_df = df.groupby("name")[numeric_columns].mean().reset_index()
permatch_stats_df.columns = [
"name" if col == "name" else f"{col}_permatch"
for col in permatch_stats_df.columns
]
permatch_stats_df["AT_permatch"] = (
permatch_stats_df["penaltyWon_permatch"]
+ permatch_stats_df["goalAssist_permatch"]
)
permatch_stats_df["GI_permatch"] = (
permatch_stats_df["goals_permatch"] + permatch_stats_df["AT_permatch"]
)
permatch_stats_df["xGI_permatch"] = (
permatch_stats_df["expectedGoals_permatch"]
+ permatch_stats_df["expectedAssists_permatch"]
)
permatch_stats_df["contests_permatch"] = (
permatch_stats_df["duelWon_permatch"]
+ permatch_stats_df["interceptionWon_permatch"]
+ permatch_stats_df["wonContest_permatch"]
)
permatch_stats_df = permatch_stats_df.sort_values(
by=["rating_permatch", "xGI_permatch"], ascending=False
).reset_index(drop=True)
return df.merge(permatch_stats_df, on="name").round(2)
def add_apps_and_starts_columns(df):
apps_starts_df = (
df.groupby("name")
.agg(
apps=pd.NamedAgg(column="match_id", aggfunc="nunique"),
starts=pd.NamedAgg(
column="substitute", aggfunc=lambda x: x.eq(False).sum()
),
)
.reset_index()
)
return df.merge(apps_starts_df, on="name")
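# Workaround discussed in this thread: infer each player's club as the most common
# team name across the matches they appeared in.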
def determine_player_team(player_match_stats_df):
    # Pool home and away team names for each appearance; the row itself does not
    # record which side the player was on, so both names feed the mode step below.
    player_match_stats_df["teamName"] = (
        player_match_stats_df["homeTeamName"]
        + ","
        + player_match_stats_df["awayTeamName"]
    )
# Group by player and season, then find the most common team name
player_teams = (
player_match_stats_df.groupby(["name", "season"])["teamName"]
.apply(lambda x: x.str.split(',').explode().mode()[0])
.reset_index()
)
player_teams.columns = ["name", "season", "teamName"]
# Merge the most common team name back into the original DataFrame with suffixes to avoid conflicts
player_match_stats_df = player_match_stats_df.merge(
player_teams, on=["name", "season"], how="left", suffixes=('', '_most_common')
)
# Drop the temporary concatenated teamName column
player_match_stats_df.drop(columns=["teamName"], inplace=True)
# Rename the most common team name column to teamName
player_match_stats_df.rename(columns={"teamName_most_common": "teamName"}, inplace=True)
return player_match_stats_df
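# Full post-processing pipeline: drop noise columns, assign teams, engineer features,
# sort and save match-level data, then build season and all-season aggregations
# (including the Champions League-filtered workbooks).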
def process_player_match_stats(df):
try:
df = drop_unnecessary_columns(df)
df = determine_player_team(df)
df = feature_engineering(df)
df = reorder_columns(df)
clean_data_carefully(df)
# Print the columns in the DataFrame
print("Columns in DataFrame:", df.columns.tolist())
# Calculate 90s and appearances columns
df["90s"] = df["minutesPlayed"] / 90
df["appearances"] = df["minutesPlayed"].apply(lambda x: 1 if x > 0 else 0)
# Sort by GI, then xGI, then rating
sort_cols = ["GI", "xGI", "rating"]
# Remove any columns from the list of sort_cols that are not in the DataFrame
sort_cols = [col for col in sort_cols if col in df.columns]
# Sort the original DataFrame before saving or aggregating
df = df.sort_values(by=sort_cols, ascending=False).reset_index(drop=True)
# Save pre-aggregation sorted data
save_to_csv(df, "CombinedLeagues", "AllSeasons", "Raw_preaggregation")
# Aggregating data by summing and averaging over the season
numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist()
aggregation_columns = [
"name",
"position",
"season",
"teamName",
] + numeric_columns
player_agg_sum = (
df[aggregation_columns]
.groupby(["name", "position", "season", "teamName"])
.sum()
.reset_index()
)
player_agg_mean = (
df[aggregation_columns]
.groupby(["name", "position", "season", "teamName"])
.mean()
.reset_index()
)
# Aggregating data across all seasons grouped by player and team
aggregation_columns_all_seasons = [
"name",
"position",
"teamName",
] + numeric_columns
player_team_agg_sum = (
df[aggregation_columns_all_seasons]
.groupby(["name", "position", "teamName"])
.sum()
.reset_index()
)
player_team_agg_mean = (
df[aggregation_columns_all_seasons]
.groupby(["name", "position", "teamName"])
.mean()
.reset_index()
)
player_agg_sum = player_agg_sum.round(2).sort_values(
by=sort_cols, ascending=False
).reset_index(drop=True)
player_agg_mean = player_agg_mean.round(2).sort_values(
by=sort_cols, ascending=False
).reset_index(drop=True)
player_team_agg_sum = player_team_agg_sum.round(2).sort_values(
by=sort_cols, ascending=False
).reset_index(drop=True)
player_team_agg_mean = player_team_agg_mean.round(2).sort_values(
by=sort_cols, ascending=False
).reset_index(drop=True)
# Save the aggregated data to Excel
save_to_excel(
{
"player_sum": player_agg_sum,
"player_mean": player_agg_mean,
"player_team_sum": player_team_agg_sum,
"player_team_mean": player_team_agg_mean,
},
"/Users/hogan/scrapeFC/CombinedLeagues_Aggregated_Stats.xlsx",
)
# List of mapped Champions League teams based on the filtering
champions_league_teams = [
"Milan", "Arsenal", "Aston Villa", "Atletico Madrid", "Barcelona", "Bayer 04 Leverkusen",
"FC Bayern Munchen", "Bologna", "Borussia Dortmund", "Stade Brestois", "GNK Dinamo Zagreb",
"Girona FC", "Inter", "Juventus", "Lille", "Liverpool", "Manchester City",
"AS Monaco", "Paris Saint-Germain", "RB Leipzig", "Real Madrid", "FK Crvena zvezda",
"Red Bull Salzburg", "Sparta Praha", "VfB Stuttgart", "Young Boys"
]
# Filter the player_team dataframes for Champions League teams and then save to Excel
champions_league_df_sum = player_team_agg_sum[
player_team_agg_sum["teamName"].isin(champions_league_teams)
]
champions_league_df_mean = player_team_agg_mean[
player_team_agg_mean["teamName"].isin(champions_league_teams)
]
# Save the filtered data to Excel
save_to_excel(
{
"UCL_Player_Team_Total": champions_league_df_sum,
"UCL_Player_Team_Avg": champions_league_df_mean,
},
"/Users/hogan/scrapeFC/UCL_Player_Team_Aggregated_Stats.xlsx",
)
except KeyError as e:
print(f"KeyError encountered: {e}")
print("Columns in DataFrame:", df.columns.tolist())
raise
return df
def save_to_csv(df, league, season, stage):
csv_file_path = f"/Users/hogan/scrapeFC/{league}_{season}_{stage}.csv"
df.to_csv(csv_file_path, index=False)
print(f"Data saved to {csv_file_path}")
def save_to_excel(df_dict, file_path):
with pd.ExcelWriter(file_path, engine="xlsxwriter") as writer:
for sheet_name, df in df_dict.items():
df.to_excel(writer, sheet_name=sheet_name, index=False)
print(f"Data saved to {file_path}")
def aggregate_team_stats(matches_df):
home_stats = (
matches_df.groupby(["homeTeamName", "season"])
.agg(
{
"homeTotalGoals": "sum",
"homeFirstHalfGoals": "sum",
"homeSecondHalfGoals": "sum",
"awayTotalGoals": "sum",
"awayFirstHalfGoals": "sum",
"awaySecondHalfGoals": "sum",
"gw": "count",
"firstHalfInjuryTime": "sum",
"secondHalfInjuryTime": "sum",
"totalInjuryTime": "sum",
}
)
.reset_index()
)
away_stats = (
matches_df.groupby(["awayTeamName", "season"])
.agg(
{
"homeTotalGoals": "sum",
"homeFirstHalfGoals": "sum",
"homeSecondHalfGoals": "sum",
"awayTotalGoals": "sum",
"awayFirstHalfGoals": "sum",
"awaySecondHalfGoals": "sum",
"gw": "count",
"firstHalfInjuryTime": "sum",
"secondHalfInjuryTime": "sum",
"totalInjuryTime": "sum",
}
)
.reset_index()
)
home_stats.columns = [
"teamName",
"season",
"homeTotalGoals",
"homeFirstHalfGoals",
"homeSecondHalfGoals",
"awayTotalGoals",
"awayFirstHalfGoals",
"awaySecondHalfGoals",
"totalMatches",
"firstHalfInjuryTime",
"secondHalfInjuryTime",
"totalInjuryTime",
]
away_stats.columns = home_stats.columns
team_stats = pd.concat([home_stats, away_stats])
team_stats = team_stats.groupby(["teamName", "season"]).sum().reset_index()
return team_stats
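# Scrape every configured league/season, save per-league outputs, then build the
# combined player, team, and Champions League exports.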
def main():
all_player_data = pd.DataFrame()
all_team_data = pd.DataFrame()
for league in LEAGUES:
for season in SEASONS:
player_data, match_data = fetch_and_process_stats(league, season)
if not player_data.empty:
player_data["league"] = league
player_data["season"] = season.replace("/", "")
# Call determine_player_team to assign the correct team name to each player
player_data = determine_player_team(player_data)
# Save player match-level stats with descriptive filename
save_to_csv(
player_data.round(2),
league,
season.replace("/", "_"),
"Player_Match_Level_Stats",
)
all_player_data = pd.concat(
[all_player_data, player_data], ignore_index=True
)
if not match_data.empty:
match_data["league"] = league
match_data["season"] = season.replace("/", "")
# Save match-level stats with descriptive filename
save_to_csv(
match_data.round(2),
league,
season.replace("/", "_"),
"Match_Level_Stats",
)
team_data = aggregate_team_stats(match_data)
all_team_data = pd.concat([all_team_data, team_data], ignore_index=True)
# Save team-level stats with descriptive filename
save_to_csv(
team_data.round(2), league, season.replace("/", "_"), "Team_Stats"
)
# Process and save aggregated player data across all leagues and seasons
if not all_player_data.empty:
all_player_data = process_country_data(all_player_data)
all_player_data = process_player_match_stats(all_player_data)
print(f"Player match statistics for all seasons:")
print(all_player_data.head())
# Save aggregated player data across all leagues and seasons
save_to_csv(
all_player_data.round(2), "CombinedLeagues", "AllSeasons", "Players"
)
# Process and save aggregated team data across all leagues and seasons
if not all_team_data.empty:
print(f"Team statistics for all seasons:")
print(all_team_data.head())
# Save aggregated team data across all leagues and seasons
save_to_csv(all_team_data.round(2), "CombinedLeagues", "AllSeasons", "Teams")
# Optional: Save to Excel for combined data
if not all_player_data.empty or not all_team_data.empty:
save_to_excel(
{
"Player_Data": all_player_data,
"Team_Data": all_team_data,
},
"/Users/hogan/scrapeFC/CombinedLeagues_AllSeasons_Aggregated_Stats.xlsx",
)
# List of mapped Champions League teams based on the filtering
champions_league_teams = [
"Milan",
"Arsenal",
"Aston Villa",
"Atletico Madrid",
"Barcelona",
"Bayer 04 Leverkusen",
"FC Bayern Munchen",
"Bologna",
"Borussia Dortmund",
"Stade Brestois",
"GNK Dinamo Zagreb",
"Girona FC",
"Inter",
"Juventus",
"Lille",
"Liverpool",
"Manchester City",
"AS Monaco",
"Paris Saint-Germain",
"RB Leipzig",
"Real Madrid",
"FK Crvena zvezda",
"Red Bull Salzburg",
"Sparta Praha",
"VfB Stuttgart",
"Young Boys",
]
# Filter the DataFrame for Champions League teams
champions_league_df = all_player_data[
all_player_data["teamName"].isin(champions_league_teams)
]
# Save Champions League player data with more descriptive names
save_to_csv(
champions_league_df.round(2),
"Champions_League",
"AllSeasons",
"Player_Stats",
)
# Save the filtered Champions League data to Excel with a descriptive name
save_to_excel(
{
"Champions_League_Player_Stats": champions_league_df,
},
"/Users/hogan/scrapeFC/Champions_League_AllSeasons_Player_Stats.xlsx",
)
if __name__ == "__main__":
main()
So you're using scrape_player_match_stats and, ideally, want the team name for each player returned with those results?
That would be ideal, yes. For now I've implemented the determine_player_team() function above as a workaround, but it has limitations.
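To illustrate the limitation concretely, here is a minimal, self-contained sketch with invented data (no scraping involved): the match rows only carry homeTeamName/awayTeamName, so the mode-based grouping pools opponents as well and collapses a mid-season move into a single club.
import pandas as pd

# Toy appearance rows for one player who (in this made-up data) moves clubs mid-season.
# Column names mirror the script above; the fixtures themselves are invented.
toy = pd.DataFrame({
    "name": ["Player A"] * 5,
    "season": ["2425"] * 5,
    "homeTeamName": ["Wolves", "Everton", "Wolves", "Fulham", "Brentford"],
    "awayTeamName": ["Arsenal", "Wolves", "Chelsea", "Brentford", "Fulham"],
})

# Same idea as determine_player_team(): pool home and away names, then take the
# per-player mode, because the rows never say which side the player was on.
toy["teamName"] = toy["homeTeamName"] + "," + toy["awayTeamName"]
most_common = (
    toy.groupby(["name", "season"])["teamName"]
    .apply(lambda x: x.str.split(",").explode().mode()[0])
)
print(most_common)
# Every appearance gets attributed to "Wolves": opponents are counted alongside the
# player's own club, and the later Brentford/Fulham matches are swallowed by the mode.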
OK, yeah, I'll need to take a look and see whether Sofascore returns a team name for each player anywhere.
@ds-oliver I just finished a fix for this that adds team name and ID columns to the function that scrapes player match stats. I'm running unit tests now, and then I'll push my changes and release the update to PyPI.
@ds-oliver I just released v3.1.1, which includes this change!
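For anyone picking this up later, a quick sanity check after upgrading looks something like the snippet below; the match ID is a placeholder and the exact names of the new team columns are not spelled out here, so inspect the returned DataFrame's columns rather than assuming them.
import ScraperFC as sfc

ss = sfc.Sofascore()

# Placeholder match ID; take a real one from ss.get_match_dicts(...) as in the script above.
match_id = 12345678

df = ss.scrape_player_match_stats(match_id)

# List whatever team-related columns actually come back in the new release.
print([c for c in df.columns if "team" in c.lower()])
print(df.head())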
Question: it would be nice to be able to scrape player-level data and have the teamName for each player returned with it.