A `DataFrame`'s schema is represented by a Catalyst `StructType`, and the members of the `StructType` are `StructField`s. Basically, each `Column` is mapped to a `StructField` when it gets resolved.

In Spark 1.5.*, the `as` method on `Column` supports an optional second parameter:
scala> import org.apache.spark.sql.types.Metadata
scala> case class A(id:Int, v:String)
scala> val df = sqlContext.createDataFrame(Seq(A(1,"aa"), A(2,"bb")))
scala> val ndf = df.select($"id", $"v".as("newV", Metadata.fromJson("""{"desc": "replace old V"}""")))
The second parameter of `as` is a `Metadata` object, whose content can be specified via the `fromJson` method.

With the `Metadata` object attached, the new DataFrame's schema carries the metadata on the `newV` column. One can check it as follows:
scala> ndf.schema.foreach{s => println(s"${s.name}, ${s.metadata.toString}")}
id, {}
newV, {"desc":"replace old V"}
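Once attached, individual values can also be read back through the `Metadata` accessors (a small sketch continuing the session above):

```scala
// Look up the StructField by name and read its metadata.
val md = ndf.schema("newV").metadata
md.contains("desc")        // true; guards against missing keys
md.getString("desc")       // "replace old V"
```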
How metadata propagates is still not entirely clear to me. Since in my real project I mainly use `Metadata` to attach descriptions to output data, general metadata propagation is not yet a critical requirement. In this use case, the only propagation requirement is on column renames.
On the column renaming process, here is what I learned in practice:
* `withColumnRenamed` will not propagate metadata
* `as` on the `$"colname"` type of syntax will not propagate metadata
* `as` on the `df("colname")` type of syntax WILL propagate metadata
scala> val df1=ndf.withColumnRenamed("newV", "nnV")
scala> val df2=ndf.select($"newV" as "nnV")
scala> val df3=ndf.select(ndf("newV") as "nnV")
scala> df1.schema.foreach{s => println(s"${s.name}, ${s.metadata.toString}")}
id, {}
nnV, {}
scala> df2.schema.foreach{s => println(s"${s.name}, ${s.metadata.toString}")}
nnV, {}
scala> df3.schema.foreach{s => println(s"${s.name}, ${s.metadata.toString}")}
nnV, {"desc":"replace old V"}
As I described at the beginning of this article, metadata is a member of `StructField` rather than of `Column`, so it can't be accessed until the `Column` expression is resolved. It is now easy to see why `as` behaves differently: `$"colname"` is an unresolved expression, while `df("colname")` is a resolved one.
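This also suggests a workaround when a rename must go through the `$`-syntax (or after `withColumnRenamed` has dropped the metadata): re-attach the metadata explicitly with the two-argument `as`. A small sketch continuing the session above; `dfFix` is just an illustrative name:

```scala
// Copy the original metadata over by hand when renaming with $-syntax.
val dfFix = ndf.select($"newV".as("nnV", ndf.schema("newV").metadata))
dfFix.schema.foreach { s => println(s"${s.name}, ${s.metadata}") }
// nnV, {"desc":"replace old V"}
```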
Since my only use case is column descriptions, we created some helper methods in SMV to simplify the client code:
scala> val df4 = df.select($"id", $"v" as "newV" withDesc "renamed col V")
scala> df4.printDesc
id:
newV: renamed col V
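SMV's actual helpers live in that library; the idea, though, can be sketched with a couple of implicit classes. The names `withDesc` and `printDesc` mirror SMV's, but the implementation below is my own guess at how they might work, not SMV's code:

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.types.Metadata

// Hypothetical re-implementation of the SMV-style helpers used above.
implicit class ColumnDescOps(val col: Column) {
  // Attach a "desc" entry, preserving the alias set by a preceding `as`.
  // Note: the desc string is spliced into JSON naively, so it must not
  // contain characters that need escaping.
  def withDesc(desc: String): Column = {
    val name = col.expr match {
      case a: Alias => a.name        // keep the alias from `as "newV"`
      case e        => e.prettyString // fall back to the expression's own name
    }
    col.as(name, Metadata.fromJson(s"""{"desc": "$desc"}"""))
  }
}

implicit class DataFrameDescOps(val df: DataFrame) {
  // Print "name: desc" for every column, with an empty desc when absent.
  def printDesc(): Unit = df.schema.foreach { f =>
    val d = if (f.metadata.contains("desc")) f.metadata.getString("desc") else ""
    println(s"${f.name}: $d")
  }
}
```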
Column metadata could become even more valuable if the propagation rules were controllable by the user. For example, some raw data fields contain customers' PII and should be labeled, and anything derived from them should be labeled as well; metadata would be a natural way to carry those column labels.
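Until such user-controlled propagation exists, this kind of labeling can be approximated by copying the metadata forward by hand whenever a derived column is built. A sketch continuing the session above; the column and variable names are illustrative:

```scala
import org.apache.spark.sql.functions.upper
import org.apache.spark.sql.types.Metadata

// Label the raw column as PII.
val piiLabel = Metadata.fromJson("""{"pii": true}""")
val labeled  = df.select(df("id"), df("v").as("v", piiLabel))

// A derived column does not inherit the label automatically in general,
// so copy it over explicitly when building the derivation.
val derived = labeled.select(
  upper(labeled("v")).as("vUpper", labeled.schema("v").metadata)
)
// derived.schema("vUpper").metadata now carries {"pii":true}
```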