UDAF and Tungsten

We are using Spark 1.5.2 in some client projects and ran into an issue with the spark.sql.tungsten.enabled configuration parameter.

For some reason we had turned off Tungsten by setting

spark.sql.tungsten.enabled false

The code also used a UDAF, and Spark failed with

histStr$ is implemented based on the new Aggregate Function interface and it cannot be used with functions implemented based on the old Aggregate Function interface.;

where histStr is the UDAF we defined.

The error happens when the optimized logical plan gets converted to the physical plan.

Turning Tungsten back on fixed the error.


Create Unique Record Key for Table Linking

When developing data applications, one principle we keep following is “do not introduce new data”. Put more strictly, all output should be derived deterministically from the input data by the code.

With this principle in mind, two tasks become very difficult: creating random numbers and creating UUIDs. Here we address the UUID case.

Commonly we have a table (A) whose unique key is a combination of multiple fields, and we want another table (B) to refer to records in table A. In that case we don’t want B to store all the fields that make up A’s unique key; instead we want A to have a record ID that B can store and use for lookup.

The difficulty is that the usual ID generators depend on how table A is stored rather than purely on the content of A. In principle, the row ordering of table A is not an attribute of the data; it is an attribute of how the data is stored or organized. Since we typically deal with distributed systems, no assumption about how the data is stored can be relied on. In other words, if we generate IDs that depend on the row number in some storage, we can’t reproduce them unless we keep storing the IDs as part of the data (and in that sense we have introduced new data).

To derive a unique ID purely from the data elements, we could

  • Just concatenate all the fields of the unique key
  • Sort by all the fields of the unique key and take the sequence number as the unique ID
  • Apply a low-collision hash function to the concatenated unique key

The disadvantage of the concatenated key is that it can be too long, depending on the data. The disadvantage of the sorted sequence is that it adds another O(N log N) step to sort the data. The hash method could be ideal if the chance of a collision is very small.

This link is pretty nice for comparing different hashes (we need the large ones; 32-bit hashes collide too easily). The theoretical probability of at least one collision among N records can be estimated as 1 - (1 - (1/2)^B)^(N(N-1)/2), where B is the number of bits of the hash and N is the number of records. For most use cases N is in the range of a billion (about 2^30). With N = 2^30 the estimate is about 3% for B = 64 and about 10^-21 for B = 128, so a 128-bit hash is good enough while 64-bit is too small.
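The estimate above can be checked numerically. The following is a minimal sketch in plain Scala; the object and method names are made up for illustration. It evaluates the same formula through log1p and expm1, since (1/2)^B is far too small to subtract from 1.0 directly in double precision.

```scala
object HashCollision {
  // P(at least one collision) = 1 - (1 - 2^-B)^(N(N-1)/2)
  // Rewritten as -expm1(pairs * log1p(-2^-B)) to stay accurate for tiny 2^-B.
  def probability(n: Long, bits: Int): Double = {
    val pairs = n.toDouble * (n - 1).toDouble / 2.0
    -math.expm1(pairs * math.log1p(-math.pow(2.0, -bits)))
  }
}

// Print the estimate for a billion-scale table and three hash widths
Seq(32, 64, 128).foreach { b =>
  println(s"B = $b: ${HashCollision.probability(1L << 30, b)}")
}
```

For N = 2^30 this gives a probability indistinguishable from 1 at 32 bits, roughly 0.03 at 64 bits, and a negligible value at 128 bits.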

In that sense, either md5 (128-bit) or sha (1 or 2) will work for billion-record data. In Spark, both are already built-in column functions. So creating a unique ID from a group of key columns could simply be

df.selectPlus(md5(concat(keyCols: _*)) as "uid")
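One detail worth keeping in mind with the concatenation step: without a separator, different key tuples such as ("ab", "c") and ("a", "bc") concatenate to the same string and therefore hash to the same ID. Below is a minimal sketch of the same idea outside Spark, using the JDK’s MessageDigest. The object name, the method name and the separator are assumptions for illustration, and the separator is assumed never to occur inside the key fields.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object RecordKey {
  // Hypothetical separator, assumed not to appear in any key field
  val Sep = "\u0001"

  // MD5 of the separator-joined key fields, rendered as 32 hex characters
  def uid(keyFields: Seq[String]): String = {
    val joined = keyFields.mkString(Sep)
    val digest = MessageDigest.getInstance("MD5")
      .digest(joined.getBytes(StandardCharsets.UTF_8))
    digest.map(b => "%02x".format(b & 0xff)).mkString
  }
}
```

The same record always yields the same ID, regardless of where or in which order it is stored, which is the property the principle above asks for.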

 


Use Metadata with columns

A DataFrame’s schema is represented by a Catalyst StructType, and the members of the StructType are StructFields. Basically, each Column is mapped to a StructField when it gets resolved.

In Spark 1.5.*, the as column method supports an optional second parameter,

scala> import org.apache.spark.sql.types.Metadata
scala> case class A(id:Int, v:String)
scala> val df = sqlContext.createDataFrame(Seq(A(1,"aa"), A(2,"bb")))
scala> val ndf = df.select($"id", $"v".as("newV", Metadata.fromJson("""{"desc": "replace old V"}""")))

The second parameter of as is a Metadata object. The content of the Metadata can be specified with the fromJson method.

With the Metadata object, the new DataFrame’s schema has the metadata attached to the newV column. One can check it as follows,

scala> ndf.schema.foreach{s => println(s"${s.name}, ${s.metadata.toString}")}
id, {}
newV, {"desc":"replace old V"}

How metadata propagates is still not very clear to me. In my real project I basically use Metadata to attach descriptions to the output data, so general metadata propagation is not a critical requirement yet. In this use case, the only propagation requirement is on column renames.

For column renaming, here is what I learned in practice
* withColumnRenamed will not propagate metadata
* as on $"colname" type of syntax will not propagate metadata
* as on df("colname") type of syntax WILL propagate metadata

scala> val df1=ndf.withColumnRenamed("newV", "nnV")
scala> val df2=ndf.select($"newV" as "nnV")
scala> val df3=ndf.select(ndf("newV") as "nnV")
scala> df1.schema.foreach{s => println(s"${s.name}, ${s.metadata.toString}")}
id, {}
nnV, {}

scala> df2.schema.foreach{s => println(s"${s.name}, ${s.metadata.toString}")}
nnV, {}

scala> df3.schema.foreach{s => println(s"${s.name}, ${s.metadata.toString}")}
nnV, {"desc":"replace old V"}

As I described at the beginning of this article, metadata is a member of StructField rather than a member of Column, so it can’t be accessed until the Column expression is resolved. Now it’s easy to understand: $"colname" is an unresolved expression and df("colname") is a resolved one, so as behaves differently on them.

Since my only use case is column descriptions, we created some helper methods in SMV to simplify the client code,

scala> val df4 = df.select($"id", $"v" as "newV" withDesc "renamed col V")

scala> df4.printDesc
id:
newV: renamed col V

Column metadata could actually be more critical if the propagation rule could be controlled by the user. For example, some raw data fields contain customers’ PII, which should be labeled, and anything derived from them should also be labeled; metadata could be the natural way to handle those column labels.


Use Sbt Console as Spark-Shell

The sbt console can be more convenient than the plain Scala shell when we are doing incremental development.

If you are already using sbt for your project, it’s very simple to set up the sbt console to replace the spark-shell command.

Let’s start with the basic case. Once the project is set up with sbt, you can simply run the console as

$ sbt console

Within the console, you just need to create a SparkContext and a SQLContext to make it behave like the Spark shell

scala> val sc = new org.apache.spark.SparkContext("local", "shell")
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Note that the current version of the Spark shell actually creates sqlContext as a HiveContext. You can mimic that too.

It’s as simple as that.

Since we always have to type those 2 lines at the start of the console, let’s automate it by adding the following to the project’s build.sbt file,

initialCommands in console := s"""
val sc = new org.apache.spark.SparkContext("local", "shell")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
"""

Now let’s bring in the fun stuff. We can take advantage of sbt’s automatic incremental build, with the sbt console as our REPL.

To start the console with incremental build,

$ sbt ~console

When it starts, it looks no different from starting the console without the tilde.
After you try something in the console, modify some project code in an editor or IDE, and save the change, the console will NOT rebuild the classes yet. You need to EXIT it with Ctrl-D, then
* The current console terminates
* An automatic incremental compile starts
* A new console starts

Within the new console, you can use the modified class.

It is not yet a truly dynamic reload of classes in the REPL, but it is much better than the following cycle

compile->start spark-shell->play->edit->exit spark-shell->compile 

Here is another trick if you need to load some Scala file every time the console starts. The initialCommands setting in build.sbt does not support the :load myfile.scala command. However, you can load myfile.scala as a string,

val initFile = IO.read(new File("myfile.scala"))

initialCommands in console := s"""
val sc = new org.apache.spark.SparkContext("local", "shell")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
${initFile}
"""
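
Because each restart under ~console creates a fresh SparkContext, it may also help to stop the old one when the console exits. sbt has a cleanupCommands setting for code that should run just before the interpreter exits. The line below is a small sketch for build.sbt, under the assumption that the context is bound to the name sc as in the initialCommands above.

```scala
// build.sbt: stop the SparkContext created by initialCommands when the console exits
// (assumes the context is bound to the name `sc`)
cleanupCommands in console := "sc.stop()"
```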

Calculate Quantile Using Window functions

Before we start, please be aware that in the current Spark version (1.5.1), window functions only work with HiveContext, not with the plain SQLContext. However, Spark-shell’s sqlContext is actually a HiveContext (HiveContext extends SQLContext), so if you use Spark-shell to test window functions, everything will work. If you want to use them in an application, you need to make sure that a HiveContext is created at the beginning of the app.

According to the API document, the ntile function “returns the ntile group id (from 1 to n inclusive) in an ordered window partition”. Let’s give it a try

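scala> import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions._
scala> case class Rec(id:Int, k:Int, v:String)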
scala> val data=1.to(10).map{n => Rec(1, n, s"v${n}")} ++ Seq(Rec(1, 6, "x1"), Rec(1,6,"x2"))
scala> val df=sqlContext.createDataFrame(data)

scala> val w = Window.partitionBy("id").orderBy("k")
scala> df.selectPlus(ntile(4).over(w)).show
+---+---+---+-----------------------------------------------+
| id|  k|  v|'ntile(4) WindowSpecDefinition UnspecifiedFrame|
+---+---+---+-----------------------------------------------+
|  1|  1| v1|                                              1|
|  1|  2| v2|                                              1|
|  1|  3| v3|                                              1|
|  1|  4| v4|                                              2|
|  1|  5| v5|                                              2|
|  1|  6| v6|                                              2|
|  1|  6| x1|                                              3|
|  1|  6| x2|                                              3|
|  1|  7| v7|                                              3|
|  1|  8| v8|                                              4|
|  1|  9| v9|                                              4|
|  1| 10|v10|                                              4|
+---+---+---+-----------------------------------------------+

The result may not reflect the way people typically define quantiles. For example, some rows with value 6 in the k column are in quantile 2 while others are in quantile 3.

It looks like ntile is derived from the rowNumber function instead of the rank function. The difference between the two is shown below

scala> df.selectPlus(rowNumber.over(w) as "rn", rank.over(w) as "rk").show
+---+---+---+---+---+
| id|  k|  v| rn| rk|
+---+---+---+---+---+
|  1|  1| v1|  1|  1|
|  1|  2| v2|  2|  2|
|  1|  3| v3|  3|  3|
|  1|  4| v4|  4|  4|
|  1|  5| v5|  5|  5|
|  1|  6| v6|  6|  6|
|  1|  6| x1|  7|  6|
|  1|  6| x2|  8|  6|
|  1|  7| v7|  9|  9|
|  1|  8| v8| 10| 10|
|  1|  9| v9| 11| 11|
|  1| 10|v10| 12| 12|
+---+---+---+---+---+

There is a percentRank function, which is defined as

(rank - 1)/(total_num_of_row -1)

The result is the following

scala> df.selectPlus(percentRank.over(w) as "pr").show
+---+---+---+-------------------+
| id|  k|  v|                 pr|
+---+---+---+-------------------+
|  1|  1| v1|                0.0|
|  1|  2| v2|0.09090909090909091|
|  1|  3| v3|0.18181818181818182|
|  1|  4| v4| 0.2727272727272727|
|  1|  5| v5|0.36363636363636365|
|  1|  6| v6|0.45454545454545453|
|  1|  6| x1|0.45454545454545453|
|  1|  6| x2|0.45454545454545453|
|  1|  7| v7| 0.7272727272727273|
|  1|  8| v8| 0.8181818181818182|
|  1|  9| v9| 0.9090909090909091|
|  1| 10|v10|                1.0|
+---+---+---+-------------------+

The percentRank can easily be converted to quartiles or deciles.

Since percentRank takes values in the range [0, 1], we need to handle the boundary carefully when mapping it to n-tiles. One possible mapping function could be

scala> df.selectPlus(percentRank.over(w) as "pr").selectPlus(floor($"pr"*4) +1 as "rawQt").selectPlus(when($"rawQt" === 5, lit(4)).otherwise($"rawQt") as "qt").show
+---+---+---+-------------------+-----+---+
| id|  k|  v|                 pr|rawQt| qt|
+---+---+---+-------------------+-----+---+
|  1|  1| v1|                0.0|  1.0|1.0|
|  1|  2| v2|0.09090909090909091|  1.0|1.0|
|  1|  3| v3|0.18181818181818182|  1.0|1.0|
|  1|  4| v4| 0.2727272727272727|  2.0|2.0|
|  1|  5| v5|0.36363636363636365|  2.0|2.0|
|  1|  6| v6|0.45454545454545453|  2.0|2.0|
|  1|  6| x1|0.45454545454545453|  2.0|2.0|
|  1|  6| x2|0.45454545454545453|  2.0|2.0|
|  1|  7| v7| 0.7272727272727273|  3.0|3.0|
|  1|  8| v8| 0.8181818181818182|  4.0|4.0|
|  1|  9| v9| 0.9090909090909091|  4.0|4.0|
|  1| 10|v10|                1.0|  5.0|4.0|
+---+---+---+-------------------+-----+---+

Note that we used some SMV helper functions to simplify the code.
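
The rank-based mapping can also be traced on plain Scala collections, without Spark. The sketch below is only a model of the calculation above (the object and method names are made up for illustration): rank counts strictly smaller values, percentRank follows the formula given earlier, and the pr = 1.0 boundary is folded into the last bucket.

```scala
object Quantiles {
  // Rank with ties: 1 + number of strictly smaller values
  def rank(values: Seq[Int]): Seq[Int] =
    values.map(v => values.count(_ < v) + 1)

  // percentRank = (rank - 1) / (total_num_of_row - 1)
  def percentRank(values: Seq[Int]): Seq[Double] = {
    val n = values.size
    rank(values).map(r => if (n > 1) (r - 1).toDouble / (n - 1) else 0.0)
  }

  // Map [0, 1] onto 1..buckets; pr == 1.0 would give buckets + 1, so cap it
  def quantile(values: Seq[Int], buckets: Int): Seq[Int] =
    percentRank(values).map { pr =>
      math.min(math.floor(pr * buckets).toInt + 1, buckets)
    }
}

// Same k values as in the example: 1 to 10 plus two extra 6s
val ks = ((1 to 10) ++ Seq(6, 6)).sorted
println(Quantiles.quantile(ks, 4))
```

With this mapping all three rows with k = 6 land in the same quartile, unlike with ntile.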

 


dropDuplicates may create unexpected result

In an earlier post, I mentioned that the first aggregate function actually performs a “first-non-null”. This post describes a consequence of that bug/feature.

Here is a quick test of the dropDuplicates DataFrame method within the Spark-shell

scala> case class Rec(id:Int, k:Int, v:String)
defined class Rec

scala> val df=sqlContext.createDataFrame(Seq(Rec(1,2,null),Rec(1,3,"a")))
df: org.apache.spark.sql.DataFrame = [id: int, k: int, v: string]

scala> df.show
+---+---+----+
| id|  k|   v|
+---+---+----+
|  1|  2|null|
|  1|  3|   a|
+---+---+----+


scala> df.dropDuplicates(Seq("id")).show
+---+---+---+
| id|  k|  v|
+---+---+---+
|  1|  2|  a|
+---+---+---+

As you can see, the result is not even one of the input records! Even if we consider first returning only non-null values to be a feature, the above behavior of dropDuplicates is definitely a bug.

For our own project, we actually had to create our own First aggregate expression to allow a true first. Then we implemented a dedupByKey method within SMV.
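
To make the intended semantics concrete, here is a minimal sketch of a dedup-by-key on plain Scala collections. It is not SMV’s implementation, and the names are chosen for illustration only. The point is that the whole first record for each key is kept, nulls included, so the output is always one of the input records.

```scala
object Dedup {
  // Same shape as the Rec used in the test above
  case class Rec(id: Int, k: Int, v: String)

  // Keep the first whole record seen for each key, in first-seen order
  def dedupByKey[A, K](records: Seq[A])(key: A => K): Seq[A] = {
    val seen = scala.collection.mutable.LinkedHashMap.empty[K, A]
    records.foreach { r =>
      val k = key(r)
      if (!seen.contains(k)) seen(k) = r
    }
    seen.values.toSeq
  }
}
```

On the two records above, deduplicating by id keeps Rec(1, 2, null) as a whole. In a distributed setting “first” is only well defined once an ordering within each key is fixed, which this local sketch takes from the input sequence.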


Gzip decompressed size

If you have a large gzip file, say with an uncompressed size >4G, try this,

$ gzip -l transactions.gz
  compressed uncompressed  ratio uncompressed_name
  3530291054   4085787831  13.5% transactions

The reported uncompressed size is about 4G, although I know my uncompressed file is about 20G.

This has been a well-known issue for a long time. The root of the issue is that, in the GZIP spec, the last 4 bytes store the uncompressed size in little endian. The “little endian” part is not the problem; the real problem is that it is a 32-bit “int” instead of a 64-bit “long”. The direct impact is that for any data larger than 4G, these 4 bytes can only store the size modulo 2^32. There is basically no easy way to recover the original uncompressed size without decompressing the file.
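
The wrap-around can be sketched in a few lines of plain Scala. The names below are made up for illustration. storedSize reads the last 4 bytes of a gzip file as an unsigned little-endian integer, and wrap shows what those bytes can hold for a given true size. As a purely hypothetical example, a true size of 21265657015 bytes (about 19.8 GiB, chosen only because it is consistent with “about 20G”) wraps to exactly the 4085787831 reported above.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.{ByteBuffer, ByteOrder}

object GzipIsize {
  // What a 32-bit size field can hold for a given true size: size mod 2^32
  def wrap(trueSize: Long): Long = trueSize & 0xFFFFFFFFL

  // Read the last 4 bytes of a gzip file as an unsigned little-endian integer
  def storedSize(file: File): Long = {
    val raf = new RandomAccessFile(file, "r")
    try {
      val buf = new Array[Byte](4)
      raf.seek(raf.length() - 4)
      raf.readFully(buf)
      ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN).getInt() & 0xFFFFFFFFL
    } finally raf.close()
  }
}
```

For files under 4G, storedSize returns the real uncompressed size; above that, only wrap(trueSize) is recoverable from the trailer.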

The same issue is inherited by some Hadoop 1.x versions, and in turn by Spark binaries built on those versions of the Hadoop library. So you will get

java.io.IOException: stored gzip size doesn't match decompressed size

error when you try to load a gzipped file through

val rdd=sparkContext.textFile("transactions.gz")

Switching to a Spark build with Hadoop 2.x solves the problem; I suspect it doesn’t even bother to check the last 4 bytes of the gzipped file.


Repartition vs. Coalesce

Repartition and coalesce have been RDD methods for a long time. For DataFrame, however, repartition was introduced in Spark 1.3 and coalesce in Spark 1.4.

Both of them change the number of partitions in which the data is stored (as an RDD). According to both the RDD documentation and the DataFrame documentation, repartition shuffles the original partitions and repartitions them, while coalesce just combines original partitions into the new number of partitions. In that sense, coalesce can only reduce the number of partitions.

Since shuffling can be very costly, if reducing the number of partitions is what you really want, please consider using coalesce. For example, as mentioned in an earlier post, you may need to reduce the number of partitions before a join operation. In that case, coalesce is a better choice than repartition, although in the example from the earlier post the data itself is quite small, so repartition does not cause much delay.
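
The difference can be pictured with a toy model on plain Scala collections. This is not how Spark implements either method, and all names are made up for illustration. Partitions are modelled as a vector of vectors: coalesce merges whole neighbouring partitions, while repartition deals every record out again individually, which stands in for the shuffle.

```scala
object PartitionModel {
  type Partitions[A] = Vector[Vector[A]]

  // Merge whole neighbouring partitions; records never leave their group
  def coalesce[A](parts: Partitions[A], n: Int): Partitions[A] = {
    require(n > 0, "number of partitions must be positive")
    if (n >= parts.size) parts // can only reduce the partition count
    else parts.zipWithIndex
      .groupBy { case (_, i) => i * n / parts.size }
      .toVector.sortBy(_._1)
      .map { case (_, group) => group.sortBy(_._2).flatMap(_._1) }
  }

  // Re-deal every record round-robin; each record may move (the "shuffle")
  def repartition[A](parts: Partitions[A], n: Int): Partitions[A] = {
    require(n > 0, "number of partitions must be positive")
    val dealt = parts.flatten.zipWithIndex.groupBy { case (_, i) => i % n }
    Vector.tabulate(n)(p => dealt.getOrElse(p, Vector.empty).map(_._1))
  }
}
```

In the model, coalesce touches each partition as a unit, whereas repartition has to look at every record, which is why the latter is the costly one.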

If you are using Spark 1.3, you can still take advantage of coalesce, although it has to be the RDD version. Here is the workaround for Spark 1.3,

val rdd = df.rdd.coalesce(16)
val resDF = df.sqlContext.createDataFrame(rdd, df.schema)

which is equivalent to the following in Spark 1.4,

val resDF = df.coalesce(16)

Add Columns with Random Numbers

Creating new columns and populating with random numbers sounds like a simple task, but it is actually very tricky.

Spark 1.4 added a rand function on columns. I haven’t tested it yet. Anyhow, since udf (available since 1.3) is already very handy for creating functions on columns, I will use udf here for more flexibility.

The obvious approach is to use the Scala Random object to create random numbers through a udf:

val randUdf = udf({() => Random.nextInt()})
df.selectPlus(randUdf() as "vrand")

The problem with this method is that there is no way to reproduce the same result across multiple runs.

Spark’s RandomRDDs (in MLlib) also provides methods to generate an entire random RDD, such as

val rdd = RandomRDDs.normalVectorRDD(sc, nRow, nCol, 1, seed)

One idea is to generate the entire RDD and then create a DF from it. The issue is how to join it back to the original DF you want to append to. Since the ordering of a DF is not guaranteed, you can’t simply use the zip method to put the DF and the newly generated random RDD together. If you do so, although the random numbers themselves are guaranteed to be the same from run to run, the way they are attached to the original DF records could be totally different. To guarantee the ordering, you need to sort the original DF by all of its columns, which is super costly.

So we need to get back to the Scala Random approach. The key is that for each record of the original DF we need to specify a random seed. Also, since Scala’s Random object is global, using it from multiple threads is dangerous, so we need to create an independent instance of the Random class for each record.
Here is how it should be done:

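import scala.util.Random
import org.apache.spark.sql.functions.udf

// A fresh Random instance per record, seeded from that record's own content
val randUdf = udf({seed: Int =>
  val r = new Random(seed)
  r.nextInt()
})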

val hashUdf = udf({s: Any => s.hashCode()})

df.selectPlus(hashUdf(smvStrCat(df.columns.map(c => $"$c".cast("string")): _*)) as "_Seed_")
  .selectPlus(randUdf($"_Seed_") as "_rand_").selectMinus("_Seed_")

By changing randUdf, you can also generate an array of random numbers to populate multiple columns (using the getItem method to extract items from the array column into individual columns). You can also change how the seed is calculated for each record. What I did here is concatenate all columns as strings and calculate the Scala hashCode of that string. To have a little more control, you can append some “magic” string to the full record string as a “seed”.

Please note that we used some helper functions from https://github.com/TresAmigosSD/SMV .
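
The per-record seeding idea can be tried outside Spark as well. The following is a minimal sketch on plain Scala collections; the object name, the method names and the “magic” string are assumptions for illustration. The seed is computed only from the record’s content, so the random numbers attached to a record do not depend on where the record sits in the data.

```scala
import scala.util.Random

object SeededRandom {
  // Hypothetical "magic" string; changing it changes every generated number
  val Magic = "my-project-seed"

  // Seed derived purely from the record's content, never from its position
  def seedOf(record: Seq[Any]): Int =
    (record.mkString("|") + Magic).hashCode

  // n reproducible random numbers for one record, from its own Random instance
  def randomsFor(record: Seq[Any], n: Int): Vector[Int] = {
    val r = new Random(seedOf(record))
    Vector.fill(n)(r.nextInt())
  }
}
```

One consequence of this design is that two records with identical content get identical random numbers, so it fits best when the records are unique.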


Be Careful with First

In the Spark documentation, the aggregate function “first” is described as

Aggregate function: returns the first value of a column in a group

This is not accurate! It actually returns the first NON-NULL value of the column.
A scary example shows the problem:

Input data with 3 columns “k, t, v”

z, 1, null
z, 2, 1.5
z, 3, 2.4

Code:

df.groupBy("k").agg(
  $"k",
  first($"t"),
  first($"v")
)

Output:

z, 1, 1.5

This result is a mix of 2 records!
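
The mix-up follows directly from evaluating each column on its own. The sketch below models that on plain Scala collections. It is a model of the observed behavior, not Spark’s code, and the names are made up for illustration, with null represented as None.

```scala
object FirstSemantics {
  // One input row: k, t, v, where a null v is modelled as None
  case class Row(k: String, t: Int, v: Option[Double])

  // Observed behavior: each column independently takes its first non-null value
  def firstNonNullPerColumn(rows: Seq[Row]): (String, Int, Option[Double]) =
    (rows.head.k, rows.head.t, rows.flatMap(_.v).headOption)

  // A "true first": all values come from the same first record, nulls included
  def trueFirst(rows: Seq[Row]): (String, Int, Option[Double]) =
    (rows.head.k, rows.head.t, rows.head.v)
}
```

On the three input rows above, the per-column version yields (z, 1, 1.5), while a true first yields (z, 1, null).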

I believe it’s a bug in Spark (still the same in 1.4.0) rather than a feature of the First aggregate function.
