coach-scraper/app/lichess.py

216 lines
6.7 KiB
Python

import asyncio
import os
import os.path
from typing import List
import aiohttp
from bs4 import BeautifulSoup, SoupStrainer, Tag
from app.pipeline import Extractor as BaseExtractor
from app.pipeline import Fetcher as BaseFetcher
from app.pipeline import Pipeline as BasePipeline
from app.types import Site, lang_to_code
# Upper bound on the number of listing pages we iterate through; requests for
# pages beyond it yield no usernames. The value was determined manually by
# going to https://lichess.org/coach/all/all/alphabetical and traversing to
# the last page, so it has to be refreshed by hand if the listing grows.
MAX_PAGES = 162
# Seconds to sleep before a network request once an earlier request has
# already been made.
SLEEP_SECS = 5
class Fetcher(BaseFetcher):
    """Fetcher for the lichess.org coach listing and per-coach pages.

    Listing pages and coach files are cached on disk at the paths handed out
    by the base class (`path_page_file` / `path_coach_file`); anything that is
    already present on disk is not requested again.
    """

    def __init__(self, session: aiohttp.ClientSession):
        super().__init__(site=Site.LICHESS, session=session)

    async def scrape_usernames(self, page_no: int) -> List[str] | None:
        """Return the coach usernames listed on page `page_no`.

        Returns:
            * an empty list once `page_no` exceeds `MAX_PAGES`;
            * the cached usernames when the page file already exists;
            * `None` when the request produced no response, in which case
              nothing is cached and the page is skipped;
            * otherwise the usernames parsed from the fetched page, which are
              also written to the page file (one per line).
        """
        if page_no > MAX_PAGES:
            return []

        print(f"{self.site.value}: Scraping page {page_no}/{MAX_PAGES}")

        filepath = self.path_page_file(page_no)
        try:
            with open(filepath, "r") as f:
                return [line.strip() for line in f]
        except FileNotFoundError:
            pass  # Not cached yet; fall through to the network.

        # Throttle: only the very first request goes out without a delay.
        if self.has_made_request:
            await asyncio.sleep(SLEEP_SECS)

        url = f"https://lichess.org/coach/all/all/alphabetical?page={page_no}"
        response, _status_code = await self.fetch(url)
        if response is None:
            return None  # Skips this page.

        usernames = self._parse_usernames(response)

        with open(filepath, "w") as f:
            f.writelines(f"{username}\n" for username in usernames)

        return usernames

    @staticmethod
    def _parse_usernames(html: str) -> List[str]:
        """Extract usernames from the `/coach/<username>` overlay links."""
        prefix = "/coach/"
        usernames = []
        soup = BeautifulSoup(html, "lxml")
        for member in soup.find_all("article", class_="coach-widget"):
            a = member.find("a", class_="overlay")
            if not isinstance(a, Tag):
                continue
            href = a.get("href")
            # `href` is None when the attribute is missing; slicing it would
            # raise. Links without the expected prefix are ignored rather than
            # being truncated into a bogus username.
            if not isinstance(href, str) or not href.startswith(prefix):
                continue
            username = href.removeprefix(prefix)
            if username:
                usernames.append(username)
        return usernames

    async def download_user_files(self, username: str) -> None:
        """Download the coach page and profile page for `username`.

        Files that already exist on disk are skipped; if both exist no request
        (and no throttling sleep) happens at all. The missing files are
        fetched concurrently.
        """
        maybe_download = [
            (
                f"https://lichess.org/coach/{username}",
                self.path_coach_file(username, f"{username}.html"),
            ),
            (
                f"https://lichess.org/@/{username}",
                self.path_coach_file(username, "stats.html"),
            ),
        ]

        to_download = [
            (url, filename)
            for url, filename in maybe_download
            if not os.path.isfile(filename)
        ]
        if not to_download:
            return

        if self.has_made_request:
            await asyncio.sleep(SLEEP_SECS)

        await asyncio.gather(
            *(
                self._download_file(url=url, filename=filename)
                for url, filename in to_download
            )
        )

    async def _download_file(self, url: str, filename: str) -> None:
        """Fetch `url` and write the body to `filename`; no-op without a response."""
        response, _unused_status = await self.fetch(url)
        if response is not None:
            with open(filename, "w") as f:
                f.write(response)
