rows between in spark | range between in spark | window function in pyspark | Lec-17

In this video I have talked about window functions in PySpark. I have also covered rows between, range between, unbounded preceding, unbounded following, and current row. If you want to optimize your processing in Spark, you should have a solid understanding of this concept.
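A quick sketch (mine, not from the video) of how these frame clauses look in PySpark; it assumes product_df is the DataFrame built from the Q1 data below:

from pyspark.sql import Window
from pyspark.sql.functions import sum

# rowsBetween: frame defined by physical row positions - the current row and the 2 rows before it
rows_window = Window.partitionBy("product_id").orderBy("sales")\
.rowsBetween(-2, Window.currentRow)

# rangeBetween: frame defined by the ORDER BY value - every row whose sales value lies
# within 100000 below the current row's sales
range_window = Window.partitionBy("product_id").orderBy("sales")\
.rangeBetween(-100000, Window.currentRow)

# unbounded frame: every row in the partition, from the first to the last
full_window = Window.partitionBy("product_id").orderBy("sales_date")\
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

product_df.withColumn("rows_sum", sum("sales").over(rows_window))\
.withColumn("range_sum", sum("sales").over(range_window))\
.withColumn("total_sales", sum("sales").over(full_window))\
.show()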

Q1. Data:-
product_data = [
(2,"samsung","01-01-1995",11000),
(1,"iphone","01-02-2023",1300000),
(2,"samsung","01-02-2023",1120000),
(3,"oneplus","01-02-2023",1120000),
(1,"iphone","01-03-2023",1600000),
(2,"samsung","01-03-2023",1080000),
(3,"oneplus","01-03-2023",1160000),
(1,"iphone","01-01-2006",15000),
(1,"iphone","01-04-2023",1700000),
(2,"samsung","01-04-2023",1800000),
(3,"oneplus","01-04-2023",1170000),
(1,"iphone","01-05-2023",1200000),
(2,"samsung","01-05-2023",980000),
(3,"oneplus","01-05-2023",1175000),
(1,"iphone","01-06-2023",1100000),
(3,"oneplus","01-01-2010",23000),
(2,"samsung","01-06-2023",1100000),
(3,"oneplus","01-06-2023",1200000)
]

product_schema=["product_id","product_name","sales_date","sales"]
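A minimal setup sketch (assumed; the video presumably does the equivalent) for turning the Q1 data into a DataFrame:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("window-functions-lec17").getOrCreate()

# sales_date stays a string ("dd-MM-yyyy") here; cast it with to_date later if
# true date ordering is needed
product_df = spark.createDataFrame(data=product_data, schema=product_schema)
product_df.show(5)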

Q2.Data:-
emp_data = [(1,"manish","11-07-2023","10:20"),
(1,"manish","11-07-2023","11:20"),
(2,"rajesh","11-07-2023","11:20"),
(1,"manish","11-07-2023","11:50"),
(2,"rajesh","11-07-2023","13:20"),
(1,"manish","11-07-2023","19:20"),
(2,"rajesh","11-07-2023","17:20"),
(1,"manish","12-07-2023","10:32"),
(1,"manish","12-07-2023","12:20"),
(3,"vikash","12-07-2023","09:12"),
(1,"manish","12-07-2023","16:23"),
(3,"vikash","12-07-2023","18:08")]

emp_schema = ["id", "name", "date", "time"]
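For Q2, a sketch (my assumption about the intended setup) that builds the DataFrame and combines the date and time strings into one TimestampType column, which several of the solutions in the comments rely on:

from pyspark.sql.functions import concat_ws, to_timestamp

emp_df = spark.createDataFrame(data=emp_data, schema=emp_schema)

# "11-07-2023" + "10:20" -> 2023-07-11 10:20:00 as a proper timestamp
emp_df = emp_df.withColumn(
    "event_ts",
    to_timestamp(concat_ws(" ", "date", "time"), "dd-MM-yyyy HH:mm")
)
emp_df.show(truncate=False)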

Q3.Data:-
product_data = [
(1,"iphone","01-01-2023",1500000),
(2,"samsung","01-01-2023",1100000),
(3,"oneplus","01-01-2023",1100000),
(1,"iphone","01-02-2023",1300000),
(2,"samsung","01-02-2023",1120000),
(3,"oneplus","01-02-2023",1120000),
(1,"iphone","01-03-2023",1600000),
(2,"samsung","01-03-2023",1080000),
(3,"oneplus","01-03-2023",1160000),
(1,"iphone","01-04-2023",1700000),
(2,"samsung","01-04-2023",1800000),
(3,"oneplus","01-04-2023",1170000),
(1,"iphone","01-05-2023",1200000),
(2,"samsung","01-05-2023",980000),
(3,"oneplus","01-05-2023",1175000),
(1,"iphone","01-06-2023",1100000),
(2,"samsung","01-06-2023",1100000),
(3,"oneplus","01-06-2023",1200000)
]

product_schema=["product_id","product_name","sales_date","sales"]

For more queries, reach out to me on my social media handles.

Comments

Thank you so much for this. I now understand the concept of Window functions crystal clear!

tejasnareshsuvarna

Hello Manish, you are a great teacher. Please start a Python series from a data engineering point of view, covering how Python is used in real-time industry projects.

nitikjain

Thank you so much. There is no better explanation video on this topic.

mithileshsanam

Q3 : window = Window.partitionBy("product_id").orderBy("sales_date").rowsBetween(-2, 0)
# avg expression filled in (assumed from the column name; it was missing in the comment)
product_df.withColumn("avg_3_month", avg("sales").over(window))\
.withColumn("increase_decrease", when(col("avg_3_month") < col("sales"), "increase").when(col("avg_3_month") > col("sales"), "decrease").otherwise("no change")).show()

aryankhandelwal

Question 3: I think we can solve it this way
window = Window.partitionBy("product_id").orderBy("sales_date").rowsBetween(-2, 0)
# window1 (assumed, since it isn't defined in the comment): row_number() needs a frame-less window
window1 = Window.partitionBy("product_id").orderBy("sales_date")

product_df1.withColumn("running_sum", sum("sales").over(window))\
.withColumn("row_number", row_number().over(window1))\
.withColumn("average_sales", round(col("running_sum")/3, 2))\
.show()

prabhatsingh

question 1 answer:
# window (assumed, not included in the comment): whole-partition frame per product
window = Window.partitionBy("product_id").orderBy("sales_date")\
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
product_df.withColumn("first_sales", first("sales").over(window))\
.withColumn("latest_sales", last("sales").over(window))\
.withColumn("difference", col("first_sales")-col("latest_sales"))\
.select("product_id", "product_name", "difference").distinct()\
.show()

rasikakurhade

Thanks for the video, bro!!
One doubt: in question 2, once we have the 'total_time' column showing the difference in INTERVAL format, how do we filter rows where it is more than 9 hours? I couldn't figure out how to do it. Can you please tell?

akashprabhakar
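One possible way to filter on that (my suggestion, not from the video): skip the interval column and compare the login/logout timestamps as seconds. new_df below stands for any DataFrame that already has TimestampType login and logout columns, as in the solutions further down:

from pyspark.sql.functions import col

# keep only rows where the gap between logout and login is more than 9 hours
over_9_hrs_df = new_df.withColumn(
    "total_seconds",
    col("logout").cast("long") - col("login").cast("long")
).filter(col("total_seconds") > 9 * 60 * 60)

over_9_hrs_df.select("id", "name", "date").distinct().show()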

(Running sum) Sales performance - last 3 months

window = Window.partitionBy('product_id').orderBy('sales_date').rowsBetween(-2, 0)

product_df.withColumn('running_sum', sum('sales').over(window))\
.withColumn('avg_3months', round(col('running_sum')/3, 2))\
.filter(substring('sales_date', 4, 2).cast('int') > 2).show()  # filter out Jan and Feb, which have fewer than 3 months in the frame

adarsharora

Sir, please create one video on timestamps... getting a bit confused 🎉

HanuamnthReddy

Sir, when I add the timestamp column, it shows null for all the timestamp values.

atharvjoshi
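A likely cause (my guess, not a confirmed diagnosis): the format pattern passed to to_timestamp / unix_timestamp doesn't match the string being parsed, and a mismatch silently returns null. For the Q2 data the pattern has to be "dd-MM-yyyy HH:mm":

from pyspark.sql.functions import to_timestamp, expr

# "11-07-2023 10:20" parses only with "dd-MM-yyyy HH:mm"; a different pattern
# (e.g. "MM-dd-yyyy", or one expecting seconds) gives null for every row
emp_df.withColumn(
    "timestamp",
    to_timestamp(expr("concat(date, ' ', time)"), "dd-MM-yyyy HH:mm")
).show(truncate=False)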

My output for the 2nd question is coming out wrong from the "employee-working-hours" df; the DataFrame values are incomplete.

lazycool

Question1::

window=Window.partitionBy("product_id").orderBy("sales_date").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
new_product_df=product_df.withColumn("First_day_sales", first("sales").over(window))\
.withColumn("Latest_day_sales", last("sales").over(window))\
.withColumn("sales_difference", col("Latest_day_sales")- col("First_day_sales")).select("product_id", "product_name", "sales_difference").distinct()
new_product_df.show()

Question2::

emp_df=emp_df.withColumn("Timestamp", from_unixtime(unix_timestamp(expr("concat(date, ' ', time)"), "dd-MM-yyyy HH:mm")))

window=Window.partitionBy("id", "date").orderBy("time").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

new_df=emp_df.withColumn("login", first("Timestamp").over(window))\
.withColumn("logout", last("Timestamp").over(window))\
.withColumn("login", to_timestamp("login", "yyyy-MM-dd HH:mm:ss"))\
.withColumn("logout", to_timestamp("logout", "yyyy-MM-dd HH:mm:ss"))\
.withColumn("total_time", col("logout")-col("login")).selectExpr("*", "total_time < INTERVAL '8 hours' as less_than_8_hours")

new_df.filter(col("less_than_8_hours")=="true").select("id", "name", "date").distinct().show()

Question3::

window = Window.partitionBy('product_id').orderBy("sales_date").rowsBetween(-2, 0)
# second window (assumed, missing from the comment): row_number() must not have a frame
window_rn = Window.partitionBy('product_id').orderBy("sales_date")
final_df = product_df.withColumn("running_avg", sum('sales').over(window))\
.withColumn("row_num", row_number().over(window_rn))\
.filter(col("row_num")>2)\
.withColumn("Last_3_month_avg", round((col("running_avg") / 3).cast("float"), 2))

final_df.select("product_id", "product_name", "sales_date", "sales", "running_avg", "Last_3_month_avg").show()

Cherry-nopb

Hi Manish Bhaiya, I have a confusion/doubt: the timestamp is already of timestamp data type in the 1st and 2nd lines of code, so after applying withColumn how did it become a string? It should have stayed a timestamp... and we can take the difference of one timestamp from another. When I tried it, the total_time column came out null.

prabhatsingh
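If it helps (my understanding, not an official answer): from_unixtime() returns a string, so the combined Timestamp column becomes StringType, and subtracting strings gives null. Checking the schema makes this visible; using to_timestamp() directly keeps a real timestamp:

from pyspark.sql.functions import from_unixtime, unix_timestamp, to_timestamp, expr

# from_unixtime() formats the epoch seconds back into a *string*
emp_df.withColumn(
    "Timestamp",
    from_unixtime(unix_timestamp(expr("concat(date, ' ', time)"), "dd-MM-yyyy HH:mm"))
).printSchema()   # Timestamp: string

# to_timestamp() keeps TimestampType, so logout - login can be computed directly
emp_df.withColumn(
    "Timestamp",
    to_timestamp(expr("concat(date, ' ', time)"), "dd-MM-yyyy HH:mm")
).printSchema()   # Timestamp: timestamp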

Correction:
Hello Manish, for row_number() the partitionBy() should be applied on product_id only, not on product_id and sales_date.
Solution:

window = Window.partitionBy("product_id").orderBy("sales_date").rowsBetween(-2, 0)
window1 = Window.partitionBy("product_id").orderBy("sales_date")

product_df.withColumn("row_num", row_number().over(window1))\
.withColumn("rolling_sum", sum("sales").over(window))\
.filter(col("row_num")>2)\
.withColumn("avg_3_month_sale", round(col("rolling_sum")/3, 2))\
.select("product_id", "product_name", "sales_date", "sales", "avg_3_month_sale")\
.show()

hemant

28:00, Hey @manish_kumar_1 How come the show method doesn't show 2 login & logoff columns?! More than that, how is a dataframe able to handle 2 columns with same name? I don't think you overwrote on them nor did you drop them when doing ".show()"!!

sankuM

Q3.
window_spec = Window.partitionBy('product_id').orderBy("sales_date").rowsBetween(-2, 0)

# rolling 3-row sum (expression filled in, assumed from the column name and the /3 below)
p_df_1 = product_df.withColumn("three_month_avg", sum("sales").over(window_spec))

# window_run (assumed): same partition/order without a frame, for row_number()
window_run = Window.partitionBy('product_id').orderBy("sales_date")

result_df = p_df_1.withColumn("row_number", row_number().over(window_run))\
.filter(col('row_number') > 2)\
.withColumn("per_sale", round(col("three_month_avg") / 3, 3))

shankarwagh

Hi,
# window (assumed, not defined in the comment): row_number() needs a frame-less window
window = Window.partitionBy('product_id').orderBy('sales_date')

running_sum_product_df.withColumn('rows', row_number().over(window))\
.withColumn('running_sum', when(col('rows') <= 2, lit(None)).otherwise(col('running_sum')))\
.withColumn('average_sales', round(col('running_sum')/3, 2)).show()

jatinyadav


# partitionBy/orderBy filled in (assumed, to match Q1); only the rowsBetween part was in the comment
window = Window.partitionBy("product_id").orderBy("sales_date")\
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
product_df.withColumn("first_sales", first("sales").over(window))\
.withColumn("latest_sales", last("sales").over(window))\
.withColumn("difference", col("latest_sales")-col("first_sales"))\
.select("product_id", "product_name", "difference").distinct().show()

sachinkrshaw

What is the cost of your mic and its accessories?

chetanbulla

Q.2) window = Window.partitionBy("id", "date").orderBy("time").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
# ordered by time (not date) so first/last give the actual login/logout of the day;
# login/logout expressions filled in (assumed): first and last event per id and date,
# taken from a combined "timestamp" column built earlier from date and time
new_df = emp_df.withColumn("login", first("timestamp").over(window)) \
.withColumn("logout", last("timestamp").over(window)) \
.withColumn("login", to_timestamp("login", "yyyy-MM-dd HH:mm:ss")) \
.withColumn("logout", to_timestamp("logout", "yyyy-MM-dd HH:mm:ss")) \
.withColumn("total_time", col("logout").cast("long") - col("login").cast("long")) \
.filter(col("total_time") < 8*60*60) \
.select("id", "name", "date") \
.distinct().show()

sachinkrshaw