Add Columns with Random Numbers

Creating new columns and populating them with random numbers sounds like a simple task, but it is actually quite tricky.

Spark 1.4 added a rand function on columns. I haven’t tested it yet. Anyhow, since udf (available since 1.3) is already very handy for creating functions on columns, I will use udf here for more flexibility.
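If you are on 1.4+, the built-in should look something like the sketch below (untested here, as I said; “vrand” is just an example column name):

import org.apache.spark.sql.functions.rand

// one uniform random double in [0, 1) per record; takes an optional seed
df.withColumn("vrand", rand(42))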

The obvious approach is to use the Scala Random object to create random numbers through a udf:

import scala.util.Random
import org.apache.spark.sql.functions.udf

val randUdf = udf({() => Random.nextInt()})
df.selectPlus(randUdf() as "vrand")

The problem with this method is that there is no way to reproduce the same result across runs: each run (and even each task retry) draws from a differently seeded generator.

Spark also provides RandomRDDs to generate entire RDDs of random numbers, such as

import org.apache.spark.mllib.random.RandomRDDs

val rdd = RandomRDDs.normalVectorRDD(sc, nRow, nCol, 1, seed)

An idea is to generate the entire random RDD and then create a DF from it. The issue then is how to join it back to the original DF you want to extend. Since the ordering of a DF is not guaranteed, you can’t simply use the zip method to put the DF and the newly generated random RDD together. If you do, the random numbers themselves are the same from run to run, but the way they attach to the original DF records could be totally different. To guarantee the ordering, you would need to sort the original DF by all of its columns, which is super costly.
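For illustration, a deterministic-but-expensive version of this idea could look like the following sketch (variable names are mine; it reuses sc, df, and seed from above, sorts by every column, indexes both sides, and joins on the index):

import org.apache.spark.mllib.random.RandomRDDs

// deterministic attachment, at the cost of a full sort of the DF
val randRdd = RandomRDDs.normalRDD(sc, df.count, df.rdd.partitions.size, seed)

val indexedRows = df.sort(df.columns.map(c => df(c)): _*)
  .rdd.zipWithIndex.map { case (row, i) => (i, row) }
val indexedRand = randRdd.zipWithIndex.map { case (v, i) => (i, v) }

val joined = indexedRows.join(indexedRand)  // RDD[(Long, (Row, Double))]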

So we get back to the Scala Random approach. The key is that for each record of the original DF, we specify a random seed. Also, since Scala’s Random object is global, using it from multiple threads would be dangerous, so we create an independent instance of the Random class for each record.
Here is the way it should be done:

val randUdf = udf({seed: Int =>
  val r = new Random(seed)
  r.nextInt()
})

val hashUdf = udf({s: Any => s.hashCode()})

df.selectPlus(hashUdf(smvStrCat(df.columns.map(c => $"$c".cast("string")): _*)) as "_Seed_")
  .selectPlus(randUdf($"_Seed_") as "_rand_")
  .selectMinus("_Seed_")
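If you are not using SMV, roughly the same pipeline can be written with plain Spark (1.5+) functions; this is a sketch, assuming concat_ws as a stand-in for smvStrCat and withColumn/drop for selectPlus/selectMinus:

import org.apache.spark.sql.functions.{col, concat_ws, udf}
import scala.util.Random

val hashUdf = udf { s: String => s.hashCode }
val randUdf = udf { seed: Int => new Random(seed).nextInt() }

// concat_ws needs a separator, so the seed values will differ from the
// smvStrCat version, but the per-record determinism is the same
df.withColumn("_Seed_", hashUdf(concat_ws("|", df.columns.map(c => col(c).cast("string")): _*)))
  .withColumn("_rand_", randUdf(col("_Seed_")))
  .drop("_Seed_")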

By changing the randUdf, you can also generate an array of random numbers to populate multiple columns (using the getItem method to extract items from the array column into individual columns). You can also change how the seed is calculated for each record. What I did here is concatenate all columns as strings and calculate the Scala hashCode on that string. For a little more control, you can append some “magic” string to the full record string as an extra “seed”.
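For example, the array variant could look like this sketch (three columns is an arbitrary choice, and it assumes the _Seed_ column from above is still present):

val randArrUdf = udf({ seed: Int =>
  val r = new Random(seed)
  Seq.fill(3)(r.nextInt())   // three random numbers per record
})

df.selectPlus(randArrUdf($"_Seed_") as "_randArr_")
  .selectPlus($"_randArr_".getItem(0) as "rand_0")
  .selectPlus($"_randArr_".getItem(1) as "rand_1")
  .selectPlus($"_randArr_".getItem(2) as "rand_2")
  .selectMinus("_randArr_")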

Please note that we used some helper functions from https://github.com/TresAmigosSD/SMV .
