Writing a Data Transformation Pipeline Using Go
Published: 20/03/2024

In this article, I will show you how to implement a simple data transformation pipeline using the Go programming language.


Prerequisites

You should read and complete the EXTRACT DATA USING WEB SCRAPING WITH PYTHON article first, because this article is the next step in that series.


Contents

  1. What is data processing?
  2. Why use the Go programming language for data processing?
  3. Let’s get started!


What is data processing?

  • Data transformation is the process of converting data from one format, such as a CSV file or an Excel spreadsheet, into another.


  • Transformations typically involve converting a raw data source into a cleansed, validated, and ready-to-use format. Data transformation is crucial to data management processes that include data integration, data migration, data warehousing, and data preparation.
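
As a tiny illustration, here is a minimal sketch of the kind of row-level cleanup a transformation step performs; the field layout (name, price) is hypothetical:

package main

import (
  "fmt"
  "strconv"
  "strings"
)

// Product is a hypothetical cleansed record built from one raw CSV row.
type Product struct {
  Name  string
  Price int
}

// transformRow trims whitespace and parses the price, turning a raw
// []string row into a validated, ready-to-use Product.
func transformRow(row []string) (Product, error) {
  price, err := strconv.Atoi(strings.TrimSpace(row[1]))
  if err != nil {
    return Product{}, fmt.Errorf("invalid price %q: %w", row[1], err)
  }

  return Product{Name: strings.TrimSpace(row[0]), Price: price}, nil
}

func main() {
  p, _ := transformRow([]string{"  Coffee beans ", " 250 "})
  fmt.Printf("%+v\n", p) // {Name:Coffee beans Price:250}
}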


Why use the Go programming language for data processing?

Reference: Supercharging Data Processing with Go: A Comprehensive Guide


  • Performance: Go is known for its exceptional performance, thanks to its compiled nature and efficient garbage collection. This makes it well-suited for handling large volumes of data quickly and efficiently. It also benefits from low-level control, which allows developers to optimize data processing algorithms for their specific needs.


  • Concurrency: Concurrency is a first-class citizen in Go. Goroutines and channels make it easy to write concurrent and parallel programs, which makes Go an ideal choice for tasks that handle multiple data streams or require parallel processing (see the sketch after this list).


  • Simplicity: Go's simplicity in syntax and language design makes it accessible to both experienced developers and those new to the language. This simplicity extends to data processing, allowing developers to focus on the task at hand rather than wrestling with complex language features.
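
To make the concurrency point concrete, here is a minimal, self-contained sketch (not part of the pipeline built below) that fans raw rows out to three worker goroutines over channels:

package main

import (
  "fmt"
  "strings"
  "sync"
)

func main() {
  rows := make(chan string)
  cleaned := make(chan string)

  // Fan out: three workers read raw rows from the same channel concurrently.
  var wg sync.WaitGroup
  for i := 0; i < 3; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      for row := range rows {
        cleaned <- strings.TrimSpace(row)
      }
    }()
  }

  // Close the output channel once every worker has finished.
  go func() {
    wg.Wait()
    close(cleaned)
  }()

  // Feed the input channel and close it when done.
  go func() {
    for _, row := range []string{" coffee ", " tea ", " milk "} {
      rows <- row
    }
    close(rows)
  }()

  for row := range cleaned {
    fmt.Println(row)
  }
}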


Let’s get started!

  • Create a new directory with the name data-transformation
mkdir data-transformation


  • Then, move to the new folder.
cd data-transformation


  • Create a go.mod file with the go mod init command
go mod init data-transformation


  • A go.mod file is then created with the following content:
module data-transformation

go 1.21.4
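
The files below import a few third-party modules: the MySQL driver, zerolog for logging, viper for configuration, and the AWS SDK. Fetch them up front with go get, or simply run go mod tidy after adding the source files:

go get github.com/go-sql-driver/mysql
go get github.com/rs/zerolog
go get github.com/spf13/viper
go get github.com/aws/aws-sdk-go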


  • Next, create a main.go file:
package main

import (
  "fmt"
  "os"

  "data-transformation/config"
  transformation_data_core "data-transformation/core"
  "data-transformation/utils"

  _ "github.com/go-sql-driver/mysql"
)

func main() {
  // The script expects the S3 bucket name and the file key as arguments.
  if len(os.Args) < 3 {
    fmt.Println("usage: go run main.go [bucket-name] [file-key]")
    os.Exit(1)
  }

  bucketName := os.Args[1]
  fileKey := os.Args[2]

  config.LoadConfig(".")

  db := utils.ConnectDB()

  utils.DownloadFromS3Bucket(bucketName, fileKey)

  transformation_data_core.Exec(bucketName+"/"+fileKey, db)
}


  • Create a config/config.go file:
package config

import (
  "github.com/rs/zerolog/log"

  "github.com/spf13/viper"
)

type Config struct {
  Environment           string `mapstructure:"ENVIRONMENT"`
  AWS_REGION            string `mapstructure:"AWS_REGION"`
  AWS_SECRET_ACCESS_KEY string `mapstructure:"AWS_SECRET_ACCESS_KEY"`
  AWS_ACCESS_KEY_ID     string `mapstructure:"AWS_ACCESS_KEY_ID"`
  MYSQL_HOST            string `mapstructure:"MYSQL_HOST"`
  MYSQL_PORT            string `mapstructure:"MYSQL_PORT"`
  MYSQL_DATABASE        string `mapstructure:"MYSQL_DATABASE"`
  MYSQL_USERNAME        string `mapstructure:"MYSQL_USERNAME"`
  MYSQL_PASSWORD        string `mapstructure:"MYSQL_PASSWORD"`
}

func LoadConfig(path string) (config Config, err error) {
  viper.AddConfigPath(path)
  viper.SetConfigName("app")
  viper.SetConfigType("env")

  viper.AutomaticEnv()

  err = viper.ReadInConfig()
  if err != nil {
    log.Fatal().Err(err).Msg("cannot load config")
  }

  err = viper.Unmarshal(&config)
  return
}


  • Create a utils directory with the two files below.
  • utils/mysql.go file:
package utils

import (
  "database/sql"
  "fmt"

  "data-transformation/config"

  _ "github.com/go-sql-driver/mysql"
  "github.com/rs/zerolog/log"
)

func ConnectDB() *sql.DB {
  cfg, err := config.LoadConfig(".")
  if err != nil {
    log.Fatal().Err(err).Msg("Cannot load config")
  }

  // DSN format: user:password@tcp(host:port)/database
  dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", cfg.MYSQL_USERNAME, cfg.MYSQL_PASSWORD, cfg.MYSQL_HOST, cfg.MYSQL_PORT, cfg.MYSQL_DATABASE)

  db, err := sql.Open("mysql", dsn)
  if err != nil {
    log.Fatal().Err(err).Msg("Error when opening DB")
  }

  return db
}
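
One caveat worth knowing: sql.Open only validates the DSN and does not actually connect to the server. If you want ConnectDB to fail fast on bad credentials, a minimal sketch is to ping right after opening:

// Optional: verify the connection eagerly; sql.Open alone does not dial MySQL.
if err := db.Ping(); err != nil {
  log.Fatal().Err(err).Msg("Cannot reach MySQL")
}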


  • utils/aws_s3.go file:
package utils

import (
  "fmt"
  "os"

  "data-transformation/config"

  "github.com/rs/zerolog/log"

  "github.com/aws/aws-sdk-go/aws"
  "github.com/aws/aws-sdk-go/aws/session"
  "github.com/aws/aws-sdk-go/service/s3"
  "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func DownloadFromS3Bucket(bucket string, filePath string) {
  cfg, err := config.LoadConfig(".")
  if err != nil {
    log.Fatal().Err(err).Msg("Cannot load config")
  }

  // Mirror the bucket name as a local directory so the object can be
  // written to bucket/filePath.
  if _, err := os.Stat(bucket); os.IsNotExist(err) {
    os.Mkdir(bucket, os.ModePerm)
  }

  file, err := os.Create(bucket + "/" + filePath)
  if err != nil {
    log.Fatal().Err(err).Msg("Error when creating the local file")
  }
  defer file.Close()

  sess, err := session.NewSession(&aws.Config{Region: aws.String(cfg.AWS_REGION)})
  if err != nil {
    log.Fatal().Err(err).Msg("Error when creating the AWS session")
  }

  downloader := s3manager.NewDownloader(sess)
  numBytes, err := downloader.Download(file,
    &s3.GetObjectInput{
      Bucket: aws.String(bucket),
      Key:    aws.String(filePath),
    })
  if err != nil {
    log.Fatal().Err(err).Msg("Error when downloading the file from S3")
  }

  fmt.Println("Downloaded", file.Name(), numBytes, "bytes")
}


  • Next, create a core/transformation_data.go file:
package transformation_data_core

import (
  "database/sql"
  "encoding/csv"
  "os"
  "strconv"

  "github.com/rs/zerolog/log"
)

// CsvRecord maps one row of the scraped CSV file.
type CsvRecord struct {
  Name        string
  Price       int
  Description string
  Image       string
}

func openFile(filePath string) *os.File {
  f, err := os.Open(filePath)
  if err != nil {
    log.Fatal().Err(err).Msg("Opening the csv file has an error")
  }

  return f
}

func readFile(file *os.File) [][]string {
  csvReader := csv.NewReader(file)
  csvReader.Comma = ','
  data, err := csvReader.ReadAll()
  if err != nil {
    log.Fatal().Err(err).Msg("Reading the csv file has an error")
  }

  return data
}

func getProductList(data [][]string) []CsvRecord {
  var productList []CsvRecord
  for i, line := range data {
    // Skip the header row.
    if i == 0 {
      continue
    }

    var rec CsvRecord
    for j, field := range line {
      switch j {
      case 0:
        rec.Name = field
      case 1:
        // Fall back to 0 when the price column is not a valid integer.
        price, err := strconv.Atoi(field)
        if err != nil {
          rec.Price = 0
        } else {
          rec.Price = price
        }
      case 2:
        rec.Description = field
      case 3:
        rec.Image = field
      }
    }

    productList = append(productList, rec)
  }

  return productList
}

// upsertProducts inserts each record into the product table only when no row
// with the same name exists yet; existing products are left untouched.
func upsertProducts(db *sql.DB, productList []CsvRecord) {
  for _, p := range productList {
    var productId sql.NullString

    db.QueryRow("SELECT id FROM product WHERE name = ?", p.Name).Scan(&productId)

    if productId.String == "" {
      _, err := db.Exec(`
        INSERT IGNORE INTO product(name, description, price)
        VALUES(?, ?, ?)
        `,
        p.Name,
        p.Description,
        p.Price,
      )
      if err != nil {
        log.Fatal().Err(err).Msg("Insert product has an error")
      }
    }
  }
}

func Exec(filePath string, db *sql.DB) {
  file := openFile(filePath)
  defer file.Close()
  defer db.Close()

  data := readFile(file)
  productList := getProductList(data)

  upsertProducts(db, productList)
}
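
For reference, Exec assumes the CSV has a header row followed by columns in the order name, price, description, image, and that a product table already exists. Here is a hypothetical products.csv and a sketch of a matching table; the column types are my assumption, not from the original article:

name,price,description,image
Coffee beans,250,Arabica coffee beans,https://example.com/beans.jpg
Green tea,120,Loose-leaf green tea,https://example.com/tea.jpg

-- Hypothetical schema; adjust types and sizes to your data.
CREATE TABLE IF NOT EXISTS product (
  id INT AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL UNIQUE,
  description TEXT,
  price INT
);

The UNIQUE constraint on name is what lets INSERT IGNORE skip rows whose product already exists.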


  • Add an app.env file (the LoadConfig function above looks for a file named app.env in the given path):
ENVIRONMENT=development

# AWS
AWS_REGION=
AWS_SECRET_ACCESS_KEY=
AWS_ACCESS_KEY_ID=

# MySQL
MYSQL_HOST=
MYSQL_PORT=
MYSQL_DATABASE=
MYSQL_USERNAME=
MYSQL_PASSWORD=


Execute script

Pass the S3 bucket name and the file key that main.go reads from os.Args:

go run main.go [bucket-name] [file-key]
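
For example, with a hypothetical bucket named agapifa-crawler holding the products.csv produced by the previous article:

go run main.go agapifa-crawler products.csv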


Conclusion


In this post, we’ve looked at what data transformation is, why Go is a good fit for it, and how to implement a simple pipeline that downloads a CSV file from S3 and loads it into MySQL.

Good luck, and I hope this post is of value to you!
