Parallel table ingestion with a Spark Notebook (PySpark + Threading)

If we want to kick off a single Apache Spark notebook to process a list of tables, the straightforward code is easy to write, but a simple loop over the list processes one table after another (sequentially). If none of the tables are very big, it is quicker to have Spark load them concurrently (in parallel) using multithreading. There are several ways to do this; here I share the easiest one I have found when working with a PySpark notebook in Databricks, Azure Synapse Spark, Jupyter, or Zeppelin.
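
As a rough illustration of the idea (a sketch, not the exact code from the written tutorial), the pattern combines a queue of table names with a few daemon worker threads. Here spark is the session already available in the notebook, and load_table and the table list are placeholders:

from queue import Queue
from threading import Thread

tables = ["sales", "customers", "orders"]   # placeholder table names

def load_table(name):
    # placeholder work: read the source table and write a copy
    spark.read.table(name).write.mode("overwrite").saveAsTable(f"{name}_copy")

def worker(q):
    while True:
        name = q.get()
        try:
            load_table(name)
        finally:
            q.task_done()        # mark this table as finished, even on error

q = Queue()
for _ in range(3):               # number of tables to load concurrently
    Thread(target=worker, args=(q,), daemon=True).start()

for name in tables:
    q.put(name)

q.join()                         # block until every queued table is done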

Written tutorial and links to code:

More from Dustin:

CHAPTERS:
0:00 Intro and Use Case
1:05 Code example single thread
4:36 Code example multithreaded
7:15 Demo run - Databricks
8:46 Demo run - Azure Synapse
11:48 Outro
Comments

I was having so many issues using the other ThreadPool library in a notebook. It cut my notebook runtime down by 70%, but I couldn't get it to run in a Databricks job. Your solution worked perfectly! Thank you so much!

GrantDoyle-ne

Great video tutorial. Clear explanation. Thank you.

vygrys

This is great. Have you compared pyspark.InheritableThread vs a plain Python thread? I read that the PySpark version will keep the thread in sync between the PVM and the JVM.

Druidofwind
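
For context on the question above: pyspark.InheritableThread (available in recent PySpark versions) behaves like threading.Thread but copies the parent thread's Spark local properties into the child thread, which keeps the Python and JVM sides in sync when pinned thread mode is enabled. A hedged sketch, with load_table as a placeholder:

from pyspark import InheritableThread

def load_table(name):
    spark.read.table(name).write.mode("overwrite").saveAsTable(f"{name}_copy")

threads = [InheritableThread(target=load_table, args=(t,)) for t in ["sales", "orders"]]
for t in threads:
    t.start()
for t in threads:
    t.join()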

Wonderful tutorial, thank you!
This approach works effectively for running multiple tables in parallel when using spark.read and spark.write to a table. However, if the process involves reading with spark.read and then merging the data into a table based on a condition, one thread interferes with another, leading to thread failures. Is there any workaround for this?

saipremikak
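
A possible workaround for the merge question above (my own suggestion, not something from the video): let the reads run in parallel but serialize the MERGE step with a lock, and give each thread its own temp-view name, since temp views are shared across threads in the same SparkSession. A rough sketch with placeholder table names and join keys:

from threading import Lock, Thread

merge_lock = Lock()                               # only one MERGE runs at a time

def process(name):
    df = spark.read.table(f"staging.{name}")      # reads can safely run in parallel
    df.createOrReplaceTempView(f"src_{name}")     # unique view name per thread
    with merge_lock:                              # serialize merges so threads don't conflict
        spark.sql(f"""
            MERGE INTO target.{name} AS t
            USING src_{name} AS s
            ON t.id = s.id
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
        """)

threads = [Thread(target=process, args=(n,)) for n in ["sales", "orders"]]
for t in threads:
    t.start()
for t in threads:
    t.join()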

Hi Dustin,

Thank you for sharing this approach. I am going to use it for training Spark ML models.

I had a question on using the daemon option. My understanding is that these threads never terminate until the script ends. When do they terminate in this example? At the end of the cell, or after .join(), i.e. when all items in the queue have completed? I really appreciate any explanation you can provide.

chrishassan
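
On the daemon question above (my reading of the pattern, not an official answer): q.join() returns once every item put on the queue has been matched by a task_done() call, so the cell continues at that point; the daemon workers themselves never return on their own — they just block on q.get() and are only torn down when the driver's Python interpreter exits. A tiny standalone illustration:

import queue
import threading
import time

q = queue.Queue()

def worker():
    while True:              # a daemon worker never exits on its own
        item = q.get()
        time.sleep(0.1)      # stand-in for the real work
        q.task_done()        # one queued item accounted for

threading.Thread(target=worker, daemon=True).start()

for i in range(5):
    q.put(i)

q.join()                     # unblocks here once all 5 items are done; the worker
                             # keeps waiting on q.get() until the interpreter shuts down
print("all queued work finished")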

Hi, thanks for the informative video! I have a question: instead of sending a list to the notebook, I send a single table to the notebook using a ForEach activity (Synapse can do a maximum of 50 concurrent iterations). What would the difference be? Which would be more efficient? And what is best practice in this case? Thanks in advance!

Jolu

Thank you for your video, which is very informative and helpful.

In your demo code you use multithreading to do the job. Could you advise whether we could also use multiprocessing to invoke even more concurrent jobs?

One more question: how do we fine-tune the ideal number of concurrent jobs? Does that depend on the cores of the driver node, or on the utilization of the worker nodes?

JasonZhang-sejo

Hey, quick question: I have a toxic-permissions DataFrame whose rows are combinations of permissions. Is there a way to run the JOINs in parallel, using each permission combination to filter the users who hold all of those toxic permissions and then joining on that user DataFrame, in a PySpark-native way? Right now I can use multithreading to run the function, but when I try something like map or foreach I get an error. Any idea how I should achieve this? Open to any ideas.

haoyuwang

Hey,
Great video!
Small question: why not just use the FAIR scheduler, which does that automatically?

maoraharon
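
On the FAIR scheduler question (my understanding, not from the video): FAIR scheduling only changes how concurrently submitted jobs share the cluster; a plain Python loop still submits one Spark job at a time, so threads are still needed to get concurrent submission. If spark.scheduler.mode=FAIR was set when the session was created, the two can be combined roughly like this:

from threading import Thread

def load_table(name):
    # route this thread's jobs to a named fair-scheduler pool
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "table_loads")
    spark.read.table(name).write.mode("overwrite").saveAsTable(f"{name}_copy")

threads = [Thread(target=load_table, args=(t,)) for t in ["sales", "orders", "customers"]]
for t in threads:
    t.start()
for t in threads:
    t.join()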

Could you please share the syntax if the load_table function takes more than one parameter, like load_table(x, y)?

sayedmohammad
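
On the question above: the Thread constructor takes the extra parameters as a tuple in args, and the queue variant can carry a tuple per item. A small sketch with a hypothetical load_table(source, target):

from threading import Thread

def load_table(source, target):   # hypothetical two-parameter version
    spark.read.table(source).write.mode("overwrite").saveAsTable(target)

# pass both values as a tuple through args
t = Thread(target=load_table, args=("raw.sales", "stage.sales"))
t.start()
t.join()

# with the queue pattern, put a tuple on the queue and unpack it in the worker:
#     q.put(("raw.sales", "stage.sales"))
#     source, target = q.get()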

Great video.
It would be great if you could also explain how to do error handling for each of the threads.

christoffermunch
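
One way to handle errors per thread (a suggestion on my part, not something shown in the video) is concurrent.futures, where each task's exception is captured on its Future and can be collected afterwards:

from concurrent.futures import ThreadPoolExecutor, as_completed

def load_table(name):
    spark.read.table(name).write.mode("overwrite").saveAsTable(f"{name}_copy")

tables = ["sales", "customers", "orders"]     # placeholder list
failures = {}

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = {pool.submit(load_table, t): t for t in tables}
    for fut in as_completed(futures):
        name = futures[fut]
        try:
            fut.result()                      # re-raises any exception from that thread
        except Exception as exc:
            failures[name] = exc

if failures:
    raise RuntimeError(f"Failed tables: {list(failures)}")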

We have to process 800 files that insert data into 800 separate Delta tables. We used ThreadPoolExecutor. Using the thread pool we call a child notebook that processes one single file. In production the code became driver-intensive and utilization of the executors remained low. After investigation we realised Python multithreading is driver-intensive and we were not able to leverage the resources of the multiple available workers. Is there an alternative to multithreading we can use in PySpark that passes the load to the workers?

shubhamsannyasi

Thanks Dustin for the video. One question I have is how to speed up the read/write process when the table is very big.
Is there a difference when the number of writers (creating multiple files) is increased, compared to writing to a single Parquet/Delta file?

saeedp

Hi Dustin, thanks for this incredible explanation. I'm using Synapse and I have twenty-five Parquet files in an Azure container that I need to copy every day to another container. This process takes around 9 hours and I need to parallelize it. Can I use this methodology to improve performance? Thanks!

monicawtavares

Will this work if you read in a file, do some minor transformations, and then save to ADLS? Basically, would it work if we add in transformations?

willweatherley
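
On the question above: in this pattern the worker function is ordinary PySpark, so a read, a few transformations, and a write to ADLS work the same way; only the function body changes. A hedged sketch with made-up paths and columns:

from pyspark.sql import functions as F

def process_file(path):
    # read, apply minor transformations, then write to ADLS (paths/columns are placeholders)
    df = (spark.read.parquet(path)
               .withColumn("load_date", F.current_date())
               .filter(F.col("amount") > 0))
    df.write.mode("overwrite").parquet(path.replace("/landing/", "/curated/"))

# each thread or queue worker simply calls process_file(path) instead of a plain load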

100th like given by me. Thank you for the insightful tutorial.

SreenathaReddyKR

Why are you making the thread a daemon thread? Is there any specific reason behind that?

rohitSingh-owst

You can run the driver program with multiple threads using this approach as well:

from threading import Thread  # threading for concurrent table copies
from time import sleep        # sleep stands in for the actual copy work

workerCount = 3  # number of threads to run at a time

def display(tablename):  # function to read & load a table from the X schema to the Y schema
    try:
        sleep(3)  # placeholder for the real copy work
        print(f'Data copy from {tablename} to {tablename}_target is completed.')
    except Exception:
        print(f'Data copy failed for {tablename}.')

tables = ['Table1', 'Table2', 'Table3', 'Table4', 'Table5', 'Table6', 'Table7', 'Table8']  # tables to process

batch = []
for value in tables:
    t = Thread(target=display, args=(value,), name=value)  # create one thread per table
    t.start()                                              # start the thread
    batch.append(t)
    if len(batch) == workerCount:
        for t in batch:   # wait for the whole batch before starting the next one
            t.join()
        batch = []
for t in batch:           # wait for any remaining threads
    t.join()

Sundar

I had a project where I needed to scrape over 30,000 JSON files that took around 3 hours each, which weren't Spark friendly and required a JSON streaming library. I ended up using the concurrent.futures module and the ThreadPoolExecutor class, which is available in Python 3.2 and above and makes threading a bit easier. Still ran into resource and memory issues that caused me headaches, but it's a good starting point. Tried a bit to hack the Spark cluster into running the code concurrently but never managed to get it working.

gardnmi
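
For readers who have not used it, ThreadPoolExecutor.map is the compact form of that approach (a generic sketch, not the commenter's code):

from concurrent.futures import ThreadPoolExecutor

def parse_file(path):
    # stand-in for streaming and parsing one large JSON file
    return path, "ok"

paths = [f"file_{i}.json" for i in range(10)]    # placeholder paths
with ThreadPoolExecutor(max_workers=4) as pool:
    for path, status in pool.map(parse_file, paths):
        print(path, status)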

Great video! Congrats!
I have a couple of questions:

The multithreading is being carried out only by the driver, right?
If I want to improve, for example, API calls, I assume that the best idea is to use multithreading on each core (using a UDF) instead of this approach. What do you think?

AgusParrotta
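
On the last question: yes, the threading in the video runs only on the driver; each thread mostly just waits on Spark jobs that the cluster executes. For per-row work such as API calls, a common alternative (my sketch, not from the video) is to push the calls to the executors with mapPartitions, optionally with a small thread pool inside each partition because the calls are I/O-bound:

from concurrent.futures import ThreadPoolExecutor

def call_api(url):
    # placeholder for a real HTTP call (e.g. with the requests library)
    return len(url)

def call_partition(rows):
    urls = [row.url for row in rows]
    with ThreadPoolExecutor(max_workers=8) as pool:   # threads inside each executor task
        yield from zip(urls, pool.map(call_api, urls))

df = spark.createDataFrame([("https://example.com/a",), ("https://example.com/b",)], ["url"])
results = df.rdd.mapPartitions(call_partition).toDF(["url", "result"])
results.show()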