'Out of Order' / 'Late Arriving Data' Events Handling in Faust Application | Kafka with Python

With window aggregates (discussed in the previous video), Faust automatically takes care of late data.
Every aggregate window is like a bucket: as soon as we receive data for a new time window, a bucket is opened for it and the records falling into that bucket are aggregated.
A bucket stays open until its window expires, so data that arrives late (even hours late, if the expiry is set that long) still updates its original bucket, and the aggregate result stays correct for out-of-order or late data.
This topic is covered in this video with in-depth intuition.
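The bucket idea can be sketched in plain Python without Kafka or Faust. This is only an illustration of grouping by event time, not Faust's internal implementation; the helper names `window_range` and `aggregate` and the sample timestamps are made up for the example.

```python
import math
from collections import defaultdict

WINDOW_SIZE = 1.0  # seconds, mirroring tumbling(1) in the consumer code


def window_range(event_time, size=WINDOW_SIZE):
    """Return (start, end) of the tumbling window that contains event_time."""
    start = math.floor(event_time / size) * size
    return (start, start + size)


def aggregate(events, size=WINDOW_SIZE):
    """Sum car_speed per window, keyed by event time and not by arrival order."""
    buckets = defaultdict(int)
    for event_time, car_speed in events:
        buckets[window_range(event_time, size)] += car_speed
    return dict(buckets)


# (event_time_in_seconds, car_speed), listed in arrival order.
# The third event belongs to the first window but arrives after the second window opened.
events = [(0.2, 5), (1.1, 3), (0.7, 8), (1.5, 9)]
totals = aggregate(events)

for (start, end), total in sorted(totals.items()):
    print("Total Car Speed between {} & {} is {}".format(start, end, total))
```

The late event with speed 8 is added to the first bucket even though a later bucket was already open, which is the behaviour described above.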
Prerequisite:
--------------------------
Windowing in Kafka Streams using Faust Framework in Python | Tumbling Window
Producer Code:
-----------------------------
from datetime import datetime
from json import dumps
from time import sleep

from kafka import KafkaProducer

topic_name = 'car_speed'

def custom_partitioner(key, all_partitions, available):
    # The function body is missing from the description.
    # Assumption: route every record to the first partition.
    return all_partitions[0]

producer = KafkaProducer(bootstrap_servers=['34.207.121.51:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         partitioner=custom_partitioner)

list_car = [{"car_id": 1, "car_name": "Honda", "car_speed": 5},
            {"car_id": 2, "car_name": "Tesla", "car_speed": 3},
            {"car_id": 3, "car_name": "Volvo", "car_speed": 8},
            {"car_id": 4, "car_name": "Honda", "car_speed": 9},
            {"car_id": 5, "car_name": "Tesla", "car_speed": 2},
            {"car_id": 6, "car_name": "Volvo", "car_speed": 8},
            {"car_id": 7, "car_name": "Honda", "car_speed": 5},
            {"car_id": 8, "car_name": "Tesla", "car_speed": 7},
            {"car_id": 9, "car_name": "Volvo", "car_speed": 1},
            {"car_id": 10, "car_name": "Volvo", "car_speed": 5},
            {"car_id": 11, "car_name": "Volvo", "car_speed": 2},
            {"car_id": 12, "car_name": "Volvo", "car_speed": 3},
            {"car_id": 13, "car_name": "Volvo", "car_speed": 6},
            {"car_id": 14, "car_name": "Volvo", "car_speed": 5},
            {"car_id": 15, "car_name": "Volvo", "car_speed": 3},
            {"car_id": 16, "car_name": "Volvo", "car_speed": 1},
            {"car_id": 17, "car_name": "Volvo", "car_speed": 8},
            {"car_id": 18, "car_name": "Volvo", "car_speed": 9}]

for e in range(len(list_car)):
    data = dict(list_car[e])
    # Assumption: the consumer record has a capture_time field, so one is attached here.
    data["capture_time"] = datetime.now().isoformat()
    print("Inserting the data : ", data)
    # Assumption: the send call is missing from the description.
    producer.send(topic_name, value=data)
    if e in (2, 5, 8, 12, 14, 17):
        # Pause after these records so the following ones land in a later window.
        sleep(1)

# The description also shows a trailing "while True:" loop with only the print
# statement; the rest of its body is not available, so it is not reproduced here.
producer.flush()
Consumer Code:
---------------------------------
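from datetime import datetime, timedelta

import faust

CLEANUP_INTERVAL = 60
WINDOW_EXPIRES = 60

app = faust.App('demo-transactions_grouping123451', broker='34.207.121.51:9092', topic_partitions=1)
# Assumption: CLEANUP_INTERVAL is never used in the description; applying it to
# table_cleanup_interval is the likely intent.
app.conf.table_cleanup_interval = CLEANUP_INTERVAL

class withdrawals_data(faust.Record, serializer='json'):
    car_id: int
    car_name: str
    car_speed: int
    capture_time: datetime

# Assumption: the topic declaration is missing; the name matches the producer's topic.
car_speed_topic = app.topic('car_speed', value_type=withdrawals_data)

def window_processor(key, events):
    # key is (table_key, (window_start, window_end)); events is the list stored for that window.
    start_time = key[1][0]
    end_time = key[1][1]
    values = [event.car_speed for event in events]
    total_value = sum(values)
    print(
        "Total Car Speed between {} & {} is {}".format(start_time, end_time, total_value)
    )

# The original statement ends in a line continuation whose target is not in the
# description, so the chain stops at tumbling() here.
tumbling_table = app.Table("car_speed_counter1", default=list, on_window_close=window_processor) \
    .tumbling(1, expires=timedelta(seconds=WINDOW_EXPIRES))

@app.agent(car_speed_topic)
async def processor(stream):
    async for event in stream:
        # Append the event to the list held for the window it falls into.
        value_list = tumbling_table['events'].value()
        value_list.append(event)
        tumbling_table['events'] = value_list
        print(tumbling_table['events'].value())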
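To see what the window-close callback prints without running a broker, it can be called by hand. The sketch below assumes the key has the shape `(table_key, (window_start, window_end))`, as implied by the `key[1][0]` and `key[1][1]` lookups above; `CarEvent` is a hypothetical stand-in for the Faust record, and the window bounds are made-up values.

```python
from collections import namedtuple

# Hypothetical stand-in for the Faust record used by the consumer.
CarEvent = namedtuple("CarEvent", ["car_id", "car_name", "car_speed"])


def window_processor(key, events):
    """Sum the car speeds collected for one closed window and report the total."""
    start_time, end_time = key[1]
    total_value = sum(event.car_speed for event in events)
    message = "Total Car Speed between {} & {} is {}".format(
        start_time, end_time, total_value
    )
    print(message)
    return message


# Made-up window bounds and the first three cars from the producer list.
key = ("events", (0.0, 0.9))
events = [
    CarEvent(1, "Honda", 5),
    CarEvent(2, "Tesla", 3),
    CarEvent(3, "Volvo", 8),
]
result = window_processor(key, events)
```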
V.V.I. Note:
-------------------
The late data demonstrated in the video arrived within the expiry time, so the final aggregated result is correct. If out-of-order data arrives after the expiry time, it is not included in that window's computation.
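The effect of the expiry time can be illustrated with a small simulation. This is a simplified model under stated assumptions, not the rule Faust applies internally: here a record is rejected when it arrives more than `expires` seconds after its window ended. The class name `ExpiringWindows` and all timestamps are invented for the example.

```python
import math


class ExpiringWindows:
    """Toy tumbling-window aggregator that ignores data arriving after expiry."""

    def __init__(self, size, expires):
        self.size = size        # window length in seconds
        self.expires = expires  # how long a window accepts late data after it ends
        self.totals = {}        # window start -> running sum
        self.dropped = []       # (event_time, value) pairs that arrived too late

    def add(self, event_time, arrival_time, value):
        """Add value to the window of event_time unless that window has expired."""
        start = math.floor(event_time / self.size) * self.size
        window_end = start + self.size
        if arrival_time - window_end > self.expires:
            self.dropped.append((event_time, value))
            return False
        self.totals[start] = self.totals.get(start, 0) + value
        return True


windows = ExpiringWindows(size=1.0, expires=60.0)
on_time = windows.add(event_time=0.2, arrival_time=0.3, value=5)
late_ok = windows.add(event_time=0.7, arrival_time=30.0, value=8)     # late, within expiry
too_late = windows.add(event_time=0.9, arrival_time=120.0, value=2)   # late, beyond expiry

print(windows.totals)
print(windows.dropped)
```

The record that arrives 30 seconds late still counts toward its window, while the one that arrives after the 60-second expiry is left out of the total.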
Learn Apache Kafka from scratch
Check this playlist for more Data Engineering related videos:
Snowflake Complete Course from scratch with End-to-End Project with in-depth explanation--