Writing a Data Transformation Pipeline Using Go
Published date: 20/03/2024

In this article, I will show you how to implement a simple data transformation pipeline using the Go programming language.


Prerequisites

You should complete the EXTRACT DATA USING WEB SCRAPING WITH PYTHON article before starting this one, because this article is the next step of that series.


Content

  1. What is data processing?
  2. Why use Go for data processing?
  3. Let’s get started!


What is data processing?

  • Data transformation is the process of converting data from one format, such as a CSV file or an Excel spreadsheet, into another.


  • Transformations typically involve converting a raw data source into a cleansed, validated, and ready-to-use format. Data transformation is crucial to data management processes that include data integration, data migration, data warehousing, and data preparation.
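
As a concrete (hypothetical) example, a raw row such as

  iPhone 15 ,  $999 , Latest Apple smartphone

might be transformed into the validated record

  {Name: "iPhone 15", Price: 999, Description: "Latest Apple smartphone"}

with the whitespace trimmed, the currency symbol stripped, and the price converted to an integer.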


Why use Go for data processing?

Reference: Supercharging Data Processing with Go: A Comprehensive Guide


  • Performance: Go is known for its exceptional performance, thanks to its compiled nature and efficient garbage collection. This makes it well-suited for handling large volumes of data quickly and efficiently. It also benefits from low-level control, which allows developers to optimize data processing algorithms for their specific needs.


  • Concurrency: Concurrency is a first-class citizen in Go. Goroutines and channels make it easy to write concurrent and parallel programs, making Go an ideal choice for tasks that require handling multiple data streams or parallel processing (see the sketch after this list).


  • Simplicity: Go's simplicity in syntax and language design makes it accessible to both experienced developers and those new to the language. This simplicity extends to data processing, allowing developers to focus on the task at hand rather than wrestling with complex language features.
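
To make the concurrency point concrete, here is a minimal, self-contained sketch (illustrative only, not part of the pipeline built below) that fans rows out to a pool of goroutines over a channel:

package main

import (
  "fmt"
  "strings"
  "sync"
)

func main() {
  rows := []string{"iPhone 15,999", "Galaxy S24,899", "Pixel 8,699"}

  jobs := make(chan string)
  var wg sync.WaitGroup

  // Start three workers that consume rows from the channel in parallel.
  for w := 1; w <= 3; w++ {
    wg.Add(1)
    go func(id int) {
      defer wg.Done()
      for row := range jobs {
        fields := strings.Split(row, ",")
        fmt.Printf("worker %d processed %s (price %s)\n", id, fields[0], fields[1])
      }
    }(w)
  }

  // Fan the rows out to the workers, then wait for them to finish.
  for _, r := range rows {
    jobs <- r
  }
  close(jobs)
  wg.Wait()
}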


Let’s get started!

  • Create a new directory with the name agapifa-data-transformation (matching the module path used in the import statements later).
mkdir agapifa-data-transformation


  • Then, move into the new folder.
cd agapifa-data-transformation


  • Create a go.mod file with the go mod init command
go mod init agapifa-data-transformation


  • Then, a go.mod file is created with the following content
module agapifa-data-transformation

go 1.21.4


  • Next, create a main.go file:
package main

import (
  "os"

  _ "github.com/go-sql-driver/mysql"

  "agapifa-data-transformation/config"
  transformation_data_core "agapifa-data-transformation/core"
  "agapifa-data-transformation/utils"
)

func main() {
  // The S3 bucket name and the file key are passed as command-line arguments.
  bucketName := os.Args[1]
  fileKey := os.Args[2]

  config.LoadConfig(".")

  db := utils.ConnectDB()

  utils.DownloadFromS3Bucket(bucketName, fileKey)

  transformation_data_core.Exec(bucketName+"/"+fileKey, db)
}
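
  • The code imports several third-party packages (the MySQL driver, viper, zerolog, and the AWS SDK for Go). Once all the source files below are in place, run go mod tidy from the module root to download them and record them in go.mod:
go mod tidy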


  • Create a config/config.go file:
package config

import (
  "github.com/rs/zerolog/log"

  "github.com/spf13/viper"
)

type Config struct {
  Environment           string `mapstructure:"ENVIRONMENT"`
  AWS_REGION            string `mapstructure:"AWS_REGION"`
  AWS_SECRET_ACCESS_KEY string `mapstructure:"AWS_SECRET_ACCESS_KEY"`
  AWS_ACCESS_KEY_ID     string `mapstructure:"AWS_ACCESS_KEY_ID"`
  MYSQL_HOST            string `mapstructure:"MYSQL_HOST"`
  MYSQL_PORT            string `mapstructure:"MYSQL_PORT"`
  MYSQL_DATABASE        string `mapstructure:"MYSQL_DATABASE"`
  MYSQL_USERNAME        string `mapstructure:"MYSQL_USERNAME"`
  MYSQL_PASSWORD        string `mapstructure:"MYSQL_PASSWORD"`
}

func LoadConfig(path string) (config Config, err error) {
  viper.AddConfigPath(path)
  // With name "app" and type "env", viper looks for a file named app.env in the given path.
  viper.SetConfigName("app")
  viper.SetConfigType("env")

  viper.AutomaticEnv()

  err = viper.ReadInConfig()
  if err != nil {
    log.Fatal().Err(err).Msg("cannot load config")
  }

  err = viper.Unmarshal(&config)
  return
}


  • Create a utils directory with the two helper files below.
  • utils/mysql.go:
package utils

import (
  "agapifa-data-transformation/config"
  "database/sql"
  "fmt"

  _ "github.com/go-sql-driver/mysql"
  "github.com/rs/zerolog/log"
)

func ConnectDB() *sql.DB {
  cfg, _ := config.LoadConfig(".")

  // Build the MySQL DSN (user:password@tcp(host:port)/database) from the loaded config.
  dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", cfg.MYSQL_USERNAME, cfg.MYSQL_PASSWORD, cfg.MYSQL_HOST, cfg.MYSQL_PORT, cfg.MYSQL_DATABASE)

  db, err := sql.Open("mysql", dsn)
  if err != nil {
    log.Fatal().Err(err).Msg("Error when opening DB")
  }

  return db
}
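
  • Note that sql.Open only validates its arguments; it does not actually open a connection. An optional addition (not part of the original tutorial) is to ping the database before returning it, so ConnectDB fails fast on bad credentials:
if err := db.Ping(); err != nil {
  log.Fatal().Err(err).Msg("Cannot reach MySQL")
}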


  • utils/aws_s3.go:
package utils

import (
  "agapifa-data-transformation/config"
  "fmt"
  "os"

  "github.com/rs/zerolog/log"

  "github.com/aws/aws-sdk-go/aws"
  "github.com/aws/aws-sdk-go/aws/session"
  "github.com/aws/aws-sdk-go/service/s3"
  "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func DownloadFromS3Bucket(bucket string, filePath string) {
  cfg, _ := config.LoadConfig(".")

  // Create a local directory named after the bucket if it does not exist yet.
  if _, err := os.Stat(bucket); os.IsNotExist(err) {
    os.Mkdir(bucket, os.ModePerm)
  }

  file, err := os.Create(bucket + "/" + filePath)
  if err != nil {
    log.Fatal().Err(err).Msg("Error when creating the local file")
  }
  defer file.Close()

  // Pass the keys from app.env explicitly; the SDK's default credential chain
  // only checks the process environment, not values loaded by viper.
  sess, _ := session.NewSession(&aws.Config{
    Region:      aws.String(cfg.AWS_REGION),
    Credentials: credentials.NewStaticCredentials(cfg.AWS_ACCESS_KEY_ID, cfg.AWS_SECRET_ACCESS_KEY, ""),
  })
  downloader := s3manager.NewDownloader(sess)
  numBytes, err := downloader.Download(file,
    &s3.GetObjectInput{
      Bucket: aws.String(bucket),
      Key:    aws.String(filePath),
    })
  if err != nil {
    fmt.Println(err)
  }

  fmt.Println("Downloaded", file.Name(), numBytes, "bytes")
}
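
  • One caveat: if the S3 file key contains slashes (for example data/products.csv), os.Create fails because the intermediate local directories do not exist yet. An optional guard (an assumption on my part, not in the original tutorial) using the standard path/filepath package, placed before os.Create:
// Create any intermediate directories for nested S3 keys.
if err := os.MkdirAll(filepath.Dir(bucket+"/"+filePath), os.ModePerm); err != nil {
  log.Fatal().Err(err).Msg("Cannot create local directories")
}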


  • Next, create a core/transformation_data.go file:
package transformation_data_core

import (
  "database/sql"
  "encoding/csv"
  "os"
  "strconv"

  "github.com/rs/zerolog/log"
)

type CsvRecord struct {
  Name        string
  Price       int
  Description string
  Image       string
}

func openFile(filePath string) *os.File {
  f, err := os.Open(filePath)
  if err != nil {
    log.Fatal().Err(err).Msg("Error when opening the csv file")
  }

  return f
}

func readFile(file *os.File) [][]string {
  csvReader := csv.NewReader(file)
  csvReader.Comma = ','
  data, err := csvReader.ReadAll()

  if err != nil {
    log.Fatal().Err(err).Msg("Reading csv file has an error")
  }

  return data
}

func getProductList(data [][]string) []CsvRecord {
  var productList []CsvRecord
  for i, line := range data {
    // Skip the header row.
    if i > 0 {
      var rec CsvRecord
      for j, field := range line {
        if j == 0 {
          rec.Name = field
        }

        if j == 1 {
          price, err := strconv.Atoi(field)
          if err != nil {
            // Default to 0 when the price field is not a valid integer.
            rec.Price = 0
          } else {
            rec.Price = price
          }
        }

        if j == 2 {
          rec.Description = field
        }

        if j == 3 {
          rec.Image = field
        }
      }
      productList = append(productList, rec)
    }
  }

  return productList
}


func upsertProducts(db *sql.DB, productList []CsvRecord) {
  for _, p := range productList {
    var productId sql.NullString

    db.QueryRow("SELECT id FROM product WHERE name = ?", p.Name).Scan(&productId)

    // Insert the product only when no row with the same name exists yet.
    if productId.String == "" {
      _, err := db.Exec(`
        INSERT IGNORE INTO product(name, description, price)
        VALUES(?, ?, ?)
        `,
        p.Name,
        p.Description,
        p.Price,
      )

      if err != nil {
        log.Fatal().Err(err).Msg("Error when inserting a product")
      }

      // Re-read the generated id (not used further in this tutorial).
      db.QueryRow("SELECT id FROM product WHERE name = ?", p.Name).Scan(&productId)
    }
  }
}

func Exec(filePath string, db *sql.DB) {
  file := openFile(filePath)
  defer file.Close()
  defer db.Close()

  data := readFile(file)
  productList := getProductList(data)

  upsertProducts(db, productList)
}
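
  • For reference, the transformation above expects a CSV file whose first row is a header and whose columns are name, price, description, and image. A hypothetical example:
name,price,description,image
iPhone 15,999,Latest Apple smartphone,iphone15.jpg
Galaxy S24,899,Samsung flagship phone,galaxys24.jpg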


  • Add an app.env file (this is the file LoadConfig reads: name "app", type "env")
ENVIRONMENT=development

# AWS
AWS_REGION=
AWS_SECRET_ACCESS_KEY=
AWS_ACCESS_KEY_ID=

# MySQL
MYSQL_HOST=
MYSQL_PORT=
MYSQL_DATABASE=
MYSQL_USERNAME=
MYSQL_PASSWORD=


Execute the script

go run main.go <bucket-name> <file-key>
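
For example, with a hypothetical bucket name and file key:
go run main.go my-product-bucket products.csv

This downloads s3://my-product-bucket/products.csv to ./my-product-bucket/products.csv, transforms the rows, and upserts them into the product table.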


Conclusion


In this post, we’ve looked at what data transformation is, why Go is a good fit for it, and how to build a simple pipeline that downloads a CSV file from S3 and upserts its records into MySQL.

Good luck, and I hope this post is of value to you!
