CategoryTagArticle

admin

I'm a Full-stack developer

Tag

Linked List
Data Structure
Chat GPT
Design Pattern
Microservices
API
AWS CDK
ReactJS
AWS Lightsail
Flutter Mobile
Writing a Data Transformation Pipeline Using Go
Published date: 20/03/2024

In this article, I will show how to implement data processing using the Go programming language with a simple tutorial.


Prerequisites

You should read/complete the EXTRACT DATA USING WEB SCRAPING WITH PYTHON article before implementing this article because this article is the next step of the previous article. 


Content

  1. What is the data processing?
  2. Why is Go programming language for data processing?
  3. Let’s get started!


What is the data processing?

  • Data transformation is the process of converting data from one format, such as a CSV file, Excel spreadsheet, etc..., into another.


  • Transformations typically involve converting a raw data source into a cleansed, validated, and ready-to-use format. Data transformation is crucial to data management processes that include data integration, data migration, data warehousing, and data preparation.


Why is Go programming language for data processing?

Document ref: Supercharging Data Processing with Go: A Comprehensive Guide


  • Performance: Go is known for its exceptional performance, thanks to its compiled nature and efficient garbage collection. This makes it well-suited for handling large volumes of data quickly and efficiently. It also benefits from low-level control, which allows developers to optimize data processing algorithms for their specific needs.


  • Concurrency: Concurrency is a first-class citizen in Go. Goroutines and channels make it easy to write concurrent and parallel programs, making Go an ideal choice for tasks that require handling multiple data streams or parallel processing.


  • Simplicity: Go's simplicity in syntax and language design makes it accessible to both experienced developers and those new to the language. This simplicity extends to data processing, allowing developers to focus on the task at hand rather than wrestling with complex language features.


Let’s get started!

  • Create a new directory with the name data-transformation
mkdir data-transformation


  • Then, move to the new folder.
cd data-transformation


  • Create a go.mod file with the go mod command
go mod init data-transformation


  • Then, go.mod file is created with content
module data-transformation

go 1.21.4


  • Next, create a main.go file:
package main

import (
  "agapifa-data-transformation/config"
  transformation_data_core "agapifa-data-transformation/core"
  aws_s3_utils "agapifa-data-transformation/utils"
  mysql_utils "agapifa-data-transformation/utils"
  "os"

  _ "github.com/go-sql-driver/mysql"
)

func main() {
  bucketName := os.Args[1]
  fileKey := os.Args[2]

  config.LoadConfig(".")

  db := mysql_utils.ConnectDB()

  aws_s3_utils.DownloadFromS3Bucket(bucketName, fileKey)

  transformation_data_core.Exec(bucketName+"/"+fileKey, db)
}


  • Create a config/config.go file:
package config

import (
  "github.com/rs/zerolog/log"

  "github.com/spf13/viper"
)

type Config struct {
  Environment           string `mapstructure:"ENVIRONMENT"`
  AWS_REGION            string `mapstructure:"AWS_REGION"`
  AWS_SECRET_ACCESS_KEY string `mapstructure:"AWS_SECRET_ACCESS_KEY"`
  AWS_ACCESS_KEY_ID     string `mapstructure:"AWS_ACCESS_KEY_ID"`
  MYSQL_HOST            string `mapstructure:"MYSQL_HOST"`
  MYSQL_PORT            string `mapstructure:"MYSQL_PORT"`
  MYSQL_DATABASE        string `mapstructure:"MYSQL_DATABASE"`
  MYSQL_USERNAME        string `mapstructure:"MYSQL_USERNAME"`
  MYSQL_PASSWORD        string `mapstructure:"MYSQL_PASSWORD"`
}

func LoadConfig(path string) (config Config, err error) {
  viper.AddConfigPath(path)
  viper.SetConfigName("app")
  viper.SetConfigType("env")

  viper.AutomaticEnv()

  err = viper.ReadInConfig()
  if err != nil {
    log.Fatal().Err(err).Msg("cannot load config")
  }

  err = viper.Unmarshal(&config)
  return
}


  • Create utils:
  • mysql.go file
package utils

import (
  "agapifa-data-transformation/config"
  "database/sql"
  "fmt"

  _ "github.com/go-sql-driver/mysql"
  "github.com/rs/zerolog/log"
)

func ConnectDB() *sql.DB {
  config, _ := config.LoadConfig(".")
  db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", config.MYSQL_USERNAME, config.MYSQL_PASSWORD, config.MYSQL_HOST, config.MYSQL_PORT, config.MYSQL_DATABASE))
  if err != nil {
    log.Fatal().Err(err).Msg("Error when opening DB")
  }

  return db
}


  • aws_s3.go file
package utils

