Extract data using web scraping with Python
Published: 14/03/2024

This article will show how to use Python to perform web scraping.


Content

  1. What is Web Scraping?
  2. Why is Python Good for Web Scraping?
  3. Let’s get started!


What is Web Scraping?

Web scraping is the automated collection of information from the World Wide Web via the HTTP protocol or a web browser. It is a very useful tool: if you’ve ever copied and pasted content from a website into an Excel spreadsheet, you have essentially done web scraping, just on a very small scale.
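
To make the idea concrete, here is a minimal sketch of that "copy and paste, but automated" workflow. The URL, tag names, and CSS class are hypothetical and only for illustration: the page is fetched with requests and the interesting pieces are pulled out of the HTML with BeautifulSoup.

from requests import get
from bs4 import BeautifulSoup

# Hypothetical page; replace with a real URL you are allowed to scrape
html = get('https://example.com/products/1', timeout=30).content
soup = BeautifulSoup(html, features='html.parser')

# The tag name and class below are assumptions for illustration only
name = soup.find('h1').text
price = soup.find('span', {'class': 'price'}).text
print(name, price)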




Why is Python Good for Web Scraping?

Some benefits of using Python tools for web scraping:


  • Ease of use: Python has one of the simplest syntaxes and needs fewer lines of code to accomplish a given task. This makes it easier to learn the basics and understand the language in a shorter period.
  • A large number of libraries: Python has a lot of libraries, so you can flexibly choose the ones that fit your project.
  • Small code, large task: Python can do large tasks with little code, so you save time writing code.
  • Community: You can seek help from one of the largest software development communities in the world.


Let’s get started!

Web scraping example: crawl the Www website to extract the category, product name, price, and image link.


  • Create a new directory named data-extraction:
mkdir data-extraction


  • Then, move into the new folder:
cd data-extraction


  • Create a new file requirements.txt with content:
setuptools==68.2.2
pylint==3.0.1
argparse==1.4.0
beautifulsoup4==4.12.2
requests==2.31.0
python-dotenv==1.0.0
boto3==1.28.62


  • Create run-pylint.sh to check the code convention:
#!/bin/sh
command -v pylint >/dev/null 2>&1 || { echo >&2 "Running 'pylint' requires it to be installed."; exit 1; }
echo "Running pylint..."
find . -iname "*.py" -path "./src/*" | xargs pylint --rcfile .pylintrc


  • Create .pylintrc to override the default settings:
[MASTER]
init-hook='import sys; sys.path.append("/data-extraction/src")'
disable=
    C0114, # missing-module-docstring
    C0115, # missing-class-docstring
    C0116, # missing-function-docstring

[FORMAT]
max-line-length=80


  • Create a VERSION file:
0.1.0


  • Create setup.py:
import os
from setuptools.command.install import install
from setuptools import setup, find_packages

class PylintCommand(install):
    description = 'Check code convention'

    def run(self) -> None:
        # Install the package first, then run the pylint shell script
        install.run(self)
        path = get_current_path()
        os.system(f'sh {path}/run-pylint.sh')

def get_current_path() -> str:
    # Escape spaces and parentheses so the path is safe to pass to the shell
    return os.getcwd().replace(" ", r"\ ").replace("(", r"\(").replace(")", r"\)")

def read_file(file):
    with open(file) as f:
        return f.read()

def read_requirements(file):
    with open(file) as f:
        return f.read().splitlines()

version = read_file("VERSION")
requirements = read_requirements("requirements.txt")

setup(
    name='agapifa-data-extraction',
    version=version,
    description='Extract data to a file from html source',
    install_requires=requirements,
    classifiers=[
        "Programming Language :: Python :: 3",
    ],
    packages=find_packages(include=['src']),
    python_requires=">=3.10",
    cmdclass={
        'lint': PylintCommand,
    },
)


  • Create an empty file /src/__init__.py
  • Create a file /src/__main__.py:
from argparse import ArgumentParser
from time import strftime

from file.csv_file import CsvFile
from utils.url import is_valid_url
from utils.aws_s3 import upload_to_s3
from core.extraction_data import ExtractionData
from config.config import config
from utils.file import remove

def validate_args(args) -> None:
    if not args.urls:
        raise SystemExit('Please specify a URL as data source')

    if not args.out:
        raise SystemExit('Please specify a path to export a file')

    for url in args.urls:
        if is_valid_url(url) is False:
            raise SystemExit('Data source should be a URL format')

def get_args():
    parser = ArgumentParser(description='Data extraction args')
    parser.add_argument(
        '-i',
        '--urls',
        help='URLs as data source',
        type=str,
        nargs='+',
        required=True
    )
    parser.add_argument('-o', '--out', help='Output path', required=True)
    parser.add_argument('-e', '--ext', default='csv', help='File extension')

    return parser.parse_args()

def generate_file_name_include_ext(ext: str = 'csv'):
    return f'{strftime("%Y%m%d")}.{ext}'

def main():
    # Step 1: Args
    args = get_args()
    validate_args(args)

    # Step 2: Process
    out_file_path: str = f'{args.out}/{generate_file_name_include_ext(args.ext)}'
    process = ExtractionData(args.urls[0], out_file_path)
    data = process.execute()

    # Step 3: Write file
    file = CsvFile(
        file_name = out_file_path,
        headers = config.CSV_HEADER.split(','),
        data = data
    )
    file.create_file()

    # Step 4: Upload file to s3
    upload_to_s3(out_file_path, args.out, generate_file_name_include_ext(args.ext))

    # Step 5: Clean up the local file
    remove(out_file_path)

if __name__ == "__main__":
    main()


  • Create an empty file /src/config/__init__.py
  • Create a file /src/config/config.py:
import os
from typing import get_type_hints, Union
from dotenv import load_dotenv

load_dotenv()

class AppConfigError(Exception):
    pass

def _parse_bool(val: Union[str, bool]) -> bool:
    return val if isinstance(val, bool) else val.lower() in [
        'true', 
        'yes', 
        '1'
    ]

class AppConfig:
    DEBUG: bool = False
    ENV: str = 'production'
    AWS_REGION: str
    AWS_SECRET_ACCESS_KEY: str
    AWS_ACCESS_KEY_ID: str
    CSV_HEADER: str
    VALID_DOMAINS: str

    def __init__(self, env):
        for field in self.__annotations__:
            if not field.isupper():
                continue

            default_value = getattr(self, field, None)
            if default_value is None and env.get(field) is None:
                raise AppConfigError(f'The {field} field is required')

            try:
                var_type = get_type_hints(AppConfig)[field]
                if var_type == bool:
                    value = _parse_bool(env.get(field, default_value))
                else:
                    value = var_type(env.get(field, default_value))

                self.__setattr__(field, value)
            except ValueError as e:
                raise AppConfigError(
                    f'Unable to cast value of "{env.get(field)}" '
                    f'to type "{var_type}" for "{field}" field'
                ) from e

    def __repr__(self):
        return str(self.__dict__)

config = AppConfig(os.environ)
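
As a quick sanity check, the class reads every upper-case annotated field from the environment and casts it to the annotated type. A rough usage sketch, assuming the variables from the .env file shown later are loaded:

# Usage sketch only; values depend on what is set in .env
from config.config import config

print(config.DEBUG)                  # parsed into a real bool, e.g. True
print(config.CSV_HEADER.split(','))  # e.g. ['name', 'price', 'description', 'image']
print(config.ENV)                    # falls back to 'production' when not set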


  • Create an empty file /src/utils/__init__.py
  • Create a file /src/utils/url.py:
from urllib.parse import urlparse

def is_valid_url(url: str) -> bool:
    parsed_url = urlparse(url)
    return bool(parsed_url.scheme and parsed_url.netloc)

def get_domain(url: str) -> str:
    parsed_url = urlparse(url)
    return f'{parsed_url.scheme}://{parsed_url.netloc}'
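
Both helpers lean on urllib.parse: a string only counts as a valid URL when it has both a scheme and a host, and the domain is rebuilt from those two parts. A small illustration (the URL is arbitrary):

# Illustration only; any http(s) URL behaves the same way
from utils.url import is_valid_url, get_domain

print(is_valid_url('https://example.com/products?page=2'))  # True
print(is_valid_url('not-a-url'))                            # False
print(get_domain('https://example.com/products?page=2'))    # https://example.com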


  • Create a file /src/utils/file.py
import os

def remove(file_path: str) -> None:
    os.unlink(file_path)


  • Create a file /src/utils/aws_s3.py
from boto3 import client as boto3Client
from botocore import exceptions as botocoreExceptions
from config.config import config

def upload_to_s3(file_name: str, bucket: str, object_name=None) -> None:
    s3_client = boto3Client(
        's3',
        aws_access_key_id=config.AWS_ACCESS_KEY_ID,
        aws_secret_access_key=config.AWS_SECRET_ACCESS_KEY
    )

    try:
        if object_name is None:
            object_name = file_name

        s3_client.upload_file(file_name, bucket, object_name)
    except botocoreExceptions.ClientError:
        print(f'Upload file ({file_name}) to s3 was not successful')
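
The function is a thin wrapper around boto3's upload_file call (local file name, bucket, object key). A hedged usage sketch, where the bucket name and paths are placeholders:

# 'my-export-bucket' is a hypothetical bucket name for illustration
from utils.aws_s3 import upload_to_s3

upload_to_s3('./out/20240314.csv', 'my-export-bucket', '20240314.csv')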


  • Create an empty file /src/file/__init__.py
  • Create a file /src/file/file_base.py:
class FileBase:
    def __init__(self, file_name: str = '') -> None:
        self.file_name = file_name

    def validate_data(self) -> None:
        pass

    def create_file(self) -> None:
        pass


  • Create a file /src/file/csv_file.py
import typing
import csv
import os

from file.file_base import FileBase

class CsvFile(FileBase):
    def __init__(self,
                 file_name: str = '',
                 headers: typing.List[str] = None,
                 data: typing.List[typing.List[str]] = None
                 ) -> None:
        FileBase.__init__(self, file_name)
        self.headers = headers
        self.data = data

    def validate_data(self) -> None:
        # Every data row must have the same number of columns as the header
        is_valid = all(len(row) == len(self.headers) for row in self.data)

        if is_valid is False:
            raise SystemExit('Header does not match with data')

    def create_file(self) -> None:
        path: str = os.path.dirname(self.file_name)

        if path and not os.path.exists(path):
            os.makedirs(path)

        with open(self.file_name,
            'w',
            encoding="UTF8",
            newline='',
        ) as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow(self.headers)
            csv_writer.writerows(self.data)

    def read_file(self) -> dict:
        with open(self.file_name, 'r', encoding='UTF8') as csvfile:
            csv_reader = csv.reader(csvfile)
            self.headers = next(csv_reader)
            self.data = []
            for row in csv_reader:
                self.data.append(row)

        result = {}
        result['header'] = self.headers
        result['rows'] = self.data

        return result
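
A short sketch of how the class can be used on its own, with made-up rows that follow the name,price,description,image header:

# Sample data for illustration only
from file.csv_file import CsvFile

file = CsvFile(
    file_name='./out/sample.csv',
    headers=['name', 'price', 'description', 'image'],
    data=[
        ['Product A', '120000', 'Short description', 'https://example.com/a.jpg'],
        ['Product B', '95000', 'Another description', 'https://example.com/b.jpg'],
    ]
)
file.create_file()
print(file.read_file()['rows'])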


  • Create an empty file /src/core/__init__.py
  • Create a file /src/core/extraction_data.py:
from typing import List
from requests import get
from bs4 import BeautifulSoup

from utils.url import get_domain
from config.config import config
from core.www import Www

class ExtractionData:
    def __init__(self,
                 url: str = '',
                 file_name: str = ''
                 ) -> None:
        self.url = url
        self.file_name = file_name
        self.data: List[List[str]] = []

    def validate_domain(self, url: str) -> bool:
        # The domain of the source URL must be in the VALID_DOMAINS whitelist
        return get_domain(url) in config.VALID_DOMAINS.split(',')

    def _get_html_content(self, url: str) -> BeautifulSoup:
        content = get(url, timeout=500).content
        return BeautifulSoup(content, features="html.parser")

    def execute(self) -> List[List[str]]:
        domain_allow = self.validate_domain(self.url)

        if domain_allow is False:
            print(f'{self.url} is not a supported data source')
        else:
            html_content = self._get_html_content(self.url)
            www = Www(html_content)
            self.data = www.execute()

        return self.data
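
Putting the pieces together looks roughly like this. The URL is a placeholder; its domain has to appear in VALID_DOMAINS, otherwise execute() only prints a warning and returns an empty list:

# Placeholder URL; the domain must be listed in VALID_DOMAINS in .env
from core.extraction_data import ExtractionData

process = ExtractionData('https://www/index.php', './out/20240314.csv')
rows = process.execute()
print(len(rows), 'products extracted')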


  • Create a file /src/core/www.py:
from typing import List
from requests import get
from bs4 import BeautifulSoup

class Www:
    def __init__(self, html_content: str = '') -> None:
        self.content = html_content
        self.result = []

    def execute(self) -> List[List[str]]:
        self._get_urls()
        return self.result

    def _get_product_attribute(self, html: str) -> dict:
        product_detail_html = html.find_all('div', { 'class': 'chitietsanpham' })
        image = f"https://www/{html.find('div', { 'class': 'hinhchitiet' }).find('a')['href']}"

        result = {
            'name': html.find('h1', { "class": 'vcard fn' }).text,
            'price': self._convert_price_to_number(product_detail_html[1].find('span').text),
            'description': html.find('div', { 'class': 'noidung' }),
            'image': image,
        }
        return result

    def _convert_price_to_number(self, price: str) -> float:
        # Prices look like "1.200.000 đ"; strip the currency and thousands separators
        return float(price.replace(' đ', '').replace('.', ''))

    def _get_urls(self) -> None:
        cats = self._get_cats()

        for index in range(len(cats)):
            cat_id = cats[index]['value']

            if cat_id != '':
                sub_cats = self._get_sub_cats(cat_id)

                for sub_cat_idx in range(len(sub_cats)):
                    sub_cat_id = sub_cats[sub_cat_idx]['value']
                    if sub_cat_id != '':
                        products = self._get_pagination(f'https://www/index.php?com=tim-kiem&id_list={cat_id}&id_cat={sub_cat_id}')
                        for product_idx in range(len(products)):
                            self.result.append([
                                products[product_idx]['name'],
                                products[product_idx]['price'],
                                products[product_idx]['description'],
                                products[product_idx]['image'],
                            ])

    def _get_cats(self) -> List[str]:
        return self.content.find(id="id_list").find_all('option')

    def _get_sub_cats(self, parent_cat_id: str):
        url: str = f'https://www/index.php?com=tim-kiem&id_list={parent_cat_id}'
        html_content = get(url, timeout=500).content
        html = BeautifulSoup(html_content, features="html.parser")
        return html.find(id="id_cat").find_all('option')

    def _get_pagination(self, url: str) -> List[dict]:
        html_content = get(url, timeout=500).content
        html = BeautifulSoup(html_content, features="html.parser")
        list_pagination = html.find("ul", { "class": "pagination" })
        result = []
        
        if list_pagination is not None:
            list_pagination = list_pagination.find_all('li')

            for pagination in range(len(list_pagination)):
                a_tag = list_pagination[pagination].find('a')
                current = list_pagination[pagination].find('a', { "class": "current" })

                if a_tag is None:
                    continue

                current_page_url = f'{url}&page={a_tag.string}'
                if current is None:
                    current_page_url = a_tag['href']

                html_content = get(current_page_url, timeout=500).content
                html = BeautifulSoup(html_content, features="html.parser")

                all_product = html.find_all('div', { 'class': "ten-product" })

                for index in range(len(all_product)):
                    product_detail_url = all_product[index].find('h3').find('a')['href']
                    product_detail_html_content = get(f'https://www/{product_detail_url}', timeout=500).content
                    product_detail_html = BeautifulSoup(product_detail_html_content, features="html.parser")
                    result.append(self._get_product_attribute(product_detail_html))

        return result
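
The class relies entirely on BeautifulSoup's find / find_all with tag names and CSS classes taken from the target site's markup. The snippet below replays the same pattern on a tiny hand-written HTML fragment, so you can see what _get_product_attribute effectively does (the markup is invented for illustration):

# Invented HTML fragment that mimics the product page structure used above
from bs4 import BeautifulSoup

html = BeautifulSoup('''
<h1 class="vcard fn">Sample product</h1>
<div class="chitietsanpham">In stock</div>
<div class="chitietsanpham"><span>1.200.000 đ</span></div>
<div class="hinhchitiet"><a href="upload/sample.jpg"></a></div>
''', features='html.parser')

name = html.find('h1', {'class': 'vcard fn'}).text
price = html.find_all('div', {'class': 'chitietsanpham'})[1].find('span').text
image = html.find('div', {'class': 'hinhchitiet'}).find('a')['href']
print(name, price, image)  # Sample product 1.200.000 đ upload/sample.jpg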


  • Add a .env file:
DEBUG=True

# AWS
AWS_REGION=
AWS_SECRET_ACCESS_KEY=
AWS_ACCESS_KEY_ID=

# Core
VALID_DOMAINS=https://www
CSV_HEADER=name,price,description,image


Execute script

  • Install all the packages:
python3 setup.py install


  • Run lint
python3 setup.py lint


  • Run the script:
python3 src -i <URLs source> -o <s3 path>


Result

After the run, a CSV file with the name, price, description, and image columns is generated, uploaded to S3, and the local copy is removed.

Conclusion


In this post, we’ve looked at what web scraping is, how it’s used, and what the process involves.

Good luck, and I hope this post is of value to you!
