Why Your Join is So Slow

Sometimes a simple join of 2 small DataFrames can take forever. Here is an example:

scala> df1.count
res0: Long = 607

scala> df2.count
res1: Long = 24

scala> val df3 = df1.join(df2)
scala> df3.count

df1 and df2 are 2 DataFrames defined in earlier steps, and both of them are tiny. Logically a join is an n*m operation, basically 2 nested loops, so for 2 such small inputs it should take no more than a couple of seconds. However, in my real example it took more than a minute to give me the result. What happened?

When I ran the count action, I noticed that each stage had a lot of tasks. Since the number of tasks is roughly proportional to the number of partitions the operation needs to handle, let's take a look at how many partitions each of the inputs has:

scala> df1.rdd.partitions.size
res4: Int = 3

scala> df2.rdd.partitions.size
res5: Int = 65

It is not very surprising that, although the data are small, the number of partitions is still inherited from the upstream DataFrame, so df2 has 65 partitions. The join operation generates n*m tasks (n is the number of partitions of df1, and m is the number of partitions of df2), and the overhead of so many tasks kills the performance.

To address this, we can use the repartition method of DataFrame before running the join operation.

scala> val df1p1 = df1.repartition(1)
scala> val df2p1 = df2.repartition(1)
scala> val df3 = df1p1.join(df2p1)
scala> df3.count

Now the execution time is back to normal.

Takeaway from this study:
Always consider repartitioning the data to a small number of partitions before a join.


Use the new DataFrame UDF

Since version 1.3, the DataFrame udf function has been made very easy to use. You can basically do this:

scala> import org.apache.spark.sql.functions.udf
scala> val func = udf((i: Int) => i + 2)
scala> val srdd = sqlContext.createDataFrame(Seq((1,"a"),(2,"b")))
scala> srdd.select(func($"_1") as "added2").show

The udf method identifies the data types through Scala reflection using TypeTag. Therefore, to make it work, the types of the Scala function passed to udf must be inferable.

Catalyst data types have a concept of nullable, and if it is true the data value can be null. However, since null is so overloaded in Java, Scala uses the None value of Option to really address the nullable concept. The new udf method supports Option in Scala and translates it to a Catalyst null. So you can even do:

scala> val func = udf((i: Int) => if(i>0) Option(i+2) else None)
scala> val srdd = sqlContext.createDataFrame(Seq((1,"a"),(-2,"b")))
scala> srdd.select(func($"_1") as "added2").show

The basic use is very easy. However, sometimes you want to extract commonly used udfs and create your own library. In that case you may want a rather general function which can handle different data types.

Scala's type handling is not an easy thing to work with when you want to be flexible. Here is an example from SMV which shows how to use TypeTag with udf:

import scala.reflect.runtime.universe.TypeTag
......
  def smvCreateLookUp[S,D](map: Map[S,D])(implicit st: TypeTag[S], dt: TypeTag[D]) = {
    val func: S => Option[D] = {s => map.get(s)}
    udf(func)
  }

It can be used as

    val nameMap: Map[String, String] = Map("John" -> "J")
    val mapUdf = smvCreateLookUp(nameMap)
    val res = srdd.select(mapUdf($"first") as "shortFirst")

There are 2 types of Rows

A Row is basically an Array of Any which holds a data record. Because Scala is strongly typed, a lot of Spark SQL Catalyst code exists to maintain its own type system. In early versions of Spark SQL, all data are internally stored as Scala types, and most primitive Catalyst types simply box the corresponding Scala type (object). Things get less clear when you have to deal with the Catalyst ArrayType, since in Scala it is actually a Seq instead of an Array. The real change happened when DateType was introduced.

DateType in Catalyst is actually an Int, the number of days since 1970-01-01 (day 0). However, to make client code more readable, Spark SQL somehow decided to make it a java.sql.Date from the Scala angle (the end user's angle).

As long as the end user only uses DataFrames and functions on Columns (Catalyst Expressions), there is nothing to worry about, since they never care how Catalyst types relate to Scala types. When they need to write a UDF, they refer to the mapping between Catalyst types and Scala types in the Spark DataFrame documentation. Either way, there is no place where they need to know that Catalyst internally uses an Int to represent a Date (so the date field in a Row is actually isInstanceOf[Int]).

However, power users who want to switch between DF and RDD[Row] need to be careful.

The rdd method of DF actually calls a conversion method on each row to convert the Catalyst internal type to the Scala type. For the Date case, although it is stored as an Int within a DF, when you call rdd it is converted to java.sql.Date. That is fine by itself. However, when you do some work on that RDD and want to convert it back to a DF by calling sqlContext.createDataFrame(rdd, schema), it will NOT automatically convert the Date field back to the Int type. Even worse, no error or warning is issued (on 1.3.0). More interesting still, some Catalyst expressions can actually handle the java.sql.Date version correctly, which I'm not sure is by design.

My suggestion is that whenever you convert it back to a DF, call the ScalaReflection.convertToCatalyst method on each field first. I know it's ugly, and I hate it too! Hopefully a later version of Spark will have a better way to handle this.
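A minimal sketch of what that round trip could look like. The exact argument list of convertToCatalyst differs between Spark versions, so treat the call below as an assumption to check against the version you run, not a drop-in recipe:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.ScalaReflection

// Drop down to RDD[Row]; Date columns now show up as java.sql.Date
val rdd = df.rdd.map(row => row)  // placeholder for real RDD-level work

// Convert every field back to Catalyst's internal representation before rebuilding the DF.
// NOTE: convertToCatalyst's signature varies across versions (some take only the value,
// some also take the DataType) -- this two-argument form is an assumption.
val converted = rdd.map { row =>
  Row.fromSeq(row.toSeq.zip(df.schema.fields).map { case (value, field) =>
    ScalaReflection.convertToCatalyst(value, field.dataType)
  })
}
val df2 = sqlContext.createDataFrame(converted, df.schema)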


GroupBy on DataFrame is NOT the GroupBy on RDD

You might be familiar with the following code:

df.orderBy($"date").groupBy($"id").agg(first($"date") as "start_date")

Here, you use orderBy to put the records in order and assume groupBy will keep that order within each group. It works as expected.

Since the groupBy method name is the same and the intention is roughly similar, one naturally thinks that groupBy on DataFrame is implemented on top of groupBy on RDD. However, when you check the RDD groupBy documentation, you will find that none of the variations of RDD groupBy guarantees record order.

How come DF groupBy preserves order while RDD groupBy doesn't? The reason is that DF groupBy actually has nothing to do with RDD groupBy!

RDD's groupBy may shuffle (repartition) the data according to the keys, and since the output is always a paired RDD, it makes no assumption about what people will do with that paired RDD.

DF's groupBy, as of version 1.3, always takes aggregations, which actually makes it an aggregation operation instead of a general groupBy operation. Because of that, the implementation can assume that the key space in each partition is small and implement the aggregation using in-memory hash tables. If you check the code, you will find that DF groupBy simply creates one instance of the aggregation Expressions for each group and each aggregation, then goes through the data and keeps updating those Expressions.

A problem arises when that assumption fails on your data. For example, if you group on a set of keys and the data is not partitioned finely enough, you may run into memory issues. When that happens, you may want to either increase the memory or increase the number of partitions (spark.sql.shuffle.partitions). Sometimes even shuffle.partitions cannot guarantee the number of partitions of your data. One example is performing a groupBy right after reading a file: in that case the number of partitions is determined by the reading process, which may have nothing to do with the shuffle.partitions parameter. (See the sketch below.)
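A minimal sketch of both knobs, assuming a df freshly read from a file and a hypothetical amount column; the partition counts are arbitrary:

import sqlContext.implicits._
import org.apache.spark.sql.functions.sum

// Raise the shuffle partition count used by aggregations (the default is 200)
sqlContext.setConf("spark.sql.shuffle.partitions", "400")

// For a freshly read file the partitioning comes from the read itself,
// so repartition explicitly before the expensive groupBy.
val grouped = df.repartition(400)
  .groupBy($"id")
  .agg(sum($"amount") as "total")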

2 takeaways from this study:

  • Never assume ordering on RDD groupBy
  • Keep an eye on your number of partitions and partition size when you use DF groupBy

Switch Between DataFrame and RDD

Although the DataFrame methods and Catalyst are very powerful, sometimes we want deeper control, either because the functionality we need is not covered or, more commonly, because we want to use our own knowledge of the data to optimize performance. When that need comes, we should consider dropping back to Spark Core and using RDDs directly.

DataFrame has a method rdd which simply gives you an RDD[Row]. It also provides a method schema to give the user access to the schema, which is actually a StructType. Anyone brave enough to use those 2 methods should have no fear of digging into the Spark API documentation, or even the code, to figure out how to work with Row and StructType. But be careful here: although some DF methods and RDD methods share the same names, they can behave very differently, so check the documentation carefully.
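A minimal sketch of the round trip, using only df.rdd, df.schema and sqlContext.createDataFrame; the filter is just a stand-in for whatever RDD-level work you actually need:

import org.apache.spark.sql.Row

val rows = df.rdd                    // RDD[Row]
val schema = df.schema               // StructType describing the columns

// Any RDD-level logic goes here; dropping rows whose first column is null is a placeholder.
val cleaned = rows.filter(r => !r.isNullAt(0))

// Rebuild a DataFrame with the original schema (mind the Date caveat from the previous post).
val df2 = sqlContext.createDataFrame(cleaned, schema)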


Local DataFrame

As we discussed earlier, we can use Catalyst expression on local data.

With the introduction of the DataFrame concept in Spark 1.3, DataFrame puts an abstraction between the concept of data and the physical storage of data. As one of the benefits of this abstraction, local data can also be a DataFrame! That makes our life way easier when we try to utilize Catalyst on local data.

Once you create a DataFrame on local data, it can be used just like any other DF created from an RDD. Here is how to do it:

scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.{DataFrame, Row}
scala> import org.apache.spark.sql.catalyst.expressions.AttributeReference
scala> import org.apache.spark.sql.catalyst.plans.logical._

scala> val d = Seq(Seq("a", 1), Seq("b", 2)).map{Row.fromSeq}
scala> val schemaAttr = Seq(AttributeReference("key", StringType, true)(), AttributeReference("value", IntegerType, true)())
scala> val df = new DataFrame(sqlContext, LocalRelation(schemaAttr, d))

Now you can do things like

scala> df.where('key === "a").agg(sum('value)).collect
res1: Array[org.apache.spark.sql.Row] = Array([1])

One caveat:
Creating a local DF within a partition (on an execution node) fails at run time.

I think that's because a DataFrame has a sqlContext, and therefore a SparkContext, associated with it, even though a local DataFrame doesn't need them. Hopefully a later version can give me a truly local DF.


UDF in 1.3

Since Spark 1.3, the Catalyst expressions are hidden from the end user and a "function" layer is created on the "Column" class.

One consequence is that we can't easily use the beloved "ScalaUdf" expression any more. The "udf" function replaces it.

The udf function is really handy if you are sure that the Column you are dealing with has no nulls. The issue is that although udf is simply a wrapper around ScalaUdf, it determines the DataType from the Scala function passed to it. This causes a problem: I can't define a function which returns Double and make it nullable, since Scala does not allow null to be a Double.

Instead, the right way to do it is to define the function to return java.lang.Double, which does allow a null value. Here is an example which converts a string like "12,345" to the double 12345.0.

    val nf = java.text.NumberFormat.getNumberInstance(java.util.Locale.US)
    val s2d: String => java.lang.Double = { s => 
      if (s == null) null
      else nf.parse(s).doubleValue()
    }
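To actually use it, wrap s2d with udf as in the earlier post. A minimal sketch, where the column name "amount" is just a made-up example:

    import org.apache.spark.sql.functions.udf

    val s2dUdf = udf(s2d)                       // inferred as DoubleType, nullable
    df.select(s2dUdf($"amount") as "amount_d")  // null strings come through as null doubles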

Within Scala there are actually 3 types for double: Double is a Scala class, java.lang.Double is a Java class, and double is a JVM primitive type. Internally, a Scala Double can be converted either to a boxed java.lang.Double object or directly to the primitive double. Since double is a primitive, it can't be null, and since a Scala Double needs to convert to both, it can't be null either; Scala will generate an error if you try to assign null to a Double. However, the boxed java.lang.Double is a normal Java object, and null IS a valid value.
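A quick illustration of that difference:

    val jd: java.lang.Double = null   // compiles: java.lang.Double is a reference type
    // val d: Double = null           // does not compile: scala.Double cannot hold null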


The Column Class

Since Spark 1.3, the Catalyst Expression is hidden from the end user. A new class, Column, is created as the user interface. Internally, it is a wrapper around Expression.

Before 1.3, we used 2 kinds of Expressions in select, groupBy etc.: one is a Symbol, which refers to an original column of the SchemaRDD, the other is a real Expression like Sqrt('a). When it was a Symbol, with import sqlContext._ the Symbol was implicitly converted to analysis.UnresolvedAttribute, which is an Expression.

Since 1.3, an implicit conversion from Symbol to Column is provided by import sqlContext.implicits._. However, both the example code and the convenience alternative methods lean toward using String to represent column names instead of Symbol. E.g., even without import sqlContext.implicits._, you can do

df.select("field1")

The same thing can be written as

df.select(df("field1"))

where df(...) literally looks for the column named "field1" within df and returns a Column. You can also do

import df.sqlContext.implicits._
df.select($"field1")

The magic here is that there is an implicit class in sqlContext.implicits which extends StringContext.
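Roughly, that implicit looks like the following simplified sketch (check the Spark source for the exact definition):

import org.apache.spark.sql.ColumnName

// Makes $"col" build a ColumnName (which is a Column) from the interpolated string
implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = new ColumnName(sc.s(args: _*))
}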

The question is when to use which approach.

Use String directly

Although the original select takes Columns:

def select(c: Column*)

one can use Strings in select directly, because select has a convenience alternative method defined as

def select(s1: String, others: String*)

As you can see, to avoid ambiguity, instead of defining select(s: String*), the String version splits the parameter list. That provides the convenience of calling it with explicit string parameters, but it also causes a problem: we can't do this

val keys=Seq("a", "b")
df.select(keys: _*) //will not work!

Explicitly specify the DataFrame for a column

df("a")

literally searches for column "a" in the DataFrame df. However, sometimes we want to construct a general Column without specifying any DataFrame. We need the next method to do that.

General Column from String

$"a"

creates a general Column with the name "a". Then how do we use variables with this weird syntax?

Actually, $ here is a method on StringContext. An analogue is s in s"dollar=${d}". With this analogy, we can figure out how to use variables:

val name = "a"
$"$name"

Finally we have a way to select a List of fields

val keys=Seq("a", "b").map(l=>$"$l")
df.select(keys: _*)

Symbol still works through implicit conversion

df.select('single * 2)

will still work.

However, since it works through implicit conversion, the following doesn't work, just as in the String case:

val keys=Seq('a, 'b)
df.select(keys: _*) //doesn't work

You need to do

df.select(keys.map(l=>$"${l.name}"): _*)

What "as" does

Here is what we did in the past to define new variables:

df.select('a * 2 as 'b)

It will still work in 1.3 as long as you import df.sqlContext.implicits._.

Here is what happened:

  • 'a is implicitly converted to a ColumnName (which is a Column)
  • * is a method of Column which takes 2 (as Any) as its parameter
  • as is another method of Column, which takes a String or a Symbol, here 'b, as its parameter and returns a Column.

So basically, if we consider as as an operator, the left of it should be a Column and the right of it should be either a Symbol or a String.

To follow the 1.3 documentation examples, use String instead of Symbol to represent column names:

df.select($"a" * 2 as "b")


What an ideal Big Data Scientist Master program should cover

The Master program should have 2 slightly different tracks: one focuses more on the theory side, and the other focuses more on the practical side.

Shared part

Computer and Network architecture

  • All level of data storage, speed and limitations
  • Network 7 layer architecture (as both the knowledge and an example of a real industry system design)
  • Caching and hit rate optimization at every storage level (need to understand even some physical optimization of devices, like the algorithms used for the hard drive’s head movement)

Statistics and statistical modeling

  • Basic statistics
  • Central limit theorem
  • Statistical learning (An Introduction to Statistical Learning with Applications in R)
  • R practice

Financial models and machine learning in industry

  • How to model a business problem to a machine learning problem (what to be predicted, what are the entities we model on, dynamic or static?)
  • Real world modeling process: business problem => data discovery => data collection => data study and cleaning => data analysis (insight discovery) => model the business problem => modeling variable discovery and creation => train and validate model => deliver => in production A/B testing
  • Risk models cases
    • FICO score (there is never a single target to be modeled; a lot of times people need to know why, instead of just a black box)
    • Origination models (censored data)
  • Transactional system and Credit Card fraud detection system
    • Realtime, transactional system
    • Credit Card process and different kind of frauds
    • Classification models in fraud detection
  • Financial market models intro
  • Recommendation system intro

Database and Data Warehouse intro (need to know what Big Data is going to replace)

  • Relational Database
  • Data Warehouse
  • Data Modeling
  • Star schema and BI
  • SQL
  • DB optimization intro (need to know how bad it was)
  • Typical DB tasks (transactional or analytical) and storage design (optimized for random queries or full table scans)

Big Data infrastructure intro

  • Traditional Database storage engines
  • Computer cluster: from MPI to MR
  • Hadoop intro
    • History
    • Architecture
    • HDFS
    • YARN (HDFS + YARN as the operating system of a standard Big Data infrastructure)
  • Spark Intro
    • RDD as a mathematical model of data in data flows
    • Architecture
    • Functional programming intro (Scala)
    • SparkSQL – Catalyst (a language layer below SQL, it will replace SQL!)
    • SchemaRDD and practice
  • Data Lake case study (none of them are good enough yet)

For the Theoretical track

Advanced machine learning

  • SVM
  • Boosting
  • Recommendation systems and SVD
  • Neural network and deep learning
  • Bayesian learning framework
  • Time series modeling
  • HMM
  • Natural language modeling

Control theory (I’m not familiar with this area)

For the Technical track

Software development methods (for this topic I only know the things I learned at work)

  • Waterfall development
  • Agile development
  • Project management
  • Team structure
  • Version control
  • Development process: Requirement => Requirement review => Design => Design review => Code => Code review => QA => release
  • Code repository and release management and case study

Big Data lab

  • Build a 4 node cluster
  • Install Hadoop 1 and 2 from one of the distribution packages
  • Run test, performance test against benchmark, diagnose performance bottleneck
  • Install Spark from source code
  • Exercise on some real data

Programming languages (Java, Scala, Python)

BI systems and Web application intro


Using Catalyst on local Scala data

The invention of the Catalyst DSL as a language layer below SQL is super critical for accumulating relational-algebra-style IP in computer-language form. It is the missing link between ad hoc, query-style "analysis" and fully production-ready, reusable applications.

Within SparkSQL, SchemaRDD defines the "data" and Catalyst defines the "logic". In that sense, whenever we want to calculate things using the elements of the data, we should use Catalyst Expressions to do so.

However, not all the data we want to deal with is in the form of a SchemaRDD. Sometimes it's more convenient to deal with local in-memory data which is simply held in a Scala collection. Internally, Catalyst is very general, so it should be straightforward to use it without SchemaRDD. The best documentation available for this is actually the code; the comment in the file "spark/sql/catalyst/dsl/package.scala" is a good starting point.

Here is a very simple example of using Catalyst Expressions directly on Seq[Row]:

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._

val p = LocalRelation('key.string, 'value.int).select('value + 1 as 'newv).analyze
val s=BindReferences.bindReference(p.expressions(0).children(0), p.inputSet.toSeq)

val d = Seq(Seq("a", 1), Seq("b", 2)).map{Row.fromSeq}
val res = d.map{r => s.eval(r)}

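// Same pattern for an aggregate: bind Sum('value), create a fresh instance of the aggregate,
// update it with each Row, then eval once at the end to get the result.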
val p2 = LocalRelation('key.string, 'value.int).groupBy()(Sum('value) as 'newv).analyze
val s2 = BindReferences.bindReference(p2.expressions(0).children(0), p2.inputSet.toSeq).asInstanceOf[AggregateExpression].newInstance

d.foreach{r => s2.update(r)}
val res2 = s2.eval(null)