coach-scraper/app/chesscom.py

187 lines
5.9 KiB
Python

import asyncio
import json
import os
import os.path
from typing import List
import aiohttp
from bs4 import BeautifulSoup, SoupStrainer, Tag
from app.pipeline import Extractor as BaseExtractor
from app.pipeline import Fetcher as BaseFetcher
from app.pipeline import Pipeline as BasePipeline
from app.types import Site, Title
# The number of coach listing pages we will at most iterate through. This number
# was determined by going to chess.com/coaches?sortBy=alphabetical&page=1 and
# traversing to the last page.
MAX_PAGES = 64
# How long to wait between a batch of network requests.
SLEEP_SECS = 3
class Fetcher(BaseFetcher):
def __init__(self, session: aiohttp.ClientSession):
super().__init__(site=Site.CHESSCOM, session=session)
async def scrape_usernames(self, page_no: int) -> List[str] | None:
if page_no > MAX_PAGES:
return []
print(f"{self.site.value}: Scraping page {page_no}/{MAX_PAGES}")
filepath = self.path_page_file(page_no)
try:
with open(filepath, "r") as f:
return [line.strip() for line in f.readlines()]
except FileNotFoundError:
pass
if self.has_made_request:
await asyncio.sleep(SLEEP_SECS)
url = f"https://www.chess.com/coaches?sortBy=alphabetical&page={page_no}"
response, status_code = await self.fetch(url)
if response is None:
return None # Skips this page.
usernames = []
soup = BeautifulSoup(response, "lxml")
members = soup.find_all("a", class_="members-categories-username")
for member in members:
href = member.get("href")
username = href[len("https://www.chess.com/member/") :]
usernames.append(username)
# Cache results.
with open(filepath, "w") as f:
for username in usernames:
f.write(f"{username}\n")
return usernames
async def download_user_files(self, username: str) -> None:
maybe_download = [
(
f"https://www.chess.com/member/{username}",
self.path_coach_file(username, f"{username}.html"),
),
(
f"https://www.chess.com/callback/member/stats/{username}",
self.path_coach_file(username, "stats.json"),
),
]
to_download = []
for d_url, d_filename in maybe_download:
if os.path.isfile(d_filename):
continue
to_download.append((d_url, d_filename))
if not to_download:
return
if self.has_made_request:
await asyncio.sleep(SLEEP_SECS)
await asyncio.gather(
*[self._download_file(url=d[0], filename=d[1]) for d in to_download]
)
async def _download_file(self, url: str, filename: str) -> None:
response, _unused_status = await self.fetch(url)
if response is not None:
with open(filename, "w") as f:
f.write(response)
def _profile_filter(elem: Tag | str | None, attrs={}) -> bool:
if "profile-header-info" in attrs.get("class", ""):
return True
if "profile-card-info" in attrs.get("class", ""):
return True
return False
class Extractor(BaseExtractor):
def __init__(self, fetcher: BaseFetcher, username: str):
super().__init__(fetcher, username)
self.profile_soup = None
try:
filename = self.fetcher.path_coach_file(username, f"{username}.html")
with open(filename, "r") as f:
self.profile_soup = BeautifulSoup(
f.read(), "lxml", parse_only=SoupStrainer(_profile_filter)
)
except FileNotFoundError:
pass
self.stats_json = {}
try:
filename = self.fetcher.path_coach_file(username, "stats.json")
with open(filename, "r") as f:
for s in json.load(f).get("stats", []):
if "key" in s and "stats" in s:
self.stats_json[s["key"]] = s["stats"]
except FileNotFoundError:
pass
def get_name(self) -> str | None:
if self.profile_soup is None:
return None
name = self.profile_soup.find("div", class_="profile-card-name")
if not isinstance(name, Tag):
return None
return name.get_text().strip()
def get_image_url(self) -> str | None:
if self.profile_soup is None:
return None
div = self.profile_soup.find("div", class_="profile-header-avatar")
if not isinstance(div, Tag):
return None
img = div.find("img")
if not isinstance(img, Tag):
return None
src = img.get("src", "")
if not isinstance(src, str):
return None
if "images.chesscomfiles.com" not in src:
return None
return src
def get_title(self) -> Title | None:
if self.profile_soup is None:
return None
a = self.profile_soup.find("a", class_="profile-card-chesstitle")
if not isinstance(a, Tag):
return None
title = a.get_text().strip()
try:
return Title(title)
except ValueError:
return None
def get_languages(self) -> List[str] | None:
# TODO: Extract using huggingface model.
return None
def get_rapid(self) -> int | None:
return self.stats_json.get("rapid", {}).get("rating")
def get_blitz(self) -> int | None:
return self.stats_json.get("lightning", {}).get("rating")
def get_bullet(self) -> int | None:
return self.stats_json.get("bullet", {}).get("rating")
class Pipeline(BasePipeline):
def get_fetcher(self, session: aiohttp.ClientSession):
return Fetcher(session)
def get_extractor(self, fetcher: BaseFetcher, username: str):
return Extractor(fetcher, username)