diff --git a/README.md b/README.md index 6ba332f..b83e5c1 100644 --- a/README.md +++ b/README.md @@ -13,15 +13,15 @@ This is a simple web scraper for coaches listed on: * [lichess.org](https://www.lichess.org/coach) The program searches for coach usernames as well as specific information about -each of them (their profile, recent activity, and stats). The result will be -found in a newly created `data` directory with the following structure: +each of them (their profile, recent activity, and stats). Data is streamed into +a Postgres instance. Downloaded content is found in a newly created `data` +directory with the following structure: ``` data └── <site> │ ├── coaches │ │ ├── <username> │ │ │ ├── <username>.html -│ │ │ ├── export.json │ │ │ └── ... │ │ ├── ... └── pages @@ -31,20 +31,6 @@ data ## Quickstart -If you have nix available, run: -```bash -$ nix run . -- --user-agent <user-agent> -s <site> [-s <site> ...] -``` -If not, ensure you have [poetry](https://python-poetry.org/) on your machine and -instead run the following: -```bash -$ poetry run python3 -m app -u <user-agent> -s <site> [-s <site> ...] -``` -After running (this may take several hours), a new CSV will be generated at -`data/export.csv` containing all scraped content from the specified `<site>`s. - -## Database - Included in the development shell of this flake is a [Postgres](https://www.postgresql.org/) client (version 15.5). Generate an empty Postgres cluster at `/db` by running ```bash @@ -64,34 +50,18 @@ To later shut the database down, run: ```bash $ pg_ctl -D db stop ``` - -### Loading Data - -To load all exported coach data into a local Postgres instance, use the provided -`sql/*.sql` files. First initialize the export schema/table: +Initialize the table that scraped content will be streamed into: ```bash $ psql -h @scraper -f sql/init.sql ``` -Next, dump exported data into the newly created table: +If you have nix available, you can now run the scraper via ```bash -$ psql -h @scraper -f sql/export.sql -v export="'$PWD/data/export.csv'" +$ nix run . -- ... ``` -Re-running the `sql/export.sql` script will create a backup of the -`coach_scraper.export` table. It will then upsert the scraped data. You can view -all backups from the `psql` console like so: -``` -postgres=# \dt coach_scraper.export* -``` - -### E2E - -With the above section on loading files, we now have the individual components -necessary to scrape coach data from our chess website and dump the results into -the database in one fell swoop. Assuming our database is open with a socket -connection available at `@scraper`: +Otherwise, ensure you have [poetry](https://python-poetry.org/) on your machine +and instead run the following: ```bash -$ nix run . -- --user-agent <user-agent> -s chesscom -s lichess -$ psql -h @scraper -f sql/init.sql -f sql/export.sql -v export="'$PWD/data/export.csv'" +$ poetry run python3 -m app ...
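A note on `sql/init.sql`, which the Quickstart references but this diff does not include: the table it creates presumably looks roughly like the sketch below, with columns taken from `app/database.py` and a uniqueness constraint required by the `ON CONFLICT (site, username)` upsert. The psycopg2 stand-in is illustrative only; connection parameters mirror the CLI defaults and the `@scraper` socket used above.

```python
# Illustrative stand-in for sql/init.sql (not shown in this diff). Column names
# mirror app/database.py; the UNIQUE constraint is what the
# ON CONFLICT (site, username) upsert relies on.
import psycopg2

DDL = """
CREATE SCHEMA IF NOT EXISTS coach_scraper;
CREATE TABLE IF NOT EXISTS coach_scraper.export
  ( site      TEXT NOT NULL
  , username  TEXT NOT NULL
  , name      TEXT
  , image_url TEXT
  , rapid     INT
  , blitz     INT
  , bullet    INT
  , UNIQUE (site, username)
  );
"""

if __name__ == "__main__":
    conn = psycopg2.connect(host="@scraper", dbname="postgres", user="postgres")
    with conn, conn.cursor() as cur:  # `with conn` commits the transaction
        cur.execute(DDL)
    conn.close()
```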
``` ## Development diff --git a/app/__main__.py b/app/__main__.py index 361adf1..9df2e71 100644 --- a/app/__main__.py +++ b/app/__main__.py @@ -1,25 +1,52 @@ import argparse import asyncio -import csv -import json +from typing import List import aiohttp +import psycopg2 -from app.chesscom import Exporter as ChesscomExporter -from app.chesscom import Scraper as ChesscomScraper -from app.lichess import Exporter as LichessExporter -from app.lichess import Scraper as LichessScraper -from app.repo import Site +from app.chesscom import Pipeline as ChesscomPipeline +from app.database import backup_database +from app.lichess import Pipeline as LichessPipeline +from app.pipeline import Site + +# The number of parallel extraction jobs that are run at a time. +WORKER_COUNT = 10 -async def run(): +async def _process(site: Site, conn, session: aiohttp.ClientSession): + if site == Site.CHESSCOM: + await ChesscomPipeline(worker_count=WORKER_COUNT).process(conn, session) + elif site == Site.LICHESS: + await LichessPipeline(worker_count=WORKER_COUNT).process(conn, session) + else: + assert False, f"Encountered unknown site: {site}." + + +async def _entrypoint(conn, user_agent: str, sites: List[Site]): + """Top-level entrypoint that dispatches a pipeline per requested site.""" + async with aiohttp.ClientSession( + headers={"User-Agent": f"BoardWise coach-scraper ({user_agent})"} + ) as session: + await asyncio.gather(*[_process(site, conn, session) for site in sites]) + + +def main(): parser = argparse.ArgumentParser( prog="coach-scraper", description="Scraping/exporting of chess coaches.", ) - parser.add_argument("-u", "--user-agent", required=True) + + # Database-related arguments. + parser.add_argument("--host", required=True) + parser.add_argument("--dbname", default="postgres") + parser.add_argument("--user", default="postgres") + parser.add_argument("--password", default="password") + parser.add_argument("--port", default=5432) + + # Client session-related arguments. + parser.add_argument("--user-agent", required=True) parser.add_argument( - "-s", "--site", required=True, action="append", @@ -28,43 +55,29 @@ async def run(): Site.LICHESS.value, ], ) + args = parser.parse_args() - async with aiohttp.ClientSession( - headers={"User-Agent": f"BoardWise coach-scraper ({args.user_agent})"} - ) as session: - with open("data/export.csv", "w") as f: - writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL) - for site in set(args.site): - scraper, exporter_cls = None, None - - if site == Site.CHESSCOM.value: - scraper = ChesscomScraper(session) - exporter_cls = ChesscomExporter - elif site == Site.LICHESS.value: - scraper = LichessScraper(session) - exporter_cls = LichessExporter - - usernames = await scraper.scrape() - for username in usernames: - export = exporter_cls(username).export() - writer.writerow( - [ - # This should match the order data is loaded in the - # sql/export.sql script. 
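As a reference for the new `Pipeline.process(conn, session)` API wired up here, a minimal sketch of driving a single site programmatically, mirroring `_entrypoint()`/`_process()` and the psycopg2 connection set up in `main()`; the connection values and user agent are placeholders.

```python
# Minimal sketch: run one pipeline without going through the CLI.
import asyncio

import aiohttp
import psycopg2

from app.chesscom import Pipeline as ChesscomPipeline


async def run_chesscom(conn, user_agent: str) -> None:
    # Same session headers as _entrypoint(); one pipeline instead of a gather.
    async with aiohttp.ClientSession(
        headers={"User-Agent": f"BoardWise coach-scraper ({user_agent})"}
    ) as session:
        await ChesscomPipeline(worker_count=10).process(conn, session)


if __name__ == "__main__":
    conn = psycopg2.connect(host="@scraper", dbname="postgres", user="postgres")
    try:
        asyncio.run(run_chesscom(conn, "you@example.com"))
    finally:
        conn.close()
```

This is essentially what `python3 -m app --host ... --user-agent ... --site chesscom` does for a single site, minus the `backup_database()` call.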
- export["site"], - export["username"], - export.get("name", ""), - export.get("image_url", ""), - export.get("rapid", ""), - export.get("blitz", ""), - export.get("bullet", ""), - ] - ) - - -def main(): - asyncio.run(run()) + conn = None + try: + conn = psycopg2.connect( + dbname=args.dbname, + user=args.user, + host=args.host, + password=args.password, + port=args.port, + ) + backup_database(conn) + asyncio.run( + _entrypoint( + conn=conn, + user_agent=args.user_agent, + sites=list(map(Site, set(args.site))), + ) + ) + finally: + if conn: + conn.close() if __name__ == "__main__": diff --git a/app/chesscom.py b/app/chesscom.py index ae9ac30..3f0b9cf 100644 --- a/app/chesscom.py +++ b/app/chesscom.py @@ -7,9 +7,10 @@ from typing import List, Union import aiohttp from bs4 import BeautifulSoup, SoupStrainer -from app.exporter import BaseExporter -from app.repo import AnsiColor, Site -from app.scraper import BaseScraper +from app.pipeline import Extractor as BaseExtractor +from app.pipeline import Fetcher as BaseFetcher +from app.pipeline import Pipeline as BasePipeline +from app.pipeline import Site # The number of coach listing pages we will at most iterate through. This number # was determined by going to chess.com/coaches?sortBy=alphabetical&page=1 and @@ -20,78 +21,30 @@ MAX_PAGES = 64 SLEEP_SECS = 3 -class Scraper(BaseScraper): +class Fetcher(BaseFetcher): def __init__(self, session: aiohttp.ClientSession): - super().__init__(site=Site.CHESSCOM.value, session=session) + super().__init__(site=Site.CHESSCOM, session=session) - async def download_usernames(self) -> List[str]: - """Scan through chess.com/coaches for all coaches' usernames. + async def scrape_usernames(self, page_no: int) -> List[str]: + if page_no > MAX_PAGES: + return [] - @return - The complete list of scraped usernames across every coach listing - page. - """ - usernames = [] - for page_no in range(1, MAX_PAGES + 1): - filepath = self.path_page_file(page_no) - try: - with open(filepath, "r") as f: - self.log( - [ - (AnsiColor.INFO, "[INFO]"), - (None, ": Reading file "), - (AnsiColor.DATA, filepath), - ] - ) - usernames.extend([line.strip() for line in f.readlines()]) - except FileNotFoundError: - page_usernames = await self._scrape_page(page_no) - if not page_usernames: - self.log( - [ - (AnsiColor.ERROR, "[ERROR]"), - (None, ": Could not scrape page "), - (AnsiColor.DATA, str(page_no)), - ] - ) - continue - with open(filepath, "w") as f: - for username in page_usernames: - f.write(f"{username}\n") - usernames.extend(page_usernames) - self.log( - [ - (AnsiColor.INFO, "[INFO]"), - (None, ": Downloaded page "), - (AnsiColor.DATA, filepath), - ] - ) - await asyncio.sleep(SLEEP_SECS) + print(f"{self.site.value}: Scraping page {page_no}/{MAX_PAGES}") - return usernames + filepath = self.path_page_file(page_no) + try: + with open(filepath, "r") as f: + return [line.strip() for line in f.readlines()] + except FileNotFoundError: + pass - async def _scrape_page(self, page_no: int) -> List[str]: - """Scan through chess.com/coaches/?page= for all coaches' usernames. + if self.has_made_request: + await asyncio.sleep(SLEEP_SECS) - @param page_no - The page consisting of at most 25 coaches (at the time of writing) - whose usernames are to be scraped. - @return - The list of scraped usernames on the specified coach listing page. 
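As elsewhere in this change, listing pages are cached one username per line under `data/<site>/pages/<page_no>.txt`; when that file exists, `scrape_usernames` returns it directly and neither fetches nor sleeps. A small illustration of that contract (the usernames below are made up):

```python
# Pre-seeding a page cache file. On the next run, Fetcher.scrape_usernames(1)
# reads these usernames straight from disk instead of hitting chess.com, and
# no SLEEP_SECS throttle is applied for that page.
import os

page_file = os.path.join("data", "chesscom", "pages", "1.txt")
os.makedirs(os.path.dirname(page_file), exist_ok=True)
with open(page_file, "w") as f:
    f.write("coach_one\n")
    f.write("coach_two\n")
```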
- """ url = f"https://www.chess.com/coaches?sortBy=alphabetical&page={page_no}" - response, status_code = await self.request(url) + response, status_code = await self.fetch(url) if response is None: - self.log( - [ - (AnsiColor.ERROR, "[ERROR]"), - (None, ": Received status "), - (AnsiColor.DATA, f"{status_code} "), - (None, "when downloading page "), - (AnsiColor.DATA, str(page_no)), - ] - ) - return + return None # Skips this page. usernames = [] soup = BeautifulSoup(response, "lxml") @@ -101,92 +54,67 @@ class Scraper(BaseScraper): username = href[len("https://www.chess.com/member/") :] usernames.append(username) + # Cache results. + with open(filepath, "w") as f: + for username in usernames: + f.write(f"{username}\n") + return usernames - async def download_profile(self, username: str): - """For each coach, download coach-specific data. + async def download_user_files(self, username: str) -> None: + maybe_download = [ + ( + f"https://www.chess.com/member/{username}", + self.path_coach_file(username, f"{username}.html"), + ), + ( + f"https://www.chess.com/callback/member/activity/{username}?page=1", + self.path_coach_file(username, "activity.json"), + ), + ( + f"https://www.chess.com/callback/member/stats/{username}", + self.path_coach_file(username, "stats.json"), + ), + ] - This sends three parallel requests for: - * the coach's profile, - * the coach's recent activity, - * the coach's stats. + to_download = [] + for d_url, d_filename in maybe_download: + if os.path.isfile(d_filename): + continue + to_download.append((d_url, d_filename)) - @param username - The coach username corresponding to the downloaded files. - """ - used_network = await asyncio.gather( - self._download_profile_file( - url=f"https://www.chess.com/member/{username}", - username=username, - filename=self.path_coach_file(username, f"{username}.html"), - ), - self._download_profile_file( - url=f"https://www.chess.com/callback/member/activity/{username}?page=1", - username=username, - filename=self.path_coach_file(username, "activity.json"), - ), - self._download_profile_file( - url=f"https://www.chess.com/callback/member/stats/{username}", - username=username, - filename=self.path_coach_file(username, "stats.json"), - ), - ) - if any(used_network): - self.log( - [ - (AnsiColor.INFO, "[INFO]"), - (None, ": Downloaded data for coach "), - (AnsiColor.DATA, username), - ] - ) + if not to_download: + return + + if self.has_made_request: await asyncio.sleep(SLEEP_SECS) - else: - self.log( - [ - (AnsiColor.INFO, "[INFO]"), - (None, ": Skipping download for coach "), - (AnsiColor.DATA, username), - ] - ) - async def _download_profile_file(self, url: str, username: str, filename: str): - """Writes the contents of url into the specified file. + await asyncio.gather( + *[self._download_file(url=d[0], filename=d[1]) for d in to_download] + ) - @param url - The URL of the file to download. - @param username - The coach username corresponding to the downloaded file. - @param filename - The output file to write the downloaded content to. - @return: - True if we make a network request. False otherwise. 
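Because every per-coach file is cached on disk, extraction can be replayed offline against a previous run's `data/` directory using the `Extractor` defined just below. A sketch; the username is a placeholder, and `session=None` is acceptable here since no request is ever issued:

```python
# Re-run extraction against files already under data/chesscom/coaches/.
# No network traffic: the session is only used by Fetcher.fetch().
from app.chesscom import Extractor, Fetcher

fetcher = Fetcher(session=None)
extractor = Extractor(fetcher, "some_coach")  # placeholder username
row = extractor.extract()
print(row.get("name"), row.get("rapid"), row.get("blitz"), row.get("bullet"))
```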
- """ - if os.path.isfile(filename): - return False - - response, _unused_status = await self.request(url) + async def _download_file(self, url: str, filename: str) -> None: + response, _unused_status = await self.fetch(url) if response is not None: with open(filename, "w") as f: f.write(response) - return True - def _profile_filter(elem, attrs): - """Includes only relevant segments of the `{username}.html` file.""" if "profile-header-info" in attrs.get("class", ""): return True if "profile-card-info" in attrs.get("class", ""): return True -class Exporter(BaseExporter): - def __init__(self, username: str): - super().__init__(site=Site.CHESSCOM.value, username=username) +class Extractor(BaseExtractor): + def __init__(self, fetcher: Fetcher, username: str): + super().__init__(fetcher, username) self.profile_soup = None try: - with open(self.path_coach_file(username, f"{username}.html"), "r") as f: + filename = self.fetcher.path_coach_file(username, f"{username}.html") + with open(filename, "r") as f: self.profile_soup = BeautifulSoup( f.read(), "lxml", parse_only=SoupStrainer(_profile_filter) ) @@ -195,21 +123,22 @@ class Exporter(BaseExporter): self.stats_json = {} try: - with open(self.path_coach_file(username, "stats.json"), "r") as f: + filename = self.fetcher.path_coach_file(username, "stats.json") + with open(filename, "r") as f: for s in json.load(f).get("stats", []): if "key" in s and "stats" in s: self.stats_json[s["key"]] = s["stats"] except FileNotFoundError: pass - def export_name(self) -> Union[str, None]: + def get_name(self) -> Union[str, None]: try: name = self.profile_soup.find("div", class_="profile-card-name") return name.get_text().strip() except AttributeError: return None - def export_image_url(self) -> Union[str, None]: + def get_image_url(self) -> Union[str, None]: try: div = self.profile_soup.find("div", class_="profile-header-avatar") src = div.find("img").get("src", "") @@ -218,11 +147,19 @@ class Exporter(BaseExporter): except AttributeError: return None - def export_rapid(self) -> Union[int, None]: + def get_rapid(self) -> Union[int, None]: return self.stats_json.get("rapid", {}).get("rating") - def export_blitz(self) -> Union[int, None]: + def get_blitz(self) -> Union[int, None]: return self.stats_json.get("lightning", {}).get("rating") - def export_bullet(self) -> Union[int, None]: + def get_bullet(self) -> Union[int, None]: return self.stats_json.get("bullet", {}).get("rating") + + +class Pipeline(BasePipeline): + def get_fetcher(self, session: aiohttp.ClientSession): + return Fetcher(session) + + def get_extractor(self, fetcher: Fetcher, username: str): + return Extractor(fetcher, username) diff --git a/app/database.py b/app/database.py new file mode 100644 index 0000000..e598135 --- /dev/null +++ b/app/database.py @@ -0,0 +1,113 @@ +import sys +from datetime import datetime + +from typing_extensions import TypedDict + +SCHEMA_NAME = "coach_scraper" +TABLE_NAME = "export" + + +class Row(TypedDict, total=False): + """Representation of a row of the export table. + + The (site, username) make up a unique key for each coach. + """ + + # Website the given coach was sourced from. + site: str + # Username used on the source site. + username: str + # Real name. + name: str + # Profile image used on the source site. + image_url: str + # Rapid rating relative to the site they were sourced from. + rapid: int + # Blitz rating relative to the site they were sourced from. + blitz: int + # Bullet rating relative to the site they were sourced from. 
+ bullet: int + + +def backup_database(conn): + """Creates a backup of the export table. + + Simply copies the table at time of invocation into another table with a + `_%t` suffix, where %t denotes the number of seconds since the Unix epoch. + """ + cursor = None + try: + cursor = conn.cursor() + cursor.execute( + f""" + SELECT 1 + FROM information_schema.tables + WHERE table_schema = '{SCHEMA_NAME}' + AND table_name = '{TABLE_NAME}'; + """ + ) + + result = cursor.fetchone() + if result is None: + print(f"Missing `{SCHEMA_NAME}.{TABLE_NAME}` table.", file=sys.stderr) + sys.exit(1) + + timestamp = int((datetime.now() - datetime(1970, 1, 1)).total_seconds()) + cursor.execute( + f""" + CREATE TABLE {SCHEMA_NAME}.{TABLE_NAME}_{timestamp} + AS TABLE {SCHEMA_NAME}.{TABLE_NAME} + """ + ) + finally: + if cursor: + cursor.close() + + +def upsert_row(conn, row: Row): + """Upsert the specified `Row` into the database table.""" + cursor = None + try: + cursor = conn.cursor() + cursor.execute( + f""" + INSERT INTO {SCHEMA_NAME}.{TABLE_NAME} + ( site + , username + , name + , image_url + , rapid + , blitz + , bullet + ) + VALUES + ( %s + , %s + , %s + , %s + , %s + , %s + , %s + ) + ON CONFLICT + (site, username) + DO UPDATE SET + name = EXCLUDED.name, + image_url = EXCLUDED.image_url, + rapid = EXCLUDED.rapid, + blitz = EXCLUDED.blitz, + bullet = EXCLUDED.bullet; + """, + [ + row["site"].value, + row["username"], + row.get("name"), + row.get("image_url"), + row.get("rapid"), + row.get("blitz"), + row.get("bullet"), + ], + ) + conn.commit() + finally: + cursor.close() diff --git a/app/exporter.py b/app/exporter.py deleted file mode 100644 index 9e6d200..0000000 --- a/app/exporter.py +++ /dev/null @@ -1,69 +0,0 @@ -from typing import Any, Union - -from typing_extensions import TypedDict - -from app.repo import AnsiColor, Repo - - -class Export(TypedDict, total=False): - # The (site, username) make up a unique key for each coach. - site: str - username: str - # The coach's real name. - name: str - # The profile image used on the source site. - image_url: str - # The coach's rapid rating relative to the site they were sourced from. - rapid: int - # The coach's blitz rating relative to the site they were sourced from. - blitz: int - # The coach's bullet rating relative to the site they were sourced from. 
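For reference, `upsert_row` can be exercised by hand, which also makes the expected `Row` shape concrete. Note that the pipeline stores a `Site` enum member under `"site"` (its `.value` is taken inside `upsert_row`) even though the `TypedDict` annotates the field as `str`. Values below are placeholders:

```python
# Hand-written upsert. Missing optional keys (image_url, blitz, bullet)
# are read via row.get() and land as NULL.
import psycopg2

from app.database import Row, upsert_row
from app.pipeline import Site

conn = psycopg2.connect(host="@scraper", dbname="postgres", user="postgres")
row: Row = {
    "site": Site.CHESSCOM,   # upsert_row reads row["site"].value
    "username": "some_coach",
    "name": "Some Coach",
    "rapid": 1800,
}
upsert_row(conn, row)
conn.close()
```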
- bullet: int - - -def _insert(export: Export, key: str, value: Any): - if value is not None: - export[key] = value - - -class BaseExporter(Repo): - def __init__(self, site: str, username: str): - super().__init__(site) - self.username = username - - def export_name(self) -> Union[str, None]: - raise NotImplementedError() - - def export_image_url(self) -> Union[str, None]: - raise NotImplementedError() - - def export_rapid(self) -> Union[int, None]: - raise NotImplementedError() - - def export_blitz(self) -> Union[int, None]: - raise NotImplementedError() - - def export_bullet(self) -> Union[int, None]: - raise NotImplementedError() - - def export(self) -> Export: - """Transform coach-specific data into uniform format.""" - export: Export = {} - - _insert(export, "site", self.site) - _insert(export, "username", self.username) - _insert(export, "name", self.export_name()) - _insert(export, "image_url", self.export_image_url()) - _insert(export, "rapid", self.export_rapid()) - _insert(export, "blitz", self.export_blitz()) - _insert(export, "bullet", self.export_bullet()) - - self.log( - [ - (AnsiColor.INFO, "[INFO]"), - (None, ": Exported "), - (AnsiColor.DATA, self.username), - ] - ) - - return export diff --git a/app/lichess.py b/app/lichess.py index 22cb86d..9b67d48 100644 --- a/app/lichess.py +++ b/app/lichess.py @@ -6,9 +6,10 @@ from typing import List, Union import aiohttp from bs4 import BeautifulSoup, SoupStrainer -from app.exporter import BaseExporter -from app.repo import AnsiColor, Site -from app.scraper import BaseScraper +from app.pipeline import Extractor as BaseExtractor +from app.pipeline import Fetcher as BaseFetcher +from app.pipeline import Pipeline as BasePipeline +from app.pipeline import Site # The number of pages we will at most iterate through. This number was # determined by going to https://lichess.org/coach/all/all/alphabetical @@ -19,79 +20,30 @@ MAX_PAGES = 162 SLEEP_SECS = 5 -class Scraper(BaseScraper): +class Fetcher(BaseFetcher): def __init__(self, session: aiohttp.ClientSession): - super().__init__(site=Site.LICHESS.value, session=session) + super().__init__(site=Site.LICHESS, session=session) - async def download_usernames(self) -> List[str]: - """Scan through lichess.org/coach for all coaches' usernames. + async def scrape_usernames(self, page_no: int) -> List[str]: + if page_no > MAX_PAGES: + return [] - @return - The complete list of scraped usernames across every coach listing - page. 
- """ - usernames = [] - for page_no in range(1, MAX_PAGES + 1): - filepath = self.path_page_file(page_no) - try: - with open(filepath, "r") as f: - self.log( - [ - (AnsiColor.INFO, "[INFO]"), - (None, ": Reading file "), - (AnsiColor.DATA, filepath), - ] - ) - usernames.extend([line.strip() for line in f.readlines()]) - except FileNotFoundError: - page_usernames = await self._scrape_page(page_no) - if not page_usernames: - self.log( - [ - (AnsiColor.ERROR, "[ERROR]"), - (None, ": Could not scrape page "), - (AnsiColor.DATA, str(page_no)), - ] - ) - continue - with open(filepath, "w") as f: - for username in page_usernames: - f.write(f"{username}\n") - usernames.extend(page_usernames) - self.log( - [ - (AnsiColor.INFO, "[INFO]"), - (None, ": Downloaded page "), - (AnsiColor.DATA, filepath), - ] - ) - await asyncio.sleep(SLEEP_SECS) + print(f"{self.site.value}: Scraping page {page_no}/{MAX_PAGES}") - return usernames + filepath = self.path_page_file(page_no) + try: + with open(filepath, "r") as f: + return [line.strip() for line in f.readlines()] + except FileNotFoundError: + pass - async def _scrape_page(self, page_no: int): - """Scan through lichess.org/coach/.../?page= for all coaches' - usernames. + if self.has_made_request: + await asyncio.sleep(SLEEP_SECS) - @param page_no - The page consisting of at most 10 coaches (at the time of writing) - whose usernames are to be scraped. - @return - The list of scraped usernames on the specified coach listing page. - """ url = f"https://lichess.org/coach/all/all/alphabetical?page={page_no}" - response, status_code = await self.request(url) + response, status_code = await self.fetch(url) if response is None: - self.log( - [ - (AnsiColor.ERROR, "[ERROR]"), - (None, ": Received status "), - (AnsiColor.DATA, f"{status_code} "), - (None, "when downloading page "), - (AnsiColor.DATA, str(page_no)), - ] - ) - return + return None # Skips this page. usernames = [] soup = BeautifulSoup(response, "lxml") @@ -103,87 +55,67 @@ class Scraper(BaseScraper): username = href[len("/coach/") :] usernames.append(username) + with open(filepath, "w") as f: + for username in usernames: + f.write(f"{username}\n") + return usernames - async def download_profile(self, username: str): - """For each coach, download coach-specific data. + async def download_user_files(self, username: str) -> None: + maybe_download = [ + ( + f"https://lichess.org/coach/{username}", + self.path_coach_file(username, f"{username}.html"), + ), + ( + f"https://lichess.org/@/{username}", + self.path_coach_file(username, "stats.html"), + ), + ] - @param username - The coach username corresponding to the downloaded files. 
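Taken together with `MAX_PAGES` and the per-page coach counts quoted in the removed docstrings (roughly 25 per chess.com page and 10 per lichess page at the time of writing), the `SLEEP_SECS` throttles give a rough lower bound on run time. Request latency adds to this, while the two sites being scraped concurrently keeps wall time closer to the larger of the two figures:

```python
# Back-of-the-envelope runtime from the throttling constants in this diff.
# Coaches-per-page figures come from the removed docstrings and will drift;
# this counts sleep time only, not request latency.
SITES = {
    "chesscom": {"pages": 64, "coaches_per_page": 25, "sleep_secs": 3},
    "lichess": {"pages": 162, "coaches_per_page": 10, "sleep_secs": 5},
}

for name, s in SITES.items():
    page_sleep = s["pages"] * s["sleep_secs"]
    coach_sleep = s["pages"] * s["coaches_per_page"] * s["sleep_secs"]
    hours = (page_sleep + coach_sleep) / 3600
    print(f"{name}: ~{hours:.1f} hours of sleep on a cold cache")
# chesscom: ~1.4 hours, lichess: ~2.5 hours; the pipelines run concurrently.
```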
- """ - used_network1 = await self._download_profile_file( - url=f"https://lichess.org/coach/{username}", - username=username, - filename=self.path_coach_file(username, f"{username}.html"), - ) - used_network2 = await self._download_profile_file( - url=f"https://lichess.org/@/{username}", - username=username, - filename=self.path_coach_file(username, "stats.html"), - ) + to_download = [] + for d_url, d_filename in maybe_download: + if os.path.isfile(d_filename): + continue + to_download.append((d_url, d_filename)) - if any([used_network1, used_network2]): - self.log( - [ - (AnsiColor.INFO, "[INFO]"), - (None, ": Downloaded data for coach "), - (AnsiColor.DATA, username), - ] - ) + if not to_download: + return + + if self.has_made_request: await asyncio.sleep(SLEEP_SECS) - else: - self.log( - [ - (AnsiColor.INFO, "[INFO]"), - (None, ": Skipping download for coach "), - (AnsiColor.DATA, username), - ] - ) - async def _download_profile_file(self, url: str, username: str, filename: str): - """Writes the contents of url into the specified file. + await asyncio.gather( + *[self._download_file(url=d[0], filename=d[1]) for d in to_download] + ) - @param url - The URL of the file to download. - @param username - The coach username corresponding to the downloaded file. - @param filename - The output file to write the downloaded content to. - @return: - True if we make a network request. False otherwise. - """ - if os.path.isfile(filename): - return False - - response, _unused_status = await self.request(url) + async def _download_file(self, url: str, filename: str) -> None: + response, _unused_status = await self.fetch(url) if response is not None: with open(filename, "w") as f: f.write(response) - return True - def _profile_filter(elem, attrs): - """Includes only relevant segments of the `{username}.html` file.""" if "coach-widget" in attrs.get("class", ""): return True def _stats_filter(elem, attrs): - """Includes only relevant segments of the `stats.html` file.""" if "profile-side" in attrs.get("class", ""): return True if "sub-ratings" in attrs.get("class", ""): return True -class Exporter(BaseExporter): - def __init__(self, username: str): - super().__init__(site=Site.LICHESS.value, username=username) +class Extractor(BaseExtractor): + def __init__(self, fetcher: Fetcher, username: str): + super().__init__(fetcher, username) self.profile_soup = None try: - with open(self.path_coach_file(username, f"{username}.html"), "r") as f: + filename = self.fetcher.path_coach_file(username, f"{username}.html") + with open(filename, "r") as f: self.profile_soup = BeautifulSoup( f.read(), "lxml", parse_only=SoupStrainer(_profile_filter) ) @@ -192,14 +124,15 @@ class Exporter(BaseExporter): self.stats_soup = None try: - with open(self.path_coach_file(username, "stats.html"), "r") as f: + filename = self.fetcher.path_coach_file(username, "stats.html") + with open(filename, "r") as f: self.stats_soup = BeautifulSoup( f.read(), "lxml", parse_only=SoupStrainer(_stats_filter) ) except FileNotFoundError: pass - def export_name(self) -> Union[str, None]: + def get_name(self) -> Union[str, None]: try: profile_side = self.stats_soup.find("div", class_="profile-side") user_infos = profile_side.find("div", class_="user-infos") @@ -208,7 +141,7 @@ class Exporter(BaseExporter): except AttributeError: return None - def export_image_url(self) -> Union[str, None]: + def get_image_url(self) -> Union[str, None]: try: picture = self.profile_soup.find("img", class_="picture") src = picture.get("src", "") @@ -217,13 +150,13 @@ 
class Exporter(BaseExporter): except AttributeError: return None - def export_rapid(self) -> Union[int, None]: + def get_rapid(self) -> Union[int, None]: return self._find_rating("rapid") - def export_blitz(self) -> Union[int, None]: + def get_blitz(self) -> Union[int, None]: return self._find_rating("blitz") - def export_bullet(self) -> Union[int, None]: + def get_bullet(self) -> Union[int, None]: return self._find_rating("bullet") def _find_rating(self, name) -> Union[int, None]: @@ -237,3 +170,11 @@ class Exporter(BaseExporter): return int(value) except (AttributeError, ValueError): return None + + +class Pipeline(BasePipeline): + def get_fetcher(self, session: aiohttp.ClientSession): + return Fetcher(session) + + def get_extractor(self, fetcher: Fetcher, username: str): + return Extractor(fetcher, username) diff --git a/app/pipeline.py b/app/pipeline.py new file mode 100644 index 0000000..fca5ed9 --- /dev/null +++ b/app/pipeline.py @@ -0,0 +1,192 @@ +import asyncio +import enum +import os.path +from typing import Any, List, Tuple, Union + +import aiohttp + +from app.database import Row, upsert_row + + +class Site(enum.Enum): + CHESSCOM = "chesscom" + LICHESS = "lichess" + + +class Fetcher: + """Download and cache files from the specified site. + + Each implementation of this class is responsible for rate-limiting requests. + """ + + def __init__(self, site: Site, session: aiohttp.ClientSession): + self.site = site + self.session = session + self.has_made_request = False + + os.makedirs(self.path_coaches_dir(), exist_ok=True) + os.makedirs(self.path_pages_dir(), exist_ok=True) + + def path_site_dir(self): + return os.path.join("data", self.site.value) + + def path_site_file(self, filename: str): + return os.path.join(self.path_site_dir(), filename) + + def path_coaches_dir(self): + return os.path.join(self.path_site_dir(), "coaches") + + def path_coach_dir(self, username: str): + return os.path.join(self.path_coaches_dir(), username) + + def path_coach_file(self, username: str, filename: str): + return os.path.join(self.path_coach_dir(username), filename) + + def path_pages_dir(self): + return os.path.join(self.path_site_dir(), "pages") + + def path_page_file(self, page_no: int): + return os.path.join(self.path_pages_dir(), f"{page_no}.txt") + + async def fetch(self, url: str) -> Tuple[Union[str, None], int]: + """Make network requests using the internal session. + + @param url + The URL to make a GET request to. + @return + Tuple containing the response body (if the request was successful) + and status code. + """ + self.has_made_request = True + async with self.session.get(url) as response: + if response.status == 200: + return await response.text(), 200 + return None, response.status + + async def scrape_usernames(self, page_no: int) -> Union[List[str], None]: + """Source the specified site for all coach usernames. + + All pages should be downloaded at `self.path_page_file()`. Any cached + file should be a plain `.txt` file containing one username per-line. + + @param page_no: + How many times this function was invoked (1-indexed). Useful to + paginate responses back out to the `Pipeline` this `Downloader` + is embedded in. + @return: + A list of usernames. Should return an empty list if no more + usernames are found. Can return `None` to indicate the specified + page should be skipped. 
+ """ + raise NotImplementedError() + + async def _download_user_files(self, username: str) -> None: + os.makedirs(self.path_coach_dir(username), exist_ok=True) + await self.download_user_files(username) + + async def download_user_files(self, username: str) -> None: + """Source the specified site for all user-specific files. + + What files are downloaded depends on the `Fetcher` implementation. + All files should be downloaded at `self.path_coach_file()`. + """ + raise NotImplementedError() + + +def _insert(row: Row, key: str, value: Any): + if value is not None: + row[key] = value + + +class Extractor: + def __init__(self, fetcher: Fetcher, username: str): + self.fetcher = fetcher + self.username = username + + def get_name(self) -> Union[str, None]: + raise NotImplementedError() + + def get_image_url(self) -> Union[str, None]: + raise NotImplementedError() + + def get_rapid(self) -> Union[int, None]: + raise NotImplementedError() + + def get_blitz(self) -> Union[int, None]: + raise NotImplementedError() + + def get_bullet(self) -> Union[int, None]: + raise NotImplementedError() + + def extract(self) -> Row: + """Extract a table row from the coach-specific downloads.""" + row: Row = {} + + _insert(row, "site", self.fetcher.site) + _insert(row, "username", self.username) + + _insert(row, "name", self.get_name()) + _insert(row, "image_url", self.get_image_url()) + _insert(row, "rapid", self.get_rapid()) + _insert(row, "blitz", self.get_blitz()) + _insert(row, "bullet", self.get_bullet()) + + return row + + +async def task_worker(name, queue): + while True: + conn, extractor = await queue.get() + upsert_row(conn, extractor.extract()) + queue.task_done() + + +class Pipeline: + """Site-specific download and extraction pipeline. + + Performs downloads serially but processes data extraction from downloaded + files concurrently. + """ + + def __init__(self, worker_count): + self.worker_count = worker_count + + def get_fetcher(self, session: aiohttp.ClientSession) -> Fetcher: + raise NotImplementedError() + + def get_extractor(self, fetcher: Fetcher, username: str) -> Extractor: + raise NotImplementedError() + + async def process(self, conn, session: aiohttp.ClientSession): + fetcher = self.get_fetcher(session) + + queue = asyncio.Queue() + + # Create a batch of workers to process the jobs put into the queue. + workers = [] + for i in range(self.worker_count): + worker = asyncio.create_task(task_worker(f"worker-{i}", queue)) + workers.append(worker) + + # Begin downloading all coach usernames and files. The workers will + # run concurrently to extract all the relevant information and write + # the resulting rows to the database. + page_no = 1 + usernames = [None] + while len(usernames): + usernames = await fetcher.scrape_usernames(page_no) + page_no += 1 + if usernames is None: + usernames = [None] + continue + for username in usernames: + await fetcher._download_user_files(username) + extractor = self.get_extractor(fetcher, username) + queue.put_nowait((conn, extractor)) + + # Wait until the queue is fully processed. + await queue.join() + + # We can now shut the workers down.
+ for worker in workers: + worker.cancel() + await asyncio.gather(*workers, return_exceptions=True) diff --git a/app/repo.py b/app/repo.py deleted file mode 100644 index 69a06f3..0000000 --- a/app/repo.py +++ /dev/null @@ -1,60 +0,0 @@ -import enum -import os -from typing import List, Tuple, Union - - -class AnsiColor(enum.Enum): - ERROR = "\033[0;31m" - INFO = "\033[0;34m" - DATA = "\033[0;36m" - RESET = "\033[0m" - - -class Site(enum.Enum): - CHESSCOM = "chesscom" - LICHESS = "lichess" - - -class Repo: - """Shared filesystem-related functionality.""" - - def __init__(self, site: str): - self.site = site - - def path_site_dir(self): - """The root directory for all site-related files.""" - return os.path.join("data", self.site) - - def path_site_file(self, filename: str): - """Path to a top-level site-related file.""" - return os.path.join(self.path_site_dir(), filename) - - def path_coaches_dir(self): - """The root directory for all coach-related downloads.""" - return os.path.join(self.path_site_dir(), "coaches") - - def path_coach_dir(self, username: str): - """The root directory for a specific coach's downloads.""" - return os.path.join(self.path_coaches_dir(), username) - - def path_coach_file(self, username: str, filename: str): - """Path to a coach-specific file download.""" - return os.path.join(self.path_coach_dir(username), filename) - - def path_pages_dir(self): - """The root directory for all username listing files.""" - return os.path.join(self.path_site_dir(), "pages") - - def path_page_file(self, page_no: int): - """The root directory for usernames scraped from a single page.""" - return os.path.join(self.path_pages_dir(), f"{page_no}.txt") - - def log(self, msgs: List[Tuple[Union[AnsiColor, None], str]]): - transformed = [] - for k, v in msgs: - if k is None: - transformed.append(v) - else: - transformed.append(f"{k.value}{v}{AnsiColor.RESET.value}") - - print("".join(transformed)) diff --git a/app/scraper.py b/app/scraper.py deleted file mode 100644 index ca51128..0000000 --- a/app/scraper.py +++ /dev/null @@ -1,59 +0,0 @@ -import os -from typing import List, Tuple, Union - -import aiohttp - -from app.repo import Repo - - -class BaseScraper(Repo): - def __init__(self, site: str, session: aiohttp.ClientSession): - """Initialize a new web scraper. - - @param site: - The site we are making requests out to. - @param session: - The `aiohttp.ClientSession` context our requests are made from. - """ - super().__init__(site) - self.session = session - - async def download_usernames(self) -> List[str]: - """Collect all coach usernames from the specified site.""" - raise NotImplementedError() - - async def download_profile(self, username: str): - """For each coach, download coach-specific data.""" - raise NotImplementedError() - - async def request(self, url: str) -> Tuple[Union[str, None], int]: - """Make network requests using the internal session. - - @param url - The URL to make a GET request to. - @return - Tuple containing the response body (if the request was successful) - and status code. - """ - async with self.session.get(url) as response: - if response.status == 200: - return await response.text(), 200 - return None, response.status - - async def scrape(self) -> List[str]: - """Main entrypoint for scraping and exporting downloaded content. - - A `Scraper` is structured to operates in the following stages: - - 1. Collect all coach usernames from the specified site. - 2. For each coach, download coach-specific data. - 3. Transform this data and export into uniform format. 
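For completeness, the extension points of the new pipeline: supporting another site means implementing a `Fetcher`, an `Extractor`, and a `Pipeline` subclass. The skeleton below is purely hypothetical (no such integration exists in this diff; every name such as `ExampleSite` or example.org is illustrative) and elides the actual scraping and parsing:

```python
# Hypothetical skeleton for wiring up a third site. A real integration would
# add a member to app.pipeline.Site instead of this stand-in enum.
import enum
from typing import List, Union

import aiohttp

from app.pipeline import Extractor as BaseExtractor
from app.pipeline import Fetcher as BaseFetcher
from app.pipeline import Pipeline as BasePipeline


class ExampleSite(enum.Enum):
    EXAMPLE = "example"  # used only for the data/<site> directory name


class Fetcher(BaseFetcher):
    def __init__(self, session: aiohttp.ClientSession):
        super().__init__(site=ExampleSite.EXAMPLE, session=session)

    async def scrape_usernames(self, page_no: int) -> Union[List[str], None]:
        # Real code: return cached usernames from path_page_file(page_no) or
        # fetch the listing page for example.org, throttling between requests.
        return []

    async def download_user_files(self, username: str) -> None:
        # Real code: write profile/stats pages under path_coach_file(username, ...).
        return None


class Extractor(BaseExtractor):
    def get_name(self) -> Union[str, None]:
        return None  # parse the cached files here

    def get_image_url(self) -> Union[str, None]:
        return None

    def get_rapid(self) -> Union[int, None]:
        return None

    def get_blitz(self) -> Union[int, None]:
        return None

    def get_bullet(self) -> Union[int, None]:
        return None


class Pipeline(BasePipeline):
    def get_fetcher(self, session: aiohttp.ClientSession) -> Fetcher:
        return Fetcher(session)

    def get_extractor(self, fetcher: Fetcher, username: str) -> Extractor:
        return Extractor(fetcher, username)
```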
- """ - os.makedirs(self.path_coaches_dir(), exist_ok=True) - os.makedirs(self.path_pages_dir(), exist_ok=True) - usernames = await self.download_usernames() - for username in usernames: - os.makedirs(self.path_coach_dir(username), exist_ok=True) - await self.download_profile(username) - - return usernames diff --git a/poetry.lock b/poetry.lock index f1deaec..90592f3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -416,6 +416,28 @@ files = [ {file = "multidict-6.0.4.tar.gz", hash = "sha256:3666906492efb76453c0e7b97f2cf459b0682e7402c0489a95484965dbc1da49"}, ] +[[package]] +name = "psycopg2" +version = "2.9.9" +description = "psycopg2 - Python-PostgreSQL Database Adapter" +optional = false +python-versions = ">=3.7" +files = [ + {file = "psycopg2-2.9.9-cp310-cp310-win32.whl", hash = "sha256:38a8dcc6856f569068b47de286b472b7c473ac7977243593a288ebce0dc89516"}, + {file = "psycopg2-2.9.9-cp310-cp310-win_amd64.whl", hash = "sha256:426f9f29bde126913a20a96ff8ce7d73fd8a216cfb323b1f04da402d452853c3"}, + {file = "psycopg2-2.9.9-cp311-cp311-win32.whl", hash = "sha256:ade01303ccf7ae12c356a5e10911c9e1c51136003a9a1d92f7aa9d010fb98372"}, + {file = "psycopg2-2.9.9-cp311-cp311-win_amd64.whl", hash = "sha256:121081ea2e76729acfb0673ff33755e8703d45e926e416cb59bae3a86c6a4981"}, + {file = "psycopg2-2.9.9-cp312-cp312-win32.whl", hash = "sha256:d735786acc7dd25815e89cc4ad529a43af779db2e25aa7c626de864127e5a024"}, + {file = "psycopg2-2.9.9-cp312-cp312-win_amd64.whl", hash = "sha256:a7653d00b732afb6fc597e29c50ad28087dcb4fbfb28e86092277a559ae4e693"}, + {file = "psycopg2-2.9.9-cp37-cp37m-win32.whl", hash = "sha256:5e0d98cade4f0e0304d7d6f25bbfbc5bd186e07b38eac65379309c4ca3193efa"}, + {file = "psycopg2-2.9.9-cp37-cp37m-win_amd64.whl", hash = "sha256:7e2dacf8b009a1c1e843b5213a87f7c544b2b042476ed7755be813eaf4e8347a"}, + {file = "psycopg2-2.9.9-cp38-cp38-win32.whl", hash = "sha256:ff432630e510709564c01dafdbe996cb552e0b9f3f065eb89bdce5bd31fabf4c"}, + {file = "psycopg2-2.9.9-cp38-cp38-win_amd64.whl", hash = "sha256:bac58c024c9922c23550af2a581998624d6e02350f4ae9c5f0bc642c633a2d5e"}, + {file = "psycopg2-2.9.9-cp39-cp39-win32.whl", hash = "sha256:c92811b2d4c9b6ea0285942b2e7cac98a59e166d59c588fe5cfe1eda58e72d59"}, + {file = "psycopg2-2.9.9-cp39-cp39-win_amd64.whl", hash = "sha256:de80739447af31525feddeb8effd640782cf5998e1a4e9192ebdf829717e3913"}, + {file = "psycopg2-2.9.9.tar.gz", hash = "sha256:d1454bde93fb1e224166811694d600e746430c006fbb031ea06ecc2ea41bf156"}, +] + [[package]] name = "soupsieve" version = "2.5" @@ -569,4 +591,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "c8c702814a8cfd97f393121a82216a92e7b5ab7fccc01e547ffc3c492610c988" +content-hash = "9e4078c4f5eeffbc90b895528738c457dadd671a784ab8c411a5c3fe91925e34" diff --git a/pyproject.toml b/pyproject.toml index b0259c7..c18ca63 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ python = "^3.11" beautifulsoup4 = "^4.12.2" aiohttp = "^3.8.6" lxml = "^4.9.3" +psycopg2 = "^2.9.9" [tool.poetry.group.dev.dependencies] types-beautifulsoup4 = "^4.12.0.7" diff --git a/sql/export.sql b/sql/export.sql deleted file mode 100644 index be0aec0..0000000 --- a/sql/export.sql +++ /dev/null @@ -1,53 +0,0 @@ -DO $$ - BEGIN - EXECUTE format( - 'CREATE TABLE coach_scraper.export_%s AS TABLE coach_scraper.export', - TRUNC(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP), 0) - ); - END; -$$ LANGUAGE plpgsql; - --- This should match the order data is written in the app/__main__.py --- script. 
-CREATE TEMPORARY TABLE pg_temp.coach_scraper_export - ( site TEXT - , username TEXT - , name TEXT - , image_url TEXT - , rapid TEXT - , blitz TEXT - , bullet TEXT - ); - -SELECT format( - $$COPY pg_temp.coach_scraper_export FROM %L WITH (FORMAT CSV)$$, - :export -) \gexec - -INSERT INTO coach_scraper.export - ( site - , username - , name - , image_url - , rapid - , blitz - , bullet - ) -SELECT - site, - username, - name, - image_url, - rapid::INT, - blitz::INT, - bullet::INT -FROM - pg_temp.coach_scraper_export -ON CONFLICT - (site, username) -DO UPDATE SET - name = EXCLUDED.name, - image_url = EXCLUDED.image_url, - rapid = EXCLUDED.rapid, - blitz = EXCLUDED.blitz, - bullet = EXCLUDED.bullet;
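The timestamped backup behaviour that this removed `sql/export.sql` provided now lives in `backup_database()` in `app/database.py`, which snapshots `coach_scraper.export` into `coach_scraper.export_<epoch-seconds>` before each run. In the spirit of the removed `\dt coach_scraper.export*` tip, a sketch for listing those snapshots (connection parameters are placeholders):

```python
# List the backup snapshots created by app.database.backup_database().
import psycopg2

conn = psycopg2.connect(host="@scraper", dbname="postgres", user="postgres")
with conn.cursor() as cur:
    cur.execute(
        """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'coach_scraper'
          AND table_name LIKE 'export_%'
        ORDER BY table_name;
        """
    )
    for (table_name,) in cur.fetchall():
        print(table_name)  # e.g. export_1700000000
conn.close()
```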