Sunday, January 3, 2016

Data Pipeline and ETL tasks in Go using Ratchet

As Data Engineers and Software Engineers, we might define Data Pipelines differently. This Wikipedia entry defines a pipeline as a set of data processing elements connected in series, where the output of one element is the input of the next one, and the elements are often executed in parallel. That is pretty close to how I'll define it here. More specifically, I'll use The Golang Blog's informal definition:

A series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines receive values from upstream via inbound channels; perform some function on that data, usually producing new values; and send values downstream via outbound channels.
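
To make that definition concrete, here is a minimal sketch in plain Go (no Ratchet involved) of a two-stage pipeline: each stage runs in its own goroutine and hands its results downstream over a channel.

 package main

 import "fmt"

 // generate emits the numbers 1..n on its outbound channel.
 func generate(n int) <-chan int {
     out := make(chan int)
     go func() {
         defer close(out)
         for i := 1; i <= n; i++ {
             out <- i
         }
     }()
     return out
 }

 // square receives values from upstream, squares them, and sends them downstream.
 func square(in <-chan int) <-chan int {
     out := make(chan int)
     go func() {
         defer close(out)
         for v := range in {
             out <- v * v
         }
     }()
     return out
 }

 func main() {
     for v := range square(generate(5)) {
         fmt.Println(v) // 1 4 9 16 25
     }
 }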

Ratchet is a Go library that abstracts these pipeline concepts, giving you a clear-cut framework to work with. You satisfy a data processing interface and are provided with convenience methods for managing the flow of data.

Perhaps you have been tasked with creating automated pipelines that fetch data from disparate sources -- such as remote databases or web services -- and then clean up, transform, and aggregate that unorganized data into specialized databases or data sources (an RDBMS, CSV files, etc.) so your team can use it for analytics and other purposes.

Ratchet is an open-source project by Daily Burn. It provides a better way of extracting data from the databases and web services an application uses, then transforming and loading that data into reporting-oriented formats. It makes doing these tasks more robust than creating basic scripts or using limited 3rd-party services that don't give you full control over your data processing.

Install ratchet by running:

 $ go get github.com/dailyburn/ratchet

Example project layout:

    ├── main.go             (Main package. Pipeline functions)
    ├── packages            (Your reporting package)
    │   ├── models.go       (JSON structs for transformer.go)
    │   ├── queries.go      (SQL query functions for SQLReaders)
    │   └── transformer.go  (Custom DataProcessors)

Ratchet consists of a Pipeline with a series of PipelineStages, which each handle one or more DataProcessors. DataProcessors each run in their own goroutine so all of the data is processed concurrently. The DataProcessors send and receive JSON for convenience. Ratchet gives you some useful data processors and an interface for you to implement custom ones.

Your Ratchet tasks will likely use a combination of the provided DataProcessors and your own custom ones. If you're creating an ETL task, the custom DataProcessors will usually be for the Transform stage, while the Extract and Load stages can use the built-in Reader/Writer DataProcessors such as SQLReader and SQLWriter. There are times, though, when you will create custom DataProcessors even for the extract stages, such as when you're calling third-party REST APIs and the like.

There are other DataProcessors provided for your convenience, such as ones for Google's BigQuery, and more. You can mix and match them however makes sense for your application. See the full list of provided DataProcessors.

You will typically begin your Ratchet code by importing the necessary packages. For the sake of example, I will assume you are also going to access an RDBMS, such as MySQL, to perform SQL reads and/or writes:

 import (
     "github.com/dailyburn/ratchet"
     "github.com/dailyburn/ratchet/processors"
     "github.com/<YOUR_USER>/<YOUR_PROJECT>/packages"
     "database/sql"
     _ "github.com/go-sql-driver/mysql"
 )

Since an SQLReader takes a *sql.DB as the first parameter, you will want to start by creating one. If you only need a basic SQL string then you may want to use NewSQLReader. Example:

 func UsersQuery(minId int, maxId int) string {
     return fmt.Sprintf(`SELECT id, name FROM users 
         WHERE id >= %v AND id <= %v`, minId, maxId)
 }

We simply created a function that takes arguments, generates some SQL, and returns a string. We can call it like:

 users := processors.NewSQLReader(someDB, mypkg.UsersQuery(5, 10))
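
The someDB value above is a standard *sql.DB handle. A minimal sketch of creating one with the MySQL driver imported earlier (the DSN here is a placeholder, and the standard log package is assumed) looks like this:

 db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/mydb")
 if err != nil {
     log.Fatal(err)
 }
 defer db.Close()

You would then pass db in place of someDB when constructing the SQLReader.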

If you need a query to use the JSON results generated from a previous stage, use NewDynamicSQLReader.
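
Here is a hedged sketch of that approach, assuming NewDynamicSQLReader takes the same *sql.DB plus a generator function with the signature func(data.JSON) (string, error); the orders table and the query itself are made up for illustration, so check the processors documentation before relying on this:

 // Hypothetical generator: build the next query from the previous stage's JSON.
 ordersQuery := func(d data.JSON) (string, error) {
     var rows []struct {
         ID int `json:"id"`
     }
     if err := data.ParseJSON(d, &rows); err != nil {
         return "", err
     }
     if len(rows) == 0 {
         return "", fmt.Errorf("no rows received from previous stage")
     }
     return fmt.Sprintf("SELECT * FROM orders WHERE user_id = %v", rows[0].ID), nil
 }

 orders := processors.NewDynamicSQLReader(someDB, ordersQuery)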

When you need to create your own data processors, you implicitly satisfy the DataProcessor interface by implementing its ProcessData and Finish methods. You will create a struct and attach these methods to it. It's up to you to determine how your structure will hold any state you need. Also, because ProcessData deals with receiving JSON from the previous stage and then passing JSON on to the next stage, you will need to create structs for Unmarshaling and Marshaling the JSON.
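
As the method signatures in the examples below reflect, the interface looks like this:

 type DataProcessor interface {
     ProcessData(d data.JSON, outputChan chan data.JSON, killChan chan error)
     Finish(outputChan chan data.JSON, killChan chan error)
 }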

Tip: Creating structs for all the JSON you will be processing can be tedious so I highly recommend the tool json2go.

The data package includes some types and functions for using JSON with Ratchet. It provides wrapper functions for json.Unmarshal() and json.Marshal(), called data.ParseJSON() and data.NewJSON(), that add extra logging. It also defines the type data.JSON as a simple name for a byte slice: type JSON []byte. Example:

 package mypkg

 import "github.com/dailyburn/ratchet/data"

 type myTransformer struct{}

 // Expose our DataProcessor for clients to use
 func NewMyTransformer() *myTransformer {
     return &myTransformer{}
 }

 // Consider moving these two structs to models.go
 type ReceivedData struct {
     UserID int `json:"user_id,omitempty"`
 }
 type TransformedData struct {
     UserID         int    `json:"user_id,omitempty"`
     SomeNewField   string `json:"some_new_field"`
 }

 func (t *myTransformer) ProcessData(d data.JSON, 
                                     outputChan chan data.JSON,
                                     killChan chan error) {

     // Step 1: Unmarshal json into slice of ReceivedData structs
     var users []ReceivedData
     var transforms []TransformedData 
    
     err := data.ParseJSON(d, &users)
     if err != nil {
         killChan <- err
         return
     }

     // Step 2: Loop through slice and transform data
     for _, user := range users {
         transform := TransformedData{}
         transform.UserID = user.UserID
         transform.SomeNewField = "whatever"
         transforms = append(transforms, transform)
     }

     // Step 3: Marshal transformed data and send to next stage
     dd, err := data.NewJSON(transforms)

     if err != nil {
         killChan <- err
     } else {
         outputChan <- dd
     }
 }

 func (t *myTransformer) Finish(outputChan chan data.JSON,
                               killChan chan error) {}

Notice the idiomatic NewMyTransformer() constructor that returns a pointer to our DataProcessor struct in its zero value. Because it is exported (it starts with a capital letter), package users will call it like:

 transform := mypkg.NewMyTransformer()

Finish() is called by Ratchet after the previous stage has finished sending its data. You can often implement it as an empty method. Finish() is more useful when you want to wait until all of the data has been received before doing something with it, which is particularly likely when you're working with more than one input source. In that case, you will typically use ProcessData() only to validate and store the incoming data on the receiver struct, then do the second and third steps above inside Finish() instead. Here is how we could rewrite the example to batch data on a non-empty struct via a pointer receiver and complete the transformation in Finish() (the User struct here plays the role of ReceivedData above):

 type myTransformer struct {
     BatchedUsers []User
 }

 func NewMyTransformer() *myTransformer {
     return &myTransformer{}
 }

 func (t *myTransformer) ProcessData(d data.JSON, 
                                     outputChan chan data.JSON,
                                     killChan chan error) {

     // Step 1: Unmarshal the JSON into a User slice
     var users []User
    
     err := data.ParseJSON(d, &users)
     if err != nil {
         killChan <- err
         return
     }

     // Step 2: append via pointer receiver
     t.BatchedUsers = append(t.BatchedUsers, users...)
 }


 func (t *myTransformer) Finish(outputChan chan data.JSON,
                               killChan chan error) {

     var transforms []TransformedData

     // Step 3: Loop through slice and transform data
     for _, user := range t.BatchedUsers {
         transform := TransformedData{}
         transform.UserID = user.UserID
         transform.SomeNewField = "whatever"
         transforms = append(transforms, transform)
     }

     // Step 4: Marshal transformed data and send to next stage
     // Only write the results if there is at least one row/record.
     // You can change the batch size by setting loadDP.BatchSize
     if len(transforms) > 0 {
         dd, err := data.NewJSON(transforms)

         if err != nil {
             killChan <- err
         } else {
             outputChan <- dd
         }
     }
 }

Once you have your data processors set up, you just need to pass them into a new pipeline for processing. If you have one reader, one transformer, and one loader, you can use a basic three-stage pipeline created with NewPipeline():

 pipeline := ratchet.NewPipeline(extractDP, transformDP, loadDP)
 err := <-pipeline.Run()

If you do not even need a transform stage, you can have just extract and load stages: ratchet.NewPipeline(extractDP, loadDP). This is enough when the fields your extraction SQL returns match the columns of the table you will load the data into.
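
As a hedged sketch, assuming NewSQLWriter takes the destination *sql.DB and a table name, and that the columns returned by UsersQuery line up with a hypothetical report_users table, that two-stage version could look like:

 extractDP := processors.NewSQLReader(appDB, mypkg.UsersQuery(5, 10))
 loadDP := processors.NewSQLWriter(reportDB, "report_users")

 pipeline := ratchet.NewPipeline(extractDP, loadDP)
 if err := <-pipeline.Run(); err != nil {
     log.Fatal(err)
 }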

Things aren't always that simple, so Ratchet provides a more flexible branching pipeline via NewPipelineLayout(), which takes a variable number of stages built with NewPipelineStage(), each of which in turn takes a variable number of DataProcessors. Inside the pipeline stages you wrap each DataProcessor instance with the Do() function and call Outputs() on the value it returns, following these rules:

  • DataProcessors in a non-final PipelineStage must use Outputs().
  • A DataProcessor must be pointed to by one of the previous Outputs() (except in the first stage).
  • Outputs() must point to a DataProcessor in the next immediate stage.
  • DataProcessors in the final stage must not use Outputs().

Here's how the basic 3 stage pipeline shown above would look as a branching pipeline:

 layout, err := ratchet.NewPipelineLayout(
     ratchet.NewPipelineStage( // Stage 1
         ratchet.Do(extractDP).Outputs(transformDP),
     ),
     ratchet.NewPipelineStage( // Stage 2
         ratchet.Do(transformDP).Outputs(loadDP),
     ),
     ratchet.NewPipelineStage( // Stage 3
         ratchet.Do(loadDP),
     ),
 )

 pipeline := ratchet.NewBranchingPipeline(layout)
 err = <-pipeline.Run()

Fortunately, you can do a lot more than that with a branching pipeline. Outputs() can take multiple DataProcessors to fan data out to, and you will often have multiple calls to Do() in the intermediate stages for handling disparate data.
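
For instance, a hypothetical layout where one extractor fans out to two different transformers, each feeding its own writer, could look like this (all of the DataProcessor variables are placeholders):

 layout, err := ratchet.NewPipelineLayout(
     ratchet.NewPipelineStage(
         // One extractor fans out to two transformers.
         ratchet.Do(extractDP).Outputs(usersTransformDP, eventsTransformDP),
     ),
     ratchet.NewPipelineStage(
         ratchet.Do(usersTransformDP).Outputs(usersWriteDP),
         ratchet.Do(eventsTransformDP).Outputs(eventsWriteDP),
     ),
     ratchet.NewPipelineStage( // Final stage: no Outputs()
         ratchet.Do(usersWriteDP),
         ratchet.Do(eventsWriteDP),
     ),
 )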

Sometimes you will want to pass the original data along through the stages so a later stage can use it. This is easy to do with the DataProcessor returned by NewPassthrough(). In the following example, passing passthrough to Outputs() forwards aDP's output unchanged, so yetAnotherDP receives it alongside the output of anotherDP:

 passthrough := processors.NewPassthrough()

 ...

 ratchet.NewPipelineStage(
     ratchet.Do(aDP).Outputs(anotherDP, passthrough),
 ),
 ratchet.NewPipelineStage(
     ratchet.Do(anotherDP).Outputs(yetAnotherDP),
     ratchet.Do(passthrough).Outputs(yetAnotherDP),
 ...

Ratchet provides a logging facility that's very useful for debugging Ratchet tasks. It is often helpful to temporarily place the following line into your ProcessData() implementation so that you can see output from the calls to logger.Debug():

 logger.LogLevel = logger.LevelDebug

Even better, create a way to set the different levels from the CLI, for example --log-level="debug", and likewise for LevelError, LevelInfo, LevelStatus, and LevelSilent. I recommend LevelSilent in production, or whenever you just need the job to run faster.
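
A quick sketch of wiring that up with the standard flag package (the flag name and the mapping are just one possible choice) might be:

 logLevel := flag.String("log-level", "info", "debug, info, status, error, or silent")
 flag.Parse()

 switch *logLevel {
 case "debug":
     logger.LogLevel = logger.LevelDebug
 case "status":
     logger.LogLevel = logger.LevelStatus
 case "error":
     logger.LogLevel = logger.LevelError
 case "silent":
     logger.LogLevel = logger.LevelSilent
 default:
     logger.LogLevel = logger.LevelInfo
 }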

Be sure to read the log output (ratchet_default.log) because it shows, for example, the SQL INSERT data that was actually executed. The logger package provides some other useful logging functions for you to use as well.

Another debugging/development tip is to temporarily write the final stage output to a CSVWriter that goes to standard output. This allows you to quickly prototype a Ratchet task without having to set up or write to your final database table(s) yet. Example:

 // Setup all the DataProcessors
 users := processors.NewSQLReader(inputDB, mypkg.UsersQuery())
 bq := processors.NewDynamicBigQueryReader(bqConfig, mypkg.BQQuery)
 bq.UnflattenResults = true
 transformer := mypkg.NewMyTransformer()
 writeCSV := processors.NewCSVWriter(os.Stdout)

 // Create a new Pipeline using the DataProcessors
 layout, err := ratchet.NewPipelineLayout(
     ratchet.NewPipelineStage(
         ratchet.Do(users).Outputs(bq),
     ),
     ratchet.NewPipelineStage(
         ratchet.Do(bq).Outputs(transformer),
     ),
     ratchet.NewPipelineStage(
         ratchet.Do(transformer).Outputs(writeCSV),
     ),
     ratchet.NewPipelineStage(
         ratchet.Do(writeCSV),
     ),
 )
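
Then, just as before, wrap the layout in a branching pipeline and run it; the CSV rows will stream to your terminal:

 pipeline := ratchet.NewBranchingPipeline(layout)
 if err = <-pipeline.Run(); err != nil {
     log.Fatal(err)
 }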

I have covered all of the main concepts. Please see the Ratchet documentation for more information. If you have any questions please post them in the comment section below.
