This article shows how to use Python to perform web scraping. Web scraping is the automated collection of information from the World Wide Web, over HTTP or through a web browser. If you have ever copied and pasted content from a website into an Excel spreadsheet, you have already done web scraping by hand, just on a very small scale.
Python is a popular choice for web scraping: the syntax is easy to read, mature libraries such as requests and BeautifulSoup handle the downloading and HTML parsing, and the results are easy to push into files or cloud storage with packages like boto3. The short sketch below shows the basic idea before we build a full project around it.
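Here is a minimal sketch, assuming only the requests and beautifulsoup4 packages are installed; the URL and the h1 lookup are placeholders chosen for illustration and are not part of the project we build below.

import requests
from bs4 import BeautifulSoup

# Hypothetical target page used purely for illustration
response = requests.get('https://example.com', timeout=30)
soup = BeautifulSoup(response.content, features='html.parser')

# Pull one piece of information out of the parsed HTML
title = soup.find('h1')
print(title.text if title is not None else 'No <h1> found')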
Web scraping example: crawl the Www website, walking each category and sub-category, and extract every product's name, price, description, and image link into a CSV file. First, create a working directory for the project:
mkdir data-extraction
cd data-extraction
Inside it, add a requirements.txt pinning the project's dependencies:

setuptools==68.2.2
pylint==3.0.1
argparse==1.4.0
beautifulsoup4==4.12.2
requests==2.31.0
python-dotenv==1.0.0
boto3==1.28.62

Next, run-pylint.sh runs pylint over everything under src/; the custom lint command we add in setup.py will call it:
#!/bin/sh
command -v pylint >/dev/null 2>&1 || { echo >&2 "Running 'pylint' requires it to be installed."; exit 1; }
echo "Running pylint..."
find . -iname "*.py" -path "./src/*" | xargs pylint --rcfile .pylintrc
.pylintrc configures pylint for the project:

[MASTER]
init-hook='import sys; sys.path.append("/data-extraction/src")'

disable=
    C0114, # missing-module-docstring
    C0115, # missing-class-docstring
    C0116, # missing-function-docstring
[FORMAT]
max-line-length=80
The VERSION file holds the current package version, which setup.py reads:

0.1.0
setup.py wires the package together and registers a custom lint command alongside the normal install:

import os

from setuptools.command.install import install
from setuptools import setup, find_packages


class PylintCommand(install):
    description = 'Check code convention'

    def run(self) -> None:
        install.run(self)
        path = get_current_path()
        os.system(f'sh {path}/run-pylint.sh')


def get_current_path() -> str:
    # Escape spaces and parentheses so the path survives the shell command above.
    return os.getcwd().replace(" ", "\\ ").replace("(", "\\(").replace(")", "\\)")


def read_file(file):
    with open(file, encoding='utf-8') as f:
        return f.read()


def read_requirements(file):
    with open(file, encoding='utf-8') as f:
        return f.read().splitlines()


version = read_file("VERSION")
requirements = read_requirements("requirements.txt")

setup(
    name='agapifa-data-extraction',
    version=version,
    description='Extract data to a file from html source',
    install_requires=requirements,
    classifiers=[
        "Programming Language :: Python :: 3",
    ],
    packages=find_packages(include=['src']),
    python_requires=">=3.10",
    cmdclass={
        'lint': PylintCommand,
    },
)
The entry point under src/ (invoked later as python3 src) parses the command-line arguments and drives the whole flow:

from argparse import ArgumentParser
from time import strftime

from file.csv_file import CsvFile
from utils.url import is_valid_url
from utils.aws_s3 import upload_to_s3
from core.extraction_data import ExtractionData
from config.config import config
from utils.file import remove


def validate_args(args) -> None:
    if not args.urls:
        raise SystemExit('Please specify a URL as data source')
    if not args.out:
        raise SystemExit('Please specify a path to export a file')
    for url in args.urls:
        if is_valid_url(url) is False:
            raise SystemExit('Data source should be a URL format')


def get_args():
    parser = ArgumentParser(description='Data extraction args')
    parser.add_argument(
        '-i',
        '--urls',
        help='URLs as data source',
        type=str,
        nargs='+',
        required=True
    )
    parser.add_argument('-o', '--out', help='Output path', required=True)
    parser.add_argument('-e', '--ext', default='csv', help='File extension')
    return parser.parse_args()


def generate_file_name_include_ext(ext: str = 'csv'):
    return f'{strftime("%Y%m%d")}.{ext}'


def main():
    # Step 1: Parse and validate the arguments
    args = get_args()
    validate_args(args)

    # Step 2: Extract the data
    file_name = generate_file_name_include_ext(args.ext)
    out_file_path: str = f'{args.out}/{file_name}'
    process = ExtractionData(args.urls[0], out_file_path)
    data = process.execute()

    # Step 3: Write the CSV file
    file = CsvFile(
        file_name=out_file_path,
        headers=config.CSV_HEADER.split(','),
        data=data
    )
    file.create_file()

    # Step 4: Upload the file to S3
    upload_to_s3(out_file_path, args.out, file_name)

    # Step 5: Clean up the local copy
    remove(out_file_path)


if __name__ == "__main__":
    main()
src/config/config.py loads settings from the environment (python-dotenv reads .env first), validating required fields and casting each value to its annotated type:

import os
from typing import get_type_hints, Union

from dotenv import load_dotenv

load_dotenv()


class AppConfigError(Exception):
    pass


def _parse_bool(val: Union[str, bool]) -> bool:
    return val if isinstance(val, bool) else val.lower() in [
        'true',
        'yes',
        '1'
    ]


class AppConfig:
    DEBUG: bool = False
    ENV: str = 'production'
    AWS_REGION: str
    AWS_SECRET_ACCESS_KEY: str
    AWS_ACCESS_KEY_ID: str
    CSV_HEADER: str
    VALID_DOMAINS: str

    def __init__(self, env):
        for field in self.__annotations__:
            if not field.isupper():
                continue
            default_value = getattr(self, field, None)
            if default_value is None and env.get(field) is None:
                raise AppConfigError(f'The {field} field is required')
            try:
                var_type = get_type_hints(AppConfig)[field]
                if var_type == bool:
                    value = _parse_bool(env.get(field, default_value))
                else:
                    value = var_type(env.get(field, default_value))
                self.__setattr__(field, value)
            except ValueError as e:
                raise AppConfigError(
                    f'Unable to cast value of "{env.get(field, default_value)}" '
                    f'to type "{var_type}" for "{field}" field'
                ) from e

    def __repr__(self):
        return str(self.__dict__)


config = AppConfig(os.environ)
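As a quick, hypothetical illustration of how this loader behaves: the values below are dummies standing in for what .env would normally provide, and they must be set before config.config is imported, because that module builds its config object at import time.

import os

# Dummy values for illustration only; in the real project they come from .env
os.environ.update({
    'DEBUG': 'yes',
    'AWS_REGION': 'ap-southeast-1',
    'AWS_SECRET_ACCESS_KEY': 'dummy-secret',
    'AWS_ACCESS_KEY_ID': 'dummy-key',
    'CSV_HEADER': 'name,price,description,image',
    'VALID_DOMAINS': 'https://www',
})

from config.config import AppConfig  # pylint: disable=wrong-import-position

cfg = AppConfig(os.environ)
print(cfg.DEBUG)                  # True: the string 'yes' is parsed by _parse_bool
print(cfg.ENV)                    # 'production': class default, not overridden
print(cfg.CSV_HEADER.split(','))  # ['name', 'price', 'description', 'image']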
src/utils/url.py provides two small URL helpers:

from urllib.parse import urlparse


def is_valid_url(url: str) -> bool:
    parsed_url = urlparse(url)
    return bool(parsed_url.scheme and parsed_url.netloc)


def get_domain(url: str) -> str:
    parsed_url = urlparse(url)
    return f'{parsed_url.scheme}://{parsed_url.netloc}'
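A couple of hypothetical calls show what these helpers return:

from utils.url import is_valid_url, get_domain

# Example URLs chosen only for illustration
print(is_valid_url('https://example.com/products?page=2'))  # True
print(is_valid_url('not-a-url'))                            # False: no scheme or netloc
print(get_domain('https://example.com/products?page=2'))    # https://example.com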
src/utils/file.py removes the local file once it has been uploaded:

import os

def remove(file_path: str) -> None:
    os.unlink(file_path)
src/utils/aws_s3.py uploads the generated file to S3 using boto3:

from boto3 import client as boto3Client
from botocore import exceptions as botocoreExceptions

from config.config import config


def upload_to_s3(file_name: str, bucket: str, object_name=None) -> None:
    s3_client = boto3Client(
        's3',
        region_name=config.AWS_REGION,
        aws_access_key_id=config.AWS_ACCESS_KEY_ID,
        aws_secret_access_key=config.AWS_SECRET_ACCESS_KEY
    )
    try:
        if object_name is None:
            object_name = file_name
        s3_client.upload_file(file_name, bucket, object_name)
    except botocoreExceptions.ClientError:
        print(f'Upload of file ({file_name}) to S3 failed')
src/file/file_base.py defines a minimal base class for file writers:

class FileBase:
    def __init__(self, file_name: str = '') -> None:
        self.file_name = file_name

    def validate_data(self) -> None:
        pass

    def create_file(self) -> None:
        pass
src/file/csv_file.py writes (and can read back) the CSV output:

import typing
import csv
import os

from file.file_base import FileBase


class CsvFile(FileBase):
    def __init__(self,
                 file_name: str = '',
                 headers: typing.List[str] = None,
                 data: typing.List[typing.List[str]] = None
                 ) -> None:
        FileBase.__init__(self, file_name)
        self.headers = headers if headers is not None else []
        self.data = data if data is not None else []

    def validate_data(self) -> None:
        # Every row must have the same number of columns as the header.
        is_valid = all(len(row) == len(self.headers) for row in self.data)
        if is_valid is False:
            raise SystemExit('Header does not match with data')

    def create_file(self) -> None:
        path: str = os.path.dirname(self.file_name)
        if not os.path.exists(path):
            os.makedirs(path)
        with open(self.file_name,
                  'w',
                  encoding="UTF8",
                  newline='',
                  ) as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow(self.headers)
            csv_writer.writerows(self.data)

    def read_file(self) -> dict:
        with open(self.file_name, 'r', encoding='UTF8') as csvfile:
            csv_reader = csv.reader(csvfile)
            self.headers = next(csv_reader)
            self.data = list(csv_reader)
        return {'header': self.headers, 'rows': self.data}
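A quick usage sketch, with a made-up file name and a single row matching this project's CSV header:

from file.csv_file import CsvFile

csv_file = CsvFile(
    file_name='out/20240101.csv',
    headers=['name', 'price', 'description', 'image'],
    data=[
        ['Apple', '12000', 'Fresh apple', 'https://www/images/apple.jpg'],
    ],
)
csv_file.validate_data()     # raises SystemExit if a row length differs from the header
csv_file.create_file()       # creates out/ if needed and writes the CSV
print(csv_file.read_file())  # {'header': [...], 'rows': [[...]]}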
src/core/extraction_data.py checks that the URL's domain is allowed, downloads the page, and hands it to the site-specific scraper:

from typing import List

from requests import get
from bs4 import BeautifulSoup

from utils.url import get_domain
from config.config import config
from core.www import Www


class ExtractionData:
    def __init__(self,
                 url: str = '',
                 file_name: str = ''
                 ) -> None:
        self.url = url
        self.file_name = file_name
        self.data: List[List[str]] = []

    def validate_domain(self, url: str) -> bool:
        # Only domains listed in VALID_DOMAINS may be scraped.
        return get_domain(url) in config.VALID_DOMAINS.split(',')

    def _get_html_content(self, url: str) -> BeautifulSoup:
        content = get(url, timeout=500).content
        return BeautifulSoup(content, features="html.parser")

    def execute(self) -> List[List[str]]:
        domain_allow = self.validate_domain(self.url)
        if domain_allow is False:
            print(f'{self.url} is not supported as a data source')
        else:
            html_content = self._get_html_content(self.url)
            www = Www(html_content)
            self.data = www.execute()
        return self.data
src/core/www.py holds the site-specific scraping logic: it walks each category and sub-category, follows the pagination, and pulls the attributes from every product detail page:

from typing import List

from requests import get
from bs4 import BeautifulSoup


class Www:
    def __init__(self, html_content: BeautifulSoup = None) -> None:
        self.content = html_content
        self.result = []

    def execute(self) -> List[List[str]]:
        self._get_urls()
        return self.result

    def _get_product_attribute(self, html: BeautifulSoup) -> dict:
        product_detail_html = html.find_all('div', {'class': 'chitietsanpham'})
        image = f"https://www/{html.find('div', {'class': 'hinhchitiet'}).find('a')['href']}"
        return {
            'name': html.find('h1', {'class': 'vcard fn'}).text,
            'price': self._convert_price_to_number(product_detail_html[1].find('span').text),
            'description': html.find('div', {'class': 'noidung'}),
            'image': image,
        }

    def _convert_price_to_number(self, price: str) -> float:
        # Prices look like '120.000 đ'; strip the currency suffix and thousands separators.
        return float(price.replace(' đ', '').replace('.', ''))

    def _get_urls(self) -> None:
        for cat in self._get_cats():
            cat_id = cat['value']
            if cat_id == '':
                continue
            for sub_cat in self._get_sub_cats(cat_id):
                sub_cat_id = sub_cat['value']
                if sub_cat_id == '':
                    continue
                products = self._get_pagination(f'https://www/index.php?com=tim-kiem&id_list={cat_id}&id_cat={sub_cat_id}')
                for product in products:
                    self.result.append([
                        product['name'],
                        product['price'],
                        product['description'],
                        product['image'],
                    ])

    def _get_cats(self) -> List:
        return self.content.find(id="id_list").find_all('option')

    def _get_sub_cats(self, parent_cat_id: str) -> List:
        url: str = f'https://www/index.php?com=tim-kiem&id_list={parent_cat_id}'
        html_content = get(url, timeout=500).content
        html = BeautifulSoup(html_content, features="html.parser")
        return html.find(id="id_cat").find_all('option')

    def _get_pagination(self, url: str) -> List[dict]:
        html_content = get(url, timeout=500).content
        html = BeautifulSoup(html_content, features="html.parser")
        list_pagination = html.find("ul", {"class": "pagination"})
        result = []
        if list_pagination is not None:
            for pagination in list_pagination.find_all('li'):
                a_tag = pagination.find('a')
                current = pagination.find('a', {"class": "current"})
                if a_tag is None:
                    continue
                current_page_url = f'{url}&page={a_tag.string}'
                if current is None:
                    current_page_url = a_tag['href']
                html_content = get(current_page_url, timeout=500).content
                html = BeautifulSoup(html_content, features="html.parser")
                for product in html.find_all('div', {'class': "ten-product"}):
                    product_detail_url = product.find('h3').find('a')['href']
                    product_detail_html_content = get(f'https://www/{product_detail_url}', timeout=500).content
                    product_detail_html = BeautifulSoup(product_detail_html_content, features="html.parser")
                    result.append(self._get_product_attribute(product_detail_html))
        return result
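One convenient way to sanity-check the category selector without hitting the live site is to feed the class a small HTML stub; the markup below is invented purely for illustration.

from bs4 import BeautifulSoup
from core.www import Www

# Hypothetical markup mimicking the site's category <select> element
stub = BeautifulSoup(
    '<select id="id_list">'
    '<option value="">All categories</option>'
    '<option value="12">Fruit</option>'
    '</select>',
    features='html.parser',
)

www = Www(stub)
print([opt['value'] for opt in www._get_cats()])  # pylint: disable=protected-access
# ['', '12'] -- the empty value is skipped by _get_urls()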
Finally, create a .env file in the project root with the runtime configuration:

DEBUG=True

# AWS
AWS_REGION=
AWS_SECRET_ACCESS_KEY=
AWS_ACCESS_KEY_ID=

# Core
VALID_DOMAINS=https://www
CSV_HEADER=name,price,description,image

Install the package, run the lint command, and start the extraction:
python3 setup.py install
python3 setup.py lint
python3 src -i <URLs source> -o <s3 path>
In this post, we've looked at what web scraping is, how it's used, and what the process of extracting data from a website involves.
Good luck, and I hope this post is of value to you!