If the input DataFrame has the following structure:
root
|-- periodstart: timestamp (nullable = true)
|-- usage: long (nullable = true)
Scala
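For a reproducible example, a DataFrame with this schema can be created as follows (a minimal sketch; the sample timestamps and usage values are hypothetical, with a deliberate gap between 00:15 and 01:00):

import java.sql.Timestamp
import spark.implicits._

// Hypothetical sample data; note the missing periods between 00:15 and 01:00.
val df = Seq(
  (Timestamp.valueOf("2017-01-01 00:00:00"), 10L),
  (Timestamp.valueOf("2017-01-01 00:15:00"), 20L),
  (Timestamp.valueOf("2017-01-01 01:00:00"), 30L)
).toDF("periodstart", "usage")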
Determine min / max (casting a timestamp to bigint yields seconds since the Unix epoch):
import org.apache.spark.sql.functions.{min, max}
import spark.implicits._

// Both bounds are cast to epoch seconds so the range arithmetic below works.
val (minp, maxp) = df
  .select(min($"periodstart").cast("bigint"), max($"periodstart").cast("bigint"))
  .as[(Long, Long)]
  .first
Set the step, for example 15 minutes (expressed in seconds):
val step: Long = 15 * 60
Generate the reference range (integer division snaps the start down to a step boundary; the end bound is one step past the maximum so the last period is included):
val reference = spark
  .range((minp / step) * step, ((maxp / step) + 1) * step, step)
  .select($"id".cast("timestamp").alias("periodstart"))
Join and fill the gaps:
reference.join(df, Seq("periodstart"), "leftouter").na.fill(0, Seq("usage"))
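The steps above can be bundled into a reusable helper (a sketch under this answer's assumptions about the column names; the name fillGaps and the implicit SparkSession parameter are mine):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{min, max}

// Re-index df on a regular step-second grid, filling missing periods with usage = 0.
def fillGaps(df: DataFrame, step: Long)(implicit spark: SparkSession): DataFrame = {
  import spark.implicits._
  val (minp, maxp) = df
    .select(min($"periodstart").cast("bigint"), max($"periodstart").cast("bigint"))
    .as[(Long, Long)]
    .first
  val reference = spark
    .range((minp / step) * step, ((maxp / step) + 1) * step, step)
    .select($"id".cast("timestamp").alias("periodstart"))
  reference.join(df, Seq("periodstart"), "leftouter").na.fill(0, Seq("usage"))
}

Called as fillGaps(df, 15 * 60), it returns one row per 15-minute period.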
Python
Similarly in PySpark:
from pyspark.sql.functions import col, min as min_, max as max_
step = 15 * 60
minp, maxp = df.select(
    min_("periodstart").cast("long"), max_("periodstart").cast("long")
).first()
# Use integer division (//) so the range bounds stay integers.
reference = spark.range(
    (minp // step) * step, ((maxp // step) + 1) * step, step
).select(col("id").cast("timestamp").alias("periodstart"))
reference.join(df, ["periodstart"], "leftouter").na.fill(0, ["usage"])
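Assuming the periodstart values in df are unique, the result contains exactly one row per step between the minimum and maximum timestamps, with usage = 0 for the periods missing from the original data.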