How to build ETL pipeline with Incremental Data Load with Python | Python | ETL

preview_player
Показать описание
In this video we will cover how to perform Incremental data load with Python ETL pipeline. The incremental data load approach in ETL (Extract, Transform and Load) is the ideal design pattern. In this process, we identify and process new and modified rows since the last ETL run. Incremental data load is efficient in the sense that we only process a subset of rows and it utilizes less resources.
We will focus on Destination Change Comparison technique today.
Join me next time for the Source Change Detection.

Subscribe to our channel:

---------------------------------------------
Follow me on social media!

---------------------------------------------

#ETL #Python #IncrementalDataLoad

Topics covered in this video:
0:00 - Introduction to ETL Incremental load approach
1:27 - Different ETL Incremental load approaches
2:46 - Implement Incremental load approach with Python
5:17 - Test Pipeline with target data check
Рекомендации по теме
Комментарии
Автор

Videos in this series:
SQL Database setup videos:

BiInsightsInc
Автор

Nice tutorial. Just wondering why we don't use upsert/merge to achieve the same result with less code. Thanks.

MC-vwle
Автор

Teacher, where is the source of this data? I would like to insert them into my database. In my case I will insert it into PostgreSQL, run the ETL and write it to s3. Could you provide me with the source?

tiagovianez
Автор

Hi Haq, what MS SQL server edition would you advise me to download? The developer version or the express version? I want the version that will help me do ANY and EVERYTHING, for personal use. Thanks.

julzbuzz
Автор

Hi Haq, I dont see the insertion of 2 additional in the target target table after executing the update_to_sql() function. Could you please recheck your code?

jaswanth
Автор

How to identify modified and new inserts records if we have more than one key column. Could you please share the syntax to get modified rows

nikhil-lupg
Автор

hai friend.... in the variable of modified, I got problem customerkey of the target hasn't customerkey. but in the field target it is exists.
Please tell me bro

ihab
Автор

Bro how can we create audit log table to capture all log details ?

dharanidhar
Автор

Data extract error: ('08001', '[08001] [Microsoft][SQL Server Native Client 11.0]SQL Server Network Interfaces: Error Locating Server/Instance Specified (-1) (SQLDriverConnect); [08001] [Microsoft][SQL Server Native Client 11.0]Login timeout expired (0); [08001] [Microsoft][SQL Server Native Client 11.0]A network-related or instance-specific error has occurred while establishing a connection to SQL Server. Server is not found or not accessible. Check if instance name is correct and if SQL Server is configured to allow remote connections. For more information see SQL Server Books Online. (-1)')
Error while extracting data: local variable 'src_conn' referenced before assignment

isbakhullail
Автор

Hi bro, I really like your video. I stuck on one step i.e you designed function which is not executing from my end. can you help me for same pls. thanks in advance....

ProgrammingError: (1064, 'You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near \'"emp_data" f SET "emp_name"=s."emp_name", "salary"=s."salary", "age"=s."age", ...\' at line 1')
[SQL: UPDATE agri_reports."emp_data" f SET "emp_name"=s."emp_name", "salary"=s."salary", "age"=s."age", FROM agri_reports."emp_data" t INNER JOIN (SELECT * FROM AS s ON s."emp_id"=t."emp_id" Where f."emp_id"=s."emp_id" ;]

ManojKumar-vpzj
Автор

Hi, thanks again for uploading this. When trying to upsert data to target table I'm getting following error:

{
"name": "ObjectNotExecutableError",
"message": "Not an executable object: 'UPDATE f SET \"GeographyKey\"=s.\"GeographyKey\", \"CustomerAlternateKey\"=s.\"CustomerAlternateKey\", \"Title\"=s.\"Title\", \"FirstName\"=s.\"FirstName\", \"MiddleName\"=s.\"MiddleName\", \"LastName\"=s.\"LastName\", \"NameStyle\"=s.\"NameStyle\", \"BirthDate\"=s.\"BirthDate\", FROM t INNER JOIN (SELECT * FROM AS s ON Where ;'",
"stack":
AttributeError Traceback (most recent call last)
File ~\\AppData\\Local\\Packages\\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\\LocalCache\\local-packages\\Python311\\site-packages\\sqlalchemy\\engine\\base.py:1412, in Connection.execute(self, statement, parameters, execution_options)
1411 try:
-> 1412 meth =
1413 except AttributeError as err:

AttributeError: 'str' object has no attribute '_execute_on_connection'

The above exception was the direct cause of the following exception:

ObjectNotExecutableError Traceback (most recent call last)
Cell In[45], line 2
1 # Call update function
----> 2 update_to_sql(modified, \"stg_IncrementalLoadTest\", \"CustomerKey\")

Cell In[43], line 20, in update_to_sql(df, table_name, key_name)
18 print(update_stmt_7)
19 with engine.begin() as dom:
---> 20 dom.execute(update_stmt_7)

File ~\\AppData\\Local\\Packages\\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\\LocalCache\\local-packages\\Python311\\site-packages\\sqlalchemy\\engine\\base.py:1414, in Connection.execute(self, statement, parameters, execution_options)
1412 meth =
1413 except AttributeError as err:
-> 1414 raise from err
1415 else:
1416 return meth(
1417 self,
1418 distilled_parameters,
1419 execution_options or NO_OPTIONS,
1420 )

ObjectNotExecutableError: Not an executable object: 'UPDATE f SET \"GeographyKey\"=s.\"GeographyKey\", \"CustomerAlternateKey\"=s.\"CustomerAlternateKey\", \"Title\"=s.\"Title\", \"FirstName\"=s.\"FirstName\", \"MiddleName\"=s.\"MiddleName\", \"LastName\"=s.\"LastName\", \"NameStyle\"=s.\"NameStyle\", \"BirthDate\"=s.\"BirthDate\", FROM t INNER JOIN (SELECT * FROM AS s ON Where ;'"
}

Any workaround for it?

jorozcobe