Your question is not entirely clear, but I will try to answer it based on the sample data.
In your source data, the rows within each month appear to be sorted by some column. I will use value1 as the sorting column; you can change it to another column to match your logic. Using this sort order, I generate a row number within each month and use that row number in a self join, so that each row is paired with the row at the same position in the previous month.
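To make the idea concrete, here is a minimal sketch in plain Python (no Spark needed) of numbering rows within each month and then pairing each row with the same-numbered row of the previous month. The sample pairs and the names number_within_month, by_key and pairs are made up for illustration only.

```python
from itertools import groupby

# Hypothetical (month, value1) pairs, used only for illustration.
rows = [(1, 30), (1, 10), (2, 50), (1, 20), (2, 40)]

def number_within_month(rows):
    """Return (month, value1, rownum), with rownum restarting at 1 in each month."""
    ordered = sorted(rows)  # sort by month, then by value1
    numbered = []
    for month, group in groupby(ordered, key=lambda r: r[0]):
        for rownum, (_, value1) in enumerate(group, start=1):
            numbered.append((month, value1, rownum))
    return numbered

numbered = number_within_month(rows)

# Self join: look up the row with the same rownum in the previous month.
# A missing match yields None, much like the right side of a left join.
by_key = {(month, rownum): value1 for month, value1, rownum in numbered}
pairs = [(month, value1, by_key.get((month - 1, rownum)))
         for month, value1, rownum in numbered]
```

This mirrors what row_number() over a window partitioned by month does, followed by a left join on (month - 1, rownum).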
You can try something like the code below. It gives the expected results on Spark 2.x; you may need to adjust it for your Spark environment. Please note that your formula and your expected result set do not match for Month_no 3.
from pyspark.sql import Window
from pyspark.sql.functions import row_number,col,when
#storing your source data and turning it into a list of lists
data=""" 01 |10 |20 |30 |40
01 |20 |30 |40 |50
01 |30 |40 |50 |60
02 |40 |50 |60 |70
02 |50 |60 |70 |80
02 |60 |70 |80 |90
03 |70 |80 |90 |100
03 |80 |90 |100 |110
03 |90 |100 |110 |120 """
data01=data.split('\n')
data02=[ item.split('|') for item in data01 ]
#creating variables with column names for convenience
month_no='Month_no'
value1='value1'
value2='value2'
value3='value3'
value4='value4'
crownum="rownum"
qtrMonthNo="qtrMonthNo"
#creating rdd & df based on your data (sc is the SparkContext that the pyspark shell provides)
df=sc.parallelize(data02).toDF(['Month_no','value1','value2','value3','value4'])
sourcedata=df.selectExpr("cast(trim(month_no) as integer) as Month_no","cast(trim(value1) as integer) as value1","cast(trim(value2) as integer) as value2","cast(trim(value3) as integer) as value3","cast(trim(value4) as integer) as value4")
#Adding rownum to join with appropriate row in same month
rownum_window=Window.partitionBy(month_no).orderBy(value1)
df1=sourcedata.withColumn("rownum",row_number().over(rownum_window))
#preparing dataframes for join
df_left=df1
df_right=df1.select(*[col(colm).alias("r_"+colm) for colm in df1.columns ])
#joining dataframes
df_joined=df_left.join(df_right,( df_left.Month_no - 1 == df_right.r_Month_no ) & ( df_left.rownum==df_right.r_rownum ) ,"left").fillna(0)
df_joined=df_joined.withColumn(qtrMonthNo,when(df_joined.Month_no % 3 == 0, 3).otherwise(df_joined.Month_no % 3))
#optional: caching is not required for correctness
df_joined.cache()
#calculating value1_1 & value2_2
first_cal=df_joined.select((col("r_value1")+col("value1")).alias("value1_1"),(col("r_value2")+col("value2")).alias("value2_2"),qtrMonthNo,"r_value3","r_value4",*df1.columns)
#calculating value3_3 & value4_4
second_cal=first_cal.select(((col("r_value3")+col("value3")) / col("qtrMonthNo") ).alias("value3_3"),((col("r_value4")+col("value4")) / col("qtrMonthNo") ).alias("value4_4"),*first_cal.columns)
#final dataframe with necessary columns and sorted data
result_df=second_cal.orderBy(month_no,value1).drop(qtrMonthNo,crownum,"r_value3","r_value4")
result_df.show()
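If you want to sanity-check the Spark output, the same calculation can be redone in plain Python on the sample data. This is only a sketch under the assumptions used above (rows ordered by value1 within each month, a missing previous month counted as zeros); the names expected_rows, keyed and qtr_month_no are illustrative and not part of the Spark code.

```python
# Sample rows as (Month_no, value1, value2, value3, value4), copied from the data above.
rows = [
    (1, 10, 20, 30, 40), (1, 20, 30, 40, 50), (1, 30, 40, 50, 60),
    (2, 40, 50, 60, 70), (2, 50, 60, 70, 80), (2, 60, 70, 80, 90),
    (3, 70, 80, 90, 100), (3, 80, 90, 100, 110), (3, 90, 100, 110, 120),
]

def expected_rows(rows):
    """Recompute value1_1..value4_4 without Spark, mirroring the join above."""
    # Key each row by (month, position within the month when ordered by value1).
    keyed = {}
    position = {}
    for row in sorted(rows, key=lambda r: (r[0], r[1])):
        month = row[0]
        position[month] = position.get(month, 0) + 1
        keyed[(month, position[month])] = row

    results = []
    for (month, rownum), (_, v1, v2, v3, v4) in sorted(keyed.items()):
        # Left join on the previous month; a missing match counts as zeros,
        # which is what fillna(0) does in the Spark version.
        _, p1, p2, p3, p4 = keyed.get((month - 1, rownum), (0, 0, 0, 0, 0))
        # Position of the month inside its quarter: 1, 2 or 3.
        qtr_month_no = 3 if month % 3 == 0 else month % 3
        results.append({
            "Month_no": month,
            "value1": v1,
            "value1_1": p1 + v1,
            "value2_2": p2 + v2,
            "value3_3": (p3 + v3) / qtr_month_no,
            "value4_4": (p4 + v4) / qtr_month_no,
        })
    return results

expected = expected_rows(rows)
for row in expected:
    print(row)
```

Comparing these rows with result_df.show() is a quick way to confirm that the join condition and the qtrMonthNo divisor behave the way you intend, especially for Month_no 3.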