def _profile_filter(elem: Tag | str | None, attrs={}) -> bool:
if "coach-widget" in attrs.get("class", ""):
return True
return False
def _stats_filter(elem: Tag | str | None, attrs={}) -> bool:
if "profile-side" in attrs.get("class", ""):
return True
if "sub-ratings" in attrs.get("class", ""):
return True
return False
class Extractor(BaseExtractor):
    """Extracts coach attributes from the files downloaded for one username.

    Two files are read, both located through `fetcher.path_coach_file`:
    `<username>.html` (the coach page, parsed into `profile_soup`) and
    `stats.html` (the profile page, parsed into `stats_soup`). Either soup is
    `None` when its file does not exist, and every getter returns `None` when
    the data it needs is unavailable.
    """

    def __init__(self, fetcher: BaseFetcher, username: str):
        super().__init__(fetcher, username)

        self.profile_soup = self._load_soup(
            fetcher, username, f"{username}.html", _profile_filter
        )
        self.stats_soup = self._load_soup(
            fetcher, username, "stats.html", _stats_filter
        )

    @staticmethod
    def _load_soup(
        fetcher: BaseFetcher, username: str, basename: str, strainer_filter
    ) -> BeautifulSoup | None:
        """Parse the coach file `basename`, keeping only what `strainer_filter` admits."""
        try:
            filename = fetcher.path_coach_file(username, basename)
            with open(filename, "r") as f:
                return BeautifulSoup(
                    f.read(), "lxml", parse_only=SoupStrainer(strainer_filter)
                )
        except FileNotFoundError:
            return None

    def get_name(self) -> str | None:
        """Return the text of `strong.name` inside the profile side panel."""
        if self.stats_soup is None:
            return None
        profile_side = self.stats_soup.find("div", class_="profile-side")
        if not isinstance(profile_side, Tag):
            return None
        user_infos = profile_side.find("div", class_="user-infos")
        if not isinstance(user_infos, Tag):
            return None
        name = user_infos.find("strong", class_="name")
        if not isinstance(name, Tag):
            return None
        return name.get_text().strip()

    def get_image_url(self) -> str | None:
        """Return the coach picture URL, or None unless it is an image.lichess1.org URL."""
        if self.profile_soup is None:
            return None
        picture = self.profile_soup.find("img", class_="picture")
        if not isinstance(picture, Tag):
            return None
        src = picture.get("src", "")
        if not isinstance(src, str):
            return None
        if "image.lichess1.org" not in src:
            return None
        return src

    def get_languages(self) -> List[str] | None:
        """Return the language codes listed in the coach page's languages row.

        The data cell is split on commas and each entry is mapped through
        `lang_to_code`; entries without a mapping are dropped.
        """
        if self.profile_soup is None:
            return None
        tr = self.profile_soup.find("tr", class_="languages")
        if not isinstance(tr, Tag):
            return None
        td = tr.find("td")
        if not isinstance(td, Tag):
            return None

        # Read the data cell only: the text of the whole row would also
        # include any other cell (e.g. a header label), which gets glued onto
        # the first language and makes its lookup fail.
        languages = (s.strip() for s in td.get_text().split(","))
        return [lang_to_code[lang] for lang in languages if lang in lang_to_code]

    def get_rapid(self) -> int | None:
        """Return the rapid rating, if present."""
        return self._find_rating("rapid")

    def get_blitz(self) -> int | None:
        """Return the blitz rating, if present."""
        return self._find_rating("blitz")

    def get_bullet(self) -> int | None:
        """Return the bullet rating, if present."""
        return self._find_rating("bullet")

    def _find_rating(self, name: str) -> int | None:
        """Return the rating shown for the perf named `name`, or None.

        The rating is read from `rating > strong` inside the link to
        `/@/<username>/perf/<name>`. One trailing "?" is dropped before
        conversion (presumably lichess's provisional-rating marker; confirm).
        Empty or non-numeric text yields None.
        """
        if self.stats_soup is None:
            return None
        a = self.stats_soup.find("a", href=f"/@/{self.username}/perf/{name}")
        if not isinstance(a, Tag):
            return None
        rating = a.find("rating")
        if not isinstance(rating, Tag):
            return None
        strong = rating.find("strong")
        if not isinstance(strong, Tag):
            return None

        # `removesuffix` is safe on an empty string, unlike indexing `[-1]`.
        value = strong.get_text().strip().removesuffix("?")
        try:
            return int(value)
        except ValueError:
            return None
class Pipeline(BasePipeline):
    """Pipeline specialization that supplies the lichess fetcher and extractor."""

    def get_fetcher(self, session: aiohttp.ClientSession):
        """Create the lichess `Fetcher` bound to `session`."""
        fetcher = Fetcher(session)
        return fetcher

    def get_extractor(self, fetcher: BaseFetcher, username: str):
        """Create the lichess `Extractor` for `username`, reading via `fetcher`."""
        extractor = Extractor(fetcher, username)
        return extractor