Writing a Data Transformation Pipeline Using Go
Published date: 20/03/2024

In this article, I will show how to implement a simple data transformation pipeline in Go: download a CSV file from Amazon S3, parse it, and load the records into MySQL.


Prerequisites

This article continues from EXTRACT DATA USING WEB SCRAPING WITH PYTHON, so you should read and complete that article first.


Content

  1. What is data transformation?
  2. Why use Go for data processing?
  3. Let’s get started!


What is data transformation?

  • Data transformation is the process of converting data from one format or structure, such as a CSV file or an Excel spreadsheet, into another.


  • Transformations typically involve converting a raw data source into a cleansed, validated, and ready-to-use format. Data transformation is crucial to data management processes that include data integration, data migration, data warehousing, and data preparation.
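
As a minimal sketch of the "raw to cleansed and validated" step, the Go snippet below normalizes a single raw record. The RawProduct and Product types, their field names, and the validation rules are assumptions made for illustration; they are not part of the project built later in this article.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// RawProduct is a hypothetical record as it might arrive from a CSV export.
type RawProduct struct {
	Name  string
	Price string
}

// Product is the cleansed, validated form of the same record.
type Product struct {
	Name  string
	Price int
}

// transform trims whitespace, rejects empty names, and parses the price.
func transform(raw RawProduct) (Product, error) {
	name := strings.TrimSpace(raw.Name)
	if name == "" {
		return Product{}, errors.New("name is required")
	}

	price, err := strconv.Atoi(strings.TrimSpace(raw.Price))
	if err != nil {
		return Product{}, fmt.Errorf("invalid price %q: %w", raw.Price, err)
	}
	if price < 0 {
		return Product{}, errors.New("price must not be negative")
	}

	return Product{Name: name, Price: price}, nil
}

func main() {
	p, err := transform(RawProduct{Name: "  Keyboard ", Price: " 120 "})
	fmt.Println(p, err) // prints: {Keyboard 120} <nil>
}
```

Returning an error instead of silently defaulting lets the caller decide whether to skip, log, or abort on a bad record.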


Why use Go for data processing?

Document ref: Supercharging Data Processing with Go: A Comprehensive Guide


  • Performance: Go compiles to native machine code and has a low-latency garbage collector, which makes it well suited to processing large volumes of data quickly. Developers also get a useful degree of control over memory layout and allocation, so data processing code can be tuned where it matters.


  • Concurrency: Concurrency is a first-class citizen in Go. Goroutines and channels make it easy to write concurrent and parallel programs, making Go an ideal choice for tasks that require handling multiple data streams or parallel processing.


  • Simplicity: Go's simplicity in syntax and language design makes it accessible to both experienced developers and those new to the language. This simplicity extends to data processing, allowing developers to focus on the task at hand rather than wrestling with complex language features.
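
To make the concurrency point concrete, here is a small sketch of a worker pool built from goroutines and a channel. The transformAll function and its upper-casing "transformation" are hypothetical stand-ins for real per-record work.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// transformAll upper-cases every input using a fixed number of goroutines.
// Each result is written at the index of its input, so the output order
// matches the input order. It assumes workers >= 1.
func transformAll(inputs []string, workers int) []string {
	results := make([]string, len(inputs))
	jobs := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker pulls indexes until the channel is closed.
			for i := range jobs {
				results[i] = strings.ToUpper(inputs[i])
			}
		}()
	}

	// Feed the work, then signal that no more jobs are coming.
	for i := range inputs {
		jobs <- i
	}
	close(jobs)
	wg.Wait()

	return results
}

func main() {
	fmt.Println(transformAll([]string{"apple", "banana", "cherry"}, 2))
	// prints: [APPLE BANANA CHERRY]
}
```

Because every goroutine writes to a different slice element, no mutex is needed; the WaitGroup only ensures that all workers have finished before the results are read.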


Let’s get started!

  • Create a new directory with the name data-transformation
mkdir data-transformation


  • Then, move to the new folder.
cd data-transformation


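  • Create a go.mod file with the go mod command. The module name must match the import paths used in the code below (agapifa-data-transformation):
go mod init agapifa-data-transformation


  • The go.mod file is then created with the following content:
module agapifa-data-transformation

go 1.21.4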


  • Next, create a main.go file:
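package main

import (
  "os"

  "agapifa-data-transformation/config"
  transformation_data_core "agapifa-data-transformation/core"
  "agapifa-data-transformation/utils"

  _ "github.com/go-sql-driver/mysql"
  "github.com/rs/zerolog/log"
)

func main() {
  // Expect the S3 bucket name and the object key as command-line arguments.
  if len(os.Args) < 3 {
    log.Fatal().Msg("usage: go run main.go <bucket-name> <file-key>")
  }
  bucketName := os.Args[1]
  fileKey := os.Args[2]

  // Fail early if the config file cannot be loaded.
  if _, err := config.LoadConfig("."); err != nil {
    log.Fatal().Err(err).Msg("cannot load config")
  }

  db := utils.ConnectDB()

  // The file is saved locally as <bucket-name>/<file-key>.
  utils.DownloadFromS3Bucket(bucketName, fileKey)

  transformation_data_core.Exec(bucketName+"/"+fileKey, db)
}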


  • Create a config/config.go file:
package config

import (
  "github.com/rs/zerolog/log"

  "github.com/spf13/viper"
)

type Config struct {
  Environment           string `mapstructure:"ENVIRONMENT"`
  AWS_REGION            string `mapstructure:"AWS_REGION"`
  AWS_SECRET_ACCESS_KEY string `mapstructure:"AWS_SECRET_ACCESS_KEY"`
  AWS_ACCESS_KEY_ID     string `mapstructure:"AWS_ACCESS_KEY_ID"`
  MYSQL_HOST            string `mapstructure:"MYSQL_HOST"`
  MYSQL_PORT            string `mapstructure:"MYSQL_PORT"`
  MYSQL_DATABASE        string `mapstructure:"MYSQL_DATABASE"`
  MYSQL_USERNAME        string `mapstructure:"MYSQL_USERNAME"`
  MYSQL_PASSWORD        string `mapstructure:"MYSQL_PASSWORD"`
}

func LoadConfig(path string) (config Config, err error) {
  viper.AddConfigPath(path)
  viper.SetConfigName("app")
  viper.SetConfigType("env")

  viper.AutomaticEnv()

  err = viper.ReadInConfig()
  if err != nil {
    log.Fatal().Err(err).Msg("cannot load config")
  }

  err = viper.Unmarshal(&config)
  return
}


  • Create the utils package with two files:
  • utils/mysql.go file
package utils

import (
  "agapifa-data-transformation/config"
  "database/sql"
  "fmt"

  _ "github.com/go-sql-driver/mysql"
  "github.com/rs/zerolog/log"
)

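// ConnectDB opens a MySQL connection pool using the settings from the config file.
func ConnectDB() *sql.DB {
  config, err := config.LoadConfig(".")
  if err != nil {
    log.Fatal().Err(err).Msg("cannot load config")
  }

  db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", config.MYSQL_USERNAME, config.MYSQL_PASSWORD, config.MYSQL_HOST, config.MYSQL_PORT, config.MYSQL_DATABASE))
  if err != nil {
    log.Fatal().Err(err).Msg("Error when opening DB")
  }

  // sql.Open only validates its arguments; Ping verifies the connection.
  if err := db.Ping(); err != nil {
    log.Fatal().Err(err).Msg("Error when connecting to DB")
  }

  return db
}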


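  • utils/aws_s3.go file
package utils

import (
  "agapifa-data-transformation/config"
  "fmt"
  "os"
  "path/filepath"

  "github.com/rs/zerolog/log"

  "github.com/aws/aws-sdk-go/aws"
  "github.com/aws/aws-sdk-go/aws/session"
  "github.com/aws/aws-sdk-go/service/s3"
  "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// DownloadFromS3Bucket saves the object filePath from bucket to the local path <bucket>/<filePath>.
func DownloadFromS3Bucket(bucket string, filePath string) {
  config, err := config.LoadConfig(".")
  if err != nil {
    log.Fatal().Err(err).Msg("cannot load config")
  }

  // Create every parent directory so that keys containing "/" also work.
  localPath := filepath.Join(bucket, filePath)
  if err := os.MkdirAll(filepath.Dir(localPath), os.ModePerm); err != nil {
    log.Fatal().Err(err).Msg("Error when creating the download directory")
  }

  file, err := os.Create(localPath)
  if err != nil {
    log.Fatal().Err(err).Msg("Error when creating the local file")
  }
  defer file.Close()

  // Credentials are resolved by the SDK's default credential chain.
  sess, err := session.NewSession(&aws.Config{Region: aws.String(config.AWS_REGION)})
  if err != nil {
    log.Fatal().Err(err).Msg("Error when creating the AWS session")
  }

  downloader := s3manager.NewDownloader(sess)
  numBytes, err := downloader.Download(file,
    &s3.GetObjectInput{
      Bucket: aws.String(bucket),
      Key:    aws.String(filePath),
    })
  if err != nil {
    // Stop here: continuing would process an empty or partial file.
    log.Fatal().Err(err).Msg("Error when downloading from S3")
  }

  fmt.Println("Downloaded", file.Name(), numBytes, "bytes")
}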


  • Next, create a core/transformation_data.go file
package transformation_data_core

import (
  "database/sql"
  "encoding/csv"
  "os"
  "strconv"
  "github.com/rs/zerolog/log"
)

// CsvRecord is one product row from the CSV file.
type CsvRecord struct {
  Name        string
  Price       int
  Description string
  Image       string
}

func openFile(filePath string) *os.File {
  f, err := os.Open(filePath)
  if err != nil {
    log.Fatal().Err(err).Msg("Opening csv file has an error")
  }


  return f
}

func readFile(file *os.File) [][]string {
  csvReader := csv.NewReader(file)
  csvReader.Comma = ','
  data, err := csvReader.ReadAll()

  if err != nil {
    log.Fatal().Err(err).Msg("Reading csv file has an error")
  }

  return data
}

// getProductList maps CSV rows (name, price, description, image) to records,
// skipping the header row.
func getProductList(data [][]string) []CsvRecord {
  var productList []CsvRecord
  for i, line := range data {
    if i == 0 {
      continue // header row
    }

    var rec CsvRecord
    for j, field := range line {
      switch j {
      case 0:
        rec.Name = field
      case 1:
        // A price that is not a valid integer falls back to 0.
        price, err := strconv.Atoi(field)
        if err != nil {
          price = 0
        }
        rec.Price = price
      case 2:
        rec.Description = field
      case 3:
        rec.Image = field
      }
    }
    productList = append(productList, rec)
  }

  return productList
}


// upsertProducts inserts every product that does not exist yet, matching on name.
func upsertProducts(db *sql.DB, productList []CsvRecord) {
  for _, p := range productList {
    var productId sql.NullString

    // sql.ErrNoRows only means the product is new; any other error is fatal.
    err := db.QueryRow("SELECT id FROM product WHERE name = ?", p.Name).Scan(&productId)
    if err != nil && err != sql.ErrNoRows {
      log.Fatal().Err(err).Msg("Select product has an error")
    }

    if productId.String == "" {
      // Exec is the right call for statements that return no rows.
      _, err = db.Exec(`
        INSERT IGNORE INTO product(name, description, price)
        VALUES(?, ?, ?)
        `,
        p.Name,
        p.Description,
        p.Price,
      )
      if err != nil {
        log.Fatal().Err(err).Msg("Insert product has an error")
      }
    }
  }
}

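// Exec runs the pipeline: open the CSV file, parse it, and write the products to MySQL.
func Exec(file_path string, db *sql.DB) {
  file := openFile(file_path)
  // Register the cleanup as soon as the resources are available.
  defer file.Close()
  defer db.Close()

  data := readFile(file)

  productList := getProductList(data)

  upsertProducts(db, productList)
}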


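  • Add an app.env file in the project root. The file name must match the viper.SetConfigName("app") and viper.SetConfigType("env") calls in config/config.go: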
ENVIRONMENT=development

# AWS
AWS_REGION=
AWS_SECRET_ACCESS_KEY=
AWS_ACCESS_KEY_ID=

# MySQL
MYSQL_HOST=
MYSQL_PORT=
MYSQL_DATABASE=
MYSQL_USERNAME=
MYSQL_PASSWORD=
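
Since every value in the template above is empty, it can help to fail fast when a setting has not been filled in. The following is a minimal sketch of such a check; the missingKeys helper and the choice of required keys are assumptions, and os.LookupEnv is used only to keep the sketch free of third-party imports (the project itself reads these values through viper).

```go
package main

import (
	"fmt"
	"os"
)

// requiredKeys is an assumed list of settings that must not be empty.
var requiredKeys = []string{
	"AWS_REGION", "MYSQL_HOST", "MYSQL_PORT", "MYSQL_DATABASE", "MYSQL_USERNAME",
}

// missingKeys returns the keys for which lookup reports no value or an empty one.
func missingKeys(keys []string, lookup func(string) (string, bool)) []string {
	var missing []string
	for _, k := range keys {
		if v, ok := lookup(k); !ok || v == "" {
			missing = append(missing, k)
		}
	}
	return missing
}

func main() {
	if missing := missingKeys(requiredKeys, os.LookupEnv); len(missing) > 0 {
		fmt.Println("missing settings:", missing)
		return
	}
	fmt.Println("all required settings are present")
}
```

Passing the lookup function as a parameter keeps the check independent of where the values come from, so the same helper could be fed from a map built out of the loaded Config.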


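Execute script

Download the dependencies, then run the program with the S3 bucket name and the object key of the CSV file as arguments:
go mod tidy
go run main.go <bucket-name> <file-key>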


Conclusion


In this post, we’ve looked at what data transformation is, why Go is a good fit for it, and how to build a simple pipeline that downloads a CSV file from S3 and loads its records into MySQL.

Good luck, and I hope this post is useful to you!
