Understanding the Data Partitioning Technique

The objective of this post is to explain what data partitioning is and why it is important in a modern data architecture to improve the storage of the master dataset. With data partitioning we get a logical distribution of large data sets across different partitions, which allows us to run more efficient queries, facilitates management and improves the maintenance of the system.
We will see how to achieve partitioning with two of the existing technologies for large-scale data processing: Hadoop and Spark. Although these two frameworks are different, we'll show how to achieve file partitioning with both and point out some of the differences between them.


And on the eighth day God created Big Data

We are going to touch on the main concepts without going too deeply into data architectures, which include distributed processing, storage (SQL and NoSQL) and data integration.

In these systems, the store where all the information is saved is called a Data Lake. The Data Lake contains two different storage blocks: Raw Data and Master Data.

We can think of the Raw Data storage block (RD) as a box with thousands of LEGO pieces, and the Master Data storage block (MD) as the box where all the LEGO structures are built.

In the RD the source information is stored as it arrives in the system, without any manipulation. In the MD, the information from the RD is stored after being filtered, structured and refined. This process is called mastering. In our analogy this process would be the instructions for assembling the pieces. As long as you keep the RD, it is always possible to reprocess the information if necessary.

In the MD, it is usual to have different kinds of distributed data storages like file systems (HDFS), key-value stores (Cassandra), document stores (MongoDB), graph DBMS (Neo4j), relational DBMS (Postgres-XL), etc. For the purposes of this post we are going to deal only with distributed file systems, such as the Hadoop Distributed File System (HDFS).

Data partitioning is just one of the techniques applied in the process of mastering raw data, and it improves data read performance.


What is data partitioning?

Imagine we have an encyclopedia written in one single book, and with the passing of time, historical and social events are added to this book. At some point the book would be so huge that we would need a truck to move it, so we would have to divide it into several volumes.


Data partitioning is a technique for physically dividing the data during the loading of the Master Data. Using this method we split a table into smaller pieces according to rules set by the user. It is normally used with very large tables, where reading the entire data set takes a huge amount of time, and it improves maintenance, performance and manageability.

There are basically two ways to partition a wide data table: horizontally (by row) or vertically (by column). Horizontal partitioning distributes the rows of the table across different partitions, while vertical partitioning distributes the columns.
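As a toy illustration in plain Scala (not tied to any framework; the three-column row layout is invented for the example), horizontal partitioning groups whole rows by some key, while vertical partitioning projects subsets of columns:

```scala
// Toy model of a table: each row is (id, user, amount)
case class Tx(id: Long, user: String, amount: Double)

object PartitioningSketch {
  val table = Seq(
    Tx(1L, "Brad Pitt", 54.95),
    Tx(2L, "Angelina Jolie", 200.0),
    Tx(3L, "Kim Kardashian", 60.0))

  // Horizontal partitioning: distribute whole rows into partitions
  // according to a key (here, the first letter of the user name)
  val horizontal: Map[Char, Seq[Tx]] = table.groupBy(_.user.head)

  // Vertical partitioning: distribute columns; each partition keeps
  // the row id so the full row can be reassembled later
  val verticalUsers: Seq[(Long, String)]   = table.map(t => (t.id, t.user))
  val verticalAmounts: Seq[(Long, Double)] = table.map(t => (t.id, t.amount))

  def main(args: Array[String]): Unit = {
    println(horizontal('B'))  // rows whose user starts with 'B'
    println(verticalUsers)    // only the id and user columns
  }
}
```

Distributed engines apply the same idea, only with partitions spread over the nodes of a cluster instead of entries of a local map.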

We can’t forget we are working with huge amounts of data and we are going to store the information in a cluster, using a distributed filesystem. One of the most popular is Hadoop HDFS.


How does HDFS work?

To understand the HDFS mechanism, imagine we have a video rental store with several shelves to display the DVDs. When the owner receives new stock, it arrives in big boxes containing hundreds of movies.

In HDFS, files are split into pieces called blocks and stored across several nodes of the cluster called Datanodes. In our analogy the files are the boxes, the blocks are the DVDs, and the shelves are the Datanodes. These blocks are typically quite large (64 MB by default, 128 MB in Hadoop 2.x) to minimize overall seek time compared to transfer time.

Usually a block is replicated across several Datanodes, depending on a replication factor, which is equivalent to putting copies of the same movie on different shelves. If the replication factor is 2, the movie will be on two shelves.

There is also a special node, called the Namenode, which is responsible for knowing where the blocks of each file are stored across the cluster. In our example the Namenode would be an inventory mapping movies to shelves.
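A back-of-the-envelope sketch in plain Scala (the 64 MB block size and the replication factor of 3 are just the classic defaults; no HDFS API is involved) of how a file maps to blocks and replicas:

```scala
object HdfsBlocks {
  // Number of blocks a file is split into, given the block size in bytes
  def blockCount(fileSizeBytes: Long, blockSizeBytes: Long): Long =
    (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes // ceiling division

  // Total block copies stored in the cluster, given the replication factor
  def totalReplicas(fileSizeBytes: Long, blockSizeBytes: Long, replication: Int): Long =
    blockCount(fileSizeBytes, blockSizeBytes) * replication

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    // A 200 MB file with 64 MB blocks is split into 4 blocks
    // (three full ones plus a smaller last one);
    // with replication factor 3 the cluster stores 12 block copies
    println(blockCount(200 * mb, 64 * mb))      // 4
    println(totalReplicas(200 * mb, 64 * mb, 3)) // 12
  }
}
```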


How can we partition a table stored in HDFS?

For example, let's suppose we have a table with the transactions of purchases made with a credit card, and this table has billions of records.

transaction_id  user            credit_number        amount  t_date      purchase_type  description
34100220501     Brad Pitt       1111-2222-3333-4444  54.95   01-02-2016  Food           Restaurant
34100220502     Angelina Jolie  1111-1111-1111-0000  200     01-03-2016  Services       Amazon
5000220100      Kim Kardashian  5566-2222-3333-4444  60      02-04-2016  Sports         Basketball match

If we have to query which transactions were made in the current month, it would be necessary to scan all the records and check every date, which could be very time consuming.

The way to achieve partitioning in HDFS is to use the folder structure to split the data into different physical sets, possibly with several levels, each containing part of the table's data. The name of each folder holds the concrete value of the partition column and, optionally, the name of the column as well.
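For instance, partitioning the transactions table by year and month could produce a folder layout like this (the paths and file names are illustrative; the column=value folder naming is the convention that both Hive and Spark understand):

```
/master/myschema/transactions/
├── date_year=2015/
│   ├── date_month=11/
│   └── date_month=12/
└── date_year=2016/
    ├── date_month=1/
    │   └── part-00000.avro
    └── date_month=2/
        └── part-00000.avro
```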

Some criteria must be met when choosing the partition key columns:

  • They are used very frequently with the same conditions.
    • Time-based data: combinations of year, month and day associated with time values.
    • Location-based data: geographic regions associated with a place.
  • They have a reasonable number of distinct values (cardinality).
    • The number of possible values has to be reasonable to gain efficiency when splitting the data. For example, a valid range could be between 10 and 1000.
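The cardinality criterion can be checked before committing to a key. A minimal sketch in plain Scala (the 10 to 1000 bounds are just the rule of thumb mentioned above, not a hard limit of any engine):

```scala
object PartitionKeyCheck {
  // A column is a reasonable partition key if its number of distinct
  // values falls within the given bounds (rule of thumb: 10 to 1000)
  def reasonableKey[A](columnValues: Seq[A], min: Int = 10, max: Int = 1000): Boolean = {
    val cardinality = columnValues.distinct.size
    cardinality >= min && cardinality <= max
  }

  def main(args: Array[String]): Unit = {
    val months = (1 to 12).toSeq    // 12 distinct values: a good key
    val ids    = (1 to 5000).toSeq  // almost one partition per row: too many
    println(reasonableKey(months))  // true
    println(reasonableKey(ids))     // false
  }
}
```

Too few values means each partition stays huge; too many means thousands of tiny files and folders, which puts pressure on the Namenode.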

A hands-on look at the code…

First of all we are going to show you how to program with Hadoop using Hive, which is an abstraction layer over data processing engines that allows the user to write jobs using an SQL-like syntax. Although Hive supports other engines, such as Tez or even Spark, we'll assume we are working with Hadoop MapReduce.

The purpose of the examples is to show you how to code different use cases. In them we use the environment variable ${YOUR_PATH}, which represents a default folder for loading and saving files.


Case 1. Creating a table, defining year and month as the partitioning keys.

CREATE TABLE MYSCHEMA.TRANSACTIONS_PARTITIONED (
TRANSACTION_ID BIGINT, USER_ID STRING, CREDIT_NUMBER STRING,
AMOUNT DOUBLE, T_DATE DATE, PURCHASE_TYPE STRING, DESCRIPTION STRING)
PARTITIONED BY (DATE_YEAR INT, DATE_MONTH INT);

Case 2. Loading data into the table from an existing one.

-- Dynamic partitioning must be enabled for this kind of insert
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE MYSCHEMA.TRANSACTIONS_PARTITIONED
PARTITION (DATE_YEAR, DATE_MONTH)
SELECT TR.*, YEAR(TR.T_DATE) AS DATE_YEAR, MONTH(TR.T_DATE) AS DATE_MONTH
FROM MYSCHEMA.TRANSACTIONS TR;

Case 3. Loading data from an existing file into a specific partition. Hive requires the PARTITION clause when loading files into a partitioned table.

LOAD DATA LOCAL INPATH '${YOUR_PATH}/raw/myschema/transactions.avro'
INTO TABLE MYSCHEMA.TRANSACTIONS_PARTITIONED
PARTITION (DATE_YEAR=2016, DATE_MONTH=1);

Case 4. Registering an existing folder as a partition of the table.

ALTER TABLE MYSCHEMA.TRANSACTIONS_PARTITIONED
ADD PARTITION (DATE_YEAR=2016, DATE_MONTH=1)
LOCATION '${YOUR_PATH}/master/myschema/transactions/2016/1/';
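The payoff comes at query time: when the WHERE clause filters on the partition columns, Hive prunes the partitions and only reads the matching folders instead of scanning the whole table. For instance, against the table defined in Case 1 (the query itself is an illustration, not taken from the original post):

```
SELECT COUNT(*)
FROM MYSCHEMA.TRANSACTIONS_PARTITIONED
WHERE DATE_YEAR = 2016 AND DATE_MONTH = 1;
```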

Secondly we are going to show how to do the partitioning with the Spark API using the Scala language. In the examples we need two main objects, SparkContext and SQLContext, typically available in a Spark shell. SparkContext represents a connection to a computing cluster, and SQLContext is the entry point for working with structured data (rows and columns) through DataFrames. Since Spark 2.0 we can use the SparkSession object instead of SQLContext.

This code is ready to be executed in a Spark shell, where SparkContext is available as the immutable variable sc and SQLContext as sqlContext.

Case 5. Creating a DataFrame from an RDD (Resilient Distributed Dataset).

import java.time.{LocalDate, Month}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}

val schema = StructType(Seq(
StructField("transaction_id", LongType, true), 
StructField("user", StringType, false), 
StructField("credit_number", StringType, false), 
StructField("amount", DoubleType, false),
StructField("t_date", DateType, false),
StructField("purchase_type", StringType, false), 
StructField("description", StringType, false)))

val myArray = Array(Row(34100220501L, "Brad Pitt", "1111-2222-3333-4444", 54.95d,
java.sql.Date.valueOf(LocalDate.of(2016, Month.JANUARY, 1)),"Food","Restaurant"),
Row(34100220502L, "Angelina Jolie", "1111-1111-1111-0000", 200d,
java.sql.Date.valueOf(LocalDate.of(2016, Month.FEBRUARY, 16)),"Services","Amazon"))

val rdd = sc.parallelize(myArray)
val df = sqlContext.createDataFrame(rdd,schema)

Case 6. Creating a DataFrame from an existing file programmatically.

// Avro files embed their own schema, so no header/inferSchema options are needed
val df = sqlContext.read
.format("com.databricks.spark.avro")
.load("${YOUR_PATH}/raw/myschema/transactions.avro")

Case 7. Creating a DataFrame from an existing file using Spark SQL.

sqlContext.sql(
"""CREATE TABLE MYSCHEMA.TRANSACTIONS_PARTITIONED
USING com.databricks.spark.avro
OPTIONS (path '${YOUR_PATH}/raw/myschema/transactions.avro')""")
val df = sqlContext.table("MYSCHEMA.TRANSACTIONS_PARTITIONED")

Case 8. Creating a table partitioned by year and month from a DataFrame (cases 5, 6 and 7).

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import sqlContext.implicits._

// we have a DataFrame in df from the earlier examples
df.select('*, year('t_date) as 'date_year, month('t_date) as 'date_month)
  .write.partitionBy("date_year", "date_month").mode(SaveMode.Overwrite)
  .save("${YOUR_PATH}/master/myschema/transactions")

Case 9. Loading partitioned files into a DataFrame.

val transactionsDf = sqlContext.read.load(
"${YOUR_PATH}/master/myschema/transactions/date_year=2016/date_month=1/")
transactionsDf.count
transactionsDf.show

Summing up

In this post we have reviewed some interesting topics. First we looked at the different components related to storage in a modern data architecture. Then we learned how the data partitioning technique works and when it is useful to apply it. Finally we took a practical approach, showing several code examples.

It's simple to achieve partitioning with these two frameworks. We have shown that with Hive we define the partitioning keys when we create the table, while with Spark we define them when saving a DataFrame. Furthermore, Hive only offers an SQL-like language, while Spark supports a much wider range of languages: Scala, Python, R and Java.

Under the code there are two different approaches to large-scale data processing. Hadoop MapReduce is based on an older technology that makes intensive use of read and write operations on disk. Spark, on the other hand, works more efficiently by keeping data distributed in memory, using structures such as RDDs and DataFrames.


Álvaro Navarro

Over the last few years I've been working as a software engineer on several projects for public and private companies, mainly developing web applications and layered web architectures to support their development. Currently I am immersing myself in the Big Data world and its technology stack.