import (
  "agapifa-data-transformation/config"
  "fmt"
  "os"

  "github.com/rs/zerolog/log"

  "github.com/aws/aws-sdk-go/aws"
  "github.com/aws/aws-sdk-go/aws/session"
  "github.com/aws/aws-sdk-go/service/s3"
  "github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func DownloadFromS3Bucket(bucket string, filePath string) {
  config, _ := config.LoadConfig(".")

  if _, err := os.Stat(bucket); os.IsNotExist(err) {
    os.Mkdir(bucket, os.ModePerm)
  }

  file, err := os.Create(bucket + "/" + filePath)
  if err != nil {
    log.Fatal().Err(err).Msg("Error when opening DB")
  }
  defer file.Close()

  sess, _ := session.NewSession(&aws.Config{Region: aws.String(config.AWS_REGION)})
  downloader := s3manager.NewDownloader(sess)
  numBytes, err := downloader.Download(file,
    &s3.GetObjectInput{
      Bucket: aws.String(bucket),
      Key:    aws.String(filePath),
    })
  if err != nil {
    fmt.Println(err)
  }

  fmt.Println("Downloaded", file.Name(), numBytes, "bytes")
}


  • Next, create a core/transformation_data.go file
package transformation_data_core

import (
  "database/sql"
  "encoding/csv"
  "os"
  "strconv"
  "github.com/rs/zerolog/log"
)

type CsvRecord struct {
  Name   string
  Price int
  Description   string
  Image      string
}

func openFile(filePath string) *os.File {
  f, err := os.Open(filePath)
  if err != nil {
    log.Fatal().Err(err).Msg("Opening csv file has an error")
  }


  return f
}

func readFile(file *os.File) [][]string {
  csvReader := csv.NewReader(file)
  csvReader.Comma = ','
  data, err := csvReader.ReadAll()

  if err != nil {
    log.Fatal().Err(err).Msg("Reading csv file has an error")
  }

  return data
}

func getProductList(data [][]string) []CsvRecord {
  var productList []CsvRecord
  for i, line := range data {
    if i > 0 {
      var rec CsvRecord
      for j, field := range line {
        if j == 0 {
          rec.Name = field
        }

        if j == 1 {
          i, err := strconv.Atoi(field)
          if err != nil {
            rec.Price = 0
          } else {
            rec.Price = i
          }
        }

        if j == 2 {
          rec.Description = field
        }

        if j == 3 {
          rec.Image = field
        }
      }
      productList = append(productList, rec)
    }
  }

  return productList
}


func upsertProducts(db *sql.DB, productList []CsvRecord) {
  for _, p := range productList {
    var productId sql.NullString

    db.QueryRow("SELECT id FROM product WHERE name = ?", p.Name).Scan(&productId)

    if productId.String == "" {
      productQuery, err := db.Query(`
        INSERT IGNORE INTO product(name, description, price) 
        VALUES(?, ?, ?)
        `,
        p.Name,
        p.Description,
        p.Price,
      )

      if err != nil {
        log.Fatal().Err(err).Msg("Insert product has an error")
      }

      productQuery.Close()

      db.QueryRow("SELECT id FROM product WHERE name = ?", p.Name).Scan(&productId)
    }
  }
}

func Exec(file_path string, db *sql.DB) {
  file := openFile(file_path)
  data := readFile(file)

  productList := getProductList(data)

  upsertProducts(db, productList)

  defer file.Close()
  defer db.Close()
}


  • Add .env
ENVIRONMENT=development

# AWS
AWS_REGION=
AWS_SECRET_ACCESS_KEY=
AWS_ACCESS_KEY_ID=

# MySQL
MYSQL_HOST=
MYSQL_PORT=
MYSQL_DATABASE=
MYSQL_USERNAME=
MYSQL_PASSWORD=


Execute script

go run main.go


Conclusion


In this post, we’ve looked at what data scraping is, how it’s used, and what the process involves.

Good luck to you, hope this post is of value to you!!!!

Recommend

JOI - API schema validation
admin12/06/2023

JOI - API schema validation
Data validation is one of topics that I am interesting. I always review my code after developed features or fixed bugs. There are many places where need to validate data, it is really terrible. Some cases, we need to validate data input because ensure the data into API, it will not make any problems to crash system.
Create Project with Express + TypeScript + ESLint + Auto Reload
admin12/06/2023

Create Project with Express + TypeScript + ESLint + Auto Reload
In this article, I introduce to you how to initialize an Express + TypeScript project.
TypeScript Design Pattern - Abstract Factory
admin07/08/2023

TypeScript Design Pattern - Abstract Factory
The abstract factory pattern is one of five design patterns in the Creational Design Pattern group. The abstract factory provides an interface for creating families of related or dependent objects without specifying their concrete classes.
Newest

TypeScript Design Pattern - Builder
admin07/08/2023

TypeScript Design Pattern - Builder
TypeScript Design Pattern - Builder
How to secure your API gateway
admin17/04/2024

How to secure your API gateway
In this blog, I will cover the 6 methods that technology leaders need to incorporate to secure and protect APIs.
Create Project with Express + TypeScript + ESLint + Auto Reload
admin12/06/2023

Create Project with Express + TypeScript + ESLint + Auto Reload
In this article, I introduce to you how to initialize an Express + TypeScript project.
Đinh Thành Công Blog

My website, where I write blogs on a variety of topics and where I have some experiments with new technologies.

hotlinelinkedinskypezalofacebook
DMCA.com Protection Status
Feedback
Name
Phone number
Email
Content
Download app
hotline

copyright © 2023 - AGAPIFA

Privacy
Term
